如何并行写入 Apache Flink 中的接收器
How to parallel write to sinks in Apache Flink
我有一个并行度为 8 的地图 DataStream。我向 DataStream 添加了两个接收器。一种速度慢(Elasticsearch),另一种速度快(HDFS)。但是,我的事件仅在刷新到 ES 后才写入 HDFS,因此使用 ES 比 w/o ES 花费的时间长得多。
dataStream.setParallelism(8);
dataStream.addSink(elasticsearchSink);
dataStream.addSink(hdfsSink);
在我看来,两个接收器使用相同的线程。是否可以通过使用具有两个接收器的相同源,或者我是否必须添加另一项工作,一个用于 earsink,并行写入输出?
我检查了 Map(1/8) 到 Map(8/8) 正在部署和接收数据的日志。
如果 Elasticsearch 接收器跟不上其输入产生的速度,它会减慢其输入运算符。这个概念称为背压,这意味着慢速消费者会阻止快速生产者进行处理。
让您的程序按预期运行(HDFS 接收器写入速度比 Elasticsearch 接收器更快)的唯一方法是缓冲 HDFS 接收器写入但 Elasticsearch 接收器尚未写入的所有记录。如果 Elasticsearch sink 一直较慢,您将 运行 内存/磁盘 space 在某个时间点不足。
Flink 解决慢消费者问题的方法是背压。
我看到了两种解决此问题的方法:
- 增加 ElasticsearchSink 的并行度。这可能有帮助或无帮助,具体取决于您的 Elasticsearch 设置的功能。
- 运行 两个作业作为独立的管道。在这种情况下,您必须计算所有结果两次。
我有一个并行度为 8 的地图 DataStream。我向 DataStream 添加了两个接收器。一种速度慢(Elasticsearch),另一种速度快(HDFS)。但是,我的事件仅在刷新到 ES 后才写入 HDFS,因此使用 ES 比 w/o ES 花费的时间长得多。
dataStream.setParallelism(8);
dataStream.addSink(elasticsearchSink);
dataStream.addSink(hdfsSink);
在我看来,两个接收器使用相同的线程。是否可以通过使用具有两个接收器的相同源,或者我是否必须添加另一项工作,一个用于 earsink,并行写入输出?
我检查了 Map(1/8) 到 Map(8/8) 正在部署和接收数据的日志。
如果 Elasticsearch 接收器跟不上其输入产生的速度,它会减慢其输入运算符。这个概念称为背压,这意味着慢速消费者会阻止快速生产者进行处理。
让您的程序按预期运行(HDFS 接收器写入速度比 Elasticsearch 接收器更快)的唯一方法是缓冲 HDFS 接收器写入但 Elasticsearch 接收器尚未写入的所有记录。如果 Elasticsearch sink 一直较慢,您将 运行 内存/磁盘 space 在某个时间点不足。
Flink 解决慢消费者问题的方法是背压。
我看到了两种解决此问题的方法:
- 增加 ElasticsearchSink 的并行度。这可能有帮助或无帮助,具体取决于您的 Elasticsearch 设置的功能。
- 运行 两个作业作为独立的管道。在这种情况下,您必须计算所有结果两次。