RxJS & React-Observables 硬核入门指南

Redux-observable是一个基于rxjs的Redux中间件,允许开发者使用异步操作。它是redux-thunkredux-saga的替代品。

本文介绍了RxJS的基础知识,如何上手 redux-observable,以及一些实际的用例。但在此之前,我们需要理解观察者(Observer)模式。

Observer 观察者模式

在观察者模式中,一个名为“可观察对象(Observable)”或“Subject”的对象维护着一个名为“观察者(Observers)”的订阅者集合。当Subjects的状态发生变化时,它会通知所有的观察者。

在JavaScript中,最简单的例子是事件发射器(event emitters)事件处理程序(event handlers)

当您执行.addeventlistener时,你正在将一个观察者推入subject的观察者集合中。无论何时事件发生,subject都会通知所有观察者。

RxJS

根据官方网站,RxJSReactiveXJavaScript实现,ReactiveX是一个库,通过使用可观察序列来编写异步和基于事件的程序。

简单来说,RxJS是观察者模式的一个实现。它还扩展了Observer模式,提供了允许我们以声明方式组合observableSubjects的操作符。

观察者(Observers)、可观察对象(Observables)、操作符(Operators)和SubjectsRxJS的构建块。现在让我们更详细地看看每一个。

Observers

观察者(Observers)是可以订阅observableSubjects的对象。订阅之后,他们可以收到三种类型的通知: next、error和complete

代码语言:javascript
复制
interface Observer<T> {
    closed?: boolean;
    next: (value: T) => void;
    error: (err: any) => void;
    complete: () => void;
}

Observable推送nexterrorcomplete通知时,观察者的.next.error.complete方法就会被调用。

Observables

可观察对象是可以在一段时间内发出数据的对象。它可以用“大理石图”来表示。

其中,水平线表示时间,圆形节点表示Observable发出的数据,垂直线表示Observable已经成功完成。

Observables对象可能会遇到错误。X(叉)表示由Observable发出的错误。

completed”和“error”状态是最终状态。这意味着,observable在成功完成或遇到错误后不能发出任何数据。

创建一个 Observable

可观察对象(Observables)是通过新的Observable构造函数创建的,该构造函数只有一个参数——订阅函数。可观察对象Observables也可以使用一些操作符来创建,但我们稍后会在讨论操作符的时候讨论这个。

代码语言:javascript
复制
import { Observable } from 'rxjs';

const observable = new Observable(subscriber => {
// Subscribe function
});

订阅一个 Observable

可观察对象(Observables)可以通过其.subscribe方法和传递一个Observer来订阅。

代码语言:javascript
复制
observable.subscribe({
next: (x) => console.log(x),
error: (x) => console.log(x),
complete: () => console.log('completed');
});

执行 Observable

Observable被订阅时,我们传递给新Observable构造函数的subscribe函数就会被执行。

订阅函数接受一个参数—SubscriberSubscriber的结构类似于观察者,它有相同的3个方法:.next、.error和.complete

observable可以使用.next方法将数据推送到Observer。如果Observable成功完成了,它可以使用.complete方法通知观察者。如果Observable遇到了错误,它可以使用.error方法将错误推送给观察者。

代码语言:javascript
复制
// Create an Observable
const observable = new Observable(subscriber => {
subscriber.next('first data');
subscriber.next('second data');
setTimeout(() => {
subscriber.next('after 1 second - last data');
subscriber.complete();
subscriber.next('data after completion'); // <-- ignored
}, 1000);
subscriber.next('third data');
});

// Subscribe to the Observable
observable.subscribe({
next: (x) => console.log(x),
error: (x) => console.log(x),
complete: () => console.log('completed')
});

// Outputs:
//
// first data
// second data
// third data
// after 1 second - last data
// completed

Observables 是单播的

可观察对象Observables是单播的,这意味着可观察对象最多可以有一个订阅方。当一个观察者订阅了一个可观察对象,它会得到一个有自己执行路径的可观察对象的副本,使可观察对象成为单播的。

这就像在看YouTube视频。所有的观众观看相同的视频内容,但他们可以观看视频的不同部分

例如:让我们创建一个在10秒内发出1到10的Observable。然后,立即订阅一次Observable, 5秒后再订阅一次。

代码语言:javascript
复制
// Create an Observable that emits data every second for 10 seconds
const observable = new Observable(subscriber => {
let count = 1;
const interval = setInterval(() => {
subscriber.next(count++);

    if (count &gt; 10) {
        clearInterval(interval);   
    }
}, 1000);

});

// Subscribe to the Observable
observable.subscribe({
next: value => {
console.log(Observer 1: ${value});
}
});

// After 5 seconds subscribe again
setTimeout(() => {
observable.subscribe({
next: value => {
console.log(Observer 2: ${value});
}
});
}, 5000);

/* Output

Observer 1: 1
Observer 1: 2
Observer 1: 3
Observer 1: 4
Observer 1: 5

Observer 2: 1
Observer 1: 6
Observer 2: 2
Observer 1: 7
Observer 2: 3
Observer 1: 8
Observer 2: 4
Observer 1: 9
Observer 2: 5
Observer 1: 10
Observer 2: 6
Observer 2: 7
Observer 2: 8
Observer 2: 9
Observer 2: 10

*/

在输出中,可以注意到第二个Observer从1开始打印,尽管它在5秒后订阅了。这是因为第二个观察者收到了一个可观察对象的副本,它的订阅函数被再次调用了。这说明了可观察对象的单播行为。

Subjects

Subject是可观察对象的一种特殊类型

创建一个 Subject

使用new Subject()构造函数创建Subject:

代码语言:javascript
复制
import { Subject } from 'rxjs';

// Create a subject
const subject = new Subject();

订阅一个 Subject

订阅Subject类似于订阅Observable:你使用.subscribe方法并传递一个Observer:

代码语言:javascript
复制
subject.subscribe({
next: (x) => console.log(x),
error: (x) => console.log(x),
complete: () => console.log("done")
});

执行一个 Subject

observable不同的是,Subject调用自己的.next、.error和.complete方法来将数据推送给观察者。

代码语言:javascript
复制
// Push data to all observers
subject.next('first data');

// Push error to all observers
subject.error('oops something went wrong');

// Complete
subject.complete('done');

Subjects 是多播的

Subjects是多播的:多个观察者共享相同的Subject及其执行路径。这意味着所有通知都会广播给所有观察者这就像看现场直播节目。所有观众都在同一时间观看相同内容的同一片段

示例:让我们创建一个Subject,在10秒内触发1到10。然后,立即订阅一次Observable, 5秒后再订阅一次。

代码语言:javascript
复制
// Create a subject
const subject = new Subject();

let count = 1;
const interval = setInterval(() => {
subscriber.next(count++);
if (count > 10) {
clearInterval(interval);
}
}, 1000);

// Subscribe to the subjects
subject.subscribe(data => {
console.log(Observer 1: ${data});
});

// After 5 seconds subscribe again
setTimeout(() => {
subject.subscribe(data => {
console.log(Observer 2: ${data});
});
}, 5000);

/* OUTPUT

Observer 1: 1
Observer 1: 2
Observer 1: 3
Observer 1: 4
Observer 1: 5
Observer 2: 5
Observer 1: 6
Observer 2: 6
Observer 1: 7
Observer 2: 7
Observer 1: 8
Observer 2: 8
Observer 1: 9
Observer 2: 9
Observer 1: 10
Observer 2: 10

*/

在输出中,可以注意到第二个Observer5开始打印,而不是从1开始。这是因为第二个观察者共享同一个Subject。由于Subject在5秒后订阅,所以它已经完成了14的发送。这说明了Subject的多播行为。

Subjects 是 Observable 也是 Observer

Subjects.next、.error和.complete方法。这意味着他们遵循观察者的结构。因此,一个Subject也可以被用作一个观察者,并传递给observable或其他Subject.subscribe函数。

例如:让我们创建一个可观察对象Observable和一个Subject。然后使用Subject作为观察者订阅Observable。最后,订阅SubjectObservable发出的所有值都将被推送到Subject,而Subject将把接收到的值广播给所有的observer

代码语言:javascript
复制
// Create an Observable that emits data every second
const observable = new Observable(subscriber => {
let count = 1;
const interval = setInterval(() => {
subscriber.next(count++);

   if (count &gt; 5) {
        clearInterval(interval);   
   }

}, 1000);
});

// Create a subject
const subject = new Subject();

// Use the Subject as Observer and subscribe to the Observable
observable.subscribe(subject);

// Subscribe to the subject
subject.subscribe({
next: value => console.log(value)
});

/* Output

1
2
3
4
5

*/

操作符

操作符使RxJS变得有用。操作符是返回一个新的可观察对象的纯函数。可分为两大类:

  1. 创建操作符
  2. Pipeable操作符

创建操作符

创建操作符是可以创建一个新的Observable的函数。

例如:我们可以创建一个Observable,它使用from操作符来触发数组中的每个元素。

代码语言:javascript
复制
const observable = from([2, 30, 5, 22, 60, 1]);

observable.subscribe({
next: (value) => console.log("Received", value),
error: (err) => console.log(err),
complete: () => console.log("done")
});

/* OUTPUTS

Received 2
Received 30
Received 5
Received 22
Received 60
Received 1
done

*/

使用大理石图的Observable也是一样的。

Pipeable 操作符

可管道操作符(pipe-able operator)是将Observable作为输入,并返回一个行为经过修改新的Observable函数。

例子:让我们以from操作符创建的Observable为例。现在使用这个Observable,我们可以创建一个新的Observable,使用filter操作符只发出大于10的数字。

代码语言:javascript
复制
const greaterThanTen = observable.pipe(filter(x => x > 10));

greaterThanTen.subscribe(console.log, console.log, () => console.log("completed"));

// OUTPUT
// 11
// 12
// 13
// 14
// 15

同样的情况也可以用大理石图表示。

还有很多更有用的操作符。你可以在RxJS官方文档中看到完整的操作符列表和示例。

了解所有常用的操作符是至关重要的。下面是我经常使用的一些操作符:

代码语言:javascript
复制
1. mergeMap
2. switchMap
3. exhaustMap
4. map
5. catchError
6. startWith
7. delay
8. debounce
9. throttle
10.interval
11.from
12.of

Redux Observables

根据官方网站,Redux ObservablesRedux 基于rxjs的中间件。它能组合和取消异步操作,以创建副作用和更多功能。

Redux中,无论何时dispatch一个action,它都会运行所有的reducer函数,并返回一个新的状态state

Redux-observable获取所有这些已dispatch的action和新state,并从中创建两个可观察对象- actions可观察对象action和states可观察对象state

Actions可观察对象action将发出所有使用store.dispatch()分派的actions。可观察状态state将触发根reducer返回的所有新状态对象。

Epics

还有很多更有用的操作符。你可以在RxJS官方文档中看到完整的操作符列表和示例。

了解所有常用的操作符是至关重要的。下面是我经常使用的一些操作符:

代码语言:javascript
复制
1. mergeMap
2. switchMap
3. exhaustMap
4. map
5. catchError
6. startWith
7. delay
8. debounce
9. throttle
10.interval
11.from
12.of

Redux Observables

根据官方网站,Redux ObservablesRedux 基于rxjs的中间件。它能组合和取消异步操作,以创建副作用和更多功能。

Redux中,无论何时dispatch一个action,它都会运行所有的reducer函数,并返回一个新的状态state

Redux-observable获取所有这些已dispatch的action和新state,并从中创建两个可观察对象- actions可观察对象action和states可观察对象state

Actions可观察对象action将发出所有使用store.dispatch()分派的actions。可观察状态state将触发根reducer返回的所有新状态对象。

Epics

根据官方网站,Epics 是一个接受actions流并返回actions流的函数。actions进,actions出。

epic是可以用来订阅action和状态观察对象的函数。一旦订阅,epic将接收action流和状态流作为输入,并且必须返回action流作为输出。

代码语言:javascript
复制
const someEpic = (action, state) => {
return action$.pipe( // subscribe to actions observable
map(action => { // Receive every action, Actions In
return someOtherAction(); // return an action, Actions Out
})
)
}

重要的是要明白在Epic中接收到的所有action都已经通过reducers完成了。

Epic内部,我们可以使用任何RxJS的可观察模式,这就是为什么redux-observable很有用。

例如:我们可以使用.filter操作符创建一个新的中间可观察对象。类似地,我们可以创建任意数量的中间可观察对象,但最终可观察对象的最终输出必须是一个action,否则redux-observable将引发异常。

代码语言:javascript
复制
const sampleEpic = (action, state) => {
return action$.pipe(
filter(action => action.payload.age >= 18), // can create intermediate observables and streams
map(value => above18(value)) // where above18 is an action creator
);
}

使用store.dispatch()立即分派由Epics发出的每个action

即可上手

首先,让我们安装依赖项:

代码语言:javascript
复制
npm install --save rxjs redux-observable

创建一个名为epics单独文件夹来保存所有的epics。在epics文件夹中创建一个新的文件index.js,并使用combineEpics函数合并所有的epics来创建根epic。然后导出根epic

代码语言:javascript
复制
import { combineEpics } from 'redux-observable';
import { epic1 } from './epic1';
import { epic2 } from './epic2';

const epic1 = (action, state) => {
...
}

const epic2 = (action, state) => {
...
}

export default combineEpics(epic1, epic2);

使用createEpicMiddleware函数创建一个eoic中间件,并将其传递给createStore Redux函数。

代码语言:javascript
复制
import { createEpicMiddleware } from 'redux-observable';
import { createStore, applyMiddleware } from 'redux';
import rootEpic from './rootEpics';

const epicMiddleware = createEpicMiddlware();

const store = createStore(
rootReducer,
applyMiddleware(epicMiddlware)
);

最后,将epic根目录传递给epic中间件的.run方法:

代码语言:javascript
复制
epicMiddleware.run(rootEpic);

练习

RxJS有一个很大的学习曲线,并且Redux-observable的设置使已经很痛苦的Redux设置过程更加糟糕。所有这些都让Redux看起来有点过头了。但是这里有一些实际的用例可以改变您的想法。

在本节中,我将比较redux-observableredux-thunk,以展示redux-observable如何在复杂的用例中发挥作用。但我不讨厌redux- tank,我喜欢它,我每天都在使用它!

练习1:调用API

用例:调用API来获取文章的注释。当API调用正在进行时显示加载器,并处理API错误。

redux-thunk的实现是这样的:

代码语言:javascript
复制
function getComments(postId){
return (dispatch) => {
dispatch(getCommentsInProgress());
axios.get(/v1/api/posts/${postId}/comments).then(response => {
dispatch(getCommentsSuccess(response.data.comments));
}).catch(() => {
dispatch(getCommentsFailed());
});
}
}

这是绝对正确的。但是action creator是臃肿的。

我们可以写一个Epic来实现相同的redux-observable:

代码语言:javascript
复制
const getCommentsEpic = (action, state) => action$.pipe(
ofType('GET_COMMENTS'),
mergeMap((action) => from(axios.get(/v1/api/posts/${action.payload.postId}/comments)
.pipe(
map(response => getCommentsSuccess(response.data.comments)),
catchError(() => getCommentsFailed()),
startWith(getCommentsInProgress())
)
);

它可以让我们有一个像这样干净简单的action creator:

代码语言:javascript
复制
function getComments(postId) {
return {
type: 'GET_COMMENTS',
payload: {
postId
}
}
}

练习2:请求防抖

用例:每当文本字段的值发生变化时,通过调用API为文本字段提供自动补全。API调用应该在用户停止输入1秒后进行。

redux-thunk的实现是这样的:

代码语言:javascript
复制
let timeout;

function valueChanged(value) {
return dispatch => {
dispatch(loadSuggestionsInProgress());
dispatch({
type: 'VALUE_CHANGED',
payload: {
value
}
});

    // If changed again within 1 second, cancel the timeout
    timeout &amp;&amp; clearTimeout(timeout);

    // Make API Call after 1 second
    timeout = setTimeout(() =&gt; {
        axios.get(`/suggestions?q=${value}`)
            .then(response =&gt;
                  dispatch(loadSuggestionsSuccess(response.data.suggestions)))
            .catch(() =&gt; dispatch(loadSuggestionsFailed()))
    }, 1000, value);
}

}

它需要一个全局变量timeout。当我们开始使用全局变量时,我们的action creator就不再是纯函数了。对使用全局变量的action creator进行单元测试也变得很困难。

我们可以使用.debounce操作符在redux-observable中实现同样的功能:

代码语言:javascript
复制
const loadSuggestionsEpic = (action, state) => action$.pipe(
ofType('VALUE_CHANGED'),
debounce(1000),
mergeMap(action => from(axios.get(/suggestions?q=${action.payload.value})).pipe(
map(response => loadSuggestionsSuccess(response.data.suggestions)),
catchError(() => loadSuggestionsFailed())
)),
startWith(loadSuggestionsInProgress())
);

现在,我们的action creator可以被清理,更重要的是,它们可以再次成为纯函数。

代码语言:javascript
复制
function valueChanged(value) {
return {
type: 'VALUE_CHANGED',
payload: {
value
}
}
}

练习3:请求撤销

用例:继续前面的用例,假设用户在1秒钟内没有输入任何东西,并且我们进行了第一次API调用来获取建议。

假设API本身平均需要2-3秒才能返回结果。现在,如果用户在第一个API调用进行时输入了一些东西,1秒后,我们将创建第二个API。我们可以同时有两个API调用,它可以创建一个竞争条件。

为了避免这种情况,我们需要在进行第二个API调用之前取消第一个API调用。

redux-thunk的实现是这样的:

代码语言:javascript
复制
let timeout;
var cancelToken = axios.cancelToken;
let apiCall;

function valueChanged(value) {
return dispatch => {
dispatch(loadSuggestionsInProgress());
dispatch({
type: 'VALUE_CHANGED',
payload: {
value
}
});

    // If changed again within 1 second, cancel the timeout
    timeout &amp;&amp; clearTimeout(timeout);

    // Make API Call after 1 second
    timeout = setTimeout(() =&gt; {
        // Cancel the existing API
        apiCall &amp;&amp; apiCall.cancel(&#39;Operation cancelled&#39;);

        // Generate a new token
        apiCall = cancelToken.source();


        axios.get(`/suggestions?q=${value}`, {
            cancelToken: apiCall.token
        })
            .then(response =&gt; dispatch(loadSuggestionsSuccess(response.data.suggestions)))
            .catch(() =&gt; dispatch(loadSuggestionsFailed()))

    }, 1000, value);
}

}

现在,它需要另一个全局变量来存储Axios的取消令牌。更多的全局变量=更多的非纯函数!

要使用redux-observable实现相同的功能,我们只需要将.mergemap替换为.switchmap:

代码语言:javascript
复制
const loadSuggestionsEpic = (action, state) => action$.pipe(
ofType('VALUE_CHANGED'),
throttle(1000),
switchMap(action => from(axios.get(/suggestions?q=${action.payload.value})).pipe(
map(response => loadSuggestionsSuccess(response.data.suggestions)),
catchError(() => loadSuggestionsFailed())
)),
startWith(loadSuggestionsInProgress())
);

因为它不需要对我们的action creator进行任何更改,所以它们可以继续是纯函数。

类似地,在许多用例中,redux-observable确实很出色!例如,查询API,管理WebSocket连接,等等。

总结

如果你正在开发一个包含如此复杂的用例的Redux应用程序,强烈推荐使用Redux-observables。毕竟,使用它的好处直接与应用程序的复杂性成正比,这从上面提到的实际用例中是显而易见的。

我坚信使用正确的库集将帮助我们开发更干净和可维护的应用程序,并且从长远来看,使用它们的好处将超过缺点。