函数响应式开发库 RxJS
今天我们来看一看 RxJS。大部分开发者都听说过函数式和响应式编程。那么 RxJS 究竟如何结合这两个概念?它究竟致力于解决什么问题呢?
ReactiveX 将观察者模式与迭代器模式和函数式编程与集合相结合,以满足对管理事件序列的理想方式的需求。
上述就是官网对于该库的解释。我们看到几个关键词: 观察者模式、迭代器模式、函数式编程以及事件序列。
通过上述描述,我们清楚的知道了它是为了解决事件序列的问题而存在的。那也就是说,处理事件越多,越复杂 RxJS 越有用。同时一旦涉及到事件序列,我们就不能忽略两个概念:时间和调度。
我们先不进入复杂的概念。我们先对比几个例子看看它是如何帮助我们管理事件序列的。
例子
浏览器节流点击
浏览器最普遍的事件就是浏览器侦听事件。我们现在要实现一个节流点击加数的函数:
// 常量 节流时间 1000 msconst RATE = 1000;
// 状态数据let count = 0;
// 最后一次的点击的时间let lastClick = Date.now() - RATE;
// 添加时间监听document.addEventListener('click', event => { // 判断时间 if (Date.now() - lastClick >= RATE) { count += event.clientX; console.log(count); // 更新点击时间 lastClick = Date.now(); }});
可以看到这里创建状态数据 count 以及交互事件所需要的额外状态 lastClick。如果使用 RXJs 的话:
import { fromEvent } from 'rxjs';import { throttleTime, map, scan } from 'rxjs/operators';
// 常量 节流时间 1000 msconst RATE = 1000;
// 常量 初始化数据const INITIAL_COUNT = 0;
// 添加监听fromEvent(document, 'click') .pipe( // 添加节流 throttleTime(RATE), // 转换数据 map(event => event.clientX), // 隔离状态,保留上一次回调的值。类似 reduce scan((count, clientX) => count + clientX, INITIAL_COUNT) ) // 订阅产生副作用 .subscribe(count => console.log(count));
这里我稍微修改了一下官网的例子(提取了 RATE 和 INITIAL_COUNT 常量),这里我们看到了 rxjs 的几个好处。
- 封装交互事件(减少额外状态)
- 隔离状态数据(减少出错)
- 易于修改(是否使用节流只需要删减一行代码)
进一步,我们可以使用 RXJs 来减少复杂事件产生的额外状态。
let messagePool = []ws.on('message', (message) => { messagePool.push(message)})
setInterval(() => { render(messagePool) messagePool = []}, 1000)
import { fromEvent } from 'rxjs';import { throttleTime, map, scan } from 'rxjs/operators';fromEvent(ws, 'message') .bufferTime(1000) .subscribe(messages => render(messages))
const code = [ "ArrowUp", "ArrowUp", "ArrowDown", "ArrowDown", "ArrowLeft", "ArrowRight", "ArrowLeft", "ArrowRight", "KeyB", "KeyA", "KeyB", "KeyA"]
Rx.Observable.fromEvent(document, 'keyup') .map(e => e.code) .bufferCount(12, 1) .subscribe(last12key => { if (_.isEqual(last12key, code)) { console.log('隐藏的彩蛋 \(^o^)/~') } }
概念
import { Observable, Observer } from 'rxjs'
export class HttpService { get<T>(url: string): Observable<T> { return new Observable((observer: Observer<T>) => { const controller = new AbortController() fetch(`${url}`, { method: 'GET', signal: controller.signal, }) .then((res) => res.json()) .then((res) => { observer.next(res) observer.complete() }) .catch((e) => { observer.error(e) })
return () => controller.abort() }) }}