如何从 Flink runner 上的 Google Dataflow (Apache Beam) 向 Kafka 发送消息

How to send messages from Google Dataflow (Apache Beam) on the Flink runner to Kafka

我正在尝试编写一个概念验证,它从 Kafka 获取消息,使用 Flink 上的 Beam 转换它们,然后将结果推送到不同的 Kafka 主题。

我使用 KafkaWindowedWordCountExample 作为起点,这是我想要做的第一部分,但它输出到文本文件而不是 Kafka。 FlinkKafkaProducer08 看起来很有前途,但我不知道如何将其插入管道。我在想它会用 UnboundedFlinkSink 或类似的东西包裹起来,但似乎不存在。

对我正在尝试做的事情有什么建议或想法吗?

我是运行最新的incubator-beam(截至昨晚Github),集群模式下的Flink 1.0.0和Kafka 0.9.0.1,都在Google 计算引擎 (Debian Jessie)。

Beam 目前没有 UnboundedSink class。大多数无界接收器是使用 ParDo.

实现的

您不妨查看 KafkaIO connector。这是一个适用于所有 Beam 运行器的 Kafka reader,并实现了并行读取、检查点和其他 UnboundedSource API。该拉取请求还通过在 ParDo:

中写入 Kafka,在 TopHashtags 示例管道中包含一个粗略的接收器
class KafkaWriter extends DoFn<String, Void> {

  private final String topic;
  private final Map<String, Object> config;
  private transient KafkaProducer<String, String> producer = null;

  public KafkaWriter(Options options) {
    this.topic = options.getOutputTopic();
    this.config = ImmutableMap.<String, Object>of(
        "bootstrap.servers", options.getBootstrapServers(),
        "key.serializer",    StringSerializer.class.getName(),
        "value.serializer",  StringSerializer.class.getName());
  }

  @Override
  public void startBundle(Context c) throws Exception {
    if (producer == null) { // in Beam, startBundle might be called multiple times.
      producer = new KafkaProducer<String, String>(config);
    }
  }

  @Override
  public void finishBundle(Context c) throws Exception {
    producer.close();
  }

  @Override
  public void processElement(ProcessContext ctx) throws Exception {
    producer.send(new ProducerRecord<String, String>(topic, ctx.element()));
  }
}

当然,我们也想在 KafkaIO 中添加接收器支持。它实际上与上面的 KafkaWriter 相同,但使用起来更简单。

用于写入 Kafka 的接收器转换已于 2016 年添加到 Apache Beam / Dataflow。有关用法示例,请参阅 Apache Beam 中 KafkaIO 的 JavaDoc。