在 Akka Streams 自定义图形阶段包装 Pub-Sub Java API

Wrapping Pub-Sub Java API in Akka Streams Custom Graph Stage

我正在使用来自提供实时流的数据供应商的 Java API。我想使用 Akka 流处理这个流。

Java API 有一个 pub sub 设计,大致是这样工作的:

Subscription sub = createSubscription();
sub.addListener(new Listener() {
        public void eventsReceived(List events) {
                for (Event e : events)
                        buffer.enqueue(e)
        }
});

我曾尝试将此订阅的创建和随附的缓冲区嵌入自定义图形阶段,但没有取得太大成功。谁能指导我使用 Akka 与此 API 交互的最佳方式? Akka Streams 是最好的工具吗?

要提供 Source,您不一定需要使用自定义图表阶段。 Source.queue 将具体化为一个缓冲队列,您可以向其中添加元素,然后这些元素将通过流传播。

有一些棘手的事情需要注意。首先是实现 Source.queue 有一些微妙之处,因此您可以设置订阅。像这样:

def bufferSize: Int = ???

Source.fromMaterializer { (mat, att) =>
  val (queue, source) = Source.queue[Event](bufferSize).preMaterialize()(mat)
  val subscription = createSubscription()
  subscription.addListener(
    new Listener() {
      def eventsReceived(events: java.util.List[Event]): Unit = {
        import scala.collection.JavaConverters.iterableAsScalaIterable
        import akka.stream.QueueOfferResult._

        iterableAsScalaIterable(events).foreach { event =>
          queue.offer(event) match {
            case Enqueued => ()  // do nothing
            case Dropped => ??? // handle a dropped pubsub element, might well do nothing
            case QueueClosed => ??? // presumably cancel the subscription...
          }
        }
      }
    }
  )

  source.withAttributes(att)
}

Source.fromMaterializer 用于在每次物化时访问物化器(这是将流定义编译成角色的东西)。当我们物化时,我们使用物化器 preMaterialize 队列源,这样我们就可以访问队列。我们的订阅将传入元素添加到队列中。

如果消费者跟不上,此 pubsub 的 API 似乎不支持背压。如果缓冲区已满,队列将丢弃已递交的元素:在这种情况下你可能不想做任何事情,但我在比赛中指出你应该在这里做出明确的决定。

丢弃最新元素是此队列的同步行为(还有其他可用的队列实现,但这些实现会异步进行通信,这对于突发内存消耗非常不利)。如果您更喜欢其他东西,在队列中有一个非常小的缓冲区并将“整体”SourceSource.fromMaterializer 返回的那个)附加到一个表示永久需求的阶段可能是有意义的.例如,buffer(downstreamBufferSize, OverflowStrategy.dropHead) 将丢弃尚未处理的最旧事件。或者,可以以某种有意义的方式组合您的 Event,在这种情况下,如果下游无法快速处理它们,conflate 阶段将自动组合传入的 Event

好答案!我确实构建了类似的东西。还有 kamon 指标来监控队列大小 exc.

class AsyncSubscriber(projectId: String, subscriptionId: String, metricsRegistry: CustomMetricsRegistry, pullParallelism: Int)(implicit val ec: Executor) {
  private val logger = LoggerFactory.getLogger(getClass)

  def bufferSize: Int = 1000

  def source(): Source[(PubsubMessage, AckReplyConsumer), Future[NotUsed]] = {
    Source.fromMaterializer { (mat, attr) =>
      val (queue, source) = Source.queue[(PubsubMessage, AckReplyConsumer)](bufferSize).preMaterialize()(mat)

      val receiver: MessageReceiver = {
        (message: PubsubMessage, consumer: AckReplyConsumer) => {
          metricsRegistry.inputEventQueueSize.update(queue.size())
          queue.offer((message, consumer)) match {
            case QueueOfferResult.Enqueued => 
              metricsRegistry.inputQueueAddEventCounter.increment()
            case QueueOfferResult.Dropped =>
              metricsRegistry.inputQueueDropEventCounter.increment()
              consumer.nack()
              logger.warn(s"Buffer is full, message nacked. Pubsub should retry don't panic. If this happens too often, we should also tweak the buffer size or the autoscaler.")
            case QueueOfferResult.Failure(ex) =>
              metricsRegistry.inputQueueDropEventCounter.increment()
              consumer.nack()
              logger.error(s"Failed to offer message with id=${message.getMessageId()}", ex)
            case QueueOfferResult.QueueClosed => 
              logger.error("Destination Queue closed. Something went terribly wrong. Shutting down the jvm.")
              consumer.nack()
              mat.shutdown()
              sys.exit(1)
          }
        }
      }

      val subscriptionName = ProjectSubscriptionName.of(projectId, subscriptionId)
      val subscriber = Subscriber.newBuilder(subscriptionName, receiver).setParallelPullCount(pullParallelism).build
      subscriber.startAsync().awaitRunning()
      source.withAttributes(attr)
    }
  }
}