- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
我正在尝试使用go客户端库从pub-sub订阅中一次提取1条消息。但是,即使订阅中存在消息,消息也不会按请求请求进行拉动。订户正在等待所有消息被处理。
我正在尝试基本代码,每次我只提取一条消息。我已经使用了两个实例,并在两个后台同时运行了4次脚本(以创建订户)。我已将ack_deadline设置为10秒。
我希望结果像每个订阅者都应在确认一条消息后从订阅中获取下一条消息。但是直到完成最后的消息处理后,消息才在实例上运行。
为什么在完成一条消息处理后未提取消息?据我所知,实例或订户不应该有任何依赖关系。
让mi知道需要设置任何其他更改或参数。
提前致谢。
这是一个实例的日志:
2019/10/21 05:22:07 Got message: Message 0 at 2019-10-21 05:22:07.022772532
2019/10/21 05:22:11 Got message: Message 1 at 2019-10-21 05:22:11.330566981
2019/10/21 05:22:14 Got message: Message 2 at 2019-10-21 05:22:14.803031569
2019/10/21 05:22:18 Got message: Message 3 at 2019-10-21 05:22:18.452912271
2019/10/21 05:38:39 Acking message: Message 3 at 2019-10-21 05:38:39.471739478
2019/10/21 05:39:10 Acking message: Message 0 at 2019-10-21 05:39:10.039336794
2019/10/21 05:41:22 Acking message: Message 1 at 2019-10-21 05:41:22.351124342
2019/10/21 05:50:31 Acking message: Message 2 at 2019-10-21 05:50:31.829087762
2019/10/21 05:50:39 Got message: Message 13 at 2019-10-21 05:50:39.005916608
2019/10/21 05:50:39 Got message: Message 11 at 2019-10-21 05:50:39.00623238
2019/10/21 05:50:39 Got message: Message 15 at 2019-10-21 05:50:39.007216256
2019/10/21 05:50:39 Got message: Message 12 at 2019-10-21 05:50:39.008066257
2019/10/21 05:22:29 Got message: Message 4 at 2019-10-21 05:22:29.331569077
2019/10/21 05:22:33 Got message: Message 5 at 2019-10-21 05:22:33.018801275
2019/10/21 05:22:36 Got message: Message 6 at 2019-10-21 05:22:36.803434547
2019/10/21 05:22:40 Got message: Message 7 at 2019-10-21 05:22:40.409314927
2019/10/21 05:39:38 Acking message: Message 4 at 2019-10-21 05:39:38.349619635
2019/10/21 05:42:42 Acking message: Message 6 at 2019-10-21 05:42:42.819874065
2019/10/21 05:47:40 Acking message: Message 5 at 2019-10-21 05:47:40.049128075
2019/10/21 05:50:38 Acking message: Message 7 at 2019-10-21 05:50:38.42874031
2019/10/21 05:50:39 Got message: Message 8 at 2019-10-21 05:50:39.005090906
2019/10/21 05:50:39 Got message: Message 9 at 2019-10-21 05:50:39.005334146
2019/10/21 05:50:39 Got message: Message 16 at 2019-10-21 05:50:39.006427796
2019/10/21 05:50:39 Got message: Message 14 at 2019-10-21 05:50:39.007231713
package main
// [START pubsub_publish_with_error_handling_that_scales]
import (
"context"
"fmt"
"os"
"log"
"time"
"math/rand"
pubsub "cloud.google.com/go/pubsub/apiv1"
pubsubpb "google.golang.org/genproto/googleapis/pubsub/v1"
)
func main(){
f, _:= os.OpenFile("testlogfile", os.O_RDWR | os.O_CREATE | os.O_APPEND, 0666)
defer f.Close()
log.SetOutput(f)
rand.Seed(time.Now().UTC().UnixNano())
pullMsgs("sureline-dev-1264", "sub7")
}
func random(min, max int) int {
return rand.Intn(max - min) + min
}
func pullMsgs(projectID, subscriptionID string) error {
ctx := context.Background()
client, err := pubsub.NewSubscriberClient(ctx)
if err != nil {
log.Fatal(err)
}
defer client.Close()
sub := fmt.Sprintf("projects/%s/subscriptions/%s", projectID, subscriptionID)
// Be sure to tune the MaxMessages parameter per your project's needs, and accordingly
// adjust the ack behavior below to batch acknowledgements.
req := pubsubpb.PullRequest{
Subscription: sub,
MaxMessages: 1,
}
fmt.Println("Listening..")
for {
res, err := client.Pull(ctx, &req)
if err != nil {
log.Fatal(err)
}
// client.Pull returns an empty list if there are no messages available in the
// backlog. We should skip processing steps when that happens.
if len(res.ReceivedMessages) == 0 {
continue
}
var recvdAckIDs []string
for _, m := range res.ReceivedMessages {
recvdAckIDs = append(recvdAckIDs, m.AckId)
}
var done = make(chan struct{})
var delay = 0 * time.Second // Tick immediately upon reception
var ackDeadline = 10 * time.Second
// Continuously notify the server that processing is still happening on this batch.
go func() {
for {
select {
case <-ctx.Done():
return
case <-done:
return
case <-time.After(delay):
err := client.ModifyAckDeadline(ctx, &pubsubpb.ModifyAckDeadlineRequest{
Subscription: sub,
AckIds: recvdAckIDs,
AckDeadlineSeconds: int32(ackDeadline.Seconds()),
})
if err != nil {
log.Fatal(err)
}
delay = ackDeadline - 5*time.Second // 5 seconds grace period.
}
}
}()
for _, m := range res.ReceivedMessages {
// Process the message here, possibly in a goroutine.
log.Printf("Got message: %s at %v", string(m.Message.Data), time.Now())
fmt.Printf("Got message: %s at %v", string(m.Message.Data), time.Now())
myrand := random(240, 420)
log.Printf("Sleeping %d seconds...\n", myrand)
time.Sleep(time.Duration(myrand)*time.Second)
err := client.Acknowledge(ctx, &pubsubpb.AcknowledgeRequest{
Subscription: sub,
AckIds: []string{m.AckId},
})
log.Printf("Acking message: %s at %v", string(m.Message.Data), time.Now())
fmt.Printf("Acking message: %s at %v", string(m.Message.Data), time.Now())
if err != nil {
log.Fatal(err)
}
}
close(done)
}
}
最佳答案
尝试以这种方式一次提取一条消息是Cloud Pub / Sub的反模式。在您的情况下,您的订户可能最终会与其他服务器通信,这些服务器已分配了要发送给连接到它们的订户的消息。 Cloud Pub / Sub希望同时从使用此方法接收消息的客户端接收多个拉取请求。因此,您应该一次有几个未完成的拉取请求,或者应该使用asynchronous pull via the Receive method。
关于go - Pubsub.pull请求无法正常工作-转到,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/58484386/
我在Windows 10中使用一些简单的Powershell代码遇到了这个奇怪的问题,我认为这可能是我做错了,但我不是Powershell的天才。 我有这个: $ix = [System.Net.Dn
var urlsearch = "http://192.168.10.113:8080/collective-intellegence/StoreClicks?userid=" + userId +
我有一个非常奇怪的问题,过去两天一直让我抓狂。 我有一个我试图控制的串行设备(LS 100 光度计)。使用设置了正确参数的终端(白蚁),我可以发送命令(“MES”),然后是定界符(CR LF),然后我
我目前正试图让无需注册的 COM 使用 Excel 作为客户端,使用 .NET dll 作为服务器。目前,我只是试图让概念验证工作,但遇到了麻烦。 显然,当我使用 Excel 时,我不能简单地使用与可
我开发了简单的 REST API - https://github.com/pavelpetrcz/MandaysFigu - 我的问题是在本地主机上,WildFly 16 服务器的应用程序运行正常。
我遇到了奇怪的情况 - 从 Django shell 创建一些 Mongoengine 对象是成功的,但是从 Django View 创建相同的对象看起来成功,但 MongoDB 中没有出现任何数据。
我是 flask 的新手,只编写了一个相当简单的网络应用程序——没有数据库,只是一个航类搜索 API 的前端。一切正常,但为了提高我的技能,我正在尝试使用应用程序工厂和蓝图重构我的代码。让它与 pus
我的谷歌分析 JavaScript 事件在开发者控制台中运行得很好。 但是当从外部 js 文件包含在页面上时,它们根本不起作用。由于某种原因。 例如; 下面的内容将在包含在控制台中时运行。但当包含在单
这是一本名为“Node.js 8 the Right Way”的书中的任务。你可以在下面看到它: 这是我的解决方案: 'use strict'; const zmq = require('zeromq
我正在阅读文本行,并创建其独特单词的列表(在将它们小写之后)。我可以使它与 flatMap 一起工作,但不能使它与 map 的“子”流一起工作。 flatMap 看起来更简洁和“更好”,但为什么 di
我正在编写一些 PowerShell 脚本来进行一些构建自动化。我发现 here echo $? 根据前面的语句返回真或假。我刚刚发现 echo 是 Write-Output 的别名。 写主机 $?
关闭。这个问题不满足Stack Overflow guidelines .它目前不接受答案。 想改善这个问题吗?更新问题,使其成为 on-topic对于堆栈溢出。 4年前关闭。 Improve thi
我将一个工作 View Controller 类从另一个项目复制到一个新项目中。我无法在新项目中加载 View 。在旧项目中我使用了presentModalViewController。在新版本中,我
我对 javascript 很陌生,所以很难看出我哪里出错了。由于某种原因,我的功能无法正常工作。任何帮助,将不胜感激。我尝试在外部 js 文件、头部/主体中使用它们,但似乎没有任何效果。错误要么出在
我正在尝试学习Flutter中的复选框。 问题是,当我想在Scaffold(body :)中使用复选框时,它正在工作。但我想在不同的地方使用它,例如ListView中的项目。 return Cente
我们当前使用的是 sleuth 2.2.3.RELEASE,我们看不到在 http header 中传递的 userId 字段没有传播。下面是我们的代码。 BaggageField REQUEST_I
我有一个组合框,其中包含一个项目,比如“a”。我想调用该组合框的 Action 监听器,仅在手动选择项目“a”完成时才调用。我也尝试过 ItemStateChanged,但它的工作原理与 Action
你能看一下照片吗?现在,一步前我执行了 this.interrupt()。您可以看到 this.isInterrupted() 为 false。我仔细观察——“这个”没有改变。它具有相同的 ID (1
我们当前使用的是 sleuth 2.2.3.RELEASE,我们看不到在 http header 中传递的 userId 字段没有传播。下面是我们的代码。 BaggageField REQUEST_I
我正在尝试在我的网站上设置一个联系表单,当有人点击发送时,就会运行一个作业,并在该作业中向所有管理员用户发送通知。不过,我在失败的工作表中不断收到此错误: Illuminate\Database\El
我是一名优秀的程序员,十分优秀!