- android - RelativeLayout 背景可绘制重叠内容
- android - 如何链接 cpufeatures lib 以获取 native android 库?
- java - OnItemClickListener 不起作用,但 OnLongItemClickListener 在自定义 ListView 中起作用
- java - Android 文件转字符串
我正在尝试使用 TPL Dataflow
实现数据处理管道.但是,我对数据流比较陌生,并不完全确定如何正确使用它来解决我要解决的问题。
问题:
我正在尝试遍历文件列表并处理每个文件以读取一些数据,然后进一步处理该数据。每个文件大概是700MB
至 1GB
在尺寸方面。每个文件包含 JSON
数据。为了并行处理这些文件而不是运行内存,我正在尝试使用 IEnumerable<>
与 yield return
然后进一步处理数据。
获得文件列表后,我想同时处理最多 4-5 个文件。我的困惑来自:
IEnumerable<>
和 yeild return
与 async/await
和数据流。偶遇this answer通过 svick , 但仍然不确定如何转换 IEnumerable<>
至 ISourceBlock
然后将所有 block 链接在一起并跟踪完成情况。producer
会非常快(通过文件列表),但是 consumer
将非常慢(处理每个文件 - 读取数据,反序列化 JSON
)。在这种情况下,如何跟踪完成情况。LinkTo
吗?数据 block 连接各种 block 的功能?或使用 OutputAvailableAsync()
等方法和 ReceiveAsync()
将数据从一个 block 传播到另一个 block 。代码:
private const int ProcessingSize= 4;
private BufferBlock<string> _fileBufferBlock;
private ActionBlock<string> _processingBlock;
private BufferBlock<DataType> _messageBufferBlock;
public Task ProduceAsync()
{
PrepareDataflow(token);
var bufferTask = ListFilesAsync(_fileBufferBlock, token);
var tasks = new List<Task> { bufferTask, _processingBlock.Completion };
return Task.WhenAll(tasks);
}
private async Task ListFilesAsync(ITargetBlock<string> targetBlock, CancellationToken token)
{
...
// Get list of file Uris
...
foreach(var fileNameUri in fileNameUris)
await targetBlock.SendAsync(fileNameUri, token);
targetBlock.Complete();
}
private async Task ProcessFileAsync(string fileNameUri, CancellationToken token)
{
var httpClient = new HttpClient();
try
{
using (var stream = await httpClient.GetStreamAsync(fileNameUri))
using (var sr = new StreamReader(stream))
using (var jsonTextReader = new JsonTextReader(sr))
{
while (jsonTextReader.Read())
{
if (jsonTextReader.TokenType == JsonToken.StartObject)
{
try
{
var data = _jsonSerializer.Deserialize<DataType>(jsonTextReader)
await _messageBufferBlock.SendAsync(data, token);
}
catch (Exception ex)
{
_logger.Error(ex, $"JSON deserialization failed - {fileNameUri}");
}
}
}
}
}
catch(Exception ex)
{
// Should throw?
// Or if converted to block then report using Fault() method?
}
finally
{
httpClient.Dispose();
buffer.Complete();
}
}
private void PrepareDataflow(CancellationToken token)
{
_fileBufferBlock = new BufferBlock<string>(new DataflowBlockOptions
{
CancellationToken = token
});
var actionExecuteOptions = new ExecutionDataflowBlockOptions
{
CancellationToken = token,
BoundedCapacity = ProcessingSize,
MaxMessagesPerTask = 1,
MaxDegreeOfParallelism = ProcessingSize
};
_processingBlock = new ActionBlock<string>(async fileName =>
{
try
{
await ProcessFileAsync(fileName, token);
}
catch (Exception ex)
{
_logger.Fatal(ex, $"Failed to process fiel: {fileName}, Error: {ex.Message}");
// Should fault the block?
}
}, actionExecuteOptions);
_fileBufferBlock.LinkTo(_processingBlock, new DataflowLinkOptions { PropagateCompletion = true });
_messageBufferBlock = new BufferBlock<DataType>(new ExecutionDataflowBlockOptions
{
CancellationToken = token,
BoundedCapacity = 50000
});
_messageBufferBlock.LinkTo(DataflowBlock.NullTarget<DataType>());
}
在上面的代码中,我没有使用 IEnumerable<DataType>
和 yield return
因为我不能将它与 async/await
一起使用.所以我将输入缓冲区链接到 ActionBlock<DataType>
这又发布到另一个队列。但是通过使用 ActionBlock<>
,我无法将其链接到下一个 block 进行处理,必须手动 Post/SendAsync
来自 ActionBlock<>
至 BufferBlock<>
.此外,在这种情况下,不确定如何跟踪完成情况。
这段代码有效,但是,我相信会有比这更好的解决方案,我可以链接所有 block (而不是 ActionBlock<DataType>
,然后从它发送消息到 BufferBlock<DataType>
)
另一种选择是转换 IEnumerable<>
至 IObservable<>
使用 Rx
, 但我又不太熟悉 Rx
并且不知道如何混合 TPL Dataflow
和 Rx
最佳答案
问题一
你插入一个IEnumerable<T>
使用 Post
将生产者添加到您的 TPL 数据流链中或 SendAsync
直接在消费者 block 上,如下:
foreach (string fileNameUri in fileNameUris)
{
await _processingBlock.SendAsync(fileNameUri).ConfigureAwait(false);
}
您还可以使用 BufferBlock<TInput>
,但在您的情况下,它实际上似乎是不必要的(甚至有害 - 请参阅下一部分)。
问题二
你更喜欢什么时候SendAsync
而不是 Post
?如果您的生产者运行速度快于 URI 的处理速度(并且您已表明是这种情况),并且您选择提供您的 _processingBlock
一个BoundedCapacity
,然后当 block 的内部缓冲区达到指定容量时,您的 SendAsync
将“挂起”直到缓冲区插槽释放,并且您的 foreach
循环将被限制。这种反馈机制会产生背压并确保您不会耗尽内存。
问题三
你绝对应该使用 LinkTo
在大多数情况下链接您的 block 的方法。不幸的是,由于 IDisposable
的相互作用,您的情况属于极端情况。和非常大的(潜在的)序列。所以你的完成将在缓冲区和处理 block 之间自动流动(由于 LinkTo
),但在那之后 - 你需要手动传播它。这很棘手,但可行。
我将用一个“Hello World”示例来说明这一点,其中生产者迭代每个字符,而消费者(这真的很慢)将每个字符输出到调试窗口。
备注:LinkTo
不存在。
// REALLY slow consumer.
var consumer = new ActionBlock<char>(async c =>
{
await Task.Delay(100);
Debug.Print(c.ToString());
}, new ExecutionDataflowBlockOptions { BoundedCapacity = 1 });
var producer = new ActionBlock<string>(async s =>
{
foreach (char c in s)
{
await consumer.SendAsync(c);
Debug.Print($"Yielded {c}");
}
});
try
{
producer.Post("Hello world");
producer.Complete();
await producer.Completion;
}
finally
{
consumer.Complete();
}
// Observe combined producer and consumer completion/exceptions/cancellation.
await Task.WhenAll(producer.Completion, consumer.Completion);
这个输出:
Yielded HHYielded eeYielded llYielded llYielded ooYielded Yielded wwYielded ooYielded rrYielded llYielded dd
As you can see from the output above, the producer is throttled and the handover buffer between the blocks never grows too large.
EDIT
You might find it cleaner to propagate completion via
producer.Completion.ContinueWith(
_ => consumer.Complete(), TaskContinuationOptions.ExecuteSynchronously
);
... 就在 producer
之后定义。这允许您稍微减少生产者/消费者耦合 - 但最后您仍然必须记住观察 Task.WhenAll(producer.Completion, consumer.Completion)
.
关于c# - 在 TPL 数据流中使用 async/await 和 yield return,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/35371931/
我有以下 TPL 数据流,当使用谓词过滤从 TransformBlock 传递到 ActionBlock 的项目时,它永远不会完成。 如果谓词对任何项目返回 false,则数据流挂起。 请有人提供一些
我是 smarty 的新手,所以我不确定这是否会导致我遗漏某些内容,但目前我正在尝试从 css 文件中提取一个类。 到目前为止,我已经设置了 2 个类 mainbackground 和 body,ma
如何强制 TPL 使用固定数量的线程?我知道 MaxDegreeOfParallelism 可用于设置上限,但我希望上限等于下限。这可能吗?怎么办? 因为我知道有人会问 =) 是的,我确定我想这样做,
我正在尝试使用 GXT 3.0 的 XTemplates(类似于 EXT),这里有 2 个具有以下关系的简单 java 对象: class A { String name; public
我刚刚将 Visual Studio 11 Beta 升级到新的 Visual Studio 2012 RC,并且在引用 TPL 数据流时遇到了问题。 首先,我尝试像以前一样通过从框架中添加引用来引用
我需要制作可扩展的流程。该进程主要有 I/O 操作和一些次要的 CPU 操作(主要是反序列化字符串)。该过程在数据库中查询 url 列表,然后从这些 url 中获取数据,将下载的数据反序列化为对象,然
我们有一个 TPL 数据流管道,其中包含以下 block : 变换 block A:Http post call 转换 block B:数据库 IO Transform Block C:一些单位转换数
我有一个 BufferBlock 来发布消息: public class DelimitedFileBlock : ISourceBlock { private ISourceBlock _s
我想在 Windows Azure 上的工作进程中使用 TPL。我希望在队列中添加一个 IJob,它有一个 Run 方法,因此工作线程将包括: 循环 将项目从队列中取出 使用TPL调用IJob.Run
我尝试创 build 计良好的 TPL 数据流管道,以优化系统资源的使用。我的项目是一个 HTML 解析器,它将解析后的值添加到 SQL Server DB 中。我已经有了 future 管道的所有方
我想为特定的内容类型覆盖 page.tpl.php。 我已经尝试过这些东西,对我没有任何作用。 page--article.tpl.php page--node--article.tpl.php pa
我已经完成了这个 POC 并验证了当你创建 4 个线程并在四核机器上运行它们时,所有的核心都会变得忙碌——所以,CLR 已经在不同的核心上有效地调度了线程,那么为什么要使用 TASK 类呢? 我同意
使用Visual Studio Concurrency Visualizer我现在明白为什么切换到 Parallel.For 没有任何好处:只有 9% 的时间机器忙于执行代码,其余的时间为 71% 的
我的代码中有以下使用 TPL 的设置: 我的类中的一个字段:private CancellationTokenSource _cancellationTokenSource; 每次我创建使用特定取消
我有一个 Windows 服务,它在经过漫长的过程后发送电子邮件。每当有表条目并处理它并将其发送出去时,该服务就会继续从数据库表中获取电子邮件数据。 目前它是一个多线程应用程序,我们在生产服务器中将线
刚刚使用 TPL DataFlow 编写了示例生产者消费者模式。我在这里有一些基本问题。 只有在生产者发布所有项目后,消费者才处于事件状态。异步是指生产任务和消费任务都可以并行运行。 给消费者一个 s
我正在使用 TPL,需要有一个长时间运行的 TPL 任务将结果发送到父 UI 线程而不终止。我已经尝试了几种方法,并且已经在谷歌上搜索了很多。有谁知道如何通过 TPL 实现这一点? 最佳答案 您可以传
我有一个以这种方式设置的 TPL 数据流: 下载字节数组 处理数据 将处理后的数据流式传输到另一个位置 此流程运行良好,但偶尔会在下载文件时遇到备份、连接问题等。我想做的是并行下载,但仍确保执行第 3
我有一个应该批量调用并压缩大文件的控制台应用程序,我想使用 DataFlow,除了完成之外一切正常 请考虑以下代码 public static void CompressFiles(string fo
当你生成多个任务时,像这样: for (int i = 0; i ((stateObject) => { tls.Value = (int)stateObject;
我是一名优秀的程序员,十分优秀!