高阶 RxJs 映射运算符综合指南:switchMap、mergeMap、concatMap(和 excludeMap)【译】

原文: Comprehensive Guide to Higher-Order RxJs Mapping Operators: switchMap, mergeMap, concatMap (and exhaustMap)

我们每天发现的一些最常用的 RxJs 操作符是 RxJs 高阶映射操作符:switchMap、mergeMap、concatMap 和 exhaustMap。

例如,我们程序中的大多数网络调用都将使用这些运算符之一来完成,因此熟悉它们对于编写几乎任何响应式程序都是必不可少的。

知道在给定情况下使用哪个运算符(以及为什么)可能会有点令人困惑,我们经常想知道这些运算符是如何真正工作的以及为什么它们被这样命名。

这些操作符看起来似乎无关,但我们真的很想一口气学完它们,因为选择错误的操作符可能会意外地导致我们的程序出现一些微妙的问题。

为什么映射操作符有点混乱?

这是有原因的:为了理解这些操作符,我们首先需要了解每个操作符内部使用的 Observable 组合策略。

与其试图理解 switchMap 本身,不如先了解什么是 Observable 切换;我们需要先学习 Observable 连接等,而不是直接进入 concatMap。

这就是我们将在这篇文章中所做的,我们将按逻辑顺序学习 concat、merge、switch 和exhaust策略及其对应的映射运算符:concatMap、mergeMap、switchMap 和 exhaustMap。

我们将结合弹珠图和一些实际示例(包括运行代码)来解释这些概念。

最后,您将确切地知道这些映射运算符中的每一个是如何工作的,何时使用它们,为什么使用它们,以及它们命名的原因。

目录

在这篇文章中,我们将讨论以下主题:

  • 映射运算符
  • 什么是高阶可观测映射
  • 可观察到的连锁反应
  • RxJs concatMap 操作符
  • 可观察到的合并
  • RxJs mergeMap 运算符
  • 可观察到的切换
  • RxJs switchMap 运算符
  • Exhaust 策略
  • RxJs exhaust 运算符
  • 如何选择正确的映射操作符?
  • 运行 GitHub repo (包含代码示例)
  • 总结

请注意,这篇文章是我们正在进行的 RxJs 系列的一部分。因此,闲话少说,让我们开始我们的 RxJs 映射运算符深潜!

Rxjs映射运算符

让我们从头开始,介绍一下这些映射运算符的一般作用。

正如操作符的名称所暗示的那样,它们正在执行某种映射: 但究竟映射的是什么呢?首先让我们看一下 RxJs map 操作符的弹珠图:

使用 map 操作符,我们可以获取一个输入流(值为1、2、3) ,并从中创建一个派生的映射输出流(值为10、20、30)。

底部的输出流的值是通过获取输入流的值并应用它们得到的一个函数: 这个函数只是将这些值乘以10。

因此,映射运算符完全是关于映射输入可观测值的。下面是我们如何使用它来处理 HTTP 请求的一个例子:

1
2
3
4
5
6
7
8
9
10
11
const http$ : Observable<Course[]> = this.http.get('/api/courses');

http$
.pipe(
tap(() => console.log('HTTP request executed')),
map(res => Object.values(res['payload']))
)
.subscribe(
courses => console.log("courses", courses)
);

在这个例子中,我们创建了一个可观察的 HTTP 来进行后端调用,并且我们订阅了它。可观察对象将发出后端 HTTP 响应的值,这是一个 JSON 对象。

在这种情况下,HTTP 响应将数据包装在一个有效负载属性中,因此为了获取数据,我们应用了 RxJs 映射操作符。然后映射函数将映射 JSON 响应有效负载并提取有效负载属性的值。

现在我们已经回顾了基本映射是如何工作的,现在让我们讨论一下高阶映射。

什么是高阶可观测映射?

在高阶映射中,我们不是将类似1的普通值映射到类似10的另一个值,而是将一个值映射到一个可观察对象!

结果是一个高阶的 Observable。它只是一个Observable,和其他的一样,但它的值本身也是 Observable,我们可以分别订阅。

这听起来可能有点牵强,但实际上,这种类型的映射一直在发生。让我们给出这种映射的一个实际例子。举个例子,我们有一个响应式表单,它通过一个Observable 随时间发出有效的表单值:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
@Component({
selector: 'course-dialog',
templateUrl: './course-dialog.component.html'
})
export class CourseDialogComponent implements AfterViewInit {

form: FormGroup;
course:Course;

@ViewChild('saveButton') saveButton: ElementRef;

constructor(
private fb: FormBuilder,
private dialogRef: MatDialogRef<CourseDialogComponent>,
@Inject(MAT_DIALOG_DATA) course:Course) {

this.course = course;

this.form = fb.group({
description: [course.description,
Validators.required],
category: [course.category, Validators.required],
releasedAt: [moment(), Validators.required],
longDescription: [course.longDescription,
Validators.required]
});
}
}

Reactive Form 提供了一个可观察的对象 this.Form.valueChanges,它在用户与表单交互时发出最新的表单值。这将是我们可观察到的源头。

我们想要做的是在这些值随着时间的推移发出时至少保存其中的一些值,以实现表单草案的预保存特性。这样,当用户填写表单时,数据将逐步保存,从而避免由于意外重新加载而丢失整个表单数据。

为什么是高阶可观察对象?

为了实现表单草案保存功能,我们需要获取表单值,然后创建第二个可观察的 HTTP 对象来执行后端保存,然后订阅它。

我们可以尝试手动完成所有这些操作,但这样我们就会陷入嵌套的订阅反模式:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16

this.form.valueChanges
.subscribe(
formValue => {

const httpPost$ =
this.http.put(`/api/course/${courseId}`, formValue);

httpPost$.subscribe(
res => ... handle successful save ...
err => ... handle save error ...
);

}
);

正如我们所看到的,这将导致我们的代码很快地嵌套在多个级别,这是我们在最初使用 RxJs 时试图避免的问题之一。

让我们把这个新的 httpPost$ Observable 称为内部可观察对象,因为它是在内部嵌套代码块中创建的。

避免嵌套订阅

我们希望以一种更方便的方式完成所有这些过程: 我们希望获取表单值,并将其映射到一个保存的 Observable。这将有效地创建一个更高阶的可观察对象,其中每个值对应一个保存请求。

然后,我们希望透明地订阅这些网络观察数据,并且一次性直接接收网络响应,以避免任何嵌套。

如果我们有某种更高阶的 RxJs 映射运算符,我们就可以做到这一切!那为什么我们需要四个不同的操作员呢?

为了理解这一点,想象一下如果 valueChanges 连续发出多个表单值,并且保存操作需要一些时间才能完成,会发生什么情况:

  • 我们是否应该等待一个保存请求完成后再进行另一个保存?

  • 我们应该同时进行多次保存吗?

  • 我们应该取消正在进行的保存,重新开始一个新的吗?

  • 如果其中一个已经在进行中,我们应该忽略新的保存尝试吗?

在研究这些用例中的每一个之前,让我们回到上面的嵌套订阅代码。

在嵌套订阅示例中,我们实际上是并行地触发保存操作,这不是我们想要的,因为不能强有力地保证后端将按顺序处理保存操作,并且最后一个有效的表单值确实是存储在后端上的表单值。

让我们看看如何确保只有在前一次保存完成后才执行保存请求。

理解 Observable concat()

为了实现顺序保存,我们将引入可观察对象连接的新概念。在这个代码示例中,我们使用 concat () RxJs 函数将两个可观察对象连接起来:

1
2
3
4
5
6
7
8
9

const series1$ = of('a', 'b');

const series2$ = of('x', 'y');

const result$ = concat(series1$, series2$);

result$.subscribe(console.log);

在使用 of 创建函数创建了两个 Observables series1$ 和 series2$ 之后,我们创建了第三个 result$ Observable,它是串联 series1$ 和 series2$ 的结果。

下面是这个程序的控制台输出,显示了结果 Observable 发出的值:

1
2
3
4
5
6

a
b
x
y

正如我们所看到的,这些值是将 series1$ 的值与 series2$ 连接在一起的结果。但这里有一个问题: 这只有在这些可观测数据完成的情况下才有效! !

of函数的作用是: 创建一个可观察的对象,发出传递给of()的值,然后在发出所有值之后完成 Observable。

Observable concat 的弹珠图

为了真正理解发生了什么,我们需要看看 Observable concat 的弹珠图:

你注意到第一个可观察值 b 后面的竖条了吗?这标志着第一个值为 a 和 b (series1$)的可观察对象完成的时间点。

让我们按照时间线一步一步来分析这里发生了什么:

  • 两个可观察对象 series1$ 和 series2$ 被传入 concat() 函数

  • 然后 concat ()将订阅第一个可观察对象 series1$,但不订阅第二个可观察对象 series2$(这对于理解连接非常关键)

  • series1$ 发出值 a,该值立即反映在输出 result$ Observable 中

  • series2$ Observable 还没有发出值,因为它还没有被订阅

  • 然后 series1$ 将发出 b 值,该值将反映在输出中

  • 然后,series1$ 将完成,并且只有在完成之后,concat() 才会订阅 series2$

  • 然后,series2$ 值将开始反映在输出中,直到 然后,series2$ 完成

  • 请注意,我们可以向 concat() 传递任意多的可观测数据,而不仅仅是像本例中这样的两个

关于Observable concat 的关键点

正如我们所看到的,Observable 连接就是关于 Observable 的完成(complete)! 我们取第一个 Observable 并使用它的值,等待它完成,然后我们使用下一个 Observable,依此类推,直到所有 Observable 完成。

回到我们的高阶 Observable 映射示例,让我们看看串联的概念如何帮助我们。

使用 Observable 的连接实现顺序保存

正如我们所见,为了确保我们的表单值按顺序保存,我们需要获取每个表单值并将其映射到 httpPost$ Observable。

然后我们需要订阅它,但我们希望在订阅下一个 httpPost$ Observable 之前完成保存。

然后我们将订阅每个 httpPost$ 并按顺序处理每个请求的结果。 最后,我们需要的是一个混合了以下内容的运算符:

  • 一个高阶映射操作(获取表单值并将其转换为 httpPost$ Observable)

  • 使用 concat() 操作,将多个 httpPost$ Observables 连接在一起以确保在前一个正在进行的保存首先完成之前不会进行下一个 HTTP 保存。

RxJs concatMap 操作符

代码如下:

1
2
3
4
5
6
7
8
9
this.form.valueChanges
.pipe(
concatMap(formValue => this.http.put(`/api/course/${courseId}`,
formValue))
)
.subscribe(
saveResult => ... handle successful save ...,
err => ... handle save error ...
);

正如我们所见,使用像 concatMap 这样的高阶映射运算符的第一个好处是现在我们不再有嵌套订阅。

通过使用 concatMap,现在所有表单值都将按顺序发送到后端,如 Chrome DevTools Network 选项卡中所示:

分解 concatMap 网络日志图

正如我们所看到的,一个保存 HTTP 请求只有在前一次保存完成后才会启动。以下是 concatMap 操作符如何确保请求始终按顺序发生:

  • concatMap 正在获取每个表单值并将其转换为保存的 HTTP Observable,称为内部 Observable

  • concatMap 然后订阅内部 Observable 并将其输出发送到结果 Observable

  • 第二个表单值可能比在后端保存前一个表单值更快

  • 如果发生这种情况,新的表单值将不会立即映射到 HTTP 请求

  • 相反, concatMap 将等待先前的 HTTP Observable 完成,然后将新值映射到 HTTP Observable,订阅它并因此触发下一次保存

注意,这里的代码只是保存表单草稿值的实现的基础。您可以将它与其他操作符组合起来,例如,只保存有效的表单值,并使用防抖、节流操作,以确保它们不会太频繁地发生。

可观察对象的合并(merge)

将 Observable 串联应用于一系列 HTTP 保存操作似乎是确保保存按预期顺序发生的好方法。

但是在其他情况下,我们希望并行运行,而不需要等待前一个内部 Observable 完成。

为此,我们有合并 Observable 组合策略! 与 concat 不同,merge 不会在订阅下一个 Observable 之前等待 Observable 完成。

相反,merge 同时订阅每个合并的 Observable,然后随着多个值随着时间的推移到达,它将每个源 Observable 的值输出到组合结果 Observable。

merge 操作的实际例子

为了清楚地表明 merge 不依赖于 observable 的完成(complete),让我们合并两个永远不会完成的可观测数据,因为它们是 interval Observables:

1
2
3
4
5
6
7
const series1$ = interval(1000).pipe(map(val => val*10));

const series2$ = interval(1000).pipe(map(val => val*100));

const result$ = merge(series1$, series2$);

result$.subscribe(console.log);

使用 interval() 创建的 Observable 将每隔一秒发出值 0、1、2 等,并且永远不会完成。

请注意,我们对这些区间 Observable 应用了几个 map 运算符,只是为了更容易在控制台输出中区分它们。

以下是控制台中可见的前几个值:

1
2
3
4
5
6
7
8
0
0
10
100
20
200
30
300

Merging and Observable Completion

正如我们所看到的,合并的源可观测数据的值在发出时立即显示在可观测结果中。如果其中一个合并的可观测数据完成,那么当其他可观测数据随着时间的推移到达时,merge 将继续发出它们的值。

Observable merge 的弹珠图

正如我们所见,合并的源 Observables 的值立即显示在输出中。 直到所有合并的 Observable 完成后,结果 Observable 才会完成。

现在我们了解了合并策略,让我们看看它如何在高阶 Observable 映射的上下文中使用。

RxJs mergeMap 运算符

如果我们将 merge 策略与高阶可观测映射的概念结合起来,我们就得到了 RxJs mergeMap 操作符。让我们看看这个操作符的弹珠图:

下面是 mergeMap 操作符的工作原理:

  • 与 concatMap 的情况一样,源可观测的每个值仍然被映射到内部的可观测值中

  • 与 concatMap 一样,这个内部观察对象也是由 mergeMap 订阅的

  • 当内部观察值发出新值时,它们会立即反映在输出观察值中

  • 但是与 concatMap 不同的是,在 mergeMap 的情况下,我们不必等待先前的内部可观测值完成,然后再触发下一个内部可观测值

  • 这意味着使用 mergeMap (与 concatMap 不同) ,我们可以让多个内部可观测数据随时间重叠,并行发出值,就像我们在图片中看到的用红色突出显示的那样

检查 mergeMap 网络日志

回到我们之前的表单草案保存示例,很明显,在这种情况下,我们需要的是 concatMap,而不是 mergeMap,因为我们不希望保存并行发生。

让我们看看如果我们不小心选择了 mergeMap 会发生什么:

1
2
3
4
5
6
7
8
9
10
11

this.form.valueChanges
.pipe(
mergeMap(formValue =>
this.http.put(`/api/course/${courseId}`,
formValue))
)
.subscribe(
saveResult => ... handle successful save ...,
err => ... handle save error ...
);

现在假设用户与表单进行交互并开始快速地输入数据。在这种情况下,我们现在可以在网络日志中看到并行运行的多个保存请求:

正如我们所看到的,请求是并行发生的,在这种情况下是一个错误!在重负载下,这些请求可能会被处理得无序。

可观察对象的切换

现在我们来谈谈另一个 Observable 组合策略:切换。 切换的概念更接近于合并而不是串联,因为我们不等待任何 Observable 终止。

但是在切换时,与合并不同,如果一个新的 Observable 开始发出值,我们将在订阅新的 Observable 之前取消订阅之前的 Observable。

Observable 切换就是为了确保未使用的 Observables 的取消订阅逻辑被触发,从而可以释放资源!

Observable Switch 的弹珠图

注意对角线,这些不是偶然的! 在切换策略的情况下,在图中表示高阶 Observable 很重要,它是图像的顶行。

这个高阶 Observable 发出的值本身就是 Observable。

对角线从高阶 Observable 顶线分叉的那一刻,是 Observable 值被 switch 发出和订阅的那一刻。

分解 switch 网络日志图

下面是这个图表中的情况:

  • 高阶 Observable 发出它的第一个内部 Observable (a-b-c-d),它被订阅(通过 switch 策略实现)第一个内部 Observable (a-b-c-d) 发出值 a 和 b,它们立即反映在输出中

  • 但随后第二个内部 Observable (e-f-g) 被发射,这触发了第一个内部 Observable (a-b-c-d) 的取消订阅,这是切换的关键部分

  • 然后第二个内部 Observable (e-f-g) 开始发出新值,这些值反映在输出中

  • 但请注意,第一个内部 Observable (a-b-c-d) 同时仍在发出新值 c 和 d

  • 然而,这些后来的值没有反映在输出中,那是因为我们同时取消了第一个内部 Observable (a-b-c-d) 的订阅

我们现在可以理解为什么要用这种不同寻常的方式,用对角线来绘制图表了: 因为我们需要直观地表示每个内部可观测数据是从哪里订阅(或者取消订阅)的,这发生在对角线从高阶可观测数据源分叉出来的点上。

RxJs switchMap 运算符

然后我们使用 switch 策略并将其应用于高阶映射。假设我们有一个发出值1、3和5的普通输入流。

然后我们将把每个值映射到一个可观测值,就像我们在连接映射和合并映射的情况下所做的那样,并获得一个更高阶的可观测值。

如果我们现在在发出的内部 Observable 之间切换,而不是连接或合并它们,我们最终会得到 switchMap 运算符:

分解 Observable switchMap 的弹珠图

下面是这个操作符的工作原理:

  • 可观测的源发射值1、3和5

  • 然后通过应用映射函数将这些值转换为 Observables

  • 当内部观察值发出一个值时,该值将立即反映在输出中

  • 但是如果在前一个 Observable 有机会完成之前发出了像 5 这样的新值,则前一个内部 Observable (30-30-30) 将被取消订阅,并且它的值将不再反映在输出中

  • 注意上图中红色的 30-30-30 内部 Observable:最后 30 个值没有发出,因为 30-30-30 内部 Observable 被取消订阅

Search TypeAhead-switchMap 运算符示例

switchMap 的一个非常常见的用例是 search Typehead。首先,让我们定义我们的源 Observer,它的值本身将触发搜索请求。

这个源 Observable 将发出值,这些值是用户在输入中键入的搜索文本:

1
2
3
4
5
6
7
const searchText$: Observable<string> = 
fromEvent<any>(this.input.nativeElement, 'keyup')
.pipe(
map(event => event.target.value),
startWith('')
)
.subscribe(console.log);

此源 Observable 链接到用户键入其搜索的输入文本字段。 当用户输入单词“Hello World”作为搜索时,这些是 searchText$ 发出的值:

1
2
3
4
5
6
7
8
9
10
11
12
13
H
H
He
Hel
Hell
Hello
Hello
Hello W
Hello W
Hello Wo
Hello Wor
Hello Worl
Hello World

使用防抖并删除重复项

注意重复的值,或者是由于使用了两个单词之间的空格,或者是使用 Shift 键将字母 H 和 W 大写。

为了避免将所有这些值作为单独的搜索请求发送到后端,让我们通过使用 deounceTime 操作符等待用户输入稳定下来:

1
2
3
4
5
6
7
8
const searchText$: Observable<string> = 
fromEvent<any>(this.input.nativeElement, 'keyup')
.pipe(
map(event => event.target.value),
startWith(''),
debounceTime(400)
)
.subscribe(console.log);

通过使用这个操作符,如果用户以正常速度输入,我们现在在 searchText $的输出中只有一个值:

1
Hello World

这已经比我们以前的要好得多了,现在只有当它的稳定值至少为400毫秒时才会发出一个值!

但是,如果用户在思考搜索时输入速度很慢,以至于两个值之间的时间间隔超过400毫秒,那么搜索流可能是这样的:

1
2
3
He
Hell
Hello World

此外,用户可以键入一个值,按退格键并再次键入,这可能导致重复的搜索值。我们可以通过添加独特的 distinctUntilChanged 操作符来防止重复搜索的发生。

取消过时搜索

但更重要的是,我们需要一种方法来取消以前的搜索,作为一个新的搜索开始。

这里我们要做的是将每个搜索字符串转换为一个后端搜索请求并订阅它,然后在两个连续的搜索请求之间应用切换策略,如果触发一个新的搜索,则前一个搜索将被取消。

而这正是 switchMap 操作符要做的事情!下面是使用它的 Typehead 逻辑的最终实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
const searchText$: Observable<string> = 
fromEvent<any>(this.input.nativeElement, 'keyup')
.pipe(
map(event => event.target.value),
startWith(''),
debounceTime(400),
distinctUntilChanged()
);

const lessons$: Observable<Lesson[]> = searchText$
.pipe(
switchMap(search => this.loadLessons(search))
)
.subscribe();

function loadLessons(search:string): Observable<Lesson[]> {

const params = new HttpParams().set('search', search);

return this.http.get(`/api/lessons/${coursesId}`, {params});
}

现在让我们看看 switchMap 操作符的运行情况!如果用户在搜索栏上输入,然后犹豫了一下,输入了其他内容,我们通常可以在网络日志中看到:

正如我们所看到的,以前的一些搜索已经被取消,因为他们正在进行,这是可怕的,因为这将释放服务器资源,然后可以用于其他事情。

The Exhaust Strategy

switchMap 操作符对于 typehead 场景是理想的,但是在其他情况下,我们想要做的是忽略源可观察值中的新值,直到前面的值被完全处理。

例如,假设我们正在触发一个后端保存请求,以响应保存按钮中的单击。我们可以首先尝试使用 concatMap 操作符来实现这一点,以确保按顺序执行保存操作:

1
2
3
4
5
fromEvent(this.saveButton.nativeElement, 'click')
.pipe(
concatMap(() => this.saveCourse(this.form.value))
)
.subscribe();

这可以确保按顺序保存,但是如果用户多次单击保存按钮,现在会发生什么情况?下面是我们将在网络日志中看到的内容:

正如我们所看到的,每次点击都会触发自己的保存: 如果我们点击20次,就会有20次保存!在这种情况下,我们希望不仅仅是确保保存按顺序发生。

我们也希望能够忽略一个点击,但只有当一个保存是正在进行。exhaust 组合策略将允许我们这样做。

exhaust 的弹珠图

为了理解 exhaust 是如何工作的,让我们看看这个弹珠图:

就像以前一样,我们在第一行有一个更高阶的 Observable,它的值本身就是 Observable,从第一行分叉出来。这是这张图中发生的事情:

  • 就像 switch 的情况一样,exhaust 订阅第一个内部 Observable (a-b-c)

  • 像往常一样,值 a、b 和 c 会立即反映在输出中

  • 然后发出第二个内部 Observable (d-e-f),而第一个 Observable (a-b-c) 仍在进行中

  • 第二个 Observable 被 exhaust 策略丢弃,并且不会被订阅(这是 exhaust 的关键部分)

  • 只有在第一个可观测数据(a-b-c)完成后,exhaust 才会订阅新的可观测数据

  • 当第三个 Observable (g-h-i) 发出时,第一个 Observable (a-b-c) 已经完成,所以第三个 Observable 不会被丢弃,会被订阅

  • 然后,第三个 Observable 的值 g-h-i 将显示在结果 Observable 的输出中,与输出中不存在的值 d-e-f 不同

就像 concat、merge 和 switch 的情况一样,我们现在可以在高阶映射的上下文中应用 exhaust 策略。

RxJs exhaustMap 运算符

现在让我们来看一下 exaustMap 操作符的弹珠图。让我们记住,与前面图表的顶行不同,源可观测值1-3-5发出的值不是可观测值。

相反,这些值可以是鼠标点击:

下面是 exhaustMap 图表的情况:

  • 发出值 1,并创建内部 Observable 10-10-10

  • Observable 10-10-10 发出所有值并在源 Observable 中发出值 3 之前完成,因此所有 10-10-10 值在输出中发出

  • 在输入中发出一个新值 3,触发一个新的 30-30-30 内部 Observable

  • 但是现在,虽然 30-30-30 仍在运行,但我们在源 Observable 中得到了一个新值 5

  • 这个值 5 被 exhaust 策略丢弃,这意味着从未创建 50-50-50 Observable,因此 50-50-50 值从未出现在输出中

A Practical Example for exhaustMap

现在让我们将这个新的 exhaustMap 操作符应用到我们的保存按钮场景:

1
2
3
4
5
fromEvent(this.saveButton.nativeElement, 'click')
.pipe(
exhaustMap(() => this.saveCourse(this.form.value))
)
.subscribe();

正如我们所看到的,当保存请求被忽略时,我们所做的单击仍然在进行,正如预期的那样!

注意,如果我们连续点击20次,最终正在进行的保存请求会结束,然后第二个保存请求会开始。

如何选择正确的映射操作符?

concatMap, mergeMap, switchMap and exhaustMap 都是相似的,因为它们都是高阶映射操作符。

但它在许多微妙的方面也是如此不同,以至于没有一个操作符可以被安全地指定为默认值。

相反,我们可以简单地根据用例选择合适的操作符:

  • 如果我们需要在等待完成时按顺序执行操作,那么 concatMap 是正确的选择

  • 对于并行处理,mergeMap 是最好的选择

  • 如果我们需要取消逻辑,可以使用 switchMap

  • 在当前的 Observables 还在运行的时候忽略了这一点,exhaustMap 就是这样做的

Running GitHub repo (with code samples)

如果你想尝试一下这篇文章中的例子,这里有一个仓库,里面包含了这篇文章的运行代码。

这个存储库包括一个小的 HTTP 后端,它将帮助在一个更现实的场景中测试 RxJ 映射操作符,并包括运行例子,比如表单草稿的预先保存、typeahead、主题和用 Reactive 风格编写的组件的例子:

结论

正如我们所看到的,RxJs 高阶映射运算符对于在响应式编程中执行一些非常常见的操作(如网络调用)是必不可少的。

为了真正理解这些映射操作符及其名称,我们需要首先关注理解底层的可观察组合策略 concat、 merge、 switch 和 exhaust。

我们还需要认识到存在一个更高阶的映射操作,其中值被转换为分离的可观测值,并且这些可观测值被映射操作本身以隐藏的方式订阅。

选择正确的运算符就是选择正确的内部 Observable 组合策略。 选择错误的运算符通常不会导致程序立即损坏,但随着时间的推移可能会导致一些难以解决的问题。