gpt4 book ai didi

python - 如何在 python PubSub 订阅者中捕获内部/库线程中发生的异常?

转载 作者:行者123 更新时间:2023-11-28 18:01:19 28 4
gpt4 key购买 nike

我正在运行以大约每秒一条消息的速度处理传入消息的 pubsub 消费者。事情通常运行良好,但是,每隔几天或几小时,我们偶尔会看到 pubsub 模块内部线程抛出的异常,我不清楚如何捕获它们。这是一个典型的例子(也有其他类似的痕迹,但消息略有不同):

Exception in thread Thread-LeaseMaintainer:
Traceback (most recent call last):
File "/usr/local/lib/python3.6/site-packages/google/api_core/grpc_helpers.py", line 57, in error_remapped_callable
return callable_(*args, **kwargs)
File "/usr/local/lib/python3.6/site-packages/grpc/_channel.py", line 549, in __call__
return _end_unary_response_blocking(state, call, False, None)
File "/usr/local/lib/python3.6/site-packages/grpc/_channel.py", line 466, in _end_unary_response_blocking
raise _Rendezvous(state, None, None, deadline)
grpc._channel._Rendezvous: <_Rendezvous of RPC that terminated with:
status = StatusCode.UNAVAILABLE
details = "channel is in state TRANSIENT_FAILURE"
debug_error_string = "{"created":"@1554568036.075280756","description":"channel is in state TRANSIENT_FAILURE","file":"src/core/ext/filters/client_channel/client_channel.cc","file_line":2294,"grpc_status":14}"
>

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
File "/usr/local/lib/python3.6/site-packages/google/api_core/retry.py", line 179, in retry_target
return target()
File "/usr/local/lib/python3.6/site-packages/google/api_core/timeout.py", line 214, in func_with_timeout
return func(*args, **kwargs)
File "/usr/local/lib/python3.6/site-packages/google/api_core/grpc_helpers.py", line 59, in error_remapped_callable
six.raise_from(exceptions.from_grpc_error(exc), exc)
File "<string>", line 3, in raise_from
google.api_core.exceptions.ServiceUnavailable: 503 channel is in state TRANSIENT_FAILURE

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
File "/usr/local/lib/python3.6/threading.py", line 916, in _bootstrap_inner
self.run()
File "/usr/local/lib/python3.6/threading.py", line 864, in run
self._target(*self._args, **self._kwargs)
File "/usr/local/lib/python3.6/site-packages/google/cloud/pubsub_v1/subscriber/_protocol/leaser.py", line 146, in maintain_leases
[requests.ModAckRequest(ack_id, p99) for ack_id in ack_ids]
File "/usr/local/lib/python3.6/site-packages/google/cloud/pubsub_v1/subscriber/_protocol/dispatcher.py", line 152, in modify_ack_deadline
self._manager.send(request)
File "/usr/local/lib/python3.6/site-packages/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py", line 268, in send
self._send_unary_request(request)
File "/usr/local/lib/python3.6/site-packages/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py", line 259, in _send_unary_request
ack_deadline_seconds=deadline,
File "/usr/local/lib/python3.6/site-packages/google/cloud/pubsub_v1/_gapic.py", line 45, in <lambda>
fx = lambda self, *a, **kw: wrapped_fx(self.api, *a, **kw) # noqa
File "/usr/local/lib/python3.6/site-packages/google/cloud/pubsub_v1/gapic/subscriber_client.py", line 723, in modify_ack_deadline
request, retry=retry, timeout=timeout, metadata=metadata
File "/usr/local/lib/python3.6/site-packages/google/api_core/gapic_v1/method.py", line 143, in __call__
return wrapped_func(*args, **kwargs)
File "/usr/local/lib/python3.6/site-packages/google/api_core/retry.py", line 270, in retry_wrapped_func
on_error=on_error,
File "/usr/local/lib/python3.6/site-packages/google/api_core/retry.py", line 199, in retry_target
last_exc,
File "<string>", line 3, in raise_from
google.api_core.exceptions.RetryError: Deadline of 600.0s exceeded while calling functools.partial(<function _wrap_unary_errors.<locals>.error_remapped_callable at 0x7f86228cd400>, subscription: "projects/xxxxx-dev/subscriptions/telemetry-sub"
ack_deadline_seconds: 10
ack_ids: "QBJMJwFESVMrQwsqWBFOBCEhPjA-RVNEUAYWLF1GSFE3GQhoUQ5PXiM_NSAoRRoHIGoKOUJdEmJoXFx1B1ALEHQoYnxvWRYFCEdReF1YHQdodGxXOFUEHnN1Y3xtWhQDAEFXf3f8gIrJ38BtZho9WxJLLD5-LDRFQV4"
, metadata=[('x-goog-api-client', 'gl-python/3.6.8 grpc/1.19.0 gax/1.8.2 gapic/0.40.0')]), last exception: 503 channel is in state TRANSIENT_FAILURE

Thread-ConsumeBidirectionalStream caught unexpected exception Deadline of 600.0s exceeded while calling functools.partial(<function _wrap_unary_errors.<locals>.error_remapped_callable at 0x7f86228cda60>, subscription: "projects/xxxxx-dev/subscriptions/telemetry-sub"
ack_deadline_seconds: 10
ack_ids: "QBJMJwFESVMrQwsqWBFOBCEhPjA-RVNEUAYWLF1GSFE3GQhoUQ5PXiM_NSAoRRoHIGoKOUJdEmJoXFx1B1ALEHQoYnxvWRYFCEdReF1YHAdodGxXOFUEHnN1aXVoWxAIBEdXeXf8gIrJ38BtZho9WxJLLD5-LDRFQV4"
, metadata=[('x-goog-api-client', 'gl-python/3.6.8 grpc/1.19.0 gax/1.8.2 gapic/0.40.0')]), last exception: 503 channel is in state TRANSIENT_FAILURE and will exit.

(在这种特殊情况下,订阅者收到几个类似的错误并完全停止使用消息(主线程不退出);但是,在其他此类库错误的情况下观察到其他行为。)

我们的代码大致如下(经过一些简化):

client = pubsub_v1.SubscriberClient()
flow_control = pubsub_v1.types.FlowControl(max_messages=500)
future = client.subscribe(subscription_path, callback=callback,
flow_control=flow_control)
...
try:
future.result(timeout=1)
except pubsub_v1.exceptions.TimeoutError:
pass
except _Rendezvous as exc:
logger.error('Got Rendezvous error in subscriber. Retrying in 1s. '
f'Detail: {exc}')
# NEVER GET HERE
time.sleep(1)
continue
except RetryError as exc:
logger.error('Got RetryError in subscriber. Retrying in 1s. '
f'Detail: {exc}')
# NEVER GET HERE
time.sleep(1)
continue
except Exception as exc: # pylint: disable=broad-except
logger.exception('Got uncaught exception in subscriber or callback ...')
# NEVER GET HERE

任何异常分支都不会被采用。这让我认为异常发生在某些后台线程中,并且 future.result() 调用没有报告,尽管文档( GCP PubSub Docs )相反。我已经阅读了许多 github 问题,仔细阅读了文档,并搜索了与此相关的 SO 问题,但我一直无法找到真正捕获此类错误的解决方案。非常感谢任何帮助或建议。

版本:

  • python == 3.6.5
  • google-cloud-pubsub == 0.40.0 # 但这已经表现出来了至少最近几个版本也是如此
  • google-api-core == 1.8.2
  • google-api-python-client == 1.7.8

最佳答案

应该没有必要捕获并处理这些内部错误。它们是图书馆与 Google Cloud Pub/Sub 服务的连接中的暂时性错误,图书馆本身应该处理它们。如果它没有——特别是如果它阻止在不重新启动的情况下接收更多消息,那么它就是一个错误,应该在 GitHub repo for the Python client library 中输入一个问题。 .

关于python - 如何在 python PubSub 订阅者中捕获内部/库线程中发生的异常?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/55552606/

28 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com