JSON Apache Storm 中的 Kafka spout

JSON Kafka spout in Apache Storm

我正在使用 Kafka spout 构建 Storm 拓扑。我正在以 JSON 格式从 Kafka(没有 Zookeeper)消费,Storm 应该输出它。
如何为 JSON 数据类型定义正确的模式? 目前,我有这样的代码库和基本的 spout 实现:

val cluster = new LocalCluster()
val bootstrapServers = "localhost:9092"
val topologyBuilder = new TopologyBuilder()

val spoutConfig = KafkaSpoutConfig.builder(bootstrapServers, "test").build()

topologyBuilder.setSpout("kafka_spout", new KafkaSpout(spoutConfig), 1)

val config = new Config()
cluster.submitTopology("kafkaTest", config, topologyBuilder.createTopology())

cluster.shutdown()

我是 Apache Storm 的新手,很乐意提供任何建议。

您可以做几件事:

你可以定义一个RecordTranslator。此接口允许您根据从 Kafka 读取的 ConsumerRecord 定义 spout 将如何构建元组。

默认实现如下所示:

public static final Fields FIELDS = new Fields("topic", "partition", "offset", "key", "value");

    @Override
    public List<Object> apply(ConsumerRecord<K, V> record) {
        return new Values(record.topic(),
                record.partition(),
                record.offset(),
                record.key(),
                record.value());
    }

    @Override
    public Fields getFieldsFor(String stream) {
        return FIELDS;
    }

正如你所看到的,你会得到一个 ConsumerRecord,这是一个内置在底层 Kafka 客户端库中的类型,然后必须把它变成一个 List<Object>,这将是元组值。如果您想在发出数据之前对记录做一些复杂的事情,这就是您要做的。例如,如果你想将键、值和偏移量填充到它随后发出的数据结构中,你可以在此处执行此操作。你使用像 KafkaSpoutConfig.builder(bootstrapServers, "test").setRecordTranslator(myTranslator).build()

这样的翻译器

如果您只想将 key/value 反序列化为您自己的数据之一 classes,一个更好的选择是实现 Deserializer。这将让您定义如何反序列化从 Kafka 获得的 key/value。如果你想反序列化例如您自己的数据的价值 class,您可以使用此界面来完成。

默认 StringDeserializer 这样做:

    @Override
    public String deserialize(String topic, byte[] data) {
        try {
            if (data == null)
                return null;
            else
                return new String(data, encoding);
        } catch (UnsupportedEncodingException e) {
            throw new SerializationException("Error when deserializing byte[] to string due to unsupported encoding " + encoding);
        }
    }

创建自己的 Deserializer 后,您可以通过执行类似 KafkaSpoutConfig.builder(bootstrapServers, "test").setProp(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, YourDeserializer.class).build() 的操作来使用它。有一个类似的消费者 属性 用于设置值反序列化器。