使用 Kafka-Connector 从 Python-Opc Ua-Client 获取数据到 Kafka-topic 的可能性
Possibility of using a Kafka Connector to get data from a Python OPC UA client into a Kafka topic
有一个 python opcua client。datachange_notification(self, node, val, data) 将作为 python opcua client 的一部分执行,每当 client 收到新数据时都会被调用。请参阅下面的脚本:是否可以把到达的数据(在此上下文中是 val)发送到 kafka topic?
def datachange_notification(self, node, val, data):
    """OPC UA subscription callback: persist incoming values by node id.

    Invoked by the opcua client for every monitored-item change.

    Args:
        node: the opcua Node whose value changed.
        val: the new value.
        data: the raw notification data (unused here).
    """
    save_data = save_file.Savefile()
    # Save data locally when it arrives for nodeid ns=2;i=2.
    if str(node) == "Node(NumericNodeId(ns=2;i=2))":
        # 'is not None' is the idiomatic None check (fixes '!= None');
        # also skip zero values, as the original did.
        if val is not None and val != 0:
            print("Python: New data change event", node, val)
            save_data.saveBlockByClient(val)
    # Save data locally when it arrives for nodeid ns=2;i=3.
    if str(node) == "Node(NumericNodeId(ns=2;i=3))":
        if val is not None and val != 0:
            print("Python: New data change event", node, val)
            save_data.saveSourceByValidator(str(val))
可能吗?当然。
导入一个Kafka库并新建一个Producer实例然后发送消息
如果您使用评论中提到的 AsyncIO 客户端,我建议研究 aiokafka python 库
请尝试以下基于 kafka-python 包的示例,看是否适用:
from kafka import KafkaProducer
class KafkaOpcUaSubHandler(object):
    """OPC UA subscription handler that forwards data-change events to Kafka.

    datachange_notification() is invoked by the opcua client for every
    monitored-item change; the value plus its source timestamp are JSON
    encoded and published to a Kafka topic.
    """

    def __init__(self, kafka_server='localhost', kafka_port=9092,
                 kafka_topic='opcua'):
        # Backward compatible: all new parameters have defaults. The
        # original referenced undefined globals kafka_server / kafka_port /
        # kafka_topic; they are now instance configuration instead.
        self.msg = {}
        self.kafka_server = kafka_server
        self.kafka_port = kafka_port
        self.kafka_topic = kafka_topic
        self._producer = None  # lazily created, reused between messages

    def connect_kafka(self, server, port):
        """Create (and cache) a KafkaProducer for server:port.

        BUG FIX: the original passed the invalid keyword 'bootstrap_server'
        (KafkaProducer expects 'bootstrap_servers') and never returned the
        producer, so callers always got None.
        """
        if self._producer is None:
            self._producer = KafkaProducer(
                bootstrap_servers='{}:{}'.format(server, port),
                retries=0,
                batch_size=0,
                compression_type=None,
            )
        return self._producer

    def kafka_message_producer(self, kafka_topic, msg):
        """JSON-encode msg and publish it to kafka_topic, then flush."""
        import json  # local import keeps the snippet self-contained
        # BUG FIX: the original chained .send() on the following line
        # without parentheses (a SyntaxError) and used undefined globals
        # for the broker address.
        producer = self.connect_kafka(self.kafka_server, self.kafka_port)
        producer.send(kafka_topic,
                      json.dumps(msg, default=str).encode('utf-8'))
        producer.flush()

    def datachange_notification(self, node, val, data):
        """opcua subscription callback: forward the new value to Kafka."""
        self.your_function(node, val, self.msg, data)

    def your_function(self, node, val, msg, data):
        """Build a [value, source-timestamp] payload and publish it."""
        payload = [val, str(data.monitored_item.Value.SourceTimestamp)]
        # BUG FIX: the original referenced an undefined global kafka_topic.
        self.kafka_message_producer(self.kafka_topic, payload)
有一个 python opcua client。datachange_notification(self, node, val, data) 将作为 python opcua client 的一部分执行,每当 client 收到新数据时都会被调用。请参阅下面的脚本:是否可以把到达的数据(在此上下文中是 val)发送到 kafka topic?
def datachange_notification(self, node, val, data):
    """OPC UA subscription callback: persist incoming values by node id.

    Invoked by the opcua client for every monitored-item change.

    Args:
        node: the opcua Node whose value changed.
        val: the new value.
        data: the raw notification data (unused here).
    """
    save_data = save_file.Savefile()
    # Save data locally when it arrives for nodeid ns=2;i=2.
    if str(node) == "Node(NumericNodeId(ns=2;i=2))":
        # 'is not None' is the idiomatic None check (fixes '!= None');
        # also skip zero values, as the original did.
        if val is not None and val != 0:
            print("Python: New data change event", node, val)
            save_data.saveBlockByClient(val)
    # Save data locally when it arrives for nodeid ns=2;i=3.
    if str(node) == "Node(NumericNodeId(ns=2;i=3))":
        if val is not None and val != 0:
            print("Python: New data change event", node, val)
            save_data.saveSourceByValidator(str(val))
可能吗?当然。
导入一个Kafka库并新建一个Producer实例然后发送消息
如果您使用评论中提到的 AsyncIO 客户端,我建议研究 aiokafka python 库
请尝试以下基于 kafka-python 包的示例,看是否适用:
from kafka import KafkaProducer
class KafkaOpcUaSubHandler(object):
    """OPC UA subscription handler that forwards data-change events to Kafka.

    datachange_notification() is invoked by the opcua client for every
    monitored-item change; the value plus its source timestamp are JSON
    encoded and published to a Kafka topic.
    """

    def __init__(self, kafka_server='localhost', kafka_port=9092,
                 kafka_topic='opcua'):
        # Backward compatible: all new parameters have defaults. The
        # original referenced undefined globals kafka_server / kafka_port /
        # kafka_topic; they are now instance configuration instead.
        self.msg = {}
        self.kafka_server = kafka_server
        self.kafka_port = kafka_port
        self.kafka_topic = kafka_topic
        self._producer = None  # lazily created, reused between messages

    def connect_kafka(self, server, port):
        """Create (and cache) a KafkaProducer for server:port.

        BUG FIX: the original passed the invalid keyword 'bootstrap_server'
        (KafkaProducer expects 'bootstrap_servers') and never returned the
        producer, so callers always got None.
        """
        if self._producer is None:
            self._producer = KafkaProducer(
                bootstrap_servers='{}:{}'.format(server, port),
                retries=0,
                batch_size=0,
                compression_type=None,
            )
        return self._producer

    def kafka_message_producer(self, kafka_topic, msg):
        """JSON-encode msg and publish it to kafka_topic, then flush."""
        import json  # local import keeps the snippet self-contained
        # BUG FIX: the original chained .send() on the following line
        # without parentheses (a SyntaxError) and used undefined globals
        # for the broker address.
        producer = self.connect_kafka(self.kafka_server, self.kafka_port)
        producer.send(kafka_topic,
                      json.dumps(msg, default=str).encode('utf-8'))
        producer.flush()

    def datachange_notification(self, node, val, data):
        """opcua subscription callback: forward the new value to Kafka."""
        self.your_function(node, val, self.msg, data)

    def your_function(self, node, val, msg, data):
        """Build a [value, source-timestamp] payload and publish it."""
        payload = [val, str(data.monitored_item.Value.SourceTimestamp)]
        # BUG FIX: the original referenced an undefined global kafka_topic.
        self.kafka_message_producer(self.kafka_topic, payload)