Что вызывает эту неустойчивую проблему с Node Kafka Streams?

У меня есть производитель и потребитель Кафки.

Производитель делает это:

    const returnMessage = {
        prop1: 'some string',
        prop2: 'another string',
        prop3: nestedObject
    };
    console.log(JSON.stringify(returnMessage))
    await stream.writeToStream(JSON.stringify(returnMessage));

Потребитель делает это:

incomingStream.forEach(
                message => {
                        console.log(message.value)
                        let messageObject = message.value;
                        ...other stuff...
                }
            );

Теперь, на стороне производителя, возвращаемое сообщение всегда записывается как правильная строка, все хорошо. Но на стороне потребителя, сначала message.value - это правильная строка, из которой можно проанализировать JSON, но при последующих запросах он выглядит как "[объектный объект]". Если

Я чувствую, что здесь что-то упущено... Пожалуйста, помогите, если у вас есть какие-то идеи.

1 ответ

Решение

ОК, я понял incomingStream.forEach происходило внутри маршрута, в то время как поток создавался на уровне контроллера. Я исправил это, переместив forEach до уровня контроллера, и он отправляет проанализированное сообщение на каждое сообщение, а затем подписывается на события внутри маршрута.

Другие вопросы по тегам