JSON Apache Storm 中的 Kafka spout
JSON Kafka spout in Apache Storm
我正在使用 Kafka spout 构建 Storm 拓扑。我正在以 JSON 格式从 Kafka(没有 Zookeeper)消费,Storm 应该输出它。
如何为 JSON 数据类型定义正确的模式?
目前,我有这样的代码库和基本的 spout 实现:
val cluster = new LocalCluster()
val bootstrapServers = "localhost:9092"
val topologyBuilder = new TopologyBuilder()
val spoutConfig = KafkaSpoutConfig.builder(bootstrapServers, "test").build()
topologyBuilder.setSpout("kafka_spout", new KafkaSpout(spoutConfig), 1)
val config = new Config()
cluster.submitTopology("kafkaTest", config, topologyBuilder.createTopology())
cluster.shutdown()
我是 Apache Storm 的新手,很乐意提供任何建议。
您可以做几件事:
你可以定义一个RecordTranslator
。此接口允许您根据从 Kafka 读取的 ConsumerRecord
定义 spout 将如何构建元组。
默认实现如下所示:
public static final Fields FIELDS = new Fields("topic", "partition", "offset", "key", "value");
@Override
public List<Object> apply(ConsumerRecord<K, V> record) {
return new Values(record.topic(),
record.partition(),
record.offset(),
record.key(),
record.value());
}
@Override
public Fields getFieldsFor(String stream) {
return FIELDS;
}
正如你所看到的,你会得到一个 ConsumerRecord
,这是一个内置在底层 Kafka 客户端库中的类型,然后必须把它变成一个 List<Object>
,这将是元组值。如果您想在发出数据之前对记录做一些复杂的事情,这就是您要做的。例如,如果你想将键、值和偏移量填充到它随后发出的数据结构中,你可以在此处执行此操作。你使用像 KafkaSpoutConfig.builder(bootstrapServers, "test").setRecordTranslator(myTranslator).build()
这样的翻译器
如果您只想将 key/value 反序列化为您自己的数据之一 classes,一个更好的选择是实现 Deserializer
。这将让您定义如何反序列化从 Kafka 获得的 key/value。如果你想反序列化例如您自己的数据的价值 class,您可以使用此界面来完成。
默认 StringDeserializer
这样做:
@Override
public String deserialize(String topic, byte[] data) {
try {
if (data == null)
return null;
else
return new String(data, encoding);
} catch (UnsupportedEncodingException e) {
throw new SerializationException("Error when deserializing byte[] to string due to unsupported encoding " + encoding);
}
}
创建自己的 Deserializer
后,您可以通过执行类似 KafkaSpoutConfig.builder(bootstrapServers, "test").setProp(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, YourDeserializer.class).build()
的操作来使用它。有一个类似的消费者 属性 用于设置值反序列化器。
我正在使用 Kafka spout 构建 Storm 拓扑。我正在以 JSON 格式从 Kafka(没有 Zookeeper)消费,Storm 应该输出它。
如何为 JSON 数据类型定义正确的模式?
目前,我有这样的代码库和基本的 spout 实现:
val cluster = new LocalCluster()
val bootstrapServers = "localhost:9092"
val topologyBuilder = new TopologyBuilder()
val spoutConfig = KafkaSpoutConfig.builder(bootstrapServers, "test").build()
topologyBuilder.setSpout("kafka_spout", new KafkaSpout(spoutConfig), 1)
val config = new Config()
cluster.submitTopology("kafkaTest", config, topologyBuilder.createTopology())
cluster.shutdown()
我是 Apache Storm 的新手,很乐意提供任何建议。
您可以做几件事:
你可以定义一个RecordTranslator
。此接口允许您根据从 Kafka 读取的 ConsumerRecord
定义 spout 将如何构建元组。
默认实现如下所示:
public static final Fields FIELDS = new Fields("topic", "partition", "offset", "key", "value");
@Override
public List<Object> apply(ConsumerRecord<K, V> record) {
return new Values(record.topic(),
record.partition(),
record.offset(),
record.key(),
record.value());
}
@Override
public Fields getFieldsFor(String stream) {
return FIELDS;
}
正如你所看到的,你会得到一个 ConsumerRecord
,这是一个内置在底层 Kafka 客户端库中的类型,然后必须把它变成一个 List<Object>
,这将是元组值。如果您想在发出数据之前对记录做一些复杂的事情,这就是您要做的。例如,如果你想将键、值和偏移量填充到它随后发出的数据结构中,你可以在此处执行此操作。你使用像 KafkaSpoutConfig.builder(bootstrapServers, "test").setRecordTranslator(myTranslator).build()
如果您只想将 key/value 反序列化为您自己的数据之一 classes,一个更好的选择是实现 Deserializer
。这将让您定义如何反序列化从 Kafka 获得的 key/value。如果你想反序列化例如您自己的数据的价值 class,您可以使用此界面来完成。
默认 StringDeserializer
这样做:
@Override
public String deserialize(String topic, byte[] data) {
try {
if (data == null)
return null;
else
return new String(data, encoding);
} catch (UnsupportedEncodingException e) {
throw new SerializationException("Error when deserializing byte[] to string due to unsupported encoding " + encoding);
}
}
创建自己的 Deserializer
后,您可以通过执行类似 KafkaSpoutConfig.builder(bootstrapServers, "test").setProp(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, YourDeserializer.class).build()
的操作来使用它。有一个类似的消费者 属性 用于设置值反序列化器。