gpt4 book ai didi

java - 如何使用 Apache Beam 管理背压

转载 作者:行者123 更新时间:2023-12-05 06:24:27 25 4
gpt4 key购买 nike

我有非常基本的 apache beam 管道,它在 GCP Dataflow 上运行并从 PubSub 读取一些数据,对其进行转换并将其写入 Postgres 数据库。所有这些都是通过 Apache Beam 的标准读取器/写入器组件完成的。问题是当我的管道开始接收大量数据时,我的 Postgres 端由于等待 ShareLocks 而出现死锁错误。

很明显,这种事情的发生是因为 Postgres 端溢出。我的管道试图一次写得太快和太多东西,所以为了避免这种情况,它应该放慢速度。因此,我们可以使用诸如背压之类的机制。我试图挖掘有关 Apache Beam 背压配置的任何信息,不幸的是,官方文档似乎对此类问题只字未提。

我对以下类型的异常感到不知所措:

java.sql.BatchUpdateException: Batch entry <NUMBER>
<MY_STATEMENT>
was aborted: ERROR: deadlock detected
Detail: Process 87768 waits for ShareLock on transaction 1939992; blocked by process 87769.
Process 87769 waits for ShareLock on transaction 1939997; blocked by process 87768.
Hint: See server log for query details.
Where: while inserting index tuple (5997152,9) in relation "<MY_TABLE>" Call getNextException to see other errors in the batch.

我想知道是否有任何背压工具包或类似工具可以帮助我在不编写自己的 PostgresIO.Writer 的情况下管理我的问题。

非常感谢。

最佳答案

假设您使用JdbcIO 写入Postgres,您可以尝试增加批大小(参见withBatchSize(long batchSize)),默认为1K 条记录,可能还不够。

此外,如果出现 SQL 异常,并且您想重试,则需要确保使用正确的重试策略(参见 withRetryStrategy(RetryStrategy retryStrategy))。在这种情况下,将应用 FluentBackoff

关于java - 如何使用 Apache Beam 管理背压,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/57580362/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com