gpt4 book ai didi

go - 多次使用TCP连接返回 'broken pipe'错误

转载 作者:IT王子 更新时间:2023-10-29 02:35:43 25 4
gpt4 key购买 nike

这个问题是关于 go 和它的 net 包的。

我写了一个简单的 tcp 服务器处理一些 RPC。客户端使用 chan net.Conn 来管理客户端的所有 tcp 连接。服务器正在运行一个 tcp 监听器。

代码如下:客户:

package server

import (
"errors"
"log"
"net"
)

var tcpPool chan net.Conn

func NewClient(connections int, address string) {

tcpPool = make(chan net.Conn, connections)
for i := 0; i < connections; i++ {
conn, err := net.Dial("tcp4", address)
if err != nil {
log.Panic(err)
}
tcpPool <- conn
}
}

func SendMessage(msg []byte) ([]byte, error) {
conn := getConn()

log.Println("check conn: ", conn)
log.Println("msg: ", msg)

defer releaseConn(conn)
// send message
n, err := conn.Write(msg)
if err != nil {
log.Panic(err)
} else if n < len(msg) {
log.Panic(errors.New("Message did not send in full"))
}

// receiving a message
inBytes := make([]byte, 0)

for {
// bufsize 1024, read bufsize bytes each time
b := make([]byte, bufSize)
res, err := conn.Read(b)
log.Println("server sends >>>>>>>>>>>>: ", res)
if err != nil {
b[0] = ReError
break
}
inBytes = append(inBytes, b[:res]...)
// message finished.
if res < bufSize {
break
}
}
// check replied message
if len(inBytes) == 0 {
return []byte{}, errors.New("empty buffer error")
}
log.Println("SendMessage gets: ", inBytes)
return inBytes, nil
}

func releaseConn(conn net.Conn) error {
log.Println("return conn to pool")
select {
case tcpPool <- conn:
return nil
}
}

func getConn() (conn net.Conn) {
log.Println("Take one from pool")
select {
case conn := <-tcpPool:
return conn
}
}

服务器

func StartTCPServer(network, addr string) error {
listener, err := net.Listen(network, addr)
if err != nil {
return errors.Wrapf(err, "Unable to listen on address %s\n", addr)
}
log.Println("Listen on", listener.Addr().String())
defer listener.Close()
for {
log.Println("Accept a connection request.")
conn, err := listener.Accept()
if err != nil {
log.Println("Failed accepting a connection request:", err)
continue
}
log.Println("Handle incoming messages.")
go onConn(conn)
}
}

//onConn recieves a tcp connection and waiting for incoming messages
func onConn(conn net.Conn) {
inBytes := make([]byte, 0)
defer func() {
if e := recover(); e != nil {
//later log
if err, ok := e.(error); ok {
println("recover", err.Error())
}
}
conn.Close()
}()
// load msg
for {
buf := make([]byte, bufSize)
res, err := conn.Read(buf)
log.Println("server reading: ", res)
inBytes = append(inBytes, buf[:res]...)
if err != nil || res < bufSize {
break
}
}

var req RPCRequest
err := json.Unmarshal(inBytes, &req)
if err != nil {
log.Panic(err)
}
log.Println("rpc request: ", req)

var query UserRequest
err = json.Unmarshal(req.Query, &query)
if err != nil {
log.Panic(err)
}
log.Println("rpc request query: ", query)

// call method to process request
// good now we can proceed to function call
// some actual function calls gets a output
// outBytes, err := json.Marshal(out)
conn.Write(outBytes)
}

我认为这是非常标准的。但由于某种原因,我只能在客户端发送消息,然后第二个和第三个开始显示一些不正常。

1st ---> 成功,得到响应第二 ---> 客户端可以发送但没有返回,服务器端的日志显示没有消息第 3 次 ---> 如果我再次从客户端发送,它显示 broken pipe 错误..

最佳答案

有一些不好的处理方式。首先,确保来自服务器的消息完成的标志取决于 io.EOF,而不是长度

    // message finished.
if res < 512 {
break
}

取而代之的是,读取器返回一个 io.EOF 是显示消息完成的唯一符号。其次,chan type有阻塞的属性,不需要使用select。顺便说一下,你真的需要启动一个goroutine来释放。同样要求getConn

func releaseConn(conn net.Conn)  {
go func(){
tcpPool <- conn
}()
}

func getConn() net.Conn {
con := <-tcpPool
return con
}

第三,listener不要关闭,下面的代码不好

defer listener.Close()

最重要的原因是在客户端,res, err := conn.Read(b) 这会收到服务器的回复。当没有回复时,它会阻止而不是 io.EOF,也不会出现其他错误。这意味着,您不能将持久通信部分装入函数 send() 中。您可以做一件事来使用 sendmsg() 发送,但永远不要使用 sendmsg() 来处理回复。你可以这样处理回复

var receive chan string

func init() {
receive = make(chan string, 10)
}
func ReceiveMessage(con net.Conn) {
// receiving a message
inBytes := make([]byte, 0, 1000)
var b = make([]byte, 512)
for {
// bufsize 1024, read bufsize bytes each time
res, err := con.Read(b)
if err != nil {
if err == io.EOF {
break
}
fmt.Println(err.Error())
break
}
inBytes = append(inBytes, b[:res]...)
msg := string(inBytes)
fmt.Println("receive msg from server:" + msg)
receive <- msg
}
}

我在你的代码中发现了几个问题,但我不知道是哪一个导致了你的失败。这是我根据您编写的代码并进行了一些修复。客户端.go:

package main

import (
"fmt"
"io"
"log"
"net"
)

var tcpPool chan net.Conn
var receive chan string

func init() {
receive = make(chan string, 10)
}
func NewClient(connections int, address string) {
tcpPool = make(chan net.Conn, connections)
for i := 0; i < connections; i++ {
conn, err := net.Dial("tcp", address)
if err != nil {
log.Panic(err)
}
tcpPool <- conn
}
}

func SendMessage(con net.Conn, msg []byte) error {
// send message
_, err := con.Write(msg)
if err != nil {
log.Panic(err)
}
return nil
}

func ReceiveMessage(con net.Conn) {
// receiving a message
inBytes := make([]byte, 0, 1000)
var b = make([]byte, 512)
for {
// bufsize 1024, read bufsize bytes each time
res, err := con.Read(b)
if err != nil {
if err == io.EOF {
break
}
fmt.Println(err.Error())
break
}
inBytes = append(inBytes, b[:res]...)
msg := string(inBytes)
fmt.Println("receive msg from server:" + msg)
receive <- msg
}
}

func getConn() net.Conn {
con := <-tcpPool
return con
}

func main() {
NewClient(20, "localhost:8101")
con := <-tcpPool
e := SendMessage(con, []byte("hello, i am client"))
if e != nil {
fmt.Println(e.Error())
return
}
go ReceiveMessage(con)
var msg string
for {
select {
case msg = <-receive:
fmt.Println(msg)
}
}
}

server.go

package main

import (
"fmt"
"io"
"net"
)

func StartTCPServer(network, addr string) error {
listener, err := net.Listen(network, addr)
if err != nil {
return err
}
for {
conn, err := listener.Accept()
if err != nil {

fmt.Println(err.Error())
continue

}
onConn(conn)
}
}

//onConn recieves a tcp connection and waiting for incoming messages
func onConn(conn net.Conn) {
inBytes := make([]byte, 0)
// load msg
for {
buf := make([]byte, 512)
res, err := conn.Read(buf)
if err != nil {
if err == io.EOF {
return
}
fmt.Println(err.Error())
return
}
inBytes = append(inBytes, buf[:res]...)

fmt.Println("receive from client:" + string(inBytes))
conn.Write([]byte("hello"))
}
}

func main() {
if e := StartTCPServer("tcp", ":8101"); e != nil {
fmt.Println(e.Error())
return
}
}

这有效,没有错误。顺便说一句,我看不到您在客户端或服务器端的哪个位置执行 con.Close()。关闭它是必要的。这意味着连接一旦从池中获取,就不会再放回去。当你认为一个连接结束时,关闭它并建立一个新的连接来填充池而不是把它放回去,因为把一个关闭的连接放回池中是一个致命的操作。

关于go - 多次使用TCP连接返回 'broken pipe'错误,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/54454187/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com