gpt4 book ai didi

c++ - 如何使用具有串行执行顺序的 C++ 线程池

转载 作者:行者123 更新时间:2023-11-28 02:41:34 28 4
gpt4 key购买 nike

我正在尝试使用基于优先级任务的 C++ 线程池。根据优先级(它是一个比较器对象,在我的例子中不仅仅是一个值),它需要串行执行,而不是仅仅分派(dispatch)到线程池中的下一个可用线程。

我目前的实现是基于下面的代码https://github.com/en4bz/ThreadPool我让它作为普通线程池工作得很好(不过我没有使用该池的优先级变体,因为我不知道如何指定自定义谓词对象而不是 int - 如果有人可以让我知道如何传递低于它的 PriorityLevel 将是一个真正的优势)因此,项目从 std::priority_queue< T > 送入线程池,其中 T 是一个使用下一个 PriorityLevel 按优先顺序对项目进行排序的对象。

任务优先级的类型在下面的几行中进行了描述——这些包括 channel 号、优先级字符“A-Z”和一个可选的序列号(指定时,表示任务必须等待所有更高优先级的任务完成在计划任务在可用线程池上执行之前。我知道如何使用 operator<() 严格弱排序谓词在线程池中对这些东西进行排序 - 但我不知道如何将这些元素中的一些放在后面支持执行队列。

所以在这个例子中

(1) channel[1] priority[A] 
(2) channel[1] priority[A] sequenceNum[1]
(3) channel[1] priority[A] sequenceNum[2]
(4) channel[1] priority[A] sequenceNum[3]
(5) channel[2] priority[B]
(6) channel[2] priority[B] sequenceNum[1]
(7) channel[2] priority[B] sequenceNum[2]

项目 1 和 5 将具有最高优先级,因为它们没有先决条件 - 它们将同时运行(如果有可用线程),但其他元素必须等到它们的先决条件 channel /优先任务完成。

下面是我如何使用线程池(请注意,SLDBJob 包含 PriorityLevel 来处理 operator<() 优先级排序。

    std::priority_queue<SLDBJob> priorityJobQueue;
//... insert a bunch of Jobs
// enqueue closure objects in highest to lowest priority so that the
// highest ones get started ahead of the lower or equal priority jobs.
// these tasks will be executed in priority order using rPoolSize threads
UtlThreadPool<> threadPool(rPoolSize);
while (!priorityJobQueue.empty()) {
const auto& nextJob = priorityJobQueue.top();
threadPool.enqueue(std::bind(
&SLDBProtocol::moduleCheckingThreadFn,
nextJob, std::ref(gActiveJobs)));
gActiveJobs.insert(nextJob);
priorityJobQueue.pop();
}

这里是优先级

class PriorityLevel {
public:
// default constructor
explicit PriorityLevel(
const int32_t& rChannel = -1,
const char priority = 'Z',
const boost::optional<int32_t>& rSequenceNum =
boost::optional<int32_t>())
: mChannel(rChannel)
, mPriority(priority)
, mSequenceNum(rSequenceNum)
{}

// copy constructor
PriorityLevel(const PriorityLevel& rhs)
: mChannel(rhs.mChannel)
, mPriority(rhs.mPriority)
, mSequenceNum(rhs.mSequenceNum)
{}

// move constructor
PriorityLevel(PriorityLevel&& rhs)
: mChannel(std::move(rhs.mChannel))
, mPriority(std::move(rhs.mPriority))
, mSequenceNum(std::move(rhs.mSequenceNum))
{}

// non-throwing-swap idiom
inline void swap(PriorityLevel& rhs) {
// enable ADL (not necessary in our case, but good practice)
using std::swap;
// no need to swap base members - as we are topmost class
swap(mChannel, rhs.mChannel);
swap(mPriority, rhs.mPriority);
swap(mSequenceNum, rhs.mSequenceNum);
}

// non-throwing copy-and-swap idiom unified assignment
PriorityLevel& operator=(PriorityLevel rhs) {
rhs.swap(*this);
return *this;
}

// equality operator
inline bool operator==(const PriorityLevel& rhs) const {
return std::tie(mChannel, mPriority, mSequenceNum) ==
std::tie(rhs.mChannel, rhs.mPriority, rhs.mSequenceNum);
}

// inequality operator
inline bool operator!=(const PriorityLevel& rhs) const {
return !(operator==(rhs));
}

/**
* comparator that orders the elements in the priority_queue<p>
*
* This is implemented via a lexicographical comparison using a
* std::tuple<T...> as a helper. Tuple compares work as follows:
* compares the first elements, if they are equivalent, compares
* the second elements, if those are equivalent, compares the
* third elements, and so on. All comparison operators are short
* - circuited; they do not access tuple elements beyond what is
* necessary to determine the result of the comparison. note
* that the presence of the sequence number assigns a lower
* priority (bigger value 1) contribution to the lexicographical
* nature of the comparison
*
* @param rhs PriorityLevel to compare against
*
* @return true if this is lower priority than rhs
*/
inline bool operator<(const PriorityLevel& rhs) const {
auto prtyLen = getPriorityStr().length();
auto rhsPrtyLen = rhs.getPriorityStr().length();
auto sequencePrtyVal = mSequenceNum ? mSequenceNum.get() : 0;
auto rhsSequencePrtyVal = rhs.mSequenceNum ? rhs.mSequenceNum.get() : 0;
return std::tie(prtyLen, mPriority, mChannel, sequencePrtyVal) >
std::tie(rhsPrtyLen, rhs.mPriority, rhs.mChannel, rhsSequencePrtyVal);
}

// stream friendly struct
inline friend std::ostream& operator << (std::ostream& os, const PriorityLevel& rValue) {
std::string sequenceInfo;
if (rValue.mSequenceNum) {
sequenceInfo = std::string(", sequence[") +
std::to_string(rValue.mSequenceNum.get()) + "]";
}
os << "channel[" << rValue.mChannel
<< "], priority[" << rValue.mPriority
<< "]" << sequenceInfo;
return os;
}

// channel getter
inline int32_t getChannel() const {
return mChannel;
}

// string representation of the priority string
inline std::string getPriorityStr() const {
std::stringstream ss;
ss << mChannel << mPriority;
if (mSequenceNum) {
ss << mSequenceNum.get();
}
return ss.str();
}
private:
// the 3 fields from the ModuleNameTable::szPriorityLevel
int32_t mChannel;
// single upper case character A=>'highest priority'
char mPriority;
// optional field - when present indicates start order
boost::optional<int32_t> mSequenceNum;
};

最佳答案

我不会将它们全部放入 priority_queue,因为 priority_queue 对于涉及更改优先级的事情极度效率低下。相反,我会将 1 和 5 添加到优先级队列中,并将所有其余部分放入 channel 的“后续映射”中,以获取后续任务列表。当 channel 1 完成时,它会检查 channel 1 是否有后续映射中的任何内容,如果有,则从该列表中弹出第一个项目,并将其添加到 priority_queue。

 using ChannelID = int32_t;
using PriorityLevel = char;

struct dispatcher {
std::priority_queue<SLDBJob> Todo; //starts with items 1 and 5
std::unordered_map<ChannelID, std::vector<SLDBJob>> FollowupMap;
//starts with {1, {2,3,4}}, {2, {6, 7, 8}}
//note the code is actually faster if you store the followups in reverse

void OnTaskComplete(ChannelID id) {
auto it = FollowupMap.find(id);
if (it != FollowupMap.end())
if (it->empty() == false) {
Todo.push_back(std::move(it->front()));
it->erase(it->begin());
}
if (it->empty() == true)
FollowupMap.erase(it);
}
}
};

用法大概如下:

struct reportfinished {
ChannelID id;
~reportfinished() {dispatcher.OnTaskComplete(id);} //check for exceptions? Your call.
};

UtlThreadPool<> threadPool(rPoolSize);
while (!priorityJobQueue.empty()) {
const auto& nextJob = priorityJobQueue.top();
auto wrapper = [&gActiveJobs, =]()
-> decltype(SLDBProtocol::moduleCheckingThreadFn(nextJob, gActiveJobs))
{
reportfinished queue_next{nextJob.channel};
return SLDBProtocol::moduleCheckingThreadFn(nextJob, gActiveJobs);
};
threadPool.enqueue(std::move(wrapper));
gActiveJobs.insert(nextJob);
priorityJobQueue.pop();
}

关于c++ - 如何使用具有串行执行顺序的 C++ 线程池,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/25798747/

28 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com