Update 3D Visualization programmatically

Thank you again for your help and replies!

For privacy reasons, I am not allowed to share the whole repository. I am sorry :frowning:
However, I can provide a piece of code that could serve as a guideline for anyone interested. The OpcuaCommProxy is just a wrapper for using asyncua OPCUA connections with less verbosity.

The code is not clean, not optimal, and not efficient, because it was created for a prototype, so feel free to criticize it; it has a lot of room for improvement :slight_smile:

#
# Description: Creates an OPC UA client that receives messages from a given OPC UA host, translates them to the
# IGTLink protocol, and sends them to Slicer via an IGTLink server.
#
import sys
import argparse
from time import sleep
import socket

import numpy as np
import pyigtl
from asyncua import Node

###################################
from opcua_robot_comm.opcuacommproxy import OpcuaCommProxy ##(I CANNOT SHARE THIS)
###################################

# 4x4 homogeneous matrix that negates the first two axes (see the two diagonal
# entries set below); by its name, it converts between the RAS and LPS conventions.
ras2lps = np.identity(4)
ras2lps[0, 0] = -1.0
ras2lps[1, 1] = -1.0
# Inverse conversion, computed numerically from ras2lps.
lps2ras = np.linalg.inv(ras2lps)
# Hardcoded registration matrix from A to B
# NOTE: the argument below is a placeholder; as written this line is not valid
# Python. Replace the trailing comment with a real 4x4 homogeneous matrix.
lps_a_t_b = np.array(#some 4x4 homogeneous matrix)
# The same registration conjugated with the axis-flip matrices above.
ras_a_t_b = ras2lps @ lps_a_t_b @ lps2ras


# Define SubscriptionHandler
class SubscriptionHandler:
    """
    Handles the data received for an asyncua subscription.

    Every data change notification stores the raw value in ``val`` and its
    4x4 reshaped form in ``mat``. Nothing is sent to the IGTLink server from
    here; the main loop reads ``mat`` at its own rate.

    ``mat`` is deliberately not defined until the first notification arrives:
    the main loop relies on the resulting ``AttributeError`` to fall back to
    an identity matrix.
    """

    # Last raw value received from the subscription (None until the first notification).
    val = None

    def __init__(self, igtl_server):
        """
        :param igtl_server: IGTLink server object; stored on the instance but not used by this class.
        """
        self.igtl_server = igtl_server

    def datachange_notification(self, node: "Node", val, data):
        """
        Callback for asyncua Subscription.
        This method will be called when the Client received a data change message from the Server.

        :param node: Node whose value changed (unused).
        :param val: New value; must contain exactly 16 elements so it can be reshaped to 4x4.
        :param data: Extra notification data (unused).
        :raises ValueError: If ``val`` cannot be reshaped to (4, 4).
        """
        self.val = val
        # Refresh the cached 4x4 matrix from the new value
        self._update_tool_transform()

    def _update_tool_transform(self):
        """
        Reshapes the last received value into a 4x4 matrix and caches it in ``self.mat``.

        :raises ValueError: If ``self.val`` does not contain exactly 16 elements.
        """
        try:
            self.mat = np.reshape(np.array(self.val), (4, 4))
        except KeyboardInterrupt:
            print("Program terminated by User")
            sys.exit()

    # The method was originally defined under this misspelled name, while the
    # callback called the correct one (AttributeError on every notification).
    # Keep the old name as an alias so any existing caller still works.
    _updateg_tool_transform = _update_tool_transform


if __name__ == "__main__":
    # Entry point of the OPCUA-IGTL bridge:
    #   1. start a local IGTLink server and wait for a client (e.g. Slicer) to connect,
    #   2. connect to the OPCUA server and subscribe to the tool pose variable,
    #   3. forward the latest pose as an IGTL transform message every 0.1 seconds.

    # Define arguments for the OPCUA-IGTL bridge
    argparser = argparse.ArgumentParser()
    default_opcua_address = socket.gethostbyname(socket.gethostname())
    argparser.add_argument("--opcua_host", default=default_opcua_address, help="OPCUA Host to connect to")
    # type=int: without it, a port given on the command line arrives as a string
    # and is handed to the IGTLink server as such.
    argparser.add_argument("--opcua_port", type=int, default=4840, help="OPCUA Port to connect to")

    argparser.add_argument("--igtl_port", type=int, default=18944, help="IGTL Port to connect to")

    # Parse arguments and organize them
    args = argparser.parse_args()
    opcua_host = args.opcua_host
    opcua_port = args.opcua_port
    igtl_port = args.igtl_port

    # Create the OPCUA Client
    opcua_client = OpcuaCommProxy("opc.tcp://" + opcua_host + ":" + str(opcua_port))

    # Create the IGTLink server and connect to the client
    igtl_server = pyigtl.OpenIGTLinkServer(
        igtl_port,
        local_server=True,
    )

    print(f"Started IGTL server at: {igtl_server.host}:{igtl_server.port}")
    sig = True

    # Wait for IGTL client to connect to our server
    while not igtl_server.is_connected():
        sleep(0.1)

    # Connect to the OPCUA server once the IGTL client is connected
    opcua_client.connect()
    subs_handler = SubscriptionHandler(igtl_server=igtl_server)
    opcua_client.subscribe_to_var("ns=YourNameSpaceNumber;s=YourOPCUAVariableName", subs_handler)

    print("[IGTL Translator] Server connected to client")

    # Fixed matrix applied to every tool pose (transforms LPS to RAS using the
    # hardcoded registration). It only depends on module constants, so it is
    # computed once here instead of on every loop iteration.
    pose_correction = np.linalg.inv(lps2ras @ ras_a_t_b)

    # Keep running as long as the server is connected or the user interrupts the program with Ctrl+C
    while sig:
        try:
            if igtl_server.is_connected():
                try:
                    mat = subs_handler.mat
                except AttributeError:
                    # No data change notification received yet: send identity
                    mat = np.eye(4)

                # Transform LPS to RAS
                mat = pose_correction @ mat

                # Send tool transform
                # This is not in the datachange notification to prevent too many events from being sent in the OPCUA
                # event loop. We send it every 0.1 seconds instead.
                # The device name has a limit of 20 characters; longer names are cut at the 20th character.
                transform_message = pyigtl.TransformMessage(mat, device_name=r"tool_transform")
                igtl_server.send_message(transform_message, wait=False)
                print(f"Sent message {mat}")
                sleep(0.1)

            else:
                sig = False
                print("Connection interrupted")
        except KeyboardInterrupt:
            sig = False
            print("Program terminated by User")

    print("Terminated")
    # Force program termination
    sys.exit()
1 Like

Thank you, this script can be very useful for others who want to interface with OPC UA components in Slicer.