你了解发布-订阅模式吗?

什么是发布订阅模式

在软件架构中,发布/订阅(Publish–subscribe pattern)是一种消息范式,消息的发送者(称为发布者)不会将消息直接发送给特定的接收者(称为订阅者)。而是将发布的消息分为不同的类别,无需了解哪些订阅者(如果有的话)可能存在。同样的,订阅者可以表达对一个或多个类别的兴趣,只接收感兴趣的消息,无需了解哪些发布者(如果有的话)存在。

发布-订阅模式的发布和订阅都由统一的一个调度中心来处理,那也就是说这个模式呢是有三部分组成的

  • 发布者:将消息事件发布到调度中心
  • 订阅者: 把自己想关注的消息事件,注册到调度中心
  • 调度中心:处理事件注册与发布

有什么作用呢,就是在异步编程中实现更松的解耦

img

与观察者模式的区别❗

  • 实现方式:在观察者模式中,观察者(Observer)通常会直接订阅(Subscribe)主题(Subject)的更新,而主题则会在状态改变时直接调用观察者的方法。而在发布订阅模式中,发布者(Publisher)和订阅者(Subscriber)通常不会直接交互,而是通过一个调度中心(Message broker 或 Event bus)来进行通信。

  • 耦合性:观察者模式中的观察者和主题之间的耦合性相对较高,因为观察者需要直接订阅主题。而在发布订阅模式中,由于引入了调度中心,发布者和订阅者之间的耦合性较低。

  • 使用场景:观察者模式通常用于处理较为简单的一对多依赖关系,例如GUI中的事件处理等。而发布订阅模式则更适合处理复杂的异步处理和跨系统通信等场景,例如消息队列、事件驱动架构等。

观察者模式:直接和公司签
订阅者模式:签大厂的外包公司

JS实现发布订阅模式

先列举下需要实现发布-订阅模式的思路,目的呢就是实现三个方法,添加、删除、派发

🐾第一步: 实现发布订阅模式的第一步是创建一个可以存储事件及其对应回调函数的容器。在JavaScript中,我们可以使用一个对象来作为这个容器。每个事件都是对象的一个属性,其值是一个数组,用来存储所有订阅了该事件的回调函数。

🐾第二步: 第二步是添加一个名为subscribe的方法,该方法允许监听器订阅特定的事件。这个方法需要两个参数:一个是事件名,另一个是当事件被触发时应该调用的回调函数。

🐾第三步: 第三步是实现取消订阅的功能:添加一个名为unsubscribe的方法来实现这个功能。这个方法需要两个参数:一个是事件名,另一个是要取消订阅的回调函数。

🐾第四步: 第四步是实现事件发布的功能,我们可以添加一个名为publish的方法来实现这个功能。这个方法需要两个参数:一个是事件名,另一个是当事件被触发时应该传递给回调函数的数据。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
class PubSub {
constructor() {
this.events = {};
}

subscribe(event, callback) {
if (!this.events[event]) {
this.events[event] = [];
}
this.events[event].push(callback);
}

unsubscribe(event, callback) {
if (!this.events[event]) return new Error('事件无效');
this.events[event] = this.events[event].filter(cb => cb !== callback);
}

publish(event, data) {
if (!this.events[event]) return new Error('该事件未注册');
this.events[event].forEach(callback => callback(...data));
}
}

🐾使用示例: 至此我们已经完成了一个基本的发布订阅模式,下面展示一下它的具体使用:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
// 创建一个新的PubSub实例
const pubsub = new PubSub();

// 定义两个回调函数
function callback1(data) {
console.log('这里是第一个回调: ' + data);
}

function callback2(data) {
console.log('这里是第二个回调: ' + data);
}

// 订阅一个事件
pubsub.subscribe('myEvent', callback1);
pubsub.subscribe('myEvent', callback2);

// 输出两个回调函数的 console
pubsub.publish('myEvent', 'Hello, world!');

// 取消订阅 callback1
pubsub.unsubscribe('myEvent', callback1);

// callback1 的订阅被取消了,仅打印 callback2 的 console
pubsub.publish('myEvent', 'Hello, world!');

错误处理与功能优化

首先是类型判断与错误处理,我们应当检查参数的类型:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
class PubSub {
constructor() {
this.events = {};
}

subscribe(event, callback) {
if (typeof event !== 'string') {
throw new Error('Event name must be a string');
}
if (typeof callback !== 'function') {
throw new Error('Callback must be a function');
}
if (!this.events[event]) {
this.events[event] = [];
}
this.events[event].push(callback);
}

unsubscribe(event, callback) {
if (typeof event !== 'string') {
throw new Error('Event name must be a string');
}
if (typeof callback !== 'function') {
throw new Error('Callback must be a function');
}
if (!this.events[event]) return;
this.events[event] = this.events[event].filter(cb => cb !== callback);
}

publish(event, data) {
if (typeof event !== 'string') {
throw new Error('Event name must be a string');
}
if (!this.events[event]) return;
this.events[event].forEach(callback => callback(data));
}
}

开发中我们会遇到一些一次性事件,不会被触发第二次了,我们可以加一个参数来省去手动清除事件的负担:

1
2
3
4
5
6
7
8
publish(event, data, once = false) {
if (typeof event !== 'string') {
throw new Error('Event name must be a string');
}
if (!this.events[event]) return;
this.events[event].forEach(callback => callback(data));
if(once) delete this.events[event];
}

如果一个回调函数在被调用时订阅了相同的事件,可能会导致无限循环。这是因为publish方法会立即调用所有的回调函数,而这些回调函数可能会改变监听器列表。
上面的代码并没有考虑这个问题,但测试后发现并不会发生无限循环的情况,这是什么原因呢?问题出在 for 和 forEach 中,forEach 方法在开始循环时就已经确定了循环的次数,所以,即使在回调函数中添加或删除了元素,也不会影响forEach的循环次数;for 循环会实时检查数组的长度,故而会出现上述的情况。


你了解发布-订阅模式吗?
http://example.com/2023/10/16/你了解发布订阅模式吗/
作者
weirdo
发布于
2023年10月16日
更新于
2023年10月16日
许可协议