Весенний фильтр Кафка не фильтрует потребительские записи

Я пытаюсь отфильтровать сообщения ConsumerRecord перед использованием, основываясь на содержимом одного из полей в ConsumerRecord.

Пример записи потребителя перед применением фильтра (ищите GP_ID в значении):

 ConsumerRecord(topic = jdbc-project, partition = 0, offset = 0, CreateTime = 1551118248440, serialized key size = -1, serialized value size = 69, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = {"GP_ID": {"bytes": "@"}, "PROJECT_ID": {"bytes": "\u001E\u008C"}, "START_DATE": 1009843200000, "END_DATE": 1041292800000, "TITLE": "Project- FPH", "STATUS_CODE": "INACTIVE"})

KafkaRecordVO(projectId=7820, gpId=64)

Когда я устанавливаю recordFilterStrategy, как показано ниже в kafkaListenerContainerFactory ():

@Bean
ConcurrentKafkaListenerContainerFactory<String, GenericRecord> kafkaListenerContainerFactoryProject() {
    ConcurrentKafkaListenerContainerFactory<String, GenericRecord> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    factory.setRecordFilterStrategy(new RecordFilterStrategy<String, GenericRecord>() {
        @Override
        public boolean filter(ConsumerRecord<String, GenericRecord> consumerRecord) {
            long gpId= KafkaRecordVO.convertByteBufferToLong(consumerRecord.value().get("GP_ID"));
            if(gpId == 10766L || gpId == 10823L || gpId == 10459L || gpId == 10649L)
                return false;
            else
                return true;
        }
    });
    factory.getContainerProperties().setAckMode(AckMode.MANUAL_IMMEDIATE);
    return factory;
}

KafkaRecordVO.convertByteBufferToLong преобразует значение байтового буфера в длинное значение.

Это оценивает правильно и возвращает истину / ложь.

Но, когда он потребляется слушателем Кафки, как показано ниже:

@KafkaListener(id = "project", topics = "jdbc-project", containerFactory = "kafkaListenerContainerFactoryProject")
public void consumeProject(ConsumerRecord<String, GenericRecord> record,Acknowledgment acknowledgment) {
    log.debug(record.toString());
    KafkaRecordVO recordVo = new KafkaRecordVO().projectId(record.value().get("PROJECT_ID"))
                                                .budgetYear(record.value().get("GP_ID"));
    log.debug(recordVo.toString());
}

Это возвращает запись, очищающую значение поля, по которому я отфильтровал: "GP_ID"

Это примеры журналов, созданных после применения фильтра (посмотрите значение GP_ID в значении):

ConsumerRecord(topic = jdbc-project, partition = 0, offset = 171275, CreateTime = 1551118279371, serialized key size = -1, serialized value size = 181, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = { "GP_ID": {"bytes": ""}, "PROJECT_ID": {"bytes": "\u0005â^"}, "START_DATE": 1470009600000, "END_DATE": 1532995200000, "TITLE": "Project 2016 - 2016", "STATUS_CODE": "INACTIVE"})

KafkaRecordVO(projectId=385630, gpId=0)

Я получаю это в журнале с другим полем: "GP_ID": {"bytes": ""} в моем kafkaListener для этой темы. Как не скинуть значение? Что здесь не так?

1 ответ

То, что вы предлагаете, не имеет смысла; адаптер фильтра имеет этот код...

@Override
public void onMessage(ConsumerRecord<K, V> consumerRecord, Acknowledgment acknowledgment, Consumer<?, ?> consumer) {
    if (!filter(consumerRecord)) {
        switch (this.delegateType) {
            case ACKNOWLEDGING_CONSUMER_AWARE:
                this.delegate.onMessage(consumerRecord, acknowledgment, consumer);
                break;
            case ACKNOWLEDGING:
                this.delegate.onMessage(consumerRecord, acknowledgment);
                break;
            case CONSUMER_AWARE:
                this.delegate.onMessage(consumerRecord, consumer);
                break;
            case SIMPLE:
                this.delegate.onMessage(consumerRecord);
        }
    }
    else {
        ackFilteredIfNecessary(acknowledgment);
    }
}

Он не выполняет никаких манипуляций с записью.

Попробуйте запустить в отладчике, чтобы увидеть, если вы видите, что происходит.

Другие вопросы по тегам