Как создать потоки из строки в Node.Js?

Я использую библиотеку ya-csv, которая ожидает либо файл, либо поток в качестве входных данных, но у меня есть строка.

Как мне преобразовать эту строку в поток в Node?

12 ответов

Решение

Как substack исправил меня в #node, новый API потоков в Node v10 делает это проще:

const Readable = require('stream').Readable;
const s = new Readable();
s._read = () => {}; // redundant? see update below
s.push('your text here');
s.push(null);

… После чего вы можете свободно передать его или иным образом передать его предполагаемому потребителю.

Это не так чисто, как возобновить однострочно, но это позволяет избежать дополнительной зависимости.

(Обновление: в версиях v0.10.26 до v9.2.1, вызов push непосредственно из приглашения REPL произойдет сбой с not implemented исключение, если вы не установили _read, Он не будет зависать внутри функции или скрипта. Если непоследовательность заставляет вас нервничать, включите noop.)

Не используйте ответ Джо Лисса. В большинстве случаев это будет работать, но в моем случае я потерял 4 или 5 часов поиска ошибок. Для этого не нужны сторонние модули.

НОВЫЙ ОТВЕТ:

var Readable = require('stream').Readable

var s = new Readable
s.push('beep')    // the string you want
s.push(null)      // indicates end-of-file basically - the end of the stream

Это должен быть полностью совместимый поток для чтения. Смотрите здесь для получения дополнительной информации о том, как правильно использовать потоки.

СТАРЫЙ ОТВЕТ: Просто используйте собственный поток PassThrough:

var stream = require("stream")
var a = new stream.PassThrough()
a.write("your string")
a.end()

a.pipe(process.stdout) // piping will work as normal
/*stream.on('data', function(x) {
   // using the 'data' event works too
   console.log('data '+x)
})*/
/*setTimeout(function() {
   // you can even pipe after the scheduler has had time to do other things
   a.pipe(process.stdout) 
},100)*/

a.on('end', function() {
    console.log('ended') // the end event will be called properly
})

Обратите внимание, что событие close не генерируется (что не требуется для потоковых интерфейсов).

Начиная с узла 10.17, stream.Readable имеет from метод, позволяющий легко создавать потоки из любой итерации (включая литералы массива):

const { Readable } = require("stream")

const readable = Readable.from(["input string"])

readable.on("data", (chunk) => {
  console.log(chunk) // will be called once with `"input string"`
})

Обратите внимание, что по крайней мере между 10.17 и 12.3 строка сама является итерируемой, поэтому Readable.from("input string") будет работать, но генерировать одно событие для каждого символа. Readable.from(["input string"]) будет генерировать одно событие для каждого элемента в массиве (в данном случае один элемент).

Также обратите внимание, что в более поздних узлах (вероятно, 12.3, поскольку в документации говорится, что функция была изменена тогда), больше нет необходимости заключать строку в массив.

https://nodejs.org/api/stream.html

Просто создайте новый экземпляр stream модуль и настроить его в соответствии с вашими потребностями:

var Stream = require('stream');
var stream = new Stream();

stream.pipe = function(dest) {
  dest.write('your string');
  return dest;
};

stream.pipe(process.stdout); // in this case the terminal, change to ya-csv

или же

var Stream = require('stream');
var stream = new Stream();

stream.on('data', function(data) {
  process.stdout.write(data); // change process.stdout to ya-csv
});

stream.emit('data', 'this is my string');

Изменить: ответ Гарт, вероятно, лучше.

Мой старый текст ответа сохраняется ниже.


Чтобы преобразовать строку в поток, вы можете использовать приостановленный поток:

through().pause().queue('your string').end()

Пример:

var through = require('through')

// Create a paused stream and buffer some data into it:
var stream = through().pause().queue('your string').end()

// Pass stream around:
callback(null, stream)

// Now that a consumer has attached, remember to resume the stream:
stream.resume()

Для этого есть модуль: https://gist.github.com/kawanet/8aea35dc4a578f09757d

/**
* @see https://www.npmjs.com/package/string-to-stream
*/
var str = require('string-to-stream')
str('hi there').pipe(process.stdout) // => 'hi there' 

Другое решение - передать функцию чтения в конструктор Readable (см. Параметры чтения потока cf doc)

var s = new Readable({read(size) {
    this.push("your string here")
    this.push(null)
  }});

Вы можете после использования s.pipe для примера

В кофе-скрипте:

class StringStream extends Readable
  constructor: (@str) ->
    super()

  _read: (size) ->
    @push @str
    @push null

используй это:

new StringStream('text here').pipe(stream1).pipe(stream2)

Вот изящное решение на TypeScript:

import { Readable } from 'stream'

class ReadableString extends Readable {
    private sent = false

    constructor(
        private str: string
    ) {
        super();
    }

    _read() {
        if (!this.sent) {
            this.push(Buffer.from(this.str));
            this.sent = true
        }
        else {
            this.push(null)
        }
    }
}

const stringStream = new ReadableString('string to be streamed...')

Я устал от переучивания этого каждые шесть месяцев, поэтому я просто опубликовал модуль npm, чтобы абстрагировать детали реализации:

https://www.npmjs.com/package/streamify-string

Это ядро ​​модуля:

const Readable = require('stream').Readable;
const util     = require('util');

function Streamify(str, options) {

  if (! (this instanceof Streamify)) {
    return new Streamify(str, options);
  }

  Readable.call(this, options);
  this.str = str;
}

util.inherits(Streamify, Readable);

Streamify.prototype._read = function (size) {

  var chunk = this.str.slice(0, size);

  if (chunk) {
    this.str = this.str.slice(size);
    this.push(chunk);
  }

  else {
    this.push(null);
  }

};

module.exports = Streamify;

str это string это должно быть передано конструктору при вызове, и будет выведено потоком как данные. options Типичные параметры, которые могут быть переданы в поток, согласно документации.

Согласно Travis CI, он должен быть совместим с большинством версий узла.

В NodeJS вы можете создать читаемый поток несколькими способами:

РЕШЕНИЕ 1

Вы можете сделать это с помощью fsмодуль. Функция fs.createReadStream() позволяет вам открыть читаемый поток, и все, что вам нужно сделать, это передать путь к файлу, чтобы начать потоковую передачу.

      const fs = require('fs');

const readable_stream = fs.createReadStream('file_path');

РЕШЕНИЕ 2

Если вы не хотите создавать файл, вы можете создать поток в памяти и что-то с ним сделать (например, загрузить куда-нибудь). Вы можете сделать это с помощью module. Вы можете импортировать Readable из streamмодуль, и вы можете создать читаемый поток. При создании объекта также можно реализовать read()метод, который используется для чтения данных из внутреннего буфера. Если данные недоступны для чтения, nullвозвращается. Необязательный аргумент указывает определенное количество байтов для чтения. Если size аргумент не указан, будут возвращены все данные, содержащиеся во внутреннем буфере.

      const Readable = require('stream').Readable;

const readable_stream = new Readable({
  ​read(size) {
   ​// ...
​  }
});

РЕШЕНИЕ 3

Когда вы загружаете что-то по сети, это может быть получено как поток (например, вы загружаете документ PDF из некоторого API).

      const axios = require('axios');

const readable_stream = await axios({
  method: 'get',
  url: "pdf_resource_url",
  responseType: 'stream'
}).data;

РЕШЕНИЕ 4

Сторонние пакеты могут поддерживать создание потоков как функцию. Это способ с aws-sdk пакет, который обычно используется для загрузки файлов в S3.

      const file = await s3.getObject(params).createReadStream();

JavaScript является утилитарным, поэтому, если вы просто скопируете API читаемого потока, он будет работать просто отлично. На самом деле, вы, вероятно, не можете реализовать большинство этих методов или просто оставить их как заглушки; все, что вам нужно реализовать, - это то, что использует библиотека. Вы можете использовать предварительно построенный Node EventEmitter класс, чтобы иметь дело с событиями, так что вам не нужно реализовывать addListener и такой себе.

Вот как вы можете реализовать это в CoffeeScript:

class StringStream extends require('events').EventEmitter
  constructor: (@string) -> super()

  readable: true
  writable: false

  setEncoding: -> throw 'not implemented'
  pause: ->    # nothing to do
  resume: ->   # nothing to do
  destroy: ->  # nothing to do
  pipe: -> throw 'not implemented'

  send: ->
    @emit 'data', @string
    @emit 'end'

Тогда вы можете использовать это так:

stream = new StringStream someString
doSomethingWith stream
stream.send()
Другие вопросы по тегам