Описание тега apache-beam-io
Apache Beam - это унифицированный SDK для пакетной и потоковой обработки. Этот тег следует использовать для вопросов, связанных с чтением данных в конвейер Apache Beam или записью вывода конвейера в пункт назначения.
1
ответ
Модуль объекта не имеет атрибута BigqueryV2 - Локальный Apache Beam
Я пытаюсь запустить конвейер локально (Sierra) с Apache Beam, используя API ввода / вывода луча для Google BigQuery. Я установил свою среду, используя Virtualenv, как было предложено в кратком обзоре Beam Python, и я могу запустить пример wordcount.…
12 мар '17 в 13:27
1
ответ
Потоковые группы мутации в гаечный ключ
Я пытаюсь направить MutationGroups в гаечный ключ с SpannerIO. Цель состоит в том, чтобы писать новые MuationGroups каждые 10 секунд, так как мы будем использовать гаечный ключ для запроса KPI ближайшего времени. Когда я не использую никаких окон, я…
23 июл '18 в 14:06
0
ответов
Apache-beam программа для анализа настроений
Я написал программу apache-beam, которая берет тексты из файла input.txt и проводит некоторый анализ настроений и вывод, который я хочу сохранить в формате csv, чтобы вставить его в bigquery. import os import logging import csv import json import re…
26 фев '19 в 11:12
1
ответ
Dataflow GroupBy -> несколько выходов на основе ключей
Есть ли простой способ, которым я могу перенаправить вывод GroupBy в несколько выходных файлов на основе групповых ключей? Bin.apply(GroupByKey.<String, KV<Long,Iterable<TableRow>>>create()) .apply(ParDo.named("Print Bins").of( ...…
12 окт '17 в 17:45
1
ответ
Массовая вставка JdbcIO Google Dataflow (Apache beam) в базу данных MySQL
Я использую Dataflow SDK 2.X Java API ( Apache Beam SDK) для записи данных в MySQL. Я создал конвейеры на основе документации Apache Beam SDK для записи данных в MySQL с использованием потока данных. Он вставляет одну строку за раз, когда мне нужно …
08 дек '17 в 12:55
2
ответа
Использование CoGroupByKey с пользовательским типом приводит к ошибке кодера
Я хочу соединить два PCollection (из другого входа соответственно) и реализовать, выполнив шаг, описанный здесь, раздел "Объединение с CoGroupByKey": https://cloud.google.com/dataflow/model/group-by-key В моем случае я хочу объединить информацию о б…
26 сен '17 в 07:40
0
ответов
Как сделать ReadAllFromText не блочным конвейером?
Я хотел бы реализовать очень простой лучевой конвейер: read google storage links to text files from PubSub topic->read each text line by line->write to BigQuery. Apache Beam имеет предварительно реализованный PTransform для каждого процесса. Т…
02 сен '18 в 21:47
1
ответ
apache beam bigtable Итеративная мутация
Я перевожу свой поток данных Google Java Java 1.9 в Beam 2.0 и пытаюсь использовать BigtableIO.Write .... .apply("", BigtableIO.write() .withBigtableOptions(bigtableOptions) .withTableId("twoSecondVitals")); В ParDo перед BigtableIO я изо всех сил п…
24 июн '17 в 00:21
0
ответов
Не писать по указанному пути, когда Pcollection пуст
Мой код (внутри основной функции для создания конвейера):- outCollection.apply("WriteToFile", TextIO.write().to(path).withHeader(header) .withFooter(footer).withoutSharding()); Текущее поведение таково, что он записывает файл по заданному пути с вер…
20 дек '18 в 06:17
1
ответ
Разница между beam.ParDo и beam.Map в типе вывода?
Я использую Apache-Beam для запуска некоторых преобразований данных, включая извлечение данных из txt, csv и различных источников данных. Одна вещь, которую я заметил, это разница результатов при использовании beam.Map и beam.ParDo В следующем приме…
24 дек '18 в 11:35
1
ответ
В чем различия между `WriteToBigQuery` и`BigQuerySink`
После этого ответа мне интересно, каковы принципиальные различия (если таковые имеются) между WriteToBigQuery а также BigQuerySink Apache Beam Python SDK. Каковы соображения или ограничения использования одного над другим?
01 янв '19 в 08:54
1
ответ
Нет ошибки транслятора при запуске задания Apache Beam в кластере Flink
Я создал очень простую работу Apache Beam для теста, она написана на Scala и выглядит так: object Test { def main(args: Array[String]): Unit = { val options = PipelineOptionsFactory.fromArgs(args: _*).create() val p = Pipeline.create(options) printl…
10 янв '19 в 14:41
0
ответов
Соедините Beam JDBC IO с Кассандрой
Невозможно соединить Cassandra с помощью драйвера jdbc, получающего ошибку java.sql.SQLException: невозможно создать PoolableConnectionFactory (isValid() вернул false) Apache Beam JDBC IO не работает с Casandra. Я пытался работать с cassandra-jdbc-1…
04 фев '19 в 11:30
1
ответ
Как добавить дополнительное поле к лучу FileIO.matchAll() результат?
У меня есть PCollection KV, где ключом является gcs file_patterns, а значением является некоторая дополнительная информация о файлах (например, системы "Source", которые генерировали файлы). Например, KV("gs://bucket1/dir1/*", "SourceX"), KV("gs://b…
23 фев '19 в 06:29
0
ответов
TensorFlow Transform Python с использованием AWS S3 в качестве источника данных
Я пытаюсь запустить TensorFlow Transform, используя Python, Apache Flink в качестве Beam Runner. Я заметил, что Beam не имеет AWS S3 в качестве разъема io, и хотел бы узнать, как это обойти. Вот список поддерживаемых io-коннекторов, но Python+S3 даж…
11 янв '19 в 17:17
1
ответ
Кодер по умолчанию для объекта Pojo в Apache Beam
Согласно документации Apache Beam, я могу найти кодеры для конкретных типов данных, а также пользовательские кодеры. Это обеспечивает возможность создания пользовательских кодеров путем регистрации в реестре кода. Но я хотел бы знать, есть ли какой-…
18 фев '19 в 01:46
1
ответ
Подтверждение сообщения Google Pub/Sub на Apache Beam
Я пытаюсь прочитать из pub/sub с помощью следующего кода Read<String> pubsub = PubsubIO.<String>read().topic("projects/<projectId>/topics/<topic>").subscription("projects/<projectId>/subscriptions/<subscription>")…
16 май '17 в 13:58
1
ответ
Apache Beam выбрасывает Cannot setCoder(null): java
Я новичок в Apache Beam и пытаюсь подключиться к облачному экземпляру Google базы данных MySQL. Когда я запускаю приведенный ниже фрагмент кода, он выдает приведенное ниже исключение. Logger logger = LoggerFactory.getLogger(GoogleSQLPipeline.class);…
19 июн '17 в 04:19
1
ответ
Детали конвейера потока данных для источника / приемников BigQuery не отображаются
Согласно этому объявлению команды Google Dataflow, мы сможем увидеть подробную информацию о наших источниках и приемниках BigQuery в консоли, если мы будем использовать 1.6 SDK. Однако, хотя новые "Параметры конвейера" действительно отображаются, де…
24 июн '16 в 00:28
1
ответ
Создание / запись в закаленную (устаревшую) таблицу BigQuery с помощью облачного потока данных Google
Существует ли простой в использовании пример, как настроить потоковый режим Dataflow Pipeline, чтобы записать каждое окно в отдельную таблицу BigQuery (и при необходимости создать ее)? Т.е. - таблица_20160701, таблица_20160702 и т. Д.
07 июл '16 в 00:43