NATS 流媒体服务器订阅者速率限制和 exactly once 交付
NATS streaming server subscriber rate limiting and exactly once delivery
我正在玩 NATS 流媒体,但订阅者速率限制有问题。当我将飞行中的最大值设置为 1 并将超时设置为 1 秒并且我有一个基本上是 Thread.sleep(1000) 的消费者时,我会多次收到同一事件。我认为通过限制飞行并使用手动确认,这不应该发生。我怎样才能在非常慢的消费者那里获得 exatly once 交付?
case class EventBus[I, O](inputTopic: String, outputTopic: String, connection: Connection, eventProcessor: StatefulEventProcessor[I, O]) {
// the event bus could be some abstract class while the `Connection` coulbd be injected using DI
val substritionOptions: SubscriptionOptions = new SubscriptionOptions.Builder()
.setManualAcks(true)
.setDurableName("foo")
.setMaxInFlight(1)
.setAckWait(1, TimeUnit.SECONDS)
.build()
if (!inputTopic.isEmpty) {
connection.subscribe(inputTopic, new MessageHandler() {
override def onMessage(m: Message) {
m.ack()
try {
val event = eventProcessor.deserialize(m.getData)
eventProcessor.onEvent(event)
} catch {
case any =>
try {
val command = new String(m.getData)
eventProcessor.onCommand(command)
} catch {
case any => println(s"de-serialization error: $any")
}
} finally {
println("got event")
}
}
}, substritionOptions)
}
if (!outputTopic.isEmpty) {
eventProcessor.setBus(e => {
try {
connection.publish(outputTopic, eventProcessor.serialize(e))
} catch {
case ex => println(s"serialization error $ex")
}
})
}
}
abstract class StatefulEventProcessor[I, O] {
private var bus: Option[O => Unit] = None
def onEvent(event: I): Unit
def onCommand(command: String): Unit
def serialize(o: O): Array[Byte] =
SerializationUtils.serialize(o.asInstanceOf[java.io.Serializable])
def deserialize(in: Array[Byte]): I =
SerializationUtils.deserialize[I](in)
def setBus(push: O => Unit): Unit = {
if (bus.isDefined) {
throw new IllegalStateException("bus already set")
} else {
bus = Some(push)
}
}
def push(event: O) =
bus.get.apply(event)
}
EventBus("out-1", "out-2", sc, new StatefulEventProcessor[String, String] {
override def onEvent(event: String): Unit = {
Thread.sleep(1000)
push("!!!" + event)
}
override def onCommand(command: String): Unit = {}
})
(0 until 100).foreach(i => sc.publish("out-1", SerializationUtils.serialize(s"test-$i")))
首先,NATS Streaming 没有完全一次(重新)传送保证。 MaxInflight 为您提供的保证是,在未确认消息的数量低于该数量之前,服务器不会向订阅者发送 new 消息。因此,在 MaxInflight(1) 的情况下,您要求服务器仅在收到来自先前传递的消息的确认后才发送下一条新消息。但是,这不会阻止未确认消息的重新传递。
服务器无法保证或不知道订阅者实际收到了消息。这就是 ACK 的作用,让服务器知道消息已被订阅者正确处理。如果服务器不接受重新投递(即使达到 MaxInflight),那么 "lost" 消息将永远停止您的订阅。请记住,NATS 流媒体服务器和客户端并未通过 TCP 连接直接相互连接(它们都连接到 NATS 服务器,又名 gnatsd)。
我正在玩 NATS 流媒体,但订阅者速率限制有问题。当我将飞行中的最大值设置为 1 并将超时设置为 1 秒并且我有一个基本上是 Thread.sleep(1000) 的消费者时,我会多次收到同一事件。我认为通过限制飞行并使用手动确认,这不应该发生。我怎样才能在非常慢的消费者那里获得 exatly once 交付?
case class EventBus[I, O](inputTopic: String, outputTopic: String, connection: Connection, eventProcessor: StatefulEventProcessor[I, O]) {
// the event bus could be some abstract class while the `Connection` coulbd be injected using DI
val substritionOptions: SubscriptionOptions = new SubscriptionOptions.Builder()
.setManualAcks(true)
.setDurableName("foo")
.setMaxInFlight(1)
.setAckWait(1, TimeUnit.SECONDS)
.build()
if (!inputTopic.isEmpty) {
connection.subscribe(inputTopic, new MessageHandler() {
override def onMessage(m: Message) {
m.ack()
try {
val event = eventProcessor.deserialize(m.getData)
eventProcessor.onEvent(event)
} catch {
case any =>
try {
val command = new String(m.getData)
eventProcessor.onCommand(command)
} catch {
case any => println(s"de-serialization error: $any")
}
} finally {
println("got event")
}
}
}, substritionOptions)
}
if (!outputTopic.isEmpty) {
eventProcessor.setBus(e => {
try {
connection.publish(outputTopic, eventProcessor.serialize(e))
} catch {
case ex => println(s"serialization error $ex")
}
})
}
}
abstract class StatefulEventProcessor[I, O] {
private var bus: Option[O => Unit] = None
def onEvent(event: I): Unit
def onCommand(command: String): Unit
def serialize(o: O): Array[Byte] =
SerializationUtils.serialize(o.asInstanceOf[java.io.Serializable])
def deserialize(in: Array[Byte]): I =
SerializationUtils.deserialize[I](in)
def setBus(push: O => Unit): Unit = {
if (bus.isDefined) {
throw new IllegalStateException("bus already set")
} else {
bus = Some(push)
}
}
def push(event: O) =
bus.get.apply(event)
}
EventBus("out-1", "out-2", sc, new StatefulEventProcessor[String, String] {
override def onEvent(event: String): Unit = {
Thread.sleep(1000)
push("!!!" + event)
}
override def onCommand(command: String): Unit = {}
})
(0 until 100).foreach(i => sc.publish("out-1", SerializationUtils.serialize(s"test-$i")))
首先,NATS Streaming 没有完全一次(重新)传送保证。 MaxInflight 为您提供的保证是,在未确认消息的数量低于该数量之前,服务器不会向订阅者发送 new 消息。因此,在 MaxInflight(1) 的情况下,您要求服务器仅在收到来自先前传递的消息的确认后才发送下一条新消息。但是,这不会阻止未确认消息的重新传递。
服务器无法保证或不知道订阅者实际收到了消息。这就是 ACK 的作用,让服务器知道消息已被订阅者正确处理。如果服务器不接受重新投递(即使达到 MaxInflight),那么 "lost" 消息将永远停止您的订阅。请记住,NATS 流媒体服务器和客户端并未通过 TCP 连接直接相互连接(它们都连接到 NATS 服务器,又名 gnatsd)。