
java - Incoming messages are split between listeners on the same Azure Service Bus topic subscription

Reposted. Author: 行者123. Updated: 2023-12-01 20:27:17

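I am using the following code from the Microsoft portal to send messages to, and receive messages from, an Azure Service Bus topic/subscription. The code works fine. When I run 2 instances of the receiver code, the bus splits the messages between the 2 receivers: 10 are sent and each receiver gets 5. How can I get all of the messages in every receiver?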

import static java.nio.charset.StandardCharsets.UTF_8;

import com.google.gson.Gson;
import com.google.gson.reflect.TypeToken;
import com.microsoft.azure.servicebus.Message;
import com.microsoft.azure.servicebus.TopicClient;
import com.microsoft.azure.servicebus.primitives.ConnectionStringBuilder;
import java.time.Duration;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

public class ServiceBusTopicSender
{
    static final Gson GSON = new Gson();

    public static void main(String[] args) throws Exception {
        String connectionString = "Endpoint=sb://basicbus.servicebus.windows.net/;"
                + "SharedAccessKeyName=RootManageSharedAccessKey;"
                + "SharedAccessKey=xxxxxpxxxxxxxxxxxxxxxxxx/xxxxxxxxxx=";

        TopicClient sendClient = new TopicClient(new ConnectionStringBuilder(connectionString, "basictopic"));
        // Wait until every send is acknowledged and the client is closed before main returns.
        sendMessagesAsync(sendClient).thenCompose(v -> sendClient.closeAsync()).join();
    }

    static CompletableFuture<Void> sendMessagesAsync(TopicClient sendClient) {
        // Gson parses this in lenient mode, which accepts single quotes and '=' as the name/value separator.
        List<HashMap<String, String>> data =
                GSON.fromJson(
                        "[" +
                        "{'name' = 'Einstein', 'firstName' = 'Albert'}," +
                        "{'name' = 'Heisenberg', 'firstName' = 'Werner'}," +
                        "{'name' = 'Curie', 'firstName' = 'Marie'}," +
                        "{'name' = 'Hawking', 'firstName' = 'Stephen'}," +
                        "{'name' = 'Newton', 'firstName' = 'Isaac'}," +
                        "{'name' = 'Bohr', 'firstName' = 'Niels'}," +
                        "{'name' = 'Faraday', 'firstName' = 'Michael'}," +
                        "{'name' = 'Galilei', 'firstName' = 'Galileo'}," +
                        "{'name' = 'Kepler', 'firstName' = 'Johannes'}," +
                        "{'name' = 'Kopernikus', 'firstName' = 'Nikolaus'}" +
                        "]",
                        new TypeToken<List<HashMap<String, String>>>() {
                        }.getType());

        List<CompletableFuture<Void>> tasks = new ArrayList<>();
        for (int i = 0; i < data.size(); i++) {
            final String messageId = Integer.toString(i);
            // Each scientist becomes one JSON message with a 2-minute time-to-live.
            Message message = new Message(GSON.toJson(data.get(i), Map.class).getBytes(UTF_8));
            message.setContentType("application/json");
            message.setLabel("Scientist");
            message.setMessageId(messageId);
            message.setTimeToLive(Duration.ofMinutes(2));
            System.out.printf("Message sending: Id = %s\n", message.getMessageId());
            tasks.add(
                    sendClient.sendAsync(message).thenRunAsync(() -> {
                        System.out.printf("\tMessage acknowledged: Id = %s\n", message.getMessageId());
                    }));
        }
        // Completes once all sends have been acknowledged.
        return CompletableFuture.allOf(tasks.toArray(new CompletableFuture<?>[0]));
    }
}

------------------------------------------------------------

import static java.nio.charset.StandardCharsets.UTF_8;

import com.google.gson.Gson;
import com.microsoft.azure.servicebus.ExceptionPhase;
import com.microsoft.azure.servicebus.IMessage;
import com.microsoft.azure.servicebus.IMessageHandler;
import com.microsoft.azure.servicebus.MessageHandlerOptions;
import com.microsoft.azure.servicebus.ReceiveMode;
import com.microsoft.azure.servicebus.SubscriptionClient;
import com.microsoft.azure.servicebus.primitives.ConnectionStringBuilder;
import java.time.Duration;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

public class ServiceBusTopicReceiver
{
    static final Gson GSON = new Gson();

    public static void main(String[] args) throws Exception {

        String connectionString = "Endpoint=sb://basicbus.servicebus.windows.net/;"
                + "SharedAccessKeyName=RootManageSharedAccessKey;"
                + "SharedAccessKey=xxxxxpxxxxxxxxxxxxxxxxxx/xxxxxxxxxx=";

        // Every instance of this program listens on the same subscription.
        SubscriptionClient subscription1Client = new SubscriptionClient(new ConnectionStringBuilder(connectionString,
                "basictopic/subscriptions/basicsubscription"), ReceiveMode.PEEKLOCK);

        registerMessageHandlerOnClient(subscription1Client);
    }

    static void registerMessageHandlerOnClient(SubscriptionClient receiveClient) throws Exception {

        // Handler whose callbacks are invoked by the client's message pump
        IMessageHandler messageHandler = new IMessageHandler() {
            // callback invoked when the message handler loop has obtained a message
            public CompletableFuture<Void> onMessageAsync(IMessage message) {
                // the received message is passed to this callback
                if (message.getLabel() != null &&
                        message.getContentType() != null &&
                        message.getLabel().contentEquals("Scientist") &&
                        message.getContentType().contentEquals("application/json")) {

                    byte[] body = message.getBody();
                    Map<?, ?> scientist = GSON.fromJson(new String(body, UTF_8), Map.class);

                    System.out.printf(
                            "\n\t\t\t\t%s Message received: \n\t\t\t\t\t\tMessageId = %s, \n\t\t\t\t\t\tSequenceNumber = %s, \n\t\t\t\t\t\tEnqueuedTimeUtc = %s," +
                            "\n\t\t\t\t\t\tExpiresAtUtc = %s, \n\t\t\t\t\t\tContentType = \"%s\", \n\t\t\t\t\t\tContent: [ firstName = %s, name = %s ]\n",
                            receiveClient.getEntityPath(),
                            message.getMessageId(),
                            message.getSequenceNumber(),
                            message.getEnqueuedTimeUtc(),
                            message.getExpiresAtUtc(),
                            message.getContentType(),
                            scientist != null ? scientist.get("firstName") : "",
                            scientist != null ? scientist.get("name") : "");
                    System.out.println("Partition Key is ::::: " + message.getPartitionKey());
                }
                // Auto-complete is disabled below, so settle the message explicitly.
                return receiveClient.completeAsync(message.getLockToken());
            }

            // callback invoked when the message handler has an exception to report
            public void notifyException(Throwable throwable, ExceptionPhase exceptionPhase) {
                // Pass the values as arguments so a '%' in the message cannot break the format string.
                System.out.printf("%s-%s%n", exceptionPhase, throwable.getMessage());
            }
        };
        receiveClient.registerMessageHandler(
                messageHandler,
                // 1 concurrent call, auto-complete disabled, 1-minute maximum lock auto-renew duration
                new MessageHandlerOptions(1, false, Duration.ofMinutes(1)));
    }
}

Best answer

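Azure Service Bus is a broker. When you receive messages, you are using the Competing Consumers pattern. This means that if you have one subscription and multiple instances of your process listen on that same subscription, they will not receive the same messages. Each instance gets only a subset: the messages that the other instances did not receive. This is by design. If you want multiple processor instances to receive the same messages, you need to create a subscription for each processor instance. That way, every message sent to the topic is copied to each subscription, and each subscriber (receiver) gets its own copy.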
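To make the difference concrete, here is a minimal in-memory sketch of the two setups. It is a simplified model and does not use the Azure Service Bus SDK: the `Topic` class, the `run` helper, and the subscription names `subscription-a` and `subscription-b` are hypothetical and exist only for illustration. The two receivers also take turns in a fixed order here, whereas with a real broker the split between competing receivers depends on timing.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

class SubscriptionFanOutDemo {

    /** Simplified model of a topic: one queue per subscription (not the Azure API). */
    static class Topic {
        private final Map<String, Queue<String>> subscriptions = new LinkedHashMap<>();

        void createSubscription(String name) {
            subscriptions.putIfAbsent(name, new ConcurrentLinkedQueue<>());
        }

        /** Every published message is copied into each subscription's queue. */
        void publish(String message) {
            for (Queue<String> queue : subscriptions.values()) {
                queue.add(message);
            }
        }

        /** Removes and returns the next message of a subscription, or null if none is left. */
        String receive(String subscription) {
            Queue<String> queue = subscriptions.get(subscription);
            return queue == null ? null : queue.poll();
        }
    }

    /**
     * Publishes 10 messages, then lets receivers A and B take turns reading
     * from the given subscriptions. Returns [messages of A, messages of B].
     */
    static List<List<String>> run(String subscriptionForA, String subscriptionForB) {
        Topic topic = new Topic();
        topic.createSubscription(subscriptionForA);
        topic.createSubscription(subscriptionForB);
        for (int i = 0; i < 10; i++) {
            topic.publish("msg-" + i);
        }

        List<String> receivedByA = new ArrayList<>();
        List<String> receivedByB = new ArrayList<>();
        boolean progress = true;
        while (progress) {
            String a = topic.receive(subscriptionForA);
            String b = topic.receive(subscriptionForB);
            if (a != null) receivedByA.add(a);
            if (b != null) receivedByB.add(b);
            progress = a != null || b != null;
        }
        return Arrays.asList(receivedByA, receivedByB);
    }

    public static void main(String[] args) {
        // Both receivers on the same subscription: they compete for the messages.
        List<List<String>> shared = run("basicsubscription", "basicsubscription");
        System.out.println("shared:   A=" + shared.get(0).size() + " B=" + shared.get(1).size());

        // One subscription per receiver: each receiver gets its own copy of everything.
        List<List<String>> separate = run("subscription-a", "subscription-b");
        System.out.println("separate: A=" + separate.get(0).size() + " B=" + separate.get(1).size());
        // prints:
        // shared:   A=5 B=5
        // separate: A=10 B=10
    }
}
```

Applied to the receiver in the question, this would mean that each instance opens its own entity path instead of the shared `basictopic/subscriptions/basicsubscription`, and that each of those subscriptions has been created on the topic before any messages are sent.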

Regarding "java - Incoming messages are split between listeners on the same Azure Service Bus topic subscription", a similar question can be found on Stack Overflow: https://stackoverflow.com/questions/58909017/
