- c - 在位数组中找到第一个零
- linux - Unix 显示有关匹配两种模式之一的文件的信息
- 正则表达式替换多个文件
- linux - 隐藏来自 xtrace 的命令
我不确定以下是否可行,但我想以节流的方式在并行中调用一些操作,但要保持处理流程连续,而不要恢复使用计时器或循环/ sleep 周期。
到目前为止,我已经让它工作了,它从某个来源加载了大量输入……然后以受控方式并行处理它们并像下面这样循环。
static void Main(string[] args)
{
while(true) //Simulate a Timer Elapsing...
{
IEnumerable<int> inputs = new List<int>() {1, 2, 3, 4, 5, 6, 7, 8, 9, 10};
//Simulate querying database queue tables for next batch of entries
RunAllActions(inputs, 3); //Max 3 at a time.
}
}
static void RunAllActions(IEnumerable<int> inputs, int maxConcurrency)
{
var options = new ParallelOptions() {MaxDegreeOfParallelism = maxConcurrency};
Parallel.ForEach<int>(inputs, options, DoWork);
//Blocks here until all inputs are processed.
Console.WriteLine("Batch of Work Done!!!");
}
static void DoWork(int input)
{
Console.WriteLine("Starting Task {0}", input);
System.Threading.Thread.Sleep(3000);
Console.WriteLine("Finishing Task {0}", input);
}
我想知道的是,TPL 中是否有一个结构,我可以使用它来保持它始终运行...以便我可以用收到的 MessageQueue 替换“Timer Elapsing”和“Database Polling”事件。
以下是我想要实现的目标的粗略版本...我可以通过其他方式实现它,但我想知道 TPL 是否内置了这种模式。
internal class Engine
{
private MessageQueue mq;
private Queue<int> myInternalApplicationQueue;
public Engine()
{
//Message Queue to get new task inputs from
mq = new MessageQueue();
mq.ReceiveCompleted += new ReceiveCompletedEventHandler(mq_ReceiveCompleted);
// internal Queue to put them in.
myInternalApplicationQueue = new Queue<int>();
}
void mq_ReceiveCompleted(object sender, ReceiveCompletedEventArgs e)
{
//On MQ Receive, pop the input in a queue in my app
int input = (int) e.Message.Body;
myInternalApplicationQueue.Enqueue(input);
}
public void StartWorking()
{
//Once this gets called, it doesn't stop... it just keeps processing/watching that queue
//processing the tasks as fast as it's allowed while the app is running.
var options = new ParallelOptions() { MaxDegreeOfParallelism = 3 };
Parallel.KeepWorkingOnQueue<int>(myInternalApplicationQueue, options, DoWork);
// ^^^^^^^^^^^^^^^^^^ <----- THIS GUY
}
}
最佳答案
您可以使用 BlockingCollection<T>
处理这种类型的操作,这实际上是一种生产者/消费者场景。
基本上,您会设置一个 BlockingCollection<T>
并将其用作您的“制作人”。然后,您将拥有三个(或任意数量)消费者 任务(通常设置为长时间运行的任务)来处理元素(通过在标准 foreach 循环中调用 blockingCollection.GetConsumingEnumerable()
)。
然后您可以根据需要将项目添加到集合中,它们将不断得到处理。当你完全完成后,你会调用 BlockingCollection<T>.CompleteAdding
,这将导致 foreach 循环完成,整个过程停止。
作为旁注 - 您通常不想使用 Parallel.ForEach
在 GetConsumingEnumerable()
上来自BlockingCollection<T>
- 至少不会,除非你自己处理分区。通常最好使用多个任务并让每个任务按顺序迭代。原因是 Parallel.ForEach
中的默认分区方案会导致问题(它会等到数据“ block ”可用,而不是立即处理项目,并且“ block ”会随着时间的推移变得越来越大)。
关于c# - 如何使用 C# 4 中的 TPL 创建常量 Processing "Flow",我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/9232778/
我正在尝试实现 kotlin stateflow,但不知道它不起作用的原因。 当前输出: 验证34567 预期输出: 验证 34567 验证失败 package stateflowDuplicate
最近我发现了一个我无法理解的流行为。 问题描述 考虑这种情况:你有一个父 flow 并且在它的 collect 中,你将有一个“子”flow 并调用 .collect(),像这样: parentFlo
我想通过 UML2 事件图为以下事件建模: 执行操作 1。此操作产生两个输出参数: Object1和对象 2。 执行操作 2。此操作需要 Object2 作为输入参数。它不需要 Object1 作为输
在 Vaadin 8 中,您可以在 Tab(属于 TabSheet)上设置一个图标: tab#setIcon(...) 在 Vaadin Flow(目前使用 14.1)中,我不知道如何在 Tab(属于
有什么办法可以做到这一点吗?如果存储库仅在 .git/config 中包含 git-flow 指令(如 ),则该存储库是否被视为已初始化 .... [gitflow "branch"] mas
the 2nd collecting below does not collect until I remove the first one. also read from [Manuel Vi
在官方示例中,他们总是有 /* @flow */在页面顶部。现在这对现有项目来说很好,很有帮助,我想选择加入每个文件的流程。从头开始构建一个新项目我只想在任何地方进行流类型检查,而不必输入 /* @f
Vaadin 突然停止构建我的库并出现以下错误。我已经跳了 Vaadin 舞(还有很多其他的东西),但我现在没主意了。我尝试为生产构建库(但对于开发也失败了)。 我正在使用 Vaadin Flow。
我注意到很多人和例子都使用 Flows 作为 List<> 的包装器,例如像这样: @Query("SELECT * from some_model ORDER BY some_field") fun
Vaadin 突然停止构建我的库并出现以下错误。我已经跳了 Vaadin 舞(还有很多其他的东西),但我现在没主意了。我尝试为生产构建库(但对于开发也失败了)。 我正在使用 Vaadin Flow。
关闭。这个问题是opinion-based 。目前不接受答案。 想要改进这个问题吗?更新问题,以便 editing this post 可以用事实和引文来回答它。 . 已关闭 3 年前。 Improv
git flow init后,如何删除git flow模型? 如何从 .git/config 文件中删除所有相关配置? $ git flow init # force reset $ git flow
我运行了 git init 并在选择第一个分支时犯了一个错误。现在我想重新运行它来更改设置,但它再也不会问第一个问题。 Which branch should be used for bringing
我刚刚开始一份新工作,他们的代码管理一团乱。通常情况下这没什么问题,我可以应付,但在这个地方,情况就糟糕得离谱了。 他们使用 TFS...对此我无能为力。没有机会介绍 git,但我一直在阅读有关 gi
我的应用程序有更新问题。我不太明白 subview 之间的数据流是怎么回事。 这是我目前的结构 ViewModel:ObsebsrvableObject MainView 与 ObservedObje
为什么“flow check-contents”需要使用 < 将文件重定向到其中,而“flow suggest”则不需要?看起来 check-contents 应该假设命令行参数是要检查的文件路径。
最近我在 Git 中发现了工作流的三个概念: GitFlow GitHub 流程 GitLab 流程 我读过 the nice articles关于它,但我不太了解 GitLab Flow。 简单地说
我们刚刚改用 Hg Flow,但我们还没有弄清楚的一件事是如何最好地使用 Jenkins。理想情况下,我们将有一个构建和测试开发的作业,一个构建和测试默认作业和其他作业,这些作业在创建功能或发布分支时
将 Vaadin 12 与 FormLayout 一起使用和标签左侧的输入字段。 我想设置标签列的宽度。如何使用 Java API 实现这一点? 最佳答案 用于设置个人 FormItem标签宽度和/或
我正在使用 React-Flow 来可视化 React 中组件的树状层次结构。每当您在 React-Flow 中创建自定义节点时,您都可以像这样在树中使用它: ,我就可以做到这一点。 .有没有
我是一名优秀的程序员,十分优秀!