Как использовать наблюдаемое в `filter`?

Предположим, у меня есть структура каталогов, подобная следующей:

foo
|
+---one
|   +---tmp
|
+---two
|
+---three
    |
    +---tmp

Я хочу получить список подкаталогов под foo который имеет tmp (под) подкаталог внизу. У меня есть следующий код, который делает это:

const { bindNodeCallback, from } = require('rxjs');
const { map, flatMap, filter } = require('rxjs/operators');
const { readdir, access, accessSync, constants: { F_OK, R_OK }} = require('fs');
const { join } = require('path');

const readdirRx = bindNodeCallback(readdir);

const FOO = './foo';

const result = readdirRx(FOO)
    .pipe(
        // readdir will return an array of files. Convert each
        // file into an observable, and then flatten it.
        flatMap(from),

        // get rid of directories that do not have a `tmp` dir underneath
        filter(dir => {
            try {
                accessSync(join(FOO, dir, 'tmp'), F_OK | R_OK);
                return true;
            } catch (err) {
                return false;
            }
        })
    );
result.subscribe(console.log, console.error, () => console.log('Done!'));
// outputs (correctly):
//   one
//   three
//   Done!

Тем не менее, этот код выглядит ужасно для меня, потому что (а) я использую исключения для потока управления, и (б) есть синхронный accessSync вызов, оба из которых наносят ущерб производительности.

У меня есть следующий реактивный кусок кода, который проверяет то же самое:

const fileExistsAndReadableRx = bindNodeCallback(
    // check if file exists and readable
    (file, cb) => access(file, F_OK | R_OK,
        // call the callback with true if no errors found
        err => cb(!err)
    )
);

Тем не менее, я не могу понять, как я могу подключить fileExistsAndReadableRx в вышеуказанную программу. Моя цель - удалить блок try-catch и использовать fileExistsAndReadableRx вместо этого, чтобы отфильтровать подкаталоги, которые не имеют tmp (Суб) подкаталог. Как я могу это сделать?

(Обратите внимание, что реальная задача, которую я пытаюсь выполнить, - это не чтение диска. То, что я пытаюсь сделать, - это сложная асинхронная операция, и мне пришлось придумать более простой пример, чтобы проиллюстрировать мою проблему).

Что я уже пробовал:

Я пытался использовать map, но он испускает поток Observables, как и следовало ожидать:

const result = readdirRx(FOO)
    .pipe(
        flatMap(from),
        map(dir =>
            fileExistsAndReadableRx(join(FOO, dir, 'tmp'))
        )
    );
result.subscribe(console.log, console.error, () => console.log('Done!'));
// Outputs:
//   Observable { _isScalar: false, _subscribe: [Function] }
//   Observable { _isScalar: false, _subscribe: [Function] }
//   Observable { _isScalar: false, _subscribe: [Function] }
//   Done!

Вот и я подумал, я знаю! Я бы использовал flatMap чтобы сгладить наблюдаемые. Это должно привести к трем булевым значениям, которые в конечном итоге излучаются из этих наблюдаемых. Но это тоже не сработало. Выдает только одно значение:

const result = readdirRx(FOO)
    .pipe(
        flatMap(from),
        flatMap(dir =>
            fileExistsAndReadableRx(join(FOO, dir, 'tmp'))
        )
    );
result.subscribe(console.log, console.error, () => console.log('Done!'));
// Outputs:
//   true
//   Done!

Редактировать:

Я попробовал Ingo Bürk, но он выдает единственное логическое значение. Не список строк, как я ожидал:

const result = readdirRx(FOO)
    .pipe(
        flatMap(from),
        flatMap(dir => fileExistsAndReadableRx(join(FOO, dir, 'tmp'))
            .pipe(
                filter(Boolean),
                map(() => dir),
            )
        ),
    );
result.subscribe(console.log, console.error, () => console.log('Done!'));
// Outputs:
//   true
//   Done!

1 ответ

Решение

Вы можете сделать что-то вроде этого:

readdirRx(FOO)
  .pipe(
    flatMap(from),
    flatMap(dir => fileExistsAndReadableRx(join(FOO, dir, 'tmp'))
      .pipe(
        catchError(res => of(res)),
        filter(Boolean),
        map(() => dir),
      )
    ),
  )

Предполагается, что fileExistsAndReadableRx возвращается Observable<boolean>, filter(Boolean) просто коротка для filter(v => !!v),

Уловка здесь по сути то, что вы пробовали, а именно (плоское) сопоставление каждого каталога с наблюдаемой проверкой для папки tmp. Затем мы используем этот результат, чтобы отфильтровать тех, кто ему не соответствует, и затем сопоставить его с каталогом, а не с этим промежуточным результатом.

Вы можете увидеть это в действии здесь: https://rxviz.com/v/d8djbkRO

вы, вероятно, захотите использовать cb(null, !err) вызвать обратный вызов в fileExistsAndReadableRx

Другие вопросы по тегам