并发队列
特性
✨ 控制执行数量
通过 `parallel` 属性自定义同时执行的任务数量。
- 当 parallel = 1 时,相当于串行执行,一个任务执行完了之后再进行下一个任务。
- 当 parallel > 1 时,相当于并行执行,多个任务同时进行。
✨ 失败重试
最大重试次数表示任务失败后,最多重试请求的次数,期间如果请求成功的话将会停止重试。默认最大重试次数为 3 次,你可以在构造选项中通过 `retry` 自定义设置(对应实例属性 `maxRetry`)。
使用
javascript
// Create a queue that runs at most 3 tasks at the same time.
const queue = new Queue({
  parallel: 3,
  onCompleted: () => {
    console.log('执行完毕!')
  }
})

for (let i = 0; i < 20; i++) {
  // A task must be a FUNCTION that returns a Promise: the queue decides when
  // to invoke it (and may invoke it again on retry). Passing an already
  // created Promise is rejected by `queue.add`.
  const task = () => new Promise((resolve, reject) => {
    // TODO: put the real task logic here and call resolve / reject
  })
  queue.add({
    task,
    onSuccess: (data, stat) => {
      // `stat.success` is the number of finished tasks, `stat.count` the total.
      console.log(`当前进度: ${stat.success / stat.count}`)
    }
  })
}

// Nothing runs until the queue is started.
queue.start()
源码
javascript
const emptyFun = () => {}
const STATUS_READY = 'ready'
const STATUS_SUCCESS = 'success'
const STATUS_FAILED = 'failed'

/**
 * Task queue with a concurrency limit and automatic retry.
 *
 * Tasks are functions returning a Promise. At most `parallel` tasks are in
 * flight at any time; a task that rejects is re-invoked up to `maxRetry`
 * times before it is reported as failed. Once every task has either
 * succeeded or exhausted its retries, `onCompleted` is called and the queue
 * is reset so it can be reused.
 *
 * Instance state:
 *  - `curidx`   index in `list` of the next task to dispatch
 *  - `parallel` maximum number of tasks in flight
 *  - `count`    number of tasks currently in flight
 *  - `list`     every task record added since the last reset
 *  - `maxRetry` maximum number of retries per task
 *  - `isStop`   true after `stop()` until the next `reset()`
 */
export default class Queue {
  /**
   * @param {Object} [options]
   * @param {number} [options.parallel=1] Maximum number of concurrent tasks.
   * @param {number} [options.maxRetry=3] Maximum retries per task (0 disables retry).
   * @param {number} [options.retry] Legacy alias of `maxRetry`.
   * @param {Function} [options.onCompleted] Called with the final
   *   `{ success, failed, count }` stats when every task has settled.
   */
  constructor (options) {
    const opts = options || {}
    const parallel = Number(opts.parallel)
    const retryOption = opts.maxRetry != null ? opts.maxRetry : opts.retry
    const retry = retryOption == null ? NaN : Number(retryOption)

    this.curidx = 0
    this.parallel = parallel >= 1 ? Math.floor(parallel) : 1
    this.count = 0
    this.list = []
    // An explicit 0 is honoured (no retry); only missing/invalid values fall back to 3.
    this.maxRetry = Number.isInteger(retry) && retry >= 0 ? retry : 3
    this.onCompleted = opts.onCompleted || emptyFun
    this.isStop = false
  }

  /**
   * Append a task to the queue. Non-function tasks are reported with
   * `console.error` and ignored.
   *
   * @param {Object} entry
   * @param {Function} entry.task Function returning a Promise.
   * @param {Function} [entry.onSuccess] Called as `(data, stat)` on success.
   * @param {Function} [entry.onFailed] Called as `(error, stat)` once retries are exhausted.
   */
  add ({ task, onSuccess, onFailed } = {}) {
    if (typeof task !== 'function') {
      console.error('队列执行函数必须是 Function 实例')
      return
    }
    this.list.push({
      index: this.list.length,
      task,
      onSuccess,
      onFailed,
      retryCount: 0,
      status: STATUS_READY
    })
  }

  /**
   * Dispatch pending tasks until the concurrency limit is reached.
   * Safe to call repeatedly; does nothing while stopped or when no task is pending.
   */
  start () {
    while (!this.isStop && this.count < this.parallel && this.curidx < this.list.length) {
      this.run(this.list[this.curidx++])
    }
  }

  /**
   * Dispatch the next pending task, if there is one and a slot is free.
   */
  next () {
    if (this.isStop || this.count >= this.parallel) return
    const item = this.list[this.curidx]
    if (!item) return
    this.curidx++
    this.run(item)
  }

  /**
   * Drop every task and return the queue to its initial, startable state.
   * Results of tasks still in flight are ignored when they arrive.
   */
  reset () {
    this.curidx = 0
    this.count = 0
    this.list = []
    this.isStop = false
  }

  /**
   * Stop dispatching. Results of in-flight tasks are discarded; call
   * `reset()` before using the queue again.
   */
  stop () {
    this.isStop = true
  }

  /**
   * Invoke a task and route its outcome to `resolve` / `reject`.
   * A synchronous throw or a non-Promise return value is normalised into a
   * Promise so it goes through the same retry/failure path.
   */
  run (item) {
    this.count++
    let pending
    try {
      pending = Promise.resolve(item.task())
    } catch (error) {
      pending = Promise.reject(error)
    }
    pending.then(
      (data) => this.resolve(item, data),
      (error) => this.reject(item, error)
    )
  }

  /**
   * Success handler for a task.
   */
  resolve (item, data) {
    // Ignore results after stop(), and results of tasks from before a reset().
    if (this.isStop || this.list[item.index] !== item) return
    this.count--
    item.status = STATUS_SUCCESS
    if (item.onSuccess) {
      item.onSuccess(data, this.report(item))
    }
    this.proceed(item)
  }

  /**
   * Failure handler for a task: retry while attempts remain, otherwise mark
   * the task as failed and notify `onFailed`.
   */
  reject (item, error) {
    if (this.isStop || this.list[item.index] !== item) return
    this.count--
    if (item.retryCount < this.maxRetry) {
      // The item is NOT marked failed yet, so it cannot be counted as
      // settled (and trigger completion) while its retry is in flight.
      this.retry(item)
      return
    }
    item.status = STATUS_FAILED
    if (item.onFailed) {
      item.onFailed(error, this.report(item))
    }
    this.proceed(item)
  }

  /**
   * Re-run a failed task, consuming one retry attempt.
   */
  retry (item) {
    item.retryCount += 1
    this.run(item)
  }

  /**
   * Build the `stat` object handed to `onSuccess` / `onFailed`.
   */
  report (item) {
    return {
      index: item.index,
      retryCount: item.retryCount,
      ...this.query()
    }
  }

  /**
   * After a task settled: refill free slots, then check for completion.
   * Skipped when the callback reset the queue in the meantime.
   */
  proceed (item) {
    if (this.list[item.index] !== item) return
    this.start()
    this.getStats()
  }

  /**
   * Count tasks by outcome.
   *
   * @returns {{success: number, failed: number, count: number}}
   */
  query () {
    let success = 0
    let failed = 0
    for (const item of this.list) {
      if (item.status === STATUS_SUCCESS) {
        success++
      } else if (item.status === STATUS_FAILED) {
        failed++
      }
    }
    return { success, failed, count: this.list.length }
  }

  /**
   * Fire `onCompleted` and reset the queue once every task has settled.
   */
  getStats () {
    const stats = this.query()
    if ((stats.success + stats.failed) === stats.count) {
      this.onCompleted(stats)
      this.reset()
    }
  }
}