Описание тега apache-beam-io

Apache Beam - это унифицированный SDK для пакетной и потоковой обработки. Этот тег следует использовать для вопросов, связанных с чтением данных в конвейер Apache Beam или записью вывода конвейера в пункт назначения.
1 ответ

Модуль объекта не имеет атрибута BigqueryV2 - Локальный Apache Beam

Я пытаюсь запустить конвейер локально (Sierra) с Apache Beam, используя API ввода / вывода луча для Google BigQuery. Я установил свою среду, используя Virtualenv, как было предложено в кратком обзоре Beam Python, и я могу запустить пример wordcount.…
1 ответ

Потоковые группы мутации в гаечный ключ

Я пытаюсь направить MutationGroups в гаечный ключ с SpannerIO. Цель состоит в том, чтобы писать новые MuationGroups каждые 10 секунд, так как мы будем использовать гаечный ключ для запроса KPI ближайшего времени. Когда я не использую никаких окон, я…
0 ответов

Apache-beam программа для анализа настроений

Я написал программу apache-beam, которая берет тексты из файла input.txt и проводит некоторый анализ настроений и вывод, который я хочу сохранить в формате csv, чтобы вставить его в bigquery. import os import logging import csv import json import re…
26 фев '19 в 11:12
1 ответ

Dataflow GroupBy -> несколько выходов на основе ключей

Есть ли простой способ, которым я могу перенаправить вывод GroupBy в несколько выходных файлов на основе групповых ключей? Bin.apply(GroupByKey.<String, KV<Long,Iterable<TableRow>>>create()) .apply(ParDo.named("Print Bins").of( ...…
1 ответ

Массовая вставка JdbcIO Google Dataflow (Apache beam) в базу данных MySQL

Я использую Dataflow SDK 2.X Java API ( Apache Beam SDK) для записи данных в MySQL. Я создал конвейеры на основе документации Apache Beam SDK для записи данных в MySQL с использованием потока данных. Он вставляет одну строку за раз, когда мне нужно …
2 ответа

Использование CoGroupByKey с пользовательским типом приводит к ошибке кодера

Я хочу соединить два PCollection (из другого входа соответственно) и реализовать, выполнив шаг, описанный здесь, раздел "Объединение с CoGroupByKey": https://cloud.google.com/dataflow/model/group-by-key В моем случае я хочу объединить информацию о б…
0 ответов

Как сделать ReadAllFromText не блочным конвейером?

Я хотел бы реализовать очень простой лучевой конвейер: read google storage links to text files from PubSub topic->read each text line by line->write to BigQuery. Apache Beam имеет предварительно реализованный PTransform для каждого процесса. Т…
1 ответ

apache beam bigtable Итеративная мутация

Я перевожу свой поток данных Google Java Java 1.9 в Beam 2.0 и пытаюсь использовать BigtableIO.Write .... .apply("", BigtableIO.write() .withBigtableOptions(bigtableOptions) .withTableId("twoSecondVitals")); В ParDo перед BigtableIO я изо всех сил п…
0 ответов

Не писать по указанному пути, когда Pcollection пуст

Мой код (внутри основной функции для создания конвейера):- outCollection.apply("WriteToFile", TextIO.write().to(path).withHeader(header) .withFooter(footer).withoutSharding()); Текущее поведение таково, что он записывает файл по заданному пути с вер…
1 ответ

Разница между beam.ParDo и beam.Map в типе вывода?

Я использую Apache-Beam для запуска некоторых преобразований данных, включая извлечение данных из txt, csv и различных источников данных. Одна вещь, которую я заметил, это разница результатов при использовании beam.Map и beam.ParDo В следующем приме…
24 дек '18 в 11:35
1 ответ

В чем различия между `WriteToBigQuery` и`BigQuerySink`

После этого ответа мне интересно, каковы принципиальные различия (если таковые имеются) между WriteToBigQuery а также BigQuerySink Apache Beam Python SDK. Каковы соображения или ограничения использования одного над другим?
01 янв '19 в 08:54
1 ответ

Нет ошибки транслятора при запуске задания Apache Beam в кластере Flink

Я создал очень простую работу Apache Beam для теста, она написана на Scala и выглядит так: object Test { def main(args: Array[String]): Unit = { val options = PipelineOptionsFactory.fromArgs(args: _*).create() val p = Pipeline.create(options) printl…
10 янв '19 в 14:41
0 ответов

Соедините Beam JDBC IO с Кассандрой

Невозможно соединить Cassandra с помощью драйвера jdbc, получающего ошибку java.sql.SQLException: невозможно создать PoolableConnectionFactory (isValid() вернул false) Apache Beam JDBC IO не работает с Casandra. Я пытался работать с cassandra-jdbc-1…
1 ответ

Как добавить дополнительное поле к лучу FileIO.matchAll() результат?

У меня есть PCollection KV, где ключом является gcs file_patterns, а значением является некоторая дополнительная информация о файлах (например, системы "Source", которые генерировали файлы). Например, KV("gs://bucket1/dir1/*", "SourceX"), KV("gs://b…
0 ответов

TensorFlow Transform Python с использованием AWS S3 в качестве источника данных

Я пытаюсь запустить TensorFlow Transform, используя Python, Apache Flink в качестве Beam Runner. Я заметил, что Beam не имеет AWS S3 в качестве разъема io, и хотел бы узнать, как это обойти. Вот список поддерживаемых io-коннекторов, но Python+S3 даж…
11 янв '19 в 17:17
1 ответ

Кодер по умолчанию для объекта Pojo в Apache Beam

Согласно документации Apache Beam, я могу найти кодеры для конкретных типов данных, а также пользовательские кодеры. Это обеспечивает возможность создания пользовательских кодеров путем регистрации в реестре кода. Но я хотел бы знать, есть ли какой-…
18 фев '19 в 01:46
1 ответ

Подтверждение сообщения Google Pub/Sub на Apache Beam

Я пытаюсь прочитать из pub/sub с помощью следующего кода Read<String> pubsub = PubsubIO.<String>read().topic("projects/<projectId>/topics/<topic>").subscription("projects/<projectId>/subscriptions/<subscription>")…
1 ответ

Apache Beam выбрасывает Cannot setCoder(null): java

Я новичок в Apache Beam и пытаюсь подключиться к облачному экземпляру Google базы данных MySQL. Когда я запускаю приведенный ниже фрагмент кода, он выдает приведенное ниже исключение. Logger logger = LoggerFactory.getLogger(GoogleSQLPipeline.class);…
1 ответ

Детали конвейера потока данных для источника / приемников BigQuery не отображаются

Согласно этому объявлению команды Google Dataflow, мы сможем увидеть подробную информацию о наших источниках и приемниках BigQuery в консоли, если мы будем использовать 1.6 SDK. Однако, хотя новые "Параметры конвейера" действительно отображаются, де…
1 ответ

Создание / запись в закаленную (устаревшую) таблицу BigQuery с помощью облачного потока данных Google

Существует ли простой в использовании пример, как настроить потоковый режим Dataflow Pipeline, чтобы записать каждое окно в отдельную таблицу BigQuery (и при необходимости создать ее)? Т.е. - таблица_20160701, таблица_20160702 и т. Д.