gpt4 book ai didi

node.js - TypeScript:使用ZeroMQ ROUTER/DEALER时占用大量内存

转载 作者:太空宇宙 更新时间:2023-11-04 01:47:24 25 4
gpt4 key购买 nike

我们最近开始为一种应用程序使用Typescript语言,在这种应用程序中,服务器与客户端之间应该进行队列通信。

为了实现队列通信,我们尝试将ZeroMQ库版本4.6.0用作npm软件包:npm install -g zeromq and npm install -g @types/zeromq

确切的情况:

客户端将通过ZeroMQ向服务器发送数千条消息。反过来,服务器将根据来自客户端的每个传入消息响应一些确认消息。客户端将根据确认消息发送下一条消息。

使用的ZeroMQ模式:

ROUTER/DEALER模式(我们不能使用任何其他模式)。

客户端代码:

    import Zmq  = require('zeromq');
let clientSocket : Zmq.Socket;
let messageQueue = [];

export class ZmqCommunicator
{
constructor(connString : string)
{
clientSocket = Zmq.socket('dealer');
clientSocket.connect(connString);
clientSocket.on('message', this.ReceiveMessage);
}

public ReceiveMessage = (msg) => {
var argl = arguments.length,
envelopes = Array.prototype.slice.call(arguments, 0, argl - 1),
payload = arguments[0];
var json = JSON.parse(msg.toString('utf8'));

if(json.type != "error" && json.type =='ack'){
if(messageQueue.length>0){
this.Dispatch(messageQueue.splice(0, 1)[0]);
}
}

public Dispatch(message) {
clientSocket.send(JSON.stringify(message));
}

public SendMessage(msg: Message, isHandshakeMessage : boolean){
// The if condition will be called only once for the first handshake message. For all other messages, the else condition will be called always.
if(isHandshakeMessage == true){
clientSocket.send(JSON.stringify(message));
}
else{
messageQueue.push(msg);
}
}
}


在服务器端,我们已经配置了 ROUTER套接字。
上面的代码非常简单。 SendMessage()函数实际上是为数千条消息而调用的,并且代码可以成功运行,但会消耗大量内存。

问题:

因为ZeroMQ的行为是异步的,所以每当客户端必须将新消息发送到ZeroMQ ReceiveMessage()时,客户端就必须等待回叫 ROUTER(从发送到方法Dispatch的流程中可以明显看出)。

基于我们对TypeScript的有限知识以及对ZeroMQ与TypeScript的使用,问题在于,因为运行打字稿代码的默认线程(创建所需的1000多个消息并发送到 SendMessage())继续执行(创建和发送更多消息)在发送第一条消息(本质上是握手消息)之后,除非所有1000+条消息都已创建并发送到 SendMessage()(这不是发送数据,而是排队数据,因为我们要解释路由器套接字发送的确认消息和仅基于我们要发送下一条消息的确认),调用不会进入 ReceiveMessage()回调方法。

就是说,只有在创建和调用 ReceiveMessage()的默认线程完成此操作后才对 SendMessage()进行调用,以处理1000多个消息,并且现在没有其他任务可以执行任何进一步的操作。

因为ZeroMQ不提供使用 ROUTER/DEALER发送/接收数据的任何同步机制,所以我们必须根据上述代码使用 messageQueue对象利用队列。

此机制将在内存中加载巨大的 messageQueue(带有1000多个消息),并且仅在默认线程最后到达 ReceiveMessage()调用之后才出队。如果说我们有10000甚至更多的消息要发送,情况只会变得更糟。

问题:


我们已经肯定了这种行为。因此,我们可以确定上面已经解释的理解。我们对TypeScript或ZeroMQ用法的理解是否有差距?
是否有像Typescript中的阻塞队列/大小限制数组这样的概念,它将在队列中接收有限的条目,并阻塞队列中的任何新添加,直到现有的添加为队列为止(本质上适用于默认线程暂停其处理直到该时间)调用 ReceiveMessage()会从队列中取消排队的条目)?
是否有任何同步的ZeroMQ方法(我们在C#的类似设置中使用了它,我们在ZeroMQ上合并并同步接收数据)?
在这种情况下使用多线程有何优势?不知道Typescript是否在很大程度上支持多线程。


注意:我们在许多论坛上进行了搜索,并且在任何地方都没有任何潜在客户。上面的描述可能在一个问题中有多个问题(违反stackoverflow论坛的规则);但对我们而言,所有这些问题都与在Typescript中有效使用ZeroMQ相互关联。

期待从社区获得一些线索。

最佳答案

欢迎来到ZeroMQ

如果这是您第一次阅读有关ZeroMQ的内容,请先阅读5秒钟,以了解[ZeroMQ hierarchy in less than a five seconds]部分中的主要概念差异。




  1)...我们对TypeScript或ZeroMQ用法的理解是否有差距?


尽管我不能担任TypeScript部分的工作,但让我提及一些细节,这可能有助于您前进。虽然ZeroMQ主要是一种无代理的异步信令/消息框架,但它具有多种使用方式,并且有一些工具可以在应用程序代码和ZeroMQ Context() -instance之间加强同步和异步协作,这是基础所有服务设计。

本机API提供了一种方法,用于定义是否应阻止相应的调用,直到能够完成跨Context()实例边界的消息处理为止,或者相反,如果调用应服从ZMQ_DONTWAIT并且异步地将控制权返回给调用者,而与操作(完成)无关。

作为其他技巧,可以选择配置ZMQ_SND_HWM + ZMQ_RCV_HWM和其他相关的.setsockopt()选项,以便满足特定的阻止/静默删除行为。




  因为ZeroMQ不提供任何发送/接收数据的同步机制


好吧,ZeroMQ API确实为同步调用.send()/.recv()方法提供了方法,在该方法中,调用者被阻止,直到任何可行的消息可以传递到/ cc引擎的控制域中为止。

显然,TypeScript语言绑定/包装负责将这些本机API服务公开给您。




  3)是否有任何同步ZeroMQ方法(我们在C#的类似设置中使用了它,我们在ZeroMQ上合并并同步接收数据)?


是的,有几个这样的:
-如果未由Context()标志指示,则本机API会阻塞,直到可以发送消息为止
-本地API提供了一个ZMQ_DONTWAIT对象,如果给定Poller()作为.poll()持续时间说明符,它可以-1等待所寻求的事件,从而阻止调用者,直到出现任何此类事件并出现在long-实例。

同样,TypeScript语言绑定/包装负责将这些本机API服务公开给您。




  ...大量内存消耗...


好吧,这可能表明资源管理不善。一旦被分配,ZeroMQ消息在适当的情况下也应变为Poller() -d。如果资源被系统地释放并从内存中释放出来,请检查您的TypeScript代码和TypeScript语言绑定/包装源。

关于node.js - TypeScript:使用ZeroMQ ROUTER/DEALER时占用大量内存,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/50969097/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com