gpt4 book ai didi

hadoop - 从HBase提取数据时,在RDD中获取Null数据

转载 作者:行者123 更新时间:2023-12-02 21:06:30 25 4
gpt4 key购买 nike

我需要使用Spark API从HBase提取数据,然后在这些数据之上用Spark SQL进行查询。

我所做的事情如下:

  • 创建Spark conf对象
  • 创建HBase配置对象
  • 编写JavaPairRDD以获取记录。

  • 我的主类(main class)代码如下:
    import java.io.Serializable;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.client.Result;
    import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
    import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
    import org.apache.hadoop.hbase.util.Bytes;
    import org.apache.spark.api.java.JavaPairRDD;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.api.java.function.PairFunction;
    import org.apache.spark.examples.sql.JavaSparkSQL;
    import org.apache.spark.sql.DataFrame;
    import org.apache.spark.sql.SQLContext;
    import org.apache.spark.sql.hive.HiveContext;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.apache.spark.api.java.function.Function;

    import scala.Tuple2;

    import com.lsr.LSRTar;
    import com.lsr.utils.*;
    import com.spark.hbase.TestData;

    public class SparkSQLHBaseMain extends SparkJob implements Serializable{
    public static final String APPNAME = "Spark-SQL HBase Application";

    public static Logger logger = LoggerFactory.getLogger(SparkSQLHBaseMain.class);

    public static void main(String[] args) {
    logger.info("Calling method runJob()");
    new SparkSQLHBaseMain().runJob();
    }

    public void runJob() {
    final JavaSparkContext javaSparkContext = getSparkContext(APPNAME);
    logger.info("Spark Object created !!!");
    SQLContext sqlContext = new SQLContext(javaSparkContext);
    Configuration hbaseConfig = HBaseUtils.getHBaseConf();

    hbaseConfig.set(TableInputFormat.INPUT_TABLE, "emp");
    hbaseConfig.set(TableInputFormat.SCAN_COLUMN_FAMILY, "a"); // column family
    hbaseConfig.set(TableInputFormat.SCAN_COLUMNS, "a:id a:name");

    JavaPairRDD<ImmutableBytesWritable, Result> hBaseRDD =
    javaSparkContext.newAPIHadoopRDD(hbaseConfig, TableInputFormat.class, ImmutableBytesWritable.class, Result.class);

    JavaRDD<TestData> rowPairRDD = hBaseRDD.map(new Function<Tuple2<ImmutableBytesWritable, Result>, TestData>() {
    public TestData call(Tuple2<ImmutableBytesWritable, Result> entry) throws Exception {
    TestData cd = new TestData();
    Result r = entry._2;
    String keyRow = Bytes.toString(r.getRow());
    cd.setRowkey(keyRow);
    cd.setId((String) Bytes.toString(r.getValue(Bytes.toBytes("a"), Bytes.toBytes("id"))));
    cd.setName((String) Bytes.toString(r.getValue(Bytes.toBytes("a"), Bytes.toBytes("name"))));
    return cd;
    }
    });
    System.out.println("Result : \n"+"ID : "+rowPairRDD.id()+"Name : "+rowPairRDD.name());

    DataFrame dataFrame = sqlContext.createDataFrame(rowPairRDD, TestData.class);
    dataFrame.show();
    dataFrame.cache();
    dataFrame.repartition(100);
    dataFrame.printSchema();
    }

    }

    我的Bean类代码如下:
    package com.spark.hbase;

    import java.io.Serializable;

    /**
     * JavaBean holding one HBase row: the row key plus the {@code id} and {@code name} values.
     *
     * <p>Spark SQL's {@code createDataFrame(rdd, TestData.class)} derives the schema from the
     * bean's properties and dereferences each property's getter. A property that has only a
     * setter makes that inference fail with a {@link NullPointerException}, so every setter
     * here has a matching getter.
     */
    public class TestData implements Serializable {
        private static final long serialVersionUID = 1L;

        /** Row key; exposed as the bean property {@code rowkey}. May be null until set. */
        String keyRow;
        /** Value of the {@code id} column as set by the caller; may be null. */
        String id;
        /** Value of the {@code name} column as set by the caller; may be null. */
        String name;

        public String getRowkey() {
            return keyRow;
        }

        public void setRowkey(String keyRow) {
            this.keyRow = keyRow;
        }

        public String getId() {
            return id;
        }

        public void setId(String id) {
            this.id = id;
        }

        public String getName() {
            return name;
        }

        public void setName(String name) {
            this.name = name;
        }

        /** Human-readable form, useful when printing rows for debugging. */
        @Override
        public String toString() {
            return "TestData{rowkey=" + keyRow + ", id=" + id + ", name=" + name + "}";
        }
    }

    出现以下异常:
    17/01/18 12:28:54 INFO HBaseUtils: HBase is running !!!
    HBase is running!
    17/01/18 12:28:56 INFO MemoryStore: Block broadcast_0 stored as values in memory (estimated size 214.5 KB, free 214.5 KB)
    17/01/18 12:28:57 INFO MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 25.0 KB, free 239.5 KB)
    17/01/18 12:28:57 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on localhost:43929 (size: 25.0 KB, free: 257.8 MB)
    17/01/18 12:28:57 INFO SparkContext: Created broadcast 0 from newAPIHadoopRDD at SparkSQLHBaseMain.java:49
    Result :
    ID : 1Name : null
    Exception in thread "main" java.lang.NullPointerException
    at org.spark-project.guava.reflect.TypeToken.method(TypeToken.java:465)
    at org.apache.spark.sql.catalyst.JavaTypeInference$$anonfun$2.apply(JavaTypeInference.scala:110)
    at org.apache.spark.sql.catalyst.JavaTypeInference$$anonfun$2.apply(JavaTypeInference.scala:109)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
    at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
    at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
    at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:108)
    at org.apache.spark.sql.catalyst.JavaTypeInference$.org$apache$spark$sql$catalyst$JavaTypeInference$$inferDataType(JavaTypeInference.scala:109)
    at org.apache.spark.sql.catalyst.JavaTypeInference$.inferDataType(JavaTypeInference.scala:54)
    at org.apache.spark.sql.SQLContext.getSchema(SQLContext.scala:941)
    at org.apache.spark.sql.SQLContext.createDataFrame(SQLContext.scala:572)
    at org.apache.spark.sql.SQLContext.createDataFrame(SQLContext.scala:591)
    at com.ril.jio.spark.sql.SparkSQLHBaseMain.runJob(SparkSQLHBaseMain.java:63)
    at com.ril.jio.spark.sql.SparkSQLHBaseMain.main(SparkSQLHBaseMain.java:35)

    我的Spark和HBase本身均运行正常。

    请帮助我解决此问题。

    最佳答案

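    我认为您的代码应如下所示(未测试)。另外请注意:createDataFrame 抛出 NullPointerException 是因为 TestData 只有 setter 而没有 getter。Spark 通过 JavaBean 的 getter 推断 schema,因此需要为每个字段添加对应的 getter(如 getRowkey()、getId()、getName())。而输出中的 "ID : 1Name : null" 打印的是 RDD 本身的 id() 和 name(),并非行数据。
    在定义 rowPairRDD 之后: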

    JavaRDD<TestData> rowPairRDD = ....

    使用 foreach() 遍历 RDD 以读取记录:
    // Assumes TestData exposes JavaBean getters getId()/getName() (Spark's createDataFrame
    // needs them too). foreach runs on the executors, so in cluster mode this output appears
    // in the executor logs rather than the driver console.
    rowPairRDD.foreach(new VoidFunction<TestData>() {
        @Override
        public void call(TestData entry) {
            System.out.println("Result : ID : " + entry.getId() + " Name : " + entry.getName());
        }
    });

    关于hadoop - 从HBase提取数据时,在RDD中获取Null数据,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/41713281/

    25 4 0
    Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
    广告合作:1813099741@qq.com 6ren.com