gpt4 book ai didi

java - 获取EventHubException : Connection aborted when I create TWO instances of EventHubClient

转载 作者:行者123 更新时间:2023-12-03 00:29:10 30 4
gpt4 key购买 nike

这也不是 'Connection aborted' error while trying to send events to Azure Event Hub using java EventHubClient API 的重复项因为这个问题中的问题与代理有关。这不是我的代理设置的问题,因为此代码适用于 Single Event hub 客户端

我的问题:是否可以有一个 Java 应用程序向两个或更多事件中心客户端发送消息?

我正在尝试将事件数据发布/发送到 Azure 事件中心的多个实例。我已经配置了两个事件中心命名空间,每个命名空间都有其事件中心。我有两个事件中心,它们都有自己的连接字符串、SAS 键以及命名空间和名称。由于每个事件中心命名空间在没有手动干预(服务请求)的情况下只能支持 20 个吞吐量单位,因此我试图查找是否可以将数据发送到多个事件中心。我可以看到我的代码对于 SINGLE EventHubClient 运行良好。此时,我的代码尝试创建第二个 EventHubClient,我收到此连接中止异常。

我正在使用 azure 的 git hub 上共享的示例代码 https://github.com/MicrosoftDocs/azure-docs/blob/master/articles/event-hubs/event-hubs-java-get-started-send.md

我看到以下异常:

Exception in thread "main" com.microsoft.azure.eventhubs.EventHubException: connection aborted
at com.microsoft.azure.eventhubs.impl.ExceptionUtil.toException(ExceptionUtil.java:59)
at com.microsoft.azure.eventhubs.impl.MessagingFactory.onConnectionError(MessagingFactory.java:249)
at com.microsoft.azure.eventhubs.impl.ConnectionHandler.onTransportError(ConnectionHandler.java:102)
at org.apache.qpid.proton.engine.BaseHandler.handle(BaseHandler.java:191)
at org.apache.qpid.proton.engine.impl.EventImpl.dispatch(EventImpl.java:108)
at org.apache.qpid.proton.reactor.impl.ReactorImpl.dispatch(ReactorImpl.java:324)
at org.apache.qpid.proton.reactor.impl.ReactorImpl.process(ReactorImpl.java:291)
at com.microsoft.azure.eventhubs.impl.MessagingFactory$RunReactor.run(MessagingFactory.java:445)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)

任何指针/输入都将受到高度赞赏。相关代码如下:

public class Sender {

private static final String EVENTHUB_NS1= "TT";
private static final String EVENTHUB1= "TT";
private static final String SAS_KEY_NAME1= "RootManageSharedAccessKey";
private static final String SAS_KEY_VAL1= "SECRET1";

private static final String EVENTHUB_NS2= "TT1";
private static final String EVENTHUB2= "TT1";
private static final String SAS_KEY_NAME2= "RootManageSharedAccessKey";
private static final String SAS_KEY_VAL2= "SECRET2";


private EventData getEventData(int eventDataPrefix) throws IOException, URISyntaxException {
String msgData = "<=>"+eventDataPrefix + "<=>"+"TEST MESSAGE..";
final Gson gson = new GsonBuilder().create();
byte [] data =gson.toJson(msgData).getBytes(Charset.defaultCharset());
EventData ed = EventData.create(data);
return ed;
}
public static final int MAX_BATCH_SIZE=100;
private List<EventData> getBatchOfEvents() throws IOException, URISyntaxException {
List<EventData> events = new ArrayList<>();
for(int i = 0; i < MAX_BATCH_SIZE; i ++){
events.add(getEventData(i));
}
return events;
}
private List<String> getConnectionStrings(){
List<String> connStrings = new ArrayList<>();
ConnectionStringBuilder csBldr1 = new ConnectionStringBuilder();
csBldr1.setNamespaceName(EVENTHUB_NS1);
csBldr1.setEventHubName(EVENTHUB1);
csBldr1.setSasKeyName(SAS_KEY_NAME1);
csBldr1.setSasKey(SAS_KEY_VAL1);

ConnectionStringBuilder csbldr2 = new ConnectionStringBuilder();
csbldr2.setNamespaceName(EVENTHUB_NS2);
csbldr2.setEventHubName(EVENTHUB2);
csbldr2.setSasKeyName(SAS_KEY_NAME2);
csbldr2.setSasKey(SAS_KEY_VAL2);

connStrings.add(csBldr1.toString());
connStrings.add(csbldr2.toString());
return connStrings;
}

private List<EventHubClient> getEHClients() throws IOException, EventHubException, ExecutionException, InterruptedException {

List<EventHubClient> ehClients = new ArrayList<>();
System.out.println("Starting getEhCLients..");

for( String connStr: getConnectionStrings()){

final ExecutorService executorService = Executors.newSingleThreadExecutor();
//The second iteration of for loop gives the EventHubException
EventHubClient client= EventHubClient.createSync(connStr, executorService);


ehClients.add(client);
System.out.println("EH CONNSTR::"+connStr);
}

return ehClients;
}


private void sendBatch( List<EventData> events) throws IOException, EventHubException, ExecutionException, InterruptedException {
List<EventHubClient> ehClients = getEHClients();
if( ehClients.size() <=0) {
System.out.println("NO EH CLients.. to send..");
return;
}
for(int i = 0; i < events.size();i++){
EventData data = events.get(i);
int ehClientIndex = i % ehClients.size();
EventHubClient client = ehClients.get(ehClientIndex);
client.sendSync(data);
System.out.print("MsgSent:"+ehClientIndex);
}
System.out.println("\nDone");
}
public static void main(String[] args) throws IOException, URISyntaxException, EventHubException, ExecutionException, InterruptedException {
Sender sender = new Sender();
List<EventData> events = sender.getBatchOfEvents();
sender.sendBatch(events);

}

}

最佳答案

非常有趣的用例,我可以尝试从我这边重现。我的猜测是 EventHubClient 有一些静态字段,该字段将由多个实例共享,这可能会导致您的问题。

除此之外,我想了解为什么使用两个命名空间,如何使用一个具有更高吞吐量的命名空间。一个命名空间就像一个集群。实际上,我来自 Spring Cloud Azure,并尝试改善 Azure 上的 Java 体验。请随意尝试我们的 Activity 中心 Binder 。 https://github.com/Microsoft/spring-cloud-azure

关于java - 获取EventHubException : Connection aborted when I create TWO instances of EventHubClient,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/50085651/

30 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com