TypeError("Event data is not compatible with JSON type: {}".format(e))
TypeError("Event data is not compatible with JSON type: {}".format(e))
我正在尝试向事件中心发送和接收 json 数据。但是我收到错误消息:接收数据时事件数据与 JSON 类型不兼容。这是我的发送和接收代码。
#Event_Hub_send.py
import sys
import logging
import datetime
import time
import os
from azure.eventhub import EventHubClient, Sender, EventData
logger = logging.getLogger("azure")
ADDRESS = "amqps://myns.servicebus.windows.net/myeventhub"
USER = "RootManageSharedAccessKey"
KEY = "mykey"
try:
if not ADDRESS:
raise ValueError("No EventHubs URL supplied.")
# Create Event Hubs client
client = EventHubClient(ADDRESS, debug=False, username=USER, password=KEY)
sender = client.add_sender(partition="0")
client.run()
try:
employee = [
{ "name": "sankar", "age": 28 },
{ "name": "Madhan", "age": 21 },
{ "name": "Vishwa", "age": 32 }
]
start_time = time.time()
for i in employee:
print("Sending message: {}".format(i))
sender.send(EventData(i))
except:
raise
finally:
end_time = time.time()
client.stop()
run_time = end_time - start_time
logger.info("Runtime: {} seconds".format(run_time))
except KeyboardInterrupt:
pass
发送时我没有收到任何错误。
#Event_Hub_Receive.py
import os
import sys
import logging
import time
from azure.eventhub import EventHubClient, Receiver, Offset
import json
logger = logging.getLogger("azure")
ADDRESS = "amqps://myns.servicebus.windows.net/myeventhub"
USER = "RootManageSharedAccessKey"
KEY = "mykey"
CONSUMER_GROUP = "$default"
OFFSET = Offset("-1")
PARTITION = "0"
total = 0
last_sn = -1
last_offset = "-1"
client = EventHubClient(ADDRESS, debug=False, username=USER, password=KEY)
try:
receiver = client.add_receiver(
CONSUMER_GROUP, PARTITION, prefetch=5000, offset=OFFSET)
client.run()
start_time = time.time()
for event_data in receiver.receive(timeout=100):
data = event_data.body_as_json()
print(data)
end_time = time.time()
client.stop()
run_time = end_time - start_time
print("Received {} messages in {} seconds".format(total, run_time))
except KeyboardInterrupt:
pass
finally:
client.stop()
我试图以 json 的形式接收数据。但抛出事件数据与 JSON 类型不兼容:无法解码 JSON 对象。
raise TypeError("Event data is not compatible with JSON type: {}".format(e))
TypeError: Event data is not compatible with JSON type: No JSON object could be decoded
所以我尝试将方法更改为 event_data.body_as_str()
。但我收到了以下回复:
agename
agename
Received 0 messages in 0.263074874878 seconds
有人可以建议吗
尝试这样做:
import json # add this
for i in employee:
print("Sending message: {}".format(i))
sender.send(EventData(json.dumps(i)))
i
似乎是 dict
类型,如果是这样的话,上面的方法应该有效。
我正在尝试向事件中心发送和接收 json 数据。但是我收到错误消息:接收数据时事件数据与 JSON 类型不兼容。这是我的发送和接收代码。
#Event_Hub_send.py
import sys
import logging
import datetime
import time
import os
from azure.eventhub import EventHubClient, Sender, EventData
logger = logging.getLogger("azure")
ADDRESS = "amqps://myns.servicebus.windows.net/myeventhub"
USER = "RootManageSharedAccessKey"
KEY = "mykey"
try:
if not ADDRESS:
raise ValueError("No EventHubs URL supplied.")
# Create Event Hubs client
client = EventHubClient(ADDRESS, debug=False, username=USER, password=KEY)
sender = client.add_sender(partition="0")
client.run()
try:
employee = [
{ "name": "sankar", "age": 28 },
{ "name": "Madhan", "age": 21 },
{ "name": "Vishwa", "age": 32 }
]
start_time = time.time()
for i in employee:
print("Sending message: {}".format(i))
sender.send(EventData(i))
except:
raise
finally:
end_time = time.time()
client.stop()
run_time = end_time - start_time
logger.info("Runtime: {} seconds".format(run_time))
except KeyboardInterrupt:
pass
发送时我没有收到任何错误。
#Event_Hub_Receive.py
import os
import sys
import logging
import time
from azure.eventhub import EventHubClient, Receiver, Offset
import json
logger = logging.getLogger("azure")
ADDRESS = "amqps://myns.servicebus.windows.net/myeventhub"
USER = "RootManageSharedAccessKey"
KEY = "mykey"
CONSUMER_GROUP = "$default"
OFFSET = Offset("-1")
PARTITION = "0"
total = 0
last_sn = -1
last_offset = "-1"
client = EventHubClient(ADDRESS, debug=False, username=USER, password=KEY)
try:
receiver = client.add_receiver(
CONSUMER_GROUP, PARTITION, prefetch=5000, offset=OFFSET)
client.run()
start_time = time.time()
for event_data in receiver.receive(timeout=100):
data = event_data.body_as_json()
print(data)
end_time = time.time()
client.stop()
run_time = end_time - start_time
print("Received {} messages in {} seconds".format(total, run_time))
except KeyboardInterrupt:
pass
finally:
client.stop()
我试图以 json 的形式接收数据。但抛出事件数据与 JSON 类型不兼容:无法解码 JSON 对象。
raise TypeError("Event data is not compatible with JSON type: {}".format(e))
TypeError: Event data is not compatible with JSON type: No JSON object could be decoded
所以我尝试将方法更改为 event_data.body_as_str()
。但我收到了以下回复:
agename
agename
Received 0 messages in 0.263074874878 seconds
有人可以建议吗
尝试这样做:
import json # add this
for i in employee:
print("Sending message: {}".format(i))
sender.send(EventData(json.dumps(i)))
i
似乎是 dict
类型,如果是这样的话,上面的方法应该有效。