Объединить наблюдаемые в один наблюдаемый из динамического массива наблюдаемых?

Резюме: по сути, мне нужно что-то вроде from Users where usergroupIds in [1,3,5],

Дано usergroupIds$ вывести массив идентификаторов группы [1,3,5]

Я хочу объединить всех пользователей по userids в usergroupIds и объединить идентификаторы пользователей (разные)

Вот что я придумал:

usergroupIds$.subscribe(usergroupIds => {
    const users$s = usergroupIds.map(gid => createUsersObservable(gid))
    //  [users$ in group 1, users$ in group 3, users$ in group 5]
    const users$ = combineLatest(...user$s).pipe(
        distinct(user => user.id)
    )
})

createUsersObservable = gid => 
  collectionData(db.collection('users').where('groupId', '==', gid)) // rxFire firestore

Отписаться и повторно подписаться users$ каждый раз, когда изменения кажутся неправильными?

Можно ли выразить users$ полностью в RxJS без создания его в подписке каждый раз?

Обновление: с помощью @FanCheung:

combinedUsers$ = userGroupIds$.pipe(
        switchMap(userGroupIds => {
            const users$s = userGroupIds.map(groupId =>
                createUsersObservable(groupId))
            return combineLatest(...users$s)
        })

Однако, так как usersObservable генерирует массив пользователей одновременно, комбинированный пользователь $ приводит к чему-то вроде [[userA, userB], [userB, userC], [userA, userD]] который я не против провести дополнительную обработку по подписке:

combinedUsers$.subscribe(combinedUsers => {
        const userMap = {}
        for (const users of combinedUsers) 
            users.forEach(user => (userMap[user.id] = user))
        const uniqueUsers = Object.values(userMap)

        // update UI to uniqueUsers
    })

Тем не менее, есть ли способ использовать как-то сгладить результаты combinedUsers$, а затем выполнить distinct оператор?

2 ответа

Решение

Смотрите, если это работает. По сути, он срабатывает, когда usergroupIds$ испускает, и, таким образом, вы получаете новую динамическую наблюдаемость. shareReplay нужен, если вы всегда хотите, чтобы источник наблюдался usergroupdIds$ выдать последнее сохраненное значение при подписке

usergroupIds$.pipe(
shareReplay(1),
switchMap(usergroupIds => {
    const users$Array = usergroupIds.map(gid => createUsersObservable(gid))
    //  [users$ in group 1, users$ in group 3, users$ in group 5]
   return forkJoin(...users$Array).pipe(
        map(arr=>[].concat(...arr)),
        switchMap(arr=>from(arr)),
        distinct(user => user.id)
    )
})
)

Таким образом, вы не хотите делать отдельные запросы createUsersObservable каждый раз usergroupIds$ излучает. Это звучит как хороший пример использования mergeScan:

usergroupIds$.pipe(
  mergeScan((users, user) => createUsersObservable().pipe(
    map(user => [user, ...users]),
  ), []),
);

Демонстрационная версия: https://stackblitz.com/edit/rxjs-8zybdu

Другие вопросы по тегам