agrona 中的 OneToManyRingBuffer 实现
OneToManyRingBuffer implementation in agrona
这可能是一个愚蠢的问题,但是在 aeron (agrona) 中有一个 OneToOneRingBuffer 和一个 ManyToOneRingBuffer 可用。我有一个生产者和许多消费者我想知道如何实现 OneToManyRingBuffer 的等效效果?
Agrona 中的 BroadcastTransmitter
和 BroadcastReceiver
是一种可以为您提供一对多语义的方法。
值得注意的是,速度较慢的消费者可能会看到消息被丢弃。如果这不是您想要的,那么您还可以查看 Aeron IPC,它会在这种情况下对发布者施加背压。
例子
顶层,下面是使用方法:
// Create broadcast buffer
int capacity = 1 << 10; // Must be power of two
int bufferSize = capacity + BroadcastBufferDescriptor.TRAILER_LENGTH;
UnsafeBuffer broadcastBuffer = new UnsafeBuffer(new byte[bufferSize]);
// Create transmitter
BroadcastTransmitter transmitter = new BroadcastTransmitter(broadcastBuffer);
// Create receiver (can create many of these)
BroadcastReceiver broadcastReceiver = new BroadcastReceiver(broadcastBuffer);
CopyBroadcastReceiver copyBroadcastReceiver = new CopyBroadcastReceiver(broadcastReceiver);
// Send message
int msgTypeId = 1;
MutableDirectBuffer msgBuffer = new ExpandableArrayBuffer();
int msgLength = msgBuffer.putStringWithoutLengthAscii(0, "Hello World!");
transmitter.transmit(msgTypeId, msgBuffer, 0, msgLength);
// Receive message
copyBroadcastReceiver.receive(
(msgType, buffer, offset, length) -> System.out.println(buffer.getStringWithoutLengthAscii(offset, length)));
这可能是一个愚蠢的问题,但是在 aeron (agrona) 中有一个 OneToOneRingBuffer 和一个 ManyToOneRingBuffer 可用。我有一个生产者和许多消费者我想知道如何实现 OneToManyRingBuffer 的等效效果?
BroadcastTransmitter
和 BroadcastReceiver
是一种可以为您提供一对多语义的方法。
值得注意的是,速度较慢的消费者可能会看到消息被丢弃。如果这不是您想要的,那么您还可以查看 Aeron IPC,它会在这种情况下对发布者施加背压。
例子
顶层,下面是使用方法:
// Create broadcast buffer
int capacity = 1 << 10; // Must be power of two
int bufferSize = capacity + BroadcastBufferDescriptor.TRAILER_LENGTH;
UnsafeBuffer broadcastBuffer = new UnsafeBuffer(new byte[bufferSize]);
// Create transmitter
BroadcastTransmitter transmitter = new BroadcastTransmitter(broadcastBuffer);
// Create receiver (can create many of these)
BroadcastReceiver broadcastReceiver = new BroadcastReceiver(broadcastBuffer);
CopyBroadcastReceiver copyBroadcastReceiver = new CopyBroadcastReceiver(broadcastReceiver);
// Send message
int msgTypeId = 1;
MutableDirectBuffer msgBuffer = new ExpandableArrayBuffer();
int msgLength = msgBuffer.putStringWithoutLengthAscii(0, "Hello World!");
transmitter.transmit(msgTypeId, msgBuffer, 0, msgLength);
// Receive message
copyBroadcastReceiver.receive(
(msgType, buffer, offset, length) -> System.out.println(buffer.getStringWithoutLengthAscii(offset, length)));