Akka 分布式 Pub/Sub 背压
Akka Distributed Pub/Sub back-pressure
我正在使用 Akka Distributed Pub/Sub 并且只有一个发布者和一个订阅者。我的发布者比订阅者快得多。有没有办法在某个点后减慢发布者的速度?
发布商代码:
public class Publisher extends AbstractActor {
private ActorRef mediator;
static public Props props() {
return Props.create(Publisher.class, () -> new Publisher());
}
public Publisher () {
this.mediator = DistributedPubSub.get(getContext().system()).mediator();
this.self().tell(0, ActorRef.noSender());
}
@Override
public Receive createReceive() {
return receiveBuilder()
.match(Integer.class, msg -> {
// Sending message to Subscriber
mediator.tell(
new DistributedPubSubMediator.Send(
"/user/" + Subscriber.class.getName(),
msg.toString(),
false),
getSelf());
getSelf().tell(++msg, ActorRef.noSender());
})
.build();
}
}
订阅者代码:
public class Subscriber extends AbstractActor {
static public Props props() {
return Props.create(Subscriber.class, () -> new Subscriber());
}
public Subscriber () {
ActorRef mediator = DistributedPubSub.get(getContext().system()).mediator();
mediator.tell(new DistributedPubSubMediator.Put(getSelf()), getSelf());
}
@Override
public Receive createReceive() {
return receiveBuilder()
.match(String.class, msg -> {
System.out.println("Subscriber message received: " + msg);
Thread.sleep(10000);
})
.build();
}
}
不幸的是,按照目前的设计,我认为没有办法向原始发件人提供 "back-pressure"。由于您正在使用 ActorRef.tell
将消息发送到 mediator
,因此无法获得下游接收器正在备份的信号。这是因为tell
,你使用的方法,returns一个void
.
切换到询问
如果您将 tell
切换为 ask
,您可以设置一个适当的 Timeout
值,当您在特定时间内未收到回复时至少会通知您持续时间。
切换到流
"Back-pressure" is a primary feature of akka streams。因此,通过切换到流实现,您将能够实现您想要的目标。
如果可以创建一个流Source
from your original data, then you could use Sink.actorRef
to create a Sink
from the mediator
and use Flow.throttle来控制流向中介的速率。
我正在使用 Akka Distributed Pub/Sub 并且只有一个发布者和一个订阅者。我的发布者比订阅者快得多。有没有办法在某个点后减慢发布者的速度?
发布商代码:
public class Publisher extends AbstractActor {
private ActorRef mediator;
static public Props props() {
return Props.create(Publisher.class, () -> new Publisher());
}
public Publisher () {
this.mediator = DistributedPubSub.get(getContext().system()).mediator();
this.self().tell(0, ActorRef.noSender());
}
@Override
public Receive createReceive() {
return receiveBuilder()
.match(Integer.class, msg -> {
// Sending message to Subscriber
mediator.tell(
new DistributedPubSubMediator.Send(
"/user/" + Subscriber.class.getName(),
msg.toString(),
false),
getSelf());
getSelf().tell(++msg, ActorRef.noSender());
})
.build();
}
}
订阅者代码:
public class Subscriber extends AbstractActor {
static public Props props() {
return Props.create(Subscriber.class, () -> new Subscriber());
}
public Subscriber () {
ActorRef mediator = DistributedPubSub.get(getContext().system()).mediator();
mediator.tell(new DistributedPubSubMediator.Put(getSelf()), getSelf());
}
@Override
public Receive createReceive() {
return receiveBuilder()
.match(String.class, msg -> {
System.out.println("Subscriber message received: " + msg);
Thread.sleep(10000);
})
.build();
}
}
不幸的是,按照目前的设计,我认为没有办法向原始发件人提供 "back-pressure"。由于您正在使用 ActorRef.tell
将消息发送到 mediator
,因此无法获得下游接收器正在备份的信号。这是因为tell
,你使用的方法,returns一个void
.
切换到询问
如果您将 tell
切换为 ask
,您可以设置一个适当的 Timeout
值,当您在特定时间内未收到回复时至少会通知您持续时间。
切换到流
"Back-pressure" is a primary feature of akka streams。因此,通过切换到流实现,您将能够实现您想要的目标。
如果可以创建一个流Source
from your original data, then you could use Sink.actorRef
to create a Sink
from the mediator
and use Flow.throttle来控制流向中介的速率。