Исправлено окно над неограниченным вводом (PubSub), прекращающее срабатывание после автоматического масштабирования рабочих

Используя scio версии 0.4.7, у меня есть потоковое задание, которое прослушивает тему pubsub, здесь я использую обработку событий с атрибутом timestamp, присутствующим в свойствах сообщения в RFC3339

val rtEvents: SCollection[RTEvent] = sc.pubsubTopic(args("topic"), timestampAttribute = "timestamp").map(jsonToObject)
val windowedEvents = rtEvents.withFixedWindows(Duration.standardMinutes(1L),
  options = WindowOptions(trigger = Repeatedly.forever(AfterWatermark.pastEndOfWindow()),
    accumulationMode = DISCARDING_FIRED_PANES,
    allowedLateness = Duration.standardSeconds(1L)
  )
)

Я использую windowedEvents для дальнейшей агрегации и вычислений в конвейере

doSomeAggregation(windowedEvents)

def doSomeAggregation(events: SCollection[RTEvent]): SCollection[(String, Map[String, Int])] =
        events.map(e => (e.properties.key, (e.properties.category, e.id)))
          .groupByKey
          .map { case (key, tuple: Iterable[(String, String)]) =>
            val countPerCategory: Map[String, Int] = tuple.groupBy(_._1)
              .mapValues(_.toList.distinct.size)
            //some other http post and logging here
            (key, countPerCategory)
          }

    sc.close().waitUntilFinish()

Если я запускаю работу со следующими параметрами автоматического масштабирования в потоке данных Google

--workerMachineType=n1-standard-8 --autoscalingAlgorithm=THROUGHPUT_BASED
--maxNumWorkers=4

задание запускается, и фиксированные окна запускаются правильно, если работает только один рабочий. Как только задание автоматически масштабируется до более чем 1 работника, фиксированные окна прекращают работу, а системная задержка начального шага pubsub и время стены продолжают расти, в то время как водяной знак данных не перемещается вперед.

Что-то не так с моей настройкой триггера? Кто-нибудь еще испытывал это на бегунах потока данных или других бегунах? Любая помощь с благодарностью. Я склонен отказаться от scio и вернуться обратно к apache-beam Java SDK, если я не могу решить эту проблему.

1 ответ

Мне удалось решить проблему. В моей нынешней настройке рабочие не могли общаться друг с другом. Работа молча завершается неудачей без каких-либо ошибок тайм-аута (что-то, вероятно, луч должен распространяться как ошибка)

Если вы используете поток данных в качестве своего бегуна, убедитесь, что брандмауэр, определенный для потока данных в вашем проекте, определен для сети по умолчанию.

Если брандмауэр потока данных определен для вашей сети, вам нужно будет передать дополнительный параметр времени выполнения в вашу работу

--workerMachineType=n1-standard-8 --autoscalingAlgorithm=THROUGHPUT_BASED --maxNumWorkers=4 --network='your-network'

Другие вопросы по тегам