gpt4 book ai didi

sql - Flink SQL : source table is too big to fit into memory

转载 作者:行者123 更新时间:2023-12-05 06:07:28 24 4
gpt4 key购买 nike

我对 Flink 比较陌生,今天在 Flink 1.11.3 session 集群上使用 Flink SQL 时遇到了一个问题。

问题

我注册了一个使用 jdbc postgres 驱动程序的源表。我正在尝试以 parquet 格式将一些数据从这个在线数据库移动到 AWS S3。这张表很大(~43 GB)。大约 1 分钟后作业失败,任务管理器在没有任何警告的情况下崩溃。但我最好的猜测是任务管理器内存不足。

我的观察

我发现当我执行 tableEnv.executeSql("select ... from huge_table limit 1000") 时,flink 尝试将整个源表扫描到内存中,并且仅在此之后才计划执行限制.

问题

由于我只关心最近几天的数据,有什么办法可以通过时间戳来限制作业扫描的行数吗?

附录

这是一个可以重现问题的最小设置(去除了很多噪音)

环境设置代码

var blinkSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build();
var tableEnv = TableEnvironment.create(blinkSettings);

Flink SQL 源表DDL

CREATE TABLE source_transactions (
txid STRING,
username STRING,
amount BIGINT,
ts TIMESTAMP,
PRIMARY KEY (txid) NOT ENFORCED
) WITH (
'connector'='jdbc',
'url'='jdbc:postgresql://my.bank',
'table-name'='transactions',
'driver'='org.postgresql.Driver',
'username'='username',
'password'='password',
'scan.fetch-size'='2000'
)

Flink SQL中的Sink table DDL

CREATE TABLE sink_transactions (
create_time TIMESTAMP,
username STRING,
delta_amount DOUBLE,
dt STRING
) PARTITIONED BY (dt) WITH (
'connector'='filesystem',
'path'='s3a://s3/path/to/transactions',
'format'='parquet'
)

在 Flink SQL 中插入查询

INSERT INTO sink_transactions
SELECT ts, username, CAST(t.amount AS DOUBLE) / 100, DATE_FORMAT(ts, 'yyyy-MM-dd')
FROM source_transactions

最佳答案

你的观察是对的,Flink 不支持 JDBC connector 的 limit pushdown 优化,并且有一个几乎合并的 PR 来支持这个特性,这将在 Flink 1.13 中使用,如果你可以 cherry-pick 这个补丁到你的代码中您急需此功能。

1.吉拉:FLINK-19650 Support the limit push down for the Jdbc

2.公关:https://github.com/apache/flink/pull/13800

关于sql - Flink SQL : source table is too big to fit into memory,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/65410393/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com