作者热门文章
- Java 双重比较
- java - 比较器与 Apache BeanComparator
- Objective-C 完成 block 导致额外的方法调用?
- database - RESTful URI 是否应该公开数据库主键?
<分区>
我正在尝试编写一个 C++11/14 程序:让固定数量的线程(比如 4 个)不断从线程安全队列中取出任务并处理,直到队列中没有剩余任务为止。
线程安全队列实现:
/// A FIFO queue guarded by a mutex so it can be shared between threads.
///
/// Every member function locks the internal mutex, so calls from different
/// threads are serialized. The wait_and_pop() overloads block until an element
/// is available; the try_pop() overloads return immediately.
///
/// NOTE: wait_and_pop() blocks forever if nothing is pushed after the queue is
/// drained; a consumer that must stop once the queue is empty should use
/// try_pop() instead.
template<typename T>
class threadsafe_queue
{
private:
    mutable std::mutex mut;             // mutable so the const empty() can lock it
    std::queue<T> data_queue;
    std::condition_variable data_cond;  // notified by push() to wake one waiter
public:
    threadsafe_queue() {}

    /// Copy-constructs a snapshot of other's contents, taken under other's lock.
    threadsafe_queue(threadsafe_queue const &other)
    {
        std::lock_guard<std::mutex> lk(other.mut);
        data_queue = other.data_queue;
    }

    // std::mutex is not assignable, so assignment was already implicitly
    // deleted; state it explicitly.
    threadsafe_queue &operator=(threadsafe_queue const &) = delete;

    /// Appends new_value and wakes one thread blocked in wait_and_pop().
    void push(T new_value)
    {
        std::lock_guard<std::mutex> lk(mut);
        // new_value is already our own copy: move it rather than copy again.
        data_queue.push(std::move(new_value));
        data_cond.notify_one();
    }

    /// Blocks until the queue is non-empty, then moves the front element into value.
    void wait_and_pop(T &value)
    {
        std::unique_lock<std::mutex> lk(mut);
        data_cond.wait(lk, [this]{return !data_queue.empty();});
        value = std::move(data_queue.front());  // element is popped right after
        data_queue.pop();
    }

    /// Blocks until the queue is non-empty, then returns the front element in a shared_ptr.
    std::shared_ptr<T> wait_and_pop()
    {
        std::unique_lock<std::mutex> lk(mut);
        data_cond.wait(lk, [this]{return !data_queue.empty();});
        std::shared_ptr<T> res(std::make_shared<T>(std::move(data_queue.front())));
        data_queue.pop();
        return res;
    }

    /// Moves the front element into value and returns true, or returns false
    /// immediately (leaving value untouched) if the queue is empty.
    bool try_pop(T &value)
    {
        std::lock_guard<std::mutex> lk(mut);
        if (data_queue.empty())
            return false;
        value = std::move(data_queue.front());
        data_queue.pop();
        return true;
    }

    /// Returns the front element in a shared_ptr, or an empty shared_ptr if the queue is empty.
    std::shared_ptr<T> try_pop()
    {
        std::lock_guard<std::mutex> lk(mut);
        if (data_queue.empty())
            return std::shared_ptr<T>();
        std::shared_ptr<T> res(std::make_shared<T>(std::move(data_queue.front())));
        data_queue.pop();
        return res;
    }

    /// Returns whether the queue was empty at the moment of the call.
    bool empty() const
    {
        std::lock_guard<std::mutex> lk(mut);
        return data_queue.empty();
    }
};
每个线程运行的函数:
// Worker run by each consumer thread for one queued archive name; the body is elided in this excerpt.
void insertintobidask(std::string connstring, std::string ziparchivename, OFStreamWriter &errlog) { /* ... */ }
main 函数,其中各线程应不断从工作队列中取出任务,直到队列为空:
int main()
{
std::ofstream errlog
errlog.open("/home/vorlket/Desktop/Project/Code/Test/errlog.txt", std::ofstream::out);
OFStreamWriter ofsw(&errlog);
threadsafe_queue<std::string> wqueue;
boost::filesystem::path fx_dir("/home/vorlket/Desktop/Project/Code/Test/Data");
std::regex pattern_fx("HISTDATA_COM_ASCII_.*.zip");
for (boost::filesystem::recursive_directory_iterator iter(fx_dir), end; iter != end; ++iter)
{
std::string name = iter->path().filename().string();
if (std::regex_match(name, pattern_fx))
{
wqueue.push(name);
}
}
/* Each thread below would run once, how do I modify it to make it continuously take a work off the queue and run until there is no work left in the queue?
std::thread consumer1 (insertintobidask, "hostaddr=192.168.2.104 port=5433 dbname=fxproj user=vorlket password=K1156312J", wqueue.wait_and_pop(), &ofsw);
std::thread consumer2 (insertintobidask, "hostaddr=192.168.2.104 port=5433 dbname=fxproj user=vorlket password=K1156312J", wqueue.wait_and_pop(), &ofsw);
std::thread consumer3 (insertintobidask, "hostaddr=192.168.3.104 port=5433 dbname=fxproj user=vorlket password=K1156312J", wqueue.wait_and_pop(), &ofsw);
std::thread consumer4 (insertintobidask, "hostaddr=192.168.3.104 port=5433 dbname=fxproj user=vorlket password=K1156312J", wqueue.wait_and_pop(), &ofsw);
consumer1.join();
consumer2.join();
consumer3.join();
consumer4.join();
*/
errlog.close();
return 0;
}
我根据下面 Nim 的回答尝试了另一种方法(用 boost::asio::io_service 作为每个线程的任务队列),它可以正常工作。
/* g++ -std=gnu++11 fxetl.cxx -o fxetl -lboost_system -lboost_filesystem -lzip -lpqxx -lpq -pthread */
#include <fstream>
#include <iostream>
#include <memory>
#include <regex>
#include <string>
#include <thread>
#include <utility>
#include <vector>

#include <boost/asio.hpp>
#include <boost/filesystem.hpp>
#include <pqxx/pqxx>
#include <zip.h>

#include "threadsafe_oerrlog.h"
/// Reads the fixed-width tick records of one monthly AUDUSD archive and
/// INSERTs each one into fx.bidask via txn.
///
/// @param txn             pqxx transaction used for every INSERT. NOTE(review):
///                        pqxx connections are not meant to be used from several
///                        threads at once; callers should give each worker its own.
/// @param ziparchivename  archive file name; characters [27, 33) are taken as the
///                        6-character year-month stamp (assumed YYYYMM), which is
///                        used to rebuild both the archive path and the CSV member name.
/// @param errlog          receives the offending path when the name is too short,
///                        the archive or member cannot be opened, a read fails, or
///                        a record is truncated.
void insertintobidask(pqxx::nontransaction &txn, std::string ziparchivename, OFStreamWriter &errlog)
{
    // Each line takes up 39 bytes. The fields used below are:
    // timestamp [0,18), bid [19,27), ask [28,36).
    const std::size_t record_size = 39;
    const std::size_t min_record_size = 36;

    // substr(27, 6) below needs at least 33 characters.
    if (ziparchivename.size() < 33)
    {
        errlog << ziparchivename;
        return;
    }
    const std::string fileyearmonth = ziparchivename.substr(27, 6);
    const std::string ziparchivepath = "/home/vorlket/Desktop/Project/Code/Test/Data/HISTDATA_COM_ASCII_AUDUSD_T" + fileyearmonth + ".zip";
    const std::string zipfilepath = "DAT_ASCII_AUDUSD_T_" + fileyearmonth + ".csv";

    int err = 0;
    // Owning handles: the member file and the archive are closed even if
    // txn.exec() throws, and zip_close is never called on a NULL archive.
    std::unique_ptr<struct zip, decltype(&zip_close)> ziparchive(zip_open(ziparchivepath.c_str(), 0, &err), &zip_close);
    if (!ziparchive)
    {
        errlog << ziparchivepath;
        return;
    }
    std::unique_ptr<struct zip_file, decltype(&zip_fclose)> zipfile(zip_fopen(ziparchive.get(), zipfilepath.c_str(), 0), &zip_fclose);
    if (!zipfile)
    {
        errlog << zipfilepath;
        return;
    }

    char buffer[record_size];
    for (;;)
    {
        const auto r = zip_fread(zipfile.get(), buffer, sizeof(buffer));
        if (r == 0)
            break;  // end of member
        if (r < 0)
        {
            errlog << zipfilepath;  // read error: stop, but report it
            break;
        }
        // buffer is not NUL-terminated, so build the string from the byte count.
        const std::string str(buffer, static_cast<std::size_t>(r));
        if (str.size() < min_record_size)
        {
            errlog << zipfilepath;  // truncated record; substr() below would throw
            continue;
        }
        txn.exec("INSERT INTO fx.bidask VALUES('AUDUSD', to_timestamp(" +txn.quote(str.substr(0, 18)) + ", 'YYYYMMDD HH24MISSMS'), " + txn.quote(str.substr(19, 8)) + ", " + txn.quote(str.substr(28, 8)) + ")");
    }
    zipfile.reset();  // close the member before reporting progress
    std::cout << fileyearmonth << std::endl;
}
int main()
{
pqxx::connection conn1("hostaddr=192.168.2.104 port=5433 dbname=fxproj user=vorlket password=K1156312J");
pqxx::nontransaction txn1(conn1);
pqxx::connection conn2("hostaddr=192.168.3.104 port=5433 dbname=fxproj user=vorlket password=K1156312J");
pqxx::nontransaction txn2(conn2);
pqxx::connection conn3("hostaddr=192.168.2.104 port=5433 dbname=fxproj user=vorlket password=K1156312J");
pqxx::nontransaction txn3(conn3);
pqxx::connection conn4("hostaddr=192.168.3.104 port=5433 dbname=fxproj user=vorlket password=K1156312J");
pqxx::nontransaction txn4(conn4);
std::ofstream errlog("/home/vorlket/Desktop/Project/Code/Test/errlog.txt");
OFStreamWriter ofsw(&errlog);
boost::asio::io_service service1; // queue
boost::asio::io_service service2;
boost::asio::io_service service3;
boost::asio::io_service service4;
boost::filesystem::path fx_dir("/home/vorlket/Desktop/Project/Code/Test/Data");
std::regex pattern_fx("HISTDATA_COM_ASCII_.*.zip");
int serviceid = 0;
for (boost::filesystem::recursive_directory_iterator iter(fx_dir), end; iter != end; ++iter)
{
std::string name = iter->path().filename().string();
if (std::regex_match(name, pattern_fx))
{
serviceid %= 3;
switch (serviceid)
{
case 0 :
service1.post([&txn1, name, &ofsw]() { insertintobidask(txn1, name, ofsw); });
break;
case 1 :
service2.post([&txn2, name, &ofsw]() { insertintobidask(txn2, name, ofsw); });
break;
case 2 :
service3.post([&txn3, name, &ofsw]() { insertintobidask(txn3, name, ofsw); });
break;
case 3 :
service4.post([&txn4, name, &ofsw]() { insertintobidask(txn4, name, ofsw); });
break;
}
++serviceid;
}
}
std::thread t1([&service1]() { service1.run(); });
std::thread t2([&service2]() { service2.run(); });
std::thread t3([&service3]() { service3.run(); });
std::thread t4([&service4]() { service4.run(); });
t1.join();
t2.join();
t3.join();
t4.join();
}
我不确定哪种方法更快;我猜这取决于工作负载和运行平台,值得实际测试比较一下。欢迎就哪种方法更快及其原因发表意见。
我是一名优秀的程序员,十分优秀!