
apache-spark - Using PostGIS geometry types in an Apache Spark JDBC DataFrame


I would like to know whether I can use PostGIS geometry types with Apache Spark's SQL and DataFrames.

Here is how far I got: I noticed that I could write a PostGIS dialect and a user-defined type, which I call PostgisDialect and GeometryType. Here is my code:

import java.sql.{Connection, Types}

import org.apache.spark.sql.jdbc.{JdbcDialect, JdbcType}
import org.apache.spark.sql.types._

object PostgisDialect extends JdbcDialect {

  override def canHandle(url: String): Boolean = url.startsWith("jdbc:postgresql")

  // PostGIS reports the "geometry" column type as java.sql.Types.OTHER.
  override def getCatalystType(
      sqlType: Int, typeName: String, size: Int, md: MetadataBuilder): Option[DataType] = {
    if (sqlType == Types.OTHER) {
      toCatalystType(typeName)
    } else None
  }

  // TODO: support more type names.
  private def toCatalystType(typeName: String): Option[DataType] = typeName match {
    case "geometry" => Some(GeometryType)
    case _ => None
  }

  override def getJDBCType(dt: DataType): Option[JdbcType] = dt match {
    case GeometryType => Some(JdbcType("geometry", Types.OTHER))
    case _ => None
  }

  override def getTableExistsQuery(table: String): String = {
    s"SELECT 1 FROM $table LIMIT 1"
  }

  override def beforeFetch(connection: Connection, properties: Map[String, String]): Unit = {
    super.beforeFetch(connection, properties)

    // The PostgreSQL driver only honours fetchsize when auto-commit is off.
    if (properties.getOrElse("fetchsize", "0").toInt > 0) {
      connection.setAutoCommit(false)
    }
  }
}
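For reference, this is roughly how the dialect gets wired in before issuing the JDBC read. It is a minimal sketch using the Spark 1.x SQLContext API; the connection URL, credentials, and table name are placeholders, not part of the original setup:

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.jdbc.JdbcDialects

object ReadGeometryTable {
  def main(args: Array[String]): Unit = {
    // Register the custom dialect so Spark picks it for any jdbc:postgresql URL.
    JdbcDialects.registerDialect(PostgisDialect)

    val sc = new SparkContext(new SparkConf().setAppName("postgis-read"))
    val sqlContext = new SQLContext(sc)

    // Placeholder connection settings; adjust to the actual database and table.
    val df = sqlContext.read
      .format("jdbc")
      .option("url", "jdbc:postgresql://localhost:5432/gis")
      .option("dbtable", "public.places")
      .option("user", "postgres")
      .option("password", "postgres")
      .load()

    df.printSchema()
  }
}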

// JTS Topology Suite; newer JTS releases publish these classes under org.locationtech.jts instead.
import com.vividsolutions.jts.geom.Geometry
import com.vividsolutions.jts.io.{WKBReader, WKBWriter}

import org.apache.spark.sql.catalyst.util.GenericArrayData
import org.apache.spark.sql.types.{BinaryType, DataType, UserDefinedType}

class GeometryType private() extends UserDefinedType[Geometry] {

  // Geometries are stored in the underlying row as WKB byte arrays.
  override def sqlType: DataType = BinaryType

  override def pyUDT: String = "my.types.GeometryType"

  override def serialize(obj: Any): GenericArrayData = {
    obj match {
      case p: Geometry =>
        val output = (new WKBWriter).write(p)
        new GenericArrayData(output)
    }
  }

  override def deserialize(datum: Any): Geometry = {
    datum match {
      case values: Array[Byte] => (new WKBReader).read(values)
    }
  }

  override def userClass: Class[Geometry] = classOf[Geometry]

  override def asNullable: GeometryType = this
}

case object GeometryType extends GeometryType
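To make the serialize/deserialize contract concrete, here is a small standalone WKB round trip. It is a sketch assuming the classic com.vividsolutions JTS artifact; newer JTS versions ship the same classes under org.locationtech.jts:

import com.vividsolutions.jts.geom.{Coordinate, GeometryFactory}
import com.vividsolutions.jts.io.{WKBReader, WKBWriter}

object WkbRoundTrip {
  def main(args: Array[String]): Unit = {
    val factory = new GeometryFactory()
    val point = factory.createPoint(new Coordinate(2.35, 48.85))

    // serialize(): geometry -> WKB bytes stored in the BinaryType column.
    val bytes: Array[Byte] = (new WKBWriter).write(point)

    // deserialize(): WKB bytes -> geometry handed back to user code.
    val restored = (new WKBReader).read(bytes)

    println(restored) // POINT (2.35 48.85)
  }
}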

So far so good, but then JDBCRDD calls the getConversions method:

/**
 * Maps a StructType to a type tag list.
 */
def getConversions(schema: StructType): Array[JDBCConversion] =
  schema.fields.map(sf => getConversions(sf.dataType, sf.metadata))

private def getConversions(dt: DataType, metadata: Metadata): JDBCConversion = dt match {
  case BooleanType => BooleanConversion
  case DateType => DateConversion
  case DecimalType.Fixed(p, s) => DecimalConversion(p, s)
  case DoubleType => DoubleConversion
  case FloatType => FloatConversion
  case IntegerType => IntegerConversion
  case LongType => if (metadata.contains("binarylong")) BinaryLongConversion else LongConversion
  case StringType => StringConversion
  case TimestampType => TimestampConversion
  case BinaryType => BinaryConversion
  case ArrayType(et, _) => ArrayConversion(getConversions(et, metadata))
  case _ => throw new IllegalArgumentException(s"Unsupported type ${dt.simpleString}")
}

Of course, there is no conversion for my custom type:
Caused by: java.lang.IllegalArgumentException: Unsupported type geometry
at org.apache.spark.sql.execution.datasources.jdbc.JDBCRDD.org$apache$spark$sql$execution$datasources$jdbc$JDBCRDD$$getConversions(JDBCRDD.scala:351)
at org.apache.spark.sql.execution.datasources.jdbc.JDBCRDD$$anonfun$getConversions$1.apply(JDBCRDD.scala:337)
at org.apache.spark.sql.execution.datasources.jdbc.JDBCRDD$$anonfun$getConversions$1.apply(JDBCRDD.scala:337)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:108)
at org.apache.spark.sql.execution.datasources.jdbc.JDBCRDD.getConversions(JDBCRDD.scala:337)
at org.apache.spark.sql.execution.datasources.jdbc.JDBCRDD$$anon$1.<init>(JDBCRDD.scala:385)
at org.apache.spark.sql.execution.datasources.jdbc.JDBCRDD.compute(JDBCRDD.scala:359)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
at org.apache.spark.scheduler.Task.run(Task.scala:89)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)

Is there a way to register a conversion for my custom type?

Best Answer

Spark has no built-in functions for handling geometry data, but the following libraries were created to work with geometry data types. Both of them provide functions you can use to process geometry columns.

  • Sedona - https://sedona.apache.org/tutorial/sql/
  • GeoMesa - https://www.geomesa.org/documentation/stable/user/spark/index.html

  • I recently used both of these libraries to handle geometry data types, to simplify and merge geometries and load them into Postgres.
    Here are a few suggestions:
    1. Read the documentation carefully to check whether the functions you need exist in either library, and use only one library at a time, because installing both on the same cluster can cause problems.
    2. Different releases are compatible with different Spark versions; you can find the compatibility details for the Spark version you are using at this link:
    https://sedona.apache.org/download/overview/
    3. Follow the steps given in this tutorial - https://sedona.apache.org/tutorial/sql/
    After the "Register SedonaSQL" step, you can cross-check that all of the functions are available by running the command below (see the sketch after this answer):
    spark.catalog.listFunctions().show()
    Thanks.
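For point 3, a minimal sketch of that workflow, assuming Apache Sedona 1.x is on the classpath (SedonaSQLRegistrator is the Sedona 1.x registration entry point; newer releases may use a different entry point, and the data and column names here are placeholders):

import org.apache.sedona.sql.utils.SedonaSQLRegistrator
import org.apache.spark.sql.SparkSession

object SedonaExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("sedona-geometry").getOrCreate()
    import spark.implicits._

    // "Register SedonaSQL" step: makes the ST_* functions available in Spark SQL.
    SedonaSQLRegistrator.registerAll(spark)

    // Cross-check that the geometry functions were registered.
    spark.catalog.listFunctions().show()

    // Placeholder data: parse WKT strings into geometry values and buffer them.
    val df = Seq("POINT (2.35 48.85)", "POINT (13.40 52.52)").toDF("wkt")
    df.createOrReplaceTempView("places")

    spark.sql(
      """SELECT ST_Buffer(ST_GeomFromWKT(wkt), 0.1) AS geom
        |FROM places""".stripMargin).show(truncate = false)
  }
}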

Regarding apache-spark - Using PostGIS geometry types in an Apache Spark JDBC DataFrame, a similar question can be found on Stack Overflow: https://stackoverflow.com/questions/35773934/
