java - ZMQ missing events being propagated in jeromq scala

Reposted. Author: 太空宇宙. Updated: 2023-11-04 10:31:18

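I am new to ZeroMQ and seem to be losing messages in the loop in my begin() method.

Am I missing a piece, such as a step that queues the messages?

When I trigger an event on my publisher, it sends two messages to my subscriber with a small gap between them, and the second message does not seem to be relayed. What am I missing?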

class ZMQSubscriber[T <: Transaction, B <: Block](
    socket: InetSocketAddress,
    hashTxListener: Option[HashDigest => Future[Unit]],
    hashBlockListener: Option[HashDigest => Future[Unit]],
    rawTxListener: Option[Transaction => Future[Unit]],
    rawBlockListener: Option[Block => Future[Unit]]) {
  private val logger = BitcoinSLogger.logger

  def begin()(implicit ec: ExecutionContext) = {
    val context = ZMQ.context(1)

    // First, connect our subscriber socket
    val subscriber = context.socket(ZMQ.SUB)
    val uri = socket.getHostString + ":" + socket.getPort

    // Subscribe to the appropriate feed
    hashTxListener.map { _ =>
      subscriber.subscribe(HashTx.topic.getBytes(ZMQ.CHARSET))
      logger.debug("subscribed to the transaction hashes from zmq")
    }

    rawTxListener.map { _ =>
      subscriber.subscribe(RawTx.topic.getBytes(ZMQ.CHARSET))
      logger.debug("subscribed to raw transactions from zmq")
    }

    hashBlockListener.map { _ =>
      subscriber.subscribe(HashBlock.topic.getBytes(ZMQ.CHARSET))
      logger.debug("subscribed to the hashblock stream from zmq")
    }

    rawBlockListener.map { _ =>
      subscriber.subscribe(RawBlock.topic.getBytes(ZMQ.CHARSET))
      logger.debug("subscribed to raw block")
    }

    subscriber.connect(uri)
    subscriber.setRcvHWM(0)
    logger.info("Connection to zmq client successful")

    while (true) {
      val notificationTypeStr = subscriber.recvStr(ZMQ.DONTWAIT)
      val body = subscriber.recv(ZMQ.DONTWAIT)
      Future(processMsg(notificationTypeStr, body))
    }
  }

  private def processMsg(topic: String, body: Seq[Byte])(implicit ec: ExecutionContext): Future[Unit] = Future {

    val notification = ZMQNotification.fromString(topic)
    val res: Option[Future[Unit]] = notification.flatMap {
      case HashTx =>
        hashTxListener.map { f =>
          val hash = Future(DoubleSha256Digest.fromBytes(body))
          hash.flatMap(f(_))
        }
      case RawTx =>
        rawTxListener.map { f =>
          val tx = Future(Transaction.fromBytes(body))
          tx.flatMap(f(_))
        }
      case HashBlock =>
        hashBlockListener.map { f =>
          val hash = Future(DoubleSha256Digest.fromBytes(body))
          hash.flatMap(f(_))
        }
      case RawBlock =>
        rawBlockListener.map { f =>
          val block = Future(Block.fromBytes(body))
          block.flatMap(f(_))
        }
    }
  }
}

Best answer

So this seems to be resolved by using ZMsg.recvMsg() in the while loop instead of

  val notificationTypeStr = subscriber.recvStr(ZMQ.DONTWAIT)
  val body = subscriber.recv(ZMQ.DONTWAIT)

I am not sure why this works, but it does. Here is what my begin method looks like now:

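    while (run) {
      // Blocks until a complete multipart message has arrived
      val zmsg = ZMsg.recvMsg(subscriber)
      val notificationTypeStr = zmsg.pop().getString(ZMQ.CHARSET)
      val body = zmsg.pop().getData
      Future(processMsg(notificationTypeStr, body))
    }
    // Future.successful(()) yields a Future[Unit]; passing the Unit
    // companion object would produce a Future[Unit.type] instead
    Future.successful(())
  }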
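A possible explanation for why this helps, offered as a guess, not a confirmed diagnosis: if the publisher sends more than two frames per message, reading exactly two frames per loop iteration drifts out of step with the message boundaries. Bitcoin Core's ZMQ interface, for example, publishes three frames per message (topic, body, sequence number), and the topics used here suggest that kind of publisher. ZMsg.recvMsg() instead takes one whole multipart message per call, so the first pop() is always the topic. In addition, a receive with ZMQ.DONTWAIT returns null straight away when nothing is queued, so the original loop also spins on nulls. The sketch below simulates only the framing effect in plain Scala, with no ZeroMQ dependency; the object name, the frame values and the three-frame layout are illustrative assumptions.

```scala
// Self-contained simulation of frame alignment; no ZeroMQ dependency.
object FrameAlignmentDemo {
  // A multipart message is modelled as an ordered list of frames.
  type Frame = String
  type Message = List[Frame]

  // ASSUMPTION: three frames per message (topic, body, sequence number),
  // as published by Bitcoin Core's ZMQ interface. Values are made up.
  val published: List[Message] = List(
    List("hashtx", "body-1", "seq-0"),
    List("rawtx", "body-2", "seq-1")
  )

  // Frame-by-frame reading: two frames per loop iteration, ignoring
  // message boundaries (mirrors the recvStr + recv pair).
  def readTwoFramesPerIteration(msgs: List[Message]): List[(Frame, Frame)] =
    msgs.flatten
      .grouped(2)
      .collect { case List(topic, body) => (topic, body) }
      .toList

  // Message-by-message reading: one whole message per iteration, using
  // only its first two frames (mirrors ZMsg.recvMsg + two pops).
  def readWholeMessages(msgs: List[Message]): List[(Frame, Frame)] =
    msgs.collect { case topic :: body :: _ => (topic, body) }

  def main(args: Array[String]): Unit = {
    println(readTwoFramesPerIteration(published))
    println(readWholeMessages(published))
  }
}
```

In the frame-by-frame version, the second pair starts with the sequence frame, which would not parse as a known topic, so the second notification would be silently dropped. That matches the symptom described in the question, under the three-frame assumption.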

Regarding java - ZMQ missing events being propagated in jeromq scala, we found a similar question on Stack Overflow: https://stackoverflow.com/questions/49970120/
