gpt4 book ai didi

java - sqoop import-all-tables slow 和 sequence files 是自定义 java objects

转载 作者:可可西里 更新时间:2023-11-01 15:26:54 27 4
gpt4 key购买 nike

我正在努力将一个非常大的数据库同步到配置单元。

有 2 个问题:(1) 文本导入速度较慢,而且 mapreduce 步长较大。 (2) 序列文件速度更快,但无法通过正常方式读取。

详情如下:

(1) 如果我们将数据导入为文本,则速度较慢。这些文件累积在临时文件夹中的主目录中,但最终会创建一个相当慢的 mapreduce 作业。

17/04/25 04:18:34 INFO mapreduce.Job: Job job_1490822567992_0996 running in uber mode : false
17/04/25 04:18:34 INFO mapreduce.Job: map 0% reduce 0%
17/04/25 11:05:59 INFO mapreduce.Job: map 29% reduce 0%
17/04/25 11:20:18 INFO mapreduce.Job: map 86% reduce 0% <-- tends to hang a very long time here

(为简洁起见删除了很多行。)

(2) 如果我们将文件作为序列文件导入,速度会快得多,但是 Hive 无法读取检索到的数据,因为它需要了解创建的自动生成的 Java 文件。这也有一个 mapreduce 步骤,但它似乎进行得更快(或者那可能是一天中的某个时间……)。

对于由 sqoop 生成的每个表,我们都有一系列这样的类:public class MyTableName extends SqoopRecord implements DBWritable, Writable

使用这些类的步骤是什么?我们如何在配置单元中安装它们?令人惊讶的是,Cloudera 支持工程师并不知道,因为这一定是不常被标示的区域??

sqoop import-all-tables --connect '...' --relaxed-isolation --num-mappers 7 --compress --autoreset-to-one-mapper --compression-codec=snappy --outdir javadir --as-sequencefile --hive-delims-replacement ' '

有什么建议吗?

最佳答案

I am open to Spark. Do you have some sample code?

免责声明:我只是从多个笔记本中汇集了一些片段,并且懒得(也饿了)在离开办公室之前启动测试运行。任何错误和拼写错误都由您来查找。


使用 Cloudera parcel (支持 Hive) 提供的 Spark 2.0,一种交互式风格的 Scala 脚本,在本地模式下,没有任何数据分区,一个 Microsoft SQL服务器连接,并直接插入到现有的 Hive 管理表中(带有一些额外的业务逻辑)...

spark2-shell --master local --driver-class-path /some/path/to/sqljdbc42.jar

//旁注:4 类 JDBC 驱动程序的自动注册在多个 Spark 构建中被破坏,并且错误不断出现,因此指定驱动程序类更安全,以防万一...

val weather = spark.read.format("jdbc").option("driver", "com.microsoft.sqlserver.jdbc.SQLServerDriver").option("url", "jdbc:sqlserver://myhost\\SQLExpress:9433;database=mydb").option("user", "mylogin").option("password", "*****").option("dbtable", "weather_obs").load()
{ printf( "%%% Partitions: %d / Records: %d\n", weather.rdd.getNumPartitions, weather.count)
println("%%% Detailed DF schema:")
weather.printSchema
}

//"dbtable" 使用子查询的替代方法:
//"(SELECT station, dt_obs_utc, temp_k FROM observation_meteo WHERE station LIKE '78%') x")

weather.registerTempTable("wth")
spark.sql(
"""
INSERT INTO TABLE somedb.sometable
SELECT station, dt_obs_utc, CAST(temp_k -273.15 AS DECIMAL(3,1)) as temp_c
FROM wth
WHERE temp_k IS NOT NULL
""")
dropTempTable("wth")

weather.unpersist()


现在,如果您想使用 GZip 压缩在 Parquet 文件上动态创建 Hive 外部表,请将“临时表”技巧替换为...

weather.write.option("compression","gzip").mode("overwrite").parquet("hdfs:///some/directory/")

//Parquet 支持的压缩编解码器:无、snappy(默认)、gzip
//支持的 CSV 压缩编解码器:无(默认)、snappy、lz4、gzip、bzip2

def toImpalaType(sparkType : String ) : String = {
if (sparkType == "StringType" || sparkType == "BinaryType") { return "string" }
if (sparkType == "BooleanType") { return "boolean" }
if (sparkType == "ByteType") { return "tinyint" }
if (sparkType == "ShortType") { return "smallint" }
if (sparkType == "IntegerType") { return "int" }
if (sparkType == "LongType") { return "bigint" }
if (sparkType == "FloatType") { return "float" }
if (sparkType == "DoubleType") { return "double" }
if (sparkType.startsWith("DecimalType")) { return sparkType.replace("DecimalType","decimal") }
if (sparkType == "TimestampType" || sparkType == "DateType") { return "timestamp" }
println("########## ERROR - \"" +sparkType +"\" not supported (bug)")
return "string"
}

spark.sql("DROP TABLE IF EXISTS somedb.sometable")
{ val query = new StringBuilder
query.append("CREATE EXTERNAL TABLE somedb.sometable")
val weatherSchema =weather.dtypes
val (colName0,colType0) = weatherSchema(0)
query.append("\n ( " +colName0 + " " +toImpalaType(colType0))
for ( i <- 2 to tempSchema.length) { val (colName_,colType_) = tempSchema(i-1) ; query.append("\n , " +colName_ + " " +toImpalaType(colType_)) }
query.append("\n )\nCOMMENT 'Imported from SQL Server by Spark'")
query.append("\nSTORED AS Parquet")
query.append("\nLOCATION 'hdfs:///some/directory'")
sqlContext.sql(query.toString())
query.clear()
}


如果您想对输入表进行分区(基于数字列 - 日期/时间不支持 AFAIK),请查看 JDBC 导入选项 partitionColumnlowerBoundupperBound

如果你想在 YARN-client 模式下并行加载这些分区,那么添加一个 --jars 参数来将 JDBC 驱动程序上传到执行器。

关于java - sqoop import-all-tables slow 和 sequence files 是自定义 java objects,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/43617552/

27 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com