gpt4 book ai didi

java - 使用 SmallRye react 消息动态发布/订阅 MQTT

转载 作者:行者123 更新时间:2023-12-05 07:04:43 29 4
gpt4 key购买 nike

我们尝试使用 smallrye 响应式(Reactive)消息传递来发布和订阅 MQTT 协议(protocol)。我们设法通过以下简单代码将消息实际发布到特定主题/ channel

import io.smallrye.mutiny.Multi;
import org.eclipse.microprofile.reactive.messaging.Outgoing;
import javax.enterprise.context.ApplicationScoped;
import java.time.Duration;

@ApplicationScoped
public class Publish {

@Outgoing("pao")
public Multi<String> generate() {
return Multi.createFrom().ticks().every(Duration.ofSeconds(1))
.map(x -> "A Message in here");
}
}

我们要做的是在需要 generate() 时调用以某种方式使用动态主题的方法,用户将在其中定义它。那个是我们的问题,但后来我们从那个 repo 中找到了这些类在 github 中。包名io.smallrye.reactive.messaging.mqtt

例如,我们发现有一个类表示它对 MQTT 代理(Mosquitto 服务器启动)进行发布调用。

在声明中 SendingMqttMessage<String> message = new SendingMqttMessage<String>("myTopic","A message in here",0,false);我们在 SendingMqttMessage<String> 下得到红色下划线说'SendingMqttMessage(java.lang.String, java.lang.String, io.netty.handler.codec.mqtt.MqttQoS, boolean)' is not public in 'io.smallrye.reactive.messaging.mqtt.SendingMqttMessage '.无法从外包访问

更新(发布完成)最后向 mqtt 代理(一个 mosquitto 服务器)发出了一个发布请求,所有这些都带有一个由用户配置的动态主题。正如我们发现上一节课 SendingMqttMessage根本不应该使用。我们发现我们还需要发射器来实际发出带有动态主题的发布请求。

    @Inject
@Channel("panatha")
Emitter<String> emitter;

@POST
@Consumes(MediaType.APPLICATION_JSON)
@Produces(MediaType.APPLICATION_JSON)
public Response createUser(Device device) {
System.out.println("New Publish request: message->"+device.getMessage()+" & topic->"+device.getTopic());
emitter.send(MqttMessage.of(device.getTopic(), device.getMessage()));
return Response.ok().status(Response.Status.CREATED).build();
}

现在我们需要了解如何动态订阅主题。

最佳答案

首先将我们设置到同一页面:
响应式(Reactive)消息传递适用于主题,但适用于 channel 。请务必注意这一点,因为您可以独占地读取 写入 channel 。所以如果要两者都提供,需要配置两个channel指向同一个topic,一进一出

回答你的问题:

您在发射器方面有了一个很好的开端,但仍然缺乏您想要的动态特性。在您的示例中,您通过 CDI 获得了发射器。
现在这就是我们所需要的,让它变得动态,因为我们不能在运行时使用 CDI 动态注入(inject) Bean,如下所示:

发送消息

private Emitter<byte[]> dynamicEmitter(String topic){
return CDI.current().select(new TypeLiteral<Emitter<byte[]>>() {}, new ChannelAnnotation(topic)).get();
}

另请注意,我正在创建 byte[] 类型的发射器,因为根据其文档,这是 smallrye-mqtt 连接器(版本 3.4.0)目前唯一支持的类型。

接收消息

要从响应式(Reactive)消息 channel 读取消息,您可以使用发射器的对应物,即发布者
它可以模拟使用:

private Publisher<byte[]> dynamicReceiver(String topic){
return CDI.current().select(new TypeLiteral<Publisher<byte[]>>() {}, new ChannelAnnotation(topic)).get();
}

然后您可以按照您喜欢的任何方式处理这些日期。作为演示,它将它卡在一个简单的 REST 端点上

@GET
@Produces(MediaType.SERVER_SENT_EVENTS)
public Multi<String> stream(@QueryParam("topic") String topic) {
return Multi.createFrom().publisher(dynamicReceiver(topic)).onItem().transform(String::new);
}

@GET
@Path("/publish")
public boolean publish(@QueryParam("msg") String msg, @QueryParam("topic") String topic) {
dynamicEmitter(topic).send(msg.getBytes());
return true;
}

还有一件事

在创建这个解决方案时,我遇到了一些你应该知道的陷阱:

  1. Quarkus 删除所有“未使用”的 CDI-Beans。因此,如果您想动态注入(inject)它们,则需要排除它们或关闭该功能。
  2. 必须配置以这种方式注入(inject)的所有 channel 。否则注入(inject)会失败。
  3. 出于某种原因,(即使完全禁用删除)我无法动态注入(inject)发射器,除非它们被注入(inject)到其他地方。

关于java - 使用 SmallRye react 消息动态发布/订阅 MQTT,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/62883516/

29 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com