gpt4 book ai didi

c++ - Qt 多线程 QThreads 保持 TCP 连接并被重用

转载 作者:行者123 更新时间:2023-11-30 03:53:44 25 4
gpt4 key购买 nike

我真的不确定如何解决这个问题,所以我先解释一下。我需要运行多个线程,每个线程都通过 TCPSocket 连接到某个应用程序,到目前为止没有问题。该应用程序非常耗时,这就是为什么我希望它在多个线程上并行运行并且每个线程都与它通信。计算完成后,我想将结果发送到另一个收集结果的线程。为此我写了一个 Worker 类:

class Worker : public QObject {
Q_OBJECT

public:
Worker();
Worker(int port);
~Worker();
QTcpSocket* sock;
void insert();

public slots:
void connect();
void process(const int &id, const QString &param, const int &arity);

signals:
void ready();
void finished(const int &id, const int &consistent, const QString &result);
void error(QString err);
};

现在 superThread 应该处理一个巨大的文件并需要将它分散到线程中,然后接收和处理结果。到目前为止,我的方法是在 main() 中连接的另一个超线程,如下所示:

QThread* superThread = new QThread();
supWorker* super = new supWorker();
for (int i = 0; i < nrWorkers; i++){
Worker* worker = new Worker(portRange+i);
QThread* workerThread = new QThread();
QThread::connect(workerThread, SIGNAL(started()), worker, SLOT(connect()));
worker->moveToThread(workerThread);
workerThread->start();
QThread::connect(super, SIGNAL(process(int, QString, int)), worker, SLOT(process(int,QString,int)));
QThread::connect(worker, SIGNAL(finished(int, int, QString)), super, SLOT(handleResult(int, int, QString)));
}

这种方式的问题显然是我只能将 SIGNAL 发送到所有连接的线程。我想让 superThread 做的是只向其中一个线程发送参数。我不知道如何处理连接以便只有一个工作线程接收它?

非常感谢任何帮助或架构想法,在此先感谢。

最佳答案

不确定我的想法是否 100% 正确,但为什么不将工作线程数组传递给 super 线程,保留一个表示当前事件线程索引的索引,仅将信号连接到该线程,分派(dispatch)需要时发出信号,等待完成,断开信号,推进索引并重复?如果将序列化信号分派(dispatch)给线程是您真正想要的,那么它就可以工作。

编辑

好吧,我真的让自己齐心协力制作了一个基于 Qt 的示例,该示例实现了所需的工作流程并将其放在 github 上。 .

#pragma once

#include <QThread>
#include <QApplication>
#include <QMetaType>
#include <QTimer>

#include <vector>
#include <memory>
#include <cstdio>
#include <algorithm>

struct Work
{
int m_work;
};

struct Result
{
int m_result;
int m_workerIndex;
};

Q_DECLARE_METATYPE(Work);
Q_DECLARE_METATYPE(Result);

class Worker : public QThread
{
Q_OBJECT

public:
Worker(int workerIndex) : m_workerIndex(workerIndex)
{
moveToThread(this);
connect(this, SIGNAL(WorkReceived(Work)), SLOT(DoWork(Work)));
printf("[%d]: worker %d initialized\n", reinterpret_cast<int>(currentThreadId()), workerIndex);
}

void DispatchWork(Work work)
{
emit WorkReceived(work);
}

public slots:
void DoWork(Work work)
{
printf("[%d]: worker %d received work %d\n", reinterpret_cast<int>(currentThreadId()), m_workerIndex, work.m_work);
msleep(100);
Result result = { work.m_work * 2, m_workerIndex };
emit WorkDone(result);
}

signals:
void WorkReceived(Work work);
void WorkDone(Result result);

private:
int m_workerIndex;
};

class Master : public QObject
{
Q_OBJECT

public:
Master(int workerCount) : m_activeWorker(0), m_workerCount(workerCount)
{
printf("[%d]: creating master thread\n", reinterpret_cast<int>(QThread::currentThreadId()));
}
~Master()
{
std::for_each(m_workers.begin(), m_workers.end(), [](std::unique_ptr<Worker>& worker)
{
worker->quit();
worker->wait();
});
}


public slots:
void Initialize()
{
printf("[%d]: initializing master thread\n", reinterpret_cast<int>(QThread::currentThreadId()));
for (int workerIndex = 0; workerIndex < m_workerCount; ++workerIndex)
{
auto worker = new Worker(workerIndex);
m_workers.push_back(std::move(std::unique_ptr<Worker>(worker)));
connect(worker, SIGNAL(WorkDone(Result)), SLOT(WorkDone(Result)));
worker->start();
}
m_timer = new QTimer();
m_timer->setInterval(500);
connect(m_timer, SIGNAL(timeout()), SLOT(GenerateWork()));
m_timer->start();
}
void GenerateWork()
{
Work work = { m_activeWorker };
printf("[%d]: dispatching work %d to worker %d\n", reinterpret_cast<int>(QThread::currentThreadId()), work.m_work, m_activeWorker);
m_workers[m_activeWorker]->DispatchWork(work);
m_activeWorker = ++m_activeWorker % m_workers.size();
}
void WorkDone(Result result)
{
printf("[%d]: received result %d from worker %d\n", reinterpret_cast<int>(QThread::currentThreadId()), result.m_result, result.m_workerIndex);
}
void Terminate()
{
m_timer->stop();
delete m_timer;
}

private:
int m_workerCount;
std::vector<std::unique_ptr<Worker>> m_workers;
int m_activeWorker;
QTimer* m_timer;
};

QtThreadExample.cpp:

#include "QtThreadExample.hpp"
#include <QTimer>

int main(int argc, char** argv)
{
qRegisterMetaType<Work>("Work");
qRegisterMetaType<Result>("Result");
QApplication application(argc, argv);
QThread masterThread;
Master master(5);
master.moveToThread(&masterThread);
master.connect(&masterThread, SIGNAL(started()), SLOT(Initialize()));
master.connect(&masterThread, SIGNAL(terminated()), SLOT(Terminate()));
masterThread.start();
// Set a timer to terminate the program after 10 seconds
QTimer::singleShot(10 * 1000, &application, SLOT(quit()));
application.exec();
masterThread.quit();
masterThread.wait();
printf("[%d]: master thread has finished\n", reinterpret_cast<int>(QThread::currentThreadId()));
return 0;
}

一般来说,解决方案实际上不是从主线程本身发出信号,而是从工作线程发出信号——这样你就可以为每个线程获得一个唯一的信号,并且可以在事件循环中异步处理工作,并且然后发出线程完成的信号。该示例可以而且应该根据您的需要进行重构,但总的来说,它演示了 Qt 中的生产者/消费者模式,使用索引和信号线程的想法一个一个地使用。我正在使用通用计时器在主线程 (Master::m_timer) 中生成工作 - 我想在您的情况下,您将使用来自套接字、文件或其他东西的信号生成工作。然后我在一个事件的工作线程上调用一个方法,该方法向工作线程的事件循环发出一个信号以开始执行工作,然后发出一个关于完成的信号。这是一般性描述,查看示例,试用一下,如果您有任何后续问题,请告诉我。

如果你使用 Qt 对象,我想这会工作得很好,但在传统意义上的消费者/生产者模式中,信号/槽的东西实际上让生活变得有点困难,一个标准的 C++11 管道与 std::condition_variable 和主线程调用 condition_variable::notify_one()并且工作线程简单地等待条件变量会更容易,但是 Qt 对所有 I/O 东西都有很好的包装。因此,只需尝试一下再决定。

下面是示例的示例输出,我猜线程日志记录表明达到了所需的效果: enter image description here

请注意,由于 QApplication 本身运行一个事件循环,如果您没有 GUI,您实际上可以让所有 I/O 对象和主类存在于主线程中并发出信号从那里,从而消除了对单独的主线程的需要。当然,如果你有一个 GUI,你就不会想要用这些东西来增加它的负担。

关于c++ - Qt 多线程 QThreads 保持 TCP 连接并被重用,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/29975719/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com