Как закрыть EventSource по Redux-действию (action) в саге?
Я хочу запускать и останавливать EventSource кнопками: кнопка запуска отправляет действие CONSTANTS.FETCHING_STARTED, а кнопка остановки — действие CONSTANTS.FETCHING_STOPPED.
Решение из https://github.com/redux-saga/redux-saga/issues/940 у меня не работает.
Я пытался реализовать и запуск, и остановку в одной функции, но это не сработало.
Это моя сага.
import {
call,
take,
takeLatest,
put,
race,
cancelled
} from "redux-saga/effects";
import { eventChannel, END } from "redux-saga";
import * as CONSTANTS from "../constants";
/**
 * Wraps an EventSource in a redux-saga event channel and returns that channel.
 *
 * Every `message` event is emitted into the channel as-is; any `error` event
 * emits END. The channel's unsubscribe function closes the EventSource, so
 * calling `channel.close()` also shuts the connection down.
 *
 * Kept as a generator so existing `yield call(innerSSE, src)` call sites keep
 * working: the channel is the generator's return value, i.e. the result of
 * the `call` effect. (The previous body yielded `takeLatest(channel)` without
 * a worker and returned nothing, so callers received `undefined`.)
 *
 * @param {EventSource} eventSrc - an already constructed event source.
 * @returns {Generator} generator whose return value is the event channel.
 */
// eslint-disable-next-line require-yield
export function* innerSSE(eventSrc) {
  return eventChannel(emitter => {
    eventSrc.onmessage = msg => {
      emitter(msg);
    };
    eventSrc.onerror = () => {
      emitter(END);
    };
    // Unsubscribe: invoked by the channel when it is closed.
    return () => {
      eventSrc.close();
    };
  });
}
/**
 * Opens the SSE stream at CONSTANTS.STREAM_URL and dispatches one
 * FETCHING_IN_PROGRESS action (payload = JSON-parsed `msg.data`) per message
 * taken from the channel produced by `innerSSE`.
 *
 * The loop never finishes on its own. The `finally` block runs whenever the
 * generator is left (cancellation, an error such as invalid JSON, or the
 * channel ending) and releases the channel and the EventSource, so no
 * connection is left open.
 */
function* start() {
  const eventSrc = new EventSource(CONSTANTS.STREAM_URL);
  let chan;
  try {
    chan = yield call(innerSSE, eventSrc);
    while (true) {
      const msg = yield take(chan);
      yield put({
        type: CONSTANTS.FETCHING_IN_PROGRESS,
        payload: JSON.parse(msg.data)
      });
    }
  } finally {
    // The channel may be missing if the saga was interrupted before it was created.
    if (chan) {
      chan.close();
    }
    // Close the source directly as well; EventSource.close() is safe to repeat.
    eventSrc.close();
  }
}
/**
 * Stops a stream by closing the given resource.
 *
 * Accepts anything exposing a `close()` method (an EventSource or an event
 * channel). A missing argument or an object without `close` is ignored, so
 * the previous no-op behaviour is preserved for such calls.
 *
 * @param {{close: Function}} [eventSrc] - resource to close.
 */
// eslint-disable-next-line require-yield
function* stop(eventSrc) {
  if (eventSrc && typeof eventSrc.close === "function") {
    eventSrc.close();
  }
}
/**
 * Root watcher for the SSE stream.
 *
 * Waits for FETCHING_STARTED, then runs `start` raced against
 * FETCHING_STOPPED. In redux-saga, `race` cancels the losing effect, so a
 * FETCHING_STOPPED action cancels the running `start` task; the loop then
 * waits for the next FETCHING_STARTED. Previously `start` was called
 * directly and FETCHING_STOPPED was never observed, so the stream could not
 * be stopped.
 *
 * Errors from either effect are logged and the watcher keeps running.
 */
export function* sseSaga() {
  while (true) {
    try {
      const started = yield take(CONSTANTS.FETCHING_STARTED);
      yield race({
        stream: call(start, started),
        stopped: take(CONSTANTS.FETCHING_STOPPED)
      });
    } catch (error) {
      console.log(error);
    }
  }
}
0 ответов
Попробуйте так:
/**
 * Sketch of a start/stop controlled event stream.
 *
 * Registers a takeLatest watcher on FETCHING_STARTED. Each run of the worker
 * creates an event channel, attaches a per-event handler with takeEvery, and
 * then blocks until FETCHING_STOPPED. The channel is closed in `finally`, so
 * it is released both when the stop action arrives and when the worker is
 * left early (for example when another FETCHING_STARTED restarts it).
 */
function* main() {
  // Placeholder handler: fill in the processing of a single channel event.
  function* handleEvent() {
    // process event
  }

  function* streamUntilStopped() {
    // create event channel
    const events = eventChannel(() => { /* ... */ });
    yield takeEvery(events, handleEvent);
    try {
      // waiting for FETCHING_STOPPED
      yield take(CONSTANTS.FETCHING_STOPPED);
    } finally {
      // or close by another FETCHING_STARTED
      events.close();
    }
  }

  yield takeLatest(CONSTANTS.FETCHING_STARTED, streamUntilStopped);
}
/**
 * Builds a redux-saga event channel on top of an EventSource.
 *
 * Emits into the channel:
 *  - `{ type: 'DISCONNECT', payload }` for the server event named `disconnect`;
 *  - `{ type: <eventType>, payload }` for every name listed in `events`;
 *  - END when the source reports an error.
 * `payload` is `JSON.parse(event.data)`.
 *
 * Unnamed `message` events are not forwarded (the forwarding call was already
 * disabled in the previous version). Closing the channel closes the source.
 *
 * Changes from the previous version: listeners are registered with `forEach`
 * instead of a side-effect-only `map`, the leftover debug `console.log` calls
 * are gone, and the duplicated listener code is shared.
 *
 * @param {EventSource} eventSource - source to subscribe to.
 * @param {string[]} [events=[]] - names of server-sent events to forward.
 * @returns the channel created by `eventChannel`.
 */
function eventSource(eventSource, events = []) {
  const subscribe = emitter => {
    // Creates a listener that re-emits an SSE event as an action-shaped object.
    const forwardAs = type => event =>
      emitter({ type, payload: JSON.parse(event.data) });

    eventSource.onmessage = () => {};
    eventSource.onerror = () => emitter(END);

    eventSource.addEventListener('disconnect', forwardAs('DISCONNECT'), false);
    events.forEach(eventType => {
      eventSource.addEventListener(eventType, forwardAs(eventType), false);
    });

    // Unsubscribe: invoked by the channel when it is closed.
    return () => eventSource.close();
  };
  return eventChannel(subscribe);
}
/**
 * Connects to `<SOCKET>/notify` and re-dispatches every action emitted by the
 * event channel built by `eventSource`.
 *
 * The query string carries `clientId` (from `formData.clientID`) and
 * `access_token` (from `formData.token`). Errors raised while connecting or
 * streaming are reported as a CONNECT_FAILED action carrying the message.
 *
 * The `finally` block releases the connection whenever the generator is left:
 * on cancellation (for example when it loses a `race`), after an error, or
 * when the channel ends. Previously the channel and the EventSource stayed
 * open in those cases.
 *
 * @param {{clientID: *, token: *}} formData - connection credentials.
 * @returns {Generator} generator returning the EventSource (or `undefined`
 *   if it could not be constructed) when the error path completes.
 */
function* itemsEventSource(formData) {
  let result;
  let channel;
  try {
    const params = new URLSearchParams();
    params.append('clientId', formData.clientID);
    params.append('access_token', formData.token);
    const url = `${[SOCKET, 'notify'].join('/')}?${params.toString()}`;
    result = new EventSource(url);
    channel = yield call(eventSource, result, [CONNECTED, DISCONNECTED, FETCH_SUCCESS, 'NOTIFY']);
    while (true) {
      const action = yield take(channel);
      yield put(action);
    }
  } catch (error) {
    yield put({ type: CONNECT_FAILED, error: error.message });
  } finally {
    if (channel) {
      // Closing the channel runs its unsubscribe, which closes the source.
      channel.close();
    } else if (result) {
      // Interrupted before the channel existed: close the source directly.
      result.close();
    }
  }
  return result;
}
/**
 * Watcher for CONNECT actions.
 *
 * For each CONNECT action it races `itemsEventSource(payload)` against a
 * CONNECT_FAILED action, logs both race slots together with the payload, and
 * then waits for the next CONNECT. Any error thrown inside an iteration is
 * dispatched as an ERROR action carrying the error message, after which the
 * loop continues.
 */
function* watchItemsEventSource() {
  for (;;) {
    try {
      const { payload } = yield take(CONNECT);
      // Array form of race: the result is an array in the same slot order.
      const contenders = [
        call(itemsEventSource, payload),
        take(CONNECT_FAILED),
      ];
      const [streamResult, failedAction] = yield race(contenders);
      console.log(streamResult, failedAction, payload);
    } catch (err) {
      const report = { type: ERROR, error: err.message };
      yield put(report);
    }
  }
}
/**
 * Module root saga: forks the CONNECT watcher.
 *
 * The previous `all({[ fork(...) ]})` was a syntax error (a computed
 * property key with no value); `all` is given an array of effects instead.
 */
export function* moduleASaga() {
  yield all([fork(watchItemsEventSource)]);
}