Реактивные потоки - пакетирование с таймаутом
Я пытаюсь заменить библиотеку обработки собственных журналов, которая очень похожа на ReactiveStreams io.projectreactor
, Цель состоит в том, чтобы уменьшить объем поддерживаемого нами кода и использовать все новые функции, добавленные сообществом (следя за объединением операторов).
Для начала мне нужно использовать stdio и объединить многострочные записи журнала в текстовые объекты, которые будут передаваться по конвейеру. Сценарий использования подробно объясняется в главе о многострочных записях в журнале Filebeat (за исключением случаев, когда мы хотим, чтобы он был обработан).
Пока что код, который я имею:
BufferedReader input = new BufferedReader(new InputStreamReader(System.in));
Flux<String> lines = Flux.generate(sink -> rethrow(() -> { while (true) sink.next(input.readLine()); }));
Flux<String> logRecordsStr = lines.concatMap(new LogRecordJoiner());
Flux<LogRecord> logRecords = logRecordsStr.map(new LogRecordMapper());
logRecords.doOnEach(r -> System.out.printf("%s payload: %d chars\n", r.timestamp, r.payload.length()))
.subscribe();
Это учитывает многострочное слияние при обнаружении нового заголовка журнала, но в существующей библиотеке мы также очищаем накопленные строки по истечении времени ожидания (т. Е. Если текст не получен в течение 5 секунд, очистите запись).
Что было бы правильным способом смоделировать это в Reactor? Мне нужно написать свой собственный оператор, или я могу настроить любой из существующих?
Будем очень благодарны за любые ссылки на соответствующие примеры и документы для достижения этого варианта использования в Project Reactor или RxJava.
2 ответа
Это зависит от того, как вы определяете начало и конец каждого буфера, поэтому следующий код RxJava 2 предназначен как подсказка об использовании значения основного источника для открытия и закрытия шлюза буфера:
TestScheduler scheduler = new TestScheduler();
PublishProcessor<String> pp = PublishProcessor.create();
Function<Flowable<String>, Flowable<List<String>>> f = o ->
o.buffer(o.filter(v -> v.contains("Start")),
v -> Flowable.merge(o.filter(w -> w.contains("End")),
Flowable.timer(5, TimeUnit.MINUTES, scheduler)));
pp.publish(f)
.subscribe(System.out::println);
pp.onNext("Start");
pp.onNext("A");
pp.onNext("B");
pp.onNext("End");
pp.onNext("Start");
pp.onNext("C");
scheduler.advanceTimeBy(5, TimeUnit.MINUTES);
pp.onNext("Start");
pp.onNext("D");
pp.onNext("End");
pp.onComplete();
Печать:
[Start, A, B, End]
[Start, C]
[Start, D, End]
Он работает, делясь источником через publish
что позволяет повторно использовать одно и то же значение из апстрима без одновременного запуска нескольких исходных копий. Открытие определяется обнаружением строки "Старт" на линии. Закрытие определяется либо обнаружением строки "Конец", либо срабатыванием таймера после периода отсрочки.
Редактировать:
Если "Пуск" также является индикатором для следующего пакета, вы можете заменить проверку "Конец" на "Пуск" и изменить содержимое буфера, так как в противном случае он будет включать новый заголовок в предыдущий буфер:
pp.publish(f)
.doOnNext(v -> {
int s = v.size();
if (s > 1 && v.get(s - 1).contains("Start")) {
v.remove(s - 1);
}
})
.subscribe(System.out::println);
buffer
Оператор кажется наиболее подходящим и простым решением для меня.
У этого есть размер и основанные на времени стратегии. У вас есть журнал, так что я думаю, вы можете интерпретировать количество строк как размер буфера.
Вот пример - как генерировать элементы, сгруппированные по 4 или 5 секундам:
Observable<String> lineReader = Observable.<String>create(subscriber -> {
try {
BufferedReader br = new BufferedReader(new InputStreamReader(System.in));
for (String line = br.readLine(); line != null; line = br.readLine()) {
subscriber.onNext(line);
}
} catch (IOException e) {
throw new UncheckedIOException(e);
}
}).subscribeOn(Schedulers.newThread());
lineReader
.buffer(5, TimeUnit.SECONDS,4)
.filter(lines -> !lines.isEmpty())
.subscribe(System.out::println);