Хроника Очередь V3. Могут ли записи быть потеряны при переносе блока данных?
У меня есть приложение, которое записывает записи в Chronicle Queue (V3), которое также сохраняет значения индекса записей выдержки в других (Chronicle) картах путем предоставления индексированного доступа в очередь. Иногда нам не удается найти данную запись, которую мы ранее сохранили, и я считаю, что это может быть связано с переносом блока данных.
Ниже приведена отдельная программа тестирования, которая воспроизводит такие варианты использования в небольших масштабах. Он многократно записывает запись и немедленно пытается найти результирующее значение индекса, используя отдельный ExcerptTailer. Некоторое время все хорошо, пока первый блок данных не израсходован и не назначен второй файл данных, после чего начнутся ошибки поиска. Если размер блока данных увеличивается, чтобы избежать пролонгации, то никакие записи не будут потеряны. Также использование небольшого размера индексного блока данных, приводящего к созданию нескольких индексных файлов, не вызывает проблем.
Тестовая программа также пытается использовать ExcerptListener, работающий параллельно, чтобы увидеть, были ли записи, которые, по-видимому, "утеряны" автором, когда-либо получены потоком чтения - нет. Также пытается перечитать результирующую очередь от начала до конца, что подтверждает, что они действительно потеряны.
Приступая к выполнению кода, я вижу, что при поиске "отсутствующей записи" внутри AbstractVanillarExcerpt#index он, похоже, успешно находит правильный объект VanillaMappedBytes из dataCache, но определяет, что в нем нет записи и смещения данных в качестве len == 0. В дополнение к тому, что записи не найдены, в какой-то момент после того, как проблемы начинают возникать после переворачивания, в методе VanillaMappedFile # fileChannel создается NPE, поскольку ему был передан нулевой путь к файлу. Путь к коду предполагает, что при разрешении записи, которая была успешно найдена в индексе, файл всегда будет найден, но не в этом случае.
Можно ли надежно использовать Chronicle Queue при пролонгации блоков данных, и если да, то что я делаю, что может быть причиной проблемы, с которой я сталкиваюсь?
import java.io.IOException;
import java.util.Collection;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.Set;
import org.junit.Before;
import org.junit.Test;
import net.openhft.affinity.AffinitySupport;
import net.openhft.chronicle.Chronicle;
import net.openhft.chronicle.ChronicleQueueBuilder;
import net.openhft.chronicle.ExcerptAppender;
import net.openhft.chronicle.ExcerptCommon;
import net.openhft.chronicle.ExcerptTailer;
import net.openhft.chronicle.VanillaChronicle;
public class ChronicleTests {
private static final int CQ_LEN = VanillaChronicle.Cycle.DAYS.length();
private static final long CQ_ENT = VanillaChronicle.Cycle.DAYS.entries();
private static final String ROOT_DIR = System.getProperty(ChronicleTests.class.getName() + ".ROOT_DIR",
"C:/Temp/chronicle/");
private static final String QDIR = System.getProperty(ChronicleTests.class.getName() + ".QDIR", "chronicleTests");
private static final int DATA_SIZE = Integer
.parseInt(System.getProperty(ChronicleTests.class.getName() + ".DATA_SIZE", "100000"));
// Chunk file size of CQ index
private static final int INDX_SIZE = Integer
.parseInt(System.getProperty(ChronicleTests.class.getName() + ".INDX_SIZE", "10000"));
private static final int Q_ENTRIES = Integer
.parseInt(System.getProperty(ChronicleTests.class.getName() + ".Q_ENTRIES", "5000"));
// Data type id
protected static final byte FSYNC_DATA = 1;
protected static final byte NORMAL_DATA = 0;
protected static final byte TH_START_DATA = -1;
protected static final byte TH_END_DATA = -2;
protected static final byte CQ_START_DATA = -3;
private static final long MAX_RUNTIME_MILLISECONDS = 30000;
private static String PAYLOAD_STRING = "1234567890ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz";
private static byte PAYLOAD_BYTES[] = PAYLOAD_STRING.getBytes();
private Chronicle _chronicle;
private String _cqPath = ROOT_DIR + QDIR;
@Before
public void init() {
buildCQ();
}
@Test
public void test() throws IOException, InterruptedException {
boolean passed = true;
Collection<Long> missingEntries = new LinkedList<Long>();
long sent = 0;
Thread listener = listen();
try {
listener.start();
// Write entries to CQ,
for (int i = 0; i < Q_ENTRIES; i++) {
long entry = writeQEntry(PAYLOAD_BYTES, (i % 100) == 0);
sent++;
// check each entry can be looked up
boolean found = checkEntry(i, entry);
if (!found)
missingEntries.add(entry);
passed &= found;
}
// Wait awhile for the listener
listener.join(MAX_RUNTIME_MILLISECONDS);
if (listener.isAlive())
listener.interrupt();
} finally {
if (listener.isAlive()) { // => exception raised so wait for listener
log("Give listener a chance....");
sleep(MAX_RUNTIME_MILLISECONDS);
listener.interrupt();
}
log("Sent: " + sent + " Received: " + _receivedEntries.size());
// Look for missing entries in receivedEntries
missingEntries.forEach(me -> checkMissingEntry(me));
log("All passed? " + passed);
// Try to find missing entries by searching from the start...
searchFromStartFor(missingEntries);
_chronicle.close();
_chronicle = null;
// Re-initialise CQ and look for missing entries again...
log("Re-initialise");
init();
searchFromStartFor(missingEntries);
}
}
private void buildCQ() {
try {
// build chronicle queue
_chronicle = ChronicleQueueBuilder.vanilla(_cqPath).cycleLength(CQ_LEN).entriesPerCycle(CQ_ENT)
.indexBlockSize(INDX_SIZE).dataBlockSize(DATA_SIZE).build();
} catch (IOException e) {
throw new InitializationException("Failed to initialize Active Trade Store.", e);
}
}
private long writeQEntry(byte dataArray[], boolean fsync) throws IOException {
ExcerptAppender appender = _chronicle.createAppender();
return writeData(appender, dataArray, fsync);
}
private boolean checkEntry(int seqNo, long entry) throws IOException {
ExcerptTailer tailer = _chronicle.createTailer();
if (!tailer.index(entry)) {
log("SeqNo: " + seqNo + " for entry + " + entry + " not found");
return false;
}
boolean isMarker = isMarker(tailer);
boolean isFsyncData = isFsyncData(tailer);
boolean isNormalData = isNormalData(tailer);
String type = isMarker ? "MARKER" : isFsyncData ? "FSYNC" : isNormalData ? "NORMALDATA" : "UNKNOWN";
log("Entry: " + entry + "(" + seqNo + ") is " + type);
return true;
}
private void log(String string) {
System.out.println(string);
}
private void searchFromStartFor(Collection<Long> missingEntries) throws IOException {
Set<Long> foundEntries = new HashSet<Long>(Q_ENTRIES);
ExcerptTailer tailer = _chronicle.createTailer();
tailer.toStart();
while (tailer.nextIndex())
foundEntries.add(tailer.index());
Iterator<Long> iter = missingEntries.iterator();
long foundCount = 0;
while (iter.hasNext()) {
long me = iter.next();
if (foundEntries.contains(me)) {
log("Found missing entry: " + me);
foundCount++;
}
}
log("searchFromStartFor Found: " + foundCount + " of: " + missingEntries.size() + " missing entries");
}
private void checkMissingEntry(long missingEntry) {
if (_receivedEntries.contains(missingEntry))
log("Received missing entry:" + missingEntry);
}
Set<Long> _receivedEntries = new HashSet<Long>(Q_ENTRIES);
private Thread listen() {
Thread returnVal = new Thread("Listener") {
public void run() {
try {
int receivedCount = 0;
ExcerptTailer tailer = _chronicle.createTailer();
tailer.toStart();
while (receivedCount < Q_ENTRIES) {
if (tailer.nextIndex()) {
_receivedEntries.add(tailer.index());
} else {
ChronicleTests.this.sleep(1);
}
}
log("listener complete");
} catch (IOException e) {
log("Interupted before receiving all entries");
}
}
};
return returnVal;
}
private void sleep(long interval) {
try {
Thread.sleep(interval);
} catch (InterruptedException e) {
// No action required
}
}
protected static final int THREAD_ID_LEN = Integer.SIZE / Byte.SIZE;
protected static final int DATA_TYPE_LEN = Byte.SIZE / Byte.SIZE;
protected static final int TIMESTAMP_LEN = Long.SIZE / Byte.SIZE;
protected static final int CRC_LEN = Long.SIZE / Byte.SIZE;
protected static long writeData(ExcerptAppender appender, byte dataArray[],
boolean fsync) {
appender.startExcerpt(DATA_TYPE_LEN + THREAD_ID_LEN + dataArray.length
+ CRC_LEN);
appender.nextSynchronous(fsync);
if (fsync) {
appender.writeByte(FSYNC_DATA);
} else {
appender.writeByte(NORMAL_DATA);
}
appender.writeInt(AffinitySupport.getThreadId());
appender.write(dataArray);
appender.writeLong(CRCCalculator.calcDataAreaCRC(appender));
appender.finish();
return appender.lastWrittenIndex();
}
protected static boolean isMarker(ExcerptCommon excerpt) {
if (isCqStartMarker(excerpt) || isStartMarker(excerpt) || isEndMarker(excerpt)) {
return true;
}
return false;
}
protected static boolean isCqStartMarker(ExcerptCommon excerpt) {
return isDataTypeMatched(excerpt, CQ_START_DATA);
}
protected static boolean isStartMarker(ExcerptCommon excerpt) {
return isDataTypeMatched(excerpt, TH_START_DATA);
}
protected static boolean isEndMarker(ExcerptCommon excerpt) {
return isDataTypeMatched(excerpt, TH_END_DATA);
}
protected static boolean isData(ExcerptTailer tailer, long index) {
if (!tailer.index(index)) {
return false;
}
return isData(tailer);
}
private static void movePosition(ExcerptCommon excerpt, long position) {
if (excerpt.position() != position)
excerpt.position(position);
}
private static void moveToFsyncFlagPos(ExcerptCommon excerpt) {
movePosition(excerpt, 0);
}
private static boolean isDataTypeMatched(ExcerptCommon excerpt, byte type) {
moveToFsyncFlagPos(excerpt);
byte b = excerpt.readByte();
if (b == type) {
return true;
}
return false;
}
protected static boolean isNormalData(ExcerptCommon excerpt) {
return isDataTypeMatched(excerpt, NORMAL_DATA);
}
protected static boolean isFsyncData(ExcerptCommon excerpt) {
return isDataTypeMatched(excerpt, FSYNC_DATA);
}
/**
* Check if this entry is Data
*
* @param excerpt
* @return true if the entry is data
*/
protected static boolean isData(ExcerptCommon excerpt) {
if (isNormalData(excerpt) || isFsyncData(excerpt)) {
return true;
}
return false;
}
}
2 ответа
Проблема возникает только при инициализации размера блока данных значением, которое не является степенью двойки. Встроенные конфигурации на IndexedChronicleQueueBuilder
(small()
, medium()
, large()
) позаботьтесь о том, чтобы инициализировать, используя степени двух, что дало ключ к правильному использованию.
Несмотря на приведенный выше ответ относительно поддержки, который я полностью ценю, было бы полезно, если бы опытный пользователь Chronicle мог подтвердить, что целостность Chronicle Queue зависит от использования размера блока данных степени два.
Я сожалею, что мы не предоставляем бесплатную поддержку для хроники очереди 3, мы только исследуем и исправляем проблемы в последней транковой версии наших библиотек с открытым исходным кодом, если только у вас нет контракта на поддержку.
Вы можете попробовать обновить до последней версии очереди хроники.
Если вы хотите получить дополнительную информацию о наших различных контрактах поддержки, пожалуйста, напишите sales@chronicle.software.