从 Elasticsearch 读取数据到 Flink 聚合?
Reading data form Elasticsearch into Flink aggregation?
我正在尝试使用 Kafka 消息(作为 StreamSource)更新 Elasticsearch 中的文档。使用 windows 和 Elasticsearch 连接器作为接收器批量写入 Elasticsearch 很好,但是,我们需要更新文档中的现有数据并以批量性能方式读取(不是针对每个元组,而是针对例如在 byKey()
拆分后的整个 window 我们要聚合)
我们现在正在使用 Storm Trident,它在 persistentAggregate
之前执行批量读取并在之后写回更新的聚合,从而最大限度地减少与后端的交互。我在 Flink 中找不到类似的东西 - 有什么提示吗?
直播中 运行 两个 window 调用怎么样 -
window1
- 从 elasticsearch
批量读取
window2
- 批量导入 elasticsearch。
streamData
.window1(bulkRead and update/join)
.processFunction(...)
.window2(BulkPush)
- 您可以使用任何适合 bulk-read 的方法,例如
Storm Trident
。
- 在 window2 link
中使用 BulkProcessor
我正在尝试使用 Kafka 消息(作为 StreamSource)更新 Elasticsearch 中的文档。使用 windows 和 Elasticsearch 连接器作为接收器批量写入 Elasticsearch 很好,但是,我们需要更新文档中的现有数据并以批量性能方式读取(不是针对每个元组,而是针对例如在 byKey()
拆分后的整个 window 我们要聚合)
我们现在正在使用 Storm Trident,它在 persistentAggregate
之前执行批量读取并在之后写回更新的聚合,从而最大限度地减少与后端的交互。我在 Flink 中找不到类似的东西 - 有什么提示吗?
直播中 运行 两个 window 调用怎么样 -
window1
- 从 elasticsearch
window2
- 批量导入 elasticsearch。
streamData
.window1(bulkRead and update/join)
.processFunction(...)
.window2(BulkPush)
- 您可以使用任何适合 bulk-read 的方法,例如
Storm Trident
。 - 在 window2 link 中使用 BulkProcessor