Amazon Managed Streaming для Kafka- функции и производительность MSK

Я оцениваю AWS Managed Service Kafka (MSK) и знаю, что в настоящее время он находится в режиме предварительного просмотра, поэтому может иметь не все функции или надлежащую документацию. Я попытался настроить кластер msk и проверял, может ли msk выполнить все варианты использования / требования нашей компании, но в настоящее время ему не хватает документации и примеров.

https://docs.aws.amazon.com/msk/latest/developerguide/what-is-msk.html

У меня есть следующие запросы:

i) Как я могу получить доступ к AWS MSK с клиентами Kafka, работающими на моей локальной системе?

ii) Поддерживает ли MSK эволюцию схемы и точную семантику?

iii) Предоставит ли MSK какой-либо способ обновления конфигурации кластера или настройки? Как и aws glue, обеспечивает изменение параметров для spark executr и памяти драйвера в их управляемой среде.

iv) Можно ли интегрировать MSK с другими сервисами AWS (например, Redshift,EMR и т. д.)?

v) Могу ли я использовать потоковый sql с MSK через ksql? Как я могу настроить KSQL с MSK?

vi) Как я могу выполнять в режиме реального времени прогнозный анализ данных, проходящих через MSK?

vii) Кроме того, насколько надежен MSK по сравнению с другим облачным кластером kafka из Azure/confluent и какой-либо тест производительности по сравнению с vanilla kafka? И какое максимальное количество брокеров может быть запущено в кластере?

1 ответ

Решение

MSK - это в основном кластер vanilla apache kafka, настроенный и управляемый aws (с предопределенными параметрами конфигурации, основанными на типе экземпляра кластера, количестве брокеров и т. Д.), Настроенный для облачной среды.

В идеале, он должен иметь возможность выполнять все / большинство вещей, которые поддерживает Kafka с открытым исходным кодом. Также, если у вас есть конкретный вариант использования или требование, которое не задокументировано, я предлагаю вам связаться со службой поддержки AWS для получения дополнительных разъяснений относительно управляемой части кластера kafka (максимально допустимое количество брокеров, надежность, стоимость).

Я постараюсь ответить на ваши вопросы исходя из моего личного опыта:

i) Как я могу получить доступ к AWS MSK с помощью клиентов kafka, работающих на моей локальной системе?

Вы не можете получить доступ к MSK напрямую с локальной или локальной машины, используя клиент kafka или поток kafka. Поскольку URL-адрес брокера, строка соединения zookeeper являются частными IP-адресами кластера msk vpc/subnet. Чтобы получить доступ через клиент kafka, вам нужно запустить экземпляр ec2 в том же vpc MsK и выполнить клиент kafka (производитель / потребитель) для доступа к кластеру msk.

Чтобы получить доступ к кластеру MSK с локального компьютера или локальных систем, вы можете настроить инфраструктуру Rest Proxy kafka с открытым исходным кодом от Confluent для доступа к кластеру MSK из внешнего мира через остальные API. Эта структура не является полноценным клиентом kafka и не позволяет выполнять все операции клиента kafka, но вы можете выполнять большинство операций в кластере, начиная с выборки метаданных кластера, информации о теме, создания и потребления сообщения и т. Д.

Сначала настройте группу безопасности экземпляра репо и экземпляра ec2 (см. Раздел 1: Предварительная установка или настройка дополнительных компонентов kafka), а затем установите / настройте оставшийся прокси kafka.

sudo yum install confluent-kafka-rest 

Создайте имя файла kafka-rest.properties и добавьте следующее содержимое:

bootstrap.servers=PLAINTEXT://10.0.10.106:9092,PLAINTEXT://10.0.20.27:9092,PLAINTEXT://10.0.0.119:9092
zookeeper.connect=10.0.10.83:2181,10.0.20.22:2181,10.0.0.218:2181
schema.registry.url=http://localhost:8081

** изменить загрузочный сервер и URL / IP-адрес zookeeper.

Запустите остальной сервер

kafka-rest-start kafka-rest.properties &

Доступ к MSK через rest API с помощью curl или rest клиент / браузер.

Получить список тем

curl "http://localhost:8082/topics"

curl "http://<ec2 instance public ip>:8082/topics"

Чтобы получить доступ с локального или локального компьютера, убедитесь, что к экземпляру ec2, на котором работает остальной сервер, подключен публичный ip или эластичный ip.

Подробнее Rest API работа https://github.com/confluentinc/kafka-rest

ii) Поддерживает ли MSK эволюцию схемы и точную семантику?

Вы можете использовать сообщение Avro вместе с "Реестром схемы", чтобы добиться развития схемы и обслуживания схемы.

Установка и настройка реестра схем похожа на слияние прокси-сервера kafka-rest.

sudo yum install confluent-schema-registry

Создайте имя файла schema-registry.propertie и добавьте следующее содержимое:

listeners=http://0.0.0.0:8081
kafkastore.connection.url=10.0.10.83:2181,10.0.20.22:2181,10.0.0.218:2181
kafkastore.bootstrap.servers=PLAINTEXT://10.0.10.106:9092,PLAINTEXT://10.0.20.27:9092,PLAINTEXT://10.0.0.119:9092
kafkastore.topic=_schemas
debug=false

** изменить загрузочный сервер и zookeeper (соединение) url/ips.

Запустите службу реестра схемы

schema-registry-start schema-registry.properties &

Обратитесь за дополнительной информацией: https://github.com/confluentinc/schema-registry

https://docs.confluent.io/current/schema-registry/docs/schema_registry_tutorial.html

Ровно семантика является функцией apache kafka, и хотя я не тестировал ее на msk, я считаю, что она должна поддерживать эту функцию, поскольку она является частью только apache kafka с открытым исходным кодом.

iii) Предоставит ли MSK какой-либо способ обновления конфигурации кластера или настройки? Как и aws, клей обеспечивает изменение параметров памяти для искрового исполнителя и драйвера в их управляемой среде.

Да, можно изменить параметр конфигурации во время выполнения. Я проверил, изменив параметр retention.ms с помощью инструмента настройки kafka, и это изменение было немедленно применено к теме. Поэтому я думаю, что вы можете обновить и другие параметры, но MSK может не разрешить все изменения конфигурации, точно так же как AWS glue допускает только несколько изменений параметров конфигурации свечи, потому что разрешение изменения всех параметров пользователем может быть уязвимо для управляемой среды.

Изменить через инструмент конфигурации kafka

kafka-configs.sh --zookeeper 10.0.10.83:2181,10.0.20.22:2181,10.0.0.218:2181  --entity-type topics --entity-name jsontest --alter --add-config retention.ms=128000

Подтверждено изменение, используя отдых

curl "http://localhost:8082/topics/jsontest"

iv) Можно ли интегрировать MSK с другими сервисами AWS (например, Redshift,EMR и т. д.)?

Да, вы можете подключиться / интегрироваться в другой сервис aws с MSK. Например, вы можете запустить клиент (потребитель) Kafka для чтения данных из kafka и записи в reddshift, rds, s3 или Dynamodb. Убедитесь, что клиент kafka работает на экземпляре ec2 (внутри msk vpc), который имеет надлежащую роль iam для доступа к этой службе, а экземпляр ec2 находится в публичной подсети или в частной подсети (с конечной точкой NAT или vpc для s3).

Также вы можете запустить EMR внутри кластеров MSK vpc/subnet, а затем через EMR(spark) вы можете подключиться к другому сервису.

Потоковая структура Spark с AWS Managed Service Kafka

Запуск кластера EMR в vpc кластера MSK. Разрешите группу безопасности EMR Master и Slave во входящем правиле группы безопасности кластеров MSK для порта 9092.

Запустить оболочку Spark

spark-shell --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.0

Подключение к кластеру MSK из потоковой структуры искры

val kafka = spark.readStream.format("kafka").option("kafka.bootstrap.servers", "10.0.10.106:9092,10.0.20.27:9092,10.0.0.119:9092").option("subscribe", "jsontest") .load()

Начните читать / распечатывать сообщение на консоли

val df=kafka.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)").writeStream.format("console").start()

или же

val df=kafka.selectExpr("CAST(value AS STRING)").writeStream.format("console").start()

v) Могу ли я использовать потоковый sql с MSK через ksql? Как я могу настроить KSQL с MSK?

Да, вы можете настроить KSQL с кластером MSK. По сути, вам нужно запустить экземпляр ec2 в той же vpc / подсети кластера MSK. А затем установите сервер ksql + клиент в экземпляре ec2 и используйте его.

Сначала настройте группу безопасности confluent repo и ec2 instance (см. Раздел 1: Предварительная установка или настройка дополнительных компонентов kafka), а затем установите / настройте сервер / клиент Ksql.

После этого установите сервер ksql

sudo yum install confluent-ksql 

Создайте имя файла ksql-server.properties и добавьте следующее содержимое:

bootstrap.servers=10.0.10.106:9092,10.0.20.27:9092,10.0.0.119:9092
listeners=http://localhost:8088

** изменить загрузочный сервер ips / url.

Запустите сервер ksql

ksql-server-start ksql-server.properties &

После этого запустите ksql cli

ksql http://localhost:8088

И, наконец, запустите команду, чтобы получить список тем

ksql> SHOW TOPICS;

 Kafka Topic | Registered | Partitions | Partition Replicas | Consumers | ConsumerGroups 
-----------------------------------------------------------------------------------------
 _schemas    | false      | 1          | 3                  | 0         | 0              
 jsontest    | false      | 1          | 3                  | 1         | 1              
----------------------------- --------------------------------------------------

Обратитесь за дополнительной информацией - https://github.com/confluentinc/ksql

vi) Как я могу выполнять в режиме реального времени прогнозный анализ данных, проходящих через MSK?

Выполнение прогностического анализа или машинного обучения в реальном времени не является специфическим для MSK. То же, что вы будете делать с кластером kafka (или любым потоковым конвейером), то же самое применимо к MSK. Существуют различные способы достижения ваших целей в соответствии с вашими требованиями, но я опишу наиболее распространенный или широко используемый в отрасли:

  • Используйте Spark с MSK (kafka) и выполняйте анализ с помощью потоковой передачи структуры и MLIB (с вашей прогнозной моделью).

  • Вы можете обучить свою прогностическую модель в среде H20.ai, а затем экспортировать модель в виде Java Java. А затем интегрируйте модель java pojo с пользовательским кодом kafka, который обработает сообщение из темы msk (kafka) и выполнит анализ в реальном времени.

  • Вы можете обучить модель и развернуть ее в sagemaker, а затем вызвать из клиентского кода клиента kafka, чтобы получить прогноз в реальном времени, вызвав конечную точку вывода модели sagemaker на основе данных / сообщений kafka.

vii) Кроме того, насколько надежен MSK по сравнению с другим облачным кластером kafka из Azure/confluent и какой-либо тест производительности по сравнению с vanilla kafka? И какое максимальное количество брокеров может быть запущено в кластере?

MSK находится в предварительном просмотре, как вы уже знаете, поэтому пока рано говорить о его надежности. Но в целом, как и все другие сервисы AWS, он должен стать более надежным со временем, а также, надеюсь, с новыми функциями и лучшей документацией.

Я не думаю, что AWS или какой-либо облачный вендор Azure. Облако Google предоставляет эталонный тест производительности своих сервисов, поэтому вам нужно попробовать тестирование производительности с вашей стороны. А клиенты / инструменты kafka (kafka-producer-perf-test.sh, kafka-consumer-perf-test.sh) предоставляют сценарий тестирования производительности, который можно выполнить, чтобы получить представление о производительности кластера. Опять же, тестирование производительности сервиса в реальном производственном сценарии будет сильно различаться в зависимости от различных факторов, таких как (размер сообщения, объем данных, поступающих на kafka, производитель синхронизации или асинхронный вызов, количество потребителей и т. Д.) И производительность будет снижаться до определенного уровня. случай использования, а не общий тест.

Что касается максимального числа брокеров, поддерживаемых в кластере, лучше спросить ребят из AWS через их систему поддержки.


Раздел 1: Предварительная установка или настройка - дополнительные компоненты kafka:

Запустите экземпляр Ec2 в vpc / подсети кластера MSK.

Войти в экземпляр ec2

Настройка репозитория yum для загрузки пакетов компонентов kafka через yum

sudo yum install curl which
  sudo rpm --import https://packages.confluent.io/rpm/5.1/archive.key

Перейдите в /etc/yum.repos.d/ и создайте файл с именем confluent.repo и добавьте следующее содержимое

[Confluent.dist]
name=Confluent repository (dist)
baseurl=https://packages.confluent.io/rpm/5.1/7
gpgcheck=1
gpgkey=https://packages.confluent.io/rpm/5.1/archive.key
enabled=1

[Confluent]
name=Confluent repository
baseurl=https://packages.confluent.io/rpm/5.1
gpgcheck=1
gpgkey=https://packages.confluent.io/rpm/5.1/archive.key
enabled=1 

Следующее чистое ням репо

sudo yum clean all 

Разрешите группу безопасности экземпляра ec2 во входящих правилах группы безопасности MSK для портов 9092(подключающийся посредник) и 2081(подключающийся zookeeper).

Раздел 2: Команда для получения брокера кластеров MSK и информации url/ip zookeeper

Порт подключения Zookeeper

aws kafka describe-cluster --region us-east-1 --cluster-arn <cluster arn>

URL-адрес соединения с брокером

aws kafka get-bootstrap-brokers --region us-east-1 --cluster-arn <cluster arn>

-------------------------------------------------- --------------------

Замечания:

Обзор MSK и настройки компонентов:

  • Пожалуйста, ознакомьтесь с высокоуровневой архитектурой MSK и настройкой различных компонентов (отдых, реестр схем, балансировка нагрузки и т. Д.). Также, как это будет связано с другими сервисами AWS. Это просто простая эталонная архитектура.

  • Кроме того, вместо настройки rest, реестра реестра и ksql в экземпляре ec2 вы также можете докеризировать внутри контейнера.

  • И если вы настраиваете несколько прокси-серверов отдыха, то вам нужно поместить эту службу прокси-сервера за липким балансировщиком нагрузки (например, nginx, использующий хэш ip), чтобы убедиться, что один и тот же клиентский клиент сопоставляется с одной и той же группой потребителей, чтобы избежать несоответствия или несоответствия при извлечении данных. через чтение данных.

Надеюсь, вы найдете вышеуказанную информацию полезной!

Другие вопросы по тегам