
scala - Why doesn't the implicit conversion for Writable work?


SparkContext defines several implicit conversions between Writable types and their primitive counterparts, such as LongWritable <-> Long and Text <-> String.
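For reference, the Hadoop side of each pair is just a thin wrapper; a minimal, self-contained sketch of the correspondence (plain Hadoop io types, no Spark involved):

  import org.apache.hadoop.io.{LongWritable, Text}

  // Long <-> LongWritable
  val lw: LongWritable = new LongWritable(42L)   // wrap
  val l: Long = lw.get                           // unwrap

  // String <-> Text
  val t: Text = new Text("hello")                // wrap
  val s: String = t.toString                     // unwrap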

  • Test case 1:

I am using the following code to combine small files:

  @Test
  def testCombineSmallFiles(): Unit = {
    val path = "file:///d:/logs"
    val rdd = sc.newAPIHadoopFile[LongWritable, Text, CombineTextInputFormat](path)
    println(s"rdd partition number is ${rdd.partitions.length}")
    println(s"lines is :${rdd.count()}")
  }

The code above runs fine, but if I use the following line to obtain the rdd, it leads to a compile error:

val rdd = sc.newAPIHadoopFile[Long,String, CombineTextInputFormat](path)

It looks like the implicit conversion does not take effect. I would like to know what is wrong here and why it doesn't work.
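If the goal is an RDD of primitive types, one workaround is to keep the Writable type parameters and convert explicitly. A minimal sketch, assuming the same sc and path as in the test above:

  import org.apache.hadoop.io.{LongWritable, Text}
  import org.apache.hadoop.mapreduce.lib.input.CombineTextInputFormat
  import org.apache.spark.rdd.RDD

  // Read with Writable key/value types, then unwrap them in a map.
  val writableRdd = sc.newAPIHadoopFile[LongWritable, Text, CombineTextInputFormat](path)
  val primitiveRdd: RDD[(Long, String)] =
    writableRdd.map { case (offset, line) => (offset.get, line.toString) }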

  • Test case 2:

With the following code that uses sequenceFile, the implicit conversion appears to work (Text is converted to String and IntWritable to Int):

  @Test
  def testReadWriteSequenceFile(): Unit = {
    val data = List(("A", 1), ("B", 2), ("C", 3))
    val outputDir = Utils.getOutputDir()
    sc.parallelize(data).saveAsSequenceFile(outputDir)
    // implicit conversion works for the SparkContext#sequenceFile method
    val rdd = sc.sequenceFile(outputDir + "/part-00000", classOf[String], classOf[Int])
    rdd.foreach(println)
  }

Comparing these two test cases, I don't see the key difference that makes one work and the other fail.

  • Note:

The SparkContext#sequenceFile method I use in test case 2 is:

  def sequenceFile[K, V](
      path: String,
      keyClass: Class[K],
      valueClass: Class[V]): RDD[(K, V)] = withScope {
    assertNotStopped()
    sequenceFile(path, keyClass, valueClass, defaultMinPartitions)
  }

This sequenceFile method calls another sequenceFile overload, which in turn calls hadoopFile to read the data:

  def sequenceFile[K, V](
      path: String,
      keyClass: Class[K],
      valueClass: Class[V],
      minPartitions: Int): RDD[(K, V)] = withScope {
    assertNotStopped()
    val inputFormatClass = classOf[SequenceFileInputFormat[K, V]]
    hadoopFile(path, inputFormatClass, keyClass, valueClass, minPartitions)
  }

Best Answer

To use the implicit conversion, a WritableConverter is needed. For example:

  def sequenceFile[K, V]
      (path: String, minPartitions: Int = defaultMinPartitions)
      (implicit km: ClassTag[K], vm: ClassTag[V],
       kcf: () => WritableConverter[K], vcf: () => WritableConverter[V]): RDD[(K, V)] = {...}

Nowhere in the docs do I see sc.newAPIHadoopFile using it, so this is not possible.

Also, please verify that you have import SparkContext._ (I cannot see the imports in your post).
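For comparison, the overload above can be called with primitive type parameters because its implicit WritableConverter arguments supply the conversions. A minimal sketch, assuming a SequenceFile of Text/IntWritable pairs like the one written in test case 2 (the import is only required on Spark versions before 1.3; later versions find the implicits in object WritableConverter automatically):

  import org.apache.spark.SparkContext._   // needed on Spark < 1.3 only

  // Primitive type parameters; the implicit WritableConverters unwrap Text and IntWritable.
  val typedRdd = sc.sequenceFile[String, Int](outputDir + "/part-00000")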

Please take a look at WritableConverter in SparkContext, which contains the following code:

  /**
   * A class encapsulating how to convert some type `T` from `Writable`. It stores both the `Writable`
   * class corresponding to `T` (e.g. `IntWritable` for `Int`) and a function for doing the
   * conversion.
   * The getter for the writable class takes a `ClassTag[T]` in case this is a generic object
   * that doesn't know the type of `T` when it is created. This sounds strange but is necessary to
   * support converting subclasses of `Writable` to themselves (`writableWritableConverter()`).
   */
  private[spark] class WritableConverter[T](
      val writableClass: ClassTag[T] => Class[_ <: Writable],
      val convert: Writable => T)
    extends Serializable

  object WritableConverter {

    // Helper objects for converting common types to Writable
    private[spark] def simpleWritableConverter[T, W <: Writable: ClassTag](convert: W => T)
      : WritableConverter[T] = {
      val wClass = classTag[W].runtimeClass.asInstanceOf[Class[W]]
      new WritableConverter[T](_ => wClass, x => convert(x.asInstanceOf[W]))
    }

    // The following implicit functions were in SparkContext before 1.3 and users had to
    // `import SparkContext._` to enable them. Now we move them here to make the compiler find
    // them automatically. However, we still keep the old functions in SparkContext for backward
    // compatibility and forward to the following functions directly.

    implicit def intWritableConverter(): WritableConverter[Int] =
      simpleWritableConverter[Int, IntWritable](_.get)

    implicit def longWritableConverter(): WritableConverter[Long] =
      simpleWritableConverter[Long, LongWritable](_.get)

    implicit def doubleWritableConverter(): WritableConverter[Double] =
      simpleWritableConverter[Double, DoubleWritable](_.get)

    implicit def floatWritableConverter(): WritableConverter[Float] =
      simpleWritableConverter[Float, FloatWritable](_.get)

    implicit def booleanWritableConverter(): WritableConverter[Boolean] =
      simpleWritableConverter[Boolean, BooleanWritable](_.get)

    implicit def bytesWritableConverter(): WritableConverter[Array[Byte]] = {
      simpleWritableConverter[Array[Byte], BytesWritable] { bw =>
        // getBytes method returns array which is longer then data to be returned
        Arrays.copyOfRange(bw.getBytes, 0, bw.getLength)
      }
    }

    implicit def stringWritableConverter(): WritableConverter[String] =
      simpleWritableConverter[String, Text](_.toString)

    implicit def writableWritableConverter[T <: Writable](): WritableConverter[T] =
      new WritableConverter[T](_.runtimeClass.asInstanceOf[Class[T]], _.asInstanceOf[T])
  }
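To make this concrete, here is a standalone sketch of what such a converter boils down to. The names are hypothetical (WritableConverter's constructor and simpleWritableConverter are private[spark], so this is not the Spark API), and the Writable class is stored directly rather than behind a ClassTag function:

  import scala.reflect.{classTag, ClassTag}
  import org.apache.hadoop.io.{IntWritable, Text, Writable}

  // Hypothetical stand-in: a Writable class plus a conversion function.
  case class SimpleConverter[T](writableClass: Class[_ <: Writable], convert: Writable => T)

  def simpleConverter[T, W <: Writable: ClassTag](convert: W => T): SimpleConverter[T] =
    SimpleConverter(
      classTag[W].runtimeClass.asInstanceOf[Class[W]],
      (w: Writable) => convert(w.asInstanceOf[W]))

  val intConverter    = simpleConverter[Int, IntWritable](_.get)    // mirrors intWritableConverter
  val stringConverter = simpleConverter[String, Text](_.toString)   // mirrors stringWritableConverter

  // intConverter.convert(new IntWritable(7))  evaluates to 7
  // stringConverter.convert(new Text("abc"))  evaluates to "abc"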

Edit:

I have updated my question and give two test cases, one works, the other doesn't, but I can't figure out what's the difference between them.

The implicit conversion requires a WritableConverter.

  • Test case 1, i.e. val rdd = sc.newAPIHadoopFile...(path): the implicit conversion is NOT done in SparkContext for this method. That is why it does not work if you pass Long, and the compiler reports an error.

  • Test case 2, i.e. val rdd = sc.sequenceFile(...): you are passing classOf[...] directly. When you pass classOf[...], no implicit conversion is needed, since those arguments are classes rather than Long or String values (see the recap sketch below).
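Putting the two cases side by side, a recap sketch that reuses the paths from the question's test cases:

  // Test case 1: newAPIHadoopFile has no implicit WritableConverter parameters,
  // so the type parameters must be the Writable classes themselves.
  val ok = sc.newAPIHadoopFile[LongWritable, Text, CombineTextInputFormat](path)
  // val bad = sc.newAPIHadoopFile[Long, String, CombineTextInputFormat](path)   // does not compile

  // Test case 2: passing classOf[...] fixes K and V explicitly, so no conversion is involved;
  // the converter-based overload is the one that accepts primitive type parameters.
  val withClasses   = sc.sequenceFile(outputDir + "/part-00000", classOf[String], classOf[Int])
  val withImplicits = sc.sequenceFile[String, Int](outputDir + "/part-00000")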

Regarding "scala - Why doesn't the implicit conversion for Writable work?", we found a similar question on Stack Overflow: https://stackoverflow.com/questions/44881367/
