- VisualStudio2022插件的安装及使用-编程手把手系列文章
- pprof-在现网场景怎么用
- C#实现的下拉多选框,下拉多选树,多级节点
- 【学习笔记】基础数据结构:猫树
通过前面的介绍,订阅者、发布者与Roudi守护进程之间也需要通信,如上文介绍的,请求Roudi守护进村创建并配置端口数据。整体结构如下图所示:
由于通信层在类Unix操作系统和Windows操作系统下实现不同(见下面的代码片段),所以我们分开介绍其实现.
#if defined(_WIN32)
using IoxIpcChannelType = iox::posix::NamedPipe;
#else
using IoxIpcChannelType = iox::posix::UnixDomainSocket;
#endif
接下来我们从数据的序列化和反序列化开始.
前一篇文章中,这部分通信没有使用三方框架,使用简单的字符串拼接的方式进行序列化,如下所示:
template <typename T>
void IpcMessage::addEntry(const T& entry) noexcept
{
std::stringstream newEntry;
newEntry << entry;
if (!isValidEntry(newEntry.str()))
{
LogError() << "\'" << newEntry.str().c_str() << "\' is an invalid IPC channel entry";
m_isValid = false;
}
else
{
m_msg.append(newEntry.str() + m_separator);
++m_numberOfElements;
}
}
template <typename T>
IpcMessage& IpcMessage::operator<<(const T& entry) noexcept
{
addEntry(entry);
return *this;
}
上面的代码较为简单,这里不作详细解释了。反序列化也很简单,这里就贴一下代码了,非常简单粗暴的实现:
std::string IpcMessage::getElementAtIndex(const uint32_t index) const noexcept
{
std::string messageRemainder(m_msg);
size_t startPos = 0u;
size_t endPos = messageRemainder.find_first_of(m_separator, startPos);
for (uint32_t counter = 0u; endPos != std::string::npos; ++counter)
{
if (counter == index)
{
return messageRemainder.substr(startPos, endPos - startPos);
}
startPos = endPos + 1u;
endPos = messageRemainder.find_first_of(m_separator, startPos);
}
return std::string();
}
正如在 引言 中介绍的,类Unix系统使用Unix域套接字实现IPC通信机制。由UnixDomainSocket封装初始化、销毁、发送和接收等逻辑,这里我们主要介绍发送和接收逻辑的具体实现.
职责: 封装客户端的消息发送逻辑 。
参数:
msg
:待发送的消息。cxx::expected<IpcChannelError> UnixDomainSocket::send(const std::string& msg) const noexcept
{
// we also support timedSend. The setsockopt call sets the timeout for all further sendto calls, so we must set
// it to 0 to turn the timeout off
return timedSend(msg, units::Duration::fromSeconds(0ULL));
}
发送函数send只是简单地调用地超时时间的发送函数timedSend。输入的超时时间为0,意味着立即发送。timedSend的实现如下所示:
cxx::expected<IpcChannelError> UnixDomainSocket::timedSend(const std::string& msg,
const units::Duration& timeout) const noexcept
{
if (msg.size() > m_maxMessageSize)
{
return cxx::error<IpcChannelError>(IpcChannelError::MESSAGE_TOO_LONG);
}
if (IpcChannelSide::SERVER == m_channelSide)
{
std::cerr << "sending on server side not supported for unix domain socket \"" << m_name << "\"" << std::endl;
return cxx::error<IpcChannelError>(IpcChannelError::INTERNAL_LOGIC_ERROR);
}
auto tv = timeout.timeval();
auto setsockoptCall = posixCall(iox_setsockopt)(m_sockfd, SOL_SOCKET, SO_SNDTIMEO, &tv, sizeof(tv))
.failureReturnValue(ERROR_CODE)
.ignoreErrnos(EWOULDBLOCK)
.evaluate();
if (setsockoptCall.has_error())
{
return cxx::error<IpcChannelError>(convertErrnoToIpcChannelError(setsockoptCall.get_error().errnum));
}
auto sendCall = posixCall(iox_sendto)(m_sockfd, msg.c_str(), msg.size() + NULL_TERMINATOR_SIZE, 0, nullptr, 0)
.failureReturnValue(ERROR_CODE)
.evaluate();
if (sendCall.has_error())
{
return cxx::error<IpcChannelError>(convertErrnoToIpcChannelError(sendCall.get_error().errnum));
}
return cxx::success<void>();
}
逐段代码分析:
LINE 04 ~ LINE 13: 错误处理——消息长度过长、类型服务端。整体结构图中,黄色的 。
LINE 15 ~ LINE 24: 调用POSIX接口(类Unix系统调用)setsockopt,设置超时时间.
LINE 25 ~ LINE 32: 调用POSIX接口(类Unix系统调用)sendto发送数据.
可以看到,Unix版本的发送实现就是简单地调用系统调用.
职责: 封装消息接收逻辑.
返回: 消息字符串或错误类型.
cxx::expected<std::string, IpcChannelError> UnixDomainSocket::receive() const noexcept
{
// we also support timedReceive. The setsockopt call sets the timeout for all further recvfrom calls, so we must set
// it to 0 to turn the timeout off
struct timeval tv = {};
tv.tv_sec = 0;
tv.tv_usec = 0;
return timedReceive(units::Duration(tv));
}
接收函数receive只是简单地调用地超时时间的发送函数timedReceive。输入的超时时间为0,即没有结果立即返回。timedReceive的实现如下所示:
cxx::expected<std::string, IpcChannelError>
UnixDomainSocket::timedReceive(const units::Duration& timeout) const noexcept
{
if (IpcChannelSide::CLIENT == m_channelSide)
{
std::cerr << "receiving on client side not supported for unix domain socket \"" << m_name << "\"" << std::endl;
return cxx::error<IpcChannelError>(IpcChannelError::INTERNAL_LOGIC_ERROR);
}
auto tv = timeout.timeval();
auto setsockoptCall = posixCall(iox_setsockopt)(m_sockfd, SOL_SOCKET, SO_RCVTIMEO, &tv, sizeof(tv))
.failureReturnValue(ERROR_CODE)
.ignoreErrnos(EWOULDBLOCK)
.evaluate();
if (setsockoptCall.has_error())
{
return cxx::error<IpcChannelError>(convertErrnoToIpcChannelError(setsockoptCall.get_error().errnum));
}
// NOLINTJUSTIFICATION needed for recvfrom
// NOLINTNEXTLINE(hicpp-avoid-c-arrays, cppcoreguidelines-avoid-c-arrays)
char message[MAX_MESSAGE_SIZE + 1];
auto recvCall = posixCall(iox_recvfrom)(m_sockfd, &message[0], MAX_MESSAGE_SIZE, 0, nullptr, nullptr)
.failureReturnValue(ERROR_CODE)
.suppressErrorMessagesForErrnos(EAGAIN, EWOULDBLOCK)
.evaluate();
message[MAX_MESSAGE_SIZE] = 0;
if (recvCall.has_error())
{
return cxx::error<IpcChannelError>(convertErrnoToIpcChannelError(recvCall.get_error().errnum));
}
return cxx::success<std::string>(&message[0]);
}
逐段代码分析:
LINE 04 ~ LINE 08: 错误处理——通道类型服务端。整体结构图中,黄色的.
LINE 10 ~ LINE 19: 调用POSIX接口(类Unix系统调用)setsockopt,设置超时时间.
LINE 22 ~ LINE 33: 调用POSIX接口(类Unix系统调用)recvfrom接收数据.
由于Windows不支持Unix域套接字,使用共享内存的方式来模拟。每引入一个发布者或订阅者,都需要开辟两条通道——收和发,每条通道会使用单独一块共享内存,即需要开辟两块共享内存.
职责: 封装消息发送逻辑.
参数:
msg
:待发送的消息。cxx::expected<IpcChannelError> NamedPipe::send(const std::string& message) const noexcept
{
if (!m_isInitialized)
{
return cxx::error<IpcChannelError>(IpcChannelError::NOT_INITIALIZED);
}
if (message.size() > MAX_MESSAGE_SIZE)
{
return cxx::error<IpcChannelError>(IpcChannelError::MESSAGE_TOO_LONG);
}
cxx::Expects(!m_data->sendSemaphore().wait().has_error());
IOX_DISCARD_RESULT(m_data->messages.push(Message_t(cxx::TruncateToCapacity, message)));
cxx::Expects(!m_data->receiveSemaphore().post().has_error());
return cxx::success<>();
}
逐段代码分析:
LINE 03 ~ LINE 11: 错误处理——未初始化(消息队列共享内存未创建)、消息长度过长。这里没有判断是服务端还是客户端,估计是不同人实现的.
LINE 13 ~ LINE 15: 第14行,往消息队列(共享内存)中存入消息。第13行是通过发送信号量判断消息队列是否已满,若已满,则一直等待,直到接收端读取消息,唤醒发送端。第15行是唤醒接收端读取消息.
iceoryx还提供了timedSend函数,带有超时机制,即超时则发送失败。还提供了不等待的版本trySend,若队列已满,则发送失败。这两个函数本文不做介绍.
职责: 封装消息接收逻辑.
返回: 消息字符串或错误类型.
cxx::expected<std::string, IpcChannelError> NamedPipe::receive() const noexcept
{
if (!m_isInitialized)
{
return cxx::error<IpcChannelError>(IpcChannelError::NOT_INITIALIZED);
}
cxx::Expects(!m_data->receiveSemaphore().wait().has_error());
auto message = m_data->messages.pop();
if (message.has_value())
{
cxx::Expects(!m_data->sendSemaphore().post().has_error());
return cxx::success<std::string>(message->c_str());
}
return cxx::error<IpcChannelError>(IpcChannelError::INTERNAL_LOGIC_ERROR);
}
逐段代码分析:
LINE 03 ~ LINE 06: 错误处理——未初始化(消息队列共享内存未创建)。这里没有判断是服务端还是客户端,估计是不同人实现的.
LINE 08 ~ LINE 14: 第14行,往消息队列(共享内存)中存入消息。第8行是通过接收信号量判断消息队列是否为空,若为空,则一直等待,直到发送端发送消息,唤醒发送端。第12行是唤醒发送端发送消息.
iceoryx还提供了timedReceive函数,带有超时机制,即超时则接收失败。还提供了不等待的版本tryReceive,若队列为空,则接收失败。这两个函数本文不做介绍.
Roudi启动后,会开启一个线程来监听和处理来自客户端(订阅者、发布者)的请求,如下所示:
void RouDi::startProcessRuntimeMessagesThread() noexcept
{
m_handleRuntimeMessageThread = std::thread(&RouDi::processRuntimeMessages, this);
posix::setThreadName(m_handleRuntimeMessageThread.native_handle(), "IPC-msg-process");
}
线程执行函数为processRuntimeMessages,内部就是一个循环,如下所示:
void RouDi::processRuntimeMessages() noexcept
{
runtime::IpcInterfaceCreator roudiIpcInterface{IPC_CHANNEL_ROUDI_NAME};
// the logger is intentionally not used, to ensure that this message is always printed
std::cout << "RouDi is ready for clients" << std::endl;
while (m_runHandleRuntimeMessageThread)
{
// read RouDi's IPC channel
runtime::IpcMessage message;
if (roudiIpcInterface.timedReceive(m_runtimeMessagesThreadTimeout, message))
{
auto cmd = runtime::stringToIpcMessageType(message.getElementAtIndex(0).c_str());
std::string runtimeName = message.getElementAtIndex(1);
processMessage(message, cmd, RuntimeName_t(cxx::TruncateToCapacity, runtimeName));
}
}
}
通过上述代码可知,发送给Roudi的所有消息,第一项为请求类型,第二项为运行。这里调用了processMessage函数,这和上一篇文章中的 3.5 RouDi::processMessage 关联了.
最后此篇关于iceoryx源码阅读(八)——IPC通信机制的文章就讲到这里了,如果你想了解更多关于iceoryx源码阅读(八)——IPC通信机制的内容请搜索CFSDN的文章或继续浏览相关文章,希望大家以后支持我的博客! 。
广播的原则 如果两个数组的后缘维度(从末尾开始算起的维度)的轴长度相符或其中一方的长度为1,则认为它们是广播兼容的。广播会在缺失维度和(或)轴长度为1的维度上进行。 在上面的对arr每一列减去列
之前在讲 MySQL 事务隔离性提到过,对于写操作给读操作的影响这种情形下发生的脏读、不可重复读、虚读问题。是通过MVCC 机制来进行解决的,那么MVCC到底是如何实现的,其内部原理是怎样的呢?我们要
我创建了一个 JavaScript 对象来保存用户在 ColorBox 中检查复选框时设置的值。 . 我对 jQuery 和“以正确的方式”编程 JavaScript 比较陌生,希望确保以下用于捕获用
我为了回答aquestion posted here on SO而玩示例,发现很难理解python的import *破坏作用域的机制。 首先是一点上下文:这个问题不涉及实际问题;我很清楚from fo
我想让我的类具有标识此类的参数 ID。例如我想要这样的东西: class Car { public static virtual string ID{get{return "car";}} }
更新:我使用的是 Java 1.6.34,没有机会升级到 Java 7。 我有一个场景,我每分钟只能调用一个方法 80 次。它实际上是由第 3 方编写的服务 API,如果您多次调用它,它会“关闭”(忽
希望这对于那些使用 Javascript 的人来说是一个简单的答案...... 我有一个日志文件,该文件正在被一个脚本监视,该脚本将注销中的新行提供给任何连接的浏览器。一些人评论说,他们希望看到的更多
我们正在开发针对 5.2 开发的 PHP 应用程序,但我们最近迁移到了 PHP 5.3。我们没有时间去解决所有迁移到 PHP 5.3 的问题。具体来说,我们有很多消息: Declaration of
简介 在实现定时调度功能的时候,我们往往会借助于第三方类库来完成,比如: quartz 、 spring schedule 等等。jdk从1.3版本开始,就提供了基于 timer 的定时调度功能。
Java中,一切都是对象,在分布式环境中经常需要将Object从这一端网络或设备传递到另一端。这就需要有一种可以在两端传输数据的协议。Java序列化机制就是为了解决这个问题而
我将编写自己的自定义控件,它与 UIButton 有很大不同。由于差异太大,我决定从头开始编写。所以我所有的子类都是 UIControl。 当我的控件在内部被触摸时,我想以目标操作的方式触发一条消息。
在我的代码中,在创建 TIdIMAP4 连接之前,我设置了一大堆 SASL 机制,希望按照规定的“最好到最差”顺序,如下所示: IMAP.SASLMechanisms.Add.SASL := mIdS
在 Kubernetes 中,假设我们有 3 个 pod,它们物理上托管在节点 X、Y 和 Z 上。当我使用“kubectl expose”将它们公开为服务时,它们都是集群中的节点(除了 X、Y 和
关闭。这个问题需要多问focused 。目前不接受答案。 想要改进此问题吗?更新问题,使其仅关注一个问题 editing this post . 已关闭 9 年前。 Improve this ques
我知道进程间通信 (ipc) 有几种方法,例如: 文件 信号 socket 消息队列 管道 命名管道 信号量 共享内存 消息传递 内存映射文件 但是我无法找到将这些机制相互比较并指出它们在不同环境中的
当我尝试连接到 teradata 时,出现了TD2 机制不支持单点登录 错误。 在 C# 中,我遇到了类似的问题,我通过添加 connectionStringBuilder.Authetication
我有一个带有 JSON API 的简单 Javascript 应用程序。目前它在客户端运行,但我想将它从客户端移动到服务器。我习惯于学习新平台,但在这种情况下,我的时间非常有限 - 所以我需要找到绝对
我想了解事件绑定(bind)/解除绑定(bind)在浏览器中是如何工作的。具体来说,如果我删除一个已经绑定(bind)了事件的元素,例如使用 jQuery:$("#anElement").remove
我不是在寻找具体答案,只是一个想法或提示。我有以下问题: Android 应用程序是 Web 服务的客户端。它有一个线程,通过 http 协议(protocol)发送事件(带有请求 ID 的 XML
我正在研究 FreeBSD TCP/IP 栈。似乎有 2 种 syn flood 机制,syncookies 和 syncache。我的问题是关于 syncookies,它是从头开始还是在 SYN 队
我是一名优秀的程序员,十分优秀!