Ветви, созданные с помощью BranchPythonOperator, не объединяются?
Я использую BranchPythonOperator, чтобы сделать ветви в потоке воздуха. Мой вариант использования - мне нужно сделать две ветки от мейнстрима. Ветвь A (у которой мало задач) будет следовать, когда присутствует somefile.csv, в противном случае ветвь B(у которой нет задачи) будет следовать. Наконец, обе ветви должны быть объединены, чтобы снова стать мейнстримом. Теперь я могу следить за ответвлением А или ответвлением В, но проблема в том, следую ли я за выполнением заключительных основных задач ответвления В, и если я следую за ответвлением ответвления А, пропускают заключительные основные задачи.
MainstreamTaskA.setDownStream(MainstreamTaskB)
MainstreamTaskB.setDownStream(BranchATaskA)
BranchATaskA.setDownStream(MainstreamTaskC)
MainstreamTaskB.setDownStream(MainstreamTaskC)
Я установил правило триггера как "all_done" в MainstreamTaskB и MainstreamTaskC.
Может ли кто-нибудь провести меня через это?
1 ответ
Я не вижу другую ветку в ваших зависимостях. Единственная ветка BranchATaskA
, Но исходя из того, что вы упомянули, у вас должны быть следующие зависимости задач и две задачи ветвления BranchATaskA
а также BranchATaskB
,
MainstreamTaskA >> MainstreamTaskB
MainstreamTaskB >> BranchATaskA >> MainstreamTaskC
MainstreamTaskB >> BranchATaskB >> MainstreamTaskC
Вы должны иметь триггерное правило как all_done
на MainstreamTaskC
,