gpt4 book ai didi

scala - Spark Structured Streaming MemoryStream + Row + Encoders 问题

转载 作者:行者123 更新时间:2023-12-01 13:18:35 25 4
gpt4 key购买 nike

我正在尝试使用 Spark 结构化流在我的本地机器上运行一些测试。

在批处理模式下,这里是我正在处理的行:

val recordSchema = StructType(List(StructField("Record", MapType(StringType, StringType), false)))
val rows = List(
Row(
Map("ID" -> "1",
"STRUCTUREID" -> "MFCD00869853",
"MOLFILE" -> "The MOL Data",
"MOLWEIGHT" -> "803.482",
"FORMULA" -> "C44H69NO12",
"NAME" -> "Tacrolimus",
"HASH" -> "52b966c551cfe0fa7d526bac16abcb7be8b8867d",
"SMILES" -> """[H][C@]12O[C@](O)([C@H](C)C[C@@H]1OC)""",
"METABOLISM" -> "The metabolism 500"
)),
Row(
Map("ID" -> "2",
"STRUCTUREID" -> "MFCD00869854",
"MOLFILE" -> "The MOL Data",
"MOLWEIGHT" -> "603.482",
"FORMULA" -> "",
"NAME" -> "Tacrolimus2",
"HASH" -> "52b966c551cfe0fa7d526bac16abcb7be8b8867d",
"SMILES" -> """[H][C@]12O[C@](O)([C@H](C)C[C@@H]1OC)""",
"METABOLISM" -> "The metabolism 500"
))
)
val df = spark.createDataFrame(spark.sparkContext.parallelize(rows), recordSchema)

在 Batch 中使用它更有魅力,没问题。

现在我尝试使用 MemoryStream 进入流模式进行测试。我添加了以下内容:

implicit val ctx = spark.sqlContext
val intsInput = MemoryStream[Row]


但是编译器提示如下:

No implicits found for parameter evidence$1: Encoder[Row]



因此,我的问题是:我应该在这里做什么才能让它发挥作用

我还看到,如果我添加以下导入,错误就会消失:

import spark.implicits._



实际上,我现在收到以下警告而不是错误

Ambiguous implicits for parameter evidence$1: Encoder[Row]



我不太了解编码器机制,如果有人能向我解释如何不使用这些隐式,我将不胜感激。原因是当涉及到从 Rows 创建 DataFrame 时,我在书中将以下内容标记为红色。

推荐方法:
val myManualSchema = new StructType(Array(
new StructField("some", StringType, true),
new StructField("col", StringType, true),
new StructField("names", LongType, false)))
val myRows = Seq(Row("Hello", null, 1L))
val myRDD = spark.sparkContext.parallelize(myRows)
val myDf = spark.createDataFrame(myRDD, myManualSchema)
myDf.show()

然后作者继续说:

In Scala, we can also take advantage of Spark’s implicits in the console (and if you import them in your JAR code) by running toDF on a Seq type. This does not play well with null types, so it’s not necessarily recommended for production use cases.


val myDF = Seq(("Hello", 2, 1L)).toDF("col1", "col2", "col3")

如果有人可以花时间解释当我使用隐式时我的场景中发生了什么,并且这样做相当安全,或者有没有办法更明确地做到这一点而不导入隐式。

最后,如果有人能指出我关于编码器和 Spark 类型映射的好文档,那就太好了。

编辑1

我终于让它工作了
  implicit val ctx = spark.sqlContext
import spark.implicits._
val rows = MemoryStream[Map[String,String]]
val df = rows.toDF()

虽然我的问题是我对自己在做什么没有信心。在我看来,在某些情况下,我需要创建一个 DataSet 以便能够使用 toDF 转换将其转换为 DF[ROW]。我知道使用 DS 是类型安全的,但比使用 DF 慢。那么为什么这个中介有 DataSet 呢?这不是我第一次在 Spark Structured Streaming 中看到它。同样,如果有人可以帮助我解决这些问题,那就太好了。

最佳答案

我鼓励你使用 Scala 的 case classes用于数据建模。

final case class Product(name: String, catalogNumber: String, cas: String, formula: String, weight: Double, mld: String)
现在您可以拥有一个 ListProduct在内存中:
  val inMemoryRecords: List[Product] = List(
Product("Cyclohexanecarboxylic acid", " D19706", "1148027-03-5", "C(11)H(13)Cl(2)NO(5)", 310.131, "MFCD11226417"),
Product("Tacrolimus", "G51159", "104987-11-3", "C(44)H(69)NO(12)", 804.018, "MFCD00869853"),
Product("Methanol", "T57494", "173310-45-7", "C(8)H(8)Cl(2)O", 191.055, "MFCD27756662")
)
structured streaming API使用广为人知的 Dataset[T] 可以轻松推理流处理。抽象。粗略地说,你只需要担心三件事:
  • Source :源可以生成输入数据流,我们可以将其表示为 Dataset[Input] .每个新数据项 Input到达的将被附加到这个无界数据集中。您可以随意操作数据(例如 Dataset[Input] => Dataset[Output] )。
  • StreamingQueriesSink :查询生成一个结果表,该表在每个触发间隔从源更新。更改被写入称为 Sink 的外部存储中。
  • Output modes :有不同的模式可以将数据写入 Sink:完整模式、追加模式和更新模式。

  • 假设您想知道分子量大于 200 单位的产品。
    正如您所说,使用批处理 API 非常简单直接:
    // Create an static dataset using the in-memory data
    val staticData: Dataset[Product] = spark.createDataset(inMemoryRecords)

    // Processing...
    val result: Dataset[Product] = staticData.filter(_.weight > 200)

    // Print results!
    result.show()
    使用 Streaming API 时,您只需要定义一个 source和一个 sink作为一个额外的步骤。在这个例子中,我们可以使用 MemoryStreamconsole sink 打印出结果。
    // Create an streaming dataset using the in-memory data (memory source)
    val productSource = MemoryStream[Product]
    productSource.addData(inMemoryRecords)

    val streamingData: Dataset[Product] = productSource.toDS()

    // Processing...
    val result: Dataset[Product] = streamingData.filter(_.weight > 200)

    // Print results by using the console sink.
    val query: StreamingQuery = result.writeStream.format("console").start()

    // Stop streaming
    query.awaitTermination(timeoutMs=5000)
    query.stop()

    请注意 staticDatastreamingData具有确切的类型签名(即 Dataset[Product] )。无论使用 Batch 还是 Streaming API,这都允许我们应用相同的处理步骤。你也可以考虑实现一个泛型方法 def processing[In, Out](inputData: Dataset[In]): Dataset[Out] = ???避免在两种方法中重复自己。
    完整代码示例:
    object ExMemoryStream extends App {

    // Boilerplate code...
    val spark: SparkSession = SparkSession.builder
    .appName("ExMemoryStreaming")
    .master("local[*]")
    .getOrCreate()

    spark.sparkContext.setLogLevel("ERROR")

    import spark.implicits._
    implicit val sqlContext: SQLContext = spark.sqlContext

    // Define your data models
    final case class Product(name: String, catalogNumber: String, cas: String, formula: String, weight: Double, mld: String)

    // Create some in-memory instances
    val inMemoryRecords: List[Product] = List(
    Product("Cyclohexanecarboxylic acid", " D19706", "1148027-03-5", "C(11)H(13)Cl(2)NO(5)", 310.131, "MFCD11226417"),
    Product("Tacrolimus", "G51159", "104987-11-3", "C(44)H(69)NO(12)", 804.018, "MFCD00869853"),
    Product("Methanol", "T57494", "173310-45-7", "C(8)H(8)Cl(2)O", 191.055, "MFCD27756662")
    )

    // Defining processing step
    def processing(inputData: Dataset[Product]): Dataset[Product] =
    inputData.filter(_.weight > 200)

    // STATIC DATASET
    val datasetStatic: Dataset[Product] = spark.createDataset(inMemoryRecords)

    println("This is the static dataset:")
    processing(datasetStatic).show()

    // STREAMING DATASET
    val productSource = MemoryStream[Product]
    productSource.addData(inMemoryRecords)

    val datasetStreaming: Dataset[Product] = productSource.toDS()

    println("This is the streaming dataset:")
    val query: StreamingQuery = processing(datasetStreaming).writeStream.format("console").start()
    query.awaitTermination(timeoutMs=5000)

    // Stop query and close Spark
    query.stop()
    spark.close()

    }

    关于scala - Spark Structured Streaming MemoryStream + Row + Encoders 问题,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/52238269/

    25 4 0
    Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
    广告合作:1813099741@qq.com 6ren.com