Lagom PubSubRef 订阅者丢弃消息

Lagom PubSubRef subscriber drops messages

[注意]问题是Lagom框架特有的!

在我目前的项目中,观察到上游高速时,从Source到Kafka主题发布者的消息列表切分问题,看起来下游无法及时处理所有消息。据了解,切割与 PubSubRef.subscribe() 方法 https://github.com/lagom/lagom/blob/master/pubsub/javadsl/src/main/scala/com/lightbend/lagom/javadsl/pubsub/PubSubRef.scala#L85

的行为有关

完整的方法定义:

def subscriber(): Source[T, NotUsed] = {
scaladsl.Source.actorRef[T](bufferSize, OverflowStrategy.dropHead)
  .mapMaterializedValue { ref =>
    mediator ! Subscribe(topic.name, ref)
    NotUsed
  }.asJava
}

OverflowStrategy.dropHead被使用。可以改为使用back-pressure strategy吗?

UPD#1: 用例非常简单,当查询请求发布到命令主题时,从 DB table 获取它并查询对象,结果列表被推送到结果 Kafka 主题。代码片段:

objectsResultTopic = pubSub.refFor(TopicId.of(CustomObject.class, OBJECTS_RESULT_TOPIC));
objectQueryTopic().subscribe().atLeastOnce(
Flow.fromSinkAndSource(
    Flow.fromFunction(this::deserializeCommandAndQueryObjects)
        .mapAsync(concurrency, objects -> objects)
        .flatMapMerge(concurrency, objects -> objects)
        .alsoTo(Sink.foreach(event -> LOG.trace("Sending object {}", object)))
        .to(objectsResultTopic.publisher()),
    Source.repeat(Done.getInstance())
    )
)

如果 deserializeCommandAndQueryObjects 函数生成的对象流大于默认值 buffer-size = 1000 它开始切割元素(我们的案例是 ~ 2.5k 个对象).

UPD#2: 对象数据的来源是:

// returns CompletionStage<Source<CustomObject, ?>>
jdbcSession.withConnection(
  connection -> Source.from(runQuery(connection, rowConverter))
)

并且订阅了 Kafka objectsResultTopic:

TopicProducer.singleStreamWithOffset(
offset -> objectsResultTopic.subscriber().map(gm -> {
    JsonNode node = mapper.convertValue(gm, JsonNode.class);
    return Pair.create(node, offset);
}));

听起来 Lagom 的 distributed publish-subscribe 功能可能不是您所拥有的工作的最佳工具。

你的问题提到了Kafka,但是这个特性并没有使用到Kafka。相反,它通过直接向集群中的所有订阅者广播消息来工作。这是一种 "at most once" 消息传输,可能确实会丢失消息,适用于更关心跟上最新消息而不是处理每条消息的消费者。溢出策略不可自定义,您不希望在这些用例中使用背压,因为这意味着一个缓慢的消费者可能会减慢对所有其他订阅者的交付。

您还有其他一些选择:

  1. 如果你确实想使用 Kafka,你应该使用 Lagom 的 message broker API。这支持 "at least once" 传递语义,可用于确保每个消费者处理每条消息(以可能增加延迟为代价)。

    在这种情况下,Kafka 充当一个巨大的持久缓冲区,因此它甚至比背压更好:生产者和消费者可以以不同的速度进行,并且(与 partitioning 一起使用时)您可以添加消费者以便在需要时更快地扩展和处理消息。

    消息代理API可以在生产者和消费者都在同一个服务中的情况下使用,但是特别适合服务之间的通信。

  2. 如果您发送的消息是持久实体事件,并且消费者是同一服务的一部分,那么 persistent read-side processor 可能是一个不错的选择。

    这也提供 "at least once" 传递,如果处理消息的唯一效果是数据库更新,那么对 Cassandra read-side databases and relational read-side databases 的内置支持提供 "effectively once" 语义,其中数据库更新是 运行 事务性的,以确保事件处理期间发生的故障不会导致部分更新。

  3. 如果您发送的消息是持久实体事件,消费者是同一服务的一部分,但您希望将事件作为流处理,您可以访问 raw stream of events .

  4. 如果您的用例不适合 Lagom 明确支持的用例之一,您可以使用较低级别的 Akka APIs,包括 distributed publish-subscribe,以实施更符合您需求的东西。

最佳选择将取决于您的用例的具体情况:消息的来源和您想要的消费者类型。如果您使用更多详细信息更新您的问题并对此答案添加评论,我可以使用更具体的建议来编辑答案。

如果有人感兴趣,最后我们通过使用 Akka Producer API 解决了这个问题,例如:

ProducerSettings<String, CustomObject> producerSettings = ProducerSettings.create(system, new StringSerializer(), new CustomObjectSerializer());
objectQueryTopic().subscribe().atLeastOnce(
Flow.fromSinkAndSource(
    Flow.fromFunction(this::deserializeCommandAndQueryObjects)
        .mapAsync(concurrency, objects -> objects)
        .flatMapMerge(concurrency, objects -> objects)
        .alsoTo(Sink.foreach(object -> LOG.trace("Sending event {}", object)))
        .map(object -> new ProducerRecord<String, CustomObject>(OBJECTS_RESULT_TOPIC, object))
        .to(Producer.plainSink(producerSettings)),
    Source.repeat(Done.getInstance())));

它无需缓冲即可工作,只是将推入 Kafka 主题。