
.net - How can I get an IPropagatorBlock to stop automatically?

Reposted. Author: 行者123. Updated: 2023-12-04 16:50:37

Suppose I start with a TransformBlock<Uri, string> (which is itself an implementation of IPropagatorBlock<Uri, string>) that takes a Uri and fetches its content as a string (this is a web crawler):

var downloader = new TransformBlock<Uri, string>(async uri => {
    // Download and return string asynchronously...
});

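Once I have the content as a string, I parse it for links. Since a page can contain multiple links, I use a TransformManyBlock<string, Uri> to map the single result (the content) to many links: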

// The discovered item block.
var parser = new TransformManyBlock<string, Uri>(s => {
    // Parse the content here, return an IEnumerable<Uri>.
});

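The key to the parser is that it can return an empty sequence, indicating that there are no more items to parse.

However, that only holds for one branch of the tree (or one section of the web).

I then link the downloader to the parser, and the parser back to the downloader, like so: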

downloader.LinkTo(parser);
parser.LinkTo(downloader);

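Now, I know I can stop everything from outside the blocks (by calling Complete on one of them), but how can I signal that the work is complete from inside the blocks?

Or do I have to manage this state myself somehow?

Right now it just hangs, because the downloader block is starved once everything has been downloaded and parsed.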

Here is a fully self-contained test method that hangs on the call to Wait:

// Namespaces this test needs (place these at the top of the file):
using System;
using System.Diagnostics;
using System.Globalization;
using System.Linq;
using System.Threading.Tasks.Dataflow;
using Microsoft.VisualStudio.TestTools.UnitTesting;

[TestMethod]
public void TestSpider()
{
    // The list of numbers.
    var numbers = new[] { 1, 2 };

    // Transforms from an int to a string.
    var downloader = new TransformBlock<Tuple<int, string>, string>(
        t => t.Item2 + t.Item1.ToString(CultureInfo.InvariantCulture),

        // Let's assume four downloads to a domain at a time.
        new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 4 }
    );

    // Gets the next set of strings.
    var parser = new TransformManyBlock<string, Tuple<int, string>>(s => {
            // If the string length is greater than three, return an
            // empty sequence.
            // This is the signal for this branch to stop.
            if (s.Length > 3) return Enumerable.Empty<Tuple<int, string>>();

            // Branch out.
            return numbers.Select(n => new Tuple<int, string>(n, s));
        },
        // These are simple transformations/parsing, so there is no reason
        // not to parallelize. The dataflow blocks will handle the task
        // allocation.
        new ExecutionDataflowBlockOptions {
            MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded
        });

    // For broadcasting to an action.
    var parserBroadcaster = new BroadcastBlock<Tuple<int, string>>(
        // Clone.
        t => new Tuple<int, string>(t.Item1, t.Item2));

    // Indicate what was parsed.
    var parserConsumer = new ActionBlock<Tuple<int, string>>(
        t => Debug.WriteLine(
            string.Format(CultureInfo.InvariantCulture,
                "Consumed - Item1: {0}, Item2: \"{1}\"",
                t.Item1, t.Item2)));

    // Link downloader to parser.
    downloader.LinkTo(parser);

    // Parser to broadcaster.
    parser.LinkTo(parserBroadcaster);

    // Broadcaster to consumer.
    parserBroadcaster.LinkTo(parserConsumer);

    // Broadcaster back to the downloader.
    parserBroadcaster.LinkTo(downloader);

    // Start the downloader.
    downloader.Post(new Tuple<int, string>(1, ""));

    // Wait on the consumer to finish. Nothing ever calls Complete,
    // so this call never returns.
    parserConsumer.Completion.Wait();
}

Its output (as expected, before it hangs) is:

Consumed - Item1: 1, Item2: "1"
Consumed - Item1: 2, Item2: "1"
Consumed - Item1: 1, Item2: "11"
Consumed - Item1: 2, Item2: "11"
Consumed - Item1: 1, Item2: "12"
Consumed - Item1: 2, Item2: "12"
Consumed - Item1: 1, Item2: "111"
Consumed - Item1: 2, Item2: "111"
Consumed - Item1: 1, Item2: "112"
Consumed - Item1: 2, Item2: "112"
Consumed - Item1: 1, Item2: "121"
Consumed - Item1: 2, Item2: "121"
Consumed - Item1: 1, Item2: "122"
Consumed - Item1: 2, Item2: "122"
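
The hang can be reproduced in a stripped-down loop. The sketch below is not from the original post: the names StarvedLoopDemo, Run, expander, and relay are hypothetical, and plain strings stand in for the tuples. It assumes the System.Threading.Tasks.Dataflow library is available. It waits with a timeout rather than blocking forever, to show that a block's Completion task does not finish just because its queue has drained; something has to call Complete.

```csharp
using System;
using System.Linq;
using System.Threading.Tasks.Dataflow;

public static class StarvedLoopDemo
{
    // Returns true if the loop completed within the timeout.
    public static bool Run(TimeSpan timeout)
    {
        // Expands "" into "1", "2", then "11", "12", ... and stops
        // branching once a string reaches three characters.
        var expander = new TransformManyBlock<string, string>(s =>
            s.Length >= 3
                ? Enumerable.Empty<string>()
                : new[] { s + "1", s + "2" });

        // Feeds the expander's output back into its input.
        var relay = new BufferBlock<string>();
        expander.LinkTo(relay);
        relay.LinkTo(expander);

        expander.Post("");

        // All work drains quickly, but no block was ever told to
        // complete, so the bounded wait times out.
        return expander.Completion.Wait(timeout);
    }
}
```

Calling StarvedLoopDemo.Run(TimeSpan.FromMilliseconds(500)) should return false: the blocks are idle, not completed.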

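Accepted answer

The TPL Dataflow framework has nothing that handles this out of the box. It is more of a state management problem.

That said, the key is to keep track of the URLs that have been downloaded and the URLs that still need to be downloaded.

The ideal place to handle this is the parser block; that is where you have both the content (which will be turned into more links to download) and the URL the content was downloaded from.

Working from the sample above, you need a way to capture the download result together with the URI it was downloaded from (I would have used a Tuple, but that would make things too confusing):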

public class DownloadResult
{
    public Tuple<int, string> Uri { get; set; }
    public string Content { get; set; }
}

From there, the download block is almost the same, just updated to output the structure above:

[TestMethod]
public void TestSpider2()
{
    // The list of numbers.
    var numbers = new[] { 1, 2 };

    // Performs the downloading.
    var downloader = new TransformBlock<Tuple<int, string>, DownloadResult>(
        t => new DownloadResult {
            Uri = t,
            Content = t.Item2 +
                t.Item1.ToString(CultureInfo.InvariantCulture)
        },

        // Let's assume four downloads to a domain at a time.
        new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 4 }
    );

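The consumer of the parser does not need to change, but it does need to be declared earlier, because the parser has to signal the consumer that it should stop consuming, and we want to capture the consumer in the closure passed to the parser: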

    // Indicate what was parsed.
    var parserConsumer = new ActionBlock<Tuple<int, string>>(
        t => Debug.WriteLine(
            string.Format(CultureInfo.InvariantCulture,
                "Consumed - Item1: {0}, Item2: \"{1}\"",
                t.Item1, t.Item2)));

Now the state manager has to be introduced:

    // The set of items that still need to be processed.
    var itemsToProcess = new HashSet<Tuple<int, string>>();

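At first I thought of just using a ConcurrentDictionary<TKey, TValue>, but it does not provide what is needed here: the removal and the multiple additions have to happen together as one atomic operation. A simple lock statement is the best option.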
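
To make the compound operation concrete, here is a minimal sketch that isolates it in a helper. PendingTracker and its members are hypothetical names introduced for illustration; they are not part of TPL Dataflow or of the answer's code. The point is that the remove, the adds, and the emptiness check all happen under one lock, which per-call thread safety on a concurrent collection would not give you.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical helper: tracks items that are still in flight and
// reports when the last one has been finished.
public sealed class PendingTracker<T>
{
    private readonly HashSet<T> _pending = new HashSet<T>();
    private readonly object _gate = new object();

    // Registers a root item before it is posted to the pipeline.
    public void Add(T item)
    {
        lock (_gate) { _pending.Add(item); }
    }

    // Atomically removes the finished item and adds the items it
    // produced. Returns true when nothing is left to process.
    public bool Finish(T finished, IEnumerable<T> discovered)
    {
        lock (_gate)
        {
            _pending.Remove(finished);
            foreach (T item in discovered) _pending.Add(item);
            return _pending.Count == 0;
        }
    }
}
```

Because the set stores distinct values, this assumes each item is discovered only once, as in the example here. A real crawler in which two pages link to the same URL would need to deduplicate before posting, or count items instead of storing them.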

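The parser changes the most. It parses items as usual, but it also does the following atomically:

  • Removes the URL from the state machine (itemsToProcess).
  • Adds the new URLs to the state machine.
  • If no items remain in the state machine after the above, signals the consumer block that it is done by calling the Complete method on the IDataflowBlock interface.

It looks like this: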

    // Changes content into items and new URLs to download.
    var parser = new TransformManyBlock<DownloadResult, Tuple<int, string>>(
        r => {
            // The parsed items.
            IEnumerable<Tuple<int, string>> parsedItems;

            // If the string length is greater than three, return an
            // empty sequence.
            // This is the signal for this branch to stop.
            parsedItems = (r.Uri.Item2.Length > 3) ?
                Enumerable.Empty<Tuple<int, string>>() :
                numbers.Select(n => new Tuple<int, string>(n, r.Content));

            // Materialize the list.
            IList<Tuple<int, string>> materializedParsedItems =
                parsedItems.ToList();

            // Lock here; the removal from the set of items to process
            // and the addition of the new items must be atomic.
            lock (itemsToProcess)
            {
                // Remove the item.
                itemsToProcess.Remove(r.Uri);

                // If this item produced nothing new and no other items
                // are still pending, finish the action block.
                if (materializedParsedItems.Count == 0 &&
                    itemsToProcess.Count == 0)
                {
                    // Complete the consumer block.
                    parserConsumer.Complete();
                }

                // Add the items.
                foreach (Tuple<int, string> newItem in materializedParsedItems)
                    itemsToProcess.Add(newItem);

                // Return the items.
                return materializedParsedItems;
            }
        },

        // These are simple transformations/parsing, so there is no
        // reason not to parallelize. The dataflow blocks will handle
        // the task allocation.
        new ExecutionDataflowBlockOptions {
            MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded
        });

The broadcaster and the links are the same:

    // For broadcasting to an action.
    var parserBroadcaster = new BroadcastBlock<Tuple<int, string>>(
        // Clone.
        t => new Tuple<int, string>(t.Item1, t.Item2));

    // Link downloader to parser.
    downloader.LinkTo(parser);

    // Parser to broadcaster.
    parser.LinkTo(parserBroadcaster);

    // Broadcaster to consumer.
    parserBroadcaster.LinkTo(parserConsumer);

    // Broadcaster back to the downloader.
    parserBroadcaster.LinkTo(downloader);

When starting the blocks, the state machine has to be primed with the URL to download before the root is passed to the Post method:

    // The initial post to download.
    var root = new Tuple<int, string>(1, "");

    // Add to the items to process.
    itemsToProcess.Add(root);

    // Post to the downloader.
    downloader.Post(root);

Then the call to the Wait method on the Task class is the same, and it now completes without hanging:

    // Wait on the consumer to finish.
    parserConsumer.Completion.Wait();
}
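
The code above completes only parserConsumer; the downloader, parser, and broadcaster are left idle rather than completed. As a variation, and not the answer's own method, the sketch below counts items in flight with Interlocked instead of storing them in a set, and completes the downloader so that PropagateCompletion shuts down the rest of the chain. SelfCompletingLoop, Run, and inFlight are hypothetical names, plain strings stand in for the tuples, and the downloader is a pass-through stand-in.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks.Dataflow;

public static class SelfCompletingLoop
{
    // Runs the loop to completion and returns every item the consumer saw.
    public static List<string> Run()
    {
        var seen = new ConcurrentQueue<string>();
        var propagate = new DataflowLinkOptions { PropagateCompletion = true };

        // Number of items posted but not yet parsed; starts at one
        // for the root item posted below.
        int inFlight = 1;

        // Stand-in for the downloader: passes the item through.
        var downloader = new TransformBlock<string, string>(s => s);

        var parser = new TransformManyBlock<string, string>(s =>
        {
            string[] children = s.Length >= 3
                ? Array.Empty<string>()
                : new[] { s + "1", s + "2" };

            // One item finished, children.Length items started. When
            // the count reaches zero, nothing remains in the loop.
            if (Interlocked.Add(ref inFlight, children.Length - 1) == 0)
                downloader.Complete();

            return children;
        });

        var broadcaster = new BroadcastBlock<string>(s => s);
        var consumer = new ActionBlock<string>(s => seen.Enqueue(s));

        downloader.LinkTo(parser, propagate);
        parser.LinkTo(broadcaster, propagate);
        broadcaster.LinkTo(consumer, propagate);

        // The back edge does not propagate completion.
        broadcaster.LinkTo(downloader);

        downloader.Post("");
        consumer.Completion.Wait();
        return seen.OrderBy(x => x).ToList();
    }
}
```

Counting messages rather than storing distinct values also avoids the case where the same item is discovered twice, since each posted message is counted on its own.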

This post is based on a question on Stack Overflow about how to get an IPropagatorBlock<TInput, TOutput> to stop automatically: https://stackoverflow.com/questions/13202328/
