在 Akka Streams 自定义图形阶段包装 Pub-Sub Java API
Wrapping Pub-Sub Java API in Akka Streams Custom Graph Stage
我正在使用来自提供实时流的数据供应商的 Java API。我想使用 Akka 流处理这个流。
Java API 有一个 pub sub 设计,大致是这样工作的:
Subscription sub = createSubscription();
sub.addListener(new Listener() {
public void eventsReceived(List events) {
for (Event e : events)
buffer.enqueue(e)
}
});
我曾尝试将此订阅的创建和随附的缓冲区嵌入自定义图形阶段,但没有取得太大成功。谁能指导我使用 Akka 与此 API 交互的最佳方式? Akka Streams 是最好的工具吗?
要提供 Source
,您不一定需要使用自定义图表阶段。 Source.queue
将具体化为一个缓冲队列,您可以向其中添加元素,然后这些元素将通过流传播。
有一些棘手的事情需要注意。首先是实现 Source.queue
有一些微妙之处,因此您可以设置订阅。像这样:
def bufferSize: Int = ???
Source.fromMaterializer { (mat, att) =>
val (queue, source) = Source.queue[Event](bufferSize).preMaterialize()(mat)
val subscription = createSubscription()
subscription.addListener(
new Listener() {
def eventsReceived(events: java.util.List[Event]): Unit = {
import scala.collection.JavaConverters.iterableAsScalaIterable
import akka.stream.QueueOfferResult._
iterableAsScalaIterable(events).foreach { event =>
queue.offer(event) match {
case Enqueued => () // do nothing
case Dropped => ??? // handle a dropped pubsub element, might well do nothing
case QueueClosed => ??? // presumably cancel the subscription...
}
}
}
}
)
source.withAttributes(att)
}
Source.fromMaterializer
用于在每次物化时访问物化器(这是将流定义编译成角色的东西)。当我们物化时,我们使用物化器 preMaterialize
队列源,这样我们就可以访问队列。我们的订阅将传入元素添加到队列中。
如果消费者跟不上,此 pubsub 的 API 似乎不支持背压。如果缓冲区已满,队列将丢弃已递交的元素:在这种情况下你可能不想做任何事情,但我在比赛中指出你应该在这里做出明确的决定。
丢弃最新元素是此队列的同步行为(还有其他可用的队列实现,但这些实现会异步进行通信,这对于突发内存消耗非常不利)。如果您更喜欢其他东西,在队列中有一个非常小的缓冲区并将“整体”Source
(Source.fromMaterializer
返回的那个)附加到一个表示永久需求的阶段可能是有意义的.例如,buffer(downstreamBufferSize, OverflowStrategy.dropHead)
将丢弃尚未处理的最旧事件。或者,可以以某种有意义的方式组合您的 Event
,在这种情况下,如果下游无法快速处理它们,conflate
阶段将自动组合传入的 Event
。
好答案!我确实构建了类似的东西。还有 kamon 指标来监控队列大小 exc.
class AsyncSubscriber(projectId: String, subscriptionId: String, metricsRegistry: CustomMetricsRegistry, pullParallelism: Int)(implicit val ec: Executor) {
private val logger = LoggerFactory.getLogger(getClass)
def bufferSize: Int = 1000
def source(): Source[(PubsubMessage, AckReplyConsumer), Future[NotUsed]] = {
Source.fromMaterializer { (mat, attr) =>
val (queue, source) = Source.queue[(PubsubMessage, AckReplyConsumer)](bufferSize).preMaterialize()(mat)
val receiver: MessageReceiver = {
(message: PubsubMessage, consumer: AckReplyConsumer) => {
metricsRegistry.inputEventQueueSize.update(queue.size())
queue.offer((message, consumer)) match {
case QueueOfferResult.Enqueued =>
metricsRegistry.inputQueueAddEventCounter.increment()
case QueueOfferResult.Dropped =>
metricsRegistry.inputQueueDropEventCounter.increment()
consumer.nack()
logger.warn(s"Buffer is full, message nacked. Pubsub should retry don't panic. If this happens too often, we should also tweak the buffer size or the autoscaler.")
case QueueOfferResult.Failure(ex) =>
metricsRegistry.inputQueueDropEventCounter.increment()
consumer.nack()
logger.error(s"Failed to offer message with id=${message.getMessageId()}", ex)
case QueueOfferResult.QueueClosed =>
logger.error("Destination Queue closed. Something went terribly wrong. Shutting down the jvm.")
consumer.nack()
mat.shutdown()
sys.exit(1)
}
}
}
val subscriptionName = ProjectSubscriptionName.of(projectId, subscriptionId)
val subscriber = Subscriber.newBuilder(subscriptionName, receiver).setParallelPullCount(pullParallelism).build
subscriber.startAsync().awaitRunning()
source.withAttributes(attr)
}
}
}
我正在使用来自提供实时流的数据供应商的 Java API。我想使用 Akka 流处理这个流。
Java API 有一个 pub sub 设计,大致是这样工作的:
Subscription sub = createSubscription();
sub.addListener(new Listener() {
public void eventsReceived(List events) {
for (Event e : events)
buffer.enqueue(e)
}
});
我曾尝试将此订阅的创建和随附的缓冲区嵌入自定义图形阶段,但没有取得太大成功。谁能指导我使用 Akka 与此 API 交互的最佳方式? Akka Streams 是最好的工具吗?
要提供 Source
,您不一定需要使用自定义图表阶段。 Source.queue
将具体化为一个缓冲队列,您可以向其中添加元素,然后这些元素将通过流传播。
有一些棘手的事情需要注意。首先是实现 Source.queue
有一些微妙之处,因此您可以设置订阅。像这样:
def bufferSize: Int = ???
Source.fromMaterializer { (mat, att) =>
val (queue, source) = Source.queue[Event](bufferSize).preMaterialize()(mat)
val subscription = createSubscription()
subscription.addListener(
new Listener() {
def eventsReceived(events: java.util.List[Event]): Unit = {
import scala.collection.JavaConverters.iterableAsScalaIterable
import akka.stream.QueueOfferResult._
iterableAsScalaIterable(events).foreach { event =>
queue.offer(event) match {
case Enqueued => () // do nothing
case Dropped => ??? // handle a dropped pubsub element, might well do nothing
case QueueClosed => ??? // presumably cancel the subscription...
}
}
}
}
)
source.withAttributes(att)
}
Source.fromMaterializer
用于在每次物化时访问物化器(这是将流定义编译成角色的东西)。当我们物化时,我们使用物化器 preMaterialize
队列源,这样我们就可以访问队列。我们的订阅将传入元素添加到队列中。
如果消费者跟不上,此 pubsub 的 API 似乎不支持背压。如果缓冲区已满,队列将丢弃已递交的元素:在这种情况下你可能不想做任何事情,但我在比赛中指出你应该在这里做出明确的决定。
丢弃最新元素是此队列的同步行为(还有其他可用的队列实现,但这些实现会异步进行通信,这对于突发内存消耗非常不利)。如果您更喜欢其他东西,在队列中有一个非常小的缓冲区并将“整体”Source
(Source.fromMaterializer
返回的那个)附加到一个表示永久需求的阶段可能是有意义的.例如,buffer(downstreamBufferSize, OverflowStrategy.dropHead)
将丢弃尚未处理的最旧事件。或者,可以以某种有意义的方式组合您的 Event
,在这种情况下,如果下游无法快速处理它们,conflate
阶段将自动组合传入的 Event
。
好答案!我确实构建了类似的东西。还有 kamon 指标来监控队列大小 exc.
class AsyncSubscriber(projectId: String, subscriptionId: String, metricsRegistry: CustomMetricsRegistry, pullParallelism: Int)(implicit val ec: Executor) {
private val logger = LoggerFactory.getLogger(getClass)
def bufferSize: Int = 1000
def source(): Source[(PubsubMessage, AckReplyConsumer), Future[NotUsed]] = {
Source.fromMaterializer { (mat, attr) =>
val (queue, source) = Source.queue[(PubsubMessage, AckReplyConsumer)](bufferSize).preMaterialize()(mat)
val receiver: MessageReceiver = {
(message: PubsubMessage, consumer: AckReplyConsumer) => {
metricsRegistry.inputEventQueueSize.update(queue.size())
queue.offer((message, consumer)) match {
case QueueOfferResult.Enqueued =>
metricsRegistry.inputQueueAddEventCounter.increment()
case QueueOfferResult.Dropped =>
metricsRegistry.inputQueueDropEventCounter.increment()
consumer.nack()
logger.warn(s"Buffer is full, message nacked. Pubsub should retry don't panic. If this happens too often, we should also tweak the buffer size or the autoscaler.")
case QueueOfferResult.Failure(ex) =>
metricsRegistry.inputQueueDropEventCounter.increment()
consumer.nack()
logger.error(s"Failed to offer message with id=${message.getMessageId()}", ex)
case QueueOfferResult.QueueClosed =>
logger.error("Destination Queue closed. Something went terribly wrong. Shutting down the jvm.")
consumer.nack()
mat.shutdown()
sys.exit(1)
}
}
}
val subscriptionName = ProjectSubscriptionName.of(projectId, subscriptionId)
val subscriber = Subscriber.newBuilder(subscriptionName, receiver).setParallelPullCount(pullParallelism).build
subscriber.startAsync().awaitRunning()
source.withAttributes(attr)
}
}
}