Реактивные потоки - пакетирование с таймаутом

Я пытаюсь заменить библиотеку обработки собственных журналов, которая очень похожа на ReactiveStreams io.projectreactor, Цель состоит в том, чтобы уменьшить объем поддерживаемого нами кода и использовать все новые функции, добавленные сообществом (следя за объединением операторов).

Для начала мне нужно использовать stdio и объединить многострочные записи журнала в текстовые объекты, которые будут передаваться по конвейеру. Сценарий использования подробно объясняется в главе о многострочных записях в журнале Filebeat (за исключением случаев, когда мы хотим, чтобы он был обработан).

Пока что код, который я имею:

BufferedReader input = new BufferedReader(new InputStreamReader(System.in));
Flux<String> lines = Flux.generate(sink -> rethrow(() -> { while (true) sink.next(input.readLine()); }));
Flux<String> logRecordsStr = lines.concatMap(new LogRecordJoiner());
Flux<LogRecord> logRecords = logRecordsStr.map(new LogRecordMapper());
logRecords.doOnEach(r -> System.out.printf("%s payload: %d chars\n", r.timestamp, r.payload.length()))
          .subscribe();          

Это учитывает многострочное слияние при обнаружении нового заголовка журнала, но в существующей библиотеке мы также очищаем накопленные строки по истечении времени ожидания (т. Е. Если текст не получен в течение 5 секунд, очистите запись).

Что было бы правильным способом смоделировать это в Reactor? Мне нужно написать свой собственный оператор, или я могу настроить любой из существующих?

Будем очень благодарны за любые ссылки на соответствующие примеры и документы для достижения этого варианта использования в Project Reactor или RxJava.

2 ответа

Решение

Это зависит от того, как вы определяете начало и конец каждого буфера, поэтому следующий код RxJava 2 предназначен как подсказка об использовании значения основного источника для открытия и закрытия шлюза буфера:

TestScheduler scheduler = new TestScheduler();
PublishProcessor<String> pp = PublishProcessor.create();

Function<Flowable<String>, Flowable<List<String>>> f = o -> 
        o.buffer(o.filter(v -> v.contains("Start")), 
                 v -> Flowable.merge(o.filter(w -> w.contains("End")), 
                                     Flowable.timer(5, TimeUnit.MINUTES, scheduler))); 

pp.publish(f)
.subscribe(System.out::println);

pp.onNext("Start");
pp.onNext("A");
pp.onNext("B");
pp.onNext("End");

pp.onNext("Start");
pp.onNext("C");

scheduler.advanceTimeBy(5, TimeUnit.MINUTES);

pp.onNext("Start");
pp.onNext("D");
pp.onNext("End");
pp.onComplete();

Печать:

[Start, A, B, End]
[Start, C]
[Start, D, End]

Он работает, делясь источником через publish что позволяет повторно использовать одно и то же значение из апстрима без одновременного запуска нескольких исходных копий. Открытие определяется обнаружением строки "Старт" на линии. Закрытие определяется либо обнаружением строки "Конец", либо срабатыванием таймера после периода отсрочки.

Редактировать:

Если "Пуск" также является индикатором для следующего пакета, вы можете заменить проверку "Конец" на "Пуск" и изменить содержимое буфера, так как в противном случае он будет включать новый заголовок в предыдущий буфер:

pp.publish(f)
.doOnNext(v -> {
    int s = v.size();
    if (s > 1 && v.get(s - 1).contains("Start")) {
        v.remove(s - 1);
    }
})
.subscribe(System.out::println);

buffer Оператор кажется наиболее подходящим и простым решением для меня.

У этого есть размер и основанные на времени стратегии. У вас есть журнал, так что я думаю, вы можете интерпретировать количество строк как размер буфера.

Вот пример - как генерировать элементы, сгруппированные по 4 или 5 секундам:

    Observable<String> lineReader = Observable.<String>create(subscriber -> {
        try {
            BufferedReader br = new BufferedReader(new InputStreamReader(System.in));
            for (String line = br.readLine(); line != null; line = br.readLine()) {
                subscriber.onNext(line);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }).subscribeOn(Schedulers.newThread());

    lineReader
      .buffer(5, TimeUnit.SECONDS,4)
      .filter(lines -> !lines.isEmpty())
      .subscribe(System.out::println);
Другие вопросы по тегам