Использование Grok в потоковой передачи Flink

Флинк Трубопровод выглядит следующим образом:

  1. читать сообщения (строки) из темы кафки.
  2. сопоставление с образцом через преобразование grok в формат json.
  3. Агрегации за промежуток времени по извлеченному полю из json.

Ниже приведен код для сопоставления с шаблоном с помощью grok.

    SingleOutputStreamOperator<JSONObject> mainStream = messageStream.rebalance()
                    .map(new MapFunction<String, JSONObject>() {    
                        private static final long serialVersionUID = 6;

                        @Override
                        public JSONObject map(String value) throws Exception {
                            JSONObject logJson = new JSONObject();  
                            grok.compile(pattern); //pattern is some pattern defined in the class
                            Match gm = grok.match(value);
                            gm.captures();
                            logJson.putAll(gm.toMap());
                            return logJson;
                        }})

В приведенном выше написании кода grok.compile(pattern) внутри карта работает нормально. Если этого не сделать, выдается следующая ошибка

Реализация MapFunction не сериализуема

Вызывается: java.io.NotSerializableException: com.google.code.regexp.Pattern

Есть ли способ, которым я мог бы удалить grok.compile за пределами карты. Насколько я понимаю, компиляция шаблона с каждым сообщением не требуется и может создать узкое место, если нет. сообщений становится довольно большим.

PS: я импортировал пакет oi.thekraken.grok.api.Grok

РЕДАКТИРОВАТЬ:

Я просмотрел реализацию grok, а класс Grok реализует Serializable. https://github.com/thekrakken/java-grok/blob/master/src/main/java/io/thekraken/grok/api/Grok.java

1 ответ

Ваш код не показывает, откуда взята локальная переменная grok, но:

Flink требует, чтобы все операторы были Serializable, потому что они могут перемещаться в кластере. Это также относится ко всем членам операторов. Можете ли вы опубликовать полный нерабочий пример? Это может упростить просмотр места сбоя сериализации.

Дополнительную информацию о сериализации flink можно найти в документации flink по адресу https://flink.apache.org/faq.html и https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/types_serialization.html

По сути, вы можете зарегистрировать сериализатор kryo для пользовательских типов или реализовать (де) сериализацию самостоятельно, если вам нужны члены-операторы, которые не сериализуются напрямую.

Кстати: я думаю, что вы правы, пытаясь уменьшить количество раз, когда шаблон компилируется

Другие вопросы по тегам