gpt4 book ai didi

java - 我无法在 Spring Boot 中使用 google pubsub 模拟器发送消息

转载 作者:行者123 更新时间:2023-11-30 05:20:53 26 4
gpt4 key购买 nike

我正在尝试使用pubsub模拟器发送推送消息,我也在使用spring boot,这是我的配置:

依赖关系:

<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-gcp-starter-pubsub</artifactId>
</dependency>

我的 bean :

@Configuration
@AutoConfigureBefore(value= GcpPubSubAutoConfiguration.class)
@EnableConfigurationProperties(value= GcpPubSubProperties.class)
public class EmulatorPubSubConfiguration {
@Value("${spring.gcp.pubsub.projectid}")
private String projectId;

@Value("${spring.gcp.pubsub.subscriptorid}")
private String subscriptorId;

@Value("${spring.gcp.pubsub.topicid}")
private String topicId;

@Bean
public Publisher pubsubEmulator() throws IOException {
String hostport = System.getenv("PUBSUB_EMULATOR_HOST");
ManagedChannel channel = ManagedChannelBuilder.forTarget(hostport).usePlaintext().build();
try {
TransportChannelProvider channelProvider =
FixedTransportChannelProvider.create(GrpcTransportChannel.create(channel));
CredentialsProvider credentialsProvider = NoCredentialsProvider.create();

// Set the channel and credentials provider when creating a `TopicAdminClient`.
// Similarly for SubscriptionAdminClient
TopicAdminClient topicClient =
TopicAdminClient.create(
TopicAdminSettings.newBuilder()
.setTransportChannelProvider(channelProvider)
.setCredentialsProvider(credentialsProvider)
.build());

ProjectTopicName topicName = ProjectTopicName.of(projectId, topicId);
// Set the channel and credentials provider when creating a `Publisher`.
// Similarly for Subscriber
return Publisher.newBuilder(topicName)
.setChannelProvider(channelProvider)
.setCredentialsProvider(credentialsProvider)
.build();
} finally {
channel.shutdown();
}
}
}

当然,我已经将 PUBSUB_EMULATOR_HOST 系统变量设置为 localhost:8085,模拟器在哪里运行

我创建了一个用于测试的休息 Controller :

  • 用于发送推送消息
@Autowired
private Publisher pubsubPublisher;

@PostMapping("/send1")
public String publishMessage(@RequestParam("message") String message) throws InterruptedException, IOException {
Publisher pubsubPublisher = this.getPublisher();
ByteString data = ByteString.copyFromUtf8(message);
PubsubMessage pubsubMessage = PubsubMessage.newBuilder().setData(data).build();
ApiFuture<String> future = pubsubPublisher.publish(pubsubMessage);
//pubsubPublisher.publishAllOutstanding();
try {
// Add an asynchronous callback to handle success / failure
ApiFutures.addCallback(future,
new ApiFutureCallback<String>() {
@Override
public void onFailure(Throwable throwable) {
if (throwable instanceof ApiException) {
ApiException apiException = ((ApiException) throwable);
// details on the API exception
System.out.println(apiException.getStatusCode().getCode());
System.out.println(apiException.isRetryable());
}
System.out.println("Error publishing message : " + message);
System.out.println("Error publishing error : " + throwable.getMessage());
System.out.println("Error publishing cause : " + throwable.getCause());
}

@Override
public void onSuccess(String messageId) {
// Once published, returns server-assigned message ids (unique within the topic)
System.out.println(messageId);
}
},
MoreExecutors.directExecutor());
}
finally {
if (pubsubPublisher != null) {
// When finished with the publisher, shutdown to free up resources.
pubsubPublisher.shutdown();
pubsubPublisher.awaitTermination(1, TimeUnit.MINUTES);
}
}
return "ok";
  • 获取消息:
@PostMapping("/pushtest")
public String pushTest(@RequestBody CloudPubSubPushMessage request) {
System.out.println( "------> message received: " + decode(request.getMessage().getData()) );
return request.toString();
}

我已经在模拟器中创建了我的主题和订阅,我按照本教程进行操作:

https://cloud.google.com/pubsub/docs/emulator

我使用以下命令设置端点“pushtest”以在模拟器中获取推送消息:

python subscriber.py PUBSUB_PROJECT_ID create-push TOPIC_ID SUBSCRIPTION_ID PUSH_ENDPOINT

但是当我运行测试时,没有到达“/pushtest”端点,并且收到此错误:

任务 java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask@265d5d05[未完成,任务 = java.util.concurrent.Executors$RunnableAdapter@a8c8be3[包装任务 = com.google.common.util.concurrent.TrustedListenableFutureTask@1a53c57c[状态=待处理,信息=[任务=[运行=[尚未开始],com.google.api.gax.rpc.AttemptCallable@3866e1d0]]]]]从 java.util.concurrent.ScheduledThreadPoolExecutor@3f34809a 拒绝[已终止,池大小 = 0, Activity 线程 = 0,排队任务 = 0,已完成任务 = 1]

为了确保模拟器运行正常,我使用以下命令在 python 中运行测试:

python publisher.py PUBSUB_PROJECT_ID publish TOPIC_ID

我在“pushtest”端点中正确获取消息。

我不知道为什么要为我的欺辱感到抱歉。

感谢您的帮助。

最佳答案

我发现了问题。

仅在 bean 中注释此行

channel .shutdown();

哈哈,很简单。

关于java - 我无法在 Spring Boot 中使用 google pubsub 模拟器发送消息,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/59586927/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com