gpt4 book ai didi

Go:将 gob 与 zmq4 一起使用

转载 作者:行者123 更新时间:2023-12-01 21:13:53 25 4
gpt4 key购买 nike

我正在尝试将 gob 与 zmq4 一起使用套接字(来自 pebbe 的 zmq4 )。一个 zmq4 socket 没有 io 设备,这使得 gob 似乎无法直接读/写:

我不能使用 &client ( type **zmq4.Socket )作为类型 io.Writergob.NewEncoder 的参数中: zmq4.Socket没有实现 io.Writer (缺少写方法)

一个zmq4发送函数,SendMessage() , 接受 interface{} ,所以我用它来发送。

服务器端zmq4接收函数返回 string , []byte , []string[][]byte .我正在使用 RecvMessage()返回 []string .

可以写信给 bytes.Buffer ,发送该缓冲区,将其读取为 []string然后使用 gob 处理消息的内容部分。虽然当前的问题在于转换 []stringbytes.Buffer让 gob 能够 io.Read从中。听起来很基础,但到目前为止我已经尝试了很多方法,但都没有成功。这是当前的。当数据在发送之前和接收之后似乎相同时,问题显然是 gob 产生“缓冲区中的额外数据”,如 print陈述正在显示。

有没有更简单、更可行的方法?如果您有 zmq4 ,下面的代码是独立的,应该执行。

package main

import (
"bytes"
"encoding/gob"
"fmt"
"sync"
"time"

zmq "github.com/pebbe/zmq4"
)

type LogEntry struct {
ErrID int
Name string
Level string
LogStr string
}

// The client task
// --------------------------------------------------------------
func client_task(s string) {
var mu sync.Mutex
client, _ := zmq.NewSocket(zmq.DEALER)
defer client.Close()
client.SetIdentity(s)
client.Connect("tcp://localhost:5570")

go func() {

aLogEntry := &LogEntry{
ErrID: 1,
Name: "Client",
LogStr: "Log msg sent",
}

for request_nbr := 1; true; request_nbr++ {

var network bytes.Buffer
enc := gob.NewEncoder(&network)
err := enc.Encode(aLogEntry)
if err != nil {
fmt.Println("encode error:", err)
}

// Early decode test - this will influence subsequent gob
// behaviour so leave commented when caring about the sent
// data
// dec := gob.NewDecoder(&network)
// var aLogEntry2 *LogEntry
// err = dec.Decode(&aLogEntry2)
// if err != nil {
// fmt.Printf("client_task(DECODE ERROR) : %s\n\n", err)
// }
// fmt.Printf("client_task(TEST DECODE) %+v\n\n", aLogEntry)

mu.Lock()
// Replaced length by bytes Buffer method: 91
fmt.Printf("client_task(len) : %d\n\n", network.Len())
fmt.Printf("client_task(network) : %v\n\n", network)

client.SendMessage(network, 0)
mu.Unlock()
time.Sleep(5 * time.Second)
}
}()

// pause to allow server
for {
time.Sleep(100 * time.Millisecond)
}
}

// The server task
// --------------------------------------------------------------
func server_task() {

frontend, _ := zmq.NewSocket(zmq.ROUTER)
defer frontend.Close()
frontend.Bind("tcp://*:5570")

for {
msg, _ := frontend.RecvMessage(0)
// Added error reporting - does not report any error
// err does never get filled in here, never an error could get reported here
if err != nil {
fmt.Printf("RECV ERROR: %s", err)
}

// using WriteString to write the content portion of the
// received message to the bytes.Buffer for gob to process
var network bytes.Buffer
dec := gob.NewDecoder(&network)
network.WriteString(msg[1])

// Added length of the bytes Buffer: 285
// Before sending the bytes Buffer is: 91
// More than just msg[1] is written into the buffer ?
fmt.Printf("server_task(len): %d\n\n", network.Len())
fmt.Printf("server_task(msg[1]) : %s\n\n", msg[1])

var aLogEntry *LogEntry
err := dec.Decode(&aLogEntry)
if err != nil {
fmt.Printf("server_task(DECODE ERROR) : %s\n\n", err)
}

fmt.Printf("server_task(aLogEntry) %+v\n\n", aLogEntry)
}
}

func main() {
defer fmt.Println("main() done")

go client_task("1")
go server_task()

// Run for 5 seconds then quit
time.Sleep(5 * time.Second)
}

打印语句在客户端显示:
client_task(network) : {[62 255 129 3 1 1 8 76 111 103 69 110 116 114 121 1 255 130 0 1 4 1 5 69 114 114 73 68 1 4 0 1 4 78 97 109 101 1 12 0 1 5 76 101 118 101 108 1 12 0 1 6 76 111 103 83 116 114 1 12 0 0 0 27 255 130 1 2 1 6 67 108 105 101 110 116 2 12 76 111 103 32 109 115 103 32 115 101 110 116 0] 0 0}

在服务器端:
server_task(msg[1]) : {[62 255 129 3 1 1 8 76 111 103 69 110 116 114 121 1 255 130 0 1 4 1 5 69 114 114 73 68 1 4 0 1 4 78 97 109 101 1 12 0 1 5 76 101 118 101 108 1 12 0 1 6 76 111 103 83 116 114 1 12 0 0 0 27 255 130 1 2 1 6 67 108 105 101 110 116 2 12 76 111 103 32 109 115 103 32 115 101 110 116 0] 0 0}

这看起来几乎一样。

结果是:
server_task(DECODE ERROR) : extra data in buffer
server_task(aLogEntry) <nil>

最佳答案

观察
ZeroMQ 原生 API 定义了这个属性:

When receiving messages a ZMQ_ROUTER socket shall prepend a message part containing the routing id of the originating peer to the message before passing it to the application.


解决方案
可以开始使用足够的 PUSH/PULL 可扩展的正式原型(prototype),而不是(对您的用例而言过于复杂) DEALER/ROUTER ,或者可能依赖于您的 ROUTER 的假设-node 永远不会 .RecvMessage( 0 )与其他内部多部分结构,但只有一个,匹配 [ <routing_id> | <[network]-payload> [ | ... ] ]的模板这不能得到强有力的保证,可以吗?
虽然不确定, pebbezmq4用于 ZeroMQ 的 go-wrapper 尝试或不实现所有 native API 功能和/或处理潜在差异(在没有任何用户级应用程序干预的情况下自动处理?)读取所有多部分组件并处理 ot-once 和/或 NULL - 终止字符串的处理,可能在 .Decode() 中发生冲突-方法。
最后但同样重要的是,如果您的代码依赖于 msg[1]内部有一个有效且符合所有约定的有效负载,如果我没有忽略一些低级黑客,我不知道,我看不到对案例的明确处理,当发起方( DEALER)没有交付任何这样的新消息发送到消费者端( ROUTER ),但 .RecvMessage( 0 ) -方法填充 msg并继续(空的 msg )朝向 .Decode() -方法,由于明显的原因,它必须在空或格式错误的情况下失败 msg ,一定不是吗?
我肯定会从 PUSH/PULL 开始替换,它不会在交付端注入(inject)前置的,现在是多帧组合,带有 routing_id及相关风险。
非阻塞模式从 .RecvMessage() 返回-方法在填充时仍会发生冲突 msg使用空数据,如果 PULL 内没有待处理的消息等待-RxQueue-buffers,它仍然会 panic .Decode() -方法。
万一 .RecvMessage( 0 ) - 方法调用实际上表现出阻塞模式接收,如果要将 ZeroMQ 从错误的根本原因分析中完全排除,则应该更加注意错误状态的检测和处理。更多自卫 .setsockopt() -所有已部署的 ZeroMQ 资源的设置( ZMQ_LINGER 和许多其他资源)也将提高健壮性和易错程度,即在应用程序崩溃可能对生产造成任何损害的情况下。
您可以尝试重现错误: here IDE 不幸错过了 ZeroMQ 部分。
Golang.org site失败:
go: finding module for package github.com/pebbe/zmq4
go: downloading github.com/pebbe/zmq4 v1.2.0
go: found github.com/pebbe/zmq4 in github.com/pebbe/zmq4 v1.2.0
# pkg-config --cflags -- libzmq
pkg-config: exec: "pkg-config": executable file not found in $PATH

Go build failed.

关于Go:将 gob 与 zmq4 一起使用,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/61428228/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com