Описание тега apache-beam-pipeline

0 ответов

ошибка при запуске примера подсчета слов луча apache на автономном кластере flink (Apache flink 1.9.0)

Не удалось выполнить цель org.codehaus.mojo:exec-maven-plugin:1.6.0:java (default-cli) в проекте word-count-beam: возникла исключительная ситуация при выполнении класса Java. Класс org.apache.beam.model.pipeline.v1.RunnerApi$StandardPTransforms$Prim…
2 ответа

Как объединить два результата и передать их следующему шагу в конвейере apache-beam

См. Ниже фрагмент кода, я хочу ["metric1", "metric2"]быть моим вкладом в RunTask.process. Однако он был запущен дважды с "metric1" и "metric2" соответственно. def run(): pipeline_options = PipelineOptions(pipeline_args) pipeline_…
0 ответов

Производительность Apache Beam против Apache Spark

Если кто-нибудь сравнивал производительность Apache Beam с кодом Apache Spark, не могли бы вы поделиться результатами?
0 ответов

Конвейер Apache Beam читает (из) и записывает (в) google-cloud-firestore

У меня есть настройка конвейера Apache Beam. Я хочу читать / писать из / в облачный хранилище Google, но я не могу найти ни одного примера (с использованием java / scala) для подключения к хранилищу огня. Я прошел через API библиотеки Apache Beam, у…
1 ответ

Apache Beam - добавление задержки в конвейер

У меня есть простой конвейер, который читает из темы Pub Sub и записывает в BigQuery. Хочу ввести 5-минутную задержку между чтением сообщения из темы и записью в BQ. Я думал, что могу сделать это с помощью триггера, как показано ниже, однако сообщен…
1 ответ

Чтение данных из нескольких pubusb в один и тот же большой запрос

Этот вопрос больше связан с пониманием синтаксиса для подключения конвейера gcp в apache beam. вот как настроен мой текущий конвейер options = dataflow_options(project_id=project_id, topic_name=topic_name, job_name=job_name) p = apache_beam.Pipeline…
2 ответа

TextIO.Read().From() против TextIO.ReadFiles() поверх withHintMatchesManyFiles()

В моем случае получения набора подходящих файловых шаблонов от Kafka, PCollection<String> filepatterns = p.apply(KafkaIO.read()...); Здесь каждый шаблон может соответствовать до 300+ файлов. Q1. Как я могу использоватьTextIO.Read() чтобы сопос…
1 ответ

Получение так много предупреждений при использовании List с настраиваемым Java-классом POJO в apache beam java

Я новичок в Apache beam, я использую луч Apache и в качестве бегуна использую поток данных в GCP. Я получаю следующую ошибку при выполнении конвейера. coder of type class org.apache.beam.sdk.coders.ListCoder has a #structuralValue method which does …
1 ответ

как указать брокеров kafka с KafkaIO в Apache Beam

Я пытаюсь настроить конвейер KafkaIO, но не могу понять, как указать брокеров. Указание имени брокера и порта, похоже, этого не делает. Я ни в коем случае не указываю, где находится мой кластер kafka: pipeline .apply(KafkaIO.<Long, String>read…
1 ответ

Зачем использовать Apache Beam Spark Runner, если мы можем напрямую использовать Apache Spark?

Я читал про луч Apache. Перебрал различные бегунки в апач-балке. Но мне интересно, почему кто-то должен использовать луч Apache с бегунком искры, если он может напрямую использовать искру Apache?
0 ответов

Удаление заголовка в Apache Beam

В луче apache, как удалить запись заголовка из файла csv? Обратите внимание, что у меня нет индикатора, чтобы определить, что это запись заголовка, и единственный способ определить запись заголовка - это первая запись.
29 июл '20 в 18:53
1 ответ

простой конвейер Apache Beam генерирует ошибку TypeError: требуется целое число

У меня есть этот простой конвейер apache-beam, написанный на python. import apache_beam as beam from apache_beam.runners.interactive.interactive_runner import InteractiveRunner from apache_beam.runners.direct.direct_runner import DirectRunner from a…
26 авг '20 в 18:51
2 ответа

Предоставление учетных данных BigQuery в конвейере Apache-Beam, закодированном на Python

Я пытаюсь прочитать данные из bigquery в моем конвейере лучей с помощью средства выполнения облачного потока данных. Я хочу предоставить учетные данные для доступа к проекту. Я видел примеры на Java, но не на Python. Единственная возможность, котору…
2 ответа

Apache Beam - Как суммировать PCollection <KV <String, Int >> из всех окон по ключу

Учитывая PCollection<KV<String, Int>> с использованием окон с фиксированным временем, как я могу суммировать все Int посредством Stringключ от всех окон? например PCollection<KV<String, Int>> pc = ...; pc.apply("FixedWindows"…
2 ответа

Apache Beam: обновление побочного ввода, который я читаю из MongoDB, используя MongoDbIO.read() Часть 2

Не уверен в том, как эта GenerateSequence работает для меня, так как мне нужно периодически читать значения из Mongo ежечасно или ежедневно, создать ParDo, который читает MongoDB, а также добавил окно в GlobalWindows с триггером (триггер, который я …
0 ответов

Значение .apply(Window. <Document> into (new GlobalWindows ()). Triggering (Repeatedly.forever(AfterPane.elementCountAtLeast(1))). DiscardingFiredPanes())?

Я использую концепцию луча Apache Window и Trigger, но меня очень смущают результаты. Пожалуйста, помогите мне понять .apply("Window", Window.<Document>into(new GlobalWindows()).triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(1))).…
0 ответов

проектирование конвейера для нескольких наборов данных

Я пишу конвейер лучей Apache, который использует данные строки для создания нескольких наборов данных, у меня есть данные событий строки, и при добавлении нового файла в корзину облачного хранилища Google я читаю добавленный файл после получения его…
1 ответ

Шаблон луча для потока данных не будет загружен в корзину

Я следил за всеми краткими руководствами / документацией по созданию шаблонов потоков данных на основе Java. Я нашел и сделал следующие шаги: 1) Создайте проект: mvn archetype:generate \ -DarchetypeGroupId=org.apache.beam \ -DarchetypeArtifactId=bea…
1 ответ

Проблема с apache-beam [gcp] в google colab

У меня проблема с запуском apache-beam[gcp] в google colab. Если я запустил следующую ячейку !pip install -q apache-beam[gcp] import apache_beam as beam with beam.Pipeline() as p: p | beam.Create([1, 2, 3]) | beam.Map(print) У меня ошибка: FileNotFo…
1 ответ

писать в несколько тем Kafka в apache-beam?

Я выполняю простую программу подсчета слов, в которой я использовал одну тему Kafka (производитель) в качестве источника ввода, а затем применяю к ней pardo для расчета количества слов. Теперь мне нужна помощь, чтобы написать слова на разные темы, и…