使用内存数据(字典、哈希图、数据框)加入和丰富 Kafka 主题?
Joining and enriching Kafka topic with in memory data (Dictionary, Hashmap, Dataframe)?
假设我们有以下
- 一个 Kafka 主题 X 表示实体 X 上发生的事件。
- 实体 X 包含另一个实体 Y 的外键
我想用来自 Kafka 外部的数据(即来自包含所有实体 Y 的 CSV 文件)丰富这个主题 X。
我现在的解决方案如下:
- 以类似字典的结构将 CSV 加载到内存中,以非常快速地进行基于键的查找。
- 从主题X开始消费,丰富内存中的数据,然后将丰富的记录写回新的Kafka主题。
我仍在评估 Kafka 流或 Ksql 是否可以为我做同样的事情,
我的问题是有没有一种有效的方法可以在不损失性能的情况下使用 Kafka 流库或 KSQL 执行此操作?
当然,你可以这样做
final Map m = new Hashmap();
builder.stream(topic).mapValues(v -> m.get(v)).to(out);
但是 Kafka Streams 理想情况下是分布式的,因此您的 CSV 需要在多台机器上同步。
与其构建地图,不如通过 KTable 和 use Kafka Connect Spooldir connector to load the CSV to a topic 使用 KeyValueStore(这也可以在内存中,但使用 RocksDB 更容错)
,从中构建一个 table,然后仅加入主题
假设我们有以下
- 一个 Kafka 主题 X 表示实体 X 上发生的事件。
- 实体 X 包含另一个实体 Y 的外键
我想用来自 Kafka 外部的数据(即来自包含所有实体 Y 的 CSV 文件)丰富这个主题 X。
我现在的解决方案如下:
- 以类似字典的结构将 CSV 加载到内存中,以非常快速地进行基于键的查找。
- 从主题X开始消费,丰富内存中的数据,然后将丰富的记录写回新的Kafka主题。
我仍在评估 Kafka 流或 Ksql 是否可以为我做同样的事情,
我的问题是有没有一种有效的方法可以在不损失性能的情况下使用 Kafka 流库或 KSQL 执行此操作?
当然,你可以这样做
final Map m = new Hashmap();
builder.stream(topic).mapValues(v -> m.get(v)).to(out);
但是 Kafka Streams 理想情况下是分布式的,因此您的 CSV 需要在多台机器上同步。
与其构建地图,不如通过 KTable 和 use Kafka Connect Spooldir connector to load the CSV to a topic 使用 KeyValueStore(这也可以在内存中,但使用 RocksDB 更容错) ,从中构建一个 table,然后仅加入主题