Python API mechanism

Overview

This document covers how to communicate with the robot base, in both directions.

Connections to the robot base are handled by the Transport, Router and Session Manager classes. Once a connection is established, commands can be sent to the robot base and the corresponding responses received.

To receive unsolicited messages from the robot base, a notification handler must be implemented.

Transport

A Transport class is used to instantiate an object which identifies the robot base being connected to, by specifying a communications protocol, an IP address and a port number.

Kinova provides two pre-defined Transport object types:

  • MqttTransport - using MQTT as a transport protocol

  • UDPTransport - using UDP as a transport protocol

MqttTransport is used for high-level robot control and configuration.

UDPTransport can only be used to read the robot’s feedback at 1 kHz with the BaseCyclic service.

RouterClient

The RouterClient class is used to instantiate an object which sends commands to the robot base specified by the Transport object. A RouterClient object needs a Transport object to work. The purpose of the router is to transmit to the robot base all commands sent through the services.

The router also manages all messages sent and received by those services. A router must be supplied each time a new service is declared.

To handle cases where there is a connection problem with the robot base, a callback method can be provided.

errorCallback = lambda kException: print("_________ callback error _________ {}".format(kException))
transport = MqttTransport()
router = RouterClient(transport, errorCallback)
transport.connect(DEVICE_IP, DEVICE_PORT)
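
The lambda above only prints the error. As a minimal sketch, a reusable callback that also keeps the errors for later inspection could look like the following. It uses only the Python standard library; `make_error_callback`, `seen_errors` and the logger name are hypothetical names, and the only assumption about the object the router passes to the callback is that it can be formatted as a string, as in the lambda above.

```python
import logging

logger = logging.getLogger("robot_connection")  # hypothetical logger name


def make_error_callback(seen_errors):
    """Return a callback that logs each error and appends it to seen_errors."""
    def error_callback(k_exception):
        # Assumption: the router passes a single printable error object.
        logger.error("Router reported an error: %s", k_exception)
        seen_errors.append(k_exception)
    return error_callback


seen_errors = []
error_callback = make_error_callback(seen_errors)

# Simulate what a router would do on a connection problem.
error_callback(RuntimeError("connection lost"))
```

A function built this way could be passed in place of errorCallback when the RouterClient is constructed, so that the application can check seen_errors after a failed command.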

Session

To use a service from the API with a client, a session must first be created. If you are using an MQTT transport, this can be done with the SessionClient. If you are using a UDP transport, this can be done with the SessionManager.

errorCallback = lambda kException: print("_________ callback error _________ {}".format(kException))
transport = MqttTransport()
router = RouterClient(transport, errorCallback)
transport.connect(DEVICE_IP, DEVICE_PORT)

session_info = Session_pb2.CreateSessionInfo()
session_info.username = 'admin'
session_info.password = 'admin'
session_info.session_inactivity_timeout = 60000   # (milliseconds)
session_info.connection_inactivity_timeout = 2000 # (milliseconds)

session_client = SessionClient(router)
session_client.CreateSession(session_info)

NotificationHandler

The robot base provides notifications on different topics as requested by a client application that has a session open with the robot.

The robot base uses a Publish/Subscribe design pattern. Rather than polling periodically for updates, the client application subscribes to a list of topics. Whenever a change related to a topic happens, whether caused by the same client session or not, the publisher sends a notification to all subscribers of that topic.

To stop receiving notifications, call the unsubscribe() function. It is included in every client and takes as its argument the handle returned when subscribing.

from google.protobuf import json_format

# In this example, `notification_callback` is called whenever a user is created,
# because it is registered as a handler for `ConfigurationChange` topic notifications.
def notification_callback(data):
    print("********************************")
    print("**  Callback function called  **")
    print(json_format.MessageToJson(data))
    print("********************************")

# Subscribe to ConfigurationChange notifications
notif_handle = base.OnNotificationConfigurationChangeTopic(notification_callback, Base_pb2.NotificationOptions())
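
The subscribe, notify and unsubscribe flow described above can be sketched without a robot. The following toy publisher is a stand-in written for illustration only, not the Kinova implementation; `TopicPublisher` and its methods are hypothetical names. It shows how a handle returned at subscription time identifies the callback to remove later.

```python
import itertools


class TopicPublisher:
    """Toy publisher for a single topic (a stand-in, not the Kinova API)."""

    def __init__(self):
        self._subscribers = {}
        self._next_handle = itertools.count(1)

    def subscribe(self, callback):
        """Register a callback and return a handle identifying it."""
        handle = next(self._next_handle)
        self._subscribers[handle] = callback
        return handle

    def unsubscribe(self, handle):
        """Stop delivering notifications to the callback behind handle."""
        self._subscribers.pop(handle, None)

    def publish(self, data):
        """Send data to every current subscriber."""
        for callback in list(self._subscribers.values()):
            callback(data)


received = []
publisher = TopicPublisher()
handle = publisher.subscribe(received.append)

publisher.publish("user created")   # delivered to the subscriber
publisher.unsubscribe(handle)
publisher.publish("user deleted")   # not delivered: no subscriber is left
```

In this sketch, handle plays the same role as notif_handle in the example above: it is the value to keep if the subscription needs to be cancelled later.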

Blocking method

All API functions are blocking calls: when a function is called, the calling process waits until an answer is received from the server side.

# RouterClientSendOptions is optional and must be passed with the keyword
# `options`
router_options = RouterClientSendOptions()
router_options.timeout_ms = 5000 # 5 seconds
# The same function call without options=router_options is also valid; it behaves
# the same way but uses the router's default values
all_speed_hard_limits = base.GetAllJointsSpeedHardLimitation(options=router_options)
for speed_limit in all_speed_hard_limits.joints_limitations:
    print("============================================")
    print("Joint: {0}".format(speed_limit.joint_identifier))
    print("Type of limitation: {0}".format(Base_pb2.LimitationType.Name(speed_limit.type)))
    print("Value: {0}".format(speed_limit.value))
    print("============================================")
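
To illustrate what a blocking call with a timeout means for the caller, here is a minimal sketch using only the Python standard library. It is not how the API is implemented; `blocking_call`, `fast_server` and `slow_server` are hypothetical names, and raising the built-in TimeoutError is an assumption made for this example only.

```python
import queue
import threading
import time


def blocking_call(server_fn, timeout_ms):
    """Run server_fn in a worker thread and block until it answers.

    Raises TimeoutError if no answer arrives within timeout_ms.
    """
    answers = queue.Queue(maxsize=1)
    worker = threading.Thread(target=lambda: answers.put(server_fn()), daemon=True)
    worker.start()
    try:
        # The caller is suspended here until an answer arrives or time runs out.
        return answers.get(timeout=timeout_ms / 1000.0)
    except queue.Empty:
        raise TimeoutError("no answer within {} ms".format(timeout_ms)) from None


def fast_server():
    return "limits"


def slow_server():
    time.sleep(0.5)  # answers after the caller's 50 ms timeout below
    return "too late"


result = blocking_call(fast_server, timeout_ms=5000)

try:
    blocking_call(slow_server, timeout_ms=50)
    timed_out = False
except TimeoutError:
    timed_out = True
```

The first call returns as soon as the answer is available, well before the 5 second limit. The second call gives up after 50 ms, which is the situation a timeout option is meant to bound.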