- c - 在位数组中找到第一个零
- linux - Unix 显示有关匹配两种模式之一的文件的信息
- 正则表达式替换多个文件
- linux - 隐藏来自 xtrace 的命令
使用pika库的BlockingConnection
连接RabbitMQ,发布消息偶尔报错:
Fatal Socket Error: error(32, 'Broken pipe')
这是一个非常简单的子进程,它从内存中的队列中取出一些信息并将一个小的 JSON 消息发送到 AMQP。当系统在几分钟内未发送任何消息时,似乎只会出现此错误。
设置:
connection = pika.BlockingConnection(parameters)
channel = self.connection.channel()
channel.exchange_declare(
exchange='xyz',
exchange_type='fanout',
passive=False,
durable=True,
auto_delete=False
)
排队代码捕获任何连接错误并重试:
def _enqueue(self, message_id, data):
try:
published = self.channel.basic_publish(
self.amqp_exchange,
self.amqp_routing_key,
json.dumps(data),
pika.BasicProperties(
content_type="application/json",
delivery_mode=2,
message_id=message_id
)
)
# Confirm delivery or retry
if published:
self.retry_count = 0
else:
raise EnqueueException("Message publish not confirmed.")
except (EnqueueException, pika.exceptions.AMQPChannelError, pika.exceptions.AMQPConnectionError,
pika.exceptions.ChannelClosed, pika.exceptions.ConnectionClosed, pika.exceptions.UnexpectedFrameError,
pika.exceptions.UnroutableError, socket.timeout) as e:
self.retry_count += 1
if self.retry_count < 5:
logging.warning("Reconnecting and resending")
if self.connection.is_open:
self.connection.close()
self.connect()
self._enqueue(message_id, data)
else:
raise e
这有时会在第二次尝试时起作用。它通常会挂起一段时间或只是在最终抛出异常之前丢弃消息 ( possibly related bug report )。因为它只发生在系统安静几分钟时,我猜这是由于连接超时。但是 AMQP 有一个心跳系统,据报道 pika 使用它(related bug report)。
为什么我会收到此错误或丢失消息,为什么连接在不使用时不会保持打开状态?
最佳答案
来自另一个bug report :
As BlockingConnection doesn't handle heartbeats in the background and the heartbeat_interval can't override the servers suggested heartbeat interval (that's a bug too), i suggest that heartbeats should be disabled by default (rely on TCP keep-alive instead).
If processing a task in a consume block takes longer time then the server suggested heartbeat interval, the connection will be closed by the server and the client won't be able to ack the message when it's done processing.
安update在 v1.0.0 中可能有助于解决这个问题。
所以我实现了一个解决方法。每 30 秒我通过队列发布一条心跳消息。这使连接保持打开状态,并具有向客户确认我的应用程序已启动并正在运行的额外好处。
关于python - RabbitMQ 破损管道错误或丢失消息,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/45064662/
我正在使用 Assets 管道来管理我的 Grails 3.0 应用程序的前端资源。但是,似乎没有创建 CoffeeScript 文件的源映射。有什么办法可以启用它吗? 我的 build.gradle
我有一个我想要的管道: 提供一些资源, 运行一些测试, 拆资源。 我希望第 3 步中的拆卸任务运行 不管 测试是否通过或失败,在第 2 步。据我所知 runAfter如果前一个任务成功,则只运行一个任
如果我运行以下命令: Measure-Command -Expression {gci -Path C:\ -Recurse -ea SilentlyContinue | where Extensio
我知道管道是一个特殊字符,我需要使用: Scanner input = new Scanner(System.in); String line = input.next
我再次遇到同样的问题,我有我的默认处理方式,但它一直困扰着我。 有没有更好的办法? 所以基本上我有一个运行的管道,在管道内做一些事情,并想从管道内返回一个键/值对。 我希望整个管道返回一个类型为 ps
我有三个环境:dev、hml 和 qa。 在我的管道中,根据分支,阶段有一个条件来检查它是否会运行: - stage: Project_Deploy_DEV condition: eq(varia
我有 Jenkins Jenkins ver. 2.82 正在运行并想在创建新作业时使用 Pipeline 功能。但我没有看到这个列为选项。我只能在自由式项目、maven 项目、外部项目和多配置之间进
在对上一个问题 (haskell-data-hashset-from-unordered-container-performance-for-large-sets) 进行一些观察时,我偶然发现了一个奇
我正在寻找有关如何使用管道将标准输出作为其他命令的参数传递的见解。 例如,考虑这种情况: ls | grep Hello grep 的结构遵循以下模式:grep SearchTerm PathOfFi
有没有办法不因声明性管道步骤而失败,而是显示警告?目前我正在通过添加 || exit 0 来规避它到 sh 命令行的末尾,所以它总是可以正常退出。 当前示例: sh 'vendor/bin/phpcs
我们正在从旧的 Jenkins 设置迁移到所有计划都是声明性 jenkinsfile 管道的新服务器……但是,通过使用管道,我们无法再手动清除工作区。我如何设置 Jenkins 以允许 手动点播清理工
我在 Python 中阅读了有关 Pipelines 和 GridSearchCV 的以下示例: http://www.davidsbatista.net/blog/2017/04/01/docume
我有一个这样的管道脚本: node('linux'){ stage('Setup'){ echo "Build Stage" } stage('Build'){ echo
我正在使用 bitbucket 管道进行培训 这是我的 bitbucket-pipelines.yml: image: php:7.2.9 pipelines: default:
我正在编写一个程序,其中输入文件被拆分为多个文件(Shamir 的 secret 共享方案)。 这是我想象的管道: 来源:使用 Conduit.Binary.sourceFile 从输入中读取 导管:
我创建了一个管道,它有一个应该只在开发分支上执行的阶段。该阶段还需要用户输入。即使我在不同的分支上,为什么它会卡在这些步骤的用户输入上?当我提供输入时,它们会被正确跳过。 stage('Deplo
我正在尝试学习管道功能(%>%)。 当试图从这行代码转换到另一行时,它不起作用。 ---- R代码--原版----- set.seed(1014) replicate(6,sample(1:8))
在 Jenkins Pipeline 中,如何将工件从以前的构建复制到当前构建? 即使之前的构建失败,我也想这样做。 最佳答案 Stuart Rowe 还在 Pipeline Authoring Si
我正在尝试使用 执行已定义的作业构建 使用 Jenkins 管道的方法。 这是一个简单的例子: build('jenkins-test-project-build', param1 : 'some-
当我使用 where 过滤器通过管道命令排除对象时,它没有给我正确的输出。 PS C:\Users\Administrator> $proall = Get-ADComputer -filter *
我是一名优秀的程序员,十分优秀!