Как связать события, генерируемые событиями, в Redx-сагу?

Я пытаюсь использовать redux-saga для подключения событий из PouchDB к моему приложению React.js, но я изо всех сил пытаюсь выяснить, как подключить события, излучаемые из PouchDB, к моей Saga. Поскольку событие использует функцию обратного вызова (и я не могу передать его генератору), я не могу использовать yield put() внутри обратного вызова он выдает странные ошибки после компиляции ES2015 (используя Webpack).

Итак, вот что я пытаюсь сделать, часть, которая не работает, находится внутри replication.on('change' (info) => {}),

function * startReplication (wrapper) {
  while (yield take(DATABASE_SET_CONFIGURATION)) {
    yield call(wrapper.connect.bind(wrapper))

    // Returns a promise, or false.
    let replication = wrapper.replicate()

    if (replication) {
      replication.on('change', (info) => {
        yield put(replicationChange(info))
      })
    }
  }
}

export default [ startReplication ]

5 ответов

Решение

Как объяснил Ниррек, когда вам нужно подключиться к push-источникам данных, вам нужно будет создать итератор событий для этого источника.

Я хотел бы добавить, что вышеуказанный механизм можно сделать многоразовым. Поэтому нам не нужно заново создавать итератор событий для каждого отдельного источника.

Решение состоит в том, чтобы создать общий канал с put а также take методы. Вы можете позвонить take метод изнутри генератора и подключите put метод интерфейса слушателя вашего источника данных.

Вот возможная реализация. Обратите внимание, что канал буферизует сообщения, если их никто не ждет (например, Генератор занят удаленным вызовом)

function createChannel () {
  const messageQueue = []
  const resolveQueue = []

  function put (msg) {
    // anyone waiting for a message ?
    if (resolveQueue.length) {
      // deliver the message to the oldest one waiting (First In First Out)
      const nextResolve = resolveQueue.shift()
      nextResolve(msg)
    } else {
      // no one is waiting ? queue the event
      messageQueue.push(msg)
    }
  }

  // returns a Promise resolved with the next message
  function take () {
    // do we have queued messages ?
    if (messageQueue.length) {
      // deliver the oldest queued message
      return Promise.resolve(messageQueue.shift())
    } else {
      // no queued messages ? queue the taker until a message arrives
      return new Promise((resolve) => resolveQueue.push(resolve))
    }
  }

  return {
    take,
    put
  }
}

Тогда вышеуказанный канал можно использовать в любое время, когда вы хотите прослушать внешний источник данных. Для вашего примера

function createChangeChannel (replication) {
  const channel = createChannel()

  // every change event will call put on the channel
  replication.on('change', channel.put)
  return channel
}

function * startReplication (getState) {
  // Wait for the configuration to be set. This can happen multiple
  // times during the life cycle, for example when the user wants to
  // switch database/workspace.
  while (yield take(DATABASE_SET_CONFIGURATION)) {
    let state = getState()
    let wrapper = state.database.wrapper

    // Wait for a connection to work.
    yield apply(wrapper, wrapper.connect)

    // Trigger replication, and keep the promise.
    let replication = wrapper.replicate()

    if (replication) {
      yield call(monitorChangeEvents, createChangeChannel(replication))
    }
  }
}

function * monitorChangeEvents (channel) {
  while (true) {
    const info = yield call(channel.take) // Blocks until the promise resolves
    yield put(databaseActions.replicationChange(info))
  }
}

Мы можем использовать eventChannelредукса-саги

Вот мой пример

// fetch history messages
function* watchMessageEventChannel(client) {
  const chan = eventChannel(emitter => {
    client.on('message', (message) => emitter(message));
    return () => {
      client.close().then(() => console.log('logout'));
    };
  });
  while (true) {
    const message = yield take(chan);
    yield put(receiveMessage(message));
  }
}

function* fetchMessageHistory(action) {
  const client = yield realtime.createIMClient('demo_uuid');
  // listen message event
  yield fork(watchMessageEventChannel, client);
}

Пожалуйста, обратите внимание:

сообщения в eventChannel не буферизируются по умолчанию. Если вы хотите обработать message eventтолько один за другим, вы не можете использовать блокировку вызова послеconst message = yield take(chan);

Или Вы должны предоставить буфер фабрике eventChannel, чтобы указать стратегию буферизации для канала (например, eventChannel (подписчик, буфер)). Для получения дополнительной информации см. Документацию по redux-saga API.

Фундаментальная проблема, которую мы должны решить, заключается в том, что источники событий "основаны на push", тогда как саги "основаны на pull".

Если вы подписались на событие, подобное: replication.on('change', (info) => {}), тогда обратный вызов выполняется всякий раз, когда replication Источник событий решает выдвинуть новое значение.

С сагами нам нужно перевернуть контроль. Это сага, которая должна контролировать, когда она решает ответить на доступную новую информацию об изменениях. Другими словами, саге нужно вытащить новую информацию.

Ниже приведен пример одного из способов достижения этого:

function* startReplication(wrapper) {
  while (yield take(DATABASE_SET_CONFIGURATION)) {
    yield apply(wrapper, wrapper.connect);
    let replication = wrapper.replicate()
    if (replication)
      yield call(monitorChangeEvents, replication);
  }
}

function* monitorChangeEvents(replication) {
  const stream = createReadableStreamOfChanges(replication);

  while (true) {
    const info = yield stream.read(); // Blocks until the promise resolves
    yield put(replicationChange(info));
  }
}

// Returns a stream object that has read() method we can use to read new info.
// The read() method returns a Promise that will be resolved when info from a
// change event becomes available. This is what allows us to shift from working
// with a 'push-based' model to a 'pull-based' model.
function createReadableStreamOfChanges(replication) {
  let deferred;

  replication.on('change', info => {
    if (!deferred) return;
    deferred.resolve(info);
    deferred = null;
  });

  return {
    read() {
      if (deferred)
        return deferred.promise;

      deferred = {};
      deferred.promise = new Promise(resolve => deferred.resolve = resolve);
      return deferred.promise;
    }
  };
}

Здесь есть JSbin из приведенного выше примера: http://jsbin.com/cujudes/edit?js,console

Вам также следует взглянуть на ответ Ясин Элуафи на похожий вопрос: могу ли я использовать генераторы es6 системы redux-saga в качестве прослушивателя сообщений для веб-сокетов или источника событий?

Благодаря @Yassine Elouafi

Я создал короткую лицензированную MIT реализацию общих каналов как расширение redux-saga для языка TypeScript на основе решения @Yassine Elouafi.

// redux-saga/channels.ts
import { Saga } from 'redux-saga';
import { call, fork } from 'redux-saga/effects';

export interface IChannel<TMessage> {
    take(): Promise<TMessage>;
    put(message: TMessage): void;
}

export function* takeEvery<TMessage>(channel: IChannel<TMessage>, saga: Saga) {
    while (true) {
        const message: TMessage = yield call(channel.take);
        yield fork(saga, message);
    }
}

export function createChannel<TMessage>(): IChannel<TMessage> {
    const messageQueue: TMessage[] = [];
    const resolveQueue: ((message: TMessage) => void)[] = [];

    function put(message: TMessage): void {
        if (resolveQueue.length) {
            const nextResolve = resolveQueue.shift();
            nextResolve(message);
        } else {
            messageQueue.push(message);
        }
    }

    function take(): Promise<TMessage> {
        if (messageQueue.length) {
            return Promise.resolve(messageQueue.shift());
        } else {
            return new Promise((resolve: (message: TMessage) => void) => resolveQueue.push(resolve));
        }
    }

    return {
        take,
        put
    };
}

И пример использования, подобный конструкции redux-saga *takeEvery

// example-socket-action-binding.ts
import { put } from 'redux-saga/effects';
import {
    createChannel,
    takeEvery as takeEveryChannelMessage
} from './redux-saga/channels';

export function* socketBindActions(
    socket: SocketIOClient.Socket
) {
    const socketChannel = createSocketChannel(socket);
    yield* takeEveryChannelMessage(socketChannel, function* (action: IAction) {
        yield put(action);
    });
}

function createSocketChannel(socket: SocketIOClient.Socket) {
    const socketChannel = createChannel<IAction>();
    socket.on('action', (action: IAction) => socketChannel.put(action));
    return socketChannel;
}

У меня была та же проблема, также с использованием PouchDB, и я нашел ответы, которые были чрезвычайно полезными и интересными Однако есть много способов сделать то же самое в PouchDB, и я немного покопался и нашел другой подход, который, возможно, проще рассуждать.

Если вы не прикрепите слушателей к db.change запрос затем возвращает любые данные об изменениях непосредственно вызывающей стороне и добавление continuous: true Опция приведет к выдаче longpoll и не вернется, пока не произойдет какое-либо изменение. Таким образом, тот же результат может быть достигнут с помощью следующего

export function * monitorDbChanges() {
  var info = yield call([db, db.info]); // get reference to last change 
  let lastSeq = info.update_seq;

  while(true){
    try{
      var changes = yield call([db, db.changes], { since: lastSeq, continuous: true, include_docs: true, heartbeat: 20000 });
      if (changes){
        for(let i = 0; i < changes.results.length; i++){
          yield put({type: 'CHANGED_DOC', doc: changes.results[i].doc});
        }
        lastSeq = changes.last_seq;
      }
    }catch (error){
      yield put({type: 'monitor-changes-error', err: error})
    }
  }
}

Есть одна вещь, которую я не дошел до дна. Если я заменю for цикл с change.results.forEach((change)=>{...}) тогда я получаю неверную синтаксическую ошибку на yield, Я предполагаю, что это как-то связано с некоторым столкновением в использовании итераторов.

Другие вопросы по тегам