Согласование агрегирования Flink

Таблица транзакций в DynamoDB

Transactions {transaction_id, customer_id, statment_id, transaction_date, transaction_amount}

Таблица заявлений в DynamoDB

Statements {statement_id, customer_id, start_time, end_time, statement_amount}

Ежедневно совершаются миллионы транзакций. Я думаю об использовании Flink для агрегирования сумм транзакций в суммы выписок с использованием потоков DynamoDB.

В любой момент мне нужно знать, агрегированы ли все суммы транзакций, принадлежащие оператору, или нет. То есть отображать, устарела ли сумма в выписке. По сути, я говорю о примирении. Как мне добиться этого во Flink?

1 ответ

Решение

Достаточно просто использовать что-то вроде KeyedProcessFunction для постоянного обновления некоторого состояния Flink, которое объединяет statement_amount для каждого statement_idпо мере поступления новых транзакций. Но вопрос, насколько я понимаю, заключается в том, как узнать, когда эта агрегация завершена, или, другими словами, когда Flink обработал все транзакции для данногоstatement_id.

Приложения потоковой обработки всегда сталкиваются с этой проблемой. В отличие от пакетной обработки, при которой можно просто обработать все данные, а затем получить результат, при потоковой обработке мы обрабатываем одну запись за раз, не зная, что может произойти в будущем или с какой задержкой.

Это приводит нас к компромиссу между задержкой и полнотой. В общем, всегда можно немного подождать, чтобы увидеть, какие дополнительные данные появятся, тем самым увеличивая свои шансы получить результат на основе (более) полной информации. Водяные знаки являются техническим проявлением этого компромисса. Любое потоковое приложение, использующее время события, должно создавать водяные знаки, каждый из которых отмечает точку в потоке меткой времени и объявляет, что поток в этот момент, вероятно, завершен до этой метки времени.

Для некоторых приложений быстрое получение, вероятно, правильного результата - это нормально, и на самом деле это может быть лучше, чем ждать дольше, чтобы получить результат, который с большей вероятностью будет правильным. Но в других приложениях необходимо быть полностью точными (что бы это ни значило, именно).

Что именно вам следует делать - это не технический вопрос, а вопрос бизнес-процесса. В конечном итоге это зависит от того, что согласованное заявление означает для вашего бизнеса. Возможно, вам следует стремиться воспроизвести семантику любого текущего процесса.

При этом Flink предоставляет набор инструментов, которые вы можете комбинировать для решения этого варианта использования различными способами, в зависимости от деталей того, как вы хотите, чтобы это работало. Вот как эти части могут сочетаться:

В каждом заявлении есть end_time. Когда водяной знак для потока транзакции достигает этогоend_time, это первый момент, когда можно было бы подумать, что агрегирование транзакций для этого оператора завершено.

Этот водяной знак (как правило) будет выполняться на основе указания ограничения на сумму, на которую поток транзакций может быть вне очереди. Но вы должны ожидать, что независимо от того, насколько вы пессимистичны, несколько аномальных транзакций нарушат это предположение и задержатся по сравнению с водяными знаками.

Чтобы приспособиться к этому, вы можете либо увеличить задержку водяных знаков, чтобы попытаться покрыть все мыслимые задержки (что, как можно было бы поспорить, в целом невозможно), либо решить, что в какой-то момент вы просто должны пойти дальше и составить заявление, которое требует согласования, но это может потребовать обновления или поправки в будущем. Является ли проблема произвольного опоздания реальной проблемой (как это может быть в банковской сфере, где некоторые международные транзакции могут иметь очень длительные задержки) или чисто теоретической, зависит от вашего фактического варианта использования.

Возможность обрабатывать поздние транзакции потребует, чтобы вы либо (1) сохранили данные инструкции в управляемом состоянии Flink, чтобы добавить позднюю транзакцию (и), которые затем можно использовать для обновления инструкции, либо (2) обработать поздно события особым образом, путем чтения ранее полученного результата из БД и последующего обновления этой записи в БД (что необходимо было бы сделать транзакционным способом). Подход № 2 может быть реализован в отдельном задании, которое потребляет поток поздних транзакций, созданных первым заданием.

Возможно, вы сможете найти выход из этой проблемы, включив в оператор метку времени, которая указывает, что оператор включает в себя именно те транзакции, которые были обработаны до этого момента.

Другие вопросы по тегам