gpt4 book ai didi

multithreading - 我如何并行化 GPars Actors?

转载 作者:行者123 更新时间:2023-12-04 08:27:44 24 4
gpt4 key购买 nike

我对 GPars Actors 的理解可能有偏差,所以如果我错了请纠正我。我有一个 Groovy 应用程序,可以轮询 Web 服务以查找作业。当找到一个或多个作业时,它会将每个作业发送到我创建的 DynamicDispatchActor,然后处理该作业。这些作业是完全独立的,不需要向主线程返回任何东西。当多个作业同时进入时,我希望它们被并行处理,但无论我尝试什么配置, Actor 都会先入先出地处理它们。

举个代码例子:

def poolGroup = new DefaultPGroup(new DefaultPool(true, 5))

def actor = poolGroup.messageHandler {
when {Integer msg ->
println("I'm number ${msg} on thread ${Thread.currentThread().name}")
Thread.sleep(1000)
}
}

def integers = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]

integers.each {
actor << it
}

打印出来:

I'm number 1 on thread Actor Thread 31I'm number 2 on thread Actor Thread 31I'm number 3 on thread Actor Thread 31I'm number 4 on thread Actor Thread 31I'm number 5 on thread Actor Thread 31I'm number 6 on thread Actor Thread 31I'm number 7 on thread Actor Thread 31I'm number 8 on thread Actor Thread 31I'm number 9 on thread Actor Thread 31I'm number 10 on thread Actor Thread 31

在每次打印输出之间稍作停顿。另请注意,每个打印输出都来自同一个 Actor/线程。

我想在这里看到的是,前 5 个数字会立即打印出来,因为线程池设置为 5,然后当这些线程空闲时,接下来的 5 个数字会被打印出来。我在这里完全偏离基地了吗?

最佳答案

要使其按您预期的方式运行,需要进行一些更改:

import groovyx.gpars.group.DefaultPGroup
import groovyx.gpars.scheduler.DefaultPool

def poolGroup = new DefaultPGroup(new DefaultPool(true, 5))

def closure = {
when {Integer msg ->
println("I'm number ${msg} on thread ${Thread.currentThread().name}")
Thread.sleep(1000)
stop()
}
}

def integers = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]

def actors = integers.collect { poolGroup.messageHandler(closure) << it }
actors*.join()

完整要点文件:https://gist.github.com/wololock/7f1348e04f68710e42d2

那么输出将是:

I'm number 5 on thread Actor Thread 5
I'm number 4 on thread Actor Thread 4
I'm number 1 on thread Actor Thread 1
I'm number 3 on thread Actor Thread 3
I'm number 2 on thread Actor Thread 2
I'm number 6 on thread Actor Thread 3
I'm number 9 on thread Actor Thread 4
I'm number 7 on thread Actor Thread 2
I'm number 8 on thread Actor Thread 5
I'm number 10 on thread Actor Thread 1

现在让我们看看发生了什么变化。首先,在您之前的示例中,您只处理了一个 Actor 。您正确定义了 poolGroup,但随后您创建了一个单独的 actor 并将计算转移到这个单独的实例。要并行运行这些计算,您必须依赖 poolGroup 并且只向某些消息处理程序发送输入 - 池组将处理 actor 的创建及其生命周期管理。这就是我们所做的:

def actors = integers.collect { poolGroup.messageHandler(closure) << it }

它将创建一个从给定输入开始的 Actor 集合。池组将注意不超过指定的池大小。然后您必须加入每个 Actor ,这可以通过使用 groovy 的魔法来完成:actors*.join()。感谢应用程序将等待终止,直到所有参与者停止计算。这就是为什么我们必须将 stop() 方法添加到消息处理程序主体的 when 闭包中 - 没有它,它不会终止,因为 pool group 不知道 actors 是否这样做了工作 - 他们可能会等待,例如其他消息。

替代方案

我们还可以考虑使用 GPars 并行迭代的替代解决方案:

import groovyx.gpars.GParsPool

// This example is dummy, but let's assume that this processor is
// stateless and shared between threads component.
class Processor {
void process(int number) {
println "${Thread.currentThread().name} starting with number ${number}"
Thread.sleep(1000)
}
}

def integers = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]

Processor processor = new Processor()

GParsPool.withPool 5, {
integers.eachParallel { processor.process(it) }
}

在此示例中,您有一个无状态组件 Processor 和使用具有多个输入值的无状态 Processor 实例的并行计算。

我试图弄清楚您在评论中提到的情况,但我不确定单个参与者是否可以一次处理多条消息。 actor 的无状态仅意味着它在处理消息期间不会更改其内部状态,并且不得在 actor 范围内存储任何其他信息。如果我的推理不正确,如果有人能纠正我,那就太好了:)

希望对您有所帮助。最好!

关于multithreading - 我如何并行化 GPars Actors?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/27325006/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com