Хроника очереди Tailer останавливается при перезапуске appender
В приведенном ниже коде, перезапуск процесса Tailer в порядке. Тем не менее, перезапуск процесса appender приводит к тому, что tailer не получает больше сообщений. Есть ли способ перезапустить приложение и оставить канал открытым?
Отредактировано: ниже приведен полный класс, который я использовал, чтобы воссоздать проблему последовательно. Окружение: Ubuntu 18ronicle-queue-5.16.9.jar
1) java com.tradeplacer.util.IpcTest продюсер
2) java com.tradeplacer.util.IpcTest потребитель
3) убить производителя
4) перезапустить производителя
5) обратите внимание, что потребитель больше ничего не читает
package com.tradeplacer.util;
import java.nio.ByteBuffer;
import net.openhft.chronicle.bytes.Bytes;
import net.openhft.chronicle.queue.ChronicleQueue;
import net.openhft.chronicle.queue.ChronicleQueueBuilder;
import net.openhft.chronicle.queue.ExcerptAppender;
import net.openhft.chronicle.queue.ExcerptTailer;
import net.openhft.chronicle.queue.RollCycles;
public class IpcTest {
private static final String DIR = "chronicle-test";
public static final void startProducer() {
new Thread() {
public void run() {
System.out.println("starting producer...");
ChronicleQueue queue = ChronicleQueueBuilder.single(DIR).blockSize(65536).rollCycle(RollCycles.MINUTELY).build();
ExcerptAppender appender = queue.acquireAppender();
ByteBuffer ipcBuffer = ByteBuffer.allocate(8192);
for (int i = 0; i < Integer.MAX_VALUE; i++) {
ipcBuffer.clear();
ipcBuffer.put(("data" + i).getBytes());
Bytes<ByteBuffer> bbb = Bytes.wrapForWrite(ipcBuffer);
appender.writeBytes(bbb);
try {
Thread.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}.start();
}
public static final void startConsumer() {
new Thread() {
public void run() {
System.out.println("starting consumer...");
ChronicleQueue queue = ChronicleQueueBuilder.single(DIR).blockSize(65536).rollCycle(RollCycles.MINUTELY).build();
ExcerptTailer tailer = queue.createTailer().toEnd(); // skip to end, don't read old messages
Bytes bytes = Bytes.allocateDirect(8192);
while (true) {
try {
long ipcIndex = tailer.index();
boolean read = tailer.readBytes(bytes);
int len = bytes.length();
byte[] data = new byte[len];
bytes.read(data);
if (read) {
System.out.println("read " + data);
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
}.start();
}
public static void main(final String[] args) {
if ("producer".equals(args[0]))
startProducer();
else
startConsumer();
}
}
1 ответ
Я немного изменил код, чтобы уменьшить создание объекта. На последней версии 5.17.1 я могу многократно перезапускать производителя, и потребитель продолжает читать данные.
ПРИМЕЧАНИЕ. Если вы собираетесь писать текст, writeText
метод может быть лучшим выбором.
Если вы хотите написать что-то более сложное, я предлагаю использовать Wire или каждый MethodReader/MethodWriters, которые позволяют вам выполнять вызовы метода интерфейса.
package net.openhft.chronicle.queue;
import net.openhft.chronicle.bytes.Bytes;
import net.openhft.chronicle.core.Jvm;
import net.openhft.chronicle.queue.impl.single.SingleChronicleQueueBuilder;
import net.openhft.chronicle.threads.Pauser;
import java.nio.ByteBuffer;
public class IpcTest {
private static final String DIR = "chronicle-test";
public static final void startProducer() {
System.out.println("starting producer...");
ChronicleQueue queue = SingleChronicleQueueBuilder.single(DIR).blockSize(65536).rollCycle(RollCycles.MINUTELY).build();
ExcerptAppender appender = queue.acquireAppender();
Bytes<ByteBuffer> bytes = Bytes.elasticByteBuffer(8192);
ByteBuffer ipcBuffer = bytes.underlyingObject();
for (int i = 0; i < Integer.MAX_VALUE; i++) {
ipcBuffer.clear();
ipcBuffer.put(("data" + i).getBytes());
bytes.readPositionRemaining(0, ipcBuffer.position());
appender.writeBytes(bytes);
Jvm.pause(1);
}
}
public static final void startConsumer() {
System.out.println("starting consumer...");
ChronicleQueue queue = SingleChronicleQueueBuilder.single(DIR).blockSize(65536).rollCycle(RollCycles.MINUTELY).build();
ExcerptTailer tailer = queue.createTailer().toEnd(); // skip to end, don't read old messages
Bytes<ByteBuffer> bytes = Bytes.elasticHeapByteBuffer(8192);
Pauser pauser = Pauser.balanced();
while (true) {
try {
long ipcIndex = tailer.index();
bytes.clear();
boolean read = tailer.readBytes(bytes);
if (read) {
byte[] data = bytes.underlyingObject().array();
int len = (int) bytes.readRemaining();
System.out.println("read " + new String(data, 0, 0, len));
pauser.reset();
} else {
pauser.pause();
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
public static void main(final String[] args) {
if ("producer".equals(args[0]))
startProducer();
else
startConsumer();
}
}
Использование MethodReader / MethodWriter
public class IpcTest {
interface Hello {
void hello(String text);
}
private static final String DIR = "chronicle-test";
public static final void startProducer() {
System.out.println("starting producer...");
ChronicleQueue queue = SingleChronicleQueueBuilder.single(DIR).blockSize(65536).rollCycle(RollCycles.MINUTELY).build();
Hello hello = queue.methodWriter(Hello.class);
for (int i = 0; i < Integer.MAX_VALUE; i++) {
hello.hello("data" + i);
Jvm.pause(1);
}
}
public static final void startConsumer() {
System.out.println("starting consumer...");
ChronicleQueue queue = SingleChronicleQueueBuilder.single(DIR).blockSize(65536).rollCycle(RollCycles.MINUTELY).build();
Hello hello = text -> System.out.println("read " + text);
MethodReader reader = queue.createTailer().methodReader(hello);
Pauser pauser = Pauser.balanced();
while (true) {
if (reader.readOne()) {
pauser.reset();
} else {
pauser.pause();
}
}
}
public static void main(final String[] args) {
if ("producer".equals(args[0]))
startProducer();
else
startConsumer();
}
}
Вы можете использовать DTO с is AbstractMarshallable
сделать его эффективным для сериализации и десериализации.
package net.openhft.chronicle.queue;
import net.openhft.chronicle.bytes.MethodReader;
import net.openhft.chronicle.core.Jvm;
import net.openhft.chronicle.core.pool.ClassAliasPool;
import net.openhft.chronicle.queue.impl.single.SingleChronicleQueueBuilder;
import net.openhft.chronicle.threads.Pauser;
import net.openhft.chronicle.wire.AbstractMarshallable;
public class IpcTest {
static class Hi extends AbstractMarshallable {
String text;
int value;
}
interface Hello {
void hi(Hi hi);
}
private static final String DIR = "chronicle-test";
public static final void startProducer() {
System.out.println("starting producer...");
ChronicleQueue queue = SingleChronicleQueueBuilder.single(DIR).blockSize(65536).rollCycle(RollCycles.MINUTELY).build();
Hello hello = queue.methodWriter(Hello.class);
Hi hi = new Hi();
for (int i = 0; i < Integer.MAX_VALUE; i++) {
hi.text = "data";
hi.value = i;
hello.hi(hi);
Jvm.pause(1);
}
}
public static final void startConsumer() {
System.out.println("starting consumer...");
ChronicleQueue queue = SingleChronicleQueueBuilder.single(DIR).blockSize(65536).rollCycle(RollCycles.MINUTELY).build();
Hello hello = text -> System.out.println("read " + text);
MethodReader reader = queue.createTailer().methodReader(hello);
Pauser pauser = Pauser.balanced();
while (true) {
if (reader.readOne()) {
pauser.reset();
} else {
pauser.pause();
}
}
}
public static void main(final String[] args) {
ClassAliasPool.CLASS_ALIASES.addAlias(Hi.class);
if ("producer".equals(args[0]))
startProducer();
else
startConsumer();
}
}
В этом случае потребитель печатает
....
read !Hi {
text: data,
value: 3862
}
read !Hi {
text: data,
value: 3863
}
read !Hi {
text: data,
value: 3864
}
....