在 akka-streams 中为 JsonFraming 添加自定义 logic/callback/handler when EOF

Add custom logic/callback/handler when EOF for JsonFraming in akka-streams

我有一个流程,我使用来自 Kafka pah 主题的小批量文件路径,读取文件本身(大 JSON 数组)并将它们写回 Kafka 数据主题。

看起来像这样:

      val fileFlow = Flow[Path].flatMapConcat(HdfsSource.data(fs, _))
        .via(JsonFraming.objectScanner(Int.MaxValue))

      Consumer
        .committableSource(newConsumerSettings, Subscriptions.topics(inputTopicNames))
        .map(value => value)
        .grouped(kafkaConsumerBatch)
        .flatMapConcat(paths => Source(paths))
        .map(path => new Path(path.record.value().get("fullPath").asInstanceOf[String]))
        //Based on: https://github.com/akka/alpakka/blob/v3.0.0/doc-examples/src/test/scala/akka/stream/alpakka/eip/scaladsl/PassThroughExamples.scala#L72-L92
        .via(PassThroughFlow(fileFlow))
        .map { case (bytes, path) => (bytes, entityConfigMap(getCountryPrefix(path))) }
        .map(bytesAndPath => (bytesAndPath._1.utf8String.parseJson.asJsObject, bytesAndPath._2))
        .map { case (bytes, entityConfig) => (toGenericRecord(bytes, entityConfig), entityConfig) }
        .map { case (record, entityConfig) =>
          producerMessagesToTopic.mark()
          ProducerMessage.single(
            new ProducerRecord[NotUsed, GenericRecord](getDataTopicName(entityConfig), record),
            passThrough = entityConfig)
        }
        .via {
          akka.kafka.scaladsl.Producer.flexiFlow(prodSettings)
        }
....More logic for logging and running/materializing the flow

现在,问题是,正如我所说,那些 JSON 文件很大,所以我无法获取整个文件内容,将其构建为单独的对象,将它们全部存储到 Kafka 并提交只有在那之后。我的意思是,这就是我需要做的,但我还需要根据 EOF 事件来控制偏移提交。

我想让 Producer 以自己的节奏将数据发送到 Kafka,而不考虑其配置,但以某种方式将我的自定义逻辑注入到 EOF 事件中。可能类似于 passThrough 字段表示文件已完全使用,我们现在可以为上游路径主题提交偏移量。
objectScanner 在其定义中有一个 GraphStageLogic 具有 onUpstreamFinish 回调,但无法直接访问它进行覆盖。 类 和 SimpleLinearGraphStage 一样,JsonObjectParser 被标记为内部 API。

我很感动

...I can't take the entire file content, frame it into separate objects, store them all to Kafka and commit only after that

因为看起来(如果我弄错了你可以评论)偏移量提交实际上是对你已经完全处理文件的确认,所以没有办法不提交偏移量直到完成该偏移量处消息中的文件中的对象已生成到 Kafka。

Source.via(Flow.flatMapConcat.via(...)).map.via(...) 的缺点是它是单个流,第一和第二 via 之间的所有内容都需要一段时间。

如果您可以在输出主题中交错文件中的对象,并且可以接受来自给定文件的对象不可避免地会被生成两次到输出主题的机会(这两者可能会或可能不会强加有意义constraints/difficulties 关于该主题的下游消费者的实现),您可以并行处理文件。 mapAsync 流阶段对此特别有用:

import akka.Done

// assuming there's an implicit Materializer/ActorSystem (depending on the version of Akka Streams you're running) in scope
def process(path: Path): Future[Done] =
  Source.single(path)
    .via(PassThroughFlow(fileFlow))
    .map { case (bytes, path) => (bytes, entityConfigMap(getCountryPrefix(path))) }
    .map(bytesAndPath => (bytesAndPath._1.utf8String.parseJson.asJsObject, bytesAndPath._2))
    .map { case (bytes, entityConfig) => (toGenericRecord(bytes, entityConfig), entityConfig) }
    .map { case (record, entityConfig) =>
      producerMessagesToTopic.mark()
      ProducerMessage.single(
        new ProducerRecord[NotUsed, GenericRecord](getDataTopicName(entityConfig), record),
        passThrough = entityConfig)
    }
    .via {
      akka.kafka.scaladsl.Producer.flexiFlow(prodSettings)
    }
    .runWith(Sink.ignore)

 // then starting right after .flatMapConcat(paths => Source(paths))
 .mapAsync(parallelism) { committableMsg =>
   val p = new Path(committableMsg.record.value().get("fullPath").asInstanceOf[String])
   process(p).map { _ => committableMsg.committableOffset }
 }
 // now have the committable offsets

parallelism 然后限制您在给定时间处理的路径数。保持对提交者的排序(即,在所有消息被完全处理之前,偏移量永远不会到达提交者)。