Длинный объектный поток с завершением асинхронного преобразования слишком скоро
Я передаю ответ от запроса узла в поток преобразования, используя through2Concurrent
, Этот ответ поступает в виде буфера и анализируется с помощью объекта JSONStream
, Это затем попадает в мой поток преобразования. Затем функция преобразования потока выполняет HTTP-запросы, форматирует ответ и сохраняет его в MongoDB. Мы используем параллельные потоки, потому что это займет недопустимо много времени, чтобы справиться со всем остальным.
response Stream -> JSONStream.parse() -> Transform Stream
описание проблемы
Первоначальный поток ответов содержит примерно 18 000 объектов после анализа. Тем не менее, поток заканчивается и finish
Событие получено до того, как обработаны все 18 000 объектов. Не выдается никакой ошибки, но только около 2000 - 5000 объектов фактически обрабатываются до окончания потока. Точное число обрабатывается варьируется.
Вот соответствующий код:
const analyticsTransformer = through2Concurrent.obj({
maxConcurrency: 15
}, async (doc, enc, cb) => {
// Make an http request. This is a relatively long request.
const res = await apim.getAnalytics(doc);
// Save response to mongo.
await UsageData.save(res);
cb();
});
// Kick off the streaming.
broker.getInstances()
.pipe(JSONStream.parse('*')
.pipe(analyticsTransformer)
.on('finish', () => {
// We reach this way too quickly before we have handled all 18,000 objects
})
.on('error', err => {
// No errors are caught.
})
Что я пробовал
- Ожидание события 'end': тот же результат. Необработанные предметы и досрочное прекращение.
- С помощью
through2
(неthrough2Concurrent
): Получить ETIMEOUT после прохождения нескольких тысяч объектов. - Настройка
highWaterMark
до 18 000: это единственное, что сработало. Я могу обработать все объекты, если я изменю этоhighWatermark
значение, но это на самом деле просто повязка на проблему. Я хочу знать, почему это работает и что я могу сделать, чтобы надежно решить свои проблемы с потоковой передачей.
Настройка highWaterMark
выглядит так:
const analyticsTransformer = through2Concurrent.obj({
highWaterMark: 18,000,
maxConcurrency: 15
}, async (doc, enc, cb) => {
// ...
});
Почему меняется highWaterMark
ценность работы?
Какова реальная причина моего раннего прекращения потока?
Как я могу это исправить?
Заранее спасибо всем, кто может помочь!:)