Прозрачная потоковая передача и пакетная обработка

Я все еще довольно новичок в мире потоковой и пакетной обработки и пытаюсь понять концепции и речи. Весьма возможно, что ответ на мой вопрос хорошо известен, его легко найти или даже сто раз ответили здесь, в SO, но я не смог его найти.

Фон:

Я работаю в большом научном проекте (исследование ядерного синтеза), и мы производим тонны данных измерений во время экспериментов. Эти данные в основном представляют собой потоки сэмплов, помеченных наносекундной временной меткой, где сэмплы могут быть чем угодно, от одного значения АЦП, через массив таких данных, до глубоко структурированных данных (до сотен записей от 1-битного логического значения до 64-битной двойной точности плавает) на необработанные HD видеокадры или даже текстовые сообщения. Если я правильно понимаю общие термины, я буду считать наши данные "табличными данными", по большей части.

Мы работаем в основном с самодельными программными решениями, начиная со сбора данных и заканчивая простым онлайн-анализом (потоковым) (таким как масштабирование, субсэмплинг и т. Д.), До наших собственных средств хранения, управления и доступа к данным.

Принимая во внимание масштаб операции и усилия по поддержке всех этих реализаций, мы изучаем возможности использования стандартных структур и инструментов для решения большинства наших задач.

Мой вопрос:

В частности, на данном этапе мы сталкиваемся с необходимостью все более и более изощренной (автоматизированной и ручной) аналитики данных для данных в реальном времени / онлайн / в реальном времени, а также "после факта" автономной / пакетной аналитики "исторических" данных. С этой целью я пытаюсь понять, могут ли и как существующие аналитические структуры, такие как Spark, Flink, Storm и т. Д. (Возможно, поддерживаются очередями сообщений, такими как Kafka, Pulsar,...), поддерживать сценарий, в котором

  • данные передаются / передаются в платформу / платформу, прикрепляются идентификаторы, такие как URL-адрес, идентификатор или тому подобное
  • платформа взаимодействует со встроенным или внешним хранилищем для сохранения потоковых данных (в течение многих лет), связанных с идентификатором
  • аналитические процессы теперь могут прозрачно запрашивать / анализировать данные, адресованные идентификатором и произвольным (открытым или закрытым) временным окном, а платформа предоставляет пакеты / выборки данных для анализа либо из внутреннего хранилища, либо поступая в реальном времени из сбора данных

Простая потоковая передача онлайн-данных в хранилище и выполнение запросов оттуда не представляется возможным, поскольку нам нужны как необработанные, так и проанализированные данные для мониторинга в реальном времени и контроля обратной связи в реальном времени в ходе эксперимента. Кроме того, предоставление пользователю возможности запрашивать либо живой входной сигнал, либо историческую партию из хранилища по-другому не было бы идеальным решением, поскольку наши физики в большинстве своем не являются специалистами по данным, и мы хотели бы держать в стороне такие "технические детали" и в идеале использовать те же алгоритмы. следует использовать для анализа новых данных в реальном времени и старых сохраненных данных предыдущих экспериментов.

Sitenotes:

  • мы говорим о загрузке данных в диапазоне от 10-й гигабит в секунду, которая будет увеличиваться от нескольких секунд до минут - могут ли это быть обработаны кандидатами?
  • мы используем временные метки в наносекундном разрешении, даже думая о пико - это накладывает некоторые ограничения на список возможных кандидатов, если я правильно понимаю?

Я был бы очень рад, если бы кто-нибудь смог понять мой вопрос и пролить некоторый свет на эту тему для меня:-)

Большое спасибо и всего наилучшего, Беппо

2 ответа

Решение

Я не думаю, что кто-то может сказать "да, фреймворк X может определенно справиться с вашей рабочей нагрузкой", потому что это во многом зависит от того, что вам нужно от обработки сообщений, например, от надежности обмена сообщениями и от того, как потоки данных могут быть разделены.

Вы можете быть заинтересованы в BenchmarkingDistributedStreamProcessingEngines. В статье используются версии Storm/Flink/Spark, которым несколько лет (похоже, они были выпущены в 2016 году), но, может быть, авторы захотят позволить вам использовать их тест для оценки более новых версий трех платформ?

Очень распространенная установка для потоковой аналитики - это использование источника данных -> Kafka/Pulsar -> платформа аналитики -> долговременное хранилище данных. Это позволяет отделить обработку от загрузки данных и позволяет выполнять такие вещи, как повторная обработка исторических данных, как если бы они были новыми.

Я думаю, что первым шагом для вас должно быть выяснить, сможете ли вы получить необходимый объем данных через Kafka/Pulsar. Либо создайте набор тестов вручную, либо извлеките некоторые данные, которые, по вашему мнению, могут быть репрезентативными для вашей производственной среды, и посмотрите, сможете ли вы передать их через Kafka / Pulsar с требуемой пропускной способностью / задержкой.

Не забудьте рассмотреть возможность разделения ваших данных. Если некоторые из ваших потоков данных могут обрабатываться независимо (т.е. порядок не имеет значения), вам не следует помещать их в одни и те же разделы. Например, вероятно, нет причин смешивать измерения датчиков и потоки видеопотока. Если вы можете разделить ваши данные на независимые потоки, вы с меньшей вероятностью столкнетесь с узкими местами как в Kafka/Pulsar, так и в аналитической среде. Отдельные потоки данных также позволят вам намного лучше распараллелить обработку в аналитической среде, поскольку вы можете запускать, например, подачу видео и обработку датчиков на разных компьютерах.

Как только вы узнаете, можете ли вы получить достаточную пропускную способность через Kafka/Pulsar, вы должны написать небольшой пример для каждой из 3 платформ. Для начала я бы просто получил и отбросил данные из Kafka/Pulsar, которые должны сообщить вам заранее, есть ли узкое место на пути аналитики Kafka / Pulsar ->. После этого вы можете расширить пример, чтобы сделать что-то интересное с данными примера, например, выполнить небольшую обработку, как то, что вы, возможно, захотите сделать в производстве.

Вам также необходимо учитывать, какие виды обработки вам необходимы для ваших потоков данных. Как правило, вы платите штраф за производительность за гарантированную обработку по крайней мере один раз или ровно один раз. Для некоторых типов данных (например, видеопотока) иногда может быть уместно терять сообщения. Как только вы определитесь с необходимыми гарантиями, вы можете соответствующим образом настроить платформы аналитики (например, отключить взлом в Storm) и попробовать сравнительный анализ ваших тестовых данных.

Просто чтобы ответить на некоторые ваши вопросы более четко:

Случай использования анализа / мониторинга данных в реальном времени звучит так, как будто он достаточно хорошо подходит для систем Storm / Flink. Подключите его непосредственно к Kafka/Pulsar, а затем сделайте любую аналитику, которая вам нужна, звучит так, как будто она может работать на вас.

Переработка исторических данных будет зависеть от того, какие запросы вам нужно сделать. Если вам просто нужен временной интервал + идентификатор, вы, вероятно, можете сделать это с помощью Kafka плюс фильтр или соответствующее разбиение. Kafka позволяет начать обработку с определенной временной отметки, и если ваши данные разбиты по идентификатору или вы фильтруете их в качестве первого шага в своей аналитике, вы можете начать с предоставленной временной отметки и прекратить обработку, когда нажмете сообщение вне временного окна. Это применимо только в том случае, если временная метка, которая вас интересует, - это когда сообщение было добавлено в Kafka. Я также не верю, что Kafka поддерживает разрешение менее миллисекунды для временных меток, которые она генерирует.

Если вам нужно выполнить более сложные запросы (например, вам нужно посмотреть на временные метки, сгенерированные вашими датчиками), вы можете использовать Cassandra, Elasticsearch или Solr в качестве постоянного хранилища данных. Вы также захотите выяснить, как вернуть данные из этих систем обратно в вашу аналитическую систему. Например, я считаю, что Spark поставляется с соединителем для чтения из Elasticsearch, в то время как Elasticsearch предоставляет соединитель для Storm. Вы должны проверить, существует ли такой соединитель для вашей комбинации хранилища данных / аналитической системы, или быть готовым написать свой собственный.

Изменить: Разработка, чтобы ответить на ваш комментарий.

Я не знал, что Kafka или Pulsar поддерживают временные метки, указанные пользователем, но, конечно же, они оба поддерживают. Я не вижу, что Pulsar поддерживает временные метки менее миллисекунды, хотя?

Идея, которую вы описываете, определенно может быть поддержана Кафкой.

Что вам нужно, это возможность запустить клиента Kafka / Pulsar в определенное время и читать вперед. Пульсар, кажется, еще не поддерживает это, но Кафка поддерживает.

Вы должны гарантировать, что когда вы записываете данные в раздел, они поступают в порядке отметки времени. Это означает, что вам не разрешено, например, написать первое сообщение 1 с отметкой времени 10, а затем сообщение 2 с отметкой времени 5.

Если вы можете быть уверены, что пишете сообщения для Кафки, описанный вами пример будет работать. Затем вы можете сказать: "Начните с метки времени" прошлой ночью в полночь ", и Кафка начнет там. Когда поступают живые данные, они получат их и добавят в конец своего журнала. Когда потребительская / аналитическая инфраструктура прочитает все данные с последней полуночи до текущего времени, она начнет ждать поступления новых (живых) данных и обрабатывает их по мере поступления. Затем вы можете написать собственный код в своей аналитической среде для убедитесь, что он останавливает обработку, когда достигает первого сообщения с отметкой времени "завтра вечером".

Что касается поддержки временных меток с точностью до миллисекунды, я не думаю, что Kafka или Pulsar будут поддерживать их "из коробки", но вы можете обойти это достаточно легко. Просто добавьте метку времени менее миллисекунды в сообщение в качестве настраиваемого поля. Если вы хотите начать, например, с отметки времени 9 мс 10 нс, вы просите Kafka начать с 9 мс и использовать фильтр в среде аналитики, чтобы отбрасывать все сообщения от 9 мс до 9 мс 10 нс.

Позвольте мне добавить следующие предложения о том, как Apache Pulsar может помочь удовлетворить некоторые ваши требования. Пища для размышлений как бы.

"данные перетекают / передаются в платформу / фреймворк, прикрепляются идентификаторы, такие как URL-адрес, идентификатор или что-то подобное"

Возможно, вы захотите взглянуть на Pulsar Functions, которая позволяет вам писать простые функции (на Java или Python), которые выполняются для каждого отдельного сообщения, публикуемого в теме. Они идеально подходят для этого варианта использования дополнения данных.

платформа взаимодействует со встроенным или внешним хранилищем для сохранения потоковых данных (в течение многих лет), связанных с идентификатором

Pulsar недавно добавил многоуровневое хранилище, которое позволяет вам сохранять потоки событий в S3, хранилище BLOB-объектов Azure или облачном хранилище Google. Это позволит вам хранить данные в течение многих лет в дешевом и надежном хранилище данных

аналитические процессы теперь могут прозрачно запрашивать / анализировать данные, адресованные идентификатором и произвольным (открытым или закрытым) временным окном, а платформа предоставляет пакеты / выборки данных для анализа либо из внутреннего хранилища, либо поступая в реальном времени из сбора данных

Apache Pulsar также добавил интеграцию с механизмом запросов Presto, который позволит вам запрашивать данные за определенный период времени (включая данные из многоуровневого хранилища) и помещать их в раздел для обработки.

Другие вопросы по тегам