Агрегировать непрерывный поток чисел из файла, используя Hazelcast Jet
Я пытаюсь суммировать непрерывный поток чисел из файла, используя Hazelcast Jet
pipe
.drawFrom(Sources.fileWatcher)<dir>))
.map(s->Integer.parseInt(s))
.addTimestamps()
.window(WindowDefinition.sliding(10000,1000))
.aggregate(AggregateOperations.summingDouble(x->x))
.drainTo(Sinks.logger());
Несколько вопросов
- Он не дает ожидаемого результата, я ожидаю, как только в файле появится новое число, нужно просто добавить его к существующей сумме.
- Для этого мне нужно дать окно и
addTimestamp
метод, мне просто нужно сделать сумму бесконечного потока - Как мы можем достичь отказоустойчивости, то есть, если перезапуск сервера сохранит агрегированный результат, а когда он появится, он агрегирует из последней вычисленной суммы?
- если сервер не работает, и при загрузке сервера в файл поступает несколько цифр, будет ли он считывать данные с последней точки, когда сервер вышел из строя, или пропустит числа, когда он не работает, и будет считывать только число, полученное после сервер был включен.
1 ответ
Ответ на вопрос 1 и 2: вы ищете rollingAggregate
Вам не нужны временные метки или окна.
pipe
.drawFrom(Sources.fileWatcher(<dir>))
.rollingAggregate(AggregateOperations.summingDouble(Double::parseDouble))
.drainTo(Sinks.logger());
Ответ на Q3 и Q4: fileWatcher
источник не отказоустойчив. Причина в том, что он читает локальные файлы, и когда член умирает, локальные файлы не будут доступны в любом случае. Когда задание возобновится, оно начнет читать с текущей позиции и пропустит добавленные номера, когда задание было приостановлено.
Кроме того, поскольку вы используете глобальное агрегирование, данные из всех файлов будут перенаправляться на один элемент кластера, а другие участники будут простаивать.