用单线程写 LMAX
Writing with a single thread LMAX
我已经了解了 LMAX 和这个叫做 RingBuffer 的奇妙概念。
所以伙计们告诉我们,当只用一个线程写入环形缓冲区时,性能比多个生产者要好得多...
但是我真的不认为典型的应用程序可以只使用一个线程来写入环形缓冲区...我真的不明白 lmax 是怎么做到的(如果他们这样做的话)。例如N个不同的交易员在交易所下订单,这些都是异步请求转换为订单并放入ringbuffer,他们怎么可能用一个线程来写?
问题 1. 我可能遗漏了一些东西或者误解了某些方面,但是如果你有 N 个并发生产者,如何将它们合并为 1 个而不互相锁定?
问题 2。我记得 rxJava observables,在那里你可以使用 N 个 observables 并将它们合并为 1 个 Observable.merge 我想知道它是否以任何方式阻塞或维护任何锁?
多线程写入对 RingBuffer 的影响很小,但在非常重的负载下可能会很大。
RingBuffer 实现持有一个 next
节点,下一次添加将在该节点进行。如果只有一个线程正在写入环,该过程将始终在最短时间内完成,即 buffer[head++] = newData
.
要在避免锁定的同时处理多线程,您通常会做类似 while ( !buffer[head++].compareAndSet(null,newValue)){}
的事情。当其他线程干扰数据存储时,这个紧密循环将继续执行,从而降低吞吐量。
请注意,我在上面使用了伪代码,请查看我的实现中的 getFree
here 的真实示例。
// Find the next free element and mark it not free.
private Node<T> getFree() {
Node<T> freeNode = head.get();
int skipped = 0;
// Stop when we hit the end of the list
// ... or we successfully transit a node from free to not-free.
// This is the loop that could cause delays under hight thread activity.
while (skipped < capacity && !freeNode.free.compareAndSet(true, false)) {
skipped += 1;
freeNode = freeNode.next;
}
// ...
}
在内部,RxJava 的合并使用了一个我称之为 emitter-loop 的序列化结构,它使用 synchronized
并且是阻塞的。
我们的 'clients' 主要在对吞吐量和延迟不敏感的情况下或完全单线程和阻塞的情况下使用合并,这并不是真正的问题。
可以编写我称为 queue-drain 的非阻塞序列化程序,但不能将合并配置为使用它。
如果愿意手动处理生产者和消费者线程,也可以直接看一下JCTools' MpscArrayQueue
我已经了解了 LMAX 和这个叫做 RingBuffer 的奇妙概念。 所以伙计们告诉我们,当只用一个线程写入环形缓冲区时,性能比多个生产者要好得多...
但是我真的不认为典型的应用程序可以只使用一个线程来写入环形缓冲区...我真的不明白 lmax 是怎么做到的(如果他们这样做的话)。例如N个不同的交易员在交易所下订单,这些都是异步请求转换为订单并放入ringbuffer,他们怎么可能用一个线程来写?
问题 1. 我可能遗漏了一些东西或者误解了某些方面,但是如果你有 N 个并发生产者,如何将它们合并为 1 个而不互相锁定?
问题 2。我记得 rxJava observables,在那里你可以使用 N 个 observables 并将它们合并为 1 个 Observable.merge 我想知道它是否以任何方式阻塞或维护任何锁?
多线程写入对 RingBuffer 的影响很小,但在非常重的负载下可能会很大。
RingBuffer 实现持有一个 next
节点,下一次添加将在该节点进行。如果只有一个线程正在写入环,该过程将始终在最短时间内完成,即 buffer[head++] = newData
.
要在避免锁定的同时处理多线程,您通常会做类似 while ( !buffer[head++].compareAndSet(null,newValue)){}
的事情。当其他线程干扰数据存储时,这个紧密循环将继续执行,从而降低吞吐量。
请注意,我在上面使用了伪代码,请查看我的实现中的 getFree
here 的真实示例。
// Find the next free element and mark it not free.
private Node<T> getFree() {
Node<T> freeNode = head.get();
int skipped = 0;
// Stop when we hit the end of the list
// ... or we successfully transit a node from free to not-free.
// This is the loop that could cause delays under hight thread activity.
while (skipped < capacity && !freeNode.free.compareAndSet(true, false)) {
skipped += 1;
freeNode = freeNode.next;
}
// ...
}
在内部,RxJava 的合并使用了一个我称之为 emitter-loop 的序列化结构,它使用 synchronized
并且是阻塞的。
我们的 'clients' 主要在对吞吐量和延迟不敏感的情况下或完全单线程和阻塞的情况下使用合并,这并不是真正的问题。
可以编写我称为 queue-drain 的非阻塞序列化程序,但不能将合并配置为使用它。
如果愿意手动处理生产者和消费者线程,也可以直接看一下JCTools' MpscArrayQueue