gpt4 book ai didi

QTcpSocket 在另一个 QThread 中处理

转载 作者:行者123 更新时间:2023-12-04 12:43:32 26 4
gpt4 key购买 nike

我需要在各自独立的 QThread 上处理传入的 TCP 连接。客户端身份验证成功后,相应的套接字应存储在一个 QList 对象中。

[简化的主/服务器端应用程序]

// First attempt from the question: a QObject-based server that owns the
// listening QTcpServer and keeps the accepted sockets in a list.
class Server : public QObject
{
Q_OBJECT

public:
Server();

private:
// Sockets appended by handleWaiterThread().
QList<QTcpSocket*> m_connections;
// Held by value. NOTE(review): handleWaiterThread() accesses it with '->'.
QTcpServer m_server;
// NOTE(review): both functions below are used with SLOT() in
// handleIncomingConnection() but are not declared in a slots section.
void handleIncomingConnection();
void handleWaiterThread();

private slots:
// Connected to the waiter thread's finished() signal.
void treatFinishedWaiterThread();
}
// NOTE(review): the class definition above is missing its terminating ';'.

[相应的函数定义]

handleIncomingConnection() 槽与服务器对象的 (m_server) newConnection() 信号连接。

// Creates one QThread per call and wires its started()/finished() signals to
// this object, then starts the thread.
void Server::handleIncomingConnection()
{
QThread *waiter = new QThread();
connect(waiter, SIGNAL(started()), this, SLOT(handleWaiterThread()));
connect(waiter, SIGNAL(finished()), this, SLOT(treatFinishedWaiterThread()));
// NOTE(review): this moves the Server object itself (not a per-connection
// worker) to the new thread, and does so again on every call.
moveToThread(waiter);
waiter->start();
}
// Connected to the waiter thread's started() signal: takes the next pending
// connection from m_server and appends it to m_connections.
void Server::handleWaiterThread()
{
// fetch requesting socket
// NOTE(review): m_server is declared as a QTcpServer value, yet '->' is used here.
QTcpSocket *socket = m_server->nextPendingConnection();

// HANDLE PASSWORD AUTHENTICATION HERE ...
// IF SUCCESSFUL, CONTINUE

// NOTE(review): no clientDisconnected() slot appears in the Server class declaration.
connect(socket, SIGNAL(disconnected()), this, SLOT(clientDisconnected()));

// add to list
m_connections.append(socket);
}
// Connected to a waiter thread's finished() signal; schedules the emitting
// thread object for deletion.
void Server::treatFinishedWaiterThread()
{
// The sender is cast to QThread; the result is used without a null check.
QThread *caller = qobject_cast<QThread*>(sender());
caller->deleteLater();
}

如果我尝试运行它,线程会被创建,但在它们完成时不会发出信号,所以之后我无法删除线程。另外我收到这条消息:

QObject::moveToThread: Widgets cannot be moved to a new thread

如何解决这个问题?




[01.06.2016]

根据 QTcpServer::nextPendingConnection() 它说:

The returned QTcpSocket object cannot be used from another thread. If you want to use an incoming connection from another thread, you need to override incomingConnection().

所以最后我必须创建另一个继承自 QTcpServer 的类。




[01.07.2016 #1]

我修改了代码并添加了自定义服务器和等待线程类。

[自定义服务器类]

// Second attempt: QTcpServer subclass that forwards the raw socket descriptor
// of each incoming connection through a signal.
class CustomServer : public QTcpServer
{
Q_OBJECT

public:
// NOTE(review): the constructor is spelled WServer although the class is
// CustomServer, and the parameter is unnamed while 'parent' is used.
WServer(QObject* = nullptr) : QTcpServer(parent) {}

signals:
// Emitted by incomingConnection() with the new connection's descriptor.
void connectionRequest(qintptr);

protected:
// Only re-emits the descriptor; no socket object is created here.
void incomingConnection(qintptr socketDescriptor)
{
emit connectionRequest(socketDescriptor);
}
};

[自定义线程类]

// QThread subclass wrapping one incoming connection: the socket is built in
// the constructor and announced through newSocket() at the end of run().
class Waiter : public QThread
{
Q_OBJECT

public:
Waiter(qintptr socketDescriptor, QObject *parent = nullptr)
: QThread(parent)
{
// Create socket
// NOTE(review): the socket is created here in the constructor (not in run())
// with this QThread object as its parent — confirm the intended thread affinity.
m_socket = new QTcpSocket(this);
m_socket->setSocketDescriptor(socketDescriptor);
}

signals:
// Emitted from run() with the socket created in the constructor.
void newSocket(QTcpSocket*);

protected:
void run()
{
// DO STUFF HERE
// Placeholder delay standing in for the real work.
msleep(2500);

emit newSocket(m_socket);
}

private:
QTcpSocket *m_socket;
};

[新主类]

// Main widget of the second attempt; owns the CustomServer by value and the
// list of accepted sockets.
class ServerGUI : public QWidget
{
Q_OBJECT

public:
// NOTE(review): constructor is spelled Server although the class is ServerGUI.
Server(QObject*);

private:
QList<QTcpSocket*> m_connections;
CustomServer m_server;

private slots:
// NOTE(review): the definitions that follow this class use the CustomServer::
// qualifier for these two slots.
void handleConnectionRequest(qintptr);
void handleNewSocket(QTcpSocket*);
}
// NOTE(review): the class definition above is missing its terminating ';'.
// Creates a Waiter thread for the given socket descriptor, connects its
// signals and starts it.
// NOTE(review): declared as a ServerGUI slot but defined with CustomServer::.
void CustomServer::handleConnectionRequest(qintptr socketDescriptor)
{
Waiter *nextWaiter = new Waiter(socketDescriptor, this);
connect(nextWaiter, SIGNAL(newSocket(QTcpSocket*)), this, SLOT(handleNewSocket(QTcpSocket*)));
// NOTE(review): the receiver of deleteLater() is 'this', not nextWaiter, so a
// finished waiter schedules deletion of this object rather than of itself.
connect(nextWaiter, SIGNAL(finished()), this, SLOT(deleteLater()));
nextWaiter->start();
}
// Receives the socket announced by a Waiter through newSocket().
// NOTE(review): declared as a ServerGUI slot but defined with CustomServer::.
void CustomServer::handleNewSocket(QTcpSocket *socket)
{
// DO STUFF HERE ...

// NOTE(review): no clientDisconnected() slot is declared in the classes shown.
connect(socket, SIGNAL(disconnected()), this, SLOT(clientDisconnected()));

// FINALLY ADD TO ACTIVE-CLIENT LIST ...
}

信号和插槽特定设置:

由于 CustomServer 被定义为我的主要小部件类(处理 GUI;称为 ServerGUI)中的类成员 (m_server),m_server 的 connectionRequest(qintptr) 信号与 ServerGUI 实例的 handleConnectionRequest(qintptr) 槽连接。

但现在我的应用程序在启动后立即崩溃,在调试窗口中显示以下消息:

HEAP[qtapp.exe]: Invalid address specified to RtlValidateHeap( 000002204F430000, 0000006E0090F4C0 )

这可能是什么原因?




[01.10.2016 #2]

我根据 user2014561 的回答修改了我的代码。

对于 CustomServer

// Third attempt: server that tracks ServerPeer objects (one per client) and
// reports client connects/disconnects by socket descriptor.
class CustomServer : public QTcpServer
{
Q_OBJECT

public:
// NOTE(review): constructor/destructor are spelled WServer although the class
// is named CustomServer; the member definitions below mix both names.
WServer(QHostAddress, quint16, quint16, QObject* = nullptr);
~WServer();

// Removes all peers / the peer matching the given id.
void kickAll();
void kickClient(qintptr);

QHostAddress localAddress() const;
quint16 serverPort() const;
bool isReady() const;
bool alreadyConnected(qintptr) const;
bool clientLimitExhausted() const;

signals:
// Forwarded from ServerPeer::connected()/disconnected() in incomingConnection().
void clientConnected(qintptr);
void clientDisconnected(qintptr);

private slots:
// Removes a destroyed peer from m_connections.
void destroyedfunc(QObject*);

// JUST FOR TESTING PURPOSES
void waiterFinished();

private:
QList<ServerPeer*> m_connections;
quint16 m_maxAllowedClients;
bool m_readyState;

void incomingConnection(qintptr);
};

对于kickAll():

// Removes every peer: repeatedly takes the first list entry, queues
// deleteLater() on it and runs a local event loop until the peer's thread
// object emits destroyed(). The list itself is shrunk by destroyedfunc().
void WServer::kickAll()
{
while (!m_connections.isEmpty())
{
ServerPeer *peer = m_connections.first();
QEventLoop loop;
// NOTE(review): this is the connect the question reports as failing with
// "(null)::destroyed()", i.e. peer->thread() returned null at this point.
connect(peer->thread(), SIGNAL(destroyed()), &loop, SLOT(quit())); // ### PROBLEM ENCOUNTERED HERE
QMetaObject::invokeMethod(peer, "deleteLater", Qt::QueuedConnection);
loop.exec();
}
}

对于 kickClient(qintptr):

// Looks for the peer that reports owning client_id and deletes it, waiting in
// a local event loop until the peer's thread object is destroyed.
void WServer::kickClient(qintptr client_id)
{
foreach (ServerPeer *peer, m_connections)
{
// NOTE(review): peerState stays uninitialized if the invocation below fails.
bool peerState;
// NOTE(review): hasSocket() is declared in ServerPeer's plain 'public:' section
// (not as a slot), which matches the quoted "No such method" message.
QMetaObject::invokeMethod(peer, "hasSocket", Qt::BlockingQueuedConnection,
Q_RETURN_ARG(bool, peerState), Q_ARG(qintptr, client_id));
if (peerState)
{
QEventLoop loop;
connect(peer->thread(), SIGNAL(destroyed()), &loop, SLOT(quit()));
QMetaObject::invokeMethod(peer, "deleteLater", Qt::QueuedConnection);
loop.exec();
break;
}
}
}

对于destroyedfunc(QObject*):

// Connected to ServerPeer::destroyed(QObject*): drops the destroyed peer from
// the connection list. The pointer is only compared, never dereferenced.
void CustomServer::destroyedfunc(QObject *obj)
{
ServerPeer *peer = static_cast<ServerPeer*>(obj);
m_connections.removeAll(peer);
}

对于 incomingConnection(qintptr):

// Creates a ServerPeer plus a dedicated QThread for the new connection, wires
// the notification/cleanup signals and starts the thread.
void WServer::incomingConnection(qintptr handle)
{
ServerPeer *peer = new ServerPeer();
QThread *waiter = new QThread();

m_connections.append(peer); // add to list
peer->moveToThread(waiter);

// notify about client connect
connect(peer, SIGNAL(connected(qintptr)), this, SIGNAL(clientConnected(qintptr)));
// stop waiter thread by indirectly raising finished() signal
// NOTE(review): ServerPeer::start() emits finished() unconditionally at its end,
// so the thread is asked to quit right after the peer has been set up.
connect(peer, SIGNAL(finished()), waiter, SLOT(quit()));
// notify about client disconnect
connect(peer, SIGNAL(disconnected(qintptr)), this, SIGNAL(clientDisconnected(qintptr)));
// remove client from list
connect(peer, SIGNAL(destroyed(QObject*)), this, SLOT(destroyedfunc(QObject*)));
// notify about finished waiter thread; only for debug purposes
connect(waiter, SIGNAL(finished()), this, SLOT(waiterFinished()));
// remove waiter thread when finished
connect(waiter, SIGNAL(finished()), waiter, SLOT(deleteLater()));

// Queued so that start() is delivered through the peer's thread once it runs.
QMetaObject::invokeMethod(peer, "start", Qt::QueuedConnection,
Q_ARG(qintptr, handle));

waiter->start();
}

对于ServerPeer

// Per-client worker object of the third attempt; owns the client's QTcpSocket.
class ServerPeer : public QObject
{
Q_OBJECT

public:
ServerPeer(QObject* = nullptr);
~ServerPeer();

// NOTE(review): declared as an ordinary member function, not a slot, while
// the server calls it by name through QMetaObject::invokeMethod().
bool hasSocket(qintptr) const;

signals:
void connected(qintptr);
void disconnected(qintptr);
// Connected to the waiter thread's quit() slot in incomingConnection().
void finished();

public slots:
// Creates the socket from the given descriptor.
void start(qintptr);
// NOTE(review): presumably hides QObject::disconnect() overloads — confirm.
void disconnect();

private slots :
void notifyConnect();
void notifyDisconnect();

private:
QTcpSocket *m_peer;
// NOTE(review): m_id is not assigned in any of the member functions shown.
qintptr m_id;
};

对于 ServerPeer(QObject*):

// Constructs a peer without a socket; the socket is created later in start().
// m_id is left uninitialized here.
ServerPeer::ServerPeer(QObject *parent) : QObject(parent), m_peer(nullptr)
{

}

对于 ~ServerPeer():

// Tears down the socket through the class's own disconnect() slot.
ServerPeer::~ServerPeer()
{
disconnect();
}

对于 start(qintptr):

// Creates the client socket from the descriptor, hooks up the disconnect
// handling and schedules the connect notification.
void ServerPeer::start(qintptr handle)
{
qDebug() << "New waiter thread has been started.";

m_peer = new QTcpSocket(this);
// Adopting the descriptor failed: schedule this peer for deletion and stop.
if (!m_peer->setSocketDescriptor(handle))
{
this->deleteLater();
return;
}

if (true /*verification here*/)
{
connect(m_peer, SIGNAL(disconnected()), this, SLOT(notifyDisconnect()));
connect(m_peer, SIGNAL(disconnected()), this, SLOT(deleteLater()));

// manually do connected notification
QTimer::singleShot(0, this, SLOT(notifyConnect()));
}
else
{
this->deleteLater();
}

// NOTE(review): emitted on both branches; incomingConnection() connects this
// signal to the waiter thread's quit().
emit finished();
}

对于 disconnect():

// Aborts the connection if it is still open, then deletes the socket.
void ServerPeer::disconnect()
{
if (m_peer != nullptr)
{
// abort() is skipped when the socket is already closing or unconnected.
if (m_peer->state() != QAbstractSocket::SocketState::ClosingState
&& m_peer->state() != QAbstractSocket::SocketState::UnconnectedState)
m_peer->abort();

// NOTE(review): the socket is also a QObject child of this peer (created with
// 'this' as parent in start()); it is deleted explicitly here.
delete m_peer;
m_peer = nullptr;
}
}

对于 notifyConnect():

// Emits the connected() notification scheduled by start().
void ServerPeer::notifyConnect()
{
// NOTE(review): passes m_peer (a QTcpSocket*) where the signal takes a qintptr.
emit connected(m_peer);
}

对于 notifyDisconnect():

// Connected to the socket's disconnected() signal in start().
void ServerPeer::notifyDisconnect()
{
// NOTE(review): passes m_peer (a QTcpSocket*) where the signal takes a qintptr.
emit disconnected(m_peer);
}

ServerGUI

// Main widget of the third attempt; holds the generated UI and a pointer to
// the server.
class ServerGUI : public QWidget
{
Q_OBJECT

public:
ServerGUI(QWidget* = nullptr);

private:
Ui::ServerWindow ui;
// Set to nullptr by the constructor when the server is not ready.
CustomServer *m_server;

private slots:
// For further handling, e.g. updating client view
void handleNewClient(qintptr);
void handleRemovedClient(qintptr);
}
// NOTE(review): the class definition above is missing its terminating ';'.

对于ServerGUI(QWidget*):

// Sets up the UI, creates the server and connects its client notifications to
// this widget; bails out early when the server is not ready.
ServerGUI::ServerGUI(QWidget *parent) : QWidget(parent)
{
// initialize gui elements;
// GENERATED WITH ACCORDING *.ui FILE
ui.setupUi(this);

// NOTE(review): instantiated as WServer while m_server is declared CustomServer*.
m_server = new WServer(QHostAddress::LocalHost, 1234, 2, this);
if (!m_server->isReady())
{
qDebug() << "Server could not start!";
delete m_server;
m_server = nullptr;
return;
}

connect(m_server, SIGNAL(clientConnected(qintptr)), this, SLOT(handleNewClient(qintptr)));
connect(m_server, SIGNAL(clientDisconnected(qintptr)), this, SLOT(handleRemovedClient(qintptr)));
}

这是我的主要功能:

// Application entry point: creates the QApplication and the main widget,
// shows it and enters the event loop.
int main(int argc, char *argv[])
{
QApplication a(argc, argv);
ServerGUI w;

w.show();
// Returns the exit code of the event loop.
return a.exec();
}

如果我尝试踢一个(选定的)客户端,使用给定的代码会弹出以下消息:

QMetaObject::invokeMethod: No such method ServerPeer::hasSocket(qintptr)

QObject::connect: Cannot connect (null)::destroyed() to QEventLoop::quit()

如何解决这个问题?

最佳答案

如果我理解正确,您想在单独的线程上运行服务器的每个对等点,如果是这样,那么以下内容可能对您有所帮助:

  1. 创建QTcpServer的子类
  2. 重新实现incomingConnection()方法
  3. 创建 QThread 和 ServerPeer 的实例(不带父对象)并启动线程
  4. 建立 SIGNAL - SLOT 连接,以便从列表中移除对等点,并删除线程和对等点实例
  5. 将 ServerPeer 添加到您的 QList
  6. 对等点启动后,进行凭据验证;如果验证未通过,则中止连接

编辑,注意事项:

您没有收到 connected SIGNAL,是因为在把 socketDescriptor 设置到套接字上时,该套接字就已经处于连接状态了。因此可以直接认为 setSocketDescriptor 之后套接字已连接,然后执行您想要的操作。

关于关闭时的错误,这是因为您没有正确释放线程,请参阅我的编辑如何解决。

最后,QTcpSocket 不能被不同的线程访问;如果需要从另一个线程调用 ServerPeer,请使用带 QueuedConnection 或 BlockingQueuedConnection 的 QMetaObject::invokeMethod,或者使用 SIGNAL / SLOT 机制。

编辑 2:

现在服务器及其对等节点将在 MainWindow::closeEvent 上被删除,这样您就可以看到正在调用的断开连接的函数。我想问题的发生取决于类将被删除的顺序。

您可以与套接字交互,包括通过它发送数据,但我认为使用前面提到的 Qt 跨线程调用方法会更省事。在我的示例中,您可以轻松地向某个特定对等点或所有对等点写入数据。


customserver.h:

//Step 1

#include <QtCore>
#include <QtNetwork>
#include "serverpeer.h"

class CustomServer : public QTcpServer
{
Q_OBJECT
public:
explicit CustomServer(const QHostAddress &host, quint16 port, quint16 maxconnections, QObject *parent = nullptr);
~CustomServer();

void kickAll();
void kickClient(qintptr id);
void writeData(const QByteArray &data, qintptr id);
void writeData(const QByteArray &data);

QHostAddress localAddress();
quint16 serverPort();
bool isReady();

signals:
void clientConnected(qintptr);
void clientDisconnected(qintptr);

private slots:
void destroyedfunc(QObject *obj);

private:
void incomingConnection(qintptr handle);

QList<ServerPeer*> m_connections;
int m_maxAllowedClients;
};

customserver.cpp:

#include "customserver.h"

// Builds the server and immediately starts listening on host:port.
// The base class is listed first because it is always initialized before the
// members; writing it in that order avoids a misleading list and -Wreorder.
CustomServer::CustomServer(const QHostAddress &host, quint16 port, quint16 maxconnections, QObject *parent) :
    QTcpServer(parent),
    m_maxAllowedClients(maxconnections)
{
    // The result of listen() is not checked here; callers query isReady().
    listen(host, port);
}

// Tears the server down by removing every peer that is still registered.
CustomServer::~CustomServer()
{
    this->kickAll();
}

//Step 2
// Called by QTcpServer for each new connection with its socket descriptor.
// Rejects the connection when the client limit is reached; otherwise creates
// a ServerPeer in a dedicated QThread and lets it adopt the descriptor.
void CustomServer::incomingConnection(qintptr handle)
{
    // handle client limit
    if (m_connections.size() >= m_maxAllowedClients)
    {
        qDebug() << "Can't allow new connection: client limit reached!";
        // A stack socket is enough to adopt and drop the descriptor; it is
        // released on every path without a manual delete.
        QTcpSocket rejected;
        rejected.setSocketDescriptor(handle);
        rejected.abort();
        return;
    }

    //Step 3
    // Neither object has a parent: the peer must be movable to the thread, and
    // the thread deletes itself once it has finished.
    ServerPeer *peer = new ServerPeer();
    QThread *waiter = new QThread();

    peer->moveToThread(waiter);

    //Step 4
    connect(peer, SIGNAL(connected(qintptr)), this, SIGNAL(clientConnected(qintptr)));
    connect(peer, SIGNAL(disconnected(qintptr)), this, SIGNAL(clientDisconnected(qintptr)));
    // A destroyed peer stops its thread and is removed from the list.
    connect(peer, SIGNAL(destroyed()), waiter, SLOT(quit()));
    connect(peer, SIGNAL(destroyed(QObject*)), this, SLOT(destroyedfunc(QObject*)));
    connect(waiter, SIGNAL(finished()), waiter, SLOT(deleteLater()));

    //Step 5
    // Register the peer before its thread runs, so the list already contains
    // it when the removal notification for it is handled.
    m_connections.append(peer);

    // Queued: start() is delivered to the peer once its thread's event loop runs.
    QMetaObject::invokeMethod(peer, "start", Qt::QueuedConnection, Q_ARG(qintptr, handle));

    waiter->start();
}

void CustomServer::kickAll()
{
while (!m_connections.isEmpty())
{
ServerPeer *peer = m_connections.first();
QEventLoop loop;
connect(peer->thread(), SIGNAL(destroyed()), &loop, SLOT(quit()));
QMetaObject::invokeMethod(peer, "deleteLater", Qt::QueuedConnection);
loop.exec();
}
}

// Deletes the first peer that reports owning socket descriptor id and waits
// until that peer's thread object has been destroyed. Does nothing when no
// peer matches.
void CustomServer::kickClient(qintptr id)
{
    foreach (ServerPeer *peer, m_connections)
    {
        // Start from "no match": if invokeMethod() fails the value would
        // otherwise be read uninitialized.
        ServerPeer::State hassocket = ServerPeer::MyFALSE;
        // Blocking call into the peer's thread so the answer is available here.
        QMetaObject::invokeMethod(peer, "hasSocket", Qt::BlockingQueuedConnection, Q_RETURN_ARG(ServerPeer::State, hassocket), Q_ARG(qintptr, id));
        if (hassocket != ServerPeer::MyTRUE)
            continue;

        QEventLoop loop;
        connect(peer->thread(), SIGNAL(destroyed()), &loop, SLOT(quit()));
        QMetaObject::invokeMethod(peer, "deleteLater", Qt::QueuedConnection);
        loop.exec();
        break;
    }
}

void CustomServer::writeData(const QByteArray &data)
{
foreach (ServerPeer *peer, m_connections)
QMetaObject::invokeMethod(peer, "writeData", Qt::QueuedConnection, Q_ARG(QByteArray, data));
}

// Queues data for the first peer that reports owning socket descriptor id.
// Does nothing when no peer matches.
void CustomServer::writeData(const QByteArray &data, qintptr id)
{
    foreach (ServerPeer *peer, m_connections)
    {
        // Start from "no match": if invokeMethod() fails the value would
        // otherwise be read uninitialized.
        ServerPeer::State hassocket = ServerPeer::MyFALSE;
        // Blocking call into the peer's thread so the answer is available here.
        QMetaObject::invokeMethod(peer, "hasSocket", Qt::BlockingQueuedConnection, Q_RETURN_ARG(ServerPeer::State, hassocket), Q_ARG(qintptr, id));
        if (hassocket == ServerPeer::MyTRUE)
        {
            // The write itself is queued and runs in the peer's thread.
            QMetaObject::invokeMethod(peer, "writeData", Qt::QueuedConnection, Q_ARG(QByteArray, data));
            break;
        }
    }
}

// Returns the address reported by QTcpServer::serverAddress().
QHostAddress CustomServer::localAddress()
{
    const QHostAddress address = serverAddress();
    return address;
}

// Returns the port reported by the base class. The call must stay qualified:
// this function hides QTcpServer::serverPort(), so an unqualified call would
// recurse into itself.
quint16 CustomServer::serverPort()
{
    const quint16 port = QTcpServer::serverPort();
    return port;
}

bool CustomServer::isReady()
{
return QTcpServer::isListening();
}

// Connected to ServerPeer::destroyed(QObject*): forgets the destroyed peer.
// The pointer is used for comparison only and is never dereferenced.
void CustomServer::destroyedfunc(QObject *obj)
{
    m_connections.removeAll(static_cast<ServerPeer*>(obj));
}

serverpeer.h:

#include <QtCore>
#include <QtNetwork>

/**
 * One connected client. The object is meant to be moved to its own thread;
 * start() then creates the QTcpSocket from the socket descriptor.
 */
class ServerPeer : public QObject
{
    Q_OBJECT
public:
    explicit ServerPeer(QObject *parent = nullptr);
    ~ServerPeer();

    /// Result type of hasSocket().
    enum State
    {
        MyTRUE,
        MyFALSE
    };

signals:
    /// Emitted with the peer's id after a successful start().
    void connected(qintptr id);
    /// Emitted with the peer's id when the socket disconnects.
    void disconnected(qintptr id);

public slots:
    /// MyTRUE when id equals this peer's id, MyFALSE otherwise.
    ServerPeer::State hasSocket(qintptr id);
    /// Creates the socket from the descriptor and wires up its signals.
    void start(qintptr handle);
    /// Writes data to the socket and flushes it.
    void writeData(const QByteArray &data);

private slots:
    void readyRead();
    void notifyConnect();
    void notifyDisconnect();

private:
    // Default member initializers give both fields a defined value even before
    // start() has run; hasSocket() reads m_id and would otherwise compare
    // against an indeterminate value. -1 is not a valid socket descriptor.
    QTcpSocket *m_peer = nullptr;
    qintptr m_id = -1;
};

serverpeer.cpp:

#include "serverpeer.h"

// Creates a peer that has no socket yet; start() creates it later.
ServerPeer::ServerPeer(QObject *parent)
    : QObject(parent)
    , m_peer(nullptr)
{
}

// Aborts the connection if a socket exists. The socket is created with this
// peer as its QObject parent in start(), so it is not deleted here.
ServerPeer::~ServerPeer()
{
    if (m_peer != nullptr)
    {
        m_peer->abort();
    }
}

// Reports whether this peer's stored id equals id.
ServerPeer::State ServerPeer::hasSocket(qintptr id)
{
    return (m_id == id) ? MyTRUE : MyFALSE;
}
// Adopts the socket descriptor handle. On success the peer stores the id,
// connects the socket signals and schedules the connected() notification.
// If the descriptor cannot be adopted or verification fails, the peer
// schedules its own deletion.
void ServerPeer::start(qintptr handle)
{
    m_peer = new QTcpSocket(this);
    // setSocketDescriptor() returns false when the descriptor cannot be
    // adopted; carrying on would announce a client with no usable socket.
    if (!m_peer->setSocketDescriptor(handle))
    {
        qDebug() << "Could not adopt socket descriptor" << handle;
        this->deleteLater();
        return;
    }

    //Step 6
    if (true /*verification here*/)
    {
        m_id = handle;
        // The socket is already connected once the descriptor is set, so the
        // connected notification is raised manually.
        QTimer::singleShot(0, this, SLOT(notifyConnect()));
        connect(m_peer, SIGNAL(readyRead()), this, SLOT(readyRead()));
        connect(m_peer, SIGNAL(disconnected()), this, SLOT(notifyDisconnect()));
        // A disconnected client takes the whole peer with it.
        connect(m_peer, SIGNAL(disconnected()), this, SLOT(deleteLater()));
    }
    else
    {
        m_peer->abort();
        this->deleteLater();
    }
}

void ServerPeer::readyRead()
{
qDebug() << m_peer->readAll() << QThread::currentThread();
}

void ServerPeer::writeData(const QByteArray &data)
{
m_peer->write(data);
m_peer->flush();
}

void ServerPeer::notifyConnect()
{
emit connected(m_id);
}

void ServerPeer::notifyDisconnect()
{
emit disconnected(m_id);
}

mainwindow.cpp:

// Sets up the UI and the server. When the server is not listening it is
// discarded and m_server becomes nullptr; otherwise its client notifications
// are connected to this window.
MainWindow::MainWindow(QWidget *parent) :
    QMainWindow(parent),
    ui(new Ui::MainWindow)
{
    ui->setupUi(this);

    // qintptr is passed through queued invocations, so it is registered with
    // the meta-type system up front.
    qRegisterMetaType<qintptr>("qintptr");

    m_server = new CustomServer(QHostAddress::LocalHost, 1024, 2, this);

    if (m_server->isReady())
    {
        connect(m_server, SIGNAL(clientConnected(qintptr)), this, SLOT(handleNewClient(qintptr)));
        connect(m_server, SIGNAL(clientDisconnected(qintptr)), this, SLOT(handleRemovedClient(qintptr)));
    }
    else
    {
        qDebug() << "Server could not start!";
        delete m_server;
        m_server = nullptr;
    }
}

// Releases the UI object allocated in the constructor.
MainWindow::~MainWindow()
{
    delete this->ui;
}

// Deletes the server (and with it its peers) when the window is closed.
void MainWindow::closeEvent(QCloseEvent *)
{
    // delete on a null pointer is a no-op, so no explicit check is needed.
    delete m_server;
    m_server = nullptr;
}

// Logs the new client, greets it individually and then announces it to every
// connected client.
void MainWindow::handleNewClient(qintptr id)
{
    qDebug() << __FUNCTION__ << id;

    const QByteArray greeting = QString("Hello client id: %0\r\n").arg(id).toLatin1();
    const QByteArray announcement = QString("New client id: %0\r\n").arg(id).toLatin1();

    m_server->writeData(greeting, id);
    m_server->writeData(announcement);
}

// Logs the id of a client that has disconnected.
void MainWindow::handleRemovedClient(qintptr id)
{
    QDebug log = qDebug();
    log << __FUNCTION__ << id;
}

关于QTcpSocket 在另一个 QThread 中处理,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/34641732/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com