
c# - Seeking clarity on the MSDN TPL Dataflow sample and parallelization

Reposted. Author: 行者123. Updated: 2023-12-04 08:28:16

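I am looking at this MSDN sample code and have made some tweaks to figure out what happens when the constrained resource (as defined in the demo) is "memory" and one, two, four, or more instances of it are created.
For example, Main() defines a number of each resource, and that number can be increased or decreased: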

networkResources.Post(new NetworkResource() { Name = "eth0" });
memoryResources.Post(new MemoryResource() { Name = "Memory01" });
fileResources.Post(new FileResource() { Name = "MFMHardDrive01" });
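I am also trying to understand the ActionBlock better, specifically the lines under the comment "Release the resources back to their respective pools". Are those lines just a stand-in for additional work that needs to be done? They seem to simply cycle the message around like a while() loop with no end, which makes this feel similar to a thread's SpinWait().
The lines that add this circular loop (which may be meant to represent pulling from another queue) are shown below. Perhaps they act as some kind of thread heartbeat to prevent inefficient release and reallocation of threads: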
networkResources.Post(data.Item1);
memoryResources.Post(data.Item2);
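To look at what those two lines do in isolation, here is a minimal sketch that treats a BufferBlock<T> as a pool of tokens: taking an item removes it from the pool, and Post puts it back so that a later consumer can take it. The class and method names (PoolDemo, BorrowAndReturn) are hypothetical, and no target block is linked, so nothing consumes the item automatically as the join blocks do in the sample.

```csharp
using System;
using System.Threading.Tasks.Dataflow;

static class PoolDemo
{
    // Returns the pool size before borrowing, while borrowed, and after returning.
    public static int[] BorrowAndReturn()
    {
        var pool = new BufferBlock<string>();
        pool.Post("Memory01");              // stock the pool with one resource

        int before = pool.Count;            // resource is waiting in the pool
        string resource = pool.Receive();   // borrow: removes it from the pool
        int during = pool.Count;            // pool is empty while it is in use
        pool.Post(resource);                // release: the same object goes back
        int after = pool.Count;

        return new[] { before, during, after };
    }

    static void Main()
    {
        Console.WriteLine(string.Join(", ", BorrowAndReturn()));
    }
}
```

In this sketch nothing spins or polls: the pool simply holds the item until something asks for it. In the sample, the "something" is the non-greedy join block, which takes a resource only once every pool it is linked to has one available.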
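To make this easier to figure out, I added color to the output so the threads are easier to identify.
My understanding and expectation is that adding more MemoryResources should make the system run in parallel, similar to Windows NT administration, where I can assign network cards to individual CPUs for more efficient parallelization. When I add extra MemoryResource items, no parallelization happens, either within a single domain (network) or across several source domains (network + disk).
Where is the bottleneck to parallelization defined?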
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks.Dataflow;

// Demonstrates how to use non-greedy join blocks to distribute
// resources among a dataflow network.
class Program
{
// Represents a resource. A derived class might represent
// a limited resource such as a memory, network, or I/O
// device.
abstract class Resource
{
public object TeapotIsResource { get; set; }

public int TouchCount { get; set; }
}

// Represents a memory resource. For brevity, the details of
// this class are omitted.
class MemoryResource : Resource
{
public MemoryResource()
{
NetworkSequenceID = ran.Next();
}

Random ran = new Random((int)DateTime.Now.Ticks);
public int NetworkSequenceID { get; set; }

public object TeapotIsMemory { get; set; }

public string Name { get; set; }
}

// Represents a network resource. For brevity, the details of
// this class are omitted.
class NetworkResource : Resource
{
public NetworkResource()
{
NetworkSequenceID = ran.Next();
}

Random ran = new Random((int)DateTime.Now.Ticks);
public int NetworkSequenceID { get; set; }

public object TeapotIsNetwork { get; set; }

public string Name { get; set; }
}

// Represents a file resource. For brevity, the details of
// this class are omitted.
class FileResource : Resource
{
public FileResource()
{
NetworkSequenceID = ran.Next();
}

Random ran = new Random((int)DateTime.Now.Ticks);
public int NetworkSequenceID { get; set; }

public string Name { get; set; }
}

public static DateTime TimeForFile = DateTime.UtcNow;
public static DateTime TimeForNetwork = DateTime.UtcNow;

static int NetworkSequenceID = 0;
static void Main(string[] args)
{
Console.ForegroundColor = ConsoleColor.White;
colorMap.Add(867);


// Create three BufferBlock<T> objects. Each object holds a different
// type of resource.
var networkResources = new BufferBlock<NetworkResource>();
var fileResources = new BufferBlock<FileResource>();
var memoryResources = new BufferBlock<MemoryResource>();

// Create two non-greedy JoinBlock<T1, T2> objects.
// The first join works with network and memory resources;
// the second pool works with file and memory resources.

var joinNetworkAndMemoryResources =
new JoinBlock<NetworkResource, MemoryResource>(
new GroupingDataflowBlockOptions
{
Greedy = false,
});

var joinFileAndMemoryResources =
new JoinBlock<FileResource, MemoryResource>(
new GroupingDataflowBlockOptions
{
Greedy = false,
});

// Create two ActionBlock<T> objects.
// The first block acts on a network resource and a memory resource.
// The second block acts on a file resource and a memory resource.

var networkMemoryAction =
new ActionBlock<Tuple<NetworkResource, MemoryResource>>(
data =>
{
// Perform some action on the resources.

// Print a message.
PrintLine("Network worker", data.Item1.Name, data.Item1.NetworkSequenceID, networkResources.Count,
"using resources on", data.Item2.Name, data.Item2.NetworkSequenceID, memoryResources.Count);

// Simulate a lengthy operation that uses the resources.
Thread.Sleep(2000);

// Print a message.
PrintLine("Network worker", data.Item1.Name, data.Item1.NetworkSequenceID, networkResources.Count,
"finished using resources on", data.Item2.Name, data.Item2.NetworkSequenceID, memoryResources.Count);

data.Item1.TouchCount = data.Item1.TouchCount + 1;
data.Item2.TouchCount = data.Item2.TouchCount + 1;

if (data.Item2.TouchCount == 10)
{
Console.WriteLine(DateTime.UtcNow - TimeForNetwork);
}

networkResources.Post(data.Item1);
memoryResources.Post(data.Item2);
});

var fileMemoryAction =
new ActionBlock<Tuple<FileResource, MemoryResource>>(
data =>
{
// Perform some action on the resources.
// Print a message.
PrintLine("File worker", data.Item1.Name, data.Item1.NetworkSequenceID, fileResources.Count,
"using resources on", data.Item2.Name, data.Item2.NetworkSequenceID, memoryResources.Count);

// Simulate a lengthy operation that uses the resources.
//Thread.Sleep(new Random().Next(500, 2000));
Thread.Sleep(2000);

// Print a message.
PrintLine("File worker", data.Item1.Name, data.Item1.NetworkSequenceID, fileResources.Count,
"finished using resources on", data.Item2.Name, data.Item2.NetworkSequenceID, memoryResources.Count);

data.Item1.TouchCount = data.Item1.TouchCount + 1;
data.Item2.TouchCount = data.Item2.TouchCount + 1;

if (data.Item2.TouchCount == 10 )
{
Console.WriteLine(DateTime.UtcNow - TimeForFile);
}
// Release the resources back to their respective pools.
fileResources.Post(data.Item1);
memoryResources.Post(data.Item2);
});

// Link the resource pools to the JoinBlock<T1, T2> objects.
// Because these join blocks operate in non-greedy mode, they do not
// take the resource from a pool until all resources are available from
// all pools.

networkResources.LinkTo(joinNetworkAndMemoryResources.Target1);
memoryResources.LinkTo(joinNetworkAndMemoryResources.Target2);

fileResources.LinkTo(joinFileAndMemoryResources.Target1);
memoryResources.LinkTo(joinFileAndMemoryResources.Target2);

// Link the JoinBlock<T1, T2> objects to the ActionBlock<T> objects.

joinNetworkAndMemoryResources.LinkTo(networkMemoryAction);
joinFileAndMemoryResources.LinkTo(fileMemoryAction);

// Populate the resource pools. In this example, network and
// file resources are more abundant than memory resources.

Console.WriteLine("ADDING: Allocating 4 Network interfaces");
networkResources.Post(new NetworkResource() { Name = "eth0" });
networkResources.Post(new NetworkResource() { Name = "eth1" });
//networkResources.Post(new NetworkResource() { Name = "eth2" });
//networkResources.Post(new NetworkResource() { Name = "eth3" });

Console.WriteLine("ADDING: Allocate a small memory resource");
memoryResources.Post(new MemoryResource() { Name = "Memory01" });
memoryResources.Post(new MemoryResource() { Name = "Memory02" });

Console.WriteLine("ADDING: Old disk technology simulator ");
fileResources.Post(new FileResource() { Name = "MFMHardDrive01" });
//fileResources.Post(new FileResource() { Name = "MFMHardDrive02" });

// Allow data to flow through the network for several seconds.
Thread.Sleep(10000000);
}

public static void MessageProcessor(string from)
{
}

private static void PrintLine(string workerType, string dataItem1Name, int networkSequenceID1, int count1, string onString, string item2Name, int networkSequenceID2, int count2)
{
// Many threads writing to console
lock (colorMap)
{
int fixAPICountingAtZero = 1;

Console.Write(workerType + " " + dataItem1Name + "/");
SetConsoleColor(networkSequenceID1);
Console.Write("(" + (count1 + fixAPICountingAtZero) + "): " + onString + item2Name + "/");
SetConsoleColor(networkSequenceID2);
Console.WriteLine(" (" + (count2 + fixAPICountingAtZero) + ")");
}
}

static List<int> colorMap = new List<int>();
static string SetConsoleColor(int anumber)
{

if (!colorMap.Contains(anumber))
{
colorMap.Add(anumber);
}

var original = Console.ForegroundColor;
Console.ForegroundColor = (ConsoleColor)colorMap.IndexOf(anumber);
Console.Write(anumber);
Console.ForegroundColor = original;

return "";
}
}

/* Sample output:
File worker: using resources...
File worker: finished using resources...
Network worker: using resources...
Network worker: finished using resources...
File worker: using resources...
File worker: finished using resources...
Network worker: using resources...
Network worker: finished using resources...
File worker: using resources...
File worker: finished using resources...
File worker: using resources...
File worker: finished using resources...
Network worker: using resources...
Network worker: finished using resources...
Network worker: using resources...
Network worker: finished using resources...
File worker: using resources...
*/

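Best answer

If I understand the question correctly:
The first problem I can see is that the ActionBlock constructor can take an options argument; when it is not supplied, the defaults listed below apply.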
ExecutionDataflowBlockOptions

When specific configuration options are not set, the following defaults are used:

Option : Default

  • TaskScheduler : Default
  • CancellationToken : None
  • MaxMessagesPerTask : DataflowBlockOptions.Unbounded (-1)
  • BoundedCapacity : DataflowBlockOptions.Unbounded (-1)
  • MaxDegreeOfParallelism : 1

So, if you do not set the options on the ActionBlock explicitly, it is limited to processing messages serially, because MaxDegreeOfParallelism defaults to 1. Example:
var block = new ActionBlock<T>(
x => {...},
new ExecutionDataflowBlockOptions
{
MaxDegreeOfParallelism = <something that makes sense to your solution>
});
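As a rough way to observe this setting, the sketch below posts several messages to an ActionBlock and records the highest number of delegate invocations that were running at the same moment. The names ParallelismDemo and PeakConcurrency and the 100 ms sleep are assumptions made for illustration; they are not part of the MSDN sample.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks.Dataflow;

static class ParallelismDemo
{
    // Posts `items` messages and returns the highest number of delegate
    // invocations observed running at the same time.
    public static int PeakConcurrency(int maxDegreeOfParallelism, int items)
    {
        int running = 0, peak = 0;
        var gate = new object();

        var block = new ActionBlock<int>(
            _ =>
            {
                lock (gate) { running++; peak = Math.Max(peak, running); }
                Thread.Sleep(100); // simulated synchronous work, as in the question
                lock (gate) { running--; }
            },
            new ExecutionDataflowBlockOptions
            {
                MaxDegreeOfParallelism = maxDegreeOfParallelism
            });

        for (int i = 0; i < items; i++) block.Post(i);

        block.Complete();         // no more input
        block.Completion.Wait();  // wait until every message is processed
        return peak;
    }

    static void Main()
    {
        Console.WriteLine("MaxDegreeOfParallelism = 1, peak: " + PeakConcurrency(1, 8));
        Console.WriteLine("MaxDegreeOfParallelism = 4, peak: " + PeakConcurrency(4, 8));
    }
}
```

With the default of 1 the peak should never exceed 1, however many messages are queued. With a higher setting the peak can rise up to that limit, depending on how many thread-pool threads are available at the time.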
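Note: your current implementation uses synchronous workflows. To really unlock the performance and scalability of a Dataflow pipeline, make sure you use the Task-returning delegate overloads (Func<TInput, Task>) for any IO-bound workloads and take advantage of the async and await pattern.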
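The sketch below combines both points in a reduced version of the question's network: one non-greedy JoinBlock pairs a network resource with a memory resource, and an ActionBlock with an async delegate uses the pair and then posts both back to their pools. It is only an illustration under stated assumptions: resources are plain strings, Task.Delay stands in for the work, and AsyncPoolDemo, RunAsync, and the totalJobs stop condition are hypothetical names added so the run terminates.

```csharp
using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

static class AsyncPoolDemo
{
    // Runs totalJobs jobs over `resourceCount` network/memory pairs and
    // returns the highest number of jobs observed in flight at once.
    public static async Task<int> RunAsync(
        int resourceCount, int maxDegreeOfParallelism, int totalJobs)
    {
        var network = new BufferBlock<string>();
        var memory = new BufferBlock<string>();
        var join = new JoinBlock<string, string>(
            new GroupingDataflowBlockOptions { Greedy = false });

        int running = 0, peak = 0, completed = 0;
        var gate = new object();
        var done = new TaskCompletionSource<bool>();

        var worker = new ActionBlock<Tuple<string, string>>(
            async pair =>
            {
                lock (gate) { running++; peak = Math.Max(peak, running); }
                await Task.Delay(100); // simulated IO-bound work, no thread blocked
                bool finished;
                lock (gate) { running--; finished = ++completed >= totalJobs; }
                if (finished) { done.TrySetResult(true); return; }

                // Release the resources back to their pools.
                network.Post(pair.Item1);
                memory.Post(pair.Item2);
            },
            new ExecutionDataflowBlockOptions
            {
                MaxDegreeOfParallelism = maxDegreeOfParallelism
            });

        network.LinkTo(join.Target1);
        memory.LinkTo(join.Target2);
        join.LinkTo(worker);

        for (int i = 0; i < resourceCount; i++)
        {
            network.Post("eth" + i);
            memory.Post("Memory" + i);
        }

        await done.Task;
        return peak;
    }

    static async Task Main()
    {
        Console.WriteLine("2 pairs, MDOP 1, peak: " + await RunAsync(2, 1, 4));
        Console.WriteLine("1 pair,  MDOP 4, peak: " + await RunAsync(1, 4, 3));
        Console.WriteLine("2 pairs, MDOP 2, peak: " + await RunAsync(2, 2, 6));
    }
}
```

In this sketch two things cap the parallelism: the number of resource pairs in the pools and MaxDegreeOfParallelism on the ActionBlock. With two pairs and the default of 1 the jobs run one after another, and with a single pair a higher setting makes no difference, because only one memory resource is ever available to join.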

Regarding "c# - Seeking clarity on the MSDN TPL Dataflow sample and parallelization", a similar question was found on Stack Overflow: https://stackoverflow.com/questions/65152242/
