使用来自 Google Pubsub 的消息并将其发布到 Kafka

Consuming messages from Google Pubsub and publishing it to Kafka

我正在尝试使用同步 PULL API 使用 Google PubSub 消息。这在 Apache Beam Google PubSub IO 连接器库中可用。 我想使用 KafkaIO 将消费的消息写入 Kafka。我想使用 FlinkRunner 来执行作业,因为我们 运行 这个应用程序在 GCP 之外。

我面临的问题是使用的消息在 GCP PubSub 中没有得到确认。我已确认本地 Kafka 实例具有从 GCP PubSub 消费的消息。 GCP DataFlow 中的文档表明,当管道以数据接收器(在我的例子中是 Kafka)终止时,数据包将最终确定。

但是由于代码是 运行在 Apache Flink 而不是 GCP DataFlow 中,我认为与确认提交的消息相关的某种回调不会被触发。
我在这里做错了什么?

                   pipeline
                    .apply("Read  GCP PubSub Messages", PubsubIO.readStrings()
                            .fromSubscription(subscription)
                    )
                    .apply(ParseJsons.of(User.class))
                    .setCoder(SerializableCoder.of(User.class))
                    .apply("Filter-1", ParDo.of(new FilterTextFn()))
                    .apply(AsJsons.of(User.class).withMapper(new ObjectMapper()))
                    .apply("Write to Local Kafka",
                            KafkaIO.<Void,String>write()
                                    .withBootstrapServers("127.0.0.1:9092,127.0.0.1:9093,127.0.0.1:9094")
                                    .withTopic("test-topic")
                                    .withValueSerializer((StringSerializer.class))
                                    .values()
                    );

在 Beam documentation on the PubSub IO class 中提到了这个:

Checkpoints are used both to ACK received messages back to Pubsub (so that they may be retired on the Pubsub end), and to NACK already consumed messages should a checkpoint need to be restored (so that Pubsub will resend those messages promptly).

ACK 没有链接到数据流,您应该对数据流有相同的行为。 ack 在检查点上发送。通常检查点是您在流上设置的 windows。

但是,你没有设置window!默认情况下, windows 是全局的,它只在最后关闭,如果你优雅地停止你的工作(甚至,我不确定这一点)。无论如何,更好的解决方案是固定 windows(例如 5 分钟)以确认每个 windows.

上的消息

我修复此解决方案的方法是使用 Guillaume Blaquiere (https://whosebug.com/users/11372593/guillaume-blaquiere) 关于查看检查点的建议。即使在管道中添加 Window.into() 函数后,源 PubSub 订阅端点也没有收到 ACK。
问题出在我没有提到检查点配置的 Flink 服务器配置中。如果没有这些参数,检查点将被禁用。

state.backend: rocksdb
state.checkpoints.dir: file:///tmp/flink-1.9.3/state/checkpoints/

这些配置应该放在 flink_home/conf/flink-conf.yaml 中。 添加这些条目并重新启动 flink 后。在 GCP pubsub 监控图表中,所有积压(未确认的消息)都变为 0。