Code examples

Welcome to the code examples. It is a good starting point if you want to learn how to initialize the kortex API and how to access its services.

Using a MQTT session

This piece of code shows how to initialize a session with the Kortex API using the MQTT transport. Once the session has been created, a client communicating with the base service is created and the function GetArmState() is executed. Once this is all done, everything is closed smoothly using the correct teardown sequence.

# Comm imports
from kortex_api.MqttTransport import MqttTransport
from kortex_api.RouterClient import RouterClient

# Service imports
from kortex_api.autogen.client_stubs.BaseClientRpc import BaseClient
from kortex_api.autogen.client_stubs.SessionClientRpc import SessionClient

# Protobuf messages imports
from kortex_api.autogen.messages import Session_pb2

# Constants
MQTT_PORT = 1883
CONTROLLER_ADDRESS = '192.168.1.10'

session_info = Session_pb2.CreateSessionInfo()
session_info.username = "admin"
session_info.password = "admin"
session_info.session_inactivity_timeout = 60000   # (milliseconds)
session_info.connection_inactivity_timeout = 2000 # (milliseconds)
error_callback = lambda kException: print("_________ callback error _________ {}".format(kException))


# Create objects
transport = MqttTransport()
router = RouterClient(transport, error_callback)
transport.connect(CONTROLLER_ADDRESS, MQTT_PORT)

# Create session
print("Creating session for communication")
session_client = SessionClient(router)
session_client.CreateSession(session_info)
print("Session created")

# Call some RPC which requires a session
base = BaseClient(router)
print(base.GetArmState())


# TEARDOWN

# Close API session
session_client.CloseSession()

# Disconnect from the transport object
transport.disconnect()

Getting the robot feedback with a UDP session

This other piece of code shows how to initialize a session with the Kortex API using the UDP transport. Once the session has been created, a client communicating with the base cyclic service is created and the function RefreshFeedback() is executed. Once this is all done, everything is closed smoothly using the correct teardown sequence.

# Comm imports
from kortex_api.UDPTransport import UDPTransport
from kortex_api.RouterClient import RouterClient
from kortex_api.SessionManager import SessionManager

# Service imports
from kortex_api.autogen.client_stubs.BaseCyclicClientRpc import BaseCyclicClient

# Protobuf messages imports
from kortex_api.autogen.messages import Session_pb2

# Constants
UDP_PORT = 10001
CONTROLLER_ADDRESS = '192.168.1.10'

session_info = Session_pb2.CreateSessionInfo()
session_info.username = "admin"
session_info.password = "admin"
session_info.session_inactivity_timeout = 60000   # (milliseconds)
session_info.connection_inactivity_timeout = 2000 # (milliseconds)
error_callback = lambda kException: print("_________ callback error _________ {}".format(kException))


# Create objects
transport = UDPTransport()
router = RouterClient(transport, error_callback)
transport.connect(CONTROLLER_ADDRESS, UDP_PORT)

# Create session
print("Creating session for communication")
session_manager = SessionManager(router)
session_manager.CreateSession(session_info)
print("Session created")

# Call some RPC which requires a session
cyclic = BaseCyclicClient(router)
print(cyclic.RefreshFeedback())


# TEARDOWN

# Close API session
session_manager.CloseSession()

# Disconnect from the transport object
transport.disconnect()

Run a program

This example shows how to start a program that is already stored on the controller. It assumes that the controller and the robot has been powered on. Assign the program ID you want to start to this variable PROGRAM_ID. Once this is all done, everything is closed smoothly using the correct teardown sequence.

# Comm imports
from kortex_api.MqttTransport import MqttTransport
from kortex_api.RouterClient import RouterClient
from kortex_api.exceptions import KClientException

# Service imports
from kortex_api.autogen.client_stubs.BaseClientRpc import BaseClient
from kortex_api.autogen.client_stubs.SessionClientRpc import SessionClient
from kortex_api.autogen.client_stubs.ProgramRunnerClientRpc import ProgramRunnerClient

# Protobuf messages imports
from kortex_api.autogen.messages import Session_pb2
from kortex_api.autogen.messages import ProgramRunner_pb2
from kortex_api.autogen.messages import Common_pb2

import sys

# MQTT port on the controller
MQTT_PORT = 1883

# IP Address of the controller
CONTROLLER_ADDRESS = '192.168.1.10'

# ID of the program you want to run
PROGRAM_ID = 1

session_info = Session_pb2.CreateSessionInfo()
session_info.username = "admin"
session_info.password = "admin"
session_info.session_inactivity_timeout = 60000   # (milliseconds)
session_info.connection_inactivity_timeout = 2000 # (milliseconds)
error_callback = lambda kException: print("_________ callback error _________ {}".format(kException))

# Create objects used to initialize communication with the controller.
transport = MqttTransport()
router = RouterClient(transport, error_callback)
transport.connect(CONTROLLER_ADDRESS, MQTT_PORT)

# Create session
print("Creating session for communication")
session_client = SessionClient(router)
session_client.CreateSession(session_info)
print("Session created")

# Call some RPC which requires a session
base = BaseClient(router)
operating_mode = Common_pb2.ModeSelection()
operating_mode.operating_mode = Common_pb2.OPERATING_MODE_HOLD_TO_RUN

try:
    base.SelectOperatingMode(operating_mode)

except KClientException as ex:

    # Get error and sub error codes
    error_code = ex.get_error_code()
    sub_error_code = ex.get_error_sub_code()
    print("Error while selecting the operating mode.")
    print("Error_code:{0} , Sub_error_code:{1} ".format(error_code, sub_error_code))
    sys.exit()


program_runner = ProgramRunnerClient(router)
program_handle = Common_pb2.ProgramHandle()
program_runnable = ProgramRunner_pb2.RunnableHandle()
program_config = ProgramRunner_pb2.ProgramStartConfiguration()

program_handle.identifier = PROGRAM_ID
program_runnable.program_handle = program_handle
program_config.handle = program_runnable

try:
    program_runner.Start(program_config)

except KClientException as ex:

    # Get error and sub error codes
    error_code = ex.get_error_code()
    sub_error_code = ex.get_error_sub_code()
    print("Error while starting the program.")
    print("Error_code:{0} , Sub_error_code:{1} ".format(error_code, sub_error_code))
    sys.exit()



# TEARDOWN

# Close API session
session_client.CloseSession()

# Disconnect from the transport object
transport.disconnect()