gpt4 book ai didi

hadoop - sqoop中数据是如何拆分成part文件的

转载 作者:可可西里 更新时间:2023-11-01 15:55:12 28 4
gpt4 key购买 nike

我怀疑如果数据是倾斜的,数据是如何划分成部分文件的。如果可能,请帮我澄清一下。

假设这是我的 department 表,其中 department_id 作为主键。

mysql> select * from departments;
2 Fitness
3 Footwear
4 Apparel
5 Golf
6 Outdoors
7 Fan Shop

如果我通过在导入命令中提及 -m 1 使用 sqoop import,我知道我只会生成一个包含所有记录的部分文件。

现在我在不指定任何映射器的情况下运行命令。所以默认情况下它应该使用 4 个映射器并在 HDFS 中创建 4 个部分文件。以下是记录如何按零件文件分发。

[cloudera@centsosdemo ~]$ hadoop fs -cat /user/cloudera/departments/part-m-00000
2,Fitness
3,Footwear
[cloudera@centsosdemo ~]$ hadoop fs -cat /user/cloudera/departments/part-m-00001
4,Apparel
[cloudera@centsosdemo ~]$ hadoop fs -cat /user/cloudera/departments/part-m-00002
5,Golf
[cloudera@centsosdemo ~]$ hadoop fs -cat /user/cloudera/departments/part-m-00003
6,Outdoors
7,Fan Shop

根据 BoundingValsQuery,默认使用 Min(department_id)=2、Max(department_id)=8 和 4 个映射器。

经计算,每个mapper应该得到(8-2)/4=1.5条记录。

这里我不知道如何分发数据。我不明白 part-m-00000 中有 2 条记录,而 part-m-00001 和 part-m-00002 中只有一条,而 part-m-00003 中又有两条记录。

最佳答案

如果你有机会去图书馆看看。其中涉及一系列步骤。

Sqoop job Read records. via DBRecordReader

 org.apache.sqoop.mapreduce.db.DBRecordReader

这里有两种方法。

方法一

protected ResultSet executeQuery(String query) throws SQLException {
Integer fetchSize = dbConf.getFetchSize();
/*get fetchSize according to split which is calculated via getSplits() method of
org.apache.sqoop.mapreduce.db.DBInputFormat.And no. of splits are calculated
via no. of (count from table/no. of mappers). */
}

拆分计算:-

org.apache.sqoop.mapreduce.db.DBInputFormat
public List<InputSplit> getSplits(JobContext job) throws IOException {
.......//here splits are calculated accroding to count of source table
.......query.append("SELECT COUNT(*) FROM " + tableName);
}

方法二。

 protected String getSelectQuery() {
if (dbConf.getInputQuery() == null) {
query.append("SELECT ");

for (int i = 0; i < fieldNames.length; i++) {
query.append(fieldNames[i]);
if (i != fieldNames.length -1) {
query.append(", ");
}
}

query.append(" FROM ").append(tableName);
query.append(" AS ").append(tableName);
if (conditions != null && conditions.length() > 0) {
query.append(" WHERE (").append(conditions).append(")");
}

String orderBy = dbConf.getInputOrderBy();
if (orderBy != null && orderBy.length() > 0) {
query.append(" ORDER BY ").append(orderBy);
}
} else {
//PREBUILT QUERY
query.append(dbConf.getInputQuery());
}

try {// main logic to decide division of records between mappers.
query.append(" LIMIT ").append(split.getLength());
query.append(" OFFSET ").append(split.getStart());
} catch (IOException ex) {
// Ignore, will not throw.
}

return query.toString();
}

检查注释下的代码部分主要逻辑......这里记录按照LIMIT和OFFSET划分。这个逻辑对于每个 RDBMS 的实现都是不同的。只需查找 org.apache.sqoop.mapreduce.db.OracleDBRecordReader 它与 getSelectQuery() 方法的实现几乎没有什么不同。

希望这能让您快速了解如何将记录划分为不同的映射器。

关于hadoop - sqoop中数据是如何拆分成part文件的,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/45100487/

28 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com