Apache Beam 组合分组值
Apache Beam Combine grouped values
我正在尝试找到一种方法来重新排序我的 Kafka 消息,并使用 Apache Beam 结合 [ 将有序消息发送到新主题=46=]数据流.
我有 Kafka 发布者发送以下格式的字符串消息:
{system_timestamp}-{event_name}?{parameters}
例如:
1494002667893-client.message?chatName=1c&messageBody=hello
1494002656558-chat.started?chatName=1c&chatPatricipants=3
我想做的是根据消息的 {system-timestamp} 部分并在 5 秒内 window 重新排序事件,因为我们的发布者没有'保证消息将按照 {system-timestamp} 值发送。
我编写了一个模拟排序器函数,用于对从 Kafka 接收到的事件进行排序(使用 KafkaIO 源):
static class SortEventsFunc extends DoFn<KV<String, Iterable<String>>, KV<String, Iterable<String>>> {
@ProcessElement
public void processElement(ProcessContext c) {
KV<String, Iterable<String>> element = c.element();
System.out.println("");
System.out.print("key: " + element.getKey() + ";");
Iterator<String> it = element.getValue().iterator();
List<String> list = new ArrayList<>();
while (it.hasNext()) {
String val = it.next();
System.out.print("value: " + val);
list.add(val);
}
Collections.sort(list, Comparator.naturalOrder());
c.output(KV.of(element.getKey(), list));
}
}
public static void main(String[] args) {
PipelineOptions options = PipelineOptionsFactory.create();
DirectOptions directOptions = options.as(DirectOptions.class);
directOptions.setRunner(DirectRunner.class);
// Create the Pipeline object with the options we defined above.
Pipeline pipeline = Pipeline.create(options);
pipeline
// read from Kafka
.apply(KafkaIO.<String,String>read()
.withBootstrapServers("localhost:9092")
.withTopics(new ArrayList<>((Arrays.asList("events"))))
.withKeyDeserializer(StringDeserializer.class)
.withValueDeserializer(StringDeserializer.class)
.withoutMetadata())
// apply window
.apply(Window.<KV<String,String>>into(
FixedWindows.of(Duration.standardSeconds(5L))))
// group by key before sorting
.apply(GroupByKey.<String, String>create()) // return PCollection<KV<String, Iterable<String>>
// sort events
.apply(ParDo.of(new SortEventsFunc()))
//combine KV<String, Iterable<String>> input to KafkaIO acceptable KV<String, String> format
.apply(Combine.perKey()) //:TODO somehow convert KV<String, Iterable<String>> to KV<String, String>
// write ordered events to Kafka
.apply(KafkaIO.<String, String>write()
.withBootstrapServers("localhost:9092")
.withTopic("events-sorted")
.withKeySerializer(StringSerializer.class)
.withValueSerializer(StringSerializer.class)
);
pipeline.run();
}
所以我使用 GroupByKey.<String, String>create()
转换对消息进行了分组,在 sortrin 事件之后,我需要以某种方式将它们从 KV<String, Iterable<String>>
转换为 KafkaIO 接受的 KV<String, String> or KV<Void, String>
值。
所以我想做的就是忽略通过分组转换键创建的,简单地
将每个值作为单独的消息传递给 KafkaIO writer。
我探索了 Combine#perKey
转换,但它接受 SerializableFunction,它只能将所有值组合成一个字符串。(带有一些分隔符),因此我只传递了一个值作为一个连接的字符串而不是每个值(由 KafkaIO#read()
读取)到 KafkaIO 编写器。
其实很简单!
这里的技巧是,您可以在 @ProcessElement
方法中多次调用 c.output
。
因此,在您的情况下,只需定义一个 DoFn<KV<String, Iterable<String>>, KV<String, String>>
,遍历 c.element().getValue()
集合,然后为每个集合调用 c.output
。
我正在尝试找到一种方法来重新排序我的 Kafka 消息,并使用 Apache Beam 结合 [ 将有序消息发送到新主题=46=]数据流.
我有 Kafka 发布者发送以下格式的字符串消息:
{system_timestamp}-{event_name}?{parameters}
例如:
1494002667893-client.message?chatName=1c&messageBody=hello
1494002656558-chat.started?chatName=1c&chatPatricipants=3
我想做的是根据消息的 {system-timestamp} 部分并在 5 秒内 window 重新排序事件,因为我们的发布者没有'保证消息将按照 {system-timestamp} 值发送。
我编写了一个模拟排序器函数,用于对从 Kafka 接收到的事件进行排序(使用 KafkaIO 源):
static class SortEventsFunc extends DoFn<KV<String, Iterable<String>>, KV<String, Iterable<String>>> {
@ProcessElement
public void processElement(ProcessContext c) {
KV<String, Iterable<String>> element = c.element();
System.out.println("");
System.out.print("key: " + element.getKey() + ";");
Iterator<String> it = element.getValue().iterator();
List<String> list = new ArrayList<>();
while (it.hasNext()) {
String val = it.next();
System.out.print("value: " + val);
list.add(val);
}
Collections.sort(list, Comparator.naturalOrder());
c.output(KV.of(element.getKey(), list));
}
}
public static void main(String[] args) {
PipelineOptions options = PipelineOptionsFactory.create();
DirectOptions directOptions = options.as(DirectOptions.class);
directOptions.setRunner(DirectRunner.class);
// Create the Pipeline object with the options we defined above.
Pipeline pipeline = Pipeline.create(options);
pipeline
// read from Kafka
.apply(KafkaIO.<String,String>read()
.withBootstrapServers("localhost:9092")
.withTopics(new ArrayList<>((Arrays.asList("events"))))
.withKeyDeserializer(StringDeserializer.class)
.withValueDeserializer(StringDeserializer.class)
.withoutMetadata())
// apply window
.apply(Window.<KV<String,String>>into(
FixedWindows.of(Duration.standardSeconds(5L))))
// group by key before sorting
.apply(GroupByKey.<String, String>create()) // return PCollection<KV<String, Iterable<String>>
// sort events
.apply(ParDo.of(new SortEventsFunc()))
//combine KV<String, Iterable<String>> input to KafkaIO acceptable KV<String, String> format
.apply(Combine.perKey()) //:TODO somehow convert KV<String, Iterable<String>> to KV<String, String>
// write ordered events to Kafka
.apply(KafkaIO.<String, String>write()
.withBootstrapServers("localhost:9092")
.withTopic("events-sorted")
.withKeySerializer(StringSerializer.class)
.withValueSerializer(StringSerializer.class)
);
pipeline.run();
}
所以我使用 GroupByKey.<String, String>create()
转换对消息进行了分组,在 sortrin 事件之后,我需要以某种方式将它们从 KV<String, Iterable<String>>
转换为 KafkaIO 接受的 KV<String, String> or KV<Void, String>
值。
所以我想做的就是忽略通过分组转换键创建的,简单地
将每个值作为单独的消息传递给 KafkaIO writer。
我探索了 Combine#perKey
转换,但它接受 SerializableFunction,它只能将所有值组合成一个字符串。(带有一些分隔符),因此我只传递了一个值作为一个连接的字符串而不是每个值(由 KafkaIO#read()
读取)到 KafkaIO 编写器。
其实很简单!
这里的技巧是,您可以在 @ProcessElement
方法中多次调用 c.output
。
因此,在您的情况下,只需定义一个 DoFn<KV<String, Iterable<String>>, KV<String, String>>
,遍历 c.element().getValue()
集合,然后为每个集合调用 c.output
。