gpt4 book ai didi

Go语言同步与异步执行多个任务封装详解(Runner和RunnerAsync)

转载 作者:qq735679552 更新时间:2022-09-29 22:32:09 24 4
gpt4 key购买 nike

CFSDN坚持开源创造价值,我们致力于搭建一个资源共享平台,让每一个IT人在这里找到属于你的精彩世界.

这篇CFSDN的博客文章Go语言同步与异步执行多个任务封装详解(Runner和RunnerAsync)由作者收集整理,如果你对这篇文章有兴趣,记得点赞哟.

前言 。

同步适合多个连续执行的,每一步的执行依赖于上一步操作,异步执行则和任务执行顺序无关(如从10个站点抓取数据) 。

同步执行类RunnerAsync 。

支持返回超时检测,系统中断检测 。

错误常量定义 。

?
1
2
3
4
//超时错误
var ErrTimeout = errors.New("received timeout")
//操作系统系统中断错误
var ErrInterrupt = errors.New("received interrupt")

实现代码如下 。

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
package task
import (
  "os"
  "time"
  "os/signal"
  "sync"
)
 
//异步执行任务
type Runner struct {
  //操作系统的信号检测
  interrupt chan os.Signal
  //记录执行完成的状态
  complete chan error
  //超时检测
  timeout <-chan time.Time
  //保存所有要执行的任务,顺序执行
  tasks []func(id int) error
  waitGroup sync.WaitGroup
  lock sync.Mutex
  errs []error
}
 
//new一个Runner对象
func NewRunner(d time.Duration) *Runner {
  return &Runner{
  interrupt: make(chan os.Signal, 1),
  complete: make(chan error),
  timeout: time.After(d),
  waitGroup: sync.WaitGroup{},
  lock: sync.Mutex{},
  }
}
 
//添加一个任务
func (this *Runner) Add(tasks ...func(id int) error) {
  this.tasks = append(this.tasks, tasks...)
}
 
//启动Runner,监听错误信息
func (this *Runner) Start() error {
  //接收操作系统信号
  signal.Notify(this.interrupt, os.Interrupt)
  //并发执行任务
  go func() {
  this.complete <- this.Run()
  }()
  select {
  //返回执行结果
  case err := <-this.complete:
  return err
  //超时返回
  case <-this.timeout:
  return ErrTimeout
  }
}
 
//异步执行所有的任务
func (this *Runner) Run() error {
  for id, task := range this.tasks {
  if this.gotInterrupt() {
   return ErrInterrupt
  }
  this.waitGroup.Add(1)
  go func(id int) {
   this.lock.Lock()
   //执行任务
   err := task(id)
   //加锁保存到结果集中
   this.errs = append(this.errs, err)
 
   this.lock.Unlock()
   this.waitGroup.Done()
  }(id)
  }
  this.waitGroup.Wait()
 
  return nil
}
 
//判断是否接收到操作系统中断信号
func (this *Runner) gotInterrupt() bool {
  select {
  case <-this.interrupt:
  //停止接收别的信号
  signal.Stop(this.interrupt)
  return true
  //正常执行
  default:
  return false
  }
}
 
//获取执行完的error
func (this *Runner) GetErrs() []error {
  return this.errs
}

使用方法     。

Add添加一个任务,任务为接收int类型的一个闭包 。

Start开始执行伤,返回一个error类型,nil为执行完毕, ErrTimeout代表执行超时,ErrInterrupt代表执行被中断(类似Ctrl + C操作) 。

测试示例代码 。

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
package task
import (
  "testing"
  "time"
  "fmt"
  "os"
  "runtime"
)
 
func TestRunnerAsync_Start(t *testing.T) {
  //开启多核
  runtime.GOMAXPROCS(runtime.NumCPU())
  //创建runner对象,设置超时时间
  runner := NewRunnerAsync(8 * time.Second)
  //添加运行的任务
  runner.Add(
  createTaskAsync(),
  createTaskAsync(),
  createTaskAsync(),
  createTaskAsync(),
  createTaskAsync(),
  createTaskAsync(),
  createTaskAsync(),
  createTaskAsync(),
  createTaskAsync(),
  createTaskAsync(),
  createTaskAsync(),
  createTaskAsync(),
  createTaskAsync(),
  )
  fmt.Println("同步执行任务")
  //开始执行任务
  if err := runner.Start(); err != nil {
  switch err {
  case ErrTimeout:
   fmt.Println("执行超时")
   os.Exit(1)
  case ErrInterrupt:
   fmt.Println("任务被中断")
   os.Exit(2)
  }
  }
  t.Log("执行结束")
}
 
//创建要执行的任务
func createTaskAsync() func(id int) {
  return func(id int) {
  fmt.Printf("正在执行%v个任务\n", id)
  //模拟任务执行,sleep两秒
  //time.Sleep(1 * time.Second)
  }
}

执行结果   。

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
同步执行任务
正在执行0个任务
正在执行1个任务
正在执行2个任务
正在执行3个任务
正在执行4个任务
正在执行5个任务
正在执行6个任务
正在执行7个任务
正在执行8个任务
正在执行9个任务
正在执行10个任务
正在执行11个任务
正在执行12个任务
  runnerAsync_test.go:49: 执行结束

异步执行类Runner 。

支持返回超时检测,系统中断检测 。

实现代码如下 。

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
package task
import (
  "os"
  "time"
  "os/signal"
  "sync"
)
 
//异步执行任务
type Runner struct {
  //操作系统的信号检测
  interrupt chan os.Signal
  //记录执行完成的状态
  complete chan error
  //超时检测
  timeout <-chan time.Time
  //保存所有要执行的任务,顺序执行
  tasks []func(id int) error
  waitGroup sync.WaitGroup
  lock sync.Mutex
  errs []error
}
 
//new一个Runner对象
func NewRunner(d time.Duration) *Runner {
  return &Runner{
   interrupt: make(chan os.Signal, 1),
   complete: make(chan error),
   timeout: time.After(d),
   waitGroup: sync.WaitGroup{},
   lock:  sync.Mutex{},
  }
}
 
//添加一个任务
func (this *Runner) Add(tasks ...func(id int) error) {
  this.tasks = append(this.tasks, tasks...)
}
 
//启动Runner,监听错误信息
func (this *Runner) Start() error {
  //接收操作系统信号
  signal.Notify(this.interrupt, os.Interrupt)
  //并发执行任务
  go func() {
   this.complete <- this.Run()
  }()
  select {
  //返回执行结果
  case err := <-this.complete:
   return err
   //超时返回
  case <-this.timeout:
   return ErrTimeout
  }
}
 
//异步执行所有的任务
func (this *Runner) Run() error {
  for id, task := range this.tasks {
   if this.gotInterrupt() {
    return ErrInterrupt
   }
   this.waitGroup.Add(1)
   go func(id int) {
    this.lock.Lock()
    //执行任务
    err := task(id)
    //加锁保存到结果集中
    this.errs = append(this.errs, err)
    this.lock.Unlock()
    this.waitGroup.Done()
   }(id)
  }
  this.waitGroup.Wait()
  return nil
}
 
//判断是否接收到操作系统中断信号
func (this *Runner) gotInterrupt() bool {
  select {
  case <-this.interrupt:
   //停止接收别的信号
   signal.Stop(this.interrupt)
   return true
   //正常执行
  default:
   return false
  }
}
 
//获取执行完的error
func (this *Runner) GetErrs() []error {
  return this.errs
}

使用方法     。

Add添加一个任务,任务为接收int类型,返回类型error的一个闭包 。

Start开始执行伤,返回一个error类型,nil为执行完毕, ErrTimeout代表执行超时,ErrInterrupt代表执行被中断(类似Ctrl + C操作) 。

getErrs获取所有的任务执行结果 。

测试示例代码 。

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
package task
import (
  "testing"
  "time"
  "fmt"
  "os"
  "runtime"
)
 
func TestRunner_Start(t *testing.T) {
  //开启多核心
  runtime.GOMAXPROCS(runtime.NumCPU())
  //创建runner对象,设置超时时间
  runner := NewRunner(18 * time.Second)
  //添加运行的任务
  runner.Add(
   createTask(),
   createTask(),
   createTask(),
   createTask(),
   createTask(),
   createTask(),
   createTask(),
   createTask(),
   createTask(),
   createTask(),
   createTask(),
   createTask(),
   createTask(),
   createTask(),
  )
  fmt.Println("异步执行任务")
  //开始执行任务
  if err := runner.Start(); err != nil {
   switch err {
   case ErrTimeout:
    fmt.Println("执行超时")
    os.Exit(1)
   case ErrInterrupt:
    fmt.Println("任务被中断")
    os.Exit(2)
   }
  }
  t.Log("执行结束")
  t.Log(runner.GetErrs())
}
 
//创建要执行的任务
func createTask() func(id int) error {
  return func(id int) error {
   fmt.Printf("正在执行%v个任务\n", id)
   //模拟任务执行,sleep
   //time.Sleep(1 * time.Second)
   return nil
  }
}

执行结果 。

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
异步执行任务
正在执行2个任务
正在执行1个任务
正在执行4个任务
正在执行3个任务
正在执行6个任务
正在执行5个任务
正在执行9个任务
正在执行7个任务
正在执行10个任务
正在执行13个任务
正在执行8个任务
正在执行11个任务
正在执行12个任务
正在执行0个任务
  runner_test.go:49: 执行结束
  runner_test.go:51: [< nil > < nil > < nil > < nil > < nil > < nil > < nil > < nil > < nil > < nil > < nil > < nil > < nil > < nil >]

总结 。

以上就是这篇文章的全部内容了,希望本文的内容对大家的学习或者工作具有一定的参考学习价值,如果有疑问大家可以留言交流,谢谢大家对我的支持.

原文链接:http://www.cnblogs.com/chenqionghe/p/8269556.html 。

最后此篇关于Go语言同步与异步执行多个任务封装详解(Runner和RunnerAsync)的文章就讲到这里了,如果你想了解更多关于Go语言同步与异步执行多个任务封装详解(Runner和RunnerAsync)的内容请搜索CFSDN的文章或继续浏览相关文章,希望大家以后支持我的博客! 。

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com