Flink потребляет больше памяти, чем ожидалось

Я использую Flink 1.4.1 обрабатывать транзакционные события и HDFS для хранения информации о контрольных точках для обеспечения отказоустойчивости.

Было создано задание для сбора информации о клиентах, днях недели и часах дня, что позволило создать профиль, как показано в приведенном ниже коде.

val stream = env.addSource(consumer)
val result = stream
  .map(openTransaction => {
    val transactionDate = openTransaction.get("transactionDate")
    val date = if (transactionDate.isTextual)
      LocalDateTime.parse(transactionDate.asText, DateTimeFormatter.ISO_DATE_TIME).toInstant(ZoneOffset.UTC).toEpochMilli
    else
      transactionDate.asLong
    (openTransaction.get("clientId").asLong, openTransaction.get("amount").asDouble, new Timestamp(date))
  })
  .keyBy(0)
  .window(SlidingEventWeekTimeWindows.of(Time.days(28), Time.hours(1)))
  .sum(1)

В вышеприведенном коде поток имеет три поля: "транзакция транзакции", "идентификатор клиента" и "сумма". Мы создаем поток с ключами через clientId и скользящее окно, суммирующее сумму. В нашей базе данных около 100.000 уникальных активных идентификаторов клиентов.

Через некоторое время общая оперативная память, используемая заданием, стабилизируется на уровне 36 ГБ, но сохраненная контрольная точка в HDFS использует только 3 ГБ. Есть ли способ уменьшить использование оперативной памяти для работы, возможно, путем настройки коэффициента репликации Flink или с помощью RocksDB?

1 ответ

Использование RocksDB - абсолютно то, что вы должны учитывать для этого размера состояния, и, в зависимости от шаблонов использования, может иметь гораздо меньшие контрольные точки, поскольку оно выполняет это постепенно, только копируя новые или обновленные SST.

Имейте в виду некоторые вещи:

  • Каждая параллельная подзадача с состоянием оператора будет иметь свой собственный экземпляр RocksDB.
  • Если вы переключаетесь на RocksDB для контрольной точки, и она начинает работать медленнее, чем вам нужно, убедитесь, что используемая сериализация максимально эффективна.
  • Flink предоставляет некоторые PredefinedOptions, основанные на вашей файловой системе поддержки, убедитесь, что вы выбрали это соответствующим образом
  • Если предопределенные параметры не работают для вас, вы можете переопределить OptionsFactory для бэкэнда RocksDB и настроить отдельные параметры RocksDB

Еще одна вещь, которую следует отметить при использовании памяти во Flink с временными окнами с ключами, это то, что "таймеры" могут использовать значительное количество памяти, если вы собираетесь использовать сотни тысяч или миллионы. Таймеры мерцания основаны на куче (на момент написания этой статьи) и синхронно проверяются независимо от состояния вашего сервера.

Другие вопросы по тегам