
apache-spark - How to use Spark Datasets with Thrift


My data format is defined with Apache Thrift, and the code is generated by Scrooge. I store it with Parquet in Spark, much as described in this blog.

I can read that data back into a DataFrame easily enough:

val df = sqlContext.read.parquet("/path/to/data")

And with a bit more gymnastics I can read it into an RDD:
// sc (the SparkContext) and jobConf (a Hadoop configuration) are assumed to be
// defined in the enclosing scope.
def loadRdd[V <: TBase[_, _]](inputDirectory: String, vClass: Class[V]): RDD[V] = {
  implicit val ctagV: ClassTag[V] = ClassTag(vClass)
  // Have Parquet materialize records through the Thrift read support.
  ParquetInputFormat.setReadSupportClass(jobConf, classOf[ThriftReadSupport[V]])
  ParquetThriftInputFormat.setThriftClass(jobConf, vClass)
  val rdd = sc.newAPIHadoopFile(
    inputDirectory, classOf[ParquetThriftInputFormat[V]], classOf[Void], vClass, jobConf)
  // The keys are Void, so keep only the values.
  rdd.asInstanceOf[NewHadoopRDD[Void, V]].values
}

loadRdd("/path/to/data", classOf[MyThriftClass])

My question is: how do I access that data with the new Dataset API released in Spark 1.6? The reason I want it is the Dataset API's promise: type safety with the speed of DataFrames.

I know some kind of encoder is needed, and encoders are already provided for primitive types and case classes. What I have, though, is Thrift-generated code (either the Java or the Scala flavor would do); it looks a lot like a case class, but it isn't really one.
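For contrast, here is a minimal sketch of the path that does work out of the box, using a hypothetical case class Range that stands in for the Thrift struct (the name and fields are made up for illustration):

case class Range(start: Long, end: Long)

import sqlContext.implicits._ // provides encoders for primitives and Product types

// Works: Spark 1.6 derives an ExpressionEncoder for the case class.
val ds: org.apache.spark.sql.Dataset[Range] =
  sqlContext.read.parquet("/path/to/data").as[Range]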

I tried the obvious options, and none of them worked:
val df = sqlContext.read.parquet("/path/to/data")

df.as[MyJavaThriftClass]

<console>:25: error: Unable to find encoder for type stored in a Dataset. Primitive types (Int, String, etc) and Product types (case classes) are supported by importing sqlContext.implicits._ Support for serializing other types will be added in future releases.

df.as[MyScalaThriftClass]

scala.ScalaReflectionException: <none> is not a term
at scala.reflect.api.Symbols$SymbolApi$class.asTerm(Symbols.scala:199)
at scala.reflect.internal.Symbols$SymbolContextApiImpl.asTerm(Symbols.scala:84)
at org.apache.spark.sql.catalyst.ScalaReflection$.org$apache$spark$sql$catalyst$ScalaReflection$$extractorFor(ScalaReflection.scala:492)
at org.apache.spark.sql.catalyst.ScalaReflection$.extractorsFor(ScalaReflection.scala:394)
at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder$.apply(ExpressionEncoder.scala:54)
at org.apache.spark.sql.SQLImplicits.newProductEncoder(SQLImplicits.scala:41)
... 48 elided


df.as[MyScalaThriftClass.Immutable]

java.lang.UnsupportedOperationException: No Encoder found for org.apache.thrift.protocol.TField
- field (class: "org.apache.thrift.protocol.TField", name: "field")
- array element class: "com.twitter.scrooge.TFieldBlob"
- field (class: "scala.collection.immutable.Map", name: "_passthroughFields")
- root class: "com.worldsense.scalathrift.ThriftRange.Immutable"
at org.apache.spark.sql.catalyst.ScalaReflection$.org$apache$spark$sql$catalyst$ScalaReflection$$extractorFor(ScalaReflection.scala:597)
at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$org$apache$spark$sql$catalyst$ScalaReflection$$extractorFor$1.apply(ScalaReflection.scala:509)
at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$org$apache$spark$sql$catalyst$ScalaReflection$$extractorFor$1.apply(ScalaReflection.scala:502)
at scala.collection.immutable.List.flatMap(List.scala:327)
at org.apache.spark.sql.catalyst.ScalaReflection$.org$apache$spark$sql$catalyst$ScalaReflection$$extractorFor(ScalaReflection.scala:502)
at org.apache.spark.sql.catalyst.ScalaReflection$.toCatalystArray$1(ScalaReflection.scala:419)
at org.apache.spark.sql.catalyst.ScalaReflection$.org$apache$spark$sql$catalyst$ScalaReflection$$extractorFor(ScalaReflection.scala:537)
at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$org$apache$spark$sql$catalyst$ScalaReflection$$extractorFor$1.apply(ScalaReflection.scala:509)
at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$org$apache$spark$sql$catalyst$ScalaReflection$$extractorFor$1.apply(ScalaReflection.scala:502)
at scala.collection.immutable.List.flatMap(List.scala:327)
at org.apache.spark.sql.catalyst.ScalaReflection$.org$apache$spark$sql$catalyst$ScalaReflection$$extractorFor(ScalaReflection.scala:502)
at org.apache.spark.sql.catalyst.ScalaReflection$.extractorsFor(ScalaReflection.scala:394)
at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder$.apply(ExpressionEncoder.scala:54)
at org.apache.spark.sql.SQLImplicits.newProductEncoder(SQLImplicits.scala:41)
... 48 elided

It seems shapeless works fine with Thrift-generated code, and I wonder whether it could be used to generate something the current encoder API would accept.

Any hints?

Best Answer

You should be able to solve this by building an encoder with Encoders.bean and passing it explicitly in place of the missing implicit.

Example: df.as[MyJavaThriftClass](Encoders.bean(classOf[MyJavaThriftClass]))
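A slightly fuller sketch of that suggestion, assuming MyJavaThriftClass is the Thrift-generated Java class and that its getters and setters follow JavaBean conventions (Encoders.bean only inspects bean-style properties, so Thrift internals such as passthrough fields may still trip it up):

import org.apache.spark.sql.{Dataset, Encoder, Encoders}

// Build the bean encoder once; classOf[...] yields the Class[MyJavaThriftClass]
// object that Encoders.bean expects.
val beanEncoder: Encoder[MyJavaThriftClass] = Encoders.bean(classOf[MyJavaThriftClass])

val df = sqlContext.read.parquet("/path/to/data")
// Pass the encoder explicitly, since no implicit encoder exists for Thrift classes.
val ds: Dataset[MyJavaThriftClass] = df.as[MyJavaThriftClass](beanEncoder)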

The original question, "apache-spark - How to use Spark Datasets with Thrift", can be found on Stack Overflow: https://stackoverflow.com/questions/35240021/
