Как раскошелиться поток, используя highland.js?
У меня есть sourceStream
состоящий из BaseData
объекты.
Я хочу раскрутить этот поток в n
количество различных потоков, которые затем фильтруют и преобразуют каждый BaseData
возразить по своему вкусу.
В конце концов, я хочу иметь n
потоки, содержащие только определенный тип, и разветвленные потоки могут различаться по длине, поскольку данные могут быть отброшены или добавлены в будущем.
Я думал, что смогу настроить это через fork
:
import * as _ from 'highland';
interface BaseData {
id: string;
data: string;
}
const sourceStream = _([
{id: 'foo', data: 'poit'},
{id: 'foo', data: 'fnord'},
{id: 'bar', data: 'narf'}]);
const partners = [
'foo',
'bar',
];
partners.forEach((partner: string) => {
const partnerStream = sourceStream.fork();
partnerStream.filter((baseData: BaseData) => {
return baseData.id === partner;
});
partnerStream.each(console.log);
});
Я ожидал, что теперь будет два потока, и foo
-поток, содержащий два элемента:
{ id: 'foo', data: 'poit' }
{ id: 'foo', data: 'fnord' }
и bar
-поток, содержащий один элемент:
{ id: 'bar', data: 'narf' }
Но вместо этого я получаю ошибку:
/usr/src/marketing-tasks/node_modules/highland/lib/index.js:1338
throw new Error(
^
Error: Stream already being consumed, you must either fork() or observe()
at Stream._addConsumer (/usr/src/marketing-tasks/node_modules/highland/lib/index.js:1338:15)
at Stream.consume (/usr/src/marketing-tasks/node_modules/highland/lib/index.js:1500:10)
at Stream.each (/usr/src/marketing-tasks/node_modules/highland/lib/index.js:1774:18)
at partners.forEach (/usr/src/marketing-tasks/dist/bin/example.js:17:19)
at Array.forEach (native)
at Object.<anonymous> (/usr/src/marketing-tasks/dist/bin/example.js:12:10)
at Module._compile (module.js:570:32)
at Object.Module._extensions..js (module.js:579:10)
at Module.load (module.js:487:32)
at tryModuleLoad (module.js:446:12)
Как разделить поток на несколько потоков?
Я также попытался объединить вызовы в цепочку, но потом получаю только один результат потока:
partners.forEach((partner: string) => {
console.log(partner);
const partnerStream = sourceStream
.fork()
.filter((item: BaseData) => {
return item.id === partner;
});
partnerStream.each((item: BaseData) => {
console.log(item);
});
});
Только печать:
foo
{ id: 'foo', data: 'poit' }
{ id: 'foo', data: 'fnord' }
bar
Вместо ожидаемого:
foo
{ id: 'foo', data: 'poit' }
{ id: 'foo', data: 'fnord' }
bar
{id: 'bar', data: 'narf'}
Это также может быть тот случай, когда я неправильно понял fork
было все о. Согласно записи в документе:
Stream.fork () Форкирует поток, позволяя добавлять дополнительных потребителей с общим обратным давлением. Поток, разветвленный нескольким потребителям, будет извлекать значения из своего источника только тогда, когда самый медленный потребитель сможет их обработать.
ПРИМЕЧАНИЕ: не зависит от согласованного порядка выполнения между вилками. Это преобразование только гарантирует, что все вилки обработают значение foo, прежде чем любой обработает вторую строку значений. Это не гарантирует порядок обработки вилок foo.
СОВЕТ: Будьте осторожны при изменении потоковых значений в вилках (или при использовании библиотеки, которая делает это). Поскольку одно и то же значение будет передано на каждый форк, изменения, сделанные на одном форке, будут видны на любом форке, который выполняется после него. Добавьте к этому непоследовательный порядок выполнения, и вы можете получить тонкие ошибки искажения данных. Если вам нужно изменить какие-либо значения, вы должны сделать копию и изменить копию вместо этого.
Предупреждение об устаревании: в настоящее время возможно преобразовать поток после его использования (например, через преобразование). Это больше не будет возможно в следующем основном выпуске. Если вы собираетесь форкнуть поток, всегда вызывайте форк на нем.
Так что вместо "Как раскошелиться поток?" мой реальный вопрос может быть таким: Как дублировать горный поток на лету в разные потоки?
2 ответа
Нужно иметь в виду, чтобы не использовать раздвоенный поток, прежде чем все вилки будут созданы. Как если бы кто-то использовал раздвоенный поток, он и его "родитель" будут израсходованы, а любой последующий разветвление будет разветвленным из пустого потока.
const partnerStreams: Array<Stream<BaseData>> = [];
partners.forEach((partner: string) => {
const partnerStream = sourceStream
.fork()
.filter((item: BaseData) => {
return item.id === partner;
}
);
partnerStreams.push(partnerStream);
});
partnerStreams.forEach((stream, index) => {
console.log(index, stream);
stream.toArray((foo) => {
console.log(index, foo);
});
});
Это печатает:
0 [ { id: 'foo', data: 'poit' }, { id: 'foo', data: 'fnord' } ]
1 [ { id: 'bar', data: 'narf' } ]
partnerStream.filter()
возвращает новый поток Вы тогда потребляете partnerStream
снова используя partnerStream.each()
без звонка fork()
или же observe()
, Так что либо цепочка partnerStream.filter().each()
звонки или присвоить возвращаемое значение partnerStream.filter()
к переменной и вызов .each()
на что.