Flink потребляет больше памяти, чем ожидалось
Я использую Flink 1.4.1
обрабатывать транзакционные события и HDFS для хранения информации о контрольных точках для обеспечения отказоустойчивости.
Было создано задание для сбора информации о клиентах, днях недели и часах дня, что позволило создать профиль, как показано в приведенном ниже коде.
val stream = env.addSource(consumer)
val result = stream
.map(openTransaction => {
val transactionDate = openTransaction.get("transactionDate")
val date = if (transactionDate.isTextual)
LocalDateTime.parse(transactionDate.asText, DateTimeFormatter.ISO_DATE_TIME).toInstant(ZoneOffset.UTC).toEpochMilli
else
transactionDate.asLong
(openTransaction.get("clientId").asLong, openTransaction.get("amount").asDouble, new Timestamp(date))
})
.keyBy(0)
.window(SlidingEventWeekTimeWindows.of(Time.days(28), Time.hours(1)))
.sum(1)
В вышеприведенном коде поток имеет три поля: "транзакция транзакции", "идентификатор клиента" и "сумма". Мы создаем поток с ключами через clientId и скользящее окно, суммирующее сумму. В нашей базе данных около 100.000 уникальных активных идентификаторов клиентов.
Через некоторое время общая оперативная память, используемая заданием, стабилизируется на уровне 36 ГБ, но сохраненная контрольная точка в HDFS использует только 3 ГБ. Есть ли способ уменьшить использование оперативной памяти для работы, возможно, путем настройки коэффициента репликации Flink или с помощью RocksDB?
1 ответ
Использование RocksDB - абсолютно то, что вы должны учитывать для этого размера состояния, и, в зависимости от шаблонов использования, может иметь гораздо меньшие контрольные точки, поскольку оно выполняет это постепенно, только копируя новые или обновленные SST.
Имейте в виду некоторые вещи:
- Каждая параллельная подзадача с состоянием оператора будет иметь свой собственный экземпляр RocksDB.
- Если вы переключаетесь на RocksDB для контрольной точки, и она начинает работать медленнее, чем вам нужно, убедитесь, что используемая сериализация максимально эффективна.
- Flink предоставляет некоторые PredefinedOptions, основанные на вашей файловой системе поддержки, убедитесь, что вы выбрали это соответствующим образом
- Если предопределенные параметры не работают для вас, вы можете переопределить OptionsFactory для бэкэнда RocksDB и настроить отдельные параметры RocksDB
Еще одна вещь, которую следует отметить при использовании памяти во Flink с временными окнами с ключами, это то, что "таймеры" могут использовать значительное количество памяти, если вы собираетесь использовать сотни тысяч или миллионы. Таймеры мерцания основаны на куче (на момент написания этой статьи) и синхронно проверяются независимо от состояния вашего сервера.