- ubuntu12.04环境下使用kvm ioctl接口实现最简单的虚拟机
- Ubuntu 通过无线网络安装Ubuntu Server启动系统后连接无线网络的方法
- 在Ubuntu上搭建网桥的方法
- ubuntu 虚拟机上网方式及相关配置详解
CFSDN坚持开源创造价值,我们致力于搭建一个资源共享平台,让每一个IT人在这里找到属于你的精彩世界.
这篇CFSDN的博客文章Go 通过 Map/Filter/ForEach 等流式 API 高效处理数据的思路详解由作者收集整理,如果你对这篇文章有兴趣,记得点赞哟.
用过 Java 的同学都熟悉 Stream API,那么在 Go 里我们可以用类似的方式处理集合数据吗?本文给大家介绍 go-zero 内置的 Stream API,为了帮助理解,函数主要分为三类:获取操作、中间处理操作、终结操作.
如果有 java 使用经验的同学一定会对 java8 的 Stream 赞不绝口,极大的提高了们对于集合类型数据的处理能力.
1
2
3
4
|
int
sum = widgets.stream()
.filter(w -> w.getColor() == RED)
.mapToInt(w -> w.getWeight())
.sum();
|
Stream 能让我们支持链式调用和函数编程的风格来实现数据的处理,看起来数据像是在流水线一样不断的实时流转加工,最终被汇总。Stream 的实现思想就是将数据处理流程抽象成了一个数据流,每次加工后返回一个新的流供使用.
动手写代码之前,先想清楚,把需求理清楚是最重要的一步,我们尝试代入作者的视角来思考整个组件的实现流程。首先把底层实现的逻辑放一下 ,先尝试从零开始进行功能定义 stream 功能.
Stream 的工作流程其实也属于生产消费者模型,整个流程跟工厂中的生产流程非常相似,尝试先定义一下 Stream 的生命周期:
下面围绕 stream 的三个生命周期开始定义 API:
为了创建出数据流 stream 这一抽象对象,可以理解为构造器.
我们支持三种方式构造 stream,分别是:切片转换,channel 转换,函数式转换.
注意这个阶段的方法都是普通的公开方法,并不绑定 Stream 对象.
1
2
3
4
5
6
7
8
9
10
11
|
// 通过可变参数模式创建 stream
func Just(items ...
interface
{}) Stream
// 通过 channel 创建 stream
func Range(source <-chan
interface
{}) Stream
// 通过函数创建 stream
func From(generate GenerateFunc) Stream
// 拼接 stream
func Concat(s Stream, others ...Stream) Stream
|
加工阶段需要进行的操作往往对应了我们的业务逻辑,比如:转换,过滤,去重,排序等等.
这个阶段的 API 属于 method 需要绑定到 Stream 对象上.
结合常用的业务场景进行如下定义:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
|
// 去除重复item
Distinct(keyFunc KeyFunc) Stream
// 按条件过滤item
Filter(filterFunc FilterFunc, opts ...Option) Stream
// 分组
Group(fn KeyFunc) Stream
// 返回前n个元素
Head(n int64) Stream
// 返回后n个元素
Tail(n int64) Stream
// 转换对象
Map(fn MapFunc, opts ...Option) Stream
// 合并item到slice生成新的stream
Merge() Stream
// 反转
Reverse() Stream
// 排序
Sort(fn LessFunc) Stream
// 作用在每个item上
Walk(fn WalkFunc, opts ...Option) Stream
// 聚合其他Stream
Concat(streams ...Stream) Stream
|
加工阶段的处理逻辑都会返回一个新的 Stream 对象,这里有个基本的实现范式 。
汇总阶段其实就是我们想要的处理结果,比如:是否匹配,统计数量,遍历等等.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
|
// 检查是否全部匹配
AllMatch(fn PredicateFunc) bool
// 检查是否存在至少一项匹配
AnyMatch(fn PredicateFunc) bool
// 检查全部不匹配
NoneMatch(fn PredicateFunc) bool
// 统计数量
Count()
int
// 清空stream
Done()
// 对所有元素执行操作
ForAll(fn ForAllFunc)
// 对每个元素执行操作
ForEach(fn ForEachFunc)
|
梳理完组件的需求边界后,我们对于即将要实现的 Stream 有了更清晰的认识。在我的认知里面真正的架构师对于需求的把握以及后续演化能达到及其精准的地步,做到这一点离不开对需求的深入思考以及洞穿需求背后的本质。通过代入作者的视角来模拟复盘整个项目的构建流程,学习作者的思维方法论这正是我们学习开源项目最大的价值所在.
好了,我们尝试定义出完整的 Stream 接口全貌以及函数.
接口的作用不仅仅是模版作用,还在于利用其抽象能力搭建项目整体的框架而不至于一开始就陷入细节,能快速的将我们的思考过程通过接口简洁的表达出来,学会养成自顶向下的思维方法从宏观的角度来观察整个系统,一开始就陷入细节则很容易拔剑四顾心茫然。。.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
|
rxOptions struct {
unlimitedWorkers bool
workers
int
}
Option func(opts *rxOptions)
// key生成器
//item - stream中的元素
KeyFunc func(item
interface
{})
interface
{}
// 过滤函数
FilterFunc func(item
interface
{}) bool
// 对象转换函数
MapFunc func(intem
interface
{})
interface
{}
// 对象比较
LessFunc func(a, b
interface
{}) bool
// 遍历函数
WalkFunc func(item
interface
{}, pip chan<-
interface
{})
// 匹配函数
PredicateFunc func(item
interface
{}) bool
// 对所有元素执行操作
ForAllFunc func(pip <-chan
interface
{})
// 对每个item执行操作
ForEachFunc func(item
interface
{})
// 对每个元素并发执行操作
ParallelFunc func(item
interface
{})
// 对所有元素执行聚合操作
ReduceFunc func(pip <-chan
interface
{}) (
interface
{}, error)
// item生成函数
GenerateFunc func(source <-chan
interface
{})
Stream
interface
{
// 去除重复item
Distinct(keyFunc KeyFunc) Stream
// 按条件过滤item
Filter(filterFunc FilterFunc, opts ...Option) Stream
// 分组
Group(fn KeyFunc) Stream
// 返回前n个元素
Head(n int64) Stream
// 返回后n个元素
Tail(n int64) Stream
// 获取第一个元素
First()
interface
{}
// 获取最后一个元素
Last()
interface
{}
// 转换对象
Map(fn MapFunc, opts ...Option) Stream
// 合并item到slice生成新的stream
Merge() Stream
// 反转
Reverse() Stream
// 排序
Sort(fn LessFunc) Stream
// 作用在每个item上
Walk(fn WalkFunc, opts ...Option) Stream
// 聚合其他Stream
Concat(streams ...Stream) Stream
// 检查是否全部匹配
AllMatch(fn PredicateFunc) bool
// 检查是否存在至少一项匹配
AnyMatch(fn PredicateFunc) bool
// 检查全部不匹配
NoneMatch(fn PredicateFunc) bool
// 统计数量
Count()
int
// 清空stream
Done()
// 对所有元素执行操作
ForAll(fn ForAllFunc)
// 对每个元素执行操作
ForEach(fn ForEachFunc)
}
|
channel() 方法用于获取 Stream 管道属性,因为在具体实现时我们面向的是接口对象所以暴露一个私有方法 read 出来.
1
2
|
// 获取内部的数据容器channel,内部方法
channel() chan
interface
{}
|
功能定义梳理清楚了,接下来考虑几个工程实现的问题.
链式调用,创建对象用到的 builder 模式可以达到链式调用效果。实际上 Stream 实现类似链式的效果原理也是一样的,每次调用完后都创建一个新的 Stream 返回给用户.
1
2
3
4
|
// 去除重复item
Distinct(keyFunc KeyFunc) Stream
// 按条件过滤item
Filter(filterFunc FilterFunc, opts ...Option) Stream
|
所谓的流水线可以理解为数据在 Stream 中的存储容器,在 go 中我们可以使用 channel 作为数据的管道,达到 Stream 链式调用执行多个操作时异步非阻塞效果.
数据加工本质上是在处理 channel 中的数据,那么要实现并行处理无非是并行消费 channel 而已,利用 goroutine 协程、WaitGroup 机制可以非常方便的实现并行处理.
core/fx/stream.go 。
go-zero 中关于 Stream 的实现并没有定义接口,不过没关系底层实现时逻辑是一样的.
为了实现 Stream 接口我们定义一个内部的实现类,其中 source 为 channel 类型,模拟流水线功能.
1
2
3
|
Stream struct {
source <-chan
interface
{}
}
|
channel 创建 Range 。
通过 channel 创建 stream 。
1
2
3
4
5
|
func Range(source <-chan
interface
{}) Stream {
return
Stream{
source: source,
}
}
|
可变参数模式创建 Just 。
通过可变参数模式创建 stream,channel 写完后及时 close 是个好习惯.
1
2
3
4
5
6
7
8
|
func Just(items ...
interface
{}) Stream {
source := make(chan
interface
{}, len(items))
for
_, item := range items {
source <- item
}
close(source)
return
Range(source)
}
|
函数创建 From 。
通过函数创建 Stream 。
1
2
3
4
5
6
7
8
|
func From(generate GenerateFunc) Stream {
source := make(chan
interface
{})
threading.GoSafe(func() {
defer close(source)
generate(source)
})
return
Range(source)
}
|
因为涉及外部传入的函数参数调用,执行过程并不可用因此需要捕捉运行时异常防止 panic 错误传导到上层导致应用崩溃.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
|
func Recover(cleanups ...func()) {
for
_, cleanup := range cleanups {
cleanup()
}
if
r := recover(); r != nil {
logx.ErrorStack(r)
}
}
func RunSafe(fn func()) {
defer rescue.Recover()
fn()
}
func GoSafe(fn func()) {
go RunSafe(fn)
}
|
拼接其他 Stream 创建一个新的 Stream,调用内部 Concat method 方法,后文将会分析 Concat 的源码实现.
1
2
3
|
func Concat(s Stream, others ...Stream) Stream {
return
s.Concat(others...)
}
|
去重 Distinct 。
因为传入的是函数参数KeyFunc func(item interface{}) interface{}意味着也同时支持按照业务场景自定义去重,本质上是利用 KeyFunc 返回的结果基于 map 实现去重.
函数参数非常强大,能极大的提升灵活性.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
|
func (s Stream) Distinct(keyFunc KeyFunc) Stream {
source := make(chan
interface
{})
threading.GoSafe(func() {
// channel记得关闭是个好习惯
defer close(source)
keys := make(map[
interface
{}]lang.PlaceholderType)
for
item := range s.source {
// 自定义去重逻辑
key := keyFunc(item)
// 如果key不存在,则将数据写入新的channel
if
_, ok := keys[key]; !ok {
source <- item
keys[key] = lang.Placeholder
}
}
})
return
Range(source)
}
|
使用案例:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
|
// 1 2 3 4 5
Just(
1
,
2
,
3
,
3
,
4
,
5
,
5
).Distinct(func(item
interface
{})
interface
{} {
return
item
}).ForEach(func(item
interface
{}) {
t.Log(item)
})
// 1 2 3 4
Just(
1
,
2
,
3
,
3
,
4
,
5
,
5
).Distinct(func(item
interface
{})
interface
{} {
uid := item.(
int
)
// 对大于4的item进行特殊去重逻辑,最终只保留一个>3的item
if
uid >
3
{
return
4
}
return
item
}).ForEach(func(item
interface
{}) {
t.Log(item)
})
|
通过将过滤逻辑抽象成 FilterFunc,然后分别作用在 item 上根据 FilterFunc 返回的布尔值决定是否写回新的 channel 中实现过滤功能,实际的过滤逻辑委托给了 Walk method.
Option 参数包含两个选项:
1
2
3
4
5
6
7
8
9
|
FilterFunc func(item
interface
{}) bool
func (s Stream) Filter(filterFunc FilterFunc, opts ...Option) Stream {
return
s.Walk(func(item
interface
{}, pip chan<-
interface
{}) {
if
filterFunc(item) {
pip <- item
}
}, opts...)
}
|
使用示例:
1
2
3
4
5
6
7
8
9
|
func TestInternalStream_Filter(t *testing.T) {
// 保留偶数 2,4
channel := Just(
1
,
2
,
3
,
4
,
5
).Filter(func(item
interface
{}) bool {
return
item.(
int
)%
2
==
0
}).channel()
for
item := range channel {
t.Log(item)
}
}
|
walk 英文意思是步行,这里的意思是对每个 item 都执行一次 WalkFunc 操作并将结果写入到新的 Stream 中.
这里注意一下因为内部采用了协程机制异步执行读取和写入数据所以新的 Stream 中 channel 里面的数据顺序是随机的.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
|
// item-stream中的item元素
// pipe-item符合条件则写入pipe
WalkFunc func(item
interface
{}, pipe chan<-
interface
{})
func (s Stream) Walk(fn WalkFunc, opts ...Option) Stream {
option := buildOptions(opts...)
if
option.unlimitedWorkers {
return
s.walkUnLimited(fn, option)
}
return
s.walkLimited(fn, option)
}
func (s Stream) walkUnLimited(fn WalkFunc, option *rxOptions) Stream {
// 创建带缓冲区的channel
// 默认为16,channel中元素超过16将会被阻塞
pipe := make(chan
interface
{}, defaultWorkers)
go func() {
var wg sync.WaitGroup
for
item := range s.source {
// 需要读取s.source的所有元素
// 这里也说明了为什么channel最后写完记得完毕
// 如果不关闭可能导致协程一直阻塞导致泄漏
// 重要, 不赋值给val是个典型的并发陷阱,后面在另一个goroutine里使用了
val := item
wg.Add(
1
)
// 安全模式下执行函数
threading.GoSafe(func() {
defer wg.Done()
fn(item, pipe)
})
}
wg.Wait()
close(pipe)
}()
// 返回新的Stream
return
Range(pipe)
}
func (s Stream) walkLimited(fn WalkFunc, option *rxOptions) Stream {
pipe := make(chan
interface
{}, option.workers)
go func() {
var wg sync.WaitGroup
// 控制协程数量
pool := make(chan lang.PlaceholderType, option.workers)
for
item := range s.source {
// 重要, 不赋值给val是个典型的并发陷阱,后面在另一个goroutine里使用了
val := item
// 超过协程限制时将会被阻塞
pool <- lang.Placeholder
// 这里也说明了为什么channel最后写完记得完毕
// 如果不关闭可能导致协程一直阻塞导致泄漏
wg.Add(
1
)
// 安全模式下执行函数
threading.GoSafe(func() {
defer func() {
wg.Done()
//执行完成后读取一次pool释放一个协程位置
<-pool
}()
fn(item, pipe)
})
}
wg.Wait()
close(pipe)
}()
return
Range(pipe)
}
|
使用案例:
返回的顺序是随机的.
1
2
3
4
5
6
7
8
|
func Test_Stream_Walk(t *testing.T) {
// 返回 300,100,200
Just(
1
,
2
,
3
).Walk(func(item
interface
{}, pip chan<-
interface
{}) {
pip <- item.(
int
) *
100
}, WithWorkers(
3
)).ForEach(func(item
interface
{}) {
t.Log(item)
})
}
|
通过对 item 匹配放入 map 中.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
|
KeyFunc func(item
interface
{})
interface
{}
func (s Stream) Group(fn KeyFunc) Stream {
groups := make(map[
interface
{}][]
interface
{})
for
item := range s.source {
key := fn(item)
groups[key] = append(groups[key], item)
}
source := make(chan
interface
{})
go func() {
for
_, group := range groups {
source <- group
}
close(source)
}()
return
Range(source)
}
|
获取前 n 个元素 Head 。
n 大于实际数据集长度的话将会返回全部元素 。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
|
func (s Stream) Head(n int64) Stream {
if
n <
1
{
panic(
"n must be greather than 1"
)
}
source := make(chan
interface
{})
go func() {
for
item := range s.source {
n--
// n值可能大于s.source长度,需要判断是否>=0
if
n >=
0
{
source <- item
}
// let successive method go ASAP even we have more items to skip
// why we don't just break the loop, because if break,
// this former goroutine will block forever, which will cause goroutine leak.
// n==0说明source已经写满可以进行关闭了
// 既然source已经满足条件了为什么不直接进行break跳出循环呢?
// 作者提到了防止协程泄漏
// 因为每次操作最终都会产生一个新的Stream,旧的Stream永远也不会被调用了
if
n ==
0
{
close(source)
break
}
}
// 上面的循环跳出来了说明n大于s.source实际长度
// 依旧需要显示关闭新的source
if
n >
0
{
close(source)
}
}()
return
Range(source)
}
|
使用示例:
1
2
3
4
5
6
7
|
// 返回1,2
func TestInternalStream_Head(t *testing.T) {
channel := Just(
1
,
2
,
3
,
4
,
5
).Head(
2
).channel()
for
item := range channel {
t.Log(item)
}
}
|
获取后 n 个元素 Tail 。
这里很有意思,为了确保拿到最后 n 个元素使用环形切片 Ring 这个数据结构,先了解一下 Ring 的实现.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
|
// 环形切片
type Ring struct {
elements []
interface
{}
index
int
lock sync.Mutex
}
func NewRing(n
int
) *Ring {
if
n <
1
{
panic(
"n should be greather than 0"
)
}
return
&Ring{
elements: make([]
interface
{}, n),
}
}
// 添加元素
func (r *Ring) Add(v
interface
{}) {
r.lock.Lock()
defer r.lock.Unlock()
// 将元素写入切片指定位置
// 这里的取余实现了循环写效果
r.elements[r.index%len(r.elements)] = v
// 更新下次写入位置
r.index++
}
// 获取全部元素
// 读取顺序保持与写入顺序一致
func (r *Ring) Take() []
interface
{} {
r.lock.Lock()
defer r.lock.Unlock()
var size
int
var start
int
// 当出现循环写的情况时
// 开始读取位置需要通过去余实现,因为我们希望读取出来的顺序与写入顺序一致
if
r.index > len(r.elements) {
size = len(r.elements)
// 因为出现循环写情况,当前写入位置index开始为最旧的数据
start = r.index % len(r.elements)
}
else
{
size = r.index
}
elements := make([]
interface
{}, size)
for
i :=
0
; i < size; i++ {
// 取余实现环形读取,读取顺序保持与写入顺序一致
elements[i] = r.elements[(start+i)%len(r.elements)]
}
return
elements
}
|
总结一下环形切片的优点:
环形切片能实现固定容量满的情况下旧数据不断被新数据覆盖,由于这个特性可以用于读取 channel 后 n 个元素.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
|
func (s Stream) Tail(n int64) Stream {
if
n <
1
{
panic(
"n must be greather than 1"
)
}
source := make(chan
interface
{})
go func() {
ring := collection.NewRing(
int
(n))
// 读取全部元素,如果数量>n环形切片能实现新数据覆盖旧数据
// 保证获取到的一定最后n个元素
for
item := range s.source {
ring.Add(item)
}
for
_, item := range ring.Take() {
source <- item
}
close(source)
}()
return
Range(source)
}
|
那么为什么不直接使用 len(source) 长度的切片呢?
答案是节省内存。凡是涉及到环形类型的数据结构时都具备一个优点那就省内存,能做到按需分配资源.
使用示例:
1
2
3
4
5
6
7
8
9
10
11
12
|
func TestInternalStream_Tail(t *testing.T) {
// 4,5
channel := Just(
1
,
2
,
3
,
4
,
5
).Tail(
2
).channel()
for
item := range channel {
t.Log(item)
}
// 1,2,3,4,5
channel2 := Just(
1
,
2
,
3
,
4
,
5
).Tail(
6
).channel()
for
item := range channel2 {
t.Log(item)
}
}
|
元素转换Map 。
元素转换,内部由协程完成转换操作,注意输出channel并不保证按原序输出.
1
2
3
4
5
6
|
MapFunc func(intem
interface
{})
interface
{}
func (s Stream) Map(fn MapFunc, opts ...Option) Stream {
return
s.Walk(func(item
interface
{}, pip chan<-
interface
{}) {
pip <- fn(item)
}, opts...)
}
|
使用示例:
1
2
3
4
5
6
7
8
|
func TestInternalStream_Map(t *testing.T) {
channel := Just(
1
,
2
,
3
,
4
,
5
,
2
,
2
,
2
,
2
,
2
,
2
).Map(func(item
interface
{})
interface
{} {
return
item.(
int
) *
10
}).channel()
for
item := range channel {
t.Log(item)
}
}
|
合并 Merge 。
实现比较简单,我考虑了很久没想到有什么场景适合这个方法.
1
2
3
4
5
6
7
8
9
|
func (s Stream) Merge() Stream {
var items []
interface
{}
for
item := range s.source {
items = append(items, item)
}
source := make(chan
interface
{},
1
)
source <- items
return
Range(source)
}
|
反转 Reverse 。
反转 channel 中的元素。反转算法流程是:
注意一下为什么获取 s.source 时用切片来接收呢? 切片会自动扩容,用数组不是更好吗?
其实这里是不能用数组的,因为不知道 Stream 写入 source 的操作往往是在协程异步写入的,每个 Stream 中的 channel 都可能在动态变化,用流水线来比喻 Stream 工作流程的确非常形象.
1
2
3
4
5
6
7
8
9
10
11
|
func (s Stream) Reverse() Stream {
var items []
interface
{}
for
item := range s.source {
items = append(items, item)
}
for
i := len(items)/
2
-
1
; i >=
0
; i-- {
opp := len(items) -
1
- i
items[i], items[opp] = items[opp], items[i]
}
return
Just(items...)
}
|
使用示例:
1
2
3
4
5
6
|
func TestInternalStream_Reverse(t *testing.T) {
channel := Just(
1
,
2
,
3
,
4
,
5
).Reverse().channel()
for
item := range channel {
t.Log(item)
}
}
|
排序 Sort 。
内网调用 slice 官方包的排序方案,传入比较函数实现比较逻辑即可.
1
2
3
4
5
6
7
8
9
10
11
|
func (s Stream) Sort(fn LessFunc) Stream {
var items []
interface
{}
for
item := range s.source {
items = append(items, item)
}
sort.Slice(items, func(i, j
int
) bool {
return
fn(i, j)
})
return
Just(items...)
}
|
使用示例:
1
2
3
4
5
6
7
8
9
|
// 5,4,3,2,1
func TestInternalStream_Sort(t *testing.T) {
channel := Just(
1
,
2
,
3
,
4
,
5
).Sort(func(a, b
interface
{}) bool {
return
a.(
int
) > b.(
int
)
}).channel()
for
item := range channel {
t.Log(item)
}
}
|
拼接 Concat 。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
|
func (s Stream) Concat(steams ...Stream) Stream {
// 创建新的无缓冲channel
source := make(chan
interface
{})
go func() {
// 创建一个waiGroup对象
group := threading.NewRoutineGroup()
// 异步从原channel读取数据
group.Run(func() {
for
item := range s.source {
source <- item
}
})
// 异步读取待拼接Stream的channel数据
for
_, stream := range steams {
// 每个Stream开启一个协程
group.Run(func() {
for
item := range stream.channel() {
source <- item
}
})
}
// 阻塞等待读取完成
group.Wait()
close(source)
}()
// 返回新的Stream
return
Range(source)
}
|
汇总 API 。
全部匹配 AllMatch 。
1
2
3
4
5
6
7
8
9
10
11
|
func (s Stream) AllMatch(fn PredicateFunc) bool {
for
item := range s.source {
if
!fn(item) {
// 需要排空 s.source,否则前面的goroutine可能阻塞
go drain(s.source)
return
false
}
}
return
true
}
|
任意匹配 AnyMatch 。
1
2
3
4
5
6
7
8
9
10
11
|
func (s Stream) AnyMatch(fn PredicateFunc) bool {
for
item := range s.source {
if
fn(item) {
// 需要排空 s.source,否则前面的goroutine可能阻塞
go drain(s.source)
return
true
}
}
return
false
}
|
一个也不匹配 NoneMatch 。
1
2
3
4
5
6
7
8
9
10
11
|
func (s Stream) NoneMatch(fn func(item
interface
{}) bool) bool {
for
item := range s.source {
if
fn(item) {
// 需要排空 s.source,否则前面的goroutine可能阻塞
go drain(s.source)
return
false
}
}
return
true
}
|
数量统计 Count 。
1
2
3
4
5
6
7
|
func (s Stream) Count()
int
{
var count
int
for
range s.source {
count++
}
return
count
}
|
清空 Done 。
1
2
3
4
|
func (s Stream) Done() {
// 排空 channel,防止 goroutine 阻塞泄露
drain(s.source)
}
|
迭代全部元素 ForAll 。
1
2
3
|
func (s Stream) ForAll(fn ForAllFunc) {
fn(s.source)
}
|
迭代每个元素 ForEach 。
1
2
3
|
func (s Stream) ForAll(fn ForAllFunc) {
fn(s.source)
}
|
至此 Stream 组件就全部实现完了,核心逻辑是利用 channel 当做管道,数据当做水流,不断的用协程接收/写入数据到 channel 中达到异步非阻塞的效果.
回到开篇提到的问题,未动手前想要实现一个 stream 难度似乎非常大,很难想象在 go 中 300 多行的代码就能实现如此强大的组件.
实现高效的基础来源三个语言特性:
参考资料 。
pipeline模式 。
切片反转算法 。
项目地址 。
https://github.com/zeromicro/go-zero 。
到此这篇关于Go 通过 Map/Filter/ForEach 等流式 API 高效处理数据的文章就介绍到这了,更多相关go 流式 API 处理数据内容请搜索我以前的文章或继续浏览下面的相关文章希望大家以后多多支持我! 。
原文链接:https://www.cnblogs.com/kevinwan/p/15761172.html 。
最后此篇关于Go 通过 Map/Filter/ForEach 等流式 API 高效处理数据的思路详解的文章就讲到这里了,如果你想了解更多关于Go 通过 Map/Filter/ForEach 等流式 API 高效处理数据的思路详解的内容请搜索CFSDN的文章或继续浏览相关文章,希望大家以后支持我的博客! 。
这是一个假设性问题。如果我有来自 3 个单独的 sql db 查询的 3 个数组,这些查询都与另一个数组相关。例如…… //db schools id | school_name classes id
在我的应用程序中,我使用 scrape(string url) 方法从网页中抓取链接。可以说它每次都返回我 10 个 url。 我想从每个抓取的 url 中抓取 10 个链接。 长话短说: (第 1
我的java7代码: final Map result = new HashMap<>(); final Set> classes = getClasses(co.glue()); for (fina
是否可以在 SwiftUI 中设置变量,例如在这样的 ForEach 中: struct ContentView: View { var test: Int var body: som
在 D、int、uint 中使用 foreach 时,循环索引的首选类型是什么?或者只是通过省略类型自动实现? 最佳答案 一般来说,索引应该是size_t。与长度相同。如果您尝试使用 int 或 ui
根据 http://dlang.org/statement.html 的“Foreach 限制”部分以下代码 int[] a; int[] b; foreach (int i; a) { a
在什么情况下我们应该在 JDK 8 中使用旧的 foreach 循环遍历新的 collection.forEach() 还是最好的做法是转换 every foreach 循环?是否存在任何重要的性能差
获得类似东西的惯用方法是什么? ((fn [coll] (function-body)) [:a :b :c :d]) -> [[:a :b][:a :c][:a :d][:b :c][:b :d][
我正在创建一个基于 who is it? 的 Java 应用程序。现在我正在制作一种方法,在回答问题时我需要其他卡片。 我有两个列表: 列表是一个 ImageView 列表,其中我有卡片必须代表的 2
我希望有人能在我发疯之前帮助我。 我有 3 张 table : Table A SELECT companypk, companyname, logo, msscope FROM global_com
我正在尝试将多个字符串添加到 C# 中的 MailAddress。 如果我使用ForEach,我的代码会是这样 foreach (var item in GetPeopleList()
我没有太多的 C# 经验,所以如果有人能指出正确的方向,我将不胜感激。我有一个引用对象变量的 foreach 循环。我希望在主循环中创建另一个 foreach 循环,将当前变量与对象数组中的其余变量进
下面的代码每 60 秒删除文件夹“Images”中的文件,它可以工作,但是当文件夹为空时它会显示:警告:为 foreach() 提供的参数无效如果没有文件,如何解决这个问题,说“文件夹为空而不是那个警
我需要在两种不同的模式下运行,因此“if”(第二个稍后构建一个大的 csv) 下面对于单个实例运行正常,但在第二个 (*) 的加载时间上失败,因为在前 7k 行中的每一行上运行。 我想避免可怕的事情
我们可以使用以下两种方法实现类数组对象的迭代: let arrayLike = document.getElementsByClassName('dummy'); [].forEach.call(ar
我有这个代码 ... 它说: Attribute value invalid for tag forEach according to TLD 最佳答案 forEach标签不支持 valu
我在 SwiftUI 中有一个像这样的 ForEach: ForEach(entries) { (e: MyType) in NavigationLinkItem(entry: e) } 现在我
我无法在一个 Foreach 或 Foreach-Object 循环中使用多个命令 我的情况是—— 我有很多文本文件,大约 100 个。 所以他们被阅读 Get-ChildItem $FilePath
我必须从 json 文件(实际上是 2 个 json 文件)执行 ForEach,因此我执行 2 forEach,代码是 table { font-family: arial, sans-
我对编程很陌生,当我执行 forEach 函数时,我的应用程序返回错误。我的controller.js中有以下代码 $scope.ajaxRequest = A.Game.get({action: '
我是一名优秀的程序员,十分优秀!