Что вызывает эту неустойчивую проблему с Node Kafka Streams?
У меня есть производитель и потребитель Кафки.
Производитель делает это:
const returnMessage = {
prop1: 'some string',
prop2: 'another string',
prop3: nestedObject
};
console.log(JSON.stringify(returnMessage))
await stream.writeToStream(JSON.stringify(returnMessage));
Потребитель делает это:
incomingStream.forEach(
message => {
console.log(message.value)
let messageObject = message.value;
...other stuff...
}
);
Теперь, на стороне производителя, возвращаемое сообщение всегда записывается как правильная строка, все хорошо. Но на стороне потребителя, сначала message.value - это правильная строка, из которой можно проанализировать JSON, но при последующих запросах он выглядит как "[объектный объект]". Если
Я чувствую, что здесь что-то упущено... Пожалуйста, помогите, если у вас есть какие-то идеи.
1 ответ
ОК, я понял incomingStream.forEach
происходило внутри маршрута, в то время как поток создавался на уровне контроллера. Я исправил это, переместив forEach
до уровня контроллера, и он отправляет проанализированное сообщение на каждое сообщение, а затем подписывается на события внутри маршрута.