使用 Alpakka 将 1 个输入连接到 n 个输出
Connect 1 input to n outputs with Alpakka
我正在尝试将生产者连接到消费者的一些变体,有时我需要为每条消息生成 1 条额外消息(例如,1 条到输出主题,1 条消息到不同主题) 同时保持对此的保证。
我正在考虑执行 mapConcat 并输出多个 ProducerRecord 对象,我担心在边缘情况下松散的保证,即第一条消息足以让提交发生在该偏移量上,从而导致第二条消息的潜在丢失.此外,您似乎不能只执行 .flatmap,因为您将进入图表 API,它变得更加混乱,因为一旦您合并到提交流程中,您就变得更加难以确保您不只是忽略重复的偏移量。
Consumer.committableSource(consumerSettings, Subscriptions.topics(inputTopic))
.map(msg => (msg, addLineage(msg.record.value())))
.mapConcat(input =>
if (math.random > 0.25)
List(ProducerMessage.Message(
new ProducerRecord[Array[Byte], Array[Byte]](outputTopic, input._1.record.key(), input._2),
input._1.committableOffset
))
else List(ProducerMessage.Message(
new ProducerRecord[Array[Byte], Array[Byte]](outputTopic, input._1.record.key(), input._2),
input._1.committableOffset
),ProducerMessage.Message(
new ProducerRecord[Array[Byte], Array[Byte]](outputTopic2, input._1.record.key(), input._2),
input._1.committableOffset
))
)
.via(Producer.flow(producerSettings))
.map(_.message.passThrough)
.batch(max = 20, first => CommittableOffsetBatch.empty.updated(first)) {
(batch, elem) => batch.updated(elem)
}
.mapAsync(parallelism = 3)(_.commitScaladsl())
.runWith(Sink.ignore)
原来的一对一文档在这里:https://doc.akka.io/docs/akka-stream-kafka/current/consumer.html#connecting-producer-and-consumer
有没有人想到/解决了这个问题?
Alpakka Kafka 连接器最近引入了 flexiFlow
支持您的用例:Let one stream element produce multiple messages to Kafka
我正在尝试将生产者连接到消费者的一些变体,有时我需要为每条消息生成 1 条额外消息(例如,1 条到输出主题,1 条消息到不同主题) 同时保持对此的保证。
我正在考虑执行 mapConcat 并输出多个 ProducerRecord 对象,我担心在边缘情况下松散的保证,即第一条消息足以让提交发生在该偏移量上,从而导致第二条消息的潜在丢失.此外,您似乎不能只执行 .flatmap,因为您将进入图表 API,它变得更加混乱,因为一旦您合并到提交流程中,您就变得更加难以确保您不只是忽略重复的偏移量。
Consumer.committableSource(consumerSettings, Subscriptions.topics(inputTopic))
.map(msg => (msg, addLineage(msg.record.value())))
.mapConcat(input =>
if (math.random > 0.25)
List(ProducerMessage.Message(
new ProducerRecord[Array[Byte], Array[Byte]](outputTopic, input._1.record.key(), input._2),
input._1.committableOffset
))
else List(ProducerMessage.Message(
new ProducerRecord[Array[Byte], Array[Byte]](outputTopic, input._1.record.key(), input._2),
input._1.committableOffset
),ProducerMessage.Message(
new ProducerRecord[Array[Byte], Array[Byte]](outputTopic2, input._1.record.key(), input._2),
input._1.committableOffset
))
)
.via(Producer.flow(producerSettings))
.map(_.message.passThrough)
.batch(max = 20, first => CommittableOffsetBatch.empty.updated(first)) {
(batch, elem) => batch.updated(elem)
}
.mapAsync(parallelism = 3)(_.commitScaladsl())
.runWith(Sink.ignore)
原来的一对一文档在这里:https://doc.akka.io/docs/akka-stream-kafka/current/consumer.html#connecting-producer-and-consumer
有没有人想到/解决了这个问题?
Alpakka Kafka 连接器最近引入了 flexiFlow
支持您的用例:Let one stream element produce multiple messages to Kafka