Можно ли чередовать хвостовики ChronicleQueue для двух разных очередей?

У меня есть две отдельные ChronicleQueue, созданные независимыми потоками, которые отслеживают потоки веб-сокетов в приложении Java. Когда я читаю каждую очередь независимо в отдельной однопоточной программе, я могу пройти всю очередь, как и ожидалось, используя следующий минимальный код:

      final ExcerptTailer queue1Tailer = queue1.createTailer();
final ExcerptTailer queue2Tailer = queue2.createTailer();

while (true)
{
   try( final DocumentContext context = queue1Tailer.readingDocument() )
   {
      if ( isNull(context.wire()) )
         break;

      counter1++;
      queue1Data = context.wire()
                           .bytes()
                           .readObject(Queue1Data.class);

      queue1Writer.write(String.format("%d\t%d\t%d%n", counter1, queue1Data.getEventTime(), queue1Data.getEventContent()));
   }
}

while (true)
{
   try( final DocumentContext context = queue2Tailer.readingDocument() )
   {
      if ( isNull(context.wire()) )
         break;

      counter2++;
      queue2Data = context.wire()
                           .bytes()
                           .readObject(Queue2Data.class);

      queue2Writer.write(String.format("%d\t%d\t%d%n", counter2, queue2Data.getEventTime(), queue2Data.getEventContent()));
   }
}

В приведенном выше примере я могу прочитать все объекты Queue1Data, затем все объекты Queue2Data и получить доступ к значениям, как и ожидалось. Однако, когда я пытаюсь чередовать чтение очередей (прочитать объект из одной очереди на основе свойства объекта Queue1Data (отметка времени), прочитать объекты Queue2Data до первого объекта после отметки времени ( переменная ограничения ниже) , обнаружен активный объект Queue1Data - тогда что-то с ним делать) после того, как будет прочитан только один объект из queue2Tailer, выбрасывается исключение . Упрощенный код, который не работает, приведен ниже (я пытался поместить внешний цикл внутри и снаружи очереди2Tailer блокировать):

      final ExcerptTailer queue1Tailer = queue1Queue.createTailer("label1");

try( final DocumentContext queue1Context = queue1Tailer.readingDocument() )
{
   final ExcerptTailer queue2Tailer = queue2Queue.createTailer("label2");
    
   while (true)
   {
      try( final DocumentContext queue2Context = queue2Tailer.readingDocument() )
      {
         if ( isNull(queue2Context.wire()) )
         {
            terminate = true;
            break;
         }
         queue2Data = queue2Context.wire()
                                   .bytes()
                                   .readObject(Queue2Data.class);
         while(true)
         {
            queue1Data = queue1Context.wire()
                                          .bytes()
                                                  .readObject(Queue1Data.class);  // first read succeeds
                                                  
            if (queue1Data.getFieldValue() > limit)   // if this fails the inner loop continues
            {                                         // but the second read fails
               // cache a value
               break;
            }
         }

         // continue working with queu2Data object and cached values
      }   // end try block for queue2 tailer

   } // end outer while loop
}   // end outer try block for queue1 tailer

Я пробовал, как указано выше, а также с обоими Tailer, созданными в начале функции, которая выполняет обработку (частная функция, выполняемая при нажатии кнопки в относительно простом приложении Java). По сути, я взял цикл, который работал независимо, и поместил его в другой цикл функции, не ожидая никаких проблем. Я думаю, что мне не хватает чего-то важного в том, как хвостовики позиционируются и используются для чтения объектов, но я не могу понять, что это такое, поскольку один и тот же базовый код работает при независимом чтении очередей. Использование чтобы определить, когда в очереди больше нет объектов, я получил из одного из примеров, хотя я не уверен, что это правильный способ определить, когда в очереди больше нет объектов при последовательной обработке очереди.

Мы ценим любые предложения.

2 ответа

При дальнейшем тестировании я обнаружил, что возможны вложенные циклы для чтения нескольких очередей, содержащих данные в разных классах POJO. Проблема с кодом в приведенном выше вопросе заключается в том, что он получен один раз, ВНЕ цикла, который я ожидал прочитать queue1Data objects. Мое фундаментальное заблуждение заключалось в том, что DocumentContextобъекты управляли обходом объектов в очереди, в то время как на самом деле ExcerptTailerобъекты управляют пошагово (сохраняя индексы) при последовательном чтении очереди.

На случай, если это может помочь кому-то, кто только начинает работать с ChronicleQueues, внутренний цикл в исходном вопросе должен быть таким:

      while(true)
{
    try (final DocumentContext queue1Context = queue1Tailer() )
    {
         queue1Data = queue1Context.wire()
                                          .bytes()
                                                  .readObject(Queue1Data.class);  // first read succeeds
                                                  
         if (queue1Data.getFieldValue() > limit)   // if this fails the inner loop continues as expected
         {                                         // and second and subsequent reads now succeed
            // cache a value
               break;
         }
    }
} 

И, конечно же, самый внешний блок try, содержащий queue1Context(в исходном коде) следует удалить.

Вы изначально не правильно пишете. Теперь есть хардкорный способ добиться того, чего вы пытаетесь достичь (то есть делать все явно, на более низком уровне), и использовать магию MethodReader/MethodWriter, предоставленную Chronicle.

Хардкорный способ

Пишу

      // write first event type
try (DocumentContext dc = queueAppender.writingDocument()) {
    dc.wire().writeEventName("first").text("Hello first");
}
// write second event type
try (DocumentContext dc = queueAppender.writingDocument()) {
    dc.wire().writeEventName("second").text("Hello second");
}

Это позволит записывать разные типы сообщений в одну и ту же очередь, и вы сможете легко различать их при чтении.

Чтение

      StringBuilder reusable = new StringBuilder();
while (true) {
   try (DocumentContext dc = tailer.readingDocument()) {
       if (!dc.isPresent) {
           continue;
       }
       dc.wire().readEventName(reusable);
       if ("first".contentEquals(reusable)) {
           // handle first
       } else if ("second".contentEquals(reusable)) {
           // handle second
       }
       // optionally handle other events
   }
}

Путь Хроники (он же магия Петра)

Это работает с любыми маршаллируемыми типами, а также с любыми примитивными типами и подклассами CharSequence (т.е. строками) и байтами. Для получения более подробной информации ознакомьтесь с документацией по MethodReader/MethodWriter.

Предположим, у вас есть несколько классов данных:

      public class FirstDataType implements Marshallable { // alternatively - extends SelfDescribingMarshallable
    // data fields...
}

public class SecondDataType implements Marshallable { // alternatively - extends SelfDescribingMarshallable
    // data fields...
}

Затем, чтобы записать эти классы данных в очередь, вам просто нужно определить интерфейс, например:

      interface EventHandler {
    void first(FirstDataType first);
    void second(SecondDataType second);
}

Пишу

Затем запись данных так же проста, как:

      final EventHandler writer = appender.methodWriterBuilder(EventHandler).get();
// assuming firstDatum and secondDatum are created earlier
writer.first(firstDatum);
writer.second(secondDatum);

Это делает то же самое, что и в хардкорной секции - записывает имя события (которое берется из имени метода в записи метода, т.е. "первый" или "второй" соответственно), а затем фактический объект данных.

Чтение

Теперь, чтобы прочитать эти события из очереди, вам нужно предоставить реализацию вышеуказанного интерфейса, которая будет обрабатывать соответствующие типы событий, например:

      // you implement this to read data from the queue
private class MyEventHandler implements EventHandler {
    public void first(FirstDataType first) {
        // handle first type of events
    }
    public void second(SecondDataType second) {
        // handle second type of events
    }
}

И тогда вы читаете следующее:

      EventHandler handler = new MyEventHandler();
MethodReader reader = tailer.methodReader(handler);
while (true) {
    reader.readOne(); // readOne returns boolean value which can be used to determine if there's no more data, and pause if appropriate
}

Разное

Вам не обязательно использовать один и тот же интерфейс для чтения и записи. Если вы хотите читать только события второго типа, вы можете определить другой интерфейс:

      interface OnlySecond {
    void second(SecondDataType second);
}

Теперь, если вы создадите обработчик, реализующий этот интерфейс, и передадите его вызову tailer#methodReader(), вызовы readOne() будут обрабатывать только события второго типа, пропуская все остальные.

Это также работает для MethodWriters, т.е. если у вас есть несколько процессов, записывающих разные типы данных, и один процесс, потребляющий все эти данные, нередко можно определить несколько интерфейсов для записи данных, а затем один интерфейс, расширяющий все остальные для чтения, например:

      interface FirstOut {
    void first(String first);
}
interface SecondOut {
    void second(long second);
}
interface ThirdOut {
    void third(ThirdDataType third);
}
interface AllIn extends FirstOut, SecondOut, ThirdOut {
}

(Я намеренно использовал разные типы данных для параметров метода, чтобы показать, как можно использовать разные типы)

Другие вопросы по тегам