Исключение воздушного потока: сбой DataFlow с кодом возврата 1

Я пытаюсь выполнить поток данных JAR через скрипт воздушного потока. Для этого я использую DataFlowJavaOperator. В параметре jar я передаю путь к исполняемому файлу jar, присутствующему в локальной системе. Но когда я пытаюсь запустить это задание, я получаю сообщение об ошибке как

{gcp_dataflow_hook.py:108} INFO - Start waiting for DataFlow process to complete.
[2017-09-12 16:59:38,225] {models.py:1417} ERROR - DataFlow failed with return code 1
Traceback (most recent call last):
  File "/usr/lib/python2.7/site-packages/airflow/models.py", line 1374, in run
    result = task_copy.execute(context=context)
  File "/usr/lib/python2.7/site-packages/airflow/contrib/operators/dataflow_operator.py", line 116, in execute
    hook.start_java_dataflow(self.task_id, dataflow_options, self.jar)
  File "/usr/lib/python2.7/site-packages/airflow/contrib/hooks/gcp_dataflow_hook.py", line 146, in start_java_dataflow
    task_id, variables, dataflow, name, ["java", "-jar"])
  File "/usr/lib/python2.7/site-packages/airflow/contrib/hooks/gcp_dataflow_hook.py", line 138, in _start_dataflow
    _Dataflow(cmd).wait_for_done()
  File "/usr/lib/python2.7/site-packages/airflow/contrib/hooks/gcp_dataflow_hook.py", line 119, in wait_for_done
    self._proc.returncode))
Exception: DataFlow failed with return code 1`

Мой скрипт воздушного потока:

from airflow.contrib.operators.dataflow_operator import DataFlowJavaOperator
from airflow.contrib.hooks.gcs_hook import GoogleCloudStorageHook
from airflow.models import BaseOperator
from airflow.utils.decorators import apply_defaults
from datetime import datetime, timedelta


default_args = {
'owner': 'airflow',
'start_date': datetime(2017, 03, 16),
'email': [<EmailID>],

'dataflow_default_options': {
        'project': '<ProjectId>',
       # 'zone': 'europe-west1-d', (i am not sure what should i pass here)
        'stagingLocation': 'gs://spark_3/staging/'
    }
 }

 dag = DAG('Dataflow',schedule_interval=timedelta(minutes=2), 
 default_args=default_args)

 dataflow1 = DataFlowJavaOperator(
 task_id='dataflow_example',
 jar ='/root/airflow_scripts/csvwriter.jar',
 gcp_conn_id  = 'GCP_smoke', 
 dag=dag)

Я не уверен, какую ошибку я совершаю, Может кто-нибудь, пожалуйста, помогите мне выбраться из этого

Note :I am creating this jar while selecting option as Runnable JAR file by packaging all the external dependencies.

1 ответ

Проблема была с банкой, которую я использовал. Прежде чем использовать банку, убедитесь, что баночка работает как положено.

Пример: если ваш jar был dataflow_job1.jar, выполните jar, используя

java -jar dataflow_job_1.jar --parameters_if_any

Как только ваша банка успешно работает, перейдите к использованию банки в файле Airflow DataflowJavaOperator jar.

Кроме того, если вы сталкиваетесь с ошибками, связанными с кодерами, вам может потребоваться создать свой собственный кодер для выполнения кода. Например, у меня была проблема с классом TableRow, так как у него не было кодера по умолчанию, и поэтому мне пришлось это сделать:

TableRowCoder:

public class TableRowCoder extends Coder<TableRow> {
private static final long serialVersionUID = 1L;
private static final Coder<TableRow> tableRow = TableRowJsonCoder.of();
@Override
public void encode(TableRow value, OutputStream outStream) throws CoderException, IOException {
    tableRow.encode(value, outStream);

}
@Override
public TableRow decode(InputStream inStream) throws CoderException, IOException {
    return new TableRow().set("F1", tableRow.decode(inStream));
}
@Override
public List<? extends Coder<?>> getCoderArguments() {
    // TODO Auto-generated method stub
    return null;
}
@Override
public void verifyDeterministic() throws org.apache.beam.sdk.coders.Coder.NonDeterministicException {


}
}

Затем зарегистрируйте этот кодер в своем коде, используя

pipeline.getCoderRegistry().registerCoderForClass(TableRow.class, new TableRowCoder())

Если все еще есть ошибки (которые не связаны с кодировщиками), перейдите к:

*.jar\META-INF\services\FileSystemRegistrar 

и добавить любые зависимости, которые могут возникнуть.

Например, может быть ошибка постановки как:

Unable to find registrar for gs

Я должен был добавить следующую строку, чтобы это работало.

org.apache.beam.sdk.extensions.gcp.storage.GcsFileSystemRegistrar
Другие вопросы по тегам