Длинный объектный поток с завершением асинхронного преобразования слишком скоро

Я передаю ответ от запроса узла в поток преобразования, используя through2Concurrent, Этот ответ поступает в виде буфера и анализируется с помощью объекта JSONStream, Это затем попадает в мой поток преобразования. Затем функция преобразования потока выполняет HTTP-запросы, форматирует ответ и сохраняет его в MongoDB. Мы используем параллельные потоки, потому что это займет недопустимо много времени, чтобы справиться со всем остальным.

response Stream -> JSONStream.parse() -> Transform Stream

описание проблемы
Первоначальный поток ответов содержит примерно 18 000 объектов после анализа. Тем не менее, поток заканчивается и finish Событие получено до того, как обработаны все 18 000 объектов. Не выдается никакой ошибки, но только около 2000 - 5000 объектов фактически обрабатываются до окончания потока. Точное число обрабатывается варьируется.

Вот соответствующий код:

const analyticsTransformer = through2Concurrent.obj({
  maxConcurrency: 15
}, async (doc, enc, cb) => {
  // Make an http request. This is a relatively long request.
  const res = await apim.getAnalytics(doc);
  // Save response to mongo.
  await UsageData.save(res);
  cb();
});

// Kick off the streaming.
broker.getInstances()
  .pipe(JSONStream.parse('*')
  .pipe(analyticsTransformer)
  .on('finish', () => {
    // We reach this way too quickly before we have handled all 18,000 objects
  })
  .on('error', err => {
    // No errors are caught.
  })

Что я пробовал

  • Ожидание события 'end': тот же результат. Необработанные предметы и досрочное прекращение.
  • С помощью through2 (не through2Concurrent): Получить ETIMEOUT после прохождения нескольких тысяч объектов.
  • Настройка highWaterMark до 18 000: это единственное, что сработало. Я могу обработать все объекты, если я изменю это highWatermark значение, но это на самом деле просто повязка на проблему. Я хочу знать, почему это работает и что я могу сделать, чтобы надежно решить свои проблемы с потоковой передачей.

Настройка highWaterMark выглядит так:

const analyticsTransformer = through2Concurrent.obj({
  highWaterMark: 18,000,
  maxConcurrency: 15
}, async (doc, enc, cb) => {
  // ...
});

Почему меняется highWaterMark ценность работы?

Какова реальная причина моего раннего прекращения потока?

Как я могу это исправить?

Заранее спасибо всем, кто может помочь!:)

0 ответов

Другие вопросы по тегам