Как создать Datalake, используя Apache Kafka, Amazon Glue и Amazon S3?

Я хочу сохранить все данные из темы Кафки в Amazon S3. У меня есть кластер Kafka, который получает в одной теме 200000 сообщений в секунду, и каждое сообщение со значением имеет 50 полей (строки, метки времени, целые числа и числа с плавающей запятой).

Моя основная идея - использовать Kafka Connector для хранения данных в корзине s3, а затем использовать Amazon Glue для преобразования данных и сохранения их в другой корзине. У меня есть следующие вопросы:

1) Как это сделать? Эта архитектура будет хорошо работать? Я пытался использовать Amazon EMR (Spark Streaming), но у меня было слишком много забот. Как сократить время обработки и сбои задач с помощью Apache Spark для потоковой передачи событий из Apache Kafka?

2) Я пытался использовать Kafka Connect из Confluent, но у меня есть несколько вопросов:

  • Могу ли я подключиться к своему кластеру Kafka из другого экземпляра Kafka и запустить автономно мой Kafka Connector s3?

  • Что означает эту ошибку "ОШИБКА Задача s3-sink-0 выкинула неперехваченную
    неустранимое исключение "?

ОШИБКА Задача s3-sink-0 вызвала необработанное и невосстанавливаемое исключение (org.apache.kafka.connect.runtime.WorkerTask:142) java.lang.NullPointerException в io.confluent.connect.hdfs.HdfsSinkTask.close(HdfsSinkT:122) по адресу org.apache.kafka.connect.runtime.WorkerSinkTask.commitOffsets(WorkerSinkTask.java:290) по адресу org.apache.kafka.connect.runtime.WorkerSinkTask.closePartitions(WorkerSinkTask.java:4ka at или orgkakakaka или ork)..connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:146) в org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:140) в org.apache.kafka.connect.runas.er (WorkerTask.java:175) на java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) на java.util.concurrent.FutureTask.run(FutureTask.java:266) на java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) в java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) в java.lang.Thread.run(Thread.java:745) [2018-10-05 15:32:26,086] ERROR задача уничтожается и не будет восстановлена ​​до тех пор, пока не будет перезапущена вручную (org.apache.kafka.connect.runtime.WorkerTask:143) [2018-10-05 15:32:27,980] WARN не удалось создать Dir, используя каталог из файла url: / Тарг. пропуская. (org.reflections.Reflections:104) java.lang.NullPointerException в org.reflections.vfs.Vfs$DefaultUrlTypes$3.matches(Vfs.java:239) в org.reflections.vfs.Vfs.fromURL(Vfs.java:98)) в org.reflections.vfs.Vfs.fromURL(Vfs.java:91) в org.reflections.Reflections.scan(Reflections.java:237) в org.reflections.Reflections.scan(Reflections.java:204) в org.reflections.Reflections.(Reflections.java:129) в org.apache.kafka.connect.runtime.AbstractHerder.connectorPlugins(AbstractHerder.java:268) в org.apache.kafka.connect.runtime.AbstractHerder$1.run(AbstractHerer.java:377) at java.lang.Thread.run(Thread.java:745) [2018-10-05 15:32:27,981] WARN не удалось создать Vfs.Dir из URL. игнорирование исключения и продолжение (org.reflections.Reflections:208) org.reflections.ReflectionsException: не удалось создать Vfs.Dir из URL-адреса, не найдено подходящего UrlType [файл:/targ], либо использовать fromURL (окончательный URL-адрес, окончательный список) urlTypes) или используйте статический setDefaultURLTypes(окончательный список urlTypes) или addDefaultURLTypes(UrlType urlType) с вашим специализированным UrlType. в org.reflections.vfs.Vfs.fromURL(Vfs.java:109) в org.reflections.vfs.Vfs.fromURL(Vfs.java:91) в org.reflections.Reflections.scan(Reflections.java:237) в org.reflections.Reflections.scan(Reflections.java:204) в org.reflections.Reflections.(Reflections.java:129) в org.apache.kafka.connect.runtime.AbstractHerder.connectorPlugins(AbstractHerder.java:268) в org.apache.kafka.connect.runtime.AbstractHerder$1.run(AbstractHerder.java:377) на java.lang.Thread.run(Thread.java:745) [2018-10-05 15:32:35,441] ИНФОРМАЦИЯ Размышления Потребовалось 12393 мс для сканирования 429 URL-адресов, что позволило получить 13521 ключ и 95814 значений (org.reflections.Reflections:229).

  • Если вы можете возобновить шаги, чтобы подключиться к Kafka и продолжить на S3 от
    другой экземпляр Кафки, как вы будете делать?
  • Что означает все эти поля key.converter, value.converter, key.converter.schemas.enable, value.converter.schemas.enable, internal.key.converter, internal.value.converter, internal.key.converter.schemas.enable, internal.value.converter.schemas.enable?

Каковы возможные значения для key.converter, value.converter?

3) Как только мои необработанные данные окажутся в корзине, я хотел бы использовать Amazon Glue, чтобы получить эти данные, десериализовать Protobuffer, изменить формат некоторых полей и, наконец, сохранить их в другом контейнере в Parquet. Как я могу использовать свою собственную библиотеку Java Protobuffer в Amazon Glue?

4) Если я хочу сделать запрос в Amazon Athena, как я могу автоматически загрузить разделы (год, месяц, день, час)? С гусеницами и планировщиками Amazon Glue?

1 ответ

В дополнение к ответу @cricket_007

Могу ли я подключиться к своему кластеру Kafka из другого экземпляра Kafka и запустить автономно мой Kafka Connector s3?

Kafka S3 Connector является частью дистрибутива Confluent, который также включает в себя Kafka, а также другие связанные сервисы, но он не предназначен для непосредственного запуска на ваших брокерах, а скорее:

  • в качестве отдельного работника, работающего с конфигурацией соединителя, указанной при запуске службы
  • или в качестве дополнительного рабочего кластера, работающего на стороне вашего кластера Kafka Brokers. В этом случае взаимодействие / запуск коннекторов лучше с помощью API-интерфейса REST Kafka Connect (для документации с примерами найдите "Управление коннекторами Kafka").

Если вы можете возобновить шаги для подключения к Kafka и продолжить работу с s3 с другого экземпляра Kafka, как вы будете это делать?

Вы говорите о другом экземпляре Kafka Connect?

  • если это так, вы можете просто запустить службу Kafka Connect в распределенном режиме, который должен был обеспечить надежность, которую вы, похоже, ищете...

Или вы имеете в виду еще один кластер Kafka (брокеров)?

  • в этом случае вы можете попробовать (но это будет экспериментально, а я сам не пробовал...) запустить Kafka Connect в автономном режиме и просто обновить bootstrap.servers параметр конфигурации вашего соединителя, чтобы указать на новый кластер. Почему это может сработать: в автономном режиме смещения разъема (ов) приемника хранятся локально на вашем рабочем месте (в отличие от распределенного режима, где смещения хранятся непосредственно в кластере Kafka...). Почему это может не сработать: оно просто не предназначено для этого использования, и я предполагаю, что ваши темы и разделы могут быть точно такими же...?

Каковы возможные значения для key.converter, value.converter?

Проверьте документацию Confluent для kafka-connect-s3;)

Как я могу использовать свою собственную библиотеку Java Protobuffer в Amazon Glue?

Не уверен в фактическом методе, но задания Glue порождают кластер EMR за кулисами, поэтому я не понимаю, почему это не должно быть возможно...

Если я хочу сделать запрос в Amazon Athena, как я могу автоматически загрузить разделы (год, месяц, день, час)? С гусеницами и планировщиками Amazon Glue?

Да.

Предполагая ежедневное разбиение, вы могли бы по расписанию запустить сканер первым делом утром, как только вы можете ожидать, что новые данные создадут папку этого дня на S3 (так что по крайней мере один объект для этого дня существует на S3)... Сканер добавит раздел того дня, который затем будет доступен для запроса с любым вновь добавленным объектом.

Мы используем S3 Connect для сотен тем и обрабатываем данные с использованием Hive, Athena, Spark, Presto и т. Д. Кажется, все работает нормально, хотя мне кажется, что реальная база данных может быстрее возвращать результаты.

В любом случае, чтобы ответить о Connect

Могу ли я подключиться к своему кластеру Kafka из другого экземпляра Kafka и запустить автономно мой Kafka Connector s3?

Я не уверен, что понимаю вопрос, но Kafka Connect необходимо подключиться к одному кластеру, вам не нужны два кластера Kafka, чтобы использовать его. Обычно вы запускаете процессы Kafka Connect как часть их собственного кластера, а не посредников.

Что означает эта ошибка "Ошибка задачи s3-sink-0 вызвала необнаруженное неисправимое исключение"?

Это означает, что вам нужно просмотреть журналы, чтобы выяснить, какое исключение выдается и останавливает соединитель от чтения данных.

WARN could not create Dir using directory from url file:/targ... Если вы используете разъем HDFS, я не думаю, что вы должны использовать файл по умолчанию:// URI

Если вы можете возобновить шаги для подключения к Kafka и продолжить работу на s3 с другого экземпляра Kafka, как вы будете это делать?

Вы не можете "возобновить из другого экземпляра Kafka". Как уже упоминалось, Connect может использовать только один кластер Kafka, и любые использованные смещения и группы потребителей сохраняются вместе с ним.

Что означает все эти поля

Эти поля удалены из последних выпусков Kafka, их можно игнорировать. Вы определенно не должны менять их

internal.key.converter,internal.value.converter, internal.key.converter.schemas.enable, internal.value.converter.schemas.enable

Это ваши сериализаторы и десериализаторы, как у обычного производителя потребительского API.

key.converter, value.converter

Я считаю, что это важно только для JSON-конвертеров. Смотрите https://rmoff.net/2017/09/06/kafka-connect-jsondeserializer-with-schemas-enable-requires-schema-and-payload-fields

key.converter.schemas.enable, value.converter.schemas.enable

десериализовать Protobuf, изменить формат некоторых полей и, наконец, сохранить его в другом контейнере в Parquet

Kafka Connect должен быть загружен конвертером Protobuf, и я не знаю, есть ли он (я думаю, что Blue Apron что-то написал... Поиск github).

Вообще говоря, Avro будет намного проще конвертировать в Parquet, потому что для этого уже существуют нативные библиотеки. S3 Connect by Confluent в настоящее время не пишет формат Паркет, но существует в открытом PR. Альтернативой является использование библиотеки Pinterset Secor.

Я не знаю, клей, но если это как улей, вы бы использовали ADD JAR во время запроса загрузить внешние плагины и функции кода

У меня минимальный опыт работы с Афиной, но Клей поддерживает все разделы как метастаф Улья. Автоматическая часть была бы сканером, вы можете поместить фильтр в запрос, чтобы сделать сокращение раздела

Другие вопросы по тегам