Описание тега structured-streaming

1 ответ

Как обновить или даже сбросить строки в персистентной таблице с учетом нескольких одновременных считывателей?

У меня есть таблица exchangeRates, которая обновляется в пакетном режиме один раз в неделю. Это должно использоваться другими пакетными и потоковыми заданиями в разных кластерах - поэтому я хочу сохранить это как постоянную общую таблицу для общего …
0 ответов

Предотвращение повторной обработки данных во время обновлений приложения Spark Structured Streaming

Я использую структурированный поток с Spark 2.2. Мы используем Kafka в качестве нашего источника и используем контрольные точки для восстановления после сбоев, и e2e гарантирует только один раз. Я хотел бы получить дополнительную информацию о том, к…
21 мар '18 в 19:01
2 ответа

Каково влияние 'coalesce' перед 'partitionBy' в этом потоковом запросе?

У меня есть потоковый запрос (Spark Structured Streaming), который получает данные из темы Kafka (два раздела), например: val df = spark .readStream .format("kafka") .option("kafka.bootstrap.servers", "172.29.57.25:9092,172.29.57.30:9092") .option("…
1 ответ

SparkSql - Соединение при выполнении запроса выдает "объект не является экземпляром объявления класса"

Я выполняю запрос на SparkSession который бросает Object is not an instance of declaring classниже приведен код, после которого Dataset<Row> results = spark.sql("SELECT t1.someCol FROM table1 t1 join table2 t2 on t1.someCol=t2.someCol"); resul…
3 ответа

Как получить смещения Кафки для структурированного запроса для ручного и надежного управления смещениями?

Spark 2.2 представил структурированный потоковый источник Kafka. Как я понимаю, он использует каталог контрольных точек HDFS для хранения смещений и гарантии доставки сообщений "точно один раз". Но старые доки (такие как https://blog.cloudera.com/bl…
0 ответов

Объединение потоковых и статических фреймов данных ничего не делает

Поэтому я пытаюсь настроить поток из Kafka, а затем добавить столбцы из таблицы в этот фрейм данных на основе соответствующего идентификатора в сообщении и таблице Kafka. У меня есть два отдельных файла, первый вызывает метод в streamKafka_ASH для н…
0 ответов

Spark Structured Streaming не работает непрерывно?

Код сервера: import java.net.ServerSocket; import java.net.Socket; import java.nio.charset.Charset; import java.util.Random; public class Socket_server { public static void main(String[] args) throws Exception { ServerSocket sc = new ServerSocket(99…
03 янв '18 в 06:44
1 ответ

Декодирование перечислений Java / пользовательских нестандартных классов с использованием Structured Spark Streaming

Я пытаюсь использовать структурированную потоковую передачу в Spark 2.1.1 для чтения из Kafka и декодирования закодированных сообщений Avro. У меня есть UDF, определенный в соответствии с этим вопросом. val sr = new CachedSchemaRegistryClient(conf.k…
1 ответ

Как объединить Spark в массив Кафки

В настоящее время у меня есть следующий df +-------+--------------------+-----+ | key| created_at|count| +-------+--------------------+-----+ |Bullish|[2017-08-06 08:00...| 12| |Bearish|[2017-08-06 08:00...| 1| +-------+--------------------+-----+ Я…
1 ответ

Эвристика водяного знака

Насколько точны оценки водяных знаков при обработке потока в пучке Apache или в потоке искры. Мой источник данных - файлы из gcs/s3, но я использую время события, связанное с каждым событием, как метку времени для функции управления окнами. Любые ид…
2 ответа

Как обрабатывать новые файлы в каталоге HDFS, когда их запись в конце концов закончена?

В моем случае у меня есть файлы CSV, постоянно загружаемые в HDFS. Как только новый файл будет загружен, я хотел бы обработать новый файл с помощью Spark SQL (например, вычислить максимум поля в файле, преобразовать файл в parquet). т.е. у меня есть…
1 ответ

Ошибка в структурированной потоковой передаче Spark с источником файла и приемником файла

Моя команда сейчас вступает в сферу структурированного потокового вещания. Я относительно новичок в структурированном потоке. У меня есть требование с Источник - CSVРаковина - JSON Подробности Env: Кластер: Искра 2.2.1Язык программирования: ScalaИнс…
11 июн '18 в 04:48
1 ответ

Графана для искровой структурированной трансляции

Я следовал этим шагам, чтобы настроить Prometheus, Graphite Exporter и Grafana для построения метрик для Spark 2.2.1, в которой работает структурированная потоковая передача. Метрики коллекции на этом посте довольно устарели; и не включает в себя ка…
1 ответ

Что делает "row +: SavedState.toSeq" в StateStoreRestoreExec.doExecute?

Мы можем видеть StateStoreRestoreExec следующим образом. case class StateStoreRestoreExec( keyExpressions: Seq[Attribute], stateId: Option[OperatorStateId], child: SparkPlan) extends UnaryExecNode with StateStoreReader { override protected def doExe…
22 авг '17 в 14:59
2 ответа

Как рассчитать время для извлечения записей из Кафки?

У меня есть простая работа с триггером =15 секунд, Source=Kafka и Sink=S3. Можно ли узнать, сколько времени заняло скачивание сообщений с Кафки? Или, скажем, если у меня была Sink=Console, она возвращает данные о драйвере, можно ли найти сколько вре…
1 ответ

Spark Streaming 2.11 - java.util.NoSuchElementException: ошибка None.get при выполнении функции SQL

вопрос Я использую Spark, чтобы присоединиться к содержимому файлов CSV. После первого соединения с потоковым CSV-файлом я могу записать поток без ошибок и увидеть, что было выведено много строк, но как только я пытаюсь записать свой второй поток, я…
1 ответ

Как преобразовать DataSet<Row> в DataSet сообщений JSON для записи в Kafka?

Я использую Spark 2.1.1. У меня есть следующее DataSet&lt;Row&gt; DS1; name | ratio | count // column names "hello" | 1.56 | 34 (ds1.isStreaming дает true) и я пытаюсь создать DataSet&lt;String&gt; ds2. другими словами, когда я пишу в кафку, я хочу …
0 ответов

Простой пример структурированной потоковой передачи Apache Spark с classNotFound: range

Я пытаюсь запустить простой пример структурированной потоковой передачи с веб-сайта Apache Spark. Я подаю свое заявление в Spark следующим образом: spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.2.0 spark_structured_streaming.p…
24 авг '17 в 12:22
1 ответ

Получение либо исключения "java.lang.NoSuchFieldError: METASTORE_CLIENT_SOCKET_LIFETIME", либо "Обязательное поле 'client_protocol' не установлено!"

Я использую Структурированную Потоковую передачу Spark 2.1 (которая внутренне использует банки Jive 1.2.1). Я пытаюсь разработать ForEachWriter для Hive для записи потоковых данных в Hive через JDBC-коннектор. На моем кластере у меня Hive 1.1.0. Есл…
0 ответов

Количество записанных записей в искровой структурированной потоковой передаче

Я пытаюсь получить ряд прочитанных и записанных записей в структурированном потоке. я использую SparkListener.onTaskEnd() для захвата входных и выходных метрик, но кажется, что количество записанных записей (taskEnd.taskMetrics().outputMetrics().rec…