- 使用 Spring Initializr 创建 Spring Boot 应用程序
- 在Spring Boot中配置Cassandra
- 在 Spring Boot 上配置 Tomcat 连接池
- 将Camel消息路由到嵌入WildFly的Artemis上
面向连接的传输(如 TCP)管理两个网络端点之间的连接的建立,在连接的生命周期的有序和可靠的消息传输,以及最后,连接的有序终止。相比之下,类似 UDP 的无连接协议中则没有持久化连接的概念,此外,UDP 也没有 TCP 的纠错机制。但 UDP 的性能比 TCP 要好很多,适合那些能够处理或者忍受消息丢失的应用程序
目前为止,我们所有的例子都是采用一种叫作单播的传输模式,定义为发送消息给一个由唯一地址所标识的单一的网络目的地。面向连接的协议和无连接协议都支持这种模式
UDP 提供了向多个接收者发送消息的额外传输模式:
本章示例将通过发送能够被同一个网络中的所有主机接收的消息来演示 UDP 广播的使用
我们的程序将打开一个文件,随后通过 UDP 把每一行作为一个消息广播到一个指定的端口。而接收方只需简单地在指定端口上启动一个监听程序,便可以创建一个事件监视器来接受消息。本次示例以日志文件处理程序为例
在这个应用程序中,我们将会把消息作为事件处理,并且由于该数据来自于日志文件,所以我们将它称为 LogEvent
public class LogEvent {
public static final byte SEPARATOR = (byte) ':';
private final InetSocketAddress source;
private final String logfile;
private final String msg;
private final long received;
public LogEvent(String logfile, String msg) {
this(null, logfile, msg, -1);
}
public LogEvent(InetSocketAddress source, String logfile, String msg, long received) {
this.source = source;
this.logfile = logfile;
this.msg = msg;
this.received = received;
}
public InetSocketAddress getSource() {
return source;
}
public String getLogfile() {
return logfile;
}
public String getMsg() {
return msg;
}
public long getReceived() {
return received;
}
}
Netty 的 DatagramPacket 是一个简单的消息容器,DatagramChannel 实现和远程节点的通信,要将 LogEvent 消息转换为 DatagramPacket,我们需要一个编码器
下述是编码器的代码实现
public class LogEventEncoder extends MessageToMessageEncoder<LogEvent> {
private final InetSocketAddress remoteAddress;
public LogEventEncoder(InetSocketAddress remoteAddress) {
this.remoteAddress = remoteAddress;
}
@Override
protected void encode(ChannelHandlerContext ctx, LogEvent logEvent, List<Object> out) throws Exception {
byte[] file = logEvent.getLogfile().getBytes(StandardCharsets.UTF_8);
byte[] msg = logEvent.getMsg().getBytes(StandardCharsets.UTF_8);
ByteBuf buf = ctx.alloc().buffer(file.length + msg.length + 1);
buf.writeBytes(file);
buf.writeByte(LogEvent.SEPARATOR);
buf.writeBytes(msg);
out.add(new DatagramPacket(buf, remoteAddress));
}
}
接下来准备引导该服务器,包括设置 ChannelOption,以及在 ChannelPipeline 中安装所需的 ChannelHandler,这部分通过主类 LogEventBroadcaster 完成
public class LogEventBroadcaster {
private final EventLoopGroup group;
private final Bootstrap bootstrap;
private final File file;
public LogEventBroadcaster(InetSocketAddress address, File file) {
group = new NioEventLoopGroup();
bootstrap = new Bootstrap();
bootstrap.group(group).channel(NioDatagramChannel.class)
.option(ChannelOption.SO_BROADCAST, true)
.handler(new LogEventEncoder(address));
this.file = file;
}
public void run() throws Exception {
// 绑定 Channel
Channel ch = bootstrap.bind(0).sync().channel();
long pointer = 0;
for (; ; ) {
long len = file.length();
if (len < pointer) {
// 将文件指针指向文件的最后一个字节
pointer = len;
} else if (len > pointer) {
RandomAccessFile raf = new RandomAccessFile(file, "r");
// 设置当前文件指针
raf.seek(pointer);
String line;
while ((line = raf.readLine()) != null) {
ch.writeAndFlush(new LogEvent(null, line, file.getAbsolutePath(), -1));
}
pointer = raf.getFilePointer();
raf.close();
}
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
Thread.interrupted();
break;
}
}
}
public void stop() {
group.shutdownGracefully();
}
public static void main(String[] args) throws Exception {
if (args.length != 2) {
throw new InterruptedException();
}
LogEventBroadcaster broadcaster = new LogEventBroadcaster(new InetSocketAddress("255.255.255.255",
Integer.parseInt(args[0])), new File(args[1]));
try {
broadcaster.run();
}
finally {
broadcaster.stop();
}
}
}
编写一个称为 LogEventMonitor 的消费者程序,它的作用包括:
和之前一样,解码器 LogEventDecoder 负责将传入的 DatagramPacket 解码为 LogEvent 消息
public class LogEventDecoder extends MessageToMessageDecoder<DatagramPacket> {
@Override
protected void decode(ChannelHandlerContext ctx, DatagramPacket datagramPacket, List<Object> out) throws Exception {
ByteBuf data = datagramPacket.content();
int idx = data.indexOf(0, data.readableBytes(), LogEvent.SEPARATOR);
String filename = data.slice(0, idx).toString(CharsetUtil.UTF_8);
String logMsg = data.slice(idx + 1, data.readableBytes()).toString(CharsetUtil.UTF_8);
LogEvent event = new LogEvent(datagramPacket.sender(), logMsg, filename, System.currentTimeMillis());
out.add(event);
}
}
创建一个处理 LogEvent 的 ChannelHandler
public class LogEventHandler extends SimpleChannelInboundHandler<LogEvent> {
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}
@Override
protected void messageReceived(ChannelHandlerContext ctx, LogEvent event) throws Exception {
StringBuilder builder = new StringBuilder();
builder.append(event.getReceived());
builder.append("[");
builder.append(event.getSource().toString());
builder.append("] [");
builder.append(event.getLogfile());
builder.append("] : ");
builder.append(event.getMsg());
System.out.println(builder.toString());
}
}
现在需要将 LogEventDecoder 和 LogEventHandler 安装到 ChannelPipeline 中,下述代码展示了如何通过 LogEventMonitor 主类来做到这一点
public class LogEventMonitor {
private final EventLoopGroup group;
private final Bootstrap bootstrap;
public LogEventMonitor(InetSocketAddress address) {
group = new NioEventLoopGroup();
bootstrap = new Bootstrap();
bootstrap.group(group)
.channel(NioDatagramChannel.class)
.option(ChannelOption.SO_BROADCAST, true)
.handler(new ChannelInitializer<Channel>() {
@Override
protected void initChannel(Channel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(new LogEventDecoder());
pipeline.addLast(new LogEventHandler());
}
})
.localAddress(address);
}
public Channel bind() {
return bootstrap.bind().syncUninterruptibly().channel();
}
public void stop() {
group.shutdownGracefully();
}
public static void main(String[] args) throws Exception {
if (args.length != 1) {
throw new IllegalArgumentException("Usage: LogEventMonitor <port>");
}
LogEventMonitor monitor = new LogEventMonitor(new InetSocketAddress(Integer.parseInt(args[0])));
try {
Channel channel = monitor.bind();
channel.closeFuture().sync();
}
finally {
monitor.stop();
}
}
}
谁能给我提供代码或链接,以便在可能的情况下使用 UDP 发送和接收广播消息? 我一直被困在一个问题中,希望你们能帮助我解决它。谢谢 最佳答案 这是一个 C# 示例: using System; usi
我想将形状为 [a,b,c] 的张量中的元素相乘,每个元素在第 3 维中使用来自形状为 [a,b] 的张量的标量。 例如, x = |[1,2][3,4][5,6]| |[1,2][3,4][5,6]
广播是使具有不同形状的数组具有用于算术运算的兼容形状的过程。在 numpy 中,我们可以广播数组。 TensorFlow 图是否支持类似于 numpy 的广播? 最佳答案 是的,它是支持的。打开终端并
我有一个刷新功能,需要广播到子 Controller 。我在父 Controller 中做了类似的事情: // Refresh/Cancel $scope.OnGridBODRefre
我正在尝试在计算中使用字典值,如下所示: mydict = dict(zip(['key1', 'key2', 'key3'], [1, 2, 3])) print
刚刚掌握使用 MPI 的 Java 接口(interface)进行并行编程。只是想知道是否有人可以非常简单地解释广播的工作原理? 我有以下内容: if (me ==0) { // This is th
我正在处理一个项目,当我发送消息时,我将它作为通知发送给另一个用户使用广播它工作正常但是当我再次发送新消息然后替换为旧通知而不创建新通知 下面是我生成通知的代码 NotificationCompat.
我是 android 的初学者。但我非常需要你的帮助。我有一个流媒体视频广播视频项目。我找不到好的示例,在哪里可以实现从摄像机录制视频、将流发送(上传)到服务器以及从服务器下载(获取流)到播放器。请帮
请帮我解决我的问题。当我从父 Controller 调用并在子 Controller 中捕获时,为什么 $broadcast 函数不起作用?
我如何从 shell 中看到设置了哪些套接字选项?我特别想知道是否设置了 SO_BROADCAST? 最佳答案 你看过lsof了吗? 关于linux - 广播 socket ,我们在Stack Ove
当我在 Numpy 中进行此操作时会发生什么? a = np.ones([500,1]) b = np.ones([5000,])/2 c = a + b # a.shape (500,1) # b.
我有一个 Nexus S,当我在手机上手动更改日期时,并不总是广播 ACTION_DATE_CHANGED。如果我将日期从 2014 年 2 月 13 日更改为 2014 年 2 月 14 日,我还没
环境:springboot2.3.9RELEASE + RocketMQ4.8.0 依赖 <dependency>  
UDP 广播 面向连接的传输(如 TCP)管理两个网络端点之间的连接的建立,在连接的生命周期的有序和可靠的消息传输,以及最后,连接的有序终止。相比之下,类似 UDP 的无连接协议中则没有持久化连接的概
我正在开发一个带有 Angular 的单页应用程序,我需要在两个不同的指令之间进行通信,这些指令基本上没有父子关系。 在指令 A 中,我有 2 个地方需要从不同的功能广播相同的事件。在指令 B 中,为
我有一个带有多个重复项的主要二维 numpy 数组和一个具有第一个唯一值的辅助数组。 [[ 0 0 1 ] [ 1 0 2 ] [ 2 0 2 ] ... [ 0 0 1 ]
我正在制作多人网络游戏。现在要连接到服务器,客户端需要服务器的 ip 地址。 所以,我的实现方式如下。 客户端在广播 IP 和端口 A 上广播其 IP 地址。服务器通过 A 监听它,并且 服务器与客户
是否可以在没有 Urban Airship 等服务的情况下广播推送通知? 谢谢。 最佳答案 当然可以,但是您需要自己实现整个基础架构。 http://developer.apple.com/libra
我想复制矩阵的每一行 M没有任何复制发生(即通过创建 View ): 0 1 0 1 2 3 -> 0 1 2 3
我从一个 2D 数组开始,想将它广播到一个 3D 数组(例如,从灰度图像到 rgb 图像)。这是我使用的代码。 >>> img_grey = np.random.randn(4, 4) >>> img
我是一名优秀的程序员,十分优秀!