- android - RelativeLayout 背景可绘制重叠内容
- android - 如何链接 cpufeatures lib 以获取 native android 库?
- java - OnItemClickListener 不起作用,但 OnLongItemClickListener 在自定义 ListView 中起作用
- java - Android 文件转字符串
我正在尝试按照以下步骤将数据框加载到 Hive 表中:
读取源表并将数据帧保存为 HDFS 上的 CSV 文件
val yearDF = spark.read.format("jdbc").option("url", connectionUrl).option("dbtable", s"(${execQuery}) as year2016").option("user", devUserName).option("password", devPassword).option("partitionColumn","header_id").option("lowerBound", 199199).option("upperBound", 284058).option("numPartitions",10).load()
按照我的 Hive 表列对列进行排序我的配置单元表列以以下格式的字符串存在:
val hiveCols = col1:coldatatype|col2:coldatatype|col3:coldatatype|col4:coldatatype...col200:datatype
val schemaList = hiveCols.split("\\|")
val hiveColumnOrder = schemaList.map(e => e.split("\\:")).map(e => e(0)).toSeq
val finalDF = yearDF.selectExpr(hiveColumnOrder:_*)
我在“execQuery”中读取的列顺序与“hiveColumnOrder”相同,为了确保顺序,我再次使用 selectExpr 选择 yearDF 中的列
将数据帧保存为 HDFS 上的 CSV 文件:
newDF.write.format("CSV").save("hdfs://username/apps/hive/warehouse/database.db/lines_test_data56/")
保存数据框后,我从“hiveCols”中获取相同的列,准备一个 DDL 以在相同位置创建一个配置单元表,其值按给定的逗号分隔下面:
create table if not exists schema.tablename(col1 coldatatype,col2 coldatatype,col3 coldatatype,col4 coldatatype...col200 datatype)
ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
STORED AS TEXTFILELOCATION 'hdfs://username/apps/hive/warehouse/database.db/lines_test_data56/';
在我将数据框加载到创建的表中之后,我在这里面临的问题是当我查询表时,我在查询中得到不正确的输出。例如:如果我在将数据框保存为文件之前对数据框应用以下查询:
finalDF.createOrReplaceTempView("tmpTable")
select header_id,line_num,debit_rate,debit_rate_text,credit_rate,credit_rate_text,activity_amount,activity_amount_text,exchange_rate,exchange_rate_text,amount_cr,amount_cr_text from tmpTable where header_id=19924598 and line_num=2
我得到了正确的输出。所有值都与列正确对齐:
[19924598,2,null,null,381761.40000000000000000000,381761.4,-381761.40000000000000000000,-381761.4,0.01489610000000000000,0.014896100000000,5686.76000000000000000000,5686.76]
但是在将数据框保存到 CSV 文件中之后,在其之上创建一个表(第 4 步)并对创建的表应用相同的查询,我发现数据是困惑的并且与列的映射不正确:
select header_id,line_num,debit_rate,debit_rate_text,credit_rate,credit_rate_text,activity_amount,activity_amount_text,exchange_rate,exchange_rate_text,amount_cr,amount_cr_text from schema.tablename where header_id=19924598 and line_num=2
+---------------+--------------+-------------+------------------+-------------+------------------+--------------------------+-------------------------------+------------------------+-----------------------------+--------------------+-------------------------+--+
| header_id | line_num | debit_rate | debit_rate_text | credit_rate | credit_rate_text | activity_amount | activity_amount_text | exchange_rate | exchange_rate_text | amount_cr | amount_cr_text |
+---------------+--------------+-------------+------------------+-------------+------------------+--------------------------+-------------------------------+------------------------+-----------------------------+--------------------+-------------------------+--+
| 19924598 | 2 | NULL | | 381761.4 | | 5686.76 | 5686.76 | NULL | -5686.76 | NULL | |
所以我尝试使用一种不同的方法,即预先创建配置单元表并将数据从数据帧插入其中:
如果我在作业完成后运行上述选择查询,即使这种方式也会失败。我尝试使用 refresh table schema.table
和 msckrepair table schema.table
刷新表,只是为了查看元数据是否有任何问题,但似乎没有任何效果。
谁能告诉我是什么原因造成的,我这里操作数据的方式有什么问题吗?
最佳答案
代码使用 Spark 2.3.2 测试
无需从 CSV 文件创建 Spark 数据框,然后将其注册为 Hive 表,您可以轻松地运行 SQL 命令并从 CSV 文件创建 Hive 表
val conf = new SparkConf
conf
.set("hive.server2.thrift.port", "10000")
.set("spark.sql.hive.thriftServer.singleSession", "true")
.set("spark.sql.warehouse.dir", "hdfs://PATH_FOR_HIVE_METADATA")
.set("spark.sql.catalogImplementation","hive")
.setMaster("local[*]")
.setAppName("ThriftServer")
val spark = SparkSession.builder()
.config(conf)
.enableHiveSupport()
.getOrCreate()
现在使用 spark
对象,您可以作为 Hive 用户运行 SQL 命令:
spark.sql("DROP DATABASE IF EXISTS my_db CASCADE")
spark.sql("create database if not exists my_db")
spark.sql("use my_db")
使用以下代码,您可以加载 HDFS 目录中的所有 csv_files(或者您可以只给出一个 CSV 文件的路径):
spark.sql(
"CREATE TABLE test_table(" +
"id int," +
"time_stamp bigint," +
"user_name string) " +
"ROW FORMAT DELIMITED " +
"FIELDS TERMINATED BY ',' " +
"STORED AS TEXTFILE " +
"LOCATION 'hdfs://PATH_TO_CSV_Directory_OR_CSV_FILE' "
)
最后将 Spark sqlContext 对象注册为 Hive ThriftServer:
HiveThriftServer2.startWithContext(spark.sqlContext)
这将在端口 10000 上创建一个 ThriftServer 端点。
INFO ThriftCLIService: Starting ThriftBinaryCLIService on port 10000 with 5...500 worker threads
现在您可以运行 beeline 并连接到 ThriftServer:
beeline> !connect jdbc:hive2://localhost:10000
Connecting to jdbc:hive2://localhost:10000
Enter username for jdbc:hive2://localhost:10000: enter optional_username
Enter password for jdbc:hive2://localhost:10000: leave blank
Connected to: Spark SQL (version 2.3.2)
Driver: Hive JDBC (version 1.2.1.spark2)
Transaction isolation: TRANSACTION_REPEATABLE_READ
0: jdbc:hive2://localhost:10000>
并测试表test_table
是否在my_db
数据库下创建:
0: jdbc:hive2://localhost:10000> use my_db;
0: jdbc:hive2://localhost:10000> show tables ;
+-----------+-----------------------+--------------+--+
| database | tableName | isTemporary |
+-----------+-----------------------+--------------+--+
| my_db | test_table | false |
+-----------+-----------------------+--------------+--+
此外,您可以使用 ThrifServer JDBC 端点创建任何其他 Hive 表(或任何 HiveQL 命令)。
这里是需要的依赖:
libraryDependencies ++= Seq(
"org.apache.spark" %% "spark-core" % sparkVersion,
"org.apache.spark" %% "spark-sql" % sparkVersion,
"org.apache.spark" %% "spark-hive" % sparkVersion,
"org.apache.spark" %% "spark-hive-thriftserver" % sparkVersion,
"org.apache.hadoop" % "hadoop-hdfs" % "2.8.3",
"org.apache.hadoop" % "hadoop-common" % "2.8.3",
)
关于apache-spark - 将数据从 CSV 文件映射到 HDFS 上的 Hive 表时出错,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/54691697/
我正在使用 node.js 和 mocha 单元测试,并且希望能够通过 npm 运行测试命令。当我在测试文件夹中运行 Mocha 测试时,测试运行成功。但是,当我运行 npm test 时,测试给出了
我的文本区域中有这些标签 ..... 我正在尝试使用 replaceAll() String 方法替换它们 text.replaceAll("", ""); text.replaceAll("", "
早上好,我是 ZXing 的新手,当我运行我的应用程序时出现以下错误: 异常Ljava/lang/NoClassDefFoundError;初始化 ICOM/google/zxing/client/a
我正在制作一些哈希函数。 它的源代码是... #include #include #include int m_hash(char *input, size_t in_length, char
我正在尝试使用 Spritekit 在 Swift 中编写游戏。目的是带着他的角色迎面而来的矩形逃跑。现在我在 SKPhysicsContactDelegate (didBegin ()) 方法中犯了
我正在尝试创建一个用于导入 CSV 文件的按钮,但出现此错误: actionPerformed(java.awt.event.ActionEvent) in cannot implement
请看下面的代码 public List getNames() { List names = new ArrayList(); try { createConnection(); Sta
我正在尝试添加一个事件以在“dealsArchive”表中创建一个条目,然后从“deals”表中删除该条目。它需要在特定时间执行。 这是我正在尝试使用的: DELIMITER $$ CREATE EV
我试图将两个存储过程的表结果存储到 phpmyadmin 例程窗口中的单个表中,这给了我 mariadb 语法错误。单独调用存储过程给出了结果。 存储过程代码 BEGIN CREATE TABLE t
我想在 videoview 中加载视频之前有一个进度条。但是我收到以下错误。我还添加了所有必要的导入。 我在 ANDROID 中使用 AIDE 这是我的代码 public class MainActi
我已经使用了 AsyncTask,但我不明白为什么在我的设备 (OS 4.0) 上测试时仍然出现错误。我的 apk 构建于 2.3.3 中。我想我把代码弄错了,但我不知道我的错误在哪里。任何人都请帮助
我在测试 friend 网站的安全性时,通过在 URL 末尾添加 ' 发现了 SQL 注入(inject)漏洞该网站是用zend框架构建的我遇到的问题是 MySQL -- 中的注释语法不起作用,因此页
我正在尝试使用堆栈溢出答案之一的交互式信息窗口。 链接如下: interactive infowindow 但是我在代码中使用 getMap() 时遇到错误。虽然我尝试使用 getMapAsync 但
当我编译以下代码时出现错误: The method addMouseListener(Player) is undefined for the type Player 代码: import java.
我是 Android 开发的初学者。我正在开发一个接收 MySql 数据然后将其保存在 SQLite 中的应用程序。 我将 Json 用于同步状态,以便我可以将未同步数据的数量显示为要同步的待处理数据
(这里是Hello world级别的自动化测试人员) 我正在尝试下载一个文件并将其重命名以便于查找。我收到一个错误....这是代码 @Test public void allDownload(
我只是在写另一个程序。并使用: while (cin) words.push_back(s); words是string的vector,s是string。 我的 RAM 使用量在 4 或 5
我是 AngularJS 的新手,我遇到了一个问题。我有一个带有提交按钮的页面,当我单击提交模式时必须打开并且来自 URL 的数据必须存在于模式中。现在,模式打开但它是空的并且没有从 URL 获取数据
我正在尝试读取一个文件(它可以包含任意数量的随机数字,但不会超过 500 个)并将其放入一个数组中。 稍后我将需要使用数组来做很多事情。 但到目前为止,这一小段代码给了我 no match for o
有些人在使用 make 命令进行编译时遇到了问题,所以我想我应该在这里尝试一下,我已经在以下操作系统的 ubuntu 32 位和挤压 64 位上尝试过 我克隆了 git 项目 https://gith
我是一名优秀的程序员,十分优秀!