两个不同队列的 Chronicle Queue 预告片可以交错吗?

Can ChronicleQueue tailers for two different queues be interleaved?

我有两个独立的 ChronicleQueues,它们由监控 Java 应用程序中的网络套接字流的独立线程创建。当我在单独的单线程程序中独立读取每个队列时,我可以按预期遍历每个整个队列 - 使用以下最少的代码:

final ExcerptTailer queue1Tailer = queue1.createTailer();
final ExcerptTailer queue2Tailer = queue2.createTailer();

while (true)
{
   try( final DocumentContext context = queue1Tailer.readingDocument() )
   {
      if ( isNull(context.wire()) )
         break;

      counter1++;
      queue1Data = context.wire()
                           .bytes()
                           .readObject(Queue1Data.class);

      queue1Writer.write(String.format("%d\t%d\t%d%n", counter1, queue1Data.getEventTime(), queue1Data.getEventContent()));
   }
}

while (true)
{
   try( final DocumentContext context = queue2Tailer.readingDocument() )
   {
      if ( isNull(context.wire()) )
         break;

      counter2++;
      queue2Data = context.wire()
                           .bytes()
                           .readObject(Queue2Data.class);

      queue2Writer.write(String.format("%d\t%d\t%d%n", counter2, queue2Data.getEventTime(), queue2Data.getEventContent()));
   }
}

在上面,我能够读取所有 Queue1Data 对象,然后按预期读取所有 Queue2Data 对象和访问值。但是,当我尝试交错读取队列时(根据 Queue1Data 对象(时间戳)的 属性 从一个队列中读取对象,读取 Queue2Data 对象直到时间戳之后的第一个对象( limit 下面的变量),找到活动的 Queue1Data 对象 - 然后用它做一些事情)在只读取 queue2Tailer 中的一个对象后,抛出异常 .DecoratedBufferUnderflowException: readCheckOffset0 failed。失败的简化代码如下(我尝试将外部 while(true) 循环放在 queue2Tailer try 块的内部和外部):

final ExcerptTailer queue1Tailer = queue1Queue.createTailer("label1");

try( final DocumentContext queue1Context = queue1Tailer.readingDocument() )
{
   final ExcerptTailer queue2Tailer = queue2Queue.createTailer("label2");
    
   while (true)
   {
      try( final DocumentContext queue2Context = queue2Tailer.readingDocument() )
      {
         if ( isNull(queue2Context.wire()) )
         {
            terminate = true;
            break;
         }
         queue2Data = queue2Context.wire()
                                   .bytes()
                                   .readObject(Queue2Data.class);
         while(true)
         {
            queue1Data = queue1Context.wire()
                                          .bytes()
                                                  .readObject(Queue1Data.class);  // first read succeeds
                                                  
            if (queue1Data.getFieldValue() > limit)   // if this fails the inner loop continues
            {                                         // but the second read fails
               // cache a value
               break;
            }
         }

         // continue working with queu2Data object and cached values
      }   // end try block for queue2 tailer

   } // end outer while loop
}   // end outer try block for queue1 tailer

我已经按照上面的方法进行了尝试,并且在执行处理的函数的开头创建了两个 Tailers(在相对简单的 Java 应用程序中单击按钮时执行的私有函数)。基本上我采用了独立工作的循环,并将它放在函数的另一个循环中,期望没有问题。我想我遗漏了一些关于 tailers 如何定位和用于读取对象的关键信息,但我无法弄清楚它是什么 - 因为相同的基本代码在独立读取队列时有效。使用 isNull(context.wire()) 确定队列中何时不再有对象 我从其中一个示例中获得,但我不确定这是确定何时的 正确 方法按顺序处理队列时,队列中没有更多对象。

如有任何建议,我们将不胜感激。

你一开始就没有写对。 现在,有一种硬核方法可以实现您想要实现的目标(即,在较低级别上明确地执行所有操作),并使用 Chronicle 提供的 MethodReader/MethodWriter 魔法。

硬核方式

写作

// write first event type
try (DocumentContext dc = queueAppender.writingDocument()) {
    dc.wire().writeEventName("first").text("Hello first");
}
// write second event type
try (DocumentContext dc = queueAppender.writingDocument()) {
    dc.wire().writeEventName("second").text("Hello second");
}

这会将不同类型的消息写入同一个队列,您将能够在阅读时轻松区分它们。

阅读

StringBuilder reusable = new StringBuilder();
while (true) {
   try (DocumentContext dc = tailer.readingDocument()) {
       if (!dc.isPresent) {
           continue;
       }
       dc.wire().readEventName(reusable);
       if ("first".contentEquals(reusable)) {
           // handle first
       } else if ("second".contentEquals(reusable)) {
           // handle second
       }
       // optionally handle other events
   }
}

编年史之路(又名彼得的魔法)

这适用于任何可编组类型,以及任何原始类型和 CharSequence sub类(即字符串)和字节。有关详细信息,请阅读 MethodReader/MethodWriter 文档。

假设你有一些数据类:

public class FirstDataType implements Marshallable { // alternatively - extends SelfDescribingMarshallable
    // data fields...
}

public class SecondDataType implements Marshallable { // alternatively - extends SelfDescribingMarshallable
    // data fields...
}

然后,要将这些数据类写入队列,只需要定义接口,像这样:

interface EventHandler {
    void first(FirstDataType first);
    void second(SecondDataType second);
}

写作

那么写数据就这么简单:

final EventHandler writer = appender.methodWriterBuilder(EventHandler).get();
// assuming firstDatum and secondDatum are created earlier
writer.first(firstDatum);
writer.second(secondDatum);

它所做的与硬核部分中的相同 - 它写入事件名称(取自方法编写器中的方法名称,即相应的“first”或“second”),然后是实际的数据对象.

阅读

现在,要从队列中读取这些事件,您需要提供上述接口的实现,它将处理相应的事件类型,例如:

// you implement this to read data from the queue
private class MyEventHandler implements EventHandler {
    public void first(FirstDataType first) {
        // handle first type of events
    }
    public void second(SecondDataType second) {
        // handle second type of events
    }
}

然后你阅读如下:

EventHandler handler = new MyEventHandler();
MethodReader reader = tailer.methodReader(handler);
while (true) {
    reader.readOne(); // readOne returns boolean value which can be used to determine if there's no more data, and pause if appropriate
}

杂项

您不必使用相同的界面进行读取和写入。如果你只想读取第二种类型的事件,你可以定义另一个接口:

interface OnlySecond {
    void second(SecondDataType second);
}

现在,如果您创建一个实现此接口的处理程序并将其提供给 tailer#methodReader() 调用,则 readOne() 调用将仅处理第二种类型的事件,而跳过所有其他事件。

这也适用于 MethodWriters,即如果您有多个进程写入不同类型的数据而一个进程使用所有这些数据,定义多个接口用于写入数据然后单个接口扩展所有其他接口用于读取的情况并不少见,例如:

interface FirstOut {
    void first(String first);
}
interface SecondOut {
    void second(long second);
}
interface ThirdOut {
    void third(ThirdDataType third);
}
interface AllIn extends FirstOut, SecondOut, ThirdOut {
}

(我特意为方法参数使用了不同的数据类型,以展示如何使用各种类型)

通过进一步测试,我发现嵌套循环可以读取包含不同 POJO 类 中的数据的多个队列。上题代码的问题是queue1Context获得了一次,OUTSIDE本想读取queue1Data objects的循环。我的基本误解是 DocumentContext 对象管理步进队列中的对象,而实际上 ExcerptTailer 对象在顺序读取队列时管理步进(维护索引)。

如果它可能对刚开始使用 ChronicleQueues 的其他人有所帮助,则原始问题中的内部循环应该是:

while(true)
{
    try (final DocumentContext queue1Context = queue1Tailer() )
    {
         queue1Data = queue1Context.wire()
                                          .bytes()
                                                  .readObject(Queue1Data.class);  // first read succeeds
                                                  
         if (queue1Data.getFieldValue() > limit)   // if this fails the inner loop continues as expected
         {                                         // and second and subsequent reads now succeed
            // cache a value
               break;
         }
    }
} 

当然,最外层包含 queue1Context 的 try 块(在原始代码中)应该被删除。