Ветви, созданные с помощью BranchPythonOperator, не объединяются?

Я использую BranchPythonOperator, чтобы сделать ветви в потоке воздуха. Мой вариант использования - мне нужно сделать две ветки от мейнстрима. Ветвь A (у которой мало задач) будет следовать, когда присутствует somefile.csv, в противном случае ветвь B(у которой нет задачи) будет следовать. Наконец, обе ветви должны быть объединены, чтобы снова стать мейнстримом. Теперь я могу следить за ответвлением А или ответвлением В, но проблема в том, следую ли я за выполнением заключительных основных задач ответвления В, и если я следую за ответвлением ответвления А, пропускают заключительные основные задачи.

MainstreamTaskA.setDownStream(MainstreamTaskB)
MainstreamTaskB.setDownStream(BranchATaskA)
BranchATaskA.setDownStream(MainstreamTaskC)
MainstreamTaskB.setDownStream(MainstreamTaskC)

Я установил правило триггера как "all_done" в MainstreamTaskB и MainstreamTaskC.

Может ли кто-нибудь провести меня через это?

1 ответ

Решение

Я не вижу другую ветку в ваших зависимостях. Единственная ветка BranchATaskA, Но исходя из того, что вы упомянули, у вас должны быть следующие зависимости задач и две задачи ветвления BranchATaskA а также BranchATaskB,

MainstreamTaskA >> MainstreamTaskB
MainstreamTaskB >> BranchATaskA >> MainstreamTaskC
MainstreamTaskB >> BranchATaskB >> MainstreamTaskC

Вы должны иметь триггерное правило как all_done на MainstreamTaskC,

Другие вопросы по тегам