Possibility of using a Kafka connector to get data from a Python OPC UA client into a Kafka topic

I have a Python OPC UA client. Its datachange_notification(self, node, val, data) method runs as part of the client whenever new data arrives (see the script below). Is it possible to send the arriving data (val in this context) to a Kafka topic?

    def datachange_notification(self, node, val, data):
        saveData = save_file.Savefile()
        # Save data locally when it arrives for the node with i=2
        if str(node) == "Node(NumericNodeId(ns=2;i=2))":
            if val is not None and val != 0:
                print("Python: New data change event", node, val)
                saveData.saveBlockByClient(val)
        # Save data locally when it arrives for the node with i=3
        if str(node) == "Node(NumericNodeId(ns=2;i=3))":
            if val is not None and val != 0:
                print("Python: New data change event", node, val)
                saveData.saveSourceByValidator(str(val))

Is it possible? Certainly.

Import a Kafka library, create a Producer instance, and send the message from inside the callback.

If you are using the AsyncIO client mentioned in the comments, I suggest looking into the aiokafka Python library.
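
For the AsyncIO case, a minimal sketch could look like the following. It assumes aiokafka is installed and a broker is reachable at localhost:9092; the topic name opcua-data and the helper names encode_payload and publish are made up for illustration and are not part of the question.

```python
import json


def encode_payload(node, val):
    """Serialize one data change as UTF-8 JSON (the layout is an assumption)."""
    return json.dumps({"node": str(node), "value": val}, default=str).encode("utf-8")


async def publish(changes, topic="opcua-data", bootstrap="localhost:9092"):
    """Send an iterable of (node, val) pairs to Kafka with aiokafka."""
    # Imported lazily so encode_payload can be used without aiokafka installed
    from aiokafka import AIOKafkaProducer

    producer = AIOKafkaProducer(bootstrap_servers=bootstrap)
    await producer.start()
    try:
        for node, val in changes:
            # send_and_wait returns once the broker has acknowledged the record
            await producer.send_and_wait(topic, encode_payload(node, val))
    finally:
        # Always stop the producer so pending messages are flushed
        await producer.stop()
```

From an event loop this could be run as asyncio.run(publish([("ns=2;i=2", 42)])). In a long-running client you would probably keep one started producer around rather than opening one per batch.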

Please check whether this works for you with the kafka-python package:

    import json

    from kafka import KafkaProducer


    class KafkaOpcUaSubHandler(object):
        def __init__(self, kafka_server, kafka_port, kafka_topic):
            # Broker address and topic are supplied by the caller
            self.msg = {}
            self.kafka_topic = kafka_topic
            # Create the producer once and reuse it for every notification
            self.kafka_producer = self.connect_kafka(kafka_server, kafka_port)

        def connect_kafka(self, server, port):
            return KafkaProducer(
                bootstrap_servers='{}:{}'.format(server, port),
                retries=0, batch_size=0, compression_type=None
            )

        def kafka_message_producer(self, kafka_topic, msg):
            # Serialize as JSON; default=str covers values json cannot encode
            self.kafka_producer.send(
                kafka_topic, json.dumps(msg, default=str).encode('utf-8')
            )
            self.kafka_producer.flush()

        def datachange_notification(self, node, val, data):
            # Called by the OPC UA subscription when a monitored value changes
            self.your_function(node, val, self.msg, data)

        def your_function(self, node, val, msg, data):
            payload = [val, str(data.monitored_item.Value.SourceTimestamp)]
            self.kafka_message_producer(self.kafka_topic, payload)
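
To show how such a handler might be attached to a subscription, here is a rough sketch using the python-opcua Client. ForwardingHandler and run are hypothetical names, and the endpoint, node id, broker address and topic are placeholders you would replace with your own. The producer is passed in, so the handler can be tried out with a stand-in object before a real broker is involved.

```python
import json


class ForwardingHandler:
    """Forward every non-empty data change to a Kafka-like producer.

    producer is any object with send(topic, value) and flush(),
    for example a kafka.KafkaProducer.
    """

    def __init__(self, producer, topic):
        self.producer = producer
        self.topic = topic

    def datachange_notification(self, node, val, data):
        if val is None:
            return  # nothing to forward
        message = json.dumps({"node": str(node), "value": val}, default=str)
        self.producer.send(self.topic, message.encode("utf-8"))
        self.producer.flush()


def run(endpoint, node_id, bootstrap, topic):
    """Connect to the OPC UA server and forward changes of one node."""
    # Imported here so the handler can be exercised without these packages
    from kafka import KafkaProducer
    from opcua import Client

    producer = KafkaProducer(bootstrap_servers=bootstrap)
    client = Client(endpoint)
    client.connect()
    handler = ForwardingHandler(producer, topic)
    # 500 is the publishing interval in milliseconds
    subscription = client.create_subscription(500, handler)
    subscription.subscribe_data_change(client.get_node(node_id))
    return client, subscription
```

A call might look like run("opc.tcp://localhost:4840", "ns=2;i=2", "localhost:9092", "opcua-data"), where every argument is an assumed value. Flushing after each message keeps latency low but costs throughput; for high-rate nodes you may prefer to drop the flush and let the producer batch.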