Использование Grok в потоковой передачи Flink
Флинк Трубопровод выглядит следующим образом:
- читать сообщения (строки) из темы кафки.
- сопоставление с образцом через преобразование grok в формат json.
- Агрегации за промежуток времени по извлеченному полю из json.
Ниже приведен код для сопоставления с шаблоном с помощью grok.
SingleOutputStreamOperator<JSONObject> mainStream = messageStream.rebalance()
.map(new MapFunction<String, JSONObject>() {
private static final long serialVersionUID = 6;
@Override
public JSONObject map(String value) throws Exception {
JSONObject logJson = new JSONObject();
grok.compile(pattern); //pattern is some pattern defined in the class
Match gm = grok.match(value);
gm.captures();
logJson.putAll(gm.toMap());
return logJson;
}})
В приведенном выше написании кода grok.compile(pattern)
внутри карта работает нормально. Если этого не сделать, выдается следующая ошибка
Реализация MapFunction не сериализуема
Вызывается: java.io.NotSerializableException: com.google.code.regexp.Pattern
Есть ли способ, которым я мог бы удалить grok.compile за пределами карты. Насколько я понимаю, компиляция шаблона с каждым сообщением не требуется и может создать узкое место, если нет. сообщений становится довольно большим.
PS: я импортировал пакет oi.thekraken.grok.api.Grok
РЕДАКТИРОВАТЬ:
Я просмотрел реализацию grok, а класс Grok реализует Serializable. https://github.com/thekrakken/java-grok/blob/master/src/main/java/io/thekraken/grok/api/Grok.java
1 ответ
Ваш код не показывает, откуда взята локальная переменная grok, но:
Flink требует, чтобы все операторы были Serializable, потому что они могут перемещаться в кластере. Это также относится ко всем членам операторов. Можете ли вы опубликовать полный нерабочий пример? Это может упростить просмотр места сбоя сериализации.
Дополнительную информацию о сериализации flink можно найти в документации flink по адресу https://flink.apache.org/faq.html и https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/types_serialization.html
По сути, вы можете зарегистрировать сериализатор kryo для пользовательских типов или реализовать (де) сериализацию самостоятельно, если вам нужны члены-операторы, которые не сериализуются напрямую.
Кстати: я думаю, что вы правы, пытаясь уменьшить количество раз, когда шаблон компилируется