
spark-streaming - Spark Structured Streaming and filters


Spark 2.1 Structured Streaming over Parquet files works fine for plain aggregations such as count(*) and sum(field), but filtering does not work.
Sample code:

Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/ '_/
   /___/ .__/\_,_/_/ /_/\_\   version 2.1.0.2.6.0.3-8
      /_/

Using Scala version 2.11.8 (OpenJDK 64-Bit Server VM, Java 1.8.0_131)
Type in expressions to have them evaluated.
Type :help for more information.

scala> :paste
// Entering paste mode (ctrl-D to finish)

import org.apache.spark.sql.types._

val userSchema = new StructType()
.add("caseId", StringType)
.add("ts", LongType)
.add("rowtype", StringType)
.add("rowordernumber", IntegerType)
.add("parentrowordernumber", IntegerType)
.add("fieldname", StringType)
.add("valuestr", StringType)

val csvDF = spark.readStream.schema(userSchema).parquet("/folder1/folder2")

csvDF.createOrReplaceTempView("tmptable")
val aggDF = spark.sql("select count(*) from tmptable where rowtype='3600'")

aggDF
.writeStream
.outputMode("complete")
.format("console")
.start()

aggDF
.writeStream
.queryName("aggregates") // this query name will be the table name
.outputMode("complete")
.format("memory")
.start()
spark.sql("select * from aggregates").show()


// Exiting paste mode, now interpreting.

+--------+
|count(1)|
+--------+
+--------+

import org.apache.spark.sql.types._
userSchema: org.apache.spark.sql.types.StructType = StructType(StructField(caseId,StringType,true), StructField(ts,LongType,true), StructField(rowtype,StringType,true), StructField(rowordernumber,IntegerType,true), StructField(parentrowordernumber,IntegerType,true), StructField(fieldname,StringType,true), StructField(valuestr,StringType,true))
csvDF: org.apache.spark.sql.DataFrame = [caseId: string, ts: bigint ... 5 more fields]
aggDF: org.apache.spark.sql.DataFrame = [count(1): bigint]

-------------------------------------------
Batch: 0
-------------------------------------------
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
+--------+
|count(1)|
+--------+
| 0|
+--------+

I have also tried DataFrame-style (non-SQL) filtering:

import org.apache.spark.sql.functions.count
val aggDF = csvDF.filter("rowtype == '3600'").agg(count("caseId"))

without success. I have checked the Parquet files, and some rows do have rowtype = '3600':
[root@sandbox ~]# spark-sql
SPARK_MAJOR_VERSION is set to 2, using Spark2
spark-sql> select count(*) from tab1 where rowtype='3600' ;
433698463

Best answer

When the data is static, you don't need to specify your own schema: Spark can work out the schema of a Parquet dataset on its own. For example:

case class Foo(lowercase: String, upperCase: String)
val df = spark.createDataset(List(Foo("abc","DEF"), Foo("ghi","JKL")))
df.write.parquet("/tmp/parquet-test")
val rdf = spark.read.parquet("/tmp/parquet-test")
rdf.printSchema
// root
// |-- lowercase: string (nullable = true)
// |-- upperCase: string (nullable = true)

At this stage, subsequent SQL queries will ignore case:

rdf.createOrReplaceTempView("rdf")
spark.sql("select uppercase from rdf").collect
// Array[org.apache.spark.sql.Row] = Array([DEF], [JKL])

Spark has an option, spark.sql.caseSensitive, to enable/disable case sensitivity (the default is false, i.e. case insensitive), but it seems to matter only at write time.
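For reference, the flag can be toggled at runtime through the session configuration; a minimal sketch (whether it affects the read path here is exactly the observation above):

// Enable case-sensitive analysis for subsequent queries in this session
spark.conf.set("spark.sql.caseSensitive", "true")
spark.conf.get("spark.sql.caseSensitive")
// res: String = true

// Restore the default, case-insensitive behaviour
spark.conf.set("spark.sql.caseSensitive", "false")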

Trying to do the same on a stream will raise an exception:
java.lang.IllegalArgumentException: Schema must be specified when creating a streaming
source DataFrame. If some files already exist in the directory, then depending
on the file format you may be able to create a static DataFrame on that directory
with 'spark.read.load(directory)' and infer schema from it.
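For reference, this is the call that raises it; a minimal sketch reusing the /tmp/parquet-test path from above:

// Fails: streaming sources require an explicit schema unless
// spark.sql.streaming.schemaInference is enabled (see the options below)
val streamDf = spark.readStream.parquet("/tmp/parquet-test")
// java.lang.IllegalArgumentException: Schema must be specified when creating a streaming source DataFrame. ...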

This leaves you with the following options:
  • Provide your own schema, as you did (but note that it is case sensitive).
  • Follow the advice in the exception and derive the schema from the data already stored in the folder:

    val userSchema = spark.read.parquet("/tmp/parquet-test").schema
    val streamDf = spark.readStream.schema(userSchema).parquet("/tmp/parquet-test")

  • Tell Spark to infer the schema by setting spark.sql.streaming.schemaInference to true:

    spark.sql("set spark.sql.streaming.schemaInference=true")
    val streamDf = spark.readStream.parquet("/tmp/parquet-test")
    streamDf.createOrReplaceTempView("stream_rdf")
    val query = spark.sql("select uppercase, count(*) from stream_rdf group by uppercase")
    .writeStream
    .format("console")
    .outputMode("complete")
    .start
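Once the schema's column names match the actual case in the Parquet files, the original filter should start returning rows. A minimal sketch, assuming the asker's /folder1/folder2 path and that the columns really are named rowtype and caseId:

import org.apache.spark.sql.functions.count

// Derive a correctly-cased schema from the files already in the folder,
// then apply the original filter on the stream
val userSchema = spark.read.parquet("/folder1/folder2").schema
val csvDF = spark.readStream.schema(userSchema).parquet("/folder1/folder2")

csvDF.filter("rowtype == '3600'")
.agg(count("caseId"))
.writeStream
.outputMode("complete")
.format("console")
.start()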

On spark-streaming - Spark Structured Streaming and filters, we found a similar question on Stack Overflow: https://stackoverflow.com/questions/45411285/
