Исключение десериализации Quarkus Kafka Streams/Reactive Messaging

Привет, я экспериментировал как с Kafka Streams, так и с MP Reactive Messaging, чтобы читать из темы Kafka, а затем возвращаться к ней.

Ошибка Kafka Streams -

org.apache.kafka.streams.errors.StreamsException: Deserialization exception handler is set to fail upon a deserialization error. If you would rather have the streaming pipeline continue after a deserialization error, please set the default.deserialization.exception.handler appropriately.

Ошибка реактивного обмена сообщениями аналогична, но в основном POJO, в котором происходит десериализация обмена сообщениями, выглядит так:

    public class FinancialMessage {
    
    public String user_id;
    public String stock_symbol;
    public String exchange_id;
    public String trade_type;
    public String date_created;
    public String date_submitted;
    public int quantity;
    public double stock_price;
    public double total_cost;
    public int institution_id;
    public int country_id;
    public boolean compliance_services;
    public boolean technical_validation;
    public boolean schema_validation;
    public boolean business_validation;
    public boolean trade_enrichment;
   }

Обратите внимание, что есть пустой конструктор по умолчанию и конструктор со всеми полями.

import java.time.Instant;

import javax.enterprise.context.ApplicationScoped;
import javax.enterprise.inject.Produces;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.GlobalKTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
import org.apache.kafka.streams.state.Stores;

import io.quarkus.kafka.client.serialization.JsonbSerde;
import io.quarkus.kafka.client.serialization.JsonbSerializer;


@ApplicationScoped
public class ComplianceTopology {

    private static final String KTABLE_TOPIC = "kstreams-ktable-topic";
    private static final String INCOMING_TOPIC = "kstreams-incoming-test";
    private static final String OUTGOING_TOPIC = "kstreams-outgoing-test";


    @Produces
    public Topology buildTopology() {

        StreamsBuilder builder = new StreamsBuilder();

        JsonbSerde<FinancialMessage> financialMessageSerde = new JsonbSerde<>(FinancialMessage.class);

        builder.stream(
            INCOMING_TOPIC,
            Consumed.with(Serdes.Integer(), financialMessageSerde)
        )
        .filter(
            (key, message) -> checkCompliance(message)
        )
        .mapValues (
            checkedMessage -> performComplianceCheck(checkedMessage)
        )
        .to (
            INCOMING_TOPIC,
            Produced.with(Serdes.Integer(), financialMessageSerde)
        );  
        
        return builder.build();
    }

    public boolean checkCompliance (FinancialMessage rawMessage) {
        return (rawMessage.compliance_services);
    }

    public FinancialMessage performComplianceCheck(FinancialMessage checkedMessage) {
        checkedMessage.compliance_services = false;

        return checkedMessage;
    }

}

Тем не менее, я предполагаю, что это называется "ядовитая таблетка", но одно сообщение, созданное MQ с полезной нагрузкой "Aloha", ломает его, и я не могу десериализовать его. Я предполагаю, что причина этого в том, что "Алоха" не распознается как строка, поскольку она заключена в одинарные кавычки. У меня нет доступа к тому, как эти данные отправляются, так как они были отправлены способами MQ. Есть ли способ пропустить обработку этого недесериализуемого сообщения и просто продолжить обработку из темы?

1 ответ

Как указано в сообщении об ошибке

please set the default.deserialization.exception.handler appropriately

вы можете настроить другой обработчик исключений десериализации, чтобы пропускать сообщения, которые не могут быть десериализованы.

Дополнительные сведения см. В документации: https://kafka.apache.org/documentation/streams/developer-guide/config-streams.html.

Другие вопросы по тегам