gpt4 book ai didi

azure - 如何在 Quarkus 中连接到 Azure ServiceBus

转载 作者:行者123 更新时间:2023-12-02 23:22:09 25 4
gpt4 key购买 nike

我想从 Quarkus 连接到 ServiceBus 订阅。我找到了这个article ,建议使用 ServiceBusJmsConnectionFactory 并且我正在尝试使其与 Smallrye 一起使用

到目前为止我尝试过:

import com.microsoft.azure.servicebus.jms.ServiceBusJmsConnectionFactory
import com.microsoft.azure.servicebus.primitives.ConnectionStringBuilder
import javax.enterprise.context.ApplicationScoped
import javax.enterprise.inject.Produces
import javax.jms.ConnectionFactory

@ApplicationScoped
class ConnectionFactoryBean {
@Produces
fun factory(): ConnectionFactory {
var connectionStringBuilder = ConnectionStringBuilder("Endpoint=sb://mynamespace.servicebus.windows.net/;SharedAccessKeyName=NAME;SharedAccessKey=KEY");
return ServiceBusJmsConnectionFactory(connectionStringBuilder, null)
}
}

然后在我的 application.properties 中:

mp.messaging.incoming.my_topic_name.connector=smallrye-jms

最后接收消息:

@Incoming("my_topic_name")
protected suspend fun receiveMyEvent(myEvent: String) {
//process
}

就版本而言:

<dependency>
<groupId>com.microsoft.azure</groupId>
<artifactId>service-bus-jms-connection-factory</artifactId>
<version>0.0.1</version>
</dependency>
<dependency>
<groupId>io.smallrye.reactive</groupId>
<artifactId>smallrye-reactive-messaging-jms</artifactId>
<version>3.4.0</version>
</dependency>

由于“API 不兼容”错误而失败。

使用 Kotlin/Quarkus 从 Azure ServiceBus 接收消息的正确方法是什么?

更新我还尝试遵循 Reactive messaging AMQP and Service Bus with Open Liberty 中的方法为此,我删除了 2 个列出的依赖项,将它们替换为

<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-smallrye-reactive-messaging-amqp</artifactId>
</dependency>

并完全删除ConnectionFactoryBean。为此我面临

javax.net.ssl.SSLHandshakeException: Failed to create SSL connection

错误。将 quarkus.tls.trust-all=truequarkus.ssl.native=true 添加到属性不起作用...

而且还不清楚,如何连接订阅?是否会更改 @Incoming 属性?

最佳答案

您可以使用 Azure SDK 库 (maven artefact - com.azure.messaging.servicebus ) 连接到服务总线订阅。

为此,您需要使用常用的连接字符串(可从 azure 门户获取)创建一个 ServiceBusProcessorClient 连接到您的命名空间在访问策略下),指定它的 topicNamesubscriptionName,然后等待消息。下面是一个示例代码:

@ApplicationScoped
class ServiceBusClient() {
private val scope = CoroutineScope(SupervisorJob())

fun onStart(@Observes event: StartupEvent) {
scope.startClient()
}
fun onStop(@Observes event: ShutdownEvent) {
scope.cancel()
}

private fun CoroutineScope.startClient() = launch {
val processorClient: ServiceBusProcessorClient = ServiceBusClientBuilder()
.connectionString("CONNECTION_STRING")
.processor()
.topicName("TOPIC_NAME")
.subscriptionName("SUBSCRIPTION_NAME")
.processMessage { receivedMessageContext -> onMessage(receivedMessageContext) }
.processError { errorContext -> onError(errorContext) }
.buildProcessorClient()

processorClient.start()
}

private fun onMessage(context: ServiceBusReceivedMessageContext) {
//Process message
}

private fun onError(context: ServiceBusErrorContext) {
//Handle errors
}
}

您可以使用 context.message.applicationProperties["PROPERTY_NAME"]context.message.bodycontext.message 获取消息内容。主题按要求。

关于azure - 如何在 Quarkus 中连接到 Azure ServiceBus,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/73962138/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com