При индексировании с учетом состояния ParDo запускается однопоточным в Dataflow Runner

Мы генерируем последовательный индекс в ParDo, используя Java SDK 2.0.0 от Beam. Так же, как пример простого индекса с сохранением состояния во введении Beam к обработке с сохранением состояния, мы используемValueState<Integer>ячейка и наша единственная операция с ней - извлечь значение и приращение, когда нам понадобится следующий индекс:

Integer statefulIndex = firstNonNull(index.read(), 0);
index.write(statefulIndex + 1);

При работе с Google Dataflow Runner мы заметили в интерфейсе мониторинга Dataflow, что время ожидания этого ParDo накапливалось синхронно с истекшим временем. Мы смогли подтвердить, что ParDo выполняет однопоточный, выполнив ssh'ing к рабочему узлу и используя top а также1для просмотра использования процессора на ядро. Комментируя ячейку обработки с сохранением состояния и оставляя код без изменений, тот же ParDo использует все ядра нашегоn1-standard-32рабочий узел.

Даже если исполнитель потока данных может распараллелить индексирование с сохранением состояния на основе каждой пары ключей и окон (в настоящее время у нас есть одно окно и один ключ), отсутствие параллелизма приводит к такому значительному снижению производительности, что мы не можем его использовать. Это ожидаемое поведение бегуна потока данных?

Наивно, я ожидал, что закулисная индексация Beam будет работать аналогично JavaAtomicInteger, Существуют ли ограничения, препятствующие параллельной обработке с ValueState<Integer> клетка или эта функциональность просто еще не встроена в бегун?

1 ответ

Это не только ожидаемое поведение бегуна потока данных, но и логическая необходимость в любом контексте. Неважно, используете ли вы состояние в Beam или AtomicInteger в однопроцессной Java-программе: если операция "A" записывает значение, а операция "B" считывает значение, то "B" должно выполняться после "A". Общий термин для этого - отношения "случается раньше".

Эта форма вычислений с состоянием противоположна параллельным вычислениям. По определению чтение, наблюдающее запись, имеет причинно-следственную связь. По определению, две параллельные операции не имеют причинно-следственной связи.

Теперь вы, возможно, ожидаете параллельные потоки, которые одновременно обращаются к ячейке состояния, как в стандартном шаблоне многопоточного программирования с некоторым общим состоянием с контролем параллелизма. В этом примере, если эти потоки на самом деле параллельны, вы получите дублирующиеся индексы. Сделав шаг назад, Beam нацеливается на массивные "смущающие параллельные" вычисления, прозрачно распределенные по большому кластеру машин. Детализированные элементы управления параллелизмом, кроме того, что их крайне сложно получить, не всегда легко переводятся в массовые распределенные вычисления.

Другие вопросы по тегам