- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
刚刚使用 TPL DataFlow 编写了示例生产者消费者模式。我在这里有一些基本问题。
只有在生产者发布所有项目后,消费者才处于事件状态。异步是指生产任务和消费任务都可以并行运行。
给消费者一个 sleep 时间来验证它是否阻止其他数据项。它似乎是按顺序执行的,没有任何并行性。
我是不是做错了什么?
class AscDataBlocks
{
public Int64 start;
public Int64 End;
//public string ThreadName;
public void AscBufferProducer(ITargetBlock<Int64> targetAscTransform)
// This is using TPL DataBlock producer consumer pattern.
{
for (var i = start; i < End; i++)
{
Console.WriteLine("Postingasc : {0}", i);
targetAscTransform.Post(i);
}
}
public void ProcessDataBuffer(Int64 ascDataSet)
{
if (ascDataSet == 5)
// Testing if this will delay all the other data processing
Thread.Sleep(5000);
else
Thread.Sleep(500);
Console.WriteLine(ascDataSet);
}
// Demonstrates the consumption end of the producer and consumer pattern.
public async Task<Int64> AscTransConsumerAsync(IReceivableSourceBlock<Int64> source)
{
// Initialize a counter to track the number of bytes that are processed.
int status = 0;
// Read from the source buffer until the source buffer has no
// available output data.
while (await source.OutputAvailableAsync())
{
Int64 data;
source.TryReceive(out data);
ProcessDataBuffer(data);//This function processed the data buffer for ascollection and writes the data to the database.
// Increment the count of bytes received.
status = 1;
}
return status;
}
}
static void Main(string[] args)
{
AscDataBlocks ascb;
BufferBlock<Int64> ascbuffer;
System.Threading.Tasks.Task<Int64> ascProcessingconsumer;
CancellationToken ct = new CancellationToken();
CancellationTokenSource cts = new CancellationTokenSource();
ascb = new AscDataBlocks();
ascb.start = 1;
ascb.End = 100;
try
{
ascbuffer = new BufferBlock<Int64>(new ExecutionDataflowBlockOptions
{
MaxDegreeOfParallelism = 5,
CancellationToken = ct
});// Initiallize the buffer bloack
ascProcessingconsumer = ascb.AscTransConsumerAsync(ascbuffer); //Initialize the consumer.
//Post source data to the dataflow block.
ascb.AscBufferProducer(ascbuffer);
ascProcessingconsumer.Wait();
}
catch (Exception ex)
{
//foreach (var v in ex.InnerExceptions)
// Console.WriteLine("msg: " + v.Message);
}
}
最佳答案
The consumer is active only after all the items are posted from the producer. Does asynchronous means that both produce and consume tasks can run in parallel.
发生这种情况是因为您在消费者有机会开始之前很快就发布了所有商品。如果您添加了 Thread.Sleep(100)
,您会发现它们实际上是并行工作的。
Given a sleep time in consumer to verify if its blocking other data items. It seems to be executing sequentially and not getting any parallelism.
TPL 数据流并不神奇:它不会修改您的代码以并行执行。是您调用了一次 AscTransConsumerAsync()
,所以它实际上只执行一次,请不要感到惊讶。
TDF 确实支持并行处理,但您需要实际让它执行处理代码。为此,请使用其中一个执行 block 。在你的情况下 ActionBlock
似乎很合适。
如果使用它,则可以通过设置 MaxDegreeOfParallelism
将 block 配置为并行执行.当然,这样做意味着您需要确保处理委托(delegate)是线程安全的。
这样,AscTransConsumerAsync()
现在可能看起来像这样:
public async Task<Int64> AscTransConsumerAsync(ISourceBlock<Int64> source)
{
// counter to track the number of items that are processed
Int64 count = 0;
var actionBlock = new ActionBlock<Int64>(
data =>
{
ProcessDataBuffer(data);
// count has to be accessed in a thread-safe manner
// be careful about using Interlocked,
// for more complicated computations, locking might be more appropriate
Interlocked.Increment(ref count);
},
// some small constant might be better than Unbounded, depedning on circumstances
new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded });
source.LinkTo(actionBlock, new DataflowLinkOptions { PropagateCompletion = true });
// this assumes source will be completed when done,
// you need to call ascbuffer.Complete() after AscBufferProducer() for this
await actionBlock.Completion;
return count;
}
关于.net - TPL 数据流生产者消费者模式,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/14255655/
对此感到疯狂,真的缺少一些东西。 我有webpack 4.6.0,webpack-cli ^ 2.1.2,所以是最新的。 在文档(https://webpack.js.org/concepts/mod
object Host "os.google.com" { import "windows" address = "linux.google.com" groups = ["linux"] } obj
每当我安装我的应用程序时,我都可以将数据库从 Assets 文件夹复制到 /data/data/packagename/databases/ .到此为止,应用程序工作得很好。 但 10 或 15 秒后
我在 cc 模式缓冲区中使用 hideshow.el 来折叠我不查看的文件部分。 如果能够在 XML 文档中做到这一点就好了。我使用 emacs 22.2.1 和内置的 sgml-mode 进行 xm
已结束。此问题不符合 Stack Overflow guidelines .它目前不接受答案。 我们不允许提出有关书籍、工具、软件库等方面的建议的问题。您可以编辑问题,以便用事实和引用来回答它。 关闭
根据java: public Scanner useDelimiter(String pattern) Sets this scanner's delimiting pattern to a patt
我读过一些关于 PRG 模式以及它如何防止用户重新提交表单的文章。比如this post有一张不错的图: 我能理解为什么在收到 2xx 后用户刷新页面时不会发生表单提交。但我仍然想知道: (1) 如果
看看下面的图片,您可能会清楚地看到这一点。 那么如何在带有其他一些 View 的简单屏幕中实现没有任何弹出/对话框/模式的微调器日期选择器? 我在整个网络上进行了谷歌搜索,但没有找到与之相关的任何合适
我不知道该怎么做,我一直遇到问题。 以下是代码: rows = int(input()) for i in range(1,rows): for j in range(1,i+1):
我想为重写创建一个正则表达式。 将所有请求重写为 index.php(不需要匹配),它不是以/api 开头,或者不是以('.html',或'.js'或'.css'或'.png'结束) 我的例子还是这样
MVC模式代表 Model-View-Controller(模型-视图-控制器) 模式 MVC模式用于应用程序的分层开发 Model(模型) - 模型代表一个存取数据的对象或 JAVA PO
我想为组织模式创建一个 RDF 模式世界。您可能知道,组织模式文档基于层次结构大纲,其中标题是主要的分组实体。 * March auxiliary :PROPERTIES: :HLEVEL: 1 :E
我正在编写一个可以从文件中读取 JSON 数据的软件。该文件包含“person”——一个值为对象数组的对象。我打算使用 JSON 模式验证库来验证内容,而不是自己编写代码。符合代表以下数据的 JSON
假设我有 4 张 table 人 公司 团体 和 账单 现在bills/persons和bills/companys和bills/groups之间是多对多的关系。 我看到了 4 种可能的 sql 模式
假设您有这样的文档: doc1: id:1 text: ... references: Journal1, 2013, pag 123 references: Journal2, 2014,
我有这个架构。它检查评论,目前工作正常。 var schema = { id: '', type: 'object', additionalProperties: false, pro
这可能很简单,但有人可以解释为什么以下模式匹配不明智吗?它说其他规则,例如1, 0, _ 永远不会匹配。 let matchTest(n : int) = let ran = new Rand
我有以下选择序列作为 XML 模式的一部分。理想情况下,我想要一个序列: 来自 my:namespace 的元素必须严格解析。 来自任何其他命名空间的元素,不包括 ##targetNamespace和
我希望编写一个 json 模式来涵盖这个(简化的)示例 { "errorMessage": "", "nbRunningQueries": 0, "isError": Fals
首先,我是 f# 的新手,所以也许答案很明显,但我没有看到。所以我有一些带有 id 和值的元组。我知道我正在寻找的 id,我想从我传入的三个元组中选择正确的元组。我打算用两个 match 语句来做到这
我是一名优秀的程序员,十分优秀!