JS 并发调度器

浏览器的并发连接数量是有限的,对于 chrome,一般是 6 个。那么对于瞬时大量请求发送的场景,如果不加管控,那么有些排队靠后的请求,很可能在等待到浏览器调度前,就耗尽自己的时间限制,从而自我取消。针对这个问题,可以设计实现一个并发调度器,将并发请求“缓存”起来,控制并发数,从而避免请求在浏览器排队时超时

代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
const LimitConcurrentScheduler = function (concurrent = 5) {
this.concurrent = concurrent;
this.pendingRequests = 0;
this.queue = [];
};

LimitConcurrentScheduler.prototype.dequeue = function () {
while (this.pendingRequests < this.concurrent && this.queue.length) {
this.pendingRequests++;
const { task, resolve, reject } = this.queue.shift();
task()
.then(resolve, reject)
.finally(() => {
this.pendingRequests--;
this.dequeue();
});
}
};

LimitConcurrentScheduler.prototype.run = function (task) {
return new Promise((resolve, reject) => {
this.queue.push({
task,
resolve,
reject,
});
this.dequeue();
});
};

使用时,假设有一个异步任务 task:

1
2
3
4
5
// 全局常量
const limitConcurrentScheduler = new LimitConcurrentScheduler()

// 发送请求时
limitConcurrentScheduler.run(task)

当一个异步任务加入并发队列后,按照代码,其并不会直接触发,而是尝试执行一次出列。在出列时,按照并发数限制依次取出队列中的任务执行,当一个任务执行后,不论失败,均再次执行一次出列,并将标记当前并发数的变量 pendingRequests 减一。从而实现了并发调度

当一个同步任务加入并发队列后呢?毫无疑问上述的代码中, dequeue() 方法执行 task() 之后会报错,因为并没有返回 promise 对象,一般也就没有 then 方法可以被调用

可以用利用 Promise.resolve().then() 包裹一下,因为在 Promise.prototype.then() 中,当传入的第一个参数 onResolved 回调方法返回一个 promise 时,当前 then() 将返回这个 promise,而当返回的并不是 thenable 对象时,则以这个普通对象作为 value 兑现一个新的 promise 并返回。这样可以保证被包裹后的 task 代码块一定返回了一个 promise,从而有 then() 方法可以被调用,详解见:一步一步手写 Promise

此时代码变为:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
const LimitConcurrentScheduler = function (concurrent = 5) {
this.concurrent = concurrent;
this.pendingRequests = 0;
this.queue = [];
};

LimitConcurrentScheduler.prototype.dequeue = function () {
while (this.pendingRequests < this.concurrent && this.queue.length) {
this.pendingRequests++;
const { task, resolve, reject } = this.queue.shift();
Promise.resolve()
.then(task) // * 注意此处不是 task() 而是 task
.then(resolve, reject)
.finally(() => {
this.pendingRequests--;
this.dequeue();
});
}
};

LimitConcurrentScheduler.prototype.run = function (task) {
return new Promise((resolve, reject) => {
this.queue.push({
task,
resolve,
reject,
});
this.dequeue();
});
};

封面来自 SAWARATSUKI kawaiilogos,该作者还没上传 JS 的 logo,所以用 nodeJS 的冒充一下