- c - 在位数组中找到第一个零
- linux - Unix 显示有关匹配两种模式之一的文件的信息
- 正则表达式替换多个文件
- linux - 隐藏来自 xtrace 的命令
(注意:我使用的是.Net 4,而不是.Net 4.5,所以我不能使用TPL的DataflowBlock类。)
TL; DR版本
最终,我只是在寻找一种使用多个线程来处理顺序工作项的方式,该方式可以在最终输出中保留其顺序,而无需无限制的输出缓冲区。
动机
我现有的代码提供了一种用于处理多个数据块的多线程机制,其中一个I/O绑定(bind)线程(“供应商”)负责将数据块排队以进行处理。这些数据块构成工作项。
一个或多个线程(“处理器”)负责一次使一个工作项出队,它们进行处理,然后在将其下一个工作项出队之前将处理后的数据写入输出队列。
最终的I/O绑定(bind)线程(“消费者”)负责将已完成的工作项从输出队列中取出并写入最终目标。这些工作项(并且必须)按照入队的顺序来编写。我使用并发优先级队列实现了这一点,其中每个项目的优先级由其源索引定义。
我正在使用此方案对大型数据流进行一些自定义压缩,其中压缩本身相对较慢,但是未压缩数据的读取和压缩数据的写入相对较快(尽管受I/O限制)。
我以大约64K的较大块处理数据,因此管道的开销相对较小。
我当前的解决方案运行良好,但是它涉及6年前使用许多同步事件编写的自定义代码,并且设计似乎有些笨拙。因此,我已开始从事学术练习,以查看是否可以使用更现代的.Net库来重写它。
新设计
我的新设计使用了BlockingCollection<>
类,并且在某种程度上基于this Microsoft article。
特别是,请查看标题为“使用多个生产者进行负载平衡”的部分。我已经尝试过使用这种方法,因此我有多个处理任务,每个处理任务都从共享输入BlockingCollection中获取工作项,并将完成的项写入其自己的BlockingCollection输出队列中。
因为每个处理任务都有其自己的输出队列,所以我试图使用BlockingCollection.TakeFromAny()
使第一个可用的已完成工作项出队。
复用器问题
到目前为止,一切都很好,但是现在出现了问题。 Microsoft文章指出:
The gaps are a problem. The next stage of the pipeline, the Display Image stage, needs to show images in order and without gaps in the sequence. This is where the multiplexer comes in. Using the TakeFromAny method, the multiplexer waits for input from both of the filter stage producer queues. When an image arrives, the multiplexer looks to see if the image's sequence number is the next in the expected sequence. If it is, the multiplexer passes it to the Display Image stage. If the image is not the next in the sequence, the multiplexer holds the value in an internal look-ahead buffer and repeats the take operation for the input queue that does not have a look-ahead value. This algorithm allows the multiplexer to put together the inputs from the incoming producer queues in a way that ensures sequential order without sorting the values.
最佳答案
例如,在启动时创建一个项目池,例如1000。将它们存储在BlockingCollection中-“池队列”。
供应商从池队列中获取项目,从文件中加载它们,以序列号/任何内容加载,然后将它们提交给处理器线程池。
处理器完成其工作,然后将输出发送到多路复用器。多路复用器会完成所有杂乱的项目的存储,直到处理了较早的项目为止。
当多路复用器输出的任何物品完全消耗掉某个物品时,它们将返回到池队列中,以供供应商重复使用。
如果一个“慢项”确实需要大量处理,则多路复用器中的乱序收集将随着“快速项”在其他池线程中的通过而增加,但由于多路复用器实际上并未将其项馈送到它的输出,不补充池队列。
当池为空时,供应商将阻止该池,并且将无法再供应任何物品。
处理池输入中剩余的“快速项目”将被处理,然后除“慢速项目”外将停止处理。供应商被阻止,多路复用器在其集合中有[poolSize-1]个项目。不会使用额外的内存,不会浪费CPU,唯一发生的是对“慢速项目”的处理。
当“慢项”最终完成时,它将输出到多路复用器。
现在,多路复用器可以按要求的顺序输出所有[poolSize]项。随着这些项目的消耗,池再次被填满,现在可以从池中获取项目的供应商继续运行,再次读取其文件,将项目排队到处理器池中。
自动调节,不需要限制的缓冲区,没有内存失控。
编辑:我的意思是“不需要任何限制的缓冲区” :)
而且,没有GC保留-由于这些项目已被重复使用,因此它们不需要GC'ing。
关于c# - 管道,多路复用和无界缓冲,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/15021469/
我正在使用 Assets 管道来管理我的 Grails 3.0 应用程序的前端资源。但是,似乎没有创建 CoffeeScript 文件的源映射。有什么办法可以启用它吗? 我的 build.gradle
我有一个我想要的管道: 提供一些资源, 运行一些测试, 拆资源。 我希望第 3 步中的拆卸任务运行 不管 测试是否通过或失败,在第 2 步。据我所知 runAfter如果前一个任务成功,则只运行一个任
如果我运行以下命令: Measure-Command -Expression {gci -Path C:\ -Recurse -ea SilentlyContinue | where Extensio
我知道管道是一个特殊字符,我需要使用: Scanner input = new Scanner(System.in); String line = input.next
我再次遇到同样的问题,我有我的默认处理方式,但它一直困扰着我。 有没有更好的办法? 所以基本上我有一个运行的管道,在管道内做一些事情,并想从管道内返回一个键/值对。 我希望整个管道返回一个类型为 ps
我有三个环境:dev、hml 和 qa。 在我的管道中,根据分支,阶段有一个条件来检查它是否会运行: - stage: Project_Deploy_DEV condition: eq(varia
我有 Jenkins Jenkins ver. 2.82 正在运行并想在创建新作业时使用 Pipeline 功能。但我没有看到这个列为选项。我只能在自由式项目、maven 项目、外部项目和多配置之间进
在对上一个问题 (haskell-data-hashset-from-unordered-container-performance-for-large-sets) 进行一些观察时,我偶然发现了一个奇
我正在寻找有关如何使用管道将标准输出作为其他命令的参数传递的见解。 例如,考虑这种情况: ls | grep Hello grep 的结构遵循以下模式:grep SearchTerm PathOfFi
有没有办法不因声明性管道步骤而失败,而是显示警告?目前我正在通过添加 || exit 0 来规避它到 sh 命令行的末尾,所以它总是可以正常退出。 当前示例: sh 'vendor/bin/phpcs
我们正在从旧的 Jenkins 设置迁移到所有计划都是声明性 jenkinsfile 管道的新服务器……但是,通过使用管道,我们无法再手动清除工作区。我如何设置 Jenkins 以允许 手动点播清理工
我在 Python 中阅读了有关 Pipelines 和 GridSearchCV 的以下示例: http://www.davidsbatista.net/blog/2017/04/01/docume
我有一个这样的管道脚本: node('linux'){ stage('Setup'){ echo "Build Stage" } stage('Build'){ echo
我正在使用 bitbucket 管道进行培训 这是我的 bitbucket-pipelines.yml: image: php:7.2.9 pipelines: default:
我正在编写一个程序,其中输入文件被拆分为多个文件(Shamir 的 secret 共享方案)。 这是我想象的管道: 来源:使用 Conduit.Binary.sourceFile 从输入中读取 导管:
我创建了一个管道,它有一个应该只在开发分支上执行的阶段。该阶段还需要用户输入。即使我在不同的分支上,为什么它会卡在这些步骤的用户输入上?当我提供输入时,它们会被正确跳过。 stage('Deplo
我正在尝试学习管道功能(%>%)。 当试图从这行代码转换到另一行时,它不起作用。 ---- R代码--原版----- set.seed(1014) replicate(6,sample(1:8))
在 Jenkins Pipeline 中,如何将工件从以前的构建复制到当前构建? 即使之前的构建失败,我也想这样做。 最佳答案 Stuart Rowe 还在 Pipeline Authoring Si
我正在尝试使用 执行已定义的作业构建 使用 Jenkins 管道的方法。 这是一个简单的例子: build('jenkins-test-project-build', param1 : 'some-
当我使用 where 过滤器通过管道命令排除对象时,它没有给我正确的输出。 PS C:\Users\Administrator> $proall = Get-ADComputer -filter *
我是一名优秀的程序员,十分优秀!