- ubuntu12.04环境下使用kvm ioctl接口实现最简单的虚拟机
- Ubuntu 通过无线网络安装Ubuntu Server启动系统后连接无线网络的方法
- 在Ubuntu上搭建网桥的方法
- ubuntu 虚拟机上网方式及相关配置详解
CFSDN坚持开源创造价值,我们致力于搭建一个资源共享平台,让每一个IT人在这里找到属于你的精彩世界.
这篇CFSDN的博客文章Go语言同步与异步执行多个任务封装详解(Runner和RunnerAsync)由作者收集整理,如果你对这篇文章有兴趣,记得点赞哟.
前言 。
同步适合多个连续执行的,每一步的执行依赖于上一步操作,异步执行则和任务执行顺序无关(如从10个站点抓取数据) 。
同步执行类RunnerAsync 。
支持返回超时检测,系统中断检测 。
错误常量定义 。
1
2
3
4
|
//超时错误
var ErrTimeout = errors.New("received timeout")
//操作系统系统中断错误
var ErrInterrupt = errors.New("received interrupt")
|
实现代码如下 。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
|
package task
import (
"os"
"time"
"os/signal"
"sync"
)
//异步执行任务
type Runner struct {
//操作系统的信号检测
interrupt chan os.Signal
//记录执行完成的状态
complete chan error
//超时检测
timeout <-chan time.Time
//保存所有要执行的任务,顺序执行
tasks []func(id int) error
waitGroup sync.WaitGroup
lock sync.Mutex
errs []error
}
//new一个Runner对象
func NewRunner(d time.Duration) *Runner {
return &Runner{
interrupt: make(chan os.Signal, 1),
complete: make(chan error),
timeout: time.After(d),
waitGroup: sync.WaitGroup{},
lock: sync.Mutex{},
}
}
//添加一个任务
func (this *Runner) Add(tasks ...func(id int) error) {
this.tasks = append(this.tasks, tasks...)
}
//启动Runner,监听错误信息
func (this *Runner) Start() error {
//接收操作系统信号
signal.Notify(this.interrupt, os.Interrupt)
//并发执行任务
go func() {
this.complete <- this.Run()
}()
select {
//返回执行结果
case err := <-this.complete:
return err
//超时返回
case <-this.timeout:
return ErrTimeout
}
}
//异步执行所有的任务
func (this *Runner) Run() error {
for id, task := range this.tasks {
if this.gotInterrupt() {
return ErrInterrupt
}
this.waitGroup.Add(1)
go func(id int) {
this.lock.Lock()
//执行任务
err := task(id)
//加锁保存到结果集中
this.errs = append(this.errs, err)
this.lock.Unlock()
this.waitGroup.Done()
}(id)
}
this.waitGroup.Wait()
return nil
}
//判断是否接收到操作系统中断信号
func (this *Runner) gotInterrupt() bool {
select {
case <-this.interrupt:
//停止接收别的信号
signal.Stop(this.interrupt)
return true
//正常执行
default:
return false
}
}
//获取执行完的error
func (this *Runner) GetErrs() []error {
return this.errs
}
|
使用方法 。
Add添加一个任务,任务为接收int类型的一个闭包 。
Start开始执行伤,返回一个error类型,nil为执行完毕, ErrTimeout代表执行超时,ErrInterrupt代表执行被中断(类似Ctrl + C操作) 。
测试示例代码 。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
|
package task
import (
"testing"
"time"
"fmt"
"os"
"runtime"
)
func TestRunnerAsync_Start(t *testing.T) {
//开启多核
runtime.GOMAXPROCS(runtime.NumCPU())
//创建runner对象,设置超时时间
runner := NewRunnerAsync(8 * time.Second)
//添加运行的任务
runner.Add(
createTaskAsync(),
createTaskAsync(),
createTaskAsync(),
createTaskAsync(),
createTaskAsync(),
createTaskAsync(),
createTaskAsync(),
createTaskAsync(),
createTaskAsync(),
createTaskAsync(),
createTaskAsync(),
createTaskAsync(),
createTaskAsync(),
)
fmt.Println("同步执行任务")
//开始执行任务
if err := runner.Start(); err != nil {
switch err {
case ErrTimeout:
fmt.Println("执行超时")
os.Exit(1)
case ErrInterrupt:
fmt.Println("任务被中断")
os.Exit(2)
}
}
t.Log("执行结束")
}
//创建要执行的任务
func createTaskAsync() func(id int) {
return func(id int) {
fmt.Printf("正在执行%v个任务\n", id)
//模拟任务执行,sleep两秒
//time.Sleep(1 * time.Second)
}
}
|
执行结果 。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
|
同步执行任务
正在执行0个任务
正在执行1个任务
正在执行2个任务
正在执行3个任务
正在执行4个任务
正在执行5个任务
正在执行6个任务
正在执行7个任务
正在执行8个任务
正在执行9个任务
正在执行10个任务
正在执行11个任务
正在执行12个任务
runnerAsync_test.go:49: 执行结束
|
异步执行类Runner 。
支持返回超时检测,系统中断检测 。
实现代码如下 。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
|
package task
import (
"os"
"time"
"os/signal"
"sync"
)
//异步执行任务
type Runner struct {
//操作系统的信号检测
interrupt chan os.Signal
//记录执行完成的状态
complete chan error
//超时检测
timeout <-chan time.Time
//保存所有要执行的任务,顺序执行
tasks []func(id int) error
waitGroup sync.WaitGroup
lock sync.Mutex
errs []error
}
//new一个Runner对象
func NewRunner(d time.Duration) *Runner {
return &Runner{
interrupt: make(chan os.Signal, 1),
complete: make(chan error),
timeout: time.After(d),
waitGroup: sync.WaitGroup{},
lock: sync.Mutex{},
}
}
//添加一个任务
func (this *Runner) Add(tasks ...func(id int) error) {
this.tasks = append(this.tasks, tasks...)
}
//启动Runner,监听错误信息
func (this *Runner) Start() error {
//接收操作系统信号
signal.Notify(this.interrupt, os.Interrupt)
//并发执行任务
go func() {
this.complete <- this.Run()
}()
select {
//返回执行结果
case err := <-this.complete:
return err
//超时返回
case <-this.timeout:
return ErrTimeout
}
}
//异步执行所有的任务
func (this *Runner) Run() error {
for id, task := range this.tasks {
if this.gotInterrupt() {
return ErrInterrupt
}
this.waitGroup.Add(1)
go func(id int) {
this.lock.Lock()
//执行任务
err := task(id)
//加锁保存到结果集中
this.errs = append(this.errs, err)
this.lock.Unlock()
this.waitGroup.Done()
}(id)
}
this.waitGroup.Wait()
return nil
}
//判断是否接收到操作系统中断信号
func (this *Runner) gotInterrupt() bool {
select {
case <-this.interrupt:
//停止接收别的信号
signal.Stop(this.interrupt)
return true
//正常执行
default:
return false
}
}
//获取执行完的error
func (this *Runner) GetErrs() []error {
return this.errs
}
|
使用方法 。
Add添加一个任务,任务为接收int类型,返回类型error的一个闭包 。
Start开始执行伤,返回一个error类型,nil为执行完毕, ErrTimeout代表执行超时,ErrInterrupt代表执行被中断(类似Ctrl + C操作) 。
getErrs获取所有的任务执行结果 。
测试示例代码 。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
|
package task
import (
"testing"
"time"
"fmt"
"os"
"runtime"
)
func TestRunner_Start(t *testing.T) {
//开启多核心
runtime.GOMAXPROCS(runtime.NumCPU())
//创建runner对象,设置超时时间
runner := NewRunner(18 * time.Second)
//添加运行的任务
runner.Add(
createTask(),
createTask(),
createTask(),
createTask(),
createTask(),
createTask(),
createTask(),
createTask(),
createTask(),
createTask(),
createTask(),
createTask(),
createTask(),
createTask(),
)
fmt.Println("异步执行任务")
//开始执行任务
if err := runner.Start(); err != nil {
switch err {
case ErrTimeout:
fmt.Println("执行超时")
os.Exit(1)
case ErrInterrupt:
fmt.Println("任务被中断")
os.Exit(2)
}
}
t.Log("执行结束")
t.Log(runner.GetErrs())
}
//创建要执行的任务
func createTask() func(id int) error {
return func(id int) error {
fmt.Printf("正在执行%v个任务\n", id)
//模拟任务执行,sleep
//time.Sleep(1 * time.Second)
return nil
}
}
|
执行结果 。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
|
异步执行任务
正在执行2个任务
正在执行1个任务
正在执行4个任务
正在执行3个任务
正在执行6个任务
正在执行5个任务
正在执行9个任务
正在执行7个任务
正在执行10个任务
正在执行13个任务
正在执行8个任务
正在执行11个任务
正在执行12个任务
正在执行0个任务
runner_test.go:49: 执行结束
runner_test.go:51: [<
nil
> <
nil
> <
nil
> <
nil
> <
nil
> <
nil
> <
nil
> <
nil
> <
nil
> <
nil
> <
nil
> <
nil
> <
nil
> <
nil
>]
|
总结 。
以上就是这篇文章的全部内容了,希望本文的内容对大家的学习或者工作具有一定的参考学习价值,如果有疑问大家可以留言交流,谢谢大家对我的支持.
原文链接:http://www.cnblogs.com/chenqionghe/p/8269556.html 。
最后此篇关于Go语言同步与异步执行多个任务封装详解(Runner和RunnerAsync)的文章就讲到这里了,如果你想了解更多关于Go语言同步与异步执行多个任务封装详解(Runner和RunnerAsync)的内容请搜索CFSDN的文章或继续浏览相关文章,希望大家以后支持我的博客! 。
Task.WaitAll 方法等待所有任务,Task.WaitAny 方法等待一个任务。如何等待任意N个任务? 用例:下载搜索结果页面,每个结果都需要一个单独的任务来下载和处理。如果我使用 WaitA
我正在查看一些像这样的遗留 C# 代码: await Task.Run(() => { _logger.LogException(LogLevel.Error, mes
如何在 Linux 中运行 cron 任务? 关注此Q&A ,我有这个 cron 任务要运行 - 只是将一些信息写入 txt 文件, // /var/www/cron.php $myfile = fo
原谅我的新手问题,但我想按顺序执行三个任务并在剧本中使用两个角色: 任务 角色 任务 角色 任务 这是我到目前为止(任务,角色,任务): --- - name: Task Role Task ho
我有一个依赖于 installDist 的自定义任务 - 不仅用于执行,还依赖于 installDist 输出: project.task('run', type: JavaExec, depends
从使用 Wix 创建的 MSI 运行卸载时,我需要在尝试删除任何文件之前强行终止在后台运行的进程。主要应用程序由一个托盘图标组成,它反射(reflect)了 bg 进程监控本地 Windows 服务的
我想编写 Ant 任务来自动执行启动服务器的任务,然后使用我的应用程序的 URL 打开 Internet Explorer。 显然我必须执行 startServer先任务,然后 startApplic
使用 ASP.NET 4.5,我正在尝试使用新的 async/await 玩具。我有一个 IDataReader 实现类,它包装了一个特定于供应商的阅读器(如 SqlDatareader)。我有一个简
使用命令 gradle tasks可以得到一份所有可用任务的报告。有什么方法可以向此命令添加参数并按任务组过滤任务。 我想发出类似 gradle tasks group:Demo 的命令筛选所有任务并
除了sshexec,还有什么办法吗?任务要做到这一点?我知道您可以使用 scp 复制文件任务。但是,我需要执行其他操作,例如检查是否存在某些文件夹,然后将其删除。我想使用类似 condition 的东
假设我有字符串 - "D:\ApEx_Schema\Functions\new.sql@@\main\ONEVIEW_Integration\3" 我需要将以下内容提取到 diff 变量中 - 文档名
我需要编写一个 ant 任务来确定某个文件是否是只读的,如果是,则失败。我想避免使用自定义选择器来为我们的构建系统的性质做这件事。任何人都有任何想法如何去做?我正在使用 ant 1.8 + ant-c
这是一个相当普遍的计算机科学问题,并不特定于任何操作系统或框架。 因此,我对与在线程池上切换任务相关的开销感到有些困惑。在许多情况下,给每个作业分配自己的特定线程是没有意义的(我们不想创建太多硬件线程
我正在使用以下 Ansible playbook 一次性关闭远程 Ubuntu 主机列表: - hosts: my_hosts become: yes remote_user: my_user
如何更改 Ant 中的当前工作目录? Ant documentation没有 任务,在我看来,最好的做法是不要更改当前工作目录。 但让我们假设我们仍然想这样做——你会如何做到这一点?谢谢! 最佳答案
是否可以运行 cronjob每三天一次?或者也许每月 10 次。 最佳答案 每三天运行一次 - 或更短时间在月底运行一次。 (如果上个月有 31 天,它将连续运行 2 天。) 0 0 */3 * *
如何在 Gradle 任务中执行托管在存储库中的工具? 在我的具体情况下,我正在使用 Gradle 构建一个 Android 应用程序。我添加了一项任务,将一些 protobuf 数据从文本编码为二进
我的项目有下一个结构: Root |- A |- C (depends on A) \- B (depends on A) 对于所有子项目,我们使用自己的插件生成资源:https://githu
我设置了一个具有4个节点的Hadoop群集,其中一个充当HDFS的NameNode以及Yarn主节点。该节点也是最强大的。 现在,我分发了2个文本文件,一个在node01(名称节点)上,一个在node
在 TFS 2010 中为多个用户存储任务的最佳方式是什么?我只能为一项任务分配一个。 (例如:当我计划向所有开发人员演示时) (这是一个 Scrum Msf 敏捷项目,其中任务是用户故事的一部分)
我是一名优秀的程序员,十分优秀!