gpt4 book ai didi

c++ - 在消费者进程中 boost 共享内存和同步队列问题/崩溃

转载 作者:行者123 更新时间:2023-11-30 01:51:08 26 4
gpt4 key购买 nike

我正在尝试从子进程使用 C++ 中的同步队列。我在 C++ () ( http://www.internetmosquito.com/2011/04/making-thread-safe-queue-in-c-i.html ) 中使用这个同步队列

我将队列修改为可在 boost 中序列化,还替换了使用过的 boost::mutex io_mutex_改为使用进程间互斥锁(感谢@Sehe)boost::interprocess::interprocess_mutex io_mutex_锁定时我改变了每一行 boost::mutex::scoped_lock lock(io_mutex_);scoped_lock<interprocess_mutex> lock(io_mutex_);

template<class T>
class SynchronizedQueue
{
friend class boost::serialization::access;
template<class Archive>
void serialize(Archive & ar, const unsigned int version)
{
ar & sQueue;
ar & io_mutex_;
ar & waitCondition;
}
... // queue implementation (see [http://www.internetmosquito.com/2011/04/making-thread-safe-queue-in-c-i.html][2])

在我的测试应用程序中,我正在创建同步队列并在其中存储该类的 100 个实例:

class gps_position
{
friend class boost::serialization::access;
template<class Archive>
void serialize(Archive & ar, const unsigned int version)
{
ar & degrees;
ar & minutes;
ar & seconds;
}
public:
int degrees;
int minutes;
float seconds;

gps_position() {};
gps_position(int d, int m, float s) :
degrees(d), minutes(m), seconds(s)
{}
};

消费者和生产者之间的通用定义:

 char *SHARED_MEMORY_NAME = "MySharedMemory";
char *SHARED_QUEUE_NAME = "MyQueue";
typedef SynchronizedQueue<gps_position> MySynchronisedQueue;

生产者流程代码:

    // Remove shared memory if it was created before
shared_memory_object::remove(SHARED_MEMORY_NAME);
// Create a new segment with given name and size
managed_shared_memory mysegment(create_only,SHARED_MEMORY_NAME, 65536);
MySynchronisedQueue *myQueue = mysegment.construct<MySynchronisedQueue>(SHARED_QUEUE_NAME)();
//Insert data in the queue
for(int i = 0; i < 100; ++i) {
gps_position position(i, 2, 3);
myQueue->push(position);
}
// Start 1 process (for testing for now)
STARTUPINFO info1={sizeof(info1)};
PROCESS_INFORMATION processInfo1;
ZeroMemory(&info1, sizeof(info1));
info1.cb = sizeof info1 ; //Only compulsory field
ZeroMemory(&processInfo1, sizeof(processInfo1));
// Launch child process
LPTSTR szCmdline = _tcsdup(TEXT("ClientTest.exe"));
CreateProcess(NULL, szCmdline, NULL, NULL, TRUE, 0, NULL, NULL, &info1, &processInfo1);
// Wait a little bit ( 5 seconds) for the started client process to load
WaitForSingleObject(processInfo1.hProcess, 5000);

/* THIS TESTING CODE WORK HERE AT PARENT PROCESS BUT NOT IN CLIENT PROCESS
// Open the managed segment memory
managed_shared_memory openedSegment(open_only, SHARED_MEMORY_NAME);
//Find the synchronized queue using it's name
MySynchronisedQueue *openedQueue = openedSegment.find<MySynchronisedQueue>(SHARED_QUEUE_NAME).first;
gps_position position;
while (true) {
if (myQueue->pop(position)) {
std::cout << "Degrees= " << position.degrees << " Minutes= " << position.minutes << " Seconds= " << position.seconds;
std::cout << "\n";
}
else
break;
}*/


// Wait until the queue is empty: has been processed by client(s)
while(myQueue->sizeOfQueue() > 0) continue;

// Close process and thread handles.
CloseHandle( processInfo1.hThread );

我的消费者代码如下:

    //Open the managed segment memory
managed_shared_memory segment(open_only, SHARED_MEMORY_NAME);
//Find the vector using it's name
MySynchronisedQueue *myQueue = segment.find<MySynchronisedQueue>(SHARED_QUEUE_NAME).first;
gps_position position;
// Pop each position until the queue become empty and output its values
while (true)
{
if (myQueue->pop(position)) { // CRASH HERE
std::cout << "Degrees= " << position.degrees << " Minutes= " << position.minutes << " Seconds= " << position.seconds;
std::cout << "\n";
}
else
break;
}

当我运行创建队列的父进程(生产者)并创建子(消费者)进程时,子进程在尝试从队列中“弹出”时崩溃。

我在这里做错了什么?任何想法 ?感谢您的任何见解。这是我使用 boost 和共享内存创建的第一个应用程序。

我的目标是能够从多个进程中使用这个队列。在上面的示例中,我只创建了一个子进程,以确保在创建其他子进程之前首先它可以工作。这个想法是队列将由项目提前填充,并且多个创建的进程将从队列中“弹出”项目而不会相互冲突。

最佳答案

更新后的代码:

  • 如果你要共享队列,你应该使用 interprocess_mutex;这意味着许多相关的变化。
  • 如果您要共享队列,您的队列应该使用共享内存分配器
  • 应该在互斥量下提出条件以在所有平台上实现可靠的行为
  • 你锁里面失败toString() .即使您复制了集合,这还远远不够,因为容器可能会在复制过程中被修改。
  • 队列设计很有意义(返回 empty() 的“线程安全”函数有什么用?在处理返回值之前它可能不再为空/只是空...这些被称为 竞争条件并导致真正难以跟踪的错误
  • Boost Serialization 与什么有什么关系?它似乎只是为了混淆画面,因为它不是必需的,也没有被使用
  • 同样适用于 Boost Any。为什么是any用于 toString() ?由于队列的设计,typeid 始终为 gpsposition 无论如何。
  • 同样适用于 boost::lexical_cast<> (如果你已经有了字符串流,为什么还要进行字符串连接?)
  • 为什么是empty() , toString() , sizeOfQueue()不是const

我强烈推荐使用 boost::interprocess::message_queue .这似乎是您实际想要使用的。

这是一个修改后的版本,将容器放在共享内存中并且可以正常工作:

#include <boost/interprocess/allocators/allocator.hpp>
#include <boost/interprocess/containers/deque.hpp>
#include <boost/interprocess/managed_shared_memory.hpp>
#include <boost/interprocess/sync/interprocess_condition.hpp>
#include <boost/interprocess/sync/interprocess_mutex.hpp>
#include <boost/thread/lock_guard.hpp>
#include <sstream>

namespace bip = boost::interprocess;

template <class T> class SynchronizedQueue {

public:
typedef bip::allocator<T, bip::managed_shared_memory::segment_manager> allocator_type;
private:
bip::deque<T, allocator_type> sQueue;
mutable bip::interprocess_mutex io_mutex_;
mutable bip::interprocess_condition waitCondition;
public:
SynchronizedQueue(allocator_type alloc) : sQueue(alloc) {}

void push(T element) {
boost::lock_guard<bip::interprocess_mutex> lock(io_mutex_);
sQueue.push_back(element);
waitCondition.notify_one();
}
bool empty() const {
boost::lock_guard<bip::interprocess_mutex> lock(io_mutex_);
return sQueue.empty();
}
bool pop(T &element) {
boost::lock_guard<bip::interprocess_mutex> lock(io_mutex_);

if (sQueue.empty()) {
return false;
}

element = sQueue.front();
sQueue.pop_front();

return true;
}
unsigned int sizeOfQueue() const {
// try to lock the mutex
boost::lock_guard<bip::interprocess_mutex> lock(io_mutex_);
return sQueue.size();
}
void waitAndPop(T &element) {
boost::lock_guard<bip::interprocess_mutex> lock(io_mutex_);

while (sQueue.empty()) {
waitCondition.wait(lock);
}

element = sQueue.front();
sQueue.pop();
}

std::string toString() const {
bip::deque<T> copy;
// make a copy of the class queue, to reduce time locked
{
boost::lock_guard<bip::interprocess_mutex> lock(io_mutex_);
copy.insert(copy.end(), sQueue.begin(), sQueue.end());
}

if (copy.empty()) {
return "Queue is empty";
} else {
std::stringstream os;
int counter = 0;

os << "Elements in the Synchronized queue are as follows:" << std::endl;
os << "**************************************************" << std::endl;

while (!copy.empty()) {
T object = copy.front();
copy.pop_front();
os << "Element at position " << counter << " is: [" << typeid(object).name() << "]\n";
}
return os.str();
}
}
};

struct gps_position {
int degrees;
int minutes;
float seconds;

gps_position(int d=0, int m=0, float s=0) : degrees(d), minutes(m), seconds(s) {}
};

static char const *SHARED_MEMORY_NAME = "MySharedMemory";
static char const *SHARED_QUEUE_NAME = "MyQueue";
typedef SynchronizedQueue<gps_position> MySynchronisedQueue;

#include <boost/interprocess/shared_memory_object.hpp>
#include <iostream>

void consumer()
{
bip::managed_shared_memory openedSegment(bip::open_only, SHARED_MEMORY_NAME);

MySynchronisedQueue *openedQueue = openedSegment.find<MySynchronisedQueue>(SHARED_QUEUE_NAME).first;
gps_position position;

while (openedQueue->pop(position)) {
std::cout << "Degrees= " << position.degrees << " Minutes= " << position.minutes << " Seconds= " << position.seconds;
std::cout << "\n";
}
}

void producer() {
bip::shared_memory_object::remove(SHARED_MEMORY_NAME);

bip::managed_shared_memory mysegment(bip::create_only,SHARED_MEMORY_NAME, 65536);

MySynchronisedQueue::allocator_type alloc(mysegment.get_segment_manager());
MySynchronisedQueue *myQueue = mysegment.construct<MySynchronisedQueue>(SHARED_QUEUE_NAME)(alloc);

for(int i = 0; i < 100; ++i)
myQueue->push(gps_position(i, 2, 3));

// Wait until the queue is empty: has been processed by client(s)
while(myQueue->sizeOfQueue() > 0)
continue;
}

int main() {
producer();
// or enable the consumer code for client:
// consumer();
}

关于c++ - 在消费者进程中 boost 共享内存和同步队列问题/崩溃,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/26509815/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com