gpt4 book ai didi

c++ - HPUX 上的套接字未收到完整数据

转载 作者:行者123 更新时间:2023-11-30 17:51:45 25 4
gpt4 key购买 nike

我真的不明白这里出了什么问题,所以我希望有人能发现我错过的东西。

我正在编写一个用户守护程序,它接受我用 java 开发的客户端。目前该客户端仅连接并发送用户名密码。我在 cygwin 下开发了代码并且它可以工作。守护进程发送其简介,然后客户端发送用户名和密码,守护进程通过断开客户端连接或发送 OK(尚未完成)进行响应。

当我使用 cygwin 测试它时,它可以在本地主机上运行。我将代码移植到 HPUX 上,客户端可以连接,并且也收到了守护进程的简介。但现在,当客户端发送其用户名和密码时,它就不再起作用了。守护进程只接收到一个字节,当它尝试再次读取时,recv 返回 -1,errno 为 EAGAIN,仅此而已。客户端没有显示任何错误,守护进程端也没有显示任何错误。而当我使用 gdb 单步执行代码时,消息却能被完整接收。 :(

我使用的代码是这样的,如果需要更多信息,我可以添加它:

/**
 * Reads everything that is currently available on the socket and appends it
 * to oMessage.
 *
 * The function keeps calling recv() until the call reports EAGAIN/EWOULDBLOCK
 * (nothing more to read right now) or returns 0 (the peer closed the
 * connection).
 *
 * @param oMessage buffer the received bytes are appended to.
 * @param nBytes   upper bound for the size of a single recv() chunk; -1 means
 *                 "use the size reported by getSendBufferSize()".
 * @return number of bytes appended to oMessage during this call.
 * @throws SocketException (RECEIVE) when recv() fails with any other error.
 */
int TCPSocket::receive(SocketMessage &oMessage, int nBytes)
{
    int max = getSendBufferSize();

    if(nBytes != -1 && nBytes < max)
        max = nBytes;

    SocketMessage sb(max);
    int rcv_bytes = 0;
    int total = 0;
    int error = 0;

    while(1)
    {
        rcv_bytes = ::recv(getSocketId(), &sb[0], sb.size(), 0);
        error = errno;
        FILE_LOG(logDEBUG4) << "Received on socket: " << mSocketId << " bytes: " << rcv_bytes << " expected:" << sb.size() << " total: " << total << " errno: " << error;

        if(rcv_bytes == -1)
        {
            // Nothing more to read for now: hand back what was collected so far.
            if(error == EAGAIN || error == EWOULDBLOCK)
                return total;

            throw SocketException(this, SocketException::RECEIVE, error, "Socket", "Client connection error!", __FILE__, __LINE__);
        }

        // recv() returning 0 means the peer closed the connection. Without this
        // check the loop would spin forever, because every further recv()
        // returns 0 again.
        if(rcv_bytes == 0)
            return total;

        total += rcv_bytes;
        oMessage.insert(oMessage.end(), sb.begin(), sb.begin()+rcv_bytes);
    }

    return total;
}

日志的输出是这样的:

16:16:04.391 DEBUG4: Received on socket: 4 bytes: 1  expected:32768 total: 0 errno: 2
16:16:04.391 DEBUG4: Received on socket: 4 bytes: -1 expected:32768 total: 1 errno: 11

那么剩下的 30 个字节在哪里?为什么没有返回?

更新

这只是实际接收数据的代码部分。套接字类本身只处理原始套接字,没有任何协议(protocol)。该协议(protocol)是在单独的类中实现的。该接收函数应该抓取网络上可用的尽可能多的字节并将其放入缓冲区(SocketMessage)中。这些字节是多个消息还是单个消息的一部分并不重要,因为控制类将从(可能)不完整的消息流中构造实际的消息块。因此,第一个调用仅接收一个字节不是问题,因为如果消息未完成,调用者将在循环中等待,直到更多数据到达并且消息完成。因为可能有多个客户端,所以我使用非阻塞套接字。我不想处理单独的线程,因此我的服务器正在复用连接。这里的问题是接收只接收一个字节,而我知道应该有更多。错误代码 EAGAIN 已被处理,当接下来进入接收时,它应该获得更多字节。即使网络只传输一个字节,消息的其余部分仍然应该接下来到达,但事实并非如此。等待套接字数据的 select 一直阻塞,就好像那里什么也没有一样。当我在 dbg 中运行相同的代码并单步执行时,它就可以工作了。当我再次与同一个客户端连接时,突然收到更多字节。当我使用 localhost 与 cygwin 使用相同的代码时,它工作正常。

更新

这是完整的代码。

Main.cpp

// Set up the listening socket: create it for the configured port, bind it,
// switch it to non-blocking mode and start listening (the value 0 is passed
// on to listen() as the backlog).
mServerSocket = new TCPSocket(getListeningPort());
mServerSocket->bindSocket();
mServerSocket->setSocketBlocking(false);
mServerSocket->listenToClient(0);

setupSignals();
// Main loop: runs until isSIGTERM() reports true.
while(isSIGTERM() == false)
{
try
{
// prepareListening() yields the value used as the highest descriptor for pselect().
max = prepareListening();

//memset(&ts, 0, sizeof(struct timespec));
// No timeout is given (NULL), so pselect() waits until a descriptor is ready
// or the call fails/is interrupted; mSignalMask is applied while waiting.
pret = pselect(max+1, &mReaders, &mWriters, &mExceptions, NULL, &mSignalMask);
// Save errno right away, before any other call can overwrite it.
error_code = errno;
if(pret == 0)
{
// Timeout occurred, but we are not interested in that for now.
// Currently this shouldn't happen anyway.
continue;
}
// Both the pselect() result and the saved errno are handed to processRequest().
processRequest(pret, error_code);

}
catch (SocketException &excp)
{
// A socket error drops the client connection that owns the failing socket.
removeClientConnection(findClientConnection(excp.getTCPSocket()));
}
} // while sigTERM


BaseSocket.cpp:

#ifdef UNIX

#include <sys/socket.h>
#include <sys/types.h>
#include <sys/ioctl.h>
#include <unistd.h>
#include <fcntl.h>
#include <errno.h>

#endif

#include "support/exceptions/socket_exception.h"
#include "support/logging/simple_log.h"
#include "support/network/base_socket.h"

using namespace std;

/**
 * Creates a socket object that does not own a descriptor yet.
 *
 * All state members are given defined values. In particular mBlocking must be
 * initialized here: setSocketBlocking() consults it, and objects built by this
 * constructor (e.g. in TCPSocket::acceptClient) would otherwise start with an
 * indeterminate blocking state.
 */
BaseSocket::BaseSocket(void)
{
    mSocketId = -1;
    mPortNumber = 0;
    mBlocking = 1;      // same initial state as BaseSocket(int)
    mSendBufferSize = MAX_SEND_LEN;
}

/**
 * Creates an AF_INET/SOCK_STREAM socket for the given port.
 *
 * A failing socket() call is turned into a SocketException (UNIX builds only)
 * which is handled right here via excp.response(); construction continues
 * afterwards.
 *
 * @param pNumber port number stored in mPortNumber and in mClientAddr.
 */
BaseSocket::BaseSocket(int pNumber)
{
    mPortNumber = pNumber;
    mBlocking = 1;
    mSendBufferSize = MAX_SEND_LEN;
    mSocketId = -1;

    try
    {
        mSocketId = ::socket(AF_INET, SOCK_STREAM, 0);
        if (mSocketId == -1)
        {
#ifdef UNIX
            throw SocketException(this, SocketException::CONSTRUCTOR, errno, "Socket", "unix: error in socket constructor", __FILE__, __LINE__);
#endif
        }
    }
    catch (SocketException& excp)
    {
        excp.response();
    }

    // Start with the wildcard address (INADDR_ANY) on our port. mClientAddr is
    // meant to hold the real peer address once communication with a client
    // application starts.
    mClientAddr.sin_port = htons(mPortNumber);
    mClientAddr.sin_addr.s_addr = htonl(INADDR_ANY);
    mClientAddr.sin_family = AF_INET;

    updateSendBufferSize(MAX_SEND_LEN);
}

/**
 * Refreshes mSendBufferSize: it becomes the smaller of the size reported by
 * getSendBufferSize() and nNewSize.
 *
 * @param nNewSize upper limit for the stored send buffer size.
 */
void BaseSocket::updateSendBufferSize(int nNewSize)
{
    const int reported = getSendBufferSize();
    mSendBufferSize = (reported > nNewSize) ? nNewSize : reported;
}

/** Destructor: closes the socket by calling close(). */
BaseSocket::~BaseSocket()
{
    this->close();
}

void BaseSocket::setSocketId(int socketFd)
{
mSocketId = socketFd;
}

int BaseSocket::getSocketId()
{
return mSocketId;
}

// returns the port number
int BaseSocket::getPortNumber()
{
return mPortNumber;
}

/**
 * Writes debugToggle to the SO_DEBUG socket option.
 *
 * A failing setsockopt() raises a SocketException (UNIX builds only) that is
 * handled locally through excp.response().
 *
 * @param debugToggle value passed to setsockopt() for SO_DEBUG.
 */
void BaseSocket::setDebug(int debugToggle)
{
    try
    {
        const int rc = setsockopt(mSocketId, SOL_SOCKET, SO_DEBUG, (char *) &debugToggle, sizeof(debugToggle));
        if (rc == -1)
        {
#ifdef UNIX
            throw SocketException(this, SocketException::OPTION, errno, "Socket", "unix: error set debug", __FILE__, __LINE__);
#endif
        }
    }
    catch (SocketException& excp)
    {
        excp.response();
    }
}

/**
 * Writes reuseToggle to the SO_REUSEADDR socket option.
 *
 * A failing setsockopt() raises a SocketException (UNIX builds only) that is
 * handled locally through excp.response().
 *
 * @param reuseToggle value passed to setsockopt() for SO_REUSEADDR.
 */
void BaseSocket::setReuseAddr(int reuseToggle)
{
    try
    {
        const int rc = setsockopt(mSocketId, SOL_SOCKET, SO_REUSEADDR, (char *) &reuseToggle, sizeof(reuseToggle));
        if (rc == -1)
        {
#ifdef UNIX
            throw SocketException(this, SocketException::OPTION, errno, "Socket", "unix: error set reuse address", __FILE__, __LINE__);
#endif
        }
    }
    catch (SocketException& excp)
    {
        excp.response();
    }
}

/**
 * Writes aliveToggle to the SO_KEEPALIVE socket option.
 *
 * A failing setsockopt() raises a SocketException (UNIX builds only) that is
 * handled locally through excp.response().
 *
 * @param aliveToggle value passed to setsockopt() for SO_KEEPALIVE.
 */
void BaseSocket::setKeepAlive(int aliveToggle)
{
    try
    {
        const int rc = setsockopt(mSocketId, SOL_SOCKET, SO_KEEPALIVE, (char *) &aliveToggle, sizeof(aliveToggle));
        if (rc == -1)
        {
#ifdef UNIX
            throw SocketException(this, SocketException::OPTION, errno, "Socket", "unix: error set keep alive", __FILE__, __LINE__);
#endif
        }
    }
    catch (SocketException& excp)
    {
        excp.response();
    }
}

/**
 * Configures SO_LINGER from a timeout in seconds.
 *
 * A positive value switches lingering on with that timeout; zero or a negative
 * value switches lingering off. Both fields of the linger structure are always
 * given a defined value before it is passed to setsockopt() (the timeout is 0
 * when lingering is switched off).
 *
 * A failing setsockopt() raises a SocketException (UNIX builds only) that is
 * handled locally through excp.response().
 *
 * @param seconds linger timeout; values <= 0 disable lingering.
 */
void BaseSocket::setLingerSeconds(int seconds)
{
    struct linger lingerOption;

    if (seconds > 0)
    {
        lingerOption.l_linger = seconds;
        lingerOption.l_onoff = 1;
    }
    else
    {
        // Do not hand an uninitialized timeout to the kernel.
        lingerOption.l_linger = 0;
        lingerOption.l_onoff = 0;
    }

    try
    {
        if (setsockopt(mSocketId, SOL_SOCKET, SO_LINGER, (char *) &lingerOption,
                       sizeof(struct linger)) == -1)
        {
#ifdef UNIX
            throw SocketException(this, SocketException::OPTION, errno, "Socket", "unix: error set linger seconds", __FILE__, __LINE__);
#endif
        }
    }
    catch (SocketException& excp)
    {
        excp.response();
    }
}

void BaseSocket::setLingerOnOff(bool lingerOn)
{
struct linger lingerOption;

if (lingerOn)
lingerOption.l_onoff = 1;
else
lingerOption.l_onoff = 0;

try
{
if (setsockopt(mSocketId, SOL_SOCKET, SO_LINGER, (char *) &lingerOption,
sizeof(struct linger)) == -1)
{
#ifdef UNIX
throw SocketException(this, SocketException::OPTION, errno, "Socket", "unix: error set linger on/off", __FILE__, __LINE__);
#endif
}
}
catch (SocketException& excp)
{
excp.response();
}
}

/**
 * Writes sendBufSize to the SO_SNDBUF socket option and then refreshes the
 * cached size through updateSendBufferSize().
 *
 * @param sendBufSize requested send buffer size.
 * @throws SocketException (OPTION) when setsockopt() fails (UNIX builds only);
 *         unlike most setters here, the exception is not caught locally.
 */
void BaseSocket::setSendBufferSize(int sendBufSize)
{
    const int rc = setsockopt(mSocketId, SOL_SOCKET, SO_SNDBUF, (char *) &sendBufSize, sizeof(sendBufSize));
    if (rc == -1)
    {
#ifdef UNIX
        throw SocketException(this, SocketException::OPTION, errno, "Socket", "unix: error send buffer size", __FILE__, __LINE__);
#endif
    }

    updateSendBufferSize(sendBufSize);
}

/**
 * Writes receiveBufSize to the SO_RCVBUF socket option.
 *
 * @param receiveBufSize requested receive buffer size.
 * @throws SocketException (OPTION) when setsockopt() fails (UNIX builds only);
 *         the exception is not caught locally.
 */
void BaseSocket::setReceiveBufferSize(int receiveBufSize)
{
    const int rc = setsockopt(mSocketId, SOL_SOCKET, SO_RCVBUF, (char *) &receiveBufSize, sizeof(receiveBufSize));
    if (rc == -1)
    {
#ifdef UNIX
        throw SocketException(this, SocketException::OPTION, errno, "Socket", "unix: error set receive buffer size", __FILE__, __LINE__);
#endif
    }
}

int BaseSocket::isSocketBlocking()
{
return mBlocking;
}

void BaseSocket::setSocketBlocking(int blockingToggle)
{
if (blockingToggle)
{
if (isSocketBlocking())
return;
else
mBlocking = 1;
}
else
{
if (!isSocketBlocking())
return;
else
mBlocking = 0;
}

try
{
#ifdef UNIX
int flags;
if (-1 == (flags = fcntl(mSocketId, F_GETFL, 0)))
flags = 0;

if(mBlocking)
fcntl(mSocketId, F_SETFL, flags & (~O_NONBLOCK));
else
fcntl(mSocketId, F_SETFL, flags | O_NONBLOCK);

/*if (ioctl(socketId, FIONBIO, (char *) &blocking) == -1)
{
throw SocketException(this, SocketException::OPTION, errno, "Socket", "unix: error set socke blocking");
}*/
#endif
}
catch (SocketException& excp)
{
excp.response();
}
}

/**
 * Reads the SO_DEBUG socket option.
 *
 * @return the option value reported by getsockopt(), or -1 when the call
 *         failed (the failure is handled through excp.response()).
 */
int BaseSocket::getDebug()
{
    int value;
    int valueLen = sizeof(value);

    try
    {
        const int rc = getsockopt(mSocketId, SOL_SOCKET, SO_DEBUG, (void *) &value, &valueLen);
        if (rc == -1)
        {
#ifdef UNIX
            throw SocketException(this, SocketException::OPTION, errno, "Socket", "unix: error get debug", __FILE__, __LINE__);
#endif
        }
    }
    catch (SocketException& excp)
    {
        excp.response();
        return -1;
    }

    return value;
}

/**
 * Reads the SO_REUSEADDR socket option.
 *
 * @return the option value reported by getsockopt(), or -1 when the call
 *         failed (the failure is handled through excp.response()).
 */
int BaseSocket::getReuseAddr()
{
    int value;
    int valueLen = sizeof(value);

    try
    {
        const int rc = getsockopt(mSocketId, SOL_SOCKET, SO_REUSEADDR, (void *) &value, &valueLen);
        if (rc == -1)
        {
#ifdef UNIX
            throw SocketException(this, SocketException::OPTION, errno, "Socket", "unix: error get reuse address", __FILE__, __LINE__);
#endif
        }
    }
    catch (SocketException& excp)
    {
        excp.response();
        return -1;
    }

    return value;
}

/**
 * Reads the SO_KEEPALIVE socket option.
 *
 * @return the option value reported by getsockopt(), or -1 when the call
 *         failed (the failure is handled through excp.response()).
 */
int BaseSocket::getKeepAlive()
{
    int value;
    int valueLen = sizeof(value);

    try
    {
        const int rc = getsockopt(mSocketId, SOL_SOCKET, SO_KEEPALIVE, (void *) &value, &valueLen);
        if (rc == -1)
        {
#ifdef UNIX
            throw SocketException(this, SocketException::OPTION, errno, "Socket", "unix: error get keep alive", __FILE__, __LINE__);
#endif
        }
    }
    catch (SocketException& excp)
    {
        excp.response();
        return -1;
    }

    return value;
}

/**
 * Reads the linger timeout from the SO_LINGER socket option.
 *
 * @return the l_linger field reported by getsockopt(), or -1 when the call
 *         failed (the failure is handled through excp.response()).
 */
int BaseSocket::getLingerSeconds()
{
    struct linger current;
    int currentLen = sizeof(struct linger);

    try
    {
        const int rc = getsockopt(mSocketId, SOL_SOCKET, SO_LINGER, (void *) &current, &currentLen);
        if (rc == -1)
        {
#ifdef UNIX
            throw SocketException(this, SocketException::OPTION, errno, "Socket", "unix: error get linger seconds", __FILE__, __LINE__);
#endif
        }
    }
    catch (SocketException& excp)
    {
        excp.response();
        return -1;
    }

    return current.l_linger;
}

/**
 * Reports whether SO_LINGER is enabled on the socket.
 *
 * @return true when getsockopt() reports a non-zero l_onoff; false when
 *         lingering is off or when the option could not be read (the failure
 *         is handled through excp.response()).
 */
bool BaseSocket::getLingerOnOff()
{
    struct linger lingerOption;
    int myOptionLen = sizeof(struct linger);

    try
    {
        if (getsockopt(mSocketId, SOL_SOCKET, SO_LINGER, (void *) &lingerOption, &myOptionLen) == -1)
        {
#ifdef UNIX
            throw SocketException(this, SocketException::OPTION, errno, "Socket", "unix: error get linger on/off", __FILE__, __LINE__);
#endif
        }
    }
    catch (SocketException& excp)
    {
        excp.response();
        // lingerOption was not filled in; do not evaluate it.
        return false;
    }

    // Any non-zero value means "on"; implementations are not required to
    // report exactly 1.
    return lingerOption.l_onoff != 0;
}

/**
 * Reads the SO_SNDBUF socket option.
 *
 * @return the size reported by getsockopt(), or -1 when the call failed (the
 *         failure is handled through excp.response()).
 */
int BaseSocket::getSendBufferSize()
{
    int size;
    int sizeLen = sizeof(size);

    try
    {
        const int rc = getsockopt(mSocketId, SOL_SOCKET, SO_SNDBUF, (void *)&size, &sizeLen);
        if (rc == -1)
        {
#ifdef UNIX
            throw SocketException(this, SocketException::OPTION, errno, "Socket", "unix: error get send buffer size", __FILE__, __LINE__);
#endif
        }
    }
    catch (SocketException& excp)
    {
        excp.response();
        return -1;
    }

    return size;
}

/**
 * Reads the SO_RCVBUF socket option.
 *
 * @return the size reported by getsockopt(), or -1 when the call failed (the
 *         failure is handled through excp.response()).
 */
int BaseSocket::getReceiveBufferSize()
{
    int size;
    int sizeLen = sizeof(size);

    try
    {
        const int rc = getsockopt(mSocketId, SOL_SOCKET, SO_RCVBUF, (void *) &size, &sizeLen);
        if (rc == -1)
        {
#ifdef UNIX
            throw SocketException(this, SocketException::OPTION, errno, "Socket", "unix: error get receive buffer size", __FILE__, __LINE__);
#endif
        }
    }
    catch (SocketException& excp)
    {
        excp.response();
        return -1;
    }

    return size;
}

/**
 * Writes a human-readable summary of the socket's settings to a stream.
 *
 * Every option is read through the corresponding getter, so this function
 * queries the socket several times. The int getters return -1 on failure,
 * which is shown as "true" for the boolean-style entries because any
 * non-zero value is treated as set.
 *
 * @param io stream to write to.
 * @param s  socket whose settings are printed.
 * @return io, to allow chaining.
 */
ostream &operator<<(ostream& io, BaseSocket& s)
{
    io << '\n';
    io << "Summary of socket settings:" << '\n';
    io << " Socket Id: " << s.getSocketId() << '\n';
    io << " port #: " << s.getPortNumber() << '\n';
    io << " debug: " << (s.getDebug() ? "true" : "false") << '\n';
    io << " reuse addr: " << (s.getReuseAddr() ? "true" : "false") << '\n';
    io << " keep alive: " << (s.getKeepAlive() ? "true" : "false") << '\n';
    io << " send buf size: " << s.getSendBufferSize() << '\n';
    io << " recv buf size: " << s.getReceiveBufferSize() << '\n';
    io << " blocking: " << (s.isSocketBlocking() ? "true" : "false") << '\n';
    io << " linger on: " << (s.getLingerOnOff() ? "true" : "false") << '\n';
    io << " linger seconds: " << s.getLingerSeconds() << '\n';
    io << endl;     // single flush at the end of the summary
    return io;
}

/**
 * Closes the socket descriptor, if there is one.
 *
 * The stored descriptor is reset to -1 afterwards. The destructor calls
 * close() as well, so without the reset an explicit close() followed by
 * destruction would close the same number twice and could hit a descriptor
 * that has since been handed out to another connection.
 */
void BaseSocket::close(void)
{
    if (mSocketId == -1)
        return;

    ::close(mSocketId);
    mSocketId = -1;
}

TCPSocket.cpp:


#ifdef UNIX
#include <sys/socket.h>
#include <sys/types.h>
#include <sys/ioctl.h>
#include <unistd.h>
#include <fcntl.h>
#include <errno.h>
#endif

#include <sstream>

#include "support/logging/log.h"
#include "support/exceptions/socket_exception.h"
#include "support/logging/simple_log.h"
#include "support/network/tcp_socket.h"

using namespace std;

// Message header length. NOTE(review): not referenced in the visible part of
// this file; presumably a byte count used by the protocol layer (confirm).
const int MSG_HEADER_LEN = 6;

/** Creates a TCP socket object through the BaseSocket default constructor. */
TCPSocket::TCPSocket() : BaseSocket()
{
    // nothing to do beyond the base class initialization
}

/**
 * Creates a TCP socket for the given port; all work is done by BaseSocket(int).
 *
 * @param portId port number forwarded to the base class.
 */
TCPSocket::TCPSocket(int portId) : BaseSocket(portId)
{
    // nothing to do beyond the base class initialization
}

/** Destructor; has no work of its own. */
TCPSocket::~TCPSocket()
{
    // intentionally empty
}

/** Initialization hook; currently does nothing. */
void TCPSocket::initialize()
{
    // intentionally empty
}

/**
 * Binds the socket to the address stored in mClientAddr.
 *
 * A failing bind() raises a SocketException (UNIX builds only) carrying the
 * errno of the failed call; it is handled locally through excp.response().
 */
void TCPSocket::bindSocket()
{
    try
    {
        if (bind(mSocketId, (struct sockaddr *) &mClientAddr, sizeof(struct sockaddr_in)) == -1)
        {
#ifdef UNIX
            // Pass errno along (as the BaseSocket option methods do) so the cause is not lost.
            throw SocketException(this, SocketException::BIND, errno, "Socket", "unix: error calling bind()", __FILE__, __LINE__);
#endif
        }
    }
    catch (SocketException& excp)
    {
        excp.response();
    }
}

/**
 * Connects this (already created) socket to a server on mPortNumber.
 *
 * The server may be given by name or by address; a HostInfo object built from
 * serverNameOrAddr and hType supplies the IP address string that is converted
 * with inet_addr().
 *
 * A failing connect() raises a SocketException (UNIX builds only) carrying the
 * errno of the failed call; it is handled locally through excp.response().
 *
 * @param serverNameOrAddr host name or address of the server.
 * @param hType            tells HostInfo how to interpret serverNameOrAddr.
 */
void TCPSocket::connectToServer(string& serverNameOrAddr, hostType hType)
{
    HostInfo serverInfo(serverNameOrAddr, hType);

    // Destination address; value-initialized so that no field (including
    // padding such as sin_zero) is left indeterminate.
    struct sockaddr_in serverAddress = sockaddr_in();

    serverAddress.sin_family = AF_INET;
    serverAddress.sin_addr.s_addr = inet_addr(
        serverInfo.getHostIPAddress().c_str());
    serverAddress.sin_port = htons(mPortNumber);

    // Connect to the given address
    try
    {
        if (connect(mSocketId, (struct sockaddr *) &serverAddress, sizeof(serverAddress)) == -1)
        {
#ifdef UNIX
            // Pass errno along so the cause of the failure is not lost.
            throw SocketException(this, SocketException::CONNECT, errno, "Socket", "unix: error calling connect()", __FILE__, __LINE__);
#endif
        }
    }
    catch (SocketException& excp)
    {
        excp.response();
    }
}

/**
 * Accepts a pending client connection.
 *
 * A failing accept() raises a SocketException (UNIX builds only) carrying the
 * errno of the failed call; it is handled locally through excp.response() and
 * NULL is returned.
 *
 * @param clientHost the client's host name (from HostInfo) is appended to this string.
 * @return a newly allocated TCPSocket wrapping the accepted descriptor (the
 *         caller owns it), or NULL when accept() failed.
 */
TCPSocket *TCPSocket::acceptClient(string& clientHost)
{
    int newSocket; // the new socket file descriptor returned by the accept system call

    // the length of the client's address
    int clientAddressLen = sizeof(struct sockaddr_in);
    struct sockaddr_in clientAddress; // Address of the client that sent data

    // Accepts a new client connection and stores its socket file descriptor
    try
    {
        if ((newSocket = accept(mSocketId, (struct sockaddr *) &clientAddress, &clientAddressLen)) == -1)
        {
#ifdef UNIX
            // Pass errno along so the cause of the failure is not lost.
            throw SocketException(this, SocketException::ACCEPT, errno, "Socket", "unix: error calling accept()", __FILE__, __LINE__);
#endif
        }
    }
    catch (SocketException& excp)
    {
        excp.response();
        return NULL;
    }

    // Get the host name given the address
    char *sAddress = inet_ntoa((struct in_addr) clientAddress.sin_addr);
    HostInfo clientInfo(sAddress, ADDRESS);
    clientHost += clientInfo.getHostName();

    // Create and return the new TCPSocket object
    TCPSocket* retSocket = new TCPSocket();
    retSocket->setSocketId(newSocket);
    return retSocket;
}

/**
 * Puts the socket into listening mode.
 *
 * A failing listen() raises a SocketException (UNIX builds only) carrying the
 * errno of the failed call; it is handled locally through excp.response().
 *
 * @param totalNumPorts value passed to listen() as the backlog argument.
 */
void TCPSocket::listenToClient(int totalNumPorts)
{
    try
    {
        if (listen(mSocketId, totalNumPorts) == -1)
        {
#ifdef UNIX
            // Pass errno along so the cause of the failure is not lost.
            throw SocketException(this, SocketException::LISTEN, errno, "Socket", "unix: error calling listen()", __FILE__, __LINE__);
#endif
        }
    }
    catch (SocketException& excp)
    {
        excp.response();
    }
}

/**
 * Streams a TCPSocket as its socket descriptor number.
 *
 * @param oStream stream to write to.
 * @param oSocket socket whose descriptor is printed.
 * @return oStream, to allow chaining.
 */
ostream &operator<<(ostream &oStream, const TCPSocket &oSocket)
{
    return oStream << oSocket.mSocketId;
}

/**
 * Sends the first nSize bytes of oBuffer with a single ::send() call.
 *
 * @param oBuffer data to send.
 * @param nSize   number of bytes to send; -1 means the whole buffer.
 * @return the number of bytes accepted by ::send() (may be less than nSize),
 *         0 when there is nothing to send, or -1 when the call failed. For
 *         EAGAIN/EWOULDBLOCK -1 is returned silently; any other failure is
 *         first reported through a locally handled SocketException
 *         (excp.response()).
 * @throws SocketException (SEND) when nSize exceeds the buffer size. This one
 *         is not caught locally.
 */
int TCPSocket::send(SocketMessage const &oBuffer, int nSize)
{
    int numBytes; // the number of bytes sent
    int error = 0;

    if(nSize == -1)
        nSize = oBuffer.size();

    if((unsigned int)nSize > oBuffer.size())
    {
        std::stringstream ss;
        ss << "Invalid Buffersize! Requested: " << (unsigned int)nSize << " Provided: " << oBuffer.size();
        // Take the complete text; extracting with operator>> would stop at the
        // first blank and keep only the first word.
        std::string s = ss.str();
        FILE_LOG(logERROR) << s;
        throw SocketException(this, SocketException::SEND, 0, "Socket", s, __FILE__, __LINE__);
    }

    // Nothing to transmit; this also avoids taking &oBuffer[0] of an empty buffer.
    if(nSize == 0)
        return 0;

    // Sends the message to the connected host
    try
    {
        FILE_LOG(logDEBUG4) << "Sending on socket: "<< mSocketId << " bytes:" << nSize;
        numBytes = ::send(mSocketId, &oBuffer[0], nSize, 0);
        // errno is only meaningful when the call failed.
        error = (numBytes == -1) ? errno : 0;
        // Log what was actually sent, not what was requested.
        FILE_LOG(logDEBUG4) << "Sent on socket: "<< mSocketId << " bytes:" << numBytes << " errno: " << error;
        if(numBytes == -1)
        {
#ifdef UNIX
            if(error == EAGAIN || error == EWOULDBLOCK)
            {
                return -1;
            }
            else
                throw SocketException(this, SocketException::SEND, error, "Socket", "unix: error calling send()", __FILE__, __LINE__);
#endif
        }
    }
    catch (SocketException& excp)
    {
        excp.response();
    }

    return numBytes;
}

/**
 * Reads everything that is currently available on the socket and appends it
 * to oMessage.
 *
 * recv() is called repeatedly until it reports EAGAIN/EWOULDBLOCK (nothing
 * more to read right now) or returns 0 (the peer closed the connection). A
 * call interrupted by a signal (EINTR) is simply repeated.
 *
 * @param oMessage buffer the received bytes are appended to.
 * @param nBytes   upper bound for the size of a single recv() chunk; -1 means
 *                 "use the size reported by getSendBufferSize()".
 * @return number of bytes appended to oMessage during this call (0 when
 *         nothing was available, the peer closed, or nBytes is 0).
 * @throws SocketException (RECEIVE) when the chunk size cannot be determined
 *         or when recv() fails with any other error.
 */
int TCPSocket::receive(SocketMessage &oMessage, int nBytes)
{
    int max = getSendBufferSize();

    if(nBytes != -1 && nBytes < max)
        max = nBytes;

    // Nothing requested.
    if(max == 0)
        return 0;

    // getSendBufferSize() returns -1 on failure (and nBytes may be a negative
    // value other than -1); a negative size must never be used to size the
    // chunk buffer.
    if(max < 0)
        throw SocketException(this, SocketException::RECEIVE, 0, "Socket", "Invalid receive chunk size!", __FILE__, __LINE__);

    SocketMessage sb(max);
    int rcv_bytes = 0;
    int total = 0;
    int error = 0;

    while(1)
    {
        rcv_bytes = ::recv(getSocketId(), &sb[0], sb.size(), 0);
        // errno is only meaningful when the call failed.
        error = (rcv_bytes == -1) ? errno : 0;
        FILE_LOG(logDEBUG4) << "Received on socket: " << getSocketId() << " bytes: " << rcv_bytes << " expected:" << sb.size() << " total: " << total << " errno: " << error;

        if(rcv_bytes == -1)
        {
            // Interrupted by a signal before any data arrived: try again.
            if(error == EINTR)
                continue;

            // Nothing more to read for now: hand back what was collected so far.
            if(error == EAGAIN || error == EWOULDBLOCK)
                return total;

            throw SocketException(this, SocketException::RECEIVE, error, "Socket", "Client connection error!", __FILE__, __LINE__);
        }

        // Socket has been closed.
        if(rcv_bytes == 0)
            return total;

        total += rcv_bytes;
        oMessage.insert(oMessage.end(), sb.begin(), sb.begin()+rcv_bytes);
    }

    return total;
}

void TCPSocket::close(void)
{
BaseSocket::close();
}

最佳答案

您确定这不是 Nagle 算法(Nagle Algorithm)在起作用吗?如果您尚未通过设置 TCP_NODELAY 套接字选项来禁用它,您的数据可能要等到累积到一定数量(MSS)之后才会被发送。

关于c++ - HPUX 上的套接字未收到完整数据,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/16545729/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com