Агрегировать непрерывный поток чисел из файла, используя Hazelcast Jet

Я пытаюсь суммировать непрерывный поток чисел из файла, используя Hazelcast Jet

pipe
    .drawFrom(Sources.fileWatcher)<dir>))
    .map(s->Integer.parseInt(s))
    .addTimestamps()
    .window(WindowDefinition.sliding(10000,1000))
    .aggregate(AggregateOperations.summingDouble(x->x))
    .drainTo(Sinks.logger());

Несколько вопросов

  1. Он не дает ожидаемого результата, я ожидаю, как только в файле появится новое число, нужно просто добавить его к существующей сумме.
  2. Для этого мне нужно дать окно и addTimestamp метод, мне просто нужно сделать сумму бесконечного потока
  3. Как мы можем достичь отказоустойчивости, то есть, если перезапуск сервера сохранит агрегированный результат, а когда он появится, он агрегирует из последней вычисленной суммы?
  4. если сервер не работает, и при загрузке сервера в файл поступает несколько цифр, будет ли он считывать данные с последней точки, когда сервер вышел из строя, или пропустит числа, когда он не работает, и будет считывать только число, полученное после сервер был включен.

1 ответ

Ответ на вопрос 1 и 2: вы ищете rollingAggregateВам не нужны временные метки или окна.

pipe
    .drawFrom(Sources.fileWatcher(<dir>))
    .rollingAggregate(AggregateOperations.summingDouble(Double::parseDouble))
    .drainTo(Sinks.logger());

Ответ на Q3 и Q4: fileWatcher источник не отказоустойчив. Причина в том, что он читает локальные файлы, и когда член умирает, локальные файлы не будут доступны в любом случае. Когда задание возобновится, оно начнет читать с текущей позиции и пропустит добавленные номера, когда задание было приостановлено.

Кроме того, поскольку вы используете глобальное агрегирование, данные из всех файлов будут перенаправляться на один элемент кластера, а другие участники будут простаивать.

Другие вопросы по тегам