Описание тега spark-streaming-kafka

Интеграция Spark Streaming для Kafka. Подход Direct Stream обеспечивает простой параллелизм, соответствие 1:1 между разделами Kafka и разделами Spark, а также доступ к смещениям и метаданным.
1 ответ

pyspark поддерживает lib-streaming-kafka-0-10 lib?

Моя версия кластера kafka - 0.10.0.0, и я хочу использовать поток pyspark для чтения данных kafka. но в Руководстве по интеграции Spark Streaming + Kafka, http://spark.apache.org/docs/latest/streaming-kafka-0-10-integration.html нет примера кода на …
05 авг '17 в 12:53
1 ответ

Spark Streaming указывает начальные и конечные смещения

У меня есть сценарий, в котором я хочу повторно обработать определенный пакет данных, поступающих из Kafka, используя Spark DStreams. скажем, я хочу обработать следующие партии данных. Тема-Раздел1-{1000,2000} Тема-Раздел2-{500-600} Ниже приведен фр…
19 дек '18 в 20:27
1 ответ

Проблема с кафкой при потоковом воспроизведении

Я пытаюсь прочитать данные от потребителя kafka, используя spark2-shell. Пожалуйста, найдите мой код ниже. Я запускаю свою spark2-shell следующим образом: spark2-shell --jars kafka-clients-0.10.1.2.6.2.0-205.jar, spark-sql-kafka-0-10_2.11-2.1.1.jar …
19 дек '18 в 05:05
0 ответов

java.lang.NoSuchMethodError: org.apache.kafka.clients.consumer.KafkaConsumer.assign

Моя искровая работа работает хорошо при отправке в локальный искровой кластер (spark-2.3.1-bin-hadoop2.7). Но я получил несколько ошибок при отправке его в кластер spark (spark-2.3.1-bin-hadoop3.0.0) на кластере k8s в openstack. В начале я получил E…
0 ответов

Поток искры замедляется

В нашем приложении spark мы используем поток Kafka и храним данные в Cassandra DB. Во-первых, мы запустили поток без противодавления и испытали странную аномалию, когда время обработки было постоянным ~ 1 минута, однако задержка планирования увеличи…
0 ответов

Не удалось запустить Python примеры потоковой передачи Spark Kafka в Spark 2.3.0

Я планирую перейти с версии 2.2.0 на Spark 2.3.0. Я пытаюсь запустить существующие примеры, чтобы убедиться, что все будет работать. Я запускаю пример kafka_wordcount.py Код как следовать r""" Counts words in UTF8 encoded, '\n' delimited text receiv…
1 ответ

Как получить агрегированные данные за определенный день в потоковой передаче с искрой

У меня есть одно искробезопасное паровое задание, которое читает потоки из kafka и записывает вывод в HDFS. Моя проблема в том, что мне нужны сводные результаты за весь день до определенного времени. Поскольку структурированная потоковая передача Sp…
1 ответ

Разделение сообщений Кафки построчно в Spark структурированной потоковой передаче

Я хочу прочитать сообщение из темы Кафки в моей работе Spark Structured Streaming во фрейм данных. но я получаю все сообщение в одном смещении, поэтому в кадре данных только это сообщение входит в одну строку вместо нескольких строк. (в моем случае …
0 ответов

Spark-kafka - каталог контрольных точек, дублирующий данные

Ниже сценарий, который я протестировал Я произвел 200 записей, и он создал 20 файлов по 10 записей в каждом. (После ожидания в течение 2 минут) я снова произвел 200 записей и немедленно убил приложение (используя yarn -kill). На этот раз он создал т…
0 ответов

Разделение RDD и группового входа в потоке зажигания дает ошибку несоответствия

Поэтому я борюсь с данными kafka json, используя потоковую передачу искры, и я застрял в разделении 15 различных схем таблиц, которые входят в один файл json, в 15 DF, чтобы я мог создавать схемы поверх каждой из них. ошибка несоответствия:70: ошибк…
0 ответов

Драйвер Spark иногда не перезапускается, даже если отправлено в режиме супервизора и кластера

У меня в Кубернетесе 2 мастера по искрам и 3 рабочих по искрам. В настоящее время я отправляю искровые задания в режиме развертывания кластера с включенным контролируемым флагом. Теперь, чтобы проверить, возрождается ли драйвер на другом искателе ил…
1 ответ

Spark Kafka потокового в Spark 2.3.0 с питоном

Я недавно обновился до Spark 2.3.0. У меня была существующая работа на спарк, которая раньше работала на спарк 2.2.0. Я сталкиваюсь с Java-исключением AbstractMethodError Мой простой код: from pyspark import SparkContext from pyspark.streaming impor…
0 ответов

Вставить элементы списка в потоковое воспроизведение

У меня есть список (не статичный), в котором хранятся ключи для потоковой передачи данных. Я хочу разделить эти ключи с исполнителями, чтобы избежать дублирования, но не смог найти подходящий способ сделать это. Список: List<String> keyList = …
0 ответов

Искровой потоковый коммутатор Kafka Cluster без перезапуска

У меня есть два кластера kafka, оба имеют все темы и данные. Когда один кластер kafka вышел из строя, я хочу перенаправить на другой кластер kafka, но как работа потоковой передачи искры работает правильно с преобразованием смещения потребителя. Каж…
21 фев '19 в 12:11
0 ответов

Обновление или вставка потоков с помощью Spark Cassandra Connector

С помощью Spark Cassandra Connector Все потоковые данные всегда вставляются в Cassandra DB. Хотя это не желаемый результат. То, что я хотел бы достичь, это добавить значение в базе данных, когда employeetitle совпадения столбцов. Вот то, что я имею …
3 ответа

Массив JSON в Dataframe в Spark, полученный Kafka

Я пишу приложение Spark в Scala, используя Spark Structured Streaming, которое получает некоторые данные, отформатированные в стиле JSON от Kafka. Это приложение может получить как один, так и несколько объектов JSON, отформатированных таким образом…
16 дек '18 в 09:24
1 ответ

Как приоритизировать зависимость maven от пути к классу Spark при отправке задания Spark?

У меня есть дистрибутив Cloudera Hadoop, Spark и т. Д., Где версия Spark-Kafka составляет 0,8 (т.е. spark-streaming-kafka-0-8_2.11). Проблема в том, что версия 0.8 Apache Spark с интеграцией Kafka имеет встроенную версию Kafka 0.8.2.1, и мне требует…
1 ответ

Вручную зафиксировать смещение в кафке Direct Stream в python

Я портирую потоковое приложение, написанное на scala, на python. Я хочу вручную зафиксировать смещение для DStream. Это делается в Scala, как показано ниже: stream = KafkaUtils.createDirectStream(soomeConfigs) stream.foreachRDD { rdd => val offse…
1 ответ

Сохранить смещение сообщения в Kafka с помощью KafkaUtils.createDirectStream

Как сохранить смещение сообщения в Kafka, если я использую KafkaUtils.createDirectStream для чтения сообщений. Кафка теряет значение смещения каждый раз, когда приложение выходит из строя. Затем оно читает значение, предоставленное в auto.offset.res…
0 ответов

Spark SQL потоковая передача с Kafka на данных Json: функция from_json не в состоянии проанализировать многострочный JSON из темы Kafka

Здесь я отправляю данные json в kafka из темы "test", передаю схему json, выполняю некоторые преобразования и печатаю их на консоли. Вот код:- val kafkadata = spark .readStream .format("kafka") .option("kafka.bootstrap.servers", "localhost:9092") .o…