Описание тега akka-kafka

NoneAkka -kafka - это потребитель kafka на основе акторов, построенный поверх высокоуровневого потребителя kafka, который позволяет асинхронную / одновременную обработку сообщений от kafka, сохраняя при этом верхнюю границу количества сообщений в полете и явно управляя фиксацией смещения.
0 ответов

Обработка ошибок в Akka Kafka Producer

Я использую реактивный-кафка-ядро 0.10.1 (нацеливание на Кафку 0.9.х). Похоже, что актер-производитель Кафки останавливается всякий раз, когда возникает ошибка в функции обратного вызова. Есть ли способ настроить это поведение? Наш вариант использов…
22 июл '16 в 23:47
1 ответ

Производитель Kafka с регулируемым количеством сообщений в секунду

Каков наилучший способ написать производителя Apache Kafka с устойчивой, но регулируемой производительностью. Пример: производитель должен посылать брокеру постоянные 1000 сообщений в секунду. Во время выполнения выходной сигнал должен быть настроен…
1 ответ

Ответ метаданных - неверное имя хоста брокера

Я использую SimpleConsumer Python-Kafka для прослушивания темы в брокере Kafka. Брокер Kafka работает на компьютере с именем хоста BROKER_HOST. Теперь SimpleConsumer запрашивает метаданные темы у брокера BROKER_HOST для темы и получает кортеж (Broke…
11 фев '15 в 08:34
1 ответ

Несовместимое ограничение равенства при использовании Akka Kafka Streams

Я пытаюсь использовать Akka Kafka Streams, следуя документации Akka Kafka Streams. Вот код, который у меня есть: ConsumerSettings<byte[], ETLProcessMessage> consumerSettings = ConsumerSettings .create(actorSystem, new ByteArrayDeserializer(), …
2 ответа

Akka Kafka Producersettings: перегруженное значение метода применяется с альтернативами:

Я снова и снова сталкиваюсь с проблемой, когда помещаю настройки производителя в свой код. Когда у меня его нет, все работает отлично. Ниже приведен файл одного файла, он содержит весь код, я пытаюсь записать файл в поток Кафка. И получаю эту ошибку…
1 ответ

NoSuchMethodError: KafkaConsumer.subscribe

Я использую следующие зависимости: val akkaVersion = "2.4.9" val kafkaVersion = "0.10.0.1" dependencies = ... "com.typesafe.akka" %% "akka-actor" % akkaVersion, "com.typesafe.akka" %% "akka-cluster" % akkaVersion, "com.softwaremill.reactivekafka" %%…
0 ответов

avro4s не может десериализовать AnyRef

У меня есть простой класс дела case class KafkaContainer(key: String, payload: AnyRef) тогда я хочу отправить это в тему kafka через производителя я делаю это val byteArrayStream = new ByteArrayOutputStream() val output = AvroOutputStream.binary[Kaf…
01 авг '18 в 15:03
1 ответ

Проблемы с обновлением схемы AVRO

У меня есть простой класс дела: case class User(id: String, login: String, key: String) я добавляю поле "имя" case class User(id: String, login: String, name: String, key: String) затем добавьте это поле в схему avro (user.avsc) { "namespace": "test…
09 июл '18 в 09:02
1 ответ

Как сессионизировать / сгруппировать события в Akka Streams?

Требование заключается в том, что я хочу написать потоковое приложение Akka, которое прослушивает непрерывные события от Kafka, а затем выполняет сеанс данных события во временном интервале на основе некоторого значения идентификатора, встроенного в…
12 мар '18 в 10:28
1 ответ

Reactive-Kafka Stream Consumer: найдены мертвые буквы

Я пытаюсь использовать сообщения от Кафки, используя реактивную библиотеку Акки. Я получаю одно сообщение, и после этого я получил [INFO] [01/24/2017 10:36:52.934] [CommittableSourceConsumerMain-akka.actor.default-dispatcher-5] [akka://CommittableSo…
1 ответ

Получение последнего сообщения от kafka Тема с использованием akka-stream-kafka при соединении с websocket

Можно ли вообще получить последнее сообщение по теме Kafka, используя Akka Streams Kafka? Я создаю веб-сокет, который прослушивает тему Кафки, но в настоящее время он извлекает все предыдущие unred сообщения при подключении. Это может составить дово…
0 ответов

Цепочка Akka Streams Kafka с Akka-http от источника [Bytestring]

Я получаю файл в виде строки байтов от потребителя Kafka Reactive Streams, который я хочу отправить в службу. Есть ли возможность извлечь Source[Bytestring, Any] из Kafka Reactive Stream Consumer, чтобы я мог связать поток от Kafka до Akka-http, не …
2 ответа

Каков хороший шаблон для фиксации смещения потребителя Kafka после обработки сообщения?

Я использую Akka Streams Kafka для передачи сообщений Kafka удаленному сервису. Я хочу гарантировать, что служба получает каждое сообщение ровно один раз (по крайней мере один раз и самое большее один раз). Вот код, который я придумал: private def s…
06 мар '17 в 20:14
1 ответ

Как вы возвращаете будущее, содержащее список сообщений после того, как все доступные сообщения были использованы из темы Кафки?

Я, вероятно, упускаю суть потребителя Kafka, но я хочу сделать следующее: Потребитель подписывается на тему, захватывает все сообщения в теме и возвращает будущее со списком всех этих сообщений. Код, который я написал, чтобы попытаться выполнить это…
06 мар '17 в 23:33
0 ответов

Lagom Kafka Проблема интеграции клиентов

Я работаю над lagom и пытаюсь подключиться к теме, предоставленной службой Lagom в среде Java, отличной от Lagom. Я подписался на тему, используя: LagomClientFactory clientFactory = LagomClientFactory.create("legacy-system",LagomClientFactory.class.…
24 янв '18 в 09:33
1 ответ

Стратегия контроля потока Akka Kafka не работает

Я запускаю приложение Akka Streams Kafka и хочу включить стратегию контроля для потребителя потока, чтобы в случае сбоя брокера и смерти потребителя потока после истечения времени ожидания супервизор мог перезапустить потребителя. Вот мой полный код…
06 окт '17 в 06:28
1 ответ

Использование реактивной-кафки для условной обработки сообщений

Я пытался использовать реактив-кафку, и у меня возникла проблема с условной обработкой, на которую я не нашел удовлетворительного ответа. В основном я пытаюсь использовать одну тему кафки, которая содержит огромное количество сообщений (около 10 мил…
15 фев '18 в 22:16
0 ответов

Соединение с отказоустойчивым брокером Akka Stream Kafka

Есть ли способ подключиться к отказоустойчивому брокеру kafka из реактивного потока akka kafka, если тот, к которому он подключен, выходит из строя?
3 ответа

Akka Stream TCP + производитель Akka Stream Kafka не прекращает публиковать сообщения и не выводить сообщения об ошибках

У меня есть следующий поток: Source(IndexedSeq(ByteString.empty)) .via( Tcp().outgoingConnection(bsAddress, bsPort) .via(Framing.delimiter(ByteString("\n"), 256, allowTruncation = true)) .map(_.utf8String) ) .map(m => new ProducerRecord[Array[Byt…
14 дек '16 в 05:56
1 ответ

Akka streams kafka фиксирует смещение после фильтра

Я пытаюсь использовать стратегию принятия по крайней мере один раз для смещения в потоках akka, и я не могу понять, какова ожидаемая схема для случаев, когда я использую фильтр в своем потоке. я ожидаю, что ни одно из отфильтрованных сообщений не по…
13 мар '18 в 18:05