同步执行类RunnerAsync

支持返回超时检测,系统中断检测

错误常量定义

//超时错误
var ErrTimeout = errors.New("received timeout")

//操作系统系统中断错误
var ErrInterrupt = errors.New("received interrupt")

实现代码如下


package task import ( "os" "time" "os/signal" )

//同步执行任务 type RunnerAsync struct { //操作系统的信号检测 interrupt chan os.Signal //记录执行完成的状态 complete chan error //超时检测 timeout <-chan time.Time //保存所有要执行的任务,顺序执行 tasks []func(id int) } //new一个RunnerAsync对象 func NewRunnerAsync(d time.Duration) *RunnerAsync { return &RunnerAsync{ interrupt: make(chan os.Signal, 1), complete: make(chan error), timeout: time.After(d), } } //添加一个任务 func (this *RunnerAsync) Add(tasks ...func(id int)) { this.tasks = append(this.tasks, tasks...) } //启动RunnerAsync,监听错误信息 func (this *RunnerAsync) Start() error { //接收操作系统信号 signal.Notify(this.interrupt, os.Interrupt) //执行任务 go func() { this.complete <- this.Run() }() select { //返回执行结果 case err := <-this.complete: return err //超时返回 case <-this.timeout: return ErrTimeout } } //顺序执行所有的任务 func (this *RunnerAsync) Run() error { for id, task := range this.tasks { if this.gotInterrupt() { return ErrInterrupt } //执行任务 task(id) } return nil } //判断是否接收到操作系统中断信号 func (this *RunnerAsync) gotInterrupt() bool { select { case <-this.interrupt: //停止接收别的信号 signal.Stop(this.interrupt) return true //正常执行 default: return false } }

 

使用方法    

Add添加一个任务,任务为接收int类型的一个闭包

Start开始执行伤,返回一个error类型,nil为执行完毕, ErrTimeout代表执行超时,ErrInterrupt代表执行被中断(类似Ctrl + C操作)

 

测试示例代码

package task

import (
    "testing"
    "time"
    "fmt"
    "os"
    "runtime"
)

func TestRunnerAsync_Start(t *testing.T) {

    //开启多核
    runtime.GOMAXPROCS(runtime.NumCPU())

    //创建runner对象,设置超时时间
    runner := NewRunnerAsync(8 * time.Second)
    //添加运行的任务
    runner.Add(
        createTaskAsync(),
        createTaskAsync(),
        createTaskAsync(),
        createTaskAsync(),
        createTaskAsync(),
        createTaskAsync(),
        createTaskAsync(),
        createTaskAsync(),
        createTaskAsync(),
        createTaskAsync(),
        createTaskAsync(),
        createTaskAsync(),
        createTaskAsync(),
    )

    fmt.Println("同步执行任务")

    //开始执行任务
    if err := runner.Start(); err != nil {
        switch err {
        case ErrTimeout:
            fmt.Println("执行超时")
            os.Exit(1)
        case ErrInterrupt:
            fmt.Println("任务被中断")
            os.Exit(2)
        }
    }

    t.Log("执行结束")

}

//创建要执行的任务
func createTaskAsync() func(id int) {
    return func(id int) {
        fmt.Printf("正在执行%v个任务\n", id)
        //模拟任务执行,sleep两秒
        //time.Sleep(1 * time.Second)
    }
}

执行结果

同步执行任务
正在执行0个任务
正在执行1个任务
正在执行2个任务
正在执行3个任务
正在执行4个任务
正在执行5个任务
正在执行6个任务
正在执行7个任务
正在执行8个任务
正在执行9个任务
正在执行10个任务
正在执行11个任务
正在执行12个任务
	runnerAsync_test.go:49: 执行结束

  

 

异步执行类Runner

支持返回超时检测,系统中断检测

实现代码如下

package task

import (
    "os"
    "time"
    "os/signal"
    "sync"
)

//异步执行任务
type Runner struct {
    //操作系统的信号检测
    interrupt chan os.Signal

    //记录执行完成的状态
    complete chan error

    //超时检测
    timeout <-chan time.Time

    //保存所有要执行的任务,顺序执行
    tasks []func(id int) error

    waitGroup sync.WaitGroup

    lock sync.Mutex

    errs []error
}

//new一个Runner对象
func NewRunner(d time.Duration) *Runner {
    return &Runner{
        interrupt: make(chan os.Signal, 1),
        complete:  make(chan error),
        timeout:   time.After(d),
        waitGroup: sync.WaitGroup{},
        lock:      sync.Mutex{},
    }
}

//添加一个任务
func (this *Runner) Add(tasks ...func(id int) error) {
    this.tasks = append(this.tasks, tasks...)
}

//启动Runner,监听错误信息
func (this *Runner) Start() error {

    //接收操作系统信号
    signal.Notify(this.interrupt, os.Interrupt)

    //并发执行任务
    go func() {
        this.complete <- this.Run()
    }()

    select {
    //返回执行结果
    case err := <-this.complete:
        return err
        //超时返回
    case <-this.timeout:
        return ErrTimeout
    }
}

//异步执行所有的任务
func (this *Runner) Run() error {
    for id, task := range this.tasks {
        if this.gotInterrupt() {
            return ErrInterrupt
        }

        this.waitGroup.Add(1)
        go func(id int) {
            this.lock.Lock()

            //执行任务
            err := task(id)
            //加锁保存到结果集中
            this.errs = append(this.errs, err)

            this.lock.Unlock()
            this.waitGroup.Done()
        }(id)
    }
    this.waitGroup.Wait()

    return nil
}

//判断是否接收到操作系统中断信号
func (this *Runner) gotInterrupt() bool {
    select {
    case <-this.interrupt:
        //停止接收别的信号
        signal.Stop(this.interrupt)
        return true
        //正常执行
    default:
        return false
    }
}

//获取执行完的error
func (this *Runner) GetErrs() []error {
    return this.errs
}

  

使用方法    

Add添加一个任务,任务为接收int类型,返回类型error的一个闭包

Start开始执行伤,返回一个error类型,nil为执行完毕, ErrTimeout代表执行超时,ErrInterrupt代表执行被中断(类似Ctrl + C操作)

getErrs获取所有的任务执行结果

 

测试示例代码

package task

import (
    "testing"
    "time"
    "fmt"
    "os"
    "runtime"
)

func TestRunner_Start(t *testing.T) {
    //开启多核心
    runtime.GOMAXPROCS(runtime.NumCPU())

    //创建runner对象,设置超时时间
    runner := NewRunner(18 * time.Second)
    //添加运行的任务
    runner.Add(
        createTask(),
        createTask(),
        createTask(),
        createTask(),
        createTask(),
        createTask(),
        createTask(),
        createTask(),
        createTask(),
        createTask(),
        createTask(),
        createTask(),
        createTask(),
        createTask(),
    )

    fmt.Println("异步执行任务")

    //开始执行任务
    if err := runner.Start(); err != nil {
        switch err {
        case ErrTimeout:
            fmt.Println("执行超时")
            os.Exit(1)
        case ErrInterrupt:
            fmt.Println("任务被中断")
            os.Exit(2)
        }
    }

    t.Log("执行结束")

    t.Log(runner.GetErrs())

}

//创建要执行的任务
func createTask() func(id int) error {
    return func(id int) error {
        fmt.Printf("正在执行%v个任务\n", id)
        //模拟任务执行,sleep
        //time.Sleep(1 * time.Second)
        return nil
    }
}

执行结果

异步执行任务
正在执行2个任务
正在执行1个任务
正在执行4个任务
正在执行3个任务
正在执行6个任务
正在执行5个任务
正在执行9个任务
正在执行7个任务
正在执行10个任务
正在执行13个任务
正在执行8个任务
正在执行11个任务
正在执行12个任务
正在执行0个任务
	runner_test.go:49: 执行结束
	runner_test.go:51: [<nil> <nil> <nil> <nil> <nil> <nil> <nil> <nil> <nil> <nil> <nil> <nil> <nil> <nil>]

  

 

版权声明:本文为chenqionghe原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。
本文链接:https://www.cnblogs.com/chenqionghe/p/8269556.html