如何将事件从 siddhi 事件模拟器发送到 android siddhi 应用程序
How to send events from siddhi event simulator to android siddhi app
我在 Android 上有一个 siddhi cep 应用程序 运行。现在我想通过套接字连接将事件模拟器中的事件从流处理编辑器发送到 android 应用程序。到目前为止,我已经成功地制作了 android 服务器套接字,它监听了我制作的 python 客户端模拟器。但是为了简化这个过程,我是否可以使用事件模拟器将事件发送到 android siddhi 应用程序?
我想知道我是否可以更改一些配置,以便事件模拟器将事件发送到 android 套接字,所以我查看了 deployment.yaml
文件中的设置
但是发送配置是为HTTP
定义的
senderConfigurations:
-
id: "http-sender"
# Configuration used for the databridge communication
databridge.config:
# No of worker threads to consume events
# THIS IS A MANDATORY FIELD
workerThreads: 10
# Maximum amount of messages that can be queued internally in MB
# THIS IS A MANDATORY FIELD
maxEventBufferCapacity: 10000000
# Queue size; the maximum number of events that can be stored in the queue
# THIS IS A MANDATORY FIELD
eventBufferSize: 2000
# Keystore file path
# THIS IS A MANDATORY FIELD
keyStoreLocation : ${sys:carbon.home}/resources/security/wso2carbon.jks
# Keystore password
# THIS IS A MANDATORY FIELD
keyStorePassword : wso2carbon
# Session Timeout value in mins
# THIS IS A MANDATORY FIELD
clientTimeoutMin: 30
# Data receiver configurations
# THIS IS A MANDATO
提前致谢。如果您需要更多详细信息,请告诉我
编辑 1
我实际上找到了解决方法,但它有一些问题。所以基本上我将 event generator
的输出接收器重定向到端口,以便接收器拥有所有数据流。 Stream Processor Studio editor
的代码是
@App:name("PatternMatching")
@App:description('Identify event patterns based on the order of event arrival')
define stream RoomTemperatureStream(roomNo string, temp double);
@sink(type="tcp", url='tcp://localhost:5001/abc', sync='false', tcp.no.delay='true', keep.alive='true', worker.threads="10", @map(type='text'))
define stream RoomTemperatureAlertStream(roomNo string, initialTemp double, finalTemp double);
--Capture a pattern where the temperature of a room increases by 5 degrees within 2 minutes
@info(name='query1')
from RoomTemperatureStream
select e1.roomNo, e1.temp as initialTemp, e2.temp as finalTemp
insert into RoomTemperatureAlertStream;
它将流作为文本发送到python服务器,需要先启动它,其代码是
#!/usr/bin/env python
# Author : Amarjit Singh
import pickle
import socket
import pandas
from pandas import json
if __name__ == "__main__":
# ------------------ create a socket object-----------------------#
try:
serversocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
except socket.error as err:
serversocket.close()
print("socket creation failed with error %s" % (err))
except KeyboardInterrupt:
serversocket.close()
print("KeyboardInterrupt - but server socket was closed ")
host = "127.0.0.1"
Server_port = 5001
# ------------------ binding to the port -----------------------#
try:
serversocket.bind((host, Server_port))
serversocket.listen(50) # queue up to 5 requests
print("\n Server has started and waiting for connection request ..... ")
# bind to the port
while True: # extra while is created so that server runs even if there is no data
running = True
clientsocket, addr = serversocket.accept() # accept a connection from client
print("Got a connection from Server%s" % str(addr)) # show connection success message
while running:
receivedData = clientsocket.recv(2048)
# json = receivedData
if receivedData:
print(receivedData)
print(receivedData[0])
print(receivedData[1])
print(receivedData[2])
# roomNo = str(receivedData[0])
# temp = int(client_tuple[1]) # from unicode to int
#
# print(" roomNo = %d: UUID = %s temp = %d" % (roomNo, temp))
except socket.error as err:
serversocket.close()
print("socket creation failed with error %s" % (err))
except KeyboardInterrupt:
serversocket.close()
print("KeyboardInterrupt - but server socket was closed ")
最初,我从模拟器发送 json
数据,但是 pickle.loads
和 json.loads
没有工作。但文本的问题是数据显示为
b'\x02\x00\x00\x00t\x00\x00\x003bed14d31-6697-4a74-8a3f-30dc012914ad-localhost:5001\x00\x00\x00\x03abc\x00\x00\x002roomNo:"X0ZYp",\ninitialTemp:15.97,\nfinalTemp:17.22'
b'\x02\x00\x00\x00t\x00\x00\x003bed14d31-6697-4a74-8a3f-30dc012914ad-localhost:5001\x00\x00\x00\x03abc\x00\x00\x002roomNo:"2X951",\ninitialTemp:13.42,\nfinalTemp:10.76'
b'\x02\x00\x00\x00t\x00\x00\x003bed14d31-6697-4a74-8a3f-30dc012914ad-localhost:5001\x00\x00\x00\x03abc\x00\x00\x002roomNo:"PUaJA",\ninitialTemp:15.46,\nfinalTemp:16.26'
b'\x02\x00\x00\x00t\x00\x00\x003bed14d31-6697-4a74-8a3f-30dc012914ad-localhost:5001\x00\x00\x00\x03abc\x00\x00\x002roomNo:"pnz0i",\ninitialTemp:10.42,\nfinalTemp:15.82'
如何删除这些额外的数据?
Siddhi 有一个 WebSocket 连接器[1],它仍然是 WIP。使用此依赖项,您将能够将 WebSocket 接收器添加到您的应用程序并从中发送事件。
遗憾的是,您不能直接从流处理器[2]发送事件Studio/Editor,但如果您在流处理器编辑器中有一个应用程序运行,并且它有一个 WebSocket 接收器,那么您可以发送事件从模拟器发送到应用程序的接收器流,模拟器将通过 WebSocket 将该消息发送到 android 中的 Siddhi 应用程序。
您只能通过事件模拟器在编辑器中模拟应用程序 运行 或通过事件模拟器 API 模拟部署在流处理器工作节点中的应用程序。
我在 Android 上有一个 siddhi cep 应用程序 运行。现在我想通过套接字连接将事件模拟器中的事件从流处理编辑器发送到 android 应用程序。到目前为止,我已经成功地制作了 android 服务器套接字,它监听了我制作的 python 客户端模拟器。但是为了简化这个过程,我是否可以使用事件模拟器将事件发送到 android siddhi 应用程序?
我想知道我是否可以更改一些配置,以便事件模拟器将事件发送到 android 套接字,所以我查看了 deployment.yaml
文件中的设置
但是发送配置是为HTTP
senderConfigurations:
-
id: "http-sender"
# Configuration used for the databridge communication
databridge.config:
# No of worker threads to consume events
# THIS IS A MANDATORY FIELD
workerThreads: 10
# Maximum amount of messages that can be queued internally in MB
# THIS IS A MANDATORY FIELD
maxEventBufferCapacity: 10000000
# Queue size; the maximum number of events that can be stored in the queue
# THIS IS A MANDATORY FIELD
eventBufferSize: 2000
# Keystore file path
# THIS IS A MANDATORY FIELD
keyStoreLocation : ${sys:carbon.home}/resources/security/wso2carbon.jks
# Keystore password
# THIS IS A MANDATORY FIELD
keyStorePassword : wso2carbon
# Session Timeout value in mins
# THIS IS A MANDATORY FIELD
clientTimeoutMin: 30
# Data receiver configurations
# THIS IS A MANDATO
提前致谢。如果您需要更多详细信息,请告诉我
编辑 1
我实际上找到了解决方法,但它有一些问题。所以基本上我将 event generator
的输出接收器重定向到端口,以便接收器拥有所有数据流。 Stream Processor Studio editor
的代码是
@App:name("PatternMatching")
@App:description('Identify event patterns based on the order of event arrival')
define stream RoomTemperatureStream(roomNo string, temp double);
@sink(type="tcp", url='tcp://localhost:5001/abc', sync='false', tcp.no.delay='true', keep.alive='true', worker.threads="10", @map(type='text'))
define stream RoomTemperatureAlertStream(roomNo string, initialTemp double, finalTemp double);
--Capture a pattern where the temperature of a room increases by 5 degrees within 2 minutes
@info(name='query1')
from RoomTemperatureStream
select e1.roomNo, e1.temp as initialTemp, e2.temp as finalTemp
insert into RoomTemperatureAlertStream;
它将流作为文本发送到python服务器,需要先启动它,其代码是
#!/usr/bin/env python
# Author : Amarjit Singh
import pickle
import socket
import pandas
from pandas import json
if __name__ == "__main__":
# ------------------ create a socket object-----------------------#
try:
serversocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
except socket.error as err:
serversocket.close()
print("socket creation failed with error %s" % (err))
except KeyboardInterrupt:
serversocket.close()
print("KeyboardInterrupt - but server socket was closed ")
host = "127.0.0.1"
Server_port = 5001
# ------------------ binding to the port -----------------------#
try:
serversocket.bind((host, Server_port))
serversocket.listen(50) # queue up to 5 requests
print("\n Server has started and waiting for connection request ..... ")
# bind to the port
while True: # extra while is created so that server runs even if there is no data
running = True
clientsocket, addr = serversocket.accept() # accept a connection from client
print("Got a connection from Server%s" % str(addr)) # show connection success message
while running:
receivedData = clientsocket.recv(2048)
# json = receivedData
if receivedData:
print(receivedData)
print(receivedData[0])
print(receivedData[1])
print(receivedData[2])
# roomNo = str(receivedData[0])
# temp = int(client_tuple[1]) # from unicode to int
#
# print(" roomNo = %d: UUID = %s temp = %d" % (roomNo, temp))
except socket.error as err:
serversocket.close()
print("socket creation failed with error %s" % (err))
except KeyboardInterrupt:
serversocket.close()
print("KeyboardInterrupt - but server socket was closed ")
最初,我从模拟器发送 json
数据,但是 pickle.loads
和 json.loads
没有工作。但文本的问题是数据显示为
b'\x02\x00\x00\x00t\x00\x00\x003bed14d31-6697-4a74-8a3f-30dc012914ad-localhost:5001\x00\x00\x00\x03abc\x00\x00\x002roomNo:"X0ZYp",\ninitialTemp:15.97,\nfinalTemp:17.22'
b'\x02\x00\x00\x00t\x00\x00\x003bed14d31-6697-4a74-8a3f-30dc012914ad-localhost:5001\x00\x00\x00\x03abc\x00\x00\x002roomNo:"2X951",\ninitialTemp:13.42,\nfinalTemp:10.76'
b'\x02\x00\x00\x00t\x00\x00\x003bed14d31-6697-4a74-8a3f-30dc012914ad-localhost:5001\x00\x00\x00\x03abc\x00\x00\x002roomNo:"PUaJA",\ninitialTemp:15.46,\nfinalTemp:16.26'
b'\x02\x00\x00\x00t\x00\x00\x003bed14d31-6697-4a74-8a3f-30dc012914ad-localhost:5001\x00\x00\x00\x03abc\x00\x00\x002roomNo:"pnz0i",\ninitialTemp:10.42,\nfinalTemp:15.82'
如何删除这些额外的数据?
Siddhi 有一个 WebSocket 连接器[1],它仍然是 WIP。使用此依赖项,您将能够将 WebSocket 接收器添加到您的应用程序并从中发送事件。
遗憾的是,您不能直接从流处理器[2]发送事件Studio/Editor,但如果您在流处理器编辑器中有一个应用程序运行,并且它有一个 WebSocket 接收器,那么您可以发送事件从模拟器发送到应用程序的接收器流,模拟器将通过 WebSocket 将该消息发送到 android 中的 Siddhi 应用程序。
您只能通过事件模拟器在编辑器中模拟应用程序 运行 或通过事件模拟器 API 模拟部署在流处理器工作节点中的应用程序。