- r - 以节省内存的方式增长 data.frame
- ruby-on-rails - ruby/ruby on rails 内存泄漏检测
- android - 无法解析导入android.support.v7.app
- UNIX 域套接字与共享内存(映射文件)
这个问题是关于 go 和它的 net 包的。
我写了一个简单的 tcp 服务器处理一些 RPC。客户端使用 chan net.Conn
来管理客户端的所有 tcp 连接。服务器正在运行一个 tcp 监听器。
代码如下:客户:
package server
import (
"errors"
"log"
"net"
)
var tcpPool chan net.Conn
func NewClient(connections int, address string) {
tcpPool = make(chan net.Conn, connections)
for i := 0; i < connections; i++ {
conn, err := net.Dial("tcp4", address)
if err != nil {
log.Panic(err)
}
tcpPool <- conn
}
}
func SendMessage(msg []byte) ([]byte, error) {
conn := getConn()
log.Println("check conn: ", conn)
log.Println("msg: ", msg)
defer releaseConn(conn)
// send message
n, err := conn.Write(msg)
if err != nil {
log.Panic(err)
} else if n < len(msg) {
log.Panic(errors.New("Message did not send in full"))
}
// receiving a message
inBytes := make([]byte, 0)
for {
// bufsize 1024, read bufsize bytes each time
b := make([]byte, bufSize)
res, err := conn.Read(b)
log.Println("server sends >>>>>>>>>>>>: ", res)
if err != nil {
b[0] = ReError
break
}
inBytes = append(inBytes, b[:res]...)
// message finished.
if res < bufSize {
break
}
}
// check replied message
if len(inBytes) == 0 {
return []byte{}, errors.New("empty buffer error")
}
log.Println("SendMessage gets: ", inBytes)
return inBytes, nil
}
func releaseConn(conn net.Conn) error {
log.Println("return conn to pool")
select {
case tcpPool <- conn:
return nil
}
}
func getConn() (conn net.Conn) {
log.Println("Take one from pool")
select {
case conn := <-tcpPool:
return conn
}
}
服务器
func StartTCPServer(network, addr string) error {
listener, err := net.Listen(network, addr)
if err != nil {
return errors.Wrapf(err, "Unable to listen on address %s\n", addr)
}
log.Println("Listen on", listener.Addr().String())
defer listener.Close()
for {
log.Println("Accept a connection request.")
conn, err := listener.Accept()
if err != nil {
log.Println("Failed accepting a connection request:", err)
continue
}
log.Println("Handle incoming messages.")
go onConn(conn)
}
}
//onConn recieves a tcp connection and waiting for incoming messages
func onConn(conn net.Conn) {
inBytes := make([]byte, 0)
defer func() {
if e := recover(); e != nil {
//later log
if err, ok := e.(error); ok {
println("recover", err.Error())
}
}
conn.Close()
}()
// load msg
for {
buf := make([]byte, bufSize)
res, err := conn.Read(buf)
log.Println("server reading: ", res)
inBytes = append(inBytes, buf[:res]...)
if err != nil || res < bufSize {
break
}
}
var req RPCRequest
err := json.Unmarshal(inBytes, &req)
if err != nil {
log.Panic(err)
}
log.Println("rpc request: ", req)
var query UserRequest
err = json.Unmarshal(req.Query, &query)
if err != nil {
log.Panic(err)
}
log.Println("rpc request query: ", query)
// call method to process request
// good now we can proceed to function call
// some actual function calls gets a output
// outBytes, err := json.Marshal(out)
conn.Write(outBytes)
}
我认为这是非常标准的。但由于某种原因,我只能在客户端发送消息,然后第二个和第三个开始显示一些不正常。
1st ---> 成功,得到响应第二 ---> 客户端可以发送但没有返回,服务器端的日志显示没有消息第 3 次 ---> 如果我再次从客户端发送,它显示 broken pipe
错误..
最佳答案
有一些不好的处理方式。首先,确保来自服务器的消息完成的标志取决于 io.EOF,而不是长度
// message finished.
if res < 512 {
break
}
取而代之的是,读取器返回一个 io.EOF
是显示消息完成的唯一符号。其次,chan type
有阻塞的属性,不需要使用select。顺便说一下,你真的需要启动一个goroutine来释放。同样要求getConn
func releaseConn(conn net.Conn) {
go func(){
tcpPool <- conn
}()
}
func getConn() net.Conn {
con := <-tcpPool
return con
}
第三,listener不要关闭,下面的代码不好
defer listener.Close()
最重要的原因是在客户端,res, err := conn.Read(b)
这会收到服务器的回复。当没有回复时,它会阻止而不是 io.EOF,也不会出现其他错误。这意味着,您不能将持久通信部分装入函数 send() 中。您可以做一件事来使用 sendmsg() 发送,但永远不要使用 sendmsg() 来处理回复。你可以这样处理回复
var receive chan string
func init() {
receive = make(chan string, 10)
}
func ReceiveMessage(con net.Conn) {
// receiving a message
inBytes := make([]byte, 0, 1000)
var b = make([]byte, 512)
for {
// bufsize 1024, read bufsize bytes each time
res, err := con.Read(b)
if err != nil {
if err == io.EOF {
break
}
fmt.Println(err.Error())
break
}
inBytes = append(inBytes, b[:res]...)
msg := string(inBytes)
fmt.Println("receive msg from server:" + msg)
receive <- msg
}
}
我在你的代码中发现了几个问题,但我不知道是哪一个导致了你的失败。这是我根据您编写的代码并进行了一些修复。客户端.go:
package main
import (
"fmt"
"io"
"log"
"net"
)
var tcpPool chan net.Conn
var receive chan string
func init() {
receive = make(chan string, 10)
}
func NewClient(connections int, address string) {
tcpPool = make(chan net.Conn, connections)
for i := 0; i < connections; i++ {
conn, err := net.Dial("tcp", address)
if err != nil {
log.Panic(err)
}
tcpPool <- conn
}
}
func SendMessage(con net.Conn, msg []byte) error {
// send message
_, err := con.Write(msg)
if err != nil {
log.Panic(err)
}
return nil
}
func ReceiveMessage(con net.Conn) {
// receiving a message
inBytes := make([]byte, 0, 1000)
var b = make([]byte, 512)
for {
// bufsize 1024, read bufsize bytes each time
res, err := con.Read(b)
if err != nil {
if err == io.EOF {
break
}
fmt.Println(err.Error())
break
}
inBytes = append(inBytes, b[:res]...)
msg := string(inBytes)
fmt.Println("receive msg from server:" + msg)
receive <- msg
}
}
func getConn() net.Conn {
con := <-tcpPool
return con
}
func main() {
NewClient(20, "localhost:8101")
con := <-tcpPool
e := SendMessage(con, []byte("hello, i am client"))
if e != nil {
fmt.Println(e.Error())
return
}
go ReceiveMessage(con)
var msg string
for {
select {
case msg = <-receive:
fmt.Println(msg)
}
}
}
server.go
package main
import (
"fmt"
"io"
"net"
)
func StartTCPServer(network, addr string) error {
listener, err := net.Listen(network, addr)
if err != nil {
return err
}
for {
conn, err := listener.Accept()
if err != nil {
fmt.Println(err.Error())
continue
}
onConn(conn)
}
}
//onConn recieves a tcp connection and waiting for incoming messages
func onConn(conn net.Conn) {
inBytes := make([]byte, 0)
// load msg
for {
buf := make([]byte, 512)
res, err := conn.Read(buf)
if err != nil {
if err == io.EOF {
return
}
fmt.Println(err.Error())
return
}
inBytes = append(inBytes, buf[:res]...)
fmt.Println("receive from client:" + string(inBytes))
conn.Write([]byte("hello"))
}
}
func main() {
if e := StartTCPServer("tcp", ":8101"); e != nil {
fmt.Println(e.Error())
return
}
}
这有效,没有错误。顺便说一句,我看不到您在客户端或服务器端的哪个位置执行 con.Close()。关闭它是必要的。这意味着连接一旦从池中获取,就不会再放回去。当你认为一个连接结束时,关闭它并建立一个新的连接来填充池而不是把它放回去,因为把一个关闭的连接放回池中是一个致命的操作。
关于go - 多次使用TCP连接返回 'broken pipe'错误,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/54454187/
我有管道输出 command_a | command_b | ... | command_n 输出是一个数字序列 4.2 -1 ... 0.2 我可以使用 gnuplot 绘制这些数字吗? (将 gn
关闭。这个问题是opinion-based 。目前不接受答案。 想要改进这个问题吗?更新问题,以便 editing this post 可以用事实和引文来回答它。 . 已关闭 6 年前。 Improv
我目前正在尝试连接父项和子项之间的管道。子级正在执行 sort 并对从父级接收到的输入进行排序。然后 children 写入一个单独的管道。每个进程有两个管道。一个这样 parent 可以将输入发送给
最近我正在研究 Python 中的并行编程工具。这是 os.pipe 和 multiprocessing.Pipe 之间的两个主要区别。(尽管它们被使用的场合) os.pipe是单向,multipro
我的站点上运行着 Yahoo Pipe,Romneyomics它使用来自 Delicious 和 Topsy 的饲料。Delicious 提要不提供“描述”字段,但 Topsy 提供,并且不仅仅是一个
我有一些使用管道的 Haskell 代码: module Main(main) where import Pipes a :: Producer Int IO () a = each [1..10]
所以标题几乎解释了我的问题。 stdout=subprocess.PIPE 和 stdout=PIPE 有什么区别?两者都来自 subprocess 模块,但为什么要使用一个而不是另一个呢?你如何使用
我有一个名为“myPipe”的自定义管道。我得到: The pipe 'myPipe' could not be found error 在我的单元测试中请建议在我的 .spec.ts 中导入和声明什
我有一个非常简单的 Python 3 脚本: f1 = open('a.txt', 'r') print(f1.readlines()) f2 = open('b.txt', 'r') print(f
我正在使用管道和 Python 的多处理模块在进程之间发送简单的对象。文档指出,如果管道已关闭,则调用 pipe.recv() 应该引发 EOFError。相反,我的程序只是阻塞在 recv() 上,
我在 perl 中见过这两种形式的管道 open。 一种是简单的管道打开 open FH,'| command'; 其他是安全管道打开 open FH,'|-','command'; 现在,第二个中的
我正在尝试对我的组件进行单元测试,但它立即生成一个错误: 类型错误:this.store$.pipe 不是函数 根据我的理解, createSpyObj 应该模拟状态。我有不同的选项选项,但没有一个起
我在这里看到这个帖子很多次了;但未能从命令中捕获故意错误。迄今为止我找到的最好的部分工作.. from Tkinter import * import os import Image, ImageTk
我正在编写一个简单的程序来解析编译器的输出并重新格式化任何错误消息,以便我们使用的 IDE(visual studio)可以解析它们。我们使用 nmake构建,它将使用如下命令行调用编译器: cc16
我有一个在coreos上运行的kubernetes集群。我希望在称为记录的Pod中的容器中运行journal2gelf https://github.com/systemd/journal2gelf。
为什么当管道中没有写入器时,读取器存在可以,但当管道中没有读取器时,写入器存在就不行? 。是不是因为reader需要等待,所以没有writer也没关系,而writer已经准备好数据了,即使数据准备好了
我在/etc/postfix/master.cf 中创建了一个 postfix 命令管道,其中包含一个在 STDOUT 和 STDERR 上产生输出的有效命令。在终端上调用时一切正常(因此在 STDO
我有一个命令需要来自管道的输入。例如,考虑著名的 cat 命令: $ echo Hello | cat Hello 假设我在 Perl 6 程序中有一个字符串,我想将其通过管道传递给命令: use v
因此,由于我们拥有各种设置,我习惯于遇到需要将一个可观察结果添加到另一个结果的地方,然后同时使用两者。我需要第一个在另一个之前完成的地方 getUser() .pipe( mergeMap
我在 Angular 5 中有一个非常简单的管道 import { Pipe, Injectable } from '@angular/core'; @Pipe({ name: "defaul
我是一名优秀的程序员,十分优秀!