_read() не реализовано в Потоке с возможностью чтения

Этот вопрос заключается в том, как реально реализовать метод чтения читаемого потока.

У меня есть эта реализация потока для чтения:

import {Readable} from "stream";
this.readableStream = new Readable();

Я получаю эту ошибку

events.js:136 throw er; // Необработанное событие 'error' ^

Ошибка [ERR_STREAM_READ_NOT_IMPLEMENTED]: _read() не реализована в Readable._read (_stream_readable.js:554:22) в Readable.read (_stream_readable.js:445:10) в resume_ (_stream_readable.js:825:12) в _combinedTick (internal/process/next_tick.js:138:11) в process._tickCallback (внутренняя /process/next_tick.js:180:9) в Function.Module.runMain (module.js:684:11) при запуске (bootstrap_node.js:191:16) по адресу bootstrap_node.js:613:3

Причина возникновения ошибки очевидна, нам нужно сделать это:

  this.readableStream = new Readable({
      read(size) {
        return true;
      }
    });

Я не очень понимаю, как реализовать метод чтения, хотя.

Единственное, что работает, это просто звонить

this.readableStream.push('some string or buffer');

если я попытаюсь сделать что-то вроде этого:

   this.readableStream = new Readable({
          read(size) {
            this.push('foo');   // call push here!
            return true;
          }
     });

тогда ничего не происходит - ничего не выходит из читабельного!

Кроме того, в этих статьях говорится, что вам не нужно реализовывать метод read:

https://github.com/substack/stream-handbook

https://medium.freecodecamp.org/node-js-streams-everything-you-need-to-know-c9141306be93

Мой вопрос - почему вызов push внутри метода read ничего не делает? Единственное, что работает для меня, это просто вызвать readable.push() в другом месте.

4 ответа

почему вызов push внутри метода read ничего не делает? Единственное, что работает для меня, это просто вызвать readable.push() в другом месте.

Я думаю, это потому, что вы не потребляете его, вам нужно направить его в поток для записи (например, stdout) или просто использовать его через data событие:

const { Readable } = require("stream");

let count = 0;
const readableStream = new Readable({
    read(size) {
        this.push('foo');
        if (count === 5) this.push(null);
        count++;
    }
});

// piping
readableStream.pipe(process.stdout)

// through the data event
readableStream.on('data', (chunk) => {
  console.log(chunk.toString());
});

Они оба должны печатать 5 раз foo (они немного отличаются, хотя). Какой из них вы должны использовать, зависит от того, чего вы пытаетесь достичь.

Кроме того, в этих статьях говорится, что вам не нужно реализовывать метод read:

Вам это может не понадобиться, это должно работать:

const { Readable } = require("stream");

const readableStream = new Readable();

for (let i = 0; i <= 5; i++) {
    readableStream.push('foo');
}
readableStream.push(null);

readableStream.pipe(process.stdout)

В этом случае вы не можете захватить его через data событие. Кроме того, этот способ не очень полезен и неэффективен, я бы сказал, мы просто помещаем все данные в поток сразу (если он большой, все будет в памяти), а затем потребляем его.

Из документации:

readable._read:

"Когда вызывается readable._read(), если данные доступны из ресурса, реализация должна начать помещать эти данные в очередь чтения, используя метод this.push(dataChunk). Link"

readable.push:

"Метод readable.push() предназначен для вызова только реализуемыми читателями и только из метода readable._read(). Ссылка"

Надеюсь, поможет:)

Реализовать _read метод после инициализации вашего ReadableStream:

import {Readable} from "stream";
this.readableStream = new Readable();
this.readableStream.read = function () {};

readableStream похож на пул:

  • .push(data). Это похоже на перекачку воды в бассейн.
  • .pipe(destination). Это как подключить бассейн к трубе и перекачивать воду в другое место.
  • _read(size) работает как насос и контролирует, сколько воды течет и когда данные заканчиваются.

fs.createReadStream() создаст поток чтения с функцией _read(), которая была автоматически реализована для отправки данных файла и завершения при завершении файла.

_read(size) срабатывает автоматически, когда пул подключен к каналу. Таким образом, если вы принудительно вызовете эту функцию, не подключая путь к месту назначения, она будет перекачиваться в ?куда? и это влияет на состояние машины внутри _read() (может быть, курсор перемещается в неправильное место,...)

Функция read() должна быть создана внутри нового Stream.Readable(). На самом деле это функция внутри объекта. Это не readableStream.read(), и реализовать readableStream.read=function(size){...} не получится.

Простой способ понять реализацию:

      var Reader=new Object();
Reader.read=function(size){
    if (this.i==null){this.i=1;}else{this.i++;}
    this.push("abc");
    if (this.i>7){ this.push(null); }
}

const Stream = require('stream');
const renderStream = new Stream.Readable(Reader);

renderStream.pipe(process.stdout)

Вы можете использовать его для перезаписи любых потоковых данных на POST на другой сервер. Потоковые данные POST с Axios:

      require('axios')({
    method: 'POST',
    url: 'http://127.0.0.1:3000',
    headers: {'Content-Length': 1000000000000},
    data: renderStream
});
Другие вопросы по тегам