gpt4 book ai didi

c# - 最新的 NetMQ 多线程示例

转载 作者:行者123 更新时间:2023-12-03 12:45:47 27 4
gpt4 key购买 nike

我正在尝试创建一个单服务器/多客户端的 NetMQ 测试程序(我相信这应该使用 Router-Dealer 模式),让服务器使用运行在不同线程上的工作线程(worker)来处理各个客户端的请求(并单独回复每个客户端),但我找不到一个适用于当前 NuGet 版本(撰写本文时为 v4.0.1.5)的可运行示例。

每个客户端都应该发送一个请求(其中包含它的 Id,因为它是 Dealer 套接字),并从服务器获得它的专用响应。服务器应该有一个线程池(IOCP 样式),它将使用消息并发回响应。

我在 NetMQ/Samples 的 GitHub 仓库中找到了 Multithreaded example,这似乎正是我要找的,但它使用 QueueDevice 将客户端与工作线程(worker)连接起来,而这个类似乎在某个时候被删除了:

// NOTE(review): the surrounding article reports that QueueDevice is no longer
// available in current NetMQ releases, so this snippet is shown for reference only.
var queue = new QueueDevice(
"tcp://localhost:5555",
"tcp://localhost:5556",
DeviceMode.Threaded);

然后是 netmq.readthedocs.io 上的 Router-dealer example,它使用了 NetMQPoller,但无法编译,因为它调用了不存在的 RouterSocket.ReceiveMessage() 以及已被删除的 NetMQSocket.Receive(out bool hasmore) 方法:

/// <summary>
/// Demo entry point: binds a RouterSocket on tcp://127.0.0.1:5556, starts three
/// client tasks that each create a DealerSocket and send a message every
/// <c>delay</c> milliseconds, then runs an endless server loop that answers
/// every three-frame message it receives.
/// </summary>
/// <param name="args">Command-line arguments; not read by this method.</param>
public static void Main(string[] args)
{
// NOTES
// 1. Use ThreadLocal<DealerSocket> where each thread has
// its own client DealerSocket to talk to server
// 2. Each thread can send using it own socket
// 3. Each thread socket is added to poller

const int delay = 3000; // millis
var clientSocketPerThread = new ThreadLocal<DealerSocket>();
using (var server = new RouterSocket("@tcp://127.0.0.1:5556"))
using (var poller = new NetMQPoller())
{
// Start some threads, each with its own DealerSocket
// to talk to the server socket. Creates lots of sockets,
// but no nasty race conditions no shared state, each
// thread has its own socket, happy days.

for (int i = 0; i < 3; i++)
{
// The task's state argument (the "client N" string passed below) is used
// both as the socket identity and as the message payload.
Task.Factory.StartNew(state =>
{
DealerSocket client = null;
// Create and register a socket only if this thread has not stored one yet.
if (!clientSocketPerThread.IsValueCreated)
{
client = new DealerSocket();
client.Options.Identity =
Encoding.Unicode.GetBytes(state.ToString());
client.Connect("tcp://127.0.0.1:5556");
client.ReceiveReady += Client_ReceiveReady;
clientSocketPerThread.Value = client;
poller.Add(client);
}
else
{
client = clientSocketPerThread.Value;
}
// Send loop: an empty frame followed by the payload, once per delay interval.
while (true)
{
var messageToServer = new NetMQMessage();
messageToServer.AppendEmptyFrame();
messageToServer.Append(state.ToString());
PrintFrames("Client Sending", messageToServer);
client.SendMultipartMessage(messageToServer);
Thread.Sleep(delay);
}
}, string.Format("client {0}", i), TaskCreationOptions.LongRunning);
}

// start the poller
poller.RunAsync();

// server loop
while (true)
{
// NOTE(review): the surrounding article states that RouterSocket.ReceiveMessage()
// does not exist in NetMQ v4.0.1.5, so this call is what fails to compile there.
var clientMessage = server.ReceiveMessage();
PrintFrames("Server receiving", clientMessage);
// Only three-frame messages are answered: frame 0 is used as the client
// address and frame 2 as the original payload; anything else is ignored.
if (clientMessage.FrameCount == 3)
{
var clientAddress = clientMessage[0];
var clientOriginalMessage = clientMessage[2].ConvertToString();
string response = string.Format("{0} back from server {1}",
clientOriginalMessage, DateTime.Now.ToLongTimeString());
// The reply mirrors the request layout: address, empty frame, payload.
var messageToClient = new NetMQMessage();
messageToClient.Append(clientAddress);
messageToClient.AppendEmptyFrame();
messageToClient.Append(response);
server.SendMultipartMessage(messageToClient);
}
}
}
}

/// <summary>
/// Writes one console line per frame of <paramref name="message"/>, labelled
/// with the operation name and the frame's position in the message.
/// </summary>
/// <param name="operationType">Label printed in front of every line.</param>
/// <param name="message">Message whose frames are printed in order.</param>
static void PrintFrames(string operationType, NetMQMessage message)
{
    int frameIndex = 0;
    while (frameIndex < message.FrameCount)
    {
        // Convert first, then print, so each line shows the frame as text.
        string frameText = message[frameIndex].ConvertToString();
        Console.WriteLine("{0} Socket : Frame[{1}] = {2}", operationType, frameIndex, frameText);
        frameIndex++;
    }
}

/// <summary>
/// ReceiveReady handler attached to each client socket: reads one frame and,
/// when more frames follow, reads the next one as a string and prints it
/// prefixed with "REPLY".
/// </summary>
/// <param name="sender">Event source; not used.</param>
/// <param name="e">Event data carrying the socket that is ready to receive.</param>
static void Client_ReceiveReady(object sender, NetMQSocketEventArgs e)
{
bool hasmore = false;
// NOTE(review): the surrounding article states that NetMQSocket.Receive(out bool)
// has been removed from NetMQ, so this call does not compile against v4.0.1.5.
// The frame read here is discarded; only hasmore is used.
e.Socket.Receive(out hasmore);
if (hasmore)
{
string result = e.Socket.ReceiveFrameString(out hasmore);
Console.WriteLine("REPLY {0}", result);
}
}

也许这些很容易修复,但我是 NetMQ 新手,我只想获得适合 C# 工作的“多线程 Hello World”。

NetMQ API 与官方 0MQ 文档中的 API 略有不同(它更符合 C# 的习惯并使用新的低分配 .NET 结构),因此我很难理解如何正确移植原始 C 示例。

谁能为我指出包含此场景的最新示例的资源,或者帮助我了解使用新的 NetMQ API 执行此操作的正确方法是什么?

或者这个库是否已被弃用,我应该改用 clrzmq4 .NET 包装器?据我所见,NetMQ 在整个代码库中使用了 Span<T> 和 ref 结构,所以它看起来应该是高性能的。

最佳答案

我已经解决了 router-dealer example 的几个问题在 netmq.readthedocs.io,现在代码可以运行了。我将它发布在这里,也许它会有用(也许@somdoron 或其他人可以检查它并更新文档)。

该示例没有完全满足我的要求(拥有一群公平获得工作的 worker ),我相信我必须实现单独的代理线程。

using NetMQ;
using NetMQ.Sockets;

using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Runtime.InteropServices;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

/// <summary>
/// Router/Dealer "hello world": one <see cref="RouterSocket"/> server answers
/// requests from several <see cref="DealerSocket"/> clients, each of which runs
/// on its own dedicated thread.
/// </summary>
/// <remarks>
/// NetMQ sockets are not thread-safe, so every socket here is created, used and
/// disposed on a single thread: each client thread runs its own poller (which
/// drives both the periodic send timer and the receive handler), and the server
/// socket is only touched by the main thread.
/// </remarks>
internal static class Program
{
    private const string ServerEndpoint = "tcp://127.0.0.1:5556";
    private const int NumClients = 5;

    // Interval between two requests sent by the same client.
    private static readonly TimeSpan SendInterval = TimeSpan.FromMilliseconds(3000);

    /// <summary>
    /// Binds the server socket, starts the client threads and then runs the
    /// server loop on the calling thread. Does not return.
    /// </summary>
    /// <param name="args">Command-line arguments; not used.</param>
    public static void Main(string[] args)
    {
        InitLogger();

        Log($"App started with {NumClients} clients\r\n");

        using (var server = new RouterSocket("@" + ServerEndpoint))
        {
            for (int i = 0; i < NumClients; i++)
            {
                var clientNo = i + 1;

                // Synchronous delegate on purpose: with an async lambda the
                // LongRunning thread would be released at the first await and the
                // socket would then be used from arbitrary thread-pool threads.
                Task.Factory.StartNew(() => RunClient(clientNo), TaskCreationOptions.LongRunning);
            }

            RunServer(server);
        }
    }

    /// <summary>
    /// Runs one client on the calling thread: connects a dealer socket with the
    /// identity "C{clientNo}", sends a request immediately and then once per
    /// <see cref="SendInterval"/>, and logs every reply. Blocks for as long as
    /// the client's poller is running.
    /// </summary>
    /// <param name="clientNo">1-based client number used to build the identity.</param>
    private static void RunClient(int clientNo)
    {
        var clientId = $"C{clientNo}";

        try
        {
            var rnd = new Random(31 + clientNo * 57);

            // Declared in this order so the poller is disposed before the socket.
            using (var client = new DealerSocket())
            using (var poller = new NetMQPoller())
            {
                // The identity must be set before connecting to take effect.
                client.Options.Identity = Encoding.ASCII.GetBytes(clientId);
                client.Connect(ServerEndpoint);
                client.ReceiveReady += (sender, e) =>
                {
                    var msg = e.Socket.ReceiveMultipartMessage(3);
                    Log($"Client '{clientId}' received <- server", msg);
                };

                var timer = new NetMQTimer(SendInterval);
                timer.Elapsed += (sender, e) => SendRequest(client, clientId, rnd);

                poller.Add(client);
                poller.Add(timer);

                // First request goes out right away; the timer handles the rest.
                SendRequest(client, clientId, rnd);

                // Blocks this thread; both handlers above are invoked from here,
                // so the socket never leaves the thread that created it.
                poller.Run();
            }
        }
        catch (Exception ex)
        {
            // Nobody awaits the client task, so surface the failure in the log
            // instead of letting it disappear as an unobserved task exception.
            Log($"Client {clientId} failed: {ex}");
            throw;
        }
    }

    /// <summary>
    /// Sends one request: an empty delimiter frame followed by a payload that
    /// contains the client id and a random hexadecimal number.
    /// </summary>
    private static void SendRequest(DealerSocket client, string clientId, Random rnd)
    {
        var messageToServer = new NetMQMessage();
        messageToServer.Append(NetMQFrame.Empty);
        messageToServer.Append($"Some data ({clientId}|{rnd.Next():X8})", Encoding.ASCII);

        Log($"Client {clientId} sending -> server: ", messageToServer);
        client.SendMultipartMessage(messageToServer);
    }

    /// <summary>
    /// Server loop: receives a multipart message, treats its first frame as the
    /// client id and its last frame as the payload, and replies with
    /// [client id][empty][response]. Never returns.
    /// </summary>
    private static void RunServer(RouterSocket server)
    {
        while (true)
        {
            var clientMessage = server.ReceiveMultipartMessage();

            if (clientMessage.FrameCount < 1)
            {
                Log("Server received invalid frame count");
                continue;
            }

            var clientid = clientMessage[0];
            Log($"Server received <- {clientid.ConvertToString()}: ", clientMessage);

            var msg = clientMessage.Last().ConvertToString(Encoding.ASCII);
            string response = $"Replying to '{msg}'";

            var messageToClient = new NetMQMessage();
            messageToClient.Append(clientid);
            messageToClient.Append(NetMQFrame.Empty);
            messageToClient.Append(response, Encoding.ASCII);
            Log($"Server sending -> {clientid.ConvertToString()}: {response}");
            server.SendMultipartMessage(messageToClient);
        }
    }

    #region Poor man's console logging

    // Created eagerly so Log() is safe to call even before InitLogger().
    private static readonly BlockingCollection<string> _logQueue = new BlockingCollection<string>();

    // GetSystemTimePreciseAsFileTime is exported by kernel32 since Windows 8
    // (version 6.2). Evaluated once instead of on every log call.
    private static readonly bool _usePreciseClock =
        Environment.OSVersion.Platform == PlatformID.Win32NT &&
        Environment.OSVersion.Version >= new Version(6, 2);

    /// <summary>
    /// Starts the background consumer that writes queued log lines to the
    /// console, so callers of <c>Log</c> never block on console I/O.
    /// </summary>
    private static void InitLogger()
    {
        // LongRunning: the consumer blocks for the lifetime of the process and
        // should not occupy a thread-pool worker.
        Task.Factory.StartNew(() =>
        {
            foreach (var msg in _logQueue.GetConsumingEnumerable())
            {
                Console.WriteLine(msg);
            }
        }, TaskCreationOptions.LongRunning);
    }

    /// <summary>
    /// Queues a log line prefixed with the calling thread's id and a UTC
    /// timestamp with millisecond resolution.
    /// </summary>
    static void Log(string msg)
    {
        var thid = Thread.CurrentThread.ManagedThreadId;
        var time = GetTimestamp().ToString("HH:mm:ss.fff");
        _logQueue.Add($"[T{thid:00}] {time}: {msg}");
    }

    /// <summary>
    /// Queues a log line that lists every frame of <paramref name="message"/>
    /// (decoded as ASCII) after the <paramref name="operationType"/> label.
    /// </summary>
    static void Log(string operationType, NetMQMessage message)
    {
        var frames = string.Join(", ", message.Select((m, i) => $"F[{i}]={{{m.ConvertToString(Encoding.ASCII)}}}"));
        Log($"{operationType} {message.FrameCount} frames: " + frames);
    }

    /// <summary>
    /// Returns the current UTC time, using the high-precision Windows clock
    /// when it is available and <see cref="DateTime.UtcNow"/> otherwise.
    /// </summary>
    static DateTime GetTimestamp()
    {
        if (_usePreciseClock)
        {
            long fileTime;
            WinApi.GetSystemTimePreciseAsFileTime(out fileTime);
            return DateTime.FromFileTimeUtc(fileTime);
        }

        return DateTime.UtcNow;
    }

    static class WinApi
    {
        [DllImport("Kernel32.dll", CallingConvention = CallingConvention.Winapi)]
        internal static extern void GetSystemTimePreciseAsFileTime(out long filetime);
    }

    #endregion
}

关于c# - 最新的 NetMQ 多线程示例,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/63820558/

27 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com