Chronicle Queue читает любые сообщения с помощью readDocument

В очереди хроник у меня есть два типа сообщений. Я хочу прочитать это сообщение с помощью того же продавца и, если это возможно, тем же методом, например, с помощью tailer.readDocument().

Кто угодно, если это возможно, типы сообщений относятся к разным типам объектов. У них нет отношений.

В моей реальной логике чтения мне нужно прочитать все записи очереди, и порядок важен, например:

Сообщение в очередиA СообщениеA СообщениеB

Мне нужно прочитать сообщение B только после сообщения A в этом примере, поэтому я ищу метод, который считывает все записи независимо от типа сообщения.

2 ответа

Самый простой подход - писать сообщения с помощью MethodWriter / MethodReader https://github.com/OpenHFT/Chronicle-Queue#high-level-interface.

Вы начинаете с определения асинхронного интерфейса, в котором все методы имеют:

  • аргументы, которые являются только входами
  • не ожидается ни возвращаемого значения, ни исключений.

Простой асинхронный интерфейс

      import net.openhft.chronicle.wire.SelfDescribingMarshallable;
interface MessageListener {
    void method1(Message1 message);

    void method2(Message2 message);
}

static class Message1 extends SelfDescribingMarshallable {
    String text;

    public Message1(String text) {
        this.text = text;
    }
}

static class Message2 extends SelfDescribingMarshallable {
    long number;

    public Message2(long number) {
        this.number = number;
    }
}

Для записи в очередь вы можете вызвать прокси, реализующий этот интерфейс.

      SingleChronicleQueue queue1 = ChronicleQueue.singleBuilder(path).build();

MessageListener writer1 = queue1.acquireAppender().methodWriter(MessageListener.class);

// call method on the interface to send messages
writer1.method1(new Message1("hello"));
writer1.method2(new Message2(234));

Эти вызовы производят сообщения, которые можно сбросить следующим образом.

      # position: 262568, header: 0
--- !!data #binary
method1: {
  text: hello
}
# position: 262597, header: 1
--- !!data #binary
method2: {
  number: !int 234
}

Для чтения сообщений вы можете предоставить читателя, который вызывает вашу реализацию теми же вызовами, что и вы.

      // a proxy which print each method called on it
MessageListener processor = ObjectUtils.printAll(MessageListener.class)
// a queue reader which turns messages into method calls.
MethodReader reader1 = queue1.createTailer().methodReader(processor);

assertTrue(reader1.readOne());
assertTrue(reader1.readOne());
assertFalse(reader1.readOne());

Запуск этого примера печатает:

      method1 [!Message1 {
  text: hello
}
]
method2 [!Message2 {
  number: 234
}
]

У Nice есть другой способ создания процессора. Я имею в виду, что в вашем примере вы печатаете объекты, которые я хочу заполнить двумя разными типами объектов. До сих пор я не нашел способа использовать для этого методы одного и того же слушателя.

Другие вопросы по тегам