如何使用 Druid-Tranquility(用于 Superset)读取 divolte-data Kafka 通道?
How to read divolte-data Kafka channel with Druid-Tranquility (for Superset)?
在 Ubuntu 服务器上,我设置了 Divolte Collector 以从网站收集点击流数据。数据被写入名为 divolte-data 的 Kafka 通道。通过设置 Kafka 消费者,我可以看到传入的数据:
V0:j2ive5p1:QHQbOuiuZFozAVQfKqNWJoNstJhEZE85V0:j2pz3aw7:sDHKs71nHrTB5b_1TkKvWWtQ_rZDrvc2D0:B4aEGBSVgTXgxqB85aj4dGeoFjCqpeEGbannerClickMozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Ubuntu Chromium/58.0.3029.96 Chrome/58.0.3029.96 Safari/537.36ChromiumChromium8Google Inc. and contributorsBrowser58.0.3029.96"Personal computer
LinuxCanonical Ltd.
然后我想用 Airbnb Superset 可视化数据,它有几个连接到常见数据库的连接器,包括 druid.io(可以读取 Spark)。
Divolte 似乎以非结构化方式将数据存储在 Kafka 中。但显然它可以以结构化的方式映射数据。输入数据是否应该在 JSON 中构建(如文档所述)?
然后如何从Druid-Tranquility读取divolte-data Kafka通道接收到的数据?我尝试在 conf 示例中更改频道名称,但此消费者随后收到零消息。
我找到的解决方案是我可以在 Python 中处理 Kafka 消息,例如使用 Kafka Python 库或 Confluent Kafka Python,然后我将解码消息Avro 读者。
编辑:这是我如何做的更新:
我以为Avro库只是读取Avro文件,但它实际上解决了解码Kafka消息的问题,如下:我首先导入库并将模式文件作为参数,然后创建一个函数来解码将消息放入字典中,我可以在消费者循环中使用它。
from confluent_kafka import Consumer, KafkaError
from avro.io import DatumReader, BinaryDecoder
import avro.schema
schema = avro.schema.Parse(open("data_sources/EventRecord.avsc").read())
reader = DatumReader(schema)
def decode(msg_value):
message_bytes = io.BytesIO(msg_value)
decoder = BinaryDecoder(message_bytes)
event_dict = reader.read(decoder)
return event_dict
c = Consumer()
c.subscribe(topic)
running = True
while running:
msg = c.poll()
if not msg.error():
msg_value = msg.value()
event_dict = decode(msg_value)
print(event_dict)
elif msg.error().code() != KafkaError._PARTITION_EOF:
print(msg.error())
running = False
在 Ubuntu 服务器上,我设置了 Divolte Collector 以从网站收集点击流数据。数据被写入名为 divolte-data 的 Kafka 通道。通过设置 Kafka 消费者,我可以看到传入的数据:
V0:j2ive5p1:QHQbOuiuZFozAVQfKqNWJoNstJhEZE85V0:j2pz3aw7:sDHKs71nHrTB5b_1TkKvWWtQ_rZDrvc2D0:B4aEGBSVgTXgxqB85aj4dGeoFjCqpeEGbannerClickMozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Ubuntu Chromium/58.0.3029.96 Chrome/58.0.3029.96 Safari/537.36ChromiumChromium8Google Inc. and contributorsBrowser58.0.3029.96"Personal computer
LinuxCanonical Ltd.
然后我想用 Airbnb Superset 可视化数据,它有几个连接到常见数据库的连接器,包括 druid.io(可以读取 Spark)。
Divolte 似乎以非结构化方式将数据存储在 Kafka 中。但显然它可以以结构化的方式映射数据。输入数据是否应该在 JSON 中构建(如文档所述)?
然后如何从Druid-Tranquility读取divolte-data Kafka通道接收到的数据?我尝试在 conf 示例中更改频道名称,但此消费者随后收到零消息。
我找到的解决方案是我可以在 Python 中处理 Kafka 消息,例如使用 Kafka Python 库或 Confluent Kafka Python,然后我将解码消息Avro 读者。
编辑:这是我如何做的更新:
我以为Avro库只是读取Avro文件,但它实际上解决了解码Kafka消息的问题,如下:我首先导入库并将模式文件作为参数,然后创建一个函数来解码将消息放入字典中,我可以在消费者循环中使用它。
from confluent_kafka import Consumer, KafkaError
from avro.io import DatumReader, BinaryDecoder
import avro.schema
schema = avro.schema.Parse(open("data_sources/EventRecord.avsc").read())
reader = DatumReader(schema)
def decode(msg_value):
message_bytes = io.BytesIO(msg_value)
decoder = BinaryDecoder(message_bytes)
event_dict = reader.read(decoder)
return event_dict
c = Consumer()
c.subscribe(topic)
running = True
while running:
msg = c.poll()
if not msg.error():
msg_value = msg.value()
event_dict = decode(msg_value)
print(event_dict)
elif msg.error().code() != KafkaError._PARTITION_EOF:
print(msg.error())
running = False