# 并发请求ES7 实现
// ret 所有异步任务: 循环把任务添加到 then 微任务队列,更新到 ret
// 判断限制,e = p.then(executing 取出执行?) + Promise.race
// Promise.resolve().then 两次?一次执行、一次移除 + await Promise.race
async function asyncPool(poolLimit, array, iteratorFn) {
const ret = []; // 存储所有的异步任务
const executing = []; // 存储正在执行的异步任务
for (const item of array) {
// 调用iteratorFn函数创建异步任务
const p = Promise.resolve().then(() => iteratorFn(item, array));
ret.push(p); // 保存新的异步任务
// 当poolLimit值小于或等于总任务个数时,进行并发控制
if (poolLimit <= array.length) {
// 当任务完成后,从正在执行的任务数组中移除已完成的任务
const e = p.then(() => executing.splice(executing.indexOf(e), 1));
executing.push(e); // 保存正在执行的异步任务
if (executing.length >= poolLimit) {
await Promise.race(executing); // 等待较快的任务执行完成
}
}
}
return Promise.all(ret);
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
- ES6 实现
function asyncPool(poolLimit, array, iteratorFn) {
let i = 0;
const ret = []; // 存储所有的异步任务
const executing = []; // 存储正在执行的异步任务
const enqueue = function () {
if (i === array.length) {
return Promise.resolve();
}
const item = array[i++]; // 获取新的任务项
const p = Promise.resolve().then(() => iteratorFn(item, array));
ret.push(p);
let r = Promise.resolve();
// 当poolLimit值小于或等于总任务个数时,进行并发控制
if (poolLimit <= array.length) {
// 当任务完成后,从正在执行的任务数组中移除已完成的任务
const e = p.then(() => executing.splice(executing.indexOf(e), 1));
executing.push(e);
if (executing.length >= poolLimit) {
r = Promise.race(executing);
}
}
// 正在执行任务列表 中较快的任务执行完成之后,才会从array数组中获取新的待办任务
return r.then(() => enqueue());
};
return enqueue().then(() => Promise.all(ret));
}
// 两层 Promise,内层执行,外层负责 resolve 返回结果
Promise.all = function (iterators) {
return new Promise((resolve, reject) => {
if (!iterators || iterators.length === 0) {
resolve([]);
} else {
let count = 0; // 计数器,用于判断所有任务是否执行完成
let result = []; // 结果数组
for (let i = 0; i < iterators.length; i++) {
// 考虑到iterators[i]可能是普通对象,则统一包装为Promise对象
Promise.resolve(iterators[i]).then(
(data) => {
result[i] = data; // 按顺序保存对应的结果
// 当所有任务都执行完成后,再统一返回结果
if (++count === iterators.length) {
resolve(result);
}
},
(err) => {
reject(err); // 任何一个Promise对象执行失败,则调用reject()方法
return;
}
);
}
}
});
};
// 两层 Promise,内层负责 resolve 执行,
Promise.race = function (iterators) {
return new Promise((resolve, reject) => {
for (const iter of iterators) {
Promise.resolve(iter)
.then((res) => {
resolve(res);
})
.catch((e) => {
reject(e);
});
}
});
};
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69