Описание тега spark-structured-streaming

Spark Structured Streaming allows processing live data streams using DataFrame and Dataset APIs.
1 ответ

Количество входных строк в искровой структурированной потоковой передаче с пользовательским приемником

Я использую пользовательский приемник в структурированном потоке (spark 2.2.0) и заметил, что spark создает неверные метрики для количества входных строк - он всегда равен нулю. Моя конструкция потока: StreamingQuery writeStream = session .readStrea…
0 ответов

Spark Streaming: агрегирование данных, когда временные метки не в порядке

Spark Streaming Job считывает данные о событиях из тем Kafka, агрегирует их по меткам времени и производит подсчет. Теперь проблема в том, что входящие метки времени не в порядке. Они могли бы иметь +/- 5days разница с текущей отметкой времени. Это …
0 ответов

Использование Spark структурированного потокового кластера

Я создавал искровое структурированное потоковое приложение и пытаюсь понять развертывание на EMR. Приложение делает следующее; Подписаться на тему Кафки Агрегировать по временному окну и идентификатору пользователя (чтобы избежать невозможности полу…
2 ответа

Каково влияние 'coalesce' перед 'partitionBy' в этом потоковом запросе?

У меня есть потоковый запрос (Spark Structured Streaming), который получает данные из темы Kafka (два раздела), например: val df = spark .readStream .format("kafka") .option("kafka.bootstrap.servers", "172.29.57.25:9092,172.29.57.30:9092") .option("…
1 ответ

Spark структурированный поток: текущая партия отстает

Это кажется очень простой реализацией, но похоже, что есть некоторые проблемы. Это задание считывает смещения (данные событий пользовательского интерфейса) из темы kafka, выполняет некоторую агрегацию и записывает ее в базу данных Aerospike. В случа…
0 ответов

Отставание по теме Kafka при использовании структурированной потоковой передачи Spark

Можно ли как-то выяснить отставание по теме kafka при использовании структурированной потоковой передачи? Я проверил streamQuery.lastprogress и streamQuery.status , ни одно из них не содержит таких значений. Хотите узнать, есть ли способ узнать.
0 ответов

Длинное временное окно искры потоковой передачи вызывает исключение "выходит за пределы 64 КБ"

У меня есть потоковый фрейм данных Spark с часовым окном, которое срабатывает каждую минуту. val momentumDataAggQuery = withTime .selectExpr("parsed.symbol", "parsed.bid", "parsed.ask", "event_time") .withWatermark("event_time", "5 seconds") .groupB…
0 ответов

Паркетные поля, показывающие NULL при чтении через HIVE, НО, показывающие значения при чтении через spark

Я записываю свой искровой поток данных в виде файла паркета в моей HDFS. Я создал таблицу кустов в верхней части этой папки HDFS. моя искривленная структурированная команда потоковой записи выглядит следующим образом: parquet_frame.writeStream.optio…
0 ответов

(Пример монетизации рекламы для блоков данных) Как найти последнее совпадение в потоке?

В блоге "Представление объединений Stream-Stream в Apache Spark 2.3" обсуждается объединение кликов с показами на основе их adId: # Define watermarks impressionsWithWatermark = impressions \ .selectExpr("adId AS impressionAdId", "impressionTime") \ …
25 окт '18 в 12:45
0 ответов

Spark Streaming: несоответствие схемы при использовании MicroBatchReader с удалением столбцов

Я пишу собственный потоковый источник Spark. Я хочу поддержать сокращение столбцов. Я не могу поделиться полным кодом, в любом случае я сделал что-то вроде этого: class MyMicroBatchReader(...) extends MicroBatchReader with SupportsPushDownRequiredCo…
0 ответов

Spark Структурированная потоковая кафка avro Производитель

У меня есть датафрейм, скажем: val someDF = Seq( (8, "bat"), (64, "mouse"), (-27, "horse") ).toDF("number", "word") Я хочу отправить этот фрейм данных в тему kafka, используя сериализацию avro и используя реестр схемы. Я верю, что я почти у цели, но…
0 ответов

Частота контрольной точки источника Spark Structured Streaming Kafka

Я потребляю тему Кафки с startingOffsets установлен в earliest, Срок хранения кластера составляет 48 часов. Очевидно, что когда запрос выполняется впервые (без контрольных точек), он будет начинаться с самого раннего смещения. Если приложение было у…
0 ответов

Spark структурированные потоковые мульти-действия совместно используют одну и ту же контрольную точку

Представьте, у меня есть один поток из структурированного потока. val sourceDF = sparkSession.readStream .format("kafka") .option("kafka.bootstrap.servers", revBrokers) .option("subscribe", topic) ... as[(String, String)] и я хочу посчитать количест…
07 ноя '18 в 23:31
1 ответ

Пропуск партий в процессе искровой структурированной потоковой передачи

У меня есть потоковая работа с искровым структурированием, которая использует события из службы Azure Hubs. В некоторых случаях случается, что некоторые пакеты не обрабатываются потоковым заданием. В этом случае в журнале структурированной потоковой…
0 ответов

Как написать структурированный поток в S3?

Я искал, как записать результаты структурированного потока в s3, но не смог найти способ сделать это. Я попробовал несколько вариантов, но каждый из них привел к различной ошибке. Вот что я хочу сделать: Читать поток CSV Уменьшите его до 2-3 строк р…
0 ответов

Как объединить два структурированных потока Spark?

Можно ли объединить два структурированных потока Spark в Spark 2.2.1? Я обнаружил много проблем с выполнением очень простых манипуляций в Spark Structured Streaming. Документация и количество примеров кажутся мне очень ограниченными. У меня есть два…
2 ответа

Как рассчитать агрегации в окне, когда показания датчика не отправляются, если они не изменились с момента последнего события?

Как я могу вычислить агрегации на окне от датчика, когда новые события отправляются, только если значение датчика изменилось с момента последнего события? Показания датчика снимаются в фиксированное время, например каждые 5 секунд, но они передаются…
0 ответов

Контрольная точка для многих потоковых источников

Я работаю с Zeppelin, я читаю много файлов из многих источников в потоковой передаче искры, как это: val var1 = spark .readStream .schema(var1_raw) .option("sep", ",") .option("mode", "PERMISSIVE") .option("maxFilesPerTrigger", 100) .option("treatEm…
0 ответов

Как скорость обработки и интервал триггера взаимодействуют в искровой структурированной потоковой передаче?

Я хотел бы понять следующее: В структурированной потоковой передаче Spark существует понятие триггера, который сообщает, через какой интервал спарк попытается прочитать данные, чтобы начать обработку. Я хотел бы знать, как долго может длиться операц…
18 сен '18 в 19:56
0 ответов

Collect_list не работает на частичных агрегатах

Для кадра данных Spark Structured Streaming, collect_list выдает ошибку: ОШИБКА Исполнитель: Исключение в задаче 0.0 на этапе 11.0 (TID 1024) java.lang.RuntimeException: Collect не может использоваться в частичных агрегациях. У меня есть следующий к…