Очередь хроники: StoreFileListener несколько onAcquired и onReleased
Я использую Chronicle 4.5.27.
Ниже приведена простая реализация Writer и Reader с StoreFileListner. В Reader я получаю несколько событий onAcquired и onReleased.
Почему это произойдет? Я ожидаю получить только один Acquire (когда файл получен для чтения) и один Release (после завершения чтения).
В журналах ниже для Reader можно увидеть несколько событий onAcquired и onReleased.
Обратите внимание, что это поведение является случайным. Также обратите внимание, что Writer был намеренно замедлен с помощью Jvm.pause, чтобы имитировать реальную систему, где данные могут быть недоступны непрерывно.
import net.openhft.chronicle.queue.RollCycles;
import net.openhft.chronicle.queue.impl.StoreFileListener;
import net.openhft.chronicle.queue.impl.single.SingleChronicleQueue;
import net.openhft.chronicle.queue.impl.single.SingleChronicleQueueBuilder;
import java.io.File;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
public class ChronicleFactory {
public SingleChronicleQueue createChronicle(String instance, String persistenceDir, RollCycles rollCycles) {
SingleChronicleQueue chronicle = null;
try {
chronicle = SingleChronicleQueueBuilder.binary(persistenceDir).rollCycle(rollCycles).storeFileListener(new StoreFileListener() {
@Override
public void onReleased(int i, File file) {
String currentTime = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS"));
System.out.println(currentTime + ": " + Thread.currentThread().getName() + " onReleased called for file: " + file.getAbsolutePath() + " for cycle: " + i);
}
@Override
public void onAcquired(int cycle, File file) {
String currentTime = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS"));
System.out.println(currentTime + ": " + Thread.currentThread().getName() + " onAcquired called for file: " + file.getAbsolutePath() + " for cycle: " + cycle);
}
}).build();
} catch (Exception e) {
e.printStackTrace();
}
return chronicle;
}
}
import net.openhft.chronicle.core.Jvm;
import net.openhft.chronicle.queue.ExcerptAppender;
import net.openhft.chronicle.queue.RollCycles;
import net.openhft.chronicle.queue.impl.single.SingleChronicleQueue;
import org.apache.commons.lang3.RandomStringUtils;
import java.nio.ByteBuffer;
import java.util.concurrent.atomic.AtomicLong;
public class MarketDataWriter {
private static AtomicLong dataSeq = new AtomicLong();
private static long longSequence = 0;
private static int intSequence = 0;
public static void main(String args[]) {
String path = "C:\\Logs\\ChronicleData\\marketdata";
writeMarketData(path);
}
private static void writeMarketData(String path) {
ChronicleFactory chronicleFactory = new ChronicleFactory();
SingleChronicleQueue chronicle = chronicleFactory.createChronicle("MD", path, RollCycles.MINUTELY);
ExcerptAppender appender = chronicle.acquireAppender();
while (true) {
Jvm.pause(100); //NOTE: Slowing down writer to understand file rolling
appender.writeBytes(b -> {
b.writeLong(getLongSequence());
b.writeInt(getIntSequence());
});
}
}
private static long getLongSequence() {
return longSequence++;
}
private static int getIntSequence() {
return intSequence++;
}
}
import net.openhft.chronicle.queue.ExcerptTailer;
import net.openhft.chronicle.queue.RollCycles;
import net.openhft.chronicle.queue.impl.single.SingleChronicleQueue;
import java.nio.ByteBuffer;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class SimpleMarketDataReader {
private static final ExecutorService executor = Executors.newCachedThreadPool();
public static void main(String args[]) {
String pathForMarketData = "C:\\Logs\\ChronicleData\\marketdata";
readMarketData(pathForMarketData);
}
public static void readMarketData(String pathForMarketDataFile) {
ChronicleFactory chronicleFactory = new ChronicleFactory();
SingleChronicleQueue chronicle = chronicleFactory.createChronicle("Reader", pathForMarketDataFile, RollCycles.MINUTELY);
//Create another thread to read same file
SimpleMarketDataReaderNewChronicle simpleMarketDataReaderNewChronicle = new SimpleMarketDataReaderNewChronicle();
executor.submit(simpleMarketDataReaderNewChronicle);
ExcerptTailer tailer = chronicle.createTailer();
try {
while (true) {
tailer.readBytes(b -> {
b.readLong();
b.readInt();
//System.out.println("Long Sequence in SimpleMarketDataReader: " + b.readLong());
//System.out.println("User data is: " + userData);
//System.out.println("Int Sequence is: " + b.readInt());
});
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
Выход писателя:
2018 -01-03 09:36:00.079: main onAcquired вызвало файл: C:\Logs\ChronicleData\marketdata\20180103-0936.cq4 для цикла: 25249536
2018 -01-03 09:37:00.098: main onReleased вызывается для файла: C:\Logs\ChronicleData\marketdata\20180103-0936.cq4 для цикла: 25249536
Выход считывателя:
2018 -01-03 09:36:00.065: main onAcquired вызвано для файла: C:\Logs\ChronicleData\marketdata\20180103-0936.cq4 для цикла: 25249536
2018 -01-03 09:36:00.075: main onReleased вызывается для файла: C:\Logs\ChronicleData\marketdata\20180103-0936.cq4 для цикла: 25249536
2018 -01-03 09:36:00.078: main onAcquired вызвал файл: C:\Logs\ChronicleData\marketdata\20180103-0936.cq4 для цикла: 25249536
2018 -01-03 09:36:00.082: main onReleased вызывается для файла: C:\Logs\ChronicleData\marketdata\20180103-0936.cq4 для цикла: 25249536
2018 -01-03 09:36:00.086: main onAcquired вызвало файл: C:\Logs\ChronicleData\marketdata\20180103-0936.cq4 для цикла: 25249536
2018 -01-03 09:37:00.103: main onReleased вызывается для файла: C:\Logs\ChronicleData\marketdata\20180103-0936.cq4 для цикла: 25249536