- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
借助这个link ,我成功创建了一个小型 Java 应用程序,可以在一分钟内提取已发布的消息。我的实现看起来像这样。
public static void eventListener() throws InterruptedException {
MessageReceiver receiver = new MessageReceiver() {
@Override
public void receiveMessage(PubsubMessage message, AckReplyConsumer consumer) {
System.out.println("Received message: " + message.getData().toStringUtf8());
consumer.ack();
}
};
//Subscriber subscriber = null;
try {
subscriber = Subscriber.newBuilder(subscription, receiver)
.setCredentialsProvider(FixedCredentialsProvider.create(creds)).build();
subscriber.addListener(new Subscriber.Listener() {
@Override
public void failed(Subscriber.State from, Throwable failure) {
// Handle failure. This is called when the Subscriber encountered a fatal error
// and is
// shutting down.
System.err.println(failure);
}
}, MoreExecutors.directExecutor());
subscriber.startAsync().awaitRunning();
// In this example, we will pull messages for one minute (60,000ms) then stop.
// In a real application, this sleep-then-stop is not necessary.
// Simply call stopAsync().awaitTerminated() when the server is shutting down,
// etc.
Thread.sleep(60000);
} finally {
if (subscriber != null) {
subscriber.stopAsync().awaitTerminated();
}
}
}
当我在main
中调用这个方法时
public static void main(String[] args) throws InterruptedException {
eventListener();
}
并将对象上传到我的 Google 云存储,程序会打印发布者发送的消息,如下所示
Received message: {
"kind": "storage#object",
"id": "roshanbucket/stones.jpg/1553765105996166",
"selfLink": "https://www.googleapis.com/storage/v1/b/roshanbucket/o/stones.jpg",
"name": "stones.jpg",
"bucket": "roshanbucket",
"generation": "1553765105996166",
"metageneration": "1",
"contentType": "image/jpeg",
"timeCreated": "2019-03-28T09:25:05.995Z",
"updated": "2019-03-28T09:25:05.995Z",
"storageClass": "STANDARD",
"timeStorageClassUpdated": "2019-03-28T09:25:05.995Z",
"size": "137256",
"md5Hash": "1GmpUnGeiW+/KU+0U8c8Wg==",
"mediaLink": "https://www.googleapis.com/download/storage/v1/b/roshanbucket/o/stones.jpg?generation=1553765105996166&alt=media",
"crc32c": "FMaEGg==",
"etag": "CIaj1InCpOECEAE="
}
程序执行后的一分钟内,它会打印对象上传帐户上收到的所有消息,然后停止。要在一分钟后接收事件消息,我需要重新启动应用程序。现在,我想做的是连续运行监听器,因此,我尝试在 main 方法内的无限循环内运行方法 eventListener()
,如下所示
public static void main(String[] args) throws InterruptedException {
while(true) {
eventListener();
}
}
有了这个,我似乎能够在每次上传后立即收到事件消息,无论我何时上传对象。但是,每隔一段时间,它就会抛出这个堆栈跟踪。
Mar 28, 2019 12:56:34 PM io.grpc.internal.ManagedChannelOrphanWrapper$ManagedChannelReference cleanQueue
SEVERE: *~*~*~ Channel ManagedChannelImpl{logId=6, target=pubsub.googleapis.com:443} was not shutdown properly!!! ~*~*~*
Make sure to call shutdown()/shutdownNow() and wait until awaitTermination() returns true.
java.lang.RuntimeException: ManagedChannel allocation site
at io.grpc.internal.ManagedChannelOrphanWrapper$ManagedChannelReference.<init>(ManagedChannelOrphanWrapper.java:103)
at io.grpc.internal.ManagedChannelOrphanWrapper.<init>(ManagedChannelOrphanWrapper.java:53)
at io.grpc.internal.ManagedChannelOrphanWrapper.<init>(ManagedChannelOrphanWrapper.java:44)
at io.grpc.internal.AbstractManagedChannelImplBuilder.build(AbstractManagedChannelImplBuilder.java:440)
at com.google.api.gax.grpc.InstantiatingGrpcChannelProvider.createSingleChannel(InstantiatingGrpcChannelProvider.java:223)
at com.google.api.gax.grpc.InstantiatingGrpcChannelProvider.createChannel(InstantiatingGrpcChannelProvider.java:164)
at com.google.api.gax.grpc.InstantiatingGrpcChannelProvider.getTransportChannel(InstantiatingGrpcChannelProvider.java:156)
at com.google.api.gax.rpc.ClientContext.create(ClientContext.java:157)
at com.google.cloud.pubsub.v1.stub.GrpcSubscriberStub.create(GrpcSubscriberStub.java:260)
at com.google.cloud.pubsub.v1.Subscriber.doStart(Subscriber.java:268)
at com.google.api.core.AbstractApiService$InnerService.doStart(AbstractApiService.java:148)
at com.google.common.util.concurrent.AbstractService.startAsync(AbstractService.java:225)
at com.google.api.core.AbstractApiService.startAsync(AbstractApiService.java:120)
at com.google.cloud.pubsub.v1.Subscriber.startAsync(Subscriber.java:260)
at listener.AsynchronousPull.eventListener(AsynchronousPull.java:57)
at listener.AsynchronousPull.main(AsynchronousPull.java:74)
但是,它仍然在每次上传后打印消息,同时偶尔抛出堆栈跟踪。我对线程
没有太多经验,非常感谢您提供解决此问题的帮助。
最佳答案
在紧密循环中调用 eventListener()
并不是您想要在此处执行的操作。这将创建许多新的订阅者实例,这些实例接收消息,每个实例的生存时间为 60 秒。您想要的是让您创建的订阅者的单个实例一直存在,直到您想要将其关闭为止。通常,您可以通过创建订阅者并通过 awaitTermulated()
等待其终止来完成此操作。
上面的代码将被修改为这样:
public static void eventListener() throws InterruptedException {
MessageReceiver receiver = new MessageReceiver() {
@Override
public void receiveMessage(PubsubMessage message, AckReplyConsumer consumer) {
System.out.println("Received message: " + message.getData().toStringUtf8());
consumer.ack();
}
};
Subscriber subscriber = null;
try {
subscriber = Subscriber.newBuilder(subscription, receiver)
.setCredentialsProvider(FixedCredentialsProvider.create(creds)).build();
subscriber.addListener(new Subscriber.Listener() {
@Override
public void failed(Subscriber.State from, Throwable failure) {
// Handle failure. This is called when the Subscriber encountered a fatal error
// and is
// shutting down.
System.err.println(failure);
}
}, MoreExecutors.directExecutor());
subscriber.startAsync().awaitRunning();
subscriber.awaitTerminated();
} finally {
if (subscriber != null) {
subscriber.stopAsync().awaitTerminated();
}
}
}
public static void main(String[] args) throws InterruptedException {
eventListener();
}
如果您只是希望订阅者在应用程序终止时停止,并且不想进行任何额外的清理,那么上面的代码将起作用,允许订阅者运行并接收消息,直到发生错误或应用程序停止运行。关闭。如果您想在应用程序的干净终止时进行一些清理,例如,您想确保已由 receiveMessage
处理的任何消息运行完成,那么您可以 attach a shutdown hook捕获此类终止(尽管它不会在所有情况下运行)。在此 Hook 中,您将调用 stopAsync()
。例如,您可以在 try
block 之前插入以下内容:
Runtime.getRuntime().addShutdownHook(new Thread() {
public void run() {
subscriber.stopAsync().awaitTerminated();
}
});
关于java - 使用异步拉取持续从 Google PubSub 接收消息,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/55395972/
如果我错了,但身份验证 session 有 30 天的最大限制,请纠正我?如果是这种情况,有没有办法让我的服务器节点应用程序永远监听经过身份验证的 dataRef? 干杯, 旅行。 最佳答案 自 on
我目前正在阅读 book Continuos Delivery由 Humble/Farley 撰写,虽然里面的很多东西都是有道理的,但有一件事让我烦恼: 似乎作者只针对基于服务器的(单客户端?)应用程
好吧,我非常了解每个人对自制密码管理器的看法,但我希望得到帮助。 不用于实际使用,仅供学习。 我想知道,在 C++ 中如何拥有长期变量。或者真的,有什么长期的。 长期是什么意思?在下次运行 .exe
我在文本文件中有以下三行(最后 3 行): } } } 我想做的是做这样的事情: } } blablabla blablabla blabla
在 iOS 中,有没有一种简单的方法可以在每天的同一时间发送 10 天的推送通知?我不想向所有用户发送推送通知。我的应用程序的工作方式是,用户可以选择连续十天推送通知的时间。您有推荐的 API 吗?或
我正在努力寻找一种当前最先进的方法来处理频繁更新的通知(例如每 3 分钟一次)。似乎在较新的 Android 版本中内置了如此多的电源效率调整(幸运的是!),我之前成功使用的方法(使用 Broadca
我不得不在一些糟糕的房地产网站上花费大量时间。我比较精通 CSS,并且可以(在 FireFox 中)“检查元素”并更改 CSS 以隐藏或缩小特定页面的华而不实的元素。但我想将此自定义 CSS 应用于特
目前正在研究如何使用 signalR 在处理文件时向用户呈现文件的进度报告。我正在使用 asp.net MVC 4。通过 Ajax 进行发布/获取时,我可以轻松获取状态更改。 因为我需要上传一个文件(
这个问题在这里已经有了答案: How can I round up the time to the nearest X minutes? (15 个答案) Is there a simple fun
我有一个 php 脚本,我想运行特定的时间(例如 5 分钟),但只能运行一次。对于 cron 作业,这将无限期地运行。还有别的办法吗? 最佳答案 处理这个问题的方法是: 当某些事件触发需要 cron
我弄乱了我的 apache 和 php.ini 文件,我网站的用户仍然提示该网站在很短的时间后或每次他们关闭并打开同一个浏览器时将他们注销。 我正在运行 Apache 和 PHP。 我应该进行哪些设置
如何查询今天的总和需要减去前一天的总和,每天持续一个月。 SELECT COUNT(DISTINCT member_profile.memberProfileNumber) FROM member_p
这个问题在这里已经有了答案: How do I add a delay in a JavaScript loop? (32 个答案) 关闭 8 年前。 我认为这个问题之前一定有人问过,但我找不到其他
用户在我的网站上注册后,我们会向他发送一封确认电子邮件。我想要的是 - 三天内每 24 小时为用户重新发送一次电子邮件。例如: user_table id , name, date_registere
最近我从 Codeigniter 换到了 Laravel,一切都很顺利,除了我遇到了 Session::flash 的问题。 当我创建新用户时,我收到成功消息,但它会持续 2 个请求,即使我没有通过验
如果有人能帮助我解决这个问题,我将非常感激。 我正在尝试针对 CPU 使用率 >= 80% 持续 30 分钟或更长时间创建 Azure 监视器警报 我已附上警报规则条件的屏幕截图。在“评估依据”下,聚
如果有人能帮助我解决这个问题,我将非常感激。 我正在尝试针对 CPU 使用率 >= 80% 持续 30 分钟或更长时间创建 Azure 监视器警报 我已附上警报规则条件的屏幕截图。在“评估依据”下,聚
希望大家平安 1。我的目标 我正在尝试模拟 3 天的真实情况。系统每天只能工作 8 小时。 我的目标是模型运行 8 小时,持续 3 天,以获得足够的数据进行分析。 2。我的问题 我有一个代理预约时间表
我需要在 8 小时内每 5 分钟调用一次函数。问题是它必须是同一天。例如,如果用户在 3/29 晚上 11:59 登录系统,而现在是 3/30 凌晨 12:01,则不应再调用该函数。 我知道如何每
我正在开发一个 React Native 应用程序,该应用程序使用 Firebase 的 Firestore 作为后端。现在,每次收到新消息时,我都会从 Firestore 获取所有消息并更新我的状态
我是一名优秀的程序员,十分优秀!