- Java锁的逻辑(结合对象头和ObjectMonitor)
- 还在用饼状图?来瞧瞧这些炫酷的百分比可视化新图形(附代码实现)⛵
- 自动注册实体类到EntityFrameworkCore上下文,并适配ABP及ABPVNext
- 基于Sklearn机器学习代码实战
本文介绍了在并发编程中数据汇总的问题,并探讨了在并发环境下使用互斥锁和通道两种方式来保证数据安全性的方法.
首先,通过一个实例,描述了一个并发拉取数据并汇总的案例,并使用互斥锁来确保线程安全。然后,讨论了互斥锁的一些缺点,引出了通道作为一种替代方案,并介绍了通道的基本使用和特性。接下来,通过实例演示了如何使用通道来实现并发下的数据汇总.
最后,引用了etcd中使用通道实现协程并发下数据汇总的例子,展示了通道在实际项目中的应用.
在请求处理过程中,经常需要通过RPC接口拉取数据。有时候,由于数据量较大,单个数据拉取操作可能会导致整个请求的处理时间较长。为了加快处理速度,我们通常考虑同时开启多个协程并发地拉取数据。一旦多个协程并发拉取数据后,主协程需要汇总这些协程拉取到的数据,然后再返回结果。在这个过程中,往往涉及对共享资源的并发访问,为了保证线程安全性,通常会使用互斥锁。下面通过一个简单的代码来展示该过程
package main
import (
"fmt"
"sync"
"time"
)
type Data struct {
ID int
Name string
}
var (
// 汇总结果
dataList []Data
// 互斥锁
mutex sync.Mutex
)
func fetchData(page int, wg *sync.WaitGroup) {
// 模拟RPC接口拉取数据的耗时操作
time.Sleep(time.Second)
// 假设从RPC接口获取到了一批数据
data := Data{
ID: page,
Name: fmt.Sprintf("Data %d", page),
}
// 使用互斥锁保护共享数据的并发访问
mutex.Lock()
defer mutext.Unlock()
dataList = append(dataList, data)
wg.Done()
}
func main() {
var wg sync.WaitGroup
// 定义需要拉取的数据页数
numPages := 10
// 启动多个协程并发地拉取数据
for i := 1; i <= numPages; i++ {
wg.Add(1)
go fetchData(i, &wg)
}
// 等待所有协程完成
wg.Wait()
// 打印拉取到的数据
fmt.Println("Fetched data:")
for _, data := range dataList {
fmt.Printf("ID: %d, Name: %s\n", data.ID, data.Name)
}
}
在上述示例中,我们定义了一个共享的 dataList 切片用于保存拉取到的数据。每个goroutine通过调用 fetchData 函数来模拟拉取数据的过程,并使用互斥锁 mutex 保护 dataList 的并发访问。主协程使用 sync.WaitGroup 等待所有协程完成数据拉取任务,然后打印出拉取到的数据。通过并发地拉取数据,并使用互斥锁保证线程安全,我们可以显著提高数据拉取的速度,并且确保数据的正确性和一致性.
回看上述实现,其实是涉及到了多个协程操作同一份数据,有可能导致线程安全的问题,然后这里是通过互斥锁来保证线程安全的。确实,使用互斥锁是可以保证线程安全的,但是也是存在一些缺点的,比如竞争和阻塞,两个协程同时竞争互斥锁时,只有一个协程能够获得锁,而其他协程则会被阻塞,这个就可能导致性能瓶颈,当然在这个场景下问题不大。其次就是代码的复杂性提高了,使用互斥锁需要仔细设计和管理,确保锁的正确获取和释放。这增加了代码的复杂性和维护成本,如果在代码中处理锁的方式不正确,可能会死锁,导致程序无法继续执行.
那我们其实就有疑问,在协程并发下数据汇总的场景,是否存在其他方式,不需要通过使用互斥锁,也能够保证线程安全呢? 其实还真有, Go 语言中的 channel 非常适用于这种情况。通过使用通道,我们可以实现线程安全的数据共享和同步,而无需显式地使用互斥锁。下面我们来了解一下 channel .
channel 在Go语言中是一种特殊的数据结构,用于协程之间的通信和同步。它类似于一个先进先出(FIFO)的队列,用于数据的传输和共享。在并发环境中,可以将数据发送到通道,也可以从通道中接收数据,而这两个操作都是线程安全的.
使用 channel 的优势在于它提供了内置的同步机制,无需显式地使用互斥锁来处理并发访问.
当一个协程向通道发送数据时,如果通道已满,发送操作会被阻塞,直到有其他协程从通道中接收数据释放空间。同样地,当一个协程从通道接收数据时,如果通道为空,接收操作也会被阻塞,直到有其他协程向通道发送数据.
同时,当多个协程同时访问通道时,Go运行时系统会自动处理协程之间的同步和并发访问的细节,保证数据的正确性和一致性。从而可以放心地在多个协程中使用通道进行数据的发送和接收操作,而不需要额外的锁或同步机制来保证线程安全.
因此,使用 channel 其实是可以避免常见的并发问题,如竞态条件和死锁,简化了并发编程的复杂性.
通过上面对 channel 的基本介绍,我们已经对 channel 有了基本的了解,其实可以粗略理解其为一个并发安全的队列。下面来了解下 channel 的基本语法,从而能够开始使用 channel .
channel基本操作分为创建 channel ,发送数据到 channel ,接收 channel 中的数据,以及关闭 channel 。下面对其进行简单展示
创建channel,使用make函数创建通道,通道的类型可以根据需要选择,例如 int 、 string 等
ch := make(chan int)
发送数据到channel:使用 <- 操作符将数据发送到通道中 。
ch <- data
接收channel中的数据: 使用 <- 操作符从通道中接收数据 。
result := <-ch
关闭channel, 使用 close 函数关闭通道。关闭通道后,仍然可以从通道接收数据,但无法再向通道发送数据 。
close(ch)
通过上面 channel 的四个基本操作,便能够实现在不同协程间线程安全得传递数据。最后通过一个例子,完整得展示 channel 的基本使用.
package main
import "fmt"
func main() {
ch := make(chan string) // 创建字符串通道
defer close(ch)
go func() {
ch <- "hello, channel!" // 发送数据到通道
}()
result := <-ch // 从通道接收数据
fmt.Println(result)
}
在这个示例中,我们创建了一个字符串通道 ch 。然后,在一个单独的协程中,我们向通道发送了字符串"hello, channel!"。最后,主协程从通道中接收数据,并将其打印出来.
通过使用通道,我们可以实现协程之间的数据传输和同步,确保数据的安全共享和线程安全性。通道的使用能够简化并发编程的复杂性,提供一种高效、可靠的方式来处理并发场景下的数据传递.
下面,我们使用 channel 来实现并发数据汇总,替换掉之前使用互斥锁来保证线程安全的实现:
package main
import (
"fmt"
"sync"
"time"
)
type Data struct {
ID int
Name string
}
func fetchData(page int, ch chan Data, wg *sync.WaitGroup) {
// 模拟 RPC 接口拉取数据的耗时操作
time.Sleep(time.Second)
// 假设从 RPC 接口获取到了一批数据
data := Data{
ID: page,
Name: fmt.Sprintf("Data %d", page),
}
ch <- data // 将数据发送到通道
wg.Done()
}
func main() {
var wg sync.WaitGroup
// 定义需要拉取的数据页数
numPages := 10
dataCh := make(chan Data, 10) // 创建用于接收数据的通道
// 启动多个协程并发地拉取数据
for i := 1; i <= numPages; i++ {
wg.Add(1)
go fetchData(i, dataCh, &wg)
}
go func() {
wg.Wait()
close(dataCh) // 关闭通道,表示数据已经全部发送完成
}()
// 从通道接收数据并汇总
var dataList []Data
for data := range dataCh {
dataList = append(dataList, data)
}
// 打印拉取到的数据
fmt.Println("Fetched data:")
for _, data := range dataList {
fmt.Printf("ID: %d, Name: %s\n", data.ID, data.Name)
}
}
在修改后的代码中,我们创建了一个用于接收数据的 dataCh 。每个协程通过将数据发送到该 channel 来完成数据的汇总。主协程通过从 channel 接收数据,并将其添加到 dataList 中实现数据的汇总过程。这种方式不需要显式地加锁和解锁,并且避免了互斥锁带来的复杂性和性能问题.
通过使用 channel ,我们能够以一种更直观、更安全的方式实现协程之间的数据传递和同步。 channel 在并发编程中起到了关键的作用,简化了并发操作的管理和实现。同时,它提供了内置的同步机制,保证了数据的正确性和一致性,避免了死锁和竞态条件的问题.
协程间的并发下汇总数据可以归类为协程间的数据传递这个场景。在这个场景中,多个协程并发地拉取数据,然后将数据汇总到一个共享的数据结构中。为了保证数据的正确性和一致性,需要使用某种机制来确保多个协程对共享数据的并发访问是安全的.
在原始的实现中,使用了互斥锁来保护共享数据的并发访问。互斥锁提供了互斥访问的机制,确保同一时间只有一个协程可以访问共享数据,从而避免了数据竞争和不一致性。这种方式在保证线程安全的同时,引入了锁的开销和复杂性.
而使用 channel 来实现协程间的安全数据传递可以更简洁和高效。每个协程可以将拉取到的数据通过 channel 发送到主协程,主协程通过接收 channel 中的数据来进行汇总。 channel 提供了并发安全的数据传递机制,协程之间的数据传输是同步和有序的。由于 channel 本身就提供了同步机制,不需要额外的锁和同步操作,能够更简洁地实现协程间的安全数据传递.
因此,如果需要在多个协程间实现数据传递,而且由此可能带来线程安全的问题,此时使用 channel 来实现是相对比较合适的.
假设我们需要对 etcd 进行性能测试,此时需要模拟大量并发请求,对 etcd 进行负载测试,并收集每个请求的执行时间、成功/失败状态等结果数据。然后主协程需要收集每一个请求的结果数据,并进行统计计算,生成相应的性能报告。基于此,能够计算出总请求数、请求成功率、平均执行时间、最慢/最快请求等统计信息,以及错误分布情况和慢速请求的详细信息.
从上面的讲述来看,其实我们可以大概想象出这个模型,多个协程并发执行,然后获取每个请求的结果数据。然后主协程需要收集汇总这些数据,基于此来生成性能报告。这个模型其实也就是我们上面所说的协程并发下的数据汇总,因此通过 channel 来实现协程间的数据传输,是非常合适的.
下面我们来看看 etcd 中对应的实现。 etcd 中存在一个 report 对象的实现,能够接受一系列的请求数据的结果,然后生成性能报告返回回去。结构体定义如下
type report struct {
results chan Result
stats Stats
}
func (r *report) Results() chan<- Result { return r.results }
// Result describes the timings for an operation.
type Result struct {
Start time.Time
End time.Time
Err error
}
func newReport(precision string) *report {
r := &report{
results: make(chan Result, 16),
}
return r
}
Result 结构体为单个测试的结果,而 report 结构体则用于整个测试过程的报告和统计信息。通过使用 results 通道,可以将每个测试的结果发送到 report 结构体中,以便进行统计和生成报告.
当进行性能压测时,首先通过 newReport 生成一个 report 对象,然后启动多个协程同时进行压测请求,每一个请求处理完成之后,便会生成一个处理结果,存储到 Result 对象当中。然后基于 report 对象的 Results 方法获取到对应的 channel ,将处理结果传输给主协程.
主协程便通过遍历 report 对象中的 results 变量对应的 channel ,汇总计算所有处理结果,基于此便能够生成压测结果和报告。下面来看其具体流程.
首先是创建一个 report 对象,然后启动多个协程来处理请求,将结果发送到 report 对象中的 results 对应的 channel 中.
// 这里NewReportSample方法,其实是对上面newReport方法的一个封装
r := NewReportSample("%f")
// 这里假设只有一个协程,模拟执行一系列的测试,并将测试结果发送到 Report 对象的 results 通道中。
go func() {
start := time.Now()
for i := 0; i < 5; i++ {
// 不真实进行请求,只是简单获取执行结果,将测试结果进行传输
end := start.Add(time.Second)
r.Results() <- Result{Start: start, End: end}
start = end
}
r.Results() <- Result{Start: start, End: start.Add(time.Second), Err: fmt.Errorf("oops")}
// 假设所有压测请求都执行完成了
close(r.Results())
}()
// 主协程 汇总所有的处理结果,然后生成压测报告
stats := <-r.Stats()
以上代码中, r 是通过 NewReportSample("%f") 创建的一个 Report 对象。然后,在一个单独的协程中,执行了一系列的测试,并将测试结果发送到 r.Results() 通道中.
这段代码的作用是模拟执行一系列的测试,并将测试结果发送到 Report 对象的 results 通道中。通过使用 r.Results() 方法返回的通道,可以将测试结果发送到报告对象中进行统计和处理.
接下来,主协程应该不断从 r.Results() 方法返回的通道中读取数据,汇总所有的处理结果,从而生成压测报告。这个方法其实是被封装在 r.Stas() 方法中,具体如下:
func (r *report) Stats() <-chan Stats {
// 创建一个channel
donec := make(chan Stats, 1)
// 启动一个协程来执行
go func() {
defer close(donec)
r.processResults()
s := r.stats.copy()
if r.sps != nil {
s.TimeSeries = r.sps.getTimeSeries()
}
// 执行完成的话,将结果返回
donec <- s
}()
// 返回channel
return donec
}
// Stats方法启动的协程中,实际运行的任务
func (r *report) processResults() {
st := time.Now()
// 遍历r.results方法中channel中的数据,然后执行处理流程
for res := range r.results {
r.processResult(&res)
}
// 后续执行一些具体的计算逻辑
}
上述代码是 report 结构体中的两个方法,其中 Stats() 方法返回一个只读的 Stats 通道。这个方法会在一个单独的协程中执行,并处理 results 通道中的测试结果。事实上就是汇总 channel 中的数据,然后进行一定的处理,然后返回.
本文通过介绍并发编程中的数据汇总问题,提出了使用互斥锁和通道来保证线程安全的方法。互斥锁适用于临界区保护和共享资源的互斥访问,但可能存在死锁和性能瓶颈的问题。相比之下,通道提供了更直观和安全的协程间通信方式,避免了锁的问题,并提供了更灵活的并发模式.
基于以上内容的介绍,大概能够明确下,在数据传递和汇总的场景下,使用 channel 来实现可能是更为合适的,能够提高代码的可读性和并发安全性。希望以上内容对你有所帮助.
最后此篇关于协程并发下数据汇总:除了互斥锁,还有其他方式吗?的文章就讲到这里了,如果你想了解更多关于协程并发下数据汇总:除了互斥锁,还有其他方式吗?的内容请搜索CFSDN的文章或继续浏览相关文章,希望大家以后支持我的博客! 。
我怎样才能将 numberGrade 的值调高,如果它是 89.5,它会变成 90。numberGrade 被当作 double ,但将它设为 int 并不会向上或向下舍入。 public class
经过了漫长时间的移植和查询资料,得以解决一下嵌入式docker出现的问题,很多网上的资料全都是复制粘贴复制粘贴,找不到合适的解决方法让人很是苦恼,希望自己总结出的一些解决问题的经验给广大朋友减少一些
之前我是通过脚本来使用库的: 现在我使用 yarn 和 rollup 来构建带有下一个文件的项目。包.json: { "name": "maplib", "version": "1.0.0",
在 R 中,我正在尝试使用不同的窗口宽度对大向量(最多 400k 个元素)进行非常快速的滚动平均值,然后对于每个窗口宽度按每年的最大值汇总数据。下面的例子希望是清楚的。 我尝试了几种方法,到目前为止最
我想问一下我应该如何解决这个问题,因为我已经对这部分感到困惑和困惑。我已经使用这个命令全局安装了汇总 npm install --global rollup 但是,当我尝试运行“汇总”命令时,我应该期
我正在构建 javascript 库(更像是小部件),其中将包含一些 UI。我正在通过 javascript 向 DOM 添加 HTML 元素。要添加此 HTML,我有以下代码: async inse
我在显示一份报告时遇到了一些困难,该报告既可以将所有日期分组到月中,又可以对月中每天的“支出”求和。 我的 SQL 查询创建了这个:(检索所有日期) Date
我正在从事 Angular2 项目。我浏览了 Angular2 aot 文档并且能够生成 ngFactory 文件。我按照文档中的建议使用了 rollup js。我有一些非 es6 npm 包。我已经
我目前正在构建 Ionic 2-RC3 应用程序。但是自从我升级到 RC-1 后,我遇到了以下错误:(不确定它们是否保持不变,但你明白了)。 [15:16:17] rollup: Conflicti
Arabic, Egypt (ar_EG) -----------------------------阿拉伯语,埃及 Arabic, Israel (ar_IL) -----------------
我正在尝试汇总我的完全 es6 模块存储库,该存储库具有项目的本地导入/导出,以及对也是脚本或模块的依赖项的导入。 我也在尝试进行双重构建,通过汇总创建遗留的 iife 模块。 这仅适用于我的项目,没
我有一个由 DayTots 类对象组成的 VBA 集合(见下文) 我正在使用 For Each 遍历集合以创建一个 由汇总记录组成的新集合,基于日期 有什么方法可以用 Linq 做到这一点吗?我怀疑也
这是我第一次尝试理解/使用汇总。 我正在使用 this boilerplate因为它都是基于three.js,我也喜欢使用它。 到目前为止,我目前的(几乎肯定是不正确的)方法是: 从github下载样
我有两个 column_property 列,我想在 grandtotal 列中将它们加在一起。我希望能够根据 grandtotal 列进行排序和过滤。 如何对 subtotal 和 shipping
我收到以下错误消息: Error: Parse Error: Line 29: Unexpected token ILLEGAL 对应的代码行是 mobx 观察者装饰器: @observer clas
我真的坚持这一点,我真的很感激这方面的任何帮助。 目标是计算 Woocommerce 订单上每个类别中的项目数量,以便每个部分都可以以类别名称和产品数量为标题。例如: 汉堡 x 5 在此下方将是该订单
我正在从路由器收集传输数据;它提供每日,每月和每两分钟(间隔为120秒)的摘要。如果我在一天中(因此一个月中)重启路由器,则这些报告将不完整。但是,我仍然会得到间隔数据,并且可以对引导前后的记录进行汇
假设我有一个像这样的数据框: a b c d e f 1. 1 5 5 9 2 3 2. 4 7 3 1 4 6 3. 2 3 8 9
假设我有一个记录列表,我想通过取中位数来总结它。更具体地说,说我有 data Location = Location { x :: Double, y :: Double } 我有一个测量列表,我想将
我刚刚开始使用 AngularJS。我需要从 AngularJS 的书中升级这个购物车示例,以便所有 (items.price*item.quantity) 的总数显示在页面底部。实现它的推荐方法是什
我是一名优秀的程序员,十分优秀!