Skip to content

并发队列

特性

控制执行数量

通过 parallel 属性自定义执行数量。

  • 当 parallel = 1 时,相当于串行执行,一个任务执行完了之后在进行下一个任务。
  • 当 parallel > 1 时,相当于并行执行,多个任务同时进行。

失败重试

最大重试次数表示任务失败后,最多重试请求的次数,期间如果请求成功的话将会停止重试。 默认最大重试次数为 3 次,你可以通过 maxRetry 属性自定义设置。

使用

javascript
const queue = new Queue({
  parallel: 3,
  onCompleted: () => {
    console.log('执行完毕!')
  }
})

for(let i = 0; i < 20; i++) {
  const task = new Promise((resolve, reject) => {
    // TODO 具体的任务逻辑
  })

  queue.add({
    task: task,
    onSuccess: (data, stat) => {
      console.log(`当前进度: ${stat.success / stat.count}`)
    }
  })
}

queue.start()

源码

javascript
const emptyFun = () => {}

const STATUS_READY = 'ready'
const STATUS_SUCCESS = 'success'
const STATUS_FAILED = 'failed'

/**
 * 队列
 */
export default class Queue {
  constructor (options) {
    this.curidx = 0 // 当前序号
    this.parallel = options.parallel || 1 // 最大执行数
    this.count = 0 // 当前执行数
    this.list = [] // 所有队列
    this.maxRetry = options.retry || 3 // 失败重试次数
    this.onCompleted = options.onCompleted || emptyFun // 队列执行完毕的回调函数
  }

  add ({ taskonSuccessonFailed }) {
    if (task instanceof Function) {
      const index = this.list.length
      this.list.push({
        index: index,
        task: task,
        onSuccess,
        onFailed,
        retryCount: 0,
        status: STATUS_READY
      })
    } else {
      console.error('队列执行函数必须是 Function 实例')
    }
  }

  start () {
    if (this.isStop) {
      return
    }

    const start = this.curidx
    const end = start + this.parallel

    this.list.slice(start, end).forEach(item => {
      item.task().then(
        (data=> this.resolve(item, data),
        (error=> this.reject(item, error)
      )
    })

    this.curidx += end - 1
    this.count = this.parallel
  }

  next () {
    this.curidx++
    const item = this.list[this.curidx]

    if (!item) {
      return
    }

    this.count++
    item.task().then(
      (data=> this.resolve(item, data),
      (error=> this.reject(item, error)
    )
  }

  reset () {
    this.curidx = 0
    this.count = 0
    this.list = []
    this.isStop = false
  }

  stop () {
    this.isStop = true
  }

  resolve (itemdata) {
    if (this.isStop) return

    this.count--

    item.status = STATUS_SUCCESS

    if (item.onSuccess) {
      const query = this.query()
      item.onSuccess(data, {
        index: item.index,
        retryCount: item.retryCount,
        ...query,
      })
    }

    if (this.curidx < this.list.length) {
      if (this.count < this.parallel) {
        this.next()
      }
      return
    }

    this.getStats()
  }

  reject (itemerror) {
    if (this.isStop) return

    this.count--

    item.status = STATUS_FAILED

    if (item.retryCount < this.maxRetry) {
      this.retry(item)
      return
    }

    if (item.onFailed) {
      const query = this.query()
      item.onFailed(error, {
        index: item.index,
        retryCount: item.retryCount,
        ...query
      })
    }

    if (this.curidx < this.list.length) {
      if (this.count < this.parallel) {
        this.next()
      }
      return
    }

    this.getStats()
  }

  retry (item) {
    item.retryCount += 1

    item.task().then(
      (data=> this.resolve(item, data),
      (error=> this.reject(item, error)
    )

    this.count++
  }

  query () {
    const reducer = (key=> {
      return this.list.reduce(function (accumulatoritem) {
        accumulator = item.status === key ? accumulator.concat(item) : accumulator
        return accumulator
      }, [])
    }

    let successQueue = reducer(STATUS_SUCCESS)
    let failedQueue = reducer(STATUS_FAILED)

    return {
      success: successQueue.length,
      failed: failedQueue.length,
      count: this.list.length
    }
  }

  getStats () {
    const query = this.query()

    if ((query.success + query.failed) === query.count) {
      this.onCompleted()
      this.reset()
    }
  }
}