Apache Storm однократная обработка
В настоящее время мы используем Apache Storm 0.9.5 в режиме кластерной топологии для обработки записей Amazon Kinesis (носик) и хранения их в хранилище данных Redshift (болт). Наш кластер Storm развернут в AWS и состоит из 1 узла nimbus + UI, 1 узла zookeeper и 3 узлов supervisor + logviewer. Наша конфигурация топологии поддерживает обработку нескольких потоков Kinesis и для каждого потока включает:
- Один носик потока Kinesis для прослушивания входящих записей
- Один болт Redshift для вставки записей в хранилище данных
Топология:
final TopologyBuilder topologyBuilder = new TopologyBuilder();
// for every configured kinesis stream
final List<KinesisStreamSpout> kinesisStreamSpouts = kinesisStreamService.getKinesisStreamSpouts();
for (final KinesisStreamSpout kinesisStreamSpout : kinesisStreamSpouts) {
final String spoutId = kinesisStreamSpout.getSpoutId();
topologyBuilder.setSpout(spoutId, kinesisStreamSpout.getKinesisSpout());
// set the corresponding redshift bolt
final String streamName = kinesisStreamSpout.getStreamName();
final RedshiftBolt redshiftBolt = new RedshiftBolt(streamName);
topologyBuilder.setBolt(redshiftBolt.getId(),
redshiftBolt, stormProperties.getNumberOfWorkersPerStream()).shuffleGrouping(spoutId);
}
return topologyBuilder.createTopology();
Недостатком системы была неспособность гарантировать однократную обработку входных сообщений, в результате чего в целевую базу данных было вставлено несколько записей с одним и тем же бизнес-ключом. Чтобы понять масштабы проблемы, мы провели контролируемый тест и обнаружили, что примерно треть всех входных записей была отправлена на обработку более одного раза.
В соответствии с этим потоком (который в настоящее время остается без ответа), мы также рассмотрели возможность использования Trident для обеспечения однократной обработки, но также пришли к выводу, что более важно встроить идемпотентность в систему (наряду с, по крайней мере, семантика), а не добавлять сложность, снижать производительность и генерировать состояние, как предложено в этой статье.
В настоящее время мы ищем советы о том, как наилучшим образом реализовать идемпотентность в существующей топологии таким образом, чтобы поддерживать кластеризацию. До сих пор мы склонялись к введению RedisBolt, который бы определял значения с помощью идентификатора сообщения кортежа. Существует ли существующий шаблон для достижения этого с помощью Apache Storm?
1 ответ
Если вы не хотите использовать Trident, вы можете прочитать следующую статью о "транзакционных топологиях". Это концепция Trident, и вы все равно можете применять ее "вручную". Кажется, это хороший шаблон для вашего случая использования: https://storm.apache.org/documentation/Transactional-topologies.html
Кроме того, я хочу добавить, что Storm (как и любая другая система, такая как Apache Flink [disclaimer: я приверженец Flink] и Apache Spark Streaming) может гарантировать только однократную обработку внутри системы. Если данные пересылаются во внешнюю систему, то один раз может быть достигнут только тогда и только тогда, когда внешняя система может поддерживать идемпотентные операции.