Чтение сообщений в Chronicle Queue Tailer (v5.16.11) не приводит к автоматическому перемещению индекса к следующему циклу броска
Я использую CQ v5.16.11 (с openjdk 11) для сохранения данных с ежедневными циклами прокрутки. Процесс работает без перерыва с воскресенья по пятницу, поэтому у меня 5 файлов cq4 в неделю. Я запустил процесс в течение 1,5 недель и имею 8 файлов (3 для 1-й и 5 для 2-й недели).
Итак, файлы, которые у меня есть:
20181003.cq4 cycle=17807,
20181004.cq4 cycle=17808,
20181005.cq4 cycle=17809,
20181007.cq4 cycle=17811,
20181008.cq4 cycle=17812,
20181009.cq4 cycle=17813,
20181010.cq4 cycle=17814,
20181011.cq4 cycle=17815,
Обратите внимание на отсутствующий файл для 20181006.cq4 (цикл =17810), так как процесс не запускается в субботу.
Я использую этот код для чтения данных:
tailer.toEnd();
lastTailerIndex = tailer.index();
tailer.toStart();
while (tailer.index() <= lastTailerIndex) {
// read data
if (tailer.readBytes(data) {
/// do something with data bytes
}
if (tailer.index() == lastTailerIndex) {
break;
}
}
Это правильно считывает данные за 1-ю неделю, но не считывает данные за 2-ю неделю, поскольку не выполняет автоматический переход к следующему циклу.
Любая идея, почему это происходит или как это исправить?
Вопрос похож на этот, который был для более старой версии
- Чтение сообщения из хронической очереди не перемещает текущий индекс в следующий цикл автоматически
- Я создал единую очередь с ежедневной прокаткой
Журналы:
2018-10-12 12:41:15,784 DEBUG [main] net.openhft.chronicle.bytes.MappedFile - Allocation of 0 chunk in /site/data/metadata.cq4t took 19.237 ms.
2018-10-12 12:41:15,876 DEBUG [main] net.openhft.chronicle.bytes.MappedFile - Allocation of 0 chunk in /site/data/20181011.cq4 took 0.063 ms.
2018-10-12 12:41:15,881 DEBUG [main] net.openhft.chronicle.queue.impl.single.PretoucherState - /site/data/20181011.cq4 - Reset pretoucher to pos 4835096 as the underlying MappedBytes changed.
2018-10-12 12:41:15,887 DEBUG [main] net.openhft.chronicle.bytes.MappedFile - Allocation of 0 chunk in /site/data/20181003.cq4 took 0.065 ms.
2018-10-12 12:41:15,995 DEBUG [main] net.openhft.chronicle.bytes.MappedFile - Allocation of 0 chunk in /site/data/20181011.cq4 took 0.082 ms.
2018-10-12 12:41:15,996 DEBUG [main] net.openhft.chronicle.queue.impl.single.SingleChronicleQueueBuilder - File released /site/data/20181003.cq4
2018-10-12 12:41:15,997 DEBUG [main] net.openhft.chronicle.queue.impl.single.SingleChronicleQueueBuilder - File released /site/data/20181011.cq4
2018-10-12 12:41:16,418 DEBUG [main] net.openhft.chronicle.bytes.MappedFile - Allocation of 0 chunk in /site/data/20181004.cq4 took 0.112 ms.
2018-10-12 12:41:16,418 DEBUG [main] net.openhft.chronicle.queue.impl.single.SingleChronicleQueueBuilder - File released /site/data/20181003.cq4
2018-10-12 12:41:16,813 DEBUG [main] net.openhft.chronicle.bytes.MappedFile - Allocation of 0 chunk in /site/data/20181005.cq4 took 0.084 ms.
2018-10-12 12:41:16,813 DEBUG [main] net.openhft.chronicle.queue.impl.single.SingleChronicleQueueBuilder - File released /site/data/20181004.cq4
[Правка 1]: то же самое произошло в прошлые выходные, то есть, как и ожидалось, нового файла на 13 октября не было. Теперь у меня есть файлы с 7 по 15 октября (с отсутствующим файлом 13 октября). Если я сделаю tailer.toStart(); while(tailer.readBytes() { ...}
он читает только файлы с 7 по 12 октября и не читает 14 и 15 октября.
[Правка 2]: воспроизведена проблема, как указано ниже https://github.com/OpenHFT/Chronicle-Queue/issues/537
- Настройка / Libs: jvm openjdk 11, Ubuntu 16.04, openhft.affinity / 3.1.9, Хроник-карта /3.16.0, Хроника-очередь /5.16.11, Хроник-байты /1.16.23, Хроника-ядро /1.16.20 Хроника-проволока /1.16.16, Хроника-нити /1.16.3, Анна /4.4.0
- шаги:
- Запустите WriterProcess - дайте ему закончить.
- Запустите ReaderProcess - см. 5 операторов печати.
- Остановить ReaderProcess
- Подождите некоторое время - 10 минут.
- Снова запустите WriterProcess - дайте ему закончить или продолжайте этот процесс.
- Запустите ReaderProcess - он печатает только первые 5 операторов печати, и после этого ничего не печатается. Даже если WriterProcess запущен / запись в очередь, хвостовик в этом процессе не продвигается.
public class WriterProcess {
public static void main(String[] args) throws InterruptedException {
final String dir = "/tmp/demo/";
final LocalTime localTime = LocalTime.of(17, 0);
final ZoneId zoneID = ZoneId.of("America/New_York");
final ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(2);
final SingleChronicleQueue queue = SingleChronicleQueueBuilder.binary(dir)
.blockSize((long) Math.pow(2, 23))
.rollCycle(RollCycles.MINUTELY)
.rollTime(localTime, zoneID)
.build();
final ExcerptAppender appender = queue.acquireAppender();
// pre touch
scheduledExecutorService.scheduleAtFixedRate(appender::pretouch,0,30, TimeUnit.SECONDS);
// write data
System.out.println("writing data ...");
writeData(appender, 5);
// close queue
System.out.println("shutting down now ...");
queue.close();
scheduledExecutorService.shutdown();
scheduledExecutorService.awaitTermination(1, TimeUnit.SECONDS);
}
public static void writeData(ExcerptAppender appender, int count) {
int ctr = 0;
String dateStr;
Date date = new Date();
while (true) {
dateStr = date.toString();
appender.writeText("["+ctr+"] Written " + dateStr);
System.out.println("["+ctr+"] Written " + dateStr);
ctr++;
if (ctr >= count) {
break;
}
try {
Thread.sleep(65_000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
public class ReaderProcess {
public static void main(String[] args) {
final String dir = "/tmp/demo/";
final LocalTime localTime = LocalTime.of(17, 0);
final ZoneId zoneID = ZoneId.of("America/New_York");
final SingleChronicleQueue queue = SingleChronicleQueueBuilder.binary(dir)
.blockSize((long) Math.pow(2, 23))
.rollCycle(RollCycles.MINUTELY)
.rollTime(localTime, zoneID)
.build();
final ExcerptTailer tailer = queue.createTailer();
tailer.toStart();
// read data
System.out.println("reading data ...");
readData(tailer, 25);
// close
System.out.println("shutting down now ...");
queue.close();
}
public static void readData(ExcerptTailer tailer, int count) {
int ctr = 0;
Bytes data = Bytes.allocateDirect(new byte[500]);
while (true) {
if (tailer.readBytes(data)) {
System.out.println("["+ctr+"] Read {"+ data + "}");
ctr++;
if (ctr >= count) {
break;
}
}
}
}
}
1 ответ
Я написал немного более простую версию, которая работает с хроникой 2.17 и версиями, которые он использует. Самое большое изменение, которое я сделал, было очистить байты data
перед прочтением в противном случае он только добавляется, чтобы ничего не перезаписывать.
import net.openhft.chronicle.bytes.Bytes;
import net.openhft.chronicle.core.OS;
import net.openhft.chronicle.queue.ExcerptAppender;
import net.openhft.chronicle.queue.ExcerptTailer;
import net.openhft.chronicle.queue.RollCycles;
import net.openhft.chronicle.queue.impl.single.SingleChronicleQueue;
import net.openhft.chronicle.queue.impl.single.SingleChronicleQueueBuilder;
import java.time.LocalDateTime;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
public class WriterProcess {
static final String dir = OS.TMP + "/demo-" + System.nanoTime() + "/";
public static void main(String[] args) throws InterruptedException {
final ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(2);
final SingleChronicleQueue queue = SingleChronicleQueueBuilder.binary(dir)
.testBlockSize()
.rollCycle(RollCycles.TEST_SECONDLY)
.build();
final ExcerptAppender appender = queue.acquireAppender();
// pre touch
scheduledExecutorService.scheduleAtFixedRate(appender::pretouch, 3, 30, TimeUnit.SECONDS);
new Thread(ReaderProcess::main).start();
// write data
System.out.println("writing data ...");
writeData(appender, 100);
// close queue
System.out.println("shutting down now ...");
queue.close();
scheduledExecutorService.shutdown();
scheduledExecutorService.awaitTermination(1, TimeUnit.SECONDS);
}
public static void writeData(ExcerptAppender appender, int count) {
int ctr = 0;
while (true) {
LocalDateTime date = LocalDateTime.now();
appender.writeText("[" + ctr + "] Written " + date);
System.out.println("[" + ctr + "] Written " + date);
ctr++;
if (ctr >= count) {
break;
}
try {
Thread.sleep(2_200);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
class ReaderProcess {
public static void main(String... args) {
final String dir = WriterProcess.dir;
final SingleChronicleQueue queue = SingleChronicleQueueBuilder.binary(dir)
.testBlockSize()
.rollCycle(RollCycles.TEST_SECONDLY)
.build();
final ExcerptTailer tailer = queue.createTailer();
tailer.toStart();
// read data
System.out.println("reading data ...");
readData(tailer, 100);
// close
System.out.println("shutting down now ...");
queue.close();
}
public static void readData(ExcerptTailer tailer, int count) {
int ctr = 0;
Bytes data = Bytes.allocateDirect(64);
while (true) {
data.clear();
if (tailer.readBytes(data)) {
System.out.println("[" + ctr + "] Read {" + data + "}");
ctr++;
if (ctr >= count) {
break;
}
}
}
}
}