使用 ByteBuffer 时的 Chronicle 队列性能
Chronicle Queue performance when using ByteBuffer
我正在使用 Chronicle Queue 作为 DataStore,它将写入一次但读取多次。我正在尝试获得最佳性能(读取 x 条记录的时间)。
我的数据集(用于我的测试)大约有 300 万条记录,其中每条记录都包含一堆 long 和 double。我最初从 "Highest-level" API 开始,这显然很慢,然后是 Chronicle Documentation 中提到的自我描述数据,最后使用 "raw data" ,它提供了最好的性能。
代码如下:(省略对应的write()代码)
public List<DomainObject> read()
{
final ExcerptTailer tailer = _cq.createTailer();
List<DomainObject> result = new ArrayList<>();
for (; ; ) {
try (final DocumentContext ctx = tailer.readingDocument()) {
Wire wire = ctx.wire();
if(wire != null) {
wire.readBytes(in -> {
final long var1= in.readLong();
final int var2= in.readInt();
final double var3= in.readDouble();
final int var4= in.readInt();
final double var5= in.readDouble();
final int var6= in.readInt();
final double var7= in.readDouble();
result.add(DomainObject.create(var1, var2, var3, var4, var5, var6, var7);
});
}else{
return result;
}
}
}
}
然而,为了提高我的应用程序性能,我开始使用 ByteBuffer 而不是 "DomainObject",因此通过读取方法修改如下:
public List<ByteBuffer> read()
{
final ExcerptTailer tailer = _cq.createTailer();
List<ByteBuffer> result = new ArrayList<>();
for (; ; ) {
try (final DocumentContext ctx = tailer.readingDocument()) {
Wire wire = ctx.wire();
if(wire != null) {
ByteBuffer bb = ByteBuffer.allocate(56);
wire.readBytes(in -> {
in.read(bb); });
result.add(bb);
}else{
return result;
}
}
}
}
上面的代码清单平均耗时 550 毫秒,而第一个清单平均耗时 270 毫秒。
我也尝试使用 this post 中提到的 Bytes.elasticByteBuffer,但速度较慢
我猜第二个代码清单比较慢,因为它必须遍历整个字节数组。
所以我的问题是 - 是否有更高效的方法将字节从 Chronicle Queue 读取到 ByteBuffer?我的数据将始终为 56 个字节,每个数据项为 8 个字节。
我建议您使用 Chronicle-Bytes 而不是原始的 ByteBuffer。 Chronicle 的 Bytes class 是 ByteBuffer 之上的包装器,但更易于使用。
您的代码的问题是您创建了一堆对象而不是流处理。我建议您阅读以下内容:
public void read(Consumer<Bytes> consumer) {
final ExcerptTailer tailer = _cq.createTailer();
for (; ; ) {
try (final DocumentContext ctx = tailer.readingDocument()) {
if (ctx.isPresent()) {
consumer.accept(ctx.wire().bytes());
} else {
break;
}
}
}
}
你的写作方法可能是这样的:
public void write(BytesMarshallable o) {
try (DocumentContext dc = _cq.acquireAppender().writingDocument()) {
o.writeMarshallable(dc.wire().bytes());
}
}
然后您的消费者可能会像:
private BytesMarshallable reusable = new BusinessObject(); //your class here
public accept(Bytes b) {
reusable.readMarshallable(b);
// your business logic here
doSomething(reusable);
}
我正在使用 Chronicle Queue 作为 DataStore,它将写入一次但读取多次。我正在尝试获得最佳性能(读取 x 条记录的时间)。 我的数据集(用于我的测试)大约有 300 万条记录,其中每条记录都包含一堆 long 和 double。我最初从 "Highest-level" API 开始,这显然很慢,然后是 Chronicle Documentation 中提到的自我描述数据,最后使用 "raw data" ,它提供了最好的性能。
代码如下:(省略对应的write()代码)
public List<DomainObject> read()
{
final ExcerptTailer tailer = _cq.createTailer();
List<DomainObject> result = new ArrayList<>();
for (; ; ) {
try (final DocumentContext ctx = tailer.readingDocument()) {
Wire wire = ctx.wire();
if(wire != null) {
wire.readBytes(in -> {
final long var1= in.readLong();
final int var2= in.readInt();
final double var3= in.readDouble();
final int var4= in.readInt();
final double var5= in.readDouble();
final int var6= in.readInt();
final double var7= in.readDouble();
result.add(DomainObject.create(var1, var2, var3, var4, var5, var6, var7);
});
}else{
return result;
}
}
}
}
然而,为了提高我的应用程序性能,我开始使用 ByteBuffer 而不是 "DomainObject",因此通过读取方法修改如下:
public List<ByteBuffer> read()
{
final ExcerptTailer tailer = _cq.createTailer();
List<ByteBuffer> result = new ArrayList<>();
for (; ; ) {
try (final DocumentContext ctx = tailer.readingDocument()) {
Wire wire = ctx.wire();
if(wire != null) {
ByteBuffer bb = ByteBuffer.allocate(56);
wire.readBytes(in -> {
in.read(bb); });
result.add(bb);
}else{
return result;
}
}
}
}
上面的代码清单平均耗时 550 毫秒,而第一个清单平均耗时 270 毫秒。
我也尝试使用 this post 中提到的 Bytes.elasticByteBuffer,但速度较慢
我猜第二个代码清单比较慢,因为它必须遍历整个字节数组。
所以我的问题是 - 是否有更高效的方法将字节从 Chronicle Queue 读取到 ByteBuffer?我的数据将始终为 56 个字节,每个数据项为 8 个字节。
我建议您使用 Chronicle-Bytes 而不是原始的 ByteBuffer。 Chronicle 的 Bytes class 是 ByteBuffer 之上的包装器,但更易于使用。 您的代码的问题是您创建了一堆对象而不是流处理。我建议您阅读以下内容:
public void read(Consumer<Bytes> consumer) {
final ExcerptTailer tailer = _cq.createTailer();
for (; ; ) {
try (final DocumentContext ctx = tailer.readingDocument()) {
if (ctx.isPresent()) {
consumer.accept(ctx.wire().bytes());
} else {
break;
}
}
}
}
你的写作方法可能是这样的:
public void write(BytesMarshallable o) {
try (DocumentContext dc = _cq.acquireAppender().writingDocument()) {
o.writeMarshallable(dc.wire().bytes());
}
}
然后您的消费者可能会像:
private BytesMarshallable reusable = new BusinessObject(); //your class here
public accept(Bytes b) {
reusable.readMarshallable(b);
// your business logic here
doSomething(reusable);
}