Буфер через файл
У меня есть сценарий, когда мой производитель работает намного быстрее, чем мой потребитель с большим количеством данных. Однако я не хочу блокировать продюсера. Таким образом, единственный вариант - буферизовать дополнительные данные через файл.
Мне удалось сделать неоптимальное решение. Однако должен быть более прагматичный способ реализовать это.
Я более знаком с RxJS, где у меня, вероятно, не было бы больших проблем, но я только начал переключаться на горную местность и был бы признателен за некоторые советы.
import fs from 'fs-promise'
import _ from 'highland'
export default function buffer(id, source) {
const ctx = {
fd: null,
written: 0,
read: 0,
eof: false,
err: null,
path: `/tmp/${id}`
}
async function drain(push) {
ctx.fd = ctx.fd || await fs.open(ctx.path, 'w+')
const bufferSize = 1024 * 1024
while (ctx.read < ctx.written) {
let buffer = new Buffer(bufferSize)
buffer = buffer.slice(0,
await fs.read(ctx.fd, buffer, 0, buffer.length, ctx.read))
if (buffer.length === 0) {
break
}
ctx.read += buffer.length
push(null, buffer)
}
}
// TODO: What about errors?
source.each(async (x) => {
try {
ctx.fd = ctx.fd || await fs.open(ctx.path, 'w+')
if (x === _.nil) {
ctx.eof = true
} else {
ctx.written += await fs.write(ctx.fd, x, 0, x.length, ctx.written)
}
} catch (err2) {
ctx.err = err2
}
})
return _(async (push, next) => {
try {
await drain()
if (ctx.err) {
throw ctx.err
} else if (ctx.eof) {
await drain()
fs.unlink(ctx.path)
push(null, _.nil)
} else {
setTimeout(next, 40)
}
} catch (err2) {
fs.unlink(ctx.path)
push(err2)
}
})
}