gpt4 book ai didi

java - 在 Spark 中读取自定义序列文件

转载 作者:行者123 更新时间:2023-12-02 19:58:35 25 4
gpt4 key购买 nike

我在 Hadoop 中有一个自定义可写类,它保存为序列文件,如下所示

   public class ABC implements Writable{
private byte[] myId;
private byte[] myType;

//Constructor and other methods
@Override
public void write(DataOutput out) throws IOException {
myId.write(out);
myType.write(out);
}

@Override
public void readFields(DataInput in) throws IOException {
myId.readFields(in);
myType.readFields(in);
}
}
我想使用 PySpark 读取序列文件并获取数据。我尝试了以下三种方法:
  • 直接阅读:

  • sc.sequenceFile("file:///Test.seq", keyClass = "ABC", valueClass ="ABC")
    但得到
    object not serializable (class: ABC, value: ABC@451de3ec)
  • 写转换器:

  • 来自官方教程 http://spark.apache.org/docs/latest/programming-guide.html#external-datasets ,它说

    If you have custom serialized binary data (such as loading data fromCassandra / HBase), then you will first need to transform that data onthe Scala/Java side to something which can be handled by Pyrolite’spickler. A Converter trait is provided for this. Simply extend thistrait and implement your transformation code in the convert method.


    因此,我实现了一个转换器,如下所示:
    import test.ABC
    import java.io.DataInput
    import org.apache.spark.api.python.Converter

    /**
    * Implementation of [[org.apache.spark.api.python.Converter]] that converts data
    * to ABC
    */
    class DataToABCConverter extends Converter[Any, ABC] {
    override def convert(obj: Any): ABC = {
    if (obj == null) {
    return null
    }
    val in = obj.asInstanceOf[DataInput]
    val abc = new ABC()
    abc.readFields(in)
    abc
    }
    }
    在 PySpark 我使用以下代码
    sc.sequenceFile("file:///Test.seq", keyClass = "ABC", valueClass ="ABC",  keyConverter="DataToABCConverter",  valueConverter="DataToABCConverter" )
    但是得到以下错误
    java.lang.ClassCastException: ABC cannot be cast to java.io.DataInput
    看起来转换器的输入是我的 ABC 类而不是 java.io.DataInput,所以我不能应用 readFields 方法来获取数据。
  • 使用 BytesWritable:

  • 我添加一个 geID()获取字节并更改转换器的方法如下:
    class DataToChunkConverter extends Converter[Any, BytesWritable] {
    override def convert(obj: Any): BytesWritable = {
    if (obj == null) {
    return null
    }
    val abc = obj.asInstanceOf[ABC]
    val idd = abc.getID()
    new BytesWritable(idd)
    }
    }
    比我运行 pyspark 使用
    pyspark --master=local[8] --conf "spark.kryo.classesToRegister=org.apache.hadoop.io.BytesWritable" --conf "spark.serializer=org.apache.spark.serializer.KryoSerializer"
    但是得到以下错误
    Failed to pickle Java object as value: BytesWritable, falling back
    to 'toString'. Error: couldn't pickle object of type class org.apache.hadoop.io.BytesWritable
    所以我的问题是在 PySpark 中读取自定义序列文件的正确方法是什么?什么类型可以通过 PySpark 序列化?任何建议表示赞赏!

    最佳答案

    经过一些实验(遵循第三种方法),事实证明如果使用scala或Java中的native类型作为转换器的返回类型,它就可以工作。

    例如,使用 Array[Byte]作为返回类型,Pyspark 可以成功获取数据:

     class DataToChunkConverter extends Converter[Any,  Array[Byte]] {
    override def convert(obj: Any): Array[Byte] = {
    if (obj == null) {
    return null
    }
    val abc = obj.asInstanceOf[ABC]
    val idd = abc.getID()
    idd
    }
    }

    关于java - 在 Spark 中读取自定义序列文件,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/42687835/

    25 4 0
    Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
    广告合作:1813099741@qq.com 6ren.com