gpt4 book ai didi

c++ - 无法理解这个 "message sequence mismatch error"

转载 作者:行者123 更新时间:2023-12-03 07:02:03 26 4
gpt4 key购买 nike

我使用了 this 中回答的程序链接一些修改。下面是我修改后的代码:

#include <linux/netlink.h>

#include <netlink/netlink.h>
#include <netlink/route/qdisc.h>
#include <netlink/route/qdisc/plug.h>
#include <netlink/socket.h>

#include <atomic>
#include <csignal>
#include <iostream>
#include <stdexcept>
#include <unistd.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <thread>
#include <queue>
#include <chrono>

/**
* Netlink route socket.
*/
struct Socket {
Socket() : handle{nl_socket_alloc()} {

if (handle == nullptr) {
throw std::runtime_error{"Failed to allocate socket!"};
}

if (int err = nl_connect(handle, NETLINK_ROUTE); err < 0) {
throw std::runtime_error{"Unable to connect netlink socket: " +
std::string{nl_geterror(err)}};
}
}

Socket(const Socket &) = delete;
Socket &operator=(const Socket &) = delete;
Socket(Socket &&) = delete;
Socket &operator=(Socket &&) = delete;

~Socket() { nl_socket_free(handle); }

struct nl_sock *handle;
};

/**
* Read all links from netlink socket.
*/
struct LinkCache {
explicit LinkCache(Socket *socket) : handle{nullptr} {
if (int err = rtnl_link_alloc_cache(socket->handle, AF_UNSPEC, &handle);
err < 0) {
throw std::runtime_error{"Unable to allocate link cache: " +
std::string{nl_geterror(err)}};
}
}

LinkCache(const LinkCache &) = delete;
LinkCache &operator=(const LinkCache &) = delete;
LinkCache(LinkCache &&) = delete;
LinkCache &operator=(LinkCache &&) = delete;

~LinkCache() { nl_cache_free(handle); }

struct nl_cache *handle;
};

/**
* Link (such as "eth0" or "wlan0").
*/
struct Link {
Link(LinkCache *link_cache, const std::string &iface)
: handle{rtnl_link_get_by_name(link_cache->handle, iface.c_str())} {

if (handle == nullptr) {
throw std::runtime_error{"Link does not exist:" + iface};
}
}

Link(const Link &) = delete;
Link &operator=(const Link &) = delete;
Link(Link &&) = delete;
Link &operator=(Link &&) = delete;

~Link() { rtnl_link_put(handle); }

struct rtnl_link *handle;
};

/**
* Queuing discipline.
*/
struct QDisc {
QDisc(const std::string &iface, const std::string &kind)
: handle{rtnl_qdisc_alloc()} {
if (handle == nullptr) {
throw std::runtime_error{"Failed to allocate qdisc!"};
}

struct rtnl_tc *tc = TC_CAST(handle);

// Set link
LinkCache link_cache{&socket};
Link link{&link_cache, iface};
rtnl_tc_set_link(tc, link.handle);

// Set parent qdisc
uint32_t parent = 0;

if (int err = rtnl_tc_str2handle("root", &parent); err < 0) {
throw std::runtime_error{"Unable to parse handle: " +
std::string{nl_geterror(err)}};
}

rtnl_tc_set_parent(tc, parent);

// Set kind (e.g. "plug")
if (int err = rtnl_tc_set_kind(tc, kind.c_str()); err < 0) {
throw std::runtime_error{"Unable to set kind: " +
std::string{nl_geterror(err)}};
}
}

QDisc(const QDisc &) = delete;
QDisc &operator=(const QDisc &) = delete;
QDisc(QDisc &&) = delete;
QDisc &operator=(QDisc &&) = delete;

~QDisc() {
if (int err = rtnl_qdisc_delete(socket.handle, handle); err < 0) {
std::cerr << "Unable to delete qdisc: " << nl_geterror(err) << std::endl;
}

rtnl_qdisc_put(handle);
}

void send_msg() {
int flags = NLM_F_CREATE;

if (int err = rtnl_qdisc_add(socket.handle, handle, flags); err < 0) {
throw std::runtime_error{"Unable to add qdisc: " +
std::string{nl_geterror(err)}};
}
}

Socket socket;
struct rtnl_qdisc *handle;
};

/**
* Queuing discipline for plugging traffic.
*/
class Plug {
public:
Plug(const std::string &iface, uint32_t limit, std::string msg)
: qdisc_{iface, "plug"} {

rtnl_qdisc_plug_set_limit(qdisc_.handle, limit);
qdisc_.send_msg();

// set_enabled(enabled_);
set_msg(msg);
}

// void set_enabled(bool enabled) {
// if (enabled) {
// rtnl_qdisc_plug_buffer(qdisc_.handle);
// } else {
// rtnl_qdisc_plug_release_one(qdisc_.handle);
// }

// qdisc_.send_msg();
// enabled_ = enabled;
// }

void set_msg(std::string msg) {
if (msg == "buffer") {
int ret = rtnl_qdisc_plug_buffer(qdisc_.handle);
//std::cout<<strerror(ret);
} else if(msg == "commit") {
int ret = rtnl_qdisc_plug_release_one(qdisc_.handle);
//std::cout<<strerror(ret);
} else {
int ret = rtnl_qdisc_plug_release_indefinite(qdisc_.handle);
//std::cout<<strerror(ret);
}

qdisc_.send_msg();
}

// bool is_enabled() const { return enabled_; }

private:
QDisc qdisc_;

// bool enabled_;
};

std::atomic<bool> quit{false};

void exit_handler(int /*signal*/) { quit = true; }

// this function busy wait on job queue until there's something
//and calls release operation i.e. unplug qdisc to release output packets
//generated for a particular epoch
void transmit_ckpnt(std::queue<int> &job_queue, Plug &plug){

while(true){

while(!job_queue.empty()){

int id = job_queue.front();
job_queue.pop();
std::string s = std::to_string(id);

std::cout<<"called from parallel thread "<<s<<"\n";

//release buffer
plug.set_msg("commit");
}
}

}

int main() {
std::string iface{"veth-host"};
constexpr uint32_t buffer_size = 10485760;
// bool enabled = true;

Plug plug{iface, buffer_size, "buffer"};

/**
* Set custom exit handler to ensure destructor runs to delete qdisc.
*/
struct sigaction sa {};
sa.sa_handler = exit_handler;
sigfillset(&sa.sa_mask);
sigaction(SIGINT, &sa, nullptr);

pid_t wpid;
int status = 0;
std::queue<int> job_queue;
int ckpnt_no = 1;

std::thread td(transmit_ckpnt, std::ref(job_queue), std::ref(plug));
plug.set_msg("indefinite");

while(true){
//plug the buffer at start of the epoch
plug.set_msg("buffer");

//wait for completion of epoch
sleep(4);

job_queue.push(ckpnt_no);
ckpnt_no += 1;
}

plug.set_msg("indefinite");
td.join();

// while (!quit) {
// std::cout << "Plug set to " << plug.is_enabled() << std::endl;
// std::cout << "Press <Enter> to continue.";
// std::cin.get();

// plug.set_enabled(!plug.is_enabled());
// }

return EXIT_SUCCESS;
}

代码演练:这个程序创建了一个插入/拔出类型的 qdiscs,在插入操作期间,网络数据包被缓冲,而在拔出操作期间,网络数据包从第一个插头释放(排队纪律qdisc的前面)到第二个插入的qdisc。如果插入和拔出操作交替存在,则上述程序可以正常工作。但我想按照它的构建方式使用它,即像 this 中提到的那样链接,即

     TCQ_PLUG_BUFFER (epoch i)
TCQ_PLUG_BUFFER (epoch i+1)
TCQ_PLUG_RELEASE_ONE (for epoch i)
TCQ_PLUG_BUFFER (epoch i+2)
..............................so on

在我的程序中,主线程在每个纪元开始时开始缓冲,并继续执行。作业线程从作业队列中获取作业 ID,并将缓冲的数据包从队列的头部释放到下一个塞子。但这给出了以下错误:

./a.out: /lib/x86_64-linux-gnu/libnl-3.so.200: no version information available (required by ./a.out)
./a.out /usr/lib/x86_64-linux-gnu/libnl-route-3.so.200: no version information available (required by ./a.out)
called from parallel thread 1
called from parallel thread 2
called from parallel thread 3
called from parallel thread 4
called from parallel thread 5
called from parallel thread 6
called from parallel thread 7
terminate called after throwing an instance of 'std::runtime_error'
what(): Unable to add qdisc: Message sequence number mismatch
Aborted

无法理解这是什么以及为什么会出现此错误,当在主线程中按顺序执行释放时它正在工作,但现在当有另一个线程执行释放操作时它只检查 job_queue 是否是是否为空并执行释放操作,直到作业队列中有内容,如果作业队列为空则忙等待。

最佳答案

预期的序列计数器由 libnl 存储为 nl_sock 结构 (reference) 的一部分。当多个线程调用 libnl 函数时,这会导致不一致,例如数据竞争(两个线程同时写入序列计数器)或竞争条件(检查时间使用时间问题,其中一个线程检查计数器是否满足某些条件,然后执行某些操作,但在其他线程之间修改计数器)。参见 here有关数据竞争和竞争条件的更多详细信息。

旁注:g++clang++ 都支持 -fsanitize=thread 标志,它会自动将额外的调试代码插入二进制文件中,这有助于检测这种数据竞争(reference)。尽管在这种情况下,它可能没有那么有用,因为您还必须使用此标志编译 libnl,这可能并不容易。

来自 libnl 文档(reference):

The next step is to check the sequence number of the message against
the currently expected sequence number. The application may provide
its own sequence number checking algorithm by setting the callback
function NL_CB_SEQ_CHECK to its own implementation. In fact, calling
nl_socket_disable_seq_check() to disable sequence number checking will
do nothing more than set the NL_CB_SEQ_CHECK hook to a function which
always returns NL_OK.

这给我们留下了以下选择:

  1. 使用互斥体来保护对可能修改序列计数器的 libnl 函数的所有访问。

  2. 使用 nl_socket_disable_seq_check 禁用序列计数器检查。

在我看来,1) 是更可靠的解决方案。如果您更关心性能而不是健壮性,那么您可以选择 2)。

选项 1:使用互斥体来保护对 libnl 函数的访问

包含标准库中的互斥 header :

#include <mutex>

Plug 类中,添加一个 std::mutex作为成员(member):

class Plug {
...
private:
std::mutex seq_counter_mutex_;
...
};

set_msg的开头,使用std::lock_guard用于在函数执行期间获取互斥量。这样可以保证同一时间只有一个线程可以进入函数:

  void set_msg(std::string msg) {
std::lock_guard guard{seq_counter_mutex_};
...
}

选项 2:禁用序列号检查

Socket 类中,在构造函数的末尾,您可以禁用序列计数器检查:

  nl_socket_disable_seq_check(handle);

关于c++ - 无法理解这个 "message sequence mismatch error",我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/64669383/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com