
apache-spark - Hive cannot read partitioned Parquet files generated by Spark


I am having trouble reading partitioned Parquet files generated by Spark in Hive. I can create the external table in Hive, but when I try to select a few rows, Hive returns only an "OK" message with no rows.

I am able to read the partitioned Parquet files correctly in Spark, so I assume they were generated correctly.
I am also able to read the files when I create an external table in Hive without partitioning.

Does anyone have a suggestion?

My environment is:

  • Cluster: EMR 4.1.0
  • Hive 1.0.0
  • Spark 1.5.0
  • Hue 3.7.1
  • Parquet files stored in an S3 bucket (s3://staging-dev/test/ttfourfieldspart2/year=2013/month=11)

My Spark configuration file has the following parameters (/etc/spark/conf.dist/spark-defaults.conf):
    spark.master yarn
    spark.driver.extraClassPath /etc/hadoop/conf:/etc/hive/conf:/usr/lib/hadoop/*:/usr/lib/hadoop-hdfs/*:/usr/lib/hadoop-mapreduce/*:/usr/lib/hadoop-yarn/*:/usr/lib/hadoop-lzo/lib/*:/usr/share/aws/emr/emrfs/conf:/usr/share/aws/emr/emrfs/lib/*:/usr/share/aws/emr/emrfs/auxlib/*
    spark.driver.extraLibraryPath /usr/lib/hadoop/lib/native:/usr/lib/hadoop-lzo/lib/native
    spark.executor.extraClassPath /etc/hadoop/conf:/etc/hive/conf:/usr/lib/hadoop/*:/usr/lib/hadoop-hdfs/*:/usr/lib/hadoop-mapreduce/*:/usr/lib/hadoop-yarn/*:/usr/lib/hadoop-lzo/lib/*:/usr/share/aws/emr/emrfs/conf:/usr/share/aws/emr/emrfs/lib/*:/usr/share/aws/emr/emrfs/auxlib/*
    spark.executor.extraLibraryPath /usr/lib/hadoop/lib/native:/usr/lib/hadoop-lzo/lib/native
    spark.eventLog.enabled true
    spark.eventLog.dir hdfs:///var/log/spark/apps
    spark.history.fs.logDirectory hdfs:///var/log/spark/apps
    spark.yarn.historyServer.address ip-10-37-161-246.ec2.internal:18080
    spark.history.ui.port 18080
    spark.shuffle.service.enabled true
    spark.driver.extraJavaOptions -Dlog4j.configuration=file:///etc/spark/conf/log4j.properties -XX:+UseConcMarkSweepGC -XX:CMSInitiatingOccupancyFraction=70 -XX:MaxHeapFreeRatio=70 -XX:+CMSClassUnloadingEnabled -XX:MaxPermSize=512M -XX:OnOutOfMemoryError='kill -9 %p'
    spark.executor.extraJavaOptions -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:+UseConcMarkSweepGC -XX:CMSInitiatingOccupancyFraction=70 -XX:MaxHeapFreeRatio=70 -XX:+CMSClassUnloadingEnabled -XX:OnOutOfMemoryError='kill -9 %p'
    spark.executor.memory 4G
    spark.driver.memory 4G
    spark.dynamicAllocation.enabled true
    spark.dynamicAllocation.maxExecutors 100
    spark.dynamicAllocation.minExecutors 1

My Hive configuration file has the following parameters (/etc/hive/conf/hive-site.xml):
    <configuration>

    <!-- Hive Configuration can either be stored in this file or in the hadoop configuration files -->
    <!-- that are implied by Hadoop setup variables. -->
    <!-- Aside from Hadoop setup variables - this file is provided as a convenience so that Hive -->
    <!-- users do not have to edit hadoop configuration files (that may be managed as a centralized -->
    <!-- resource). -->

    <!-- Hive Execution Parameters -->


    <property>
    <name>hbase.zookeeper.quorum</name>
    <value>ip-10-xx-xxx-xxx.ec2.internal</value>
    <description>http://wiki.apache.org/hadoop/Hive/HBaseIntegration</description>
    </property>

    <property>
    <name>hive.execution.engine</name>
    <value>mr</value>
    </property>

    <property>
    <name>fs.defaultFS</name>
    <value>hdfs://ip-10-xx-xxx-xxx.ec2.internal:8020</value>
    </property>

    <property>
    <name>hive.metastore.uris</name>
    <value>thrift://ip-10-xx-xxx-xxx.ec2.internal:9083</value>
    <description>JDBC connect string for a JDBC metastore</description>
    </property>

    <property>
    <name>javax.jdo.option.ConnectionURL</name>
    <value>jdbc:mysql://ip-10-xx-xxx-xxx.ec2.internal:3306/hive?createDatabaseIfNotExist=true</value>
    <description>username to use against metastore database</description>
    </property>

    <property>
    <name>javax.jdo.option.ConnectionDriverName</name>
    <value>org.mariadb.jdbc.Driver</value>
    <description>username to use against metastore database</description>
    </property>

    <property>
    <name>javax.jdo.option.ConnectionUserName</name>
    <value>hive</value>
    <description>username to use against metastore database</description>
    </property>

    <property>
    <name>javax.jdo.option.ConnectionPassword</name>
    <value>1R72JFCDG5XaaDTB</value>
    <description>password to use against metastore database</description>
    </property>

    <property>
    <name>datanucleus.fixedDatastore</name>
    <value>true</value>
    </property>

    <property>
    <name>mapred.reduce.tasks</name>
    <value>-1</value>
    </property>

    <property>
    <name>mapred.max.split.size</name>
    <value>256000000</value>
    </property>

    <property>
    <name>hive.metastore.connect.retries</name>
    <value>5</value>
    </property>

    <property>
    <name>hive.optimize.sort.dynamic.partition</name>
    <value>true</value>
    </property>

    <property><name>hive.exec.dynamic.partition</name><value>true</value></property>
    <property><name>hive.exec.dynamic.partition.mode</name><value>nonstrict</value></property>
    <property><name>hive.exec.max.dynamic.partitions</name><value>10000</value></property>
    <property><name>hive.exec.max.dynamic.partitions.pernode</name><value>500</value></property>

    </configuration>

My Python code that reads the partitioned Parquet files:
    from pyspark import *
    from pyspark.sql import *
    from pyspark.sql.types import *
    from pyspark.sql.functions import *

    df7 = sqlContext.read.parquet('s3://staging-dev/test/ttfourfieldspart2/')
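
For context, a partition layout like year=2013/month=11 is what Spark's DataFrameWriter.partitionBy produces. The snippet below is only a minimal sketch of how such data could have been written; the source DataFrame and its input path (source_df, s3://staging-dev/test/source/) are hypothetical placeholders, not the original job.

    # Minimal sketch, using the shell-provided sqlContext as in the snippet above.
    # The input path and 'source_df' are hypothetical placeholders.
    source_df = sqlContext.read.parquet('s3://staging-dev/test/source/')

    # partitionBy creates the year=.../month=... subdirectories under the target
    # prefix, which is the directory layout a partitioned Hive table expects.
    (source_df.write
        .mode('overwrite')
        .partitionBy('year', 'month')
        .parquet('s3://staging-dev/test/ttfourfieldspart2/'))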

The Parquet file schema printed by Spark:
    >>> df7.schema
    StructType(List(StructField(transactionid,StringType,true),StructField(eventts,TimestampType,true),StructField(year,IntegerType,true),StructField(month,IntegerType,true)))

    >>> df7.printSchema()
    root
    |-- transactionid: string (nullable = true)
    |-- eventts: timestamp (nullable = true)
    |-- year: integer (nullable = true)
    |-- month: integer (nullable = true)

    >>> df7.show(10)
    +--------------------+--------------------+----+-----+
    |       transactionid|             eventts|year|month|
    +--------------------+--------------------+----+-----+
    |f7018907-ed3d-49b...|2013-11-21 18:41:...|2013|   11|
    |f6d95a5f-d4ba-489...|2013-11-21 18:41:...|2013|   11|
    |02b2a715-6e15-4bb...|2013-11-21 18:41:...|2013|   11|
    |0e908c0f-7d63-48c...|2013-11-21 18:41:...|2013|   11|
    |f83e30f9-950a-4b9...|2013-11-21 18:41:...|2013|   11|
    |3425e4ea-b715-476...|2013-11-21 18:41:...|2013|   11|
    |a20a6aeb-da4f-4fd...|2013-11-21 18:41:...|2013|   11|
    |d2f57e6f-889b-49b...|2013-11-21 18:41:...|2013|   11|
    |46f2eda5-408e-44e...|2013-11-21 18:41:...|2013|   11|
    |36fb8b79-b2b5-493...|2013-11-21 18:41:...|2013|   11|
    +--------------------+--------------------+----+-----+
    only showing top 10 rows

Creating the table in Hive:
    create external table if not exists t3(
    transactionid string,
    eventts timestamp)
    partitioned by (year int, month int)
    stored as parquet
    location 's3://staging-dev/test/ttfourfieldspart2/';

When I try to select a few rows in Hive, it does not return any rows:
    hive> select * from t3 limit 10;
    OK
    Time taken: 0.027 seconds
    hive>

Best Answer

I finally found the problem. When you create a table in Hive over partitioned data that already exists in S3 or HDFS, you need to run a command to update the Hive metastore with the table's partition structure. Take a look here:
    https://cwiki.apache.org/confluence/display/Hive/LanguageManual+DDL#LanguageManualDDL-RecoverPartitions(MSCKREPAIRTABLE)

    The commands are:

    MSCK REPAIR TABLE table_name;


    And on Hive running in Amazon EMR you can use:

    ALTER TABLE table_name RECOVER PARTITIONS;
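
The same repair plus a quick verification can also be driven from PySpark; the sketch below assumes a HiveContext that talks to the same metastore and that MSCK REPAIR TABLE and SHOW PARTITIONS are passed through to Hive (running them directly in the Hive CLI, as above, works just as well). The table name t3 is taken from the question.

    from pyspark import SparkContext
    from pyspark.sql import HiveContext

    sc = SparkContext()                 # in the pyspark shell, sc already exists
    hiveContext = HiveContext(sc)       # must point at the same Hive metastore

    # Register the existing year=.../month=... directories as partitions of t3.
    hiveContext.sql("MSCK REPAIR TABLE t3")

    # Confirm the metastore now knows the partitions, then read a few rows.
    hiveContext.sql("SHOW PARTITIONS t3").show()
    hiveContext.sql("SELECT * FROM t3 LIMIT 10").show()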

Regarding "apache-spark - Hive cannot read partitioned Parquet files generated by Spark", a similar question can be found on Stack Overflow: https://stackoverflow.com/questions/33551878/
