gpt4 book ai didi

java - TCP 套接字连接 Java Client<-> c++ 服务器中的意外行为

转载 作者:塔克拉玛干 更新时间:2023-11-03 07:52:30 35 4
gpt4 key购买 nike

我在板上浏览了很多问题,关于 tcp 套接字、大端和小端格式,但对我来说没有什么适合我的情况。

很抱歉我的英语不好,我正在努力:)

我对简单的客户端-服务器配置中的意外行为失去了理智。场景如下:

服务器(C++)<--- TCP 套接字 ---> 客户端(Java)。

这是客户端代码:

package NetServ.apps.bigServer.NSLPClient;

import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.net.Socket;
import java.net.SocketException;
import java.net.UnknownHostException;
import java.nio.charset.Charset;
import java.util.ArrayList;


public class Communicator {

private Socket sock;
private final int port = 6666;
private final String address="127.0.0.1";
private DataOutputStream out;
private DataInputStream in;
public Communicator(){

System.out.println("Creating communicator. Trying to bind to the tcp socket");

try {
sock = new Socket(address, port);
out=new DataOutputStream(sock.getOutputStream());
in=new DataInputStream(sock.getInputStream());
} catch (UnknownHostException e) {
System.out.println("Unable to resolv host");
e.printStackTrace();
} catch (IOException e) {
System.out.println("Generic I/O exception");
e.printStackTrace();
}
System.out.println("Communicator created");
}


public void sendRequest(Request req) throws IOException{
int cmd=0;
if(req.getCmd().equals(CommandType.tg_setup_message))
cmd=0;
if(req.getCmd().equals(CommandType.tg_remove_message))
cmd=1;
if(req.getCmd().equals(CommandType.tg_trigger_message))
cmd=2;
if(req.getCmd().equals(CommandType.tg_probe_message))
cmd=3;
byte[] buff;
Charset charset = Charset.forName("ISO-8859-1");

out.writeInt(cmd);

//out.writeUTF(req.getDstAddr().toString().substring(1));
buff = req.getDstAddr().toString().substring(1).getBytes(charset);
out.writeShort((short)buff.length);
out.write(buff, 0, buff.length);

out.writeInt(req.getProtocol());
out.writeInt(req.getSecure());

//out.writeUTF(req.getDataId());
buff = req.getDataId().getBytes(charset);
out.writeShort((short)buff.length);
out.write(buff, 0, buff.length);

//out.writeUTF(req.getUser());
buff = req.getUser().getBytes(charset);
out.writeShort((short)buff.length);
out.write(buff, 0, buff.length);


out.flush();
out.writeInt(req.getOffpath_type());
if(req.getOffpath_type()!=-1){
out.writeInt(req.getMetric_type());

String tmp = "" + req.getMetric();

//out.writeUTF(tmp);
buff = tmp.getBytes(charset);
out.writeShort((short)buff.length);
out.write(buff, 0, buff.length);

}

switch (req.getCmd()){
case tg_setup_message:
out.writeUTF(req.getUrl());
out.writeInt(req.getLifetime());
out.writeUTF(req.getParameters().toString());
break;
case tg_remove_message:
//TODO
break;
case tg_trigger_message:
//TODO
break;
case tg_probe_message:
for (Short s : req.getProbes()){
//System.out.println("Writing probe code " + s.shortValue());
out.writeShort(s.shortValue());
}
break;
}


if(req.getSignature()!=null){
out.writeInt(1);
out.writeUTF(req.getSignature());
}else{
out.writeInt(0);
}

if(req.getDep()!=null){
out.writeInt(1);
out.writeUTF(req.getDep());
}else{
out.writeInt(0);
}

if(req.getNotif()!=null){
out.writeInt(1);
out.writeUTF(req.getNotif());
}else{
out.writeInt(0);
}

if(req.getNode()!=null){
out.writeInt(1);
out.writeUTF(req.getNode());
}else{
out.writeInt(0);
}
out.flush();
//out.close();
System.out.println("request sent");
}

public ArrayList<String> rcvProbeResponse() throws IOException, SocketException{
ArrayList<String> response= new ArrayList<String>();
System.out.println("Waiting for response...");
boolean timeout=false;

int responseCode=-1;
responseCode=in.readInt();
//responseCode = in.readInt();
//System.out.println("Response code "+responseCode);
if(responseCode==1){ //response is ready! !
System.out.println("Response arriving from NSLP (code 1 )");

int responseCmdCode = in.readInt();
if(responseCmdCode!=2)
return null;
//System.out.println("Response Command Code " + responseCmdCode );
int probeSize = in.readInt();
//System.out.println("Number of probes " + probeSize);
for(int i=0; i<probeSize; i++){
//System.out.println("i: "+i);
String out = in.readUTF();
response.add(out);
}
}
in.close();
if(timeout)
return null;
else
return response;
}

}

没有什么特别之处:实体之间的协议(protocol)只是整数字符串的交换,这会触发服务器执行一些信令任务(服务器是信令协议(protocol)的守护进程)。

另一方面,服务器是我修改后与 java 通信的遗留代码。这是相关代码:

[...]
// Set the current socket
communicator->setSocket(sockfd);

// FSM data structure
NetservNslpFsmData * data = new NetservNslpFsmData();

//give the address list of this node to all FSMs created by the client
data->nodeAddressList = &(param.addresses);
// Read from socket the parameters and use them
int ret;
NetservNslpCommunicator::command cmd;
ret = communicator->recvCommandFromJava(&cmd);
if (ret <= 0) {
logSocketError(sockfd, "Command");
// free up the memory allocated
delete data;
return;
}

switch(cmd){
case NetservNslpCommunicator::tg_setup_message:
DLog(param.name, "cmd set: setup");
break;
case NetservNslpCommunicator::tg_remove_message:
DLog(param.name, "cmd set: remove");
break;
case NetservNslpCommunicator::tg_probe_message:
DLog(param.name, "cmd set: probe");
break;
case NetservNslpCommunicator::tg_trigger_message:
DLog(param.name, "cmd set: trigger");
break;
}
ret = communicator->recvIPFromJava(&(data->destAddr));
DLog(param.name, "Dst Address set: "<< data->destAddr.get_ip_str());
if (ret <= 0) {
logSocketError(sockfd, "Destination IP");
// free up the memory allocated
delete data;
return;
}

[...]
int reliable = communicator->recvIntFromJava();
data->reliability = (reliable == NetservNslpCommunicator::TCP);
DLog(param.name, "Reliability set : "<< data->reliability);
int secure = communicator->recvIntFromJava();
data->security = (secure == NetservNslpCommunicator::TCP);
DLog(param.name, "Security set : "<< data->security);

data->dataId = communicator->recvStringFromJava();
DLog(param.name, "DataId : "<< data->dataId);
if (data->dataId == NULL) {
logSocketError(sockfd, "dataId");
// free up the memory allocated
delete data;
return;
}
data->user = communicator->recvStringFromJava();
DLog(param.name, "User : "<< data->user);
if (data->user == NULL) {
logSocketError(sockfd, "user");
// free up the memory allocated
delete data;
return;
}

//Receiving OffPath parameters
data->offpath_type=communicator->recvIntFromJava();
DLog(param.name, "OffType : "<< data->offpath_type);
if(data->offpath_type != -1){

data->metric_type=communicator->recvIntFromJava();
DLog(param.name, "MetricType : "<< data->metric_type);
if(data->metric_type>3|| data->metric_type<1){
logSocketError(sockfd, "metric type");
// free up the memory allocated
delete data;
return;
}
char * tmpStr = communicator->recvStringFromJava();
if (tmpStr == NULL) {
logSocketError(sockfd, "metric");
// free up the memory allocated
delete data;
return;
}
data->metric = tmpStr;
DLog(param.name, "MetricValue : "<< data->metric);
DLog(param.name, "MetricLength : "<< data->metric.length());
}

// check if socket is still alive or some errors occured
if (!communicator->isAlive(sockfd)) {
logSocketError(sockfd, "Socket not alive!");
// free up the memory allocated
delete data;
return;
}
DLog(param.name,"Reading command-specific configuration");
switch(cmd)
{
case NetservNslpCommunicator::tg_setup_message:
data->urlList.push_back(communicator->recvString());
//check if the service data is exchanged together with signaling messages
if (data->urlList.front() != NULL && (strncmp(data->urlList.front(), "file://", 7) == 0))
data->data_included = true;
data->lifetime = communicator->recvIntFromJava();
data->setupParams = communicator->recvStringFromJava();
break;
case NetservNslpCommunicator::tg_remove_message:
break;
case NetservNslpCommunicator::tg_probe_message:
{
DLog(param.name, "Reading probe codes list.");
short probe = 0;
do {
probe = communicator->recvShortFromJava();
DLog(param.name,"Probe Code " << probe);
data->probes.push_back(probe);
} while (probe != 0);
data->probes.pop_back(); //delete the last 0
if (data->probes.empty()) {
logSocketError(sockfd, "Probe list is empty!");
return;
}
break;
}
case NetservNslpCommunicator::tg_trigger_message:
data->triggerType = communicator->recvInt();

switch (data->triggerType){
case NETSERV_MESSAGETYPE_SETUP:
data->urlList.push_back(communicator->recvString());
data->lifetime = communicator->recvInt();
data->setupParams = communicator->recvString();
break;
case NETSERV_MESSAGETYPE_REMOVE:
break;
case NETSERV_MESSAGETYPE_PROBE:
{
short probe = 0;
do {
probe = communicator->recvShortFromJava();
data->probes.push_back(probe);
} while (probe != 0);
data->probes.pop_back(); //delete the last 0
break;
}
default:
ERRLog(param.name, "Trigger type not supported");
closeSocket(sockfd);
return;
}
break;
default:
logSocketError(sockfd, "Trigger type not supported!");
return;
}
DLog(param.name,"Reading optional parameters.");
// Optional parameters passing
bool addParam = 0;
addParam = communicator->recvIntFromJava();
if (addParam) {
data->signature = communicator->recvStringFromJava();
if (data->signature == NULL) {
logSocketError(sockfd, "signature");
// free up the memory allocated
delete data;
return;
}
DLog(param.name, "Message signature : "<< data->signature);
}

addParam = communicator->recvIntFromJava();
if (addParam) {
data->depList.push_back(communicator->recvStringFromJava());
if (data->depList.front() == NULL) {
logSocketError(sockfd, "dependency list");
// free up the memory allocated
delete data;
return;
}
DLog(param.name, "Message dependency list : "<< data->depList.front());
}

addParam = communicator->recvIntFromJava();
if (addParam) {
data->notification = communicator->recvStringFromJava();
if (data->notification == NULL) {
logSocketError(sockfd, "notification");
// free up the memory allocated
delete data;
return;
}
DLog(param.name, "Message notification : "<< data->notification);
}

addParam = communicator->recvIntFromJava();
if (addParam) {
data->node = communicator->recvStringFromJava();
if (data->node == NULL) {
logSocketError(sockfd, "node");
// free up the memory allocated
delete data;
return;
}
DLog(param.name, "Node destination : "<< data->node);
}
[...]

通信器包装套接字并使用标准调用来写入和读取类型:

int NetservNslpCommunicator::recvCommandFromJava(NetservNslpCommunicator::command * cmd){
int code = recvIntFromJava();
cout<<"received int "<<code<<endl;
if(code>=0){
switch(code){
case 0:
*cmd=NetservNslpCommunicator::tg_setup_message;
break;
case 1:
*cmd=NetservNslpCommunicator::tg_remove_message;
break;
case 2:
*cmd=NetservNslpCommunicator::tg_trigger_message;
break;
case 3:
*cmd=NetservNslpCommunicator::tg_probe_message;
break;
}
}
return code;
}

int NetservNslpCommunicator::recvIPFromJava(protlib::hostaddress * addr){
cout<<"receiving an IP"<<endl;
char* str = recvStringFromJava();
cout<<"String received "<< str << endl;
addr->set_ipv4(str);
return 1;
}

char * NetservNslpCommunicator::recvStringFromJava(){
short length = recvShortFromJava();
cout<< "receiving a string..."<<endl<<"String length "<<length<<endl;
char * string = new char[length];
int r = 0;
int orLength=length;
while(length)
{
int r = recv(sock, string, length, 0);
if(r <= 0)
break; // Socket closed or an error occurred
length -= r;
}
string[orLength]='\0';

if(orLength==0)
return NULL;
else
return string;
}

int NetservNslpCommunicator::recvIntFromJava(){
int x = 0;
recvBuffer(sock, &x, 4);
return x;
}

short NetservNslpCommunicator::recvShortFromJava()
{
short x = 0;
recvBuffer(sock, &x, 2);
return x;
}


int NetservNslpCommunicator::recvBuffer(int sock, void * buf, size_t size)
{
int counter = 0;
// Create a pollfd struct for use in the mainloop
struct pollfd poll_fd;
poll_fd.fd = sock;
poll_fd.events = POLLIN | POLLPRI;
poll_fd.revents = 0;

int r;
while (size && !stop)
{
/* Non-blocking behavior */
// wait on number_poll_sockets for the events specified above for sleep_time (in ms)
int poll_status = poll(&poll_fd, 1/*Number of poll socket*/, 100);
if (poll_fd.revents & POLLERR) // Error condition
{
if (errno != EINTR)
cout << "NetservNslpCommunicator : " << "Poll caused error " << strerror(errno) << " - indicated by revents" << endl;
else
cout << "NetservNslpCommunicator : " << "poll(): " << strerror(errno) << endl;
}
//ignore hangups when reading from a socket
if (poll_fd.revents & POLLHUP) // Hung up
{
cout << "NetservNslpCommunicator : " << "Poll hung up" << endl;
// return -1;
}
if (poll_fd.revents & POLLNVAL) // Invalid request: fd not open
{
cout << "NetservNslpCommunicator : " << "Poll Invalid request: fd not open" << endl;
return -1;
}

switch (poll_status)
{
case -1:
if (errno != EINTR)
cout << "NetservNslpCommunicator : " << "Poll status indicates error: " << strerror(errno) << endl;
else
cout << "NetservNslpCommunicator : " << "Poll status: " << str error(errno) << endl;
break;

case 0:

if (isTriggerTimerEnabled){
counter++;
if (counter == triggerTimerValue){
isTriggerTimerEnabled = false;
return -1;
}
}
continue;
break;

default:
r = recv(sock, buf, size, 0);
if (r <= 0)
{
if (r == 0) { // connection closed
r = -1; // return an error if socket closes
cout << "NetservNslpCommunicator : " << "No data received from socket!" << endl;
stop=true;
break;
}
if (r == -1 && errno == EINTR) // received interrupt during recv, continuing
continue;
if (r == -1 && errno != EINTR) // socket error, raise exception
break;
}

if (r != -1)
size -= r;
break;
}
}

counter = 0;
isTriggerTimerEnabled = false;
return r;
}

我要求你只关注 tg_probe_message 部分。其他消息还有待落实。

奇怪的行为是:客户端第一次向服务器发送请求时一切顺利,所有值都被完美读取。因此服务器回答发回一些整数和字符串序列。这是我在套接字上捕获的跟踪(仅应用层。每行一个 TCP 数据包):

00  //
00 //
00 //
03 // First integer

00 //
0a // Short representing string length

31:37:32:2e:31:36:2e:33:2e:32 //the string: "172.16.3.2"

00
00
00
01

00
00
00
00

00
1b

4e:65:74:53:65:72:76:2e:61:70:70:73:2e:4c:6f:69:62:46:61:6b:65:5f:30:2e:30:2e:31 //The string "NetServ.apps.LoibFake_0.0.1"

00
03

6a:61:65 //the string "jae"

00
00
00
03

00
00
00
01

00
01

31 //the string "1"

00
02

00
00

00 //
00 //
00 // 4 times
00 //

服务器回答:

00:00:00:01 //response code
00:00:00:02 //response type
00:00:00:04 //number of strings to read
00:12 //str length
31:30:2e:31:30:2e:30:2e:35:20:41:43:54:49:56:45:20:31
00:12 //str length
31:30:2e:31:30:2e:30:2e:34:20:41:43:54:49:56:45:20:31
00:12 //str length
31:30:2e:31:30:2e:30:2e:33:20:41:43:54:49:56:45:20:32
00:12 //str length
31:30:2e:31:30:2e:30:2e:36:20:41:43:54:49:56:45:20:32

客户端第二次发送请求(同一个请求)时发生了一些奇怪的事情。这是我在第二次连接期间使用 tcpdump 捕获的内容:

00  //
00 //
00 //
03 // First integer

00 //
0a // Short representing string length

31:37:32:2e:31:36:2e:33:2e:32 //the string: "172.16.3.2"

00
00
00
01

00
00
00
00

00:1b:4e:65:74:53:65:72:76:2e:61:70:70:73:2e:4c:6f:69:62:46:61:6b:65:5f:30:2e:30:2e:31:00:03:6a:61:65:00:00:00:03:00:00:00:01:00:01:31:00:02:00:00:00:00:00:00:00:00:00:00:00:00:00:00:00:00:00:00

只要有一点耐心,您就会发现最后一个数据包包含请求的所有位(与第一个请求相同的位)。

通过调试,我可以看到命令 communicator->recvCommandFromJava(&cmd) 返回数字 50331648 (03:00:00:00) 而不是 3 (00:00:00:03 ) 并且当执行命令 communicator->recvIPFromJava(&(data->destAddr)) 时,它又调用 recvStringFromJava(),它使用 recvShortFromJava(),表示字符串长度 00:0a (10) 的 short 被换成小端 0a:00 (2560)。我相信这会导致 tcp 将所有可用数据放入下一个数据包并破坏后续调用。

正如您从代码中看到的那样,我没有在服务器中采用从主机顺序到网络顺序的转换(那是因为它对第一个请求工作正常),但似乎在第二个请求。 DataOutputStream 上的文档指定 intshort 以大端格式编写。服务器不应用转换。

所以,最后就是这个问题:C++ 是否有可能在执行期间更改主机格式?这怎么可能发生?我可以做些什么来使 Java 客户端和 C++ 服务器之间的字节顺序具有可预测的行为?

最佳答案

Endian-ness 与将数据放入下一个数据包无关。那只是因为它是一个字节流协议(protocol)。

您有两个不同的问题需要解决:一个是用 ntohl() 和 friend 解决的,另一个是继续阅读,直到您拥有所有您期望的数据。

关于java - TCP 套接字连接 Java Client<-> c++ 服务器中的意外行为,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/26036156/

35 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com