Thank you again for your help and replies!
For privacy reasons, I am not allowed to share the whole repository. I am sorry
However, I can provide a piece of code that could serve as a guideline for anyone interested. The OpcuaCommProxy is just a wrapper for using asyncua OPCUA connections with less verbosity.
The code is not clean, not optimum and it is also not efficient because this was created for a prototype, so feel free to criticize it as it has a lot of room to improve
#
# Description: Creates a OPCUA client that gets messages from a given OPCUA host, translates them to the IGTLink protocol and sends them to Slicer.
# via a IGTLink server.
#
import sys
import argparse
from time import sleep
import socket
import numpy as np
import pyigtl
from asyncua import Node
###################################
from opcua_robot_comm.opcuacommproxy import OpcuaCommProxy ##(I CANNOT SHARE THIS)
###################################
ras2lps = np.identity(4)
ras2lps[0, 0] = -1.0
ras2lps[1, 1] = -1.0
lps2ras = np.linalg.inv(ras2lps)
# Hardcoded registration matrix from A to B
lps_a_t_b = np.array(#some 4x4 homogeneous matrix)
ras_a_t_b = ras2lps @ lps_a_t_b @ lps2ras
# Define SubscriptionHandler
class SubscriptionHandler:
"""
The SubscriptionHandler is used to handle the data that is received for the subscription.
"""
val = None
def __init__(self, igtl_server):
self.igtl_server = igtl_server
def datachange_notification(self, node: Node, val, data):
"""
Callback for asyncua Subscription.
This method will be called when the Client received a data change message from the Server.
"""
self.val = val
# Send transform matrix
self._update_tool_transform()
def _updateg_tool_transform(self):
"""
Sends a tool transform to the IGTLink server.
"""
# Send transform matrix
try:
mat = np.array(self.val)
mat = np.reshape(mat, (4, 4))
self.mat = mat
except KeyboardInterrupt:
print("Program terminated by User")
sys.exit()
if __name__ == "__main__":
# Define arguments for the OPCUA-IGTL bridge
argparser = argparse.ArgumentParser()
default_opcua_address = socket.gethostbyname(socket.gethostname())
default_igtl_address = "127.0.0.1"
argparser.add_argument("--opcua_host", default=default_opcua_address, help="OPCUA Host to connect to")
argparser.add_argument("--opcua_port", default=4840, help="OPCUA Port to connect to")
argparser.add_argument("--igtl_port", default=18944, help="IGTL Port to connect to")
# Parse arguments and organize them
args = argparser.parse_args()
opcua_host = args.opcua_host
opcua_port = args.opcua_port
igtl_port = args.igtl_port
# Create the OPCUA Client
opcua_client = OpcuaCommProxy("opc.tcp://" + opcua_host + ":" + str(opcua_port))
# Create the IGTLink server and connect to the client
igtl_server = pyigtl.OpenIGTLinkServer(
igtl_port,
local_server=True,
)
print(f"Started IGTL server at: {igtl_server.host}:{igtl_server.port}")
sig = True
# Wait for IGTL client to connect to our server
while not igtl_server.is_connected():
sleep(0.1)
# Connect to the OPCUA server once the IGTL client is connected
opcua_client.connect()
subs_handler = SubscriptionHandler(igtl_server=igtl_server)
opcua_client.subscribe_to_var("ns=YourNameSpaceNumber;s=YourOPCUAVariableName", subs_handler)
print("[IGTL Translator] Server connected to client")
# Keep running as long as the server is connected or the user interrupts the program with Ctrl+C
while sig:
try:
if igtl_server.is_connected():
try:
mat = subs_handler.mat
except AttributeError:
mat = np.eye(4)
# Transform LPS to RAS
mat = np.linalg.inv(lps2ras @ ras_a_t_b) @ mat
# Send tool transform
# This is not in the datachange notification to prevent too many events from being sent in the OPCUA
# event loop. We send it every 0.1 seconds instead.
transform_message = pyigtl.TransformMessage(mat, device_name=r"tool_transform") # The device name has a limit of 20 characters, be careful with this, or you will receive cut names at the 20th character.
# print(f"Sending transform: {mat}")
igtl_server.send_message(transform_message, wait=False)
print(f"Sent message {mat}")
sleep(0.1)
else:
sig = False
print("Connection interrupted")
except KeyboardInterrupt:
sig = False
print("Program terminated by User")
print("Terminated")
# Force program termination
sys.exit()