apache-spark - Spark - Stream Kafka to a file that changes every day?

I have a Kafka stream that I will process in Spark, and I want to write the output of this stream to files. However, I want to partition these files by day, so that each day it starts writing to a new file. Can something like this be done? I want the job to keep running, and when a new day arrives, it should switch to writing to a new file.

val streamInputDf = spark.readStream.format("kafka")
  .option("kafka.bootstrap.servers", "XXXX")
  .option("subscribe", "XXXX")
  .load()
val streamSelectDf = streamInputDf.select(...)

streamSelectDf.writeStream.format("parquet")
  .option("path", "xxx")
  ???

Best Answer

Partitioning the output from Spark can be done with partitionBy, which is provided by DataFrameWriter for non-streamed data and by DataStreamWriter for streamed data.



Below are the signatures:

public DataFrameWriter<T> partitionBy(scala.collection.Seq<String> colNames)

DataStreamWriter<T> partitionBy(scala.collection.Seq<String> colNames)
Partitions the output by the given columns on the file system.

DataStreamWriter<T> partitionBy(String... colNames)
Partitions the output by the given columns on the file system. If specified, the output is laid out on the file system similar to Hive's partitioning scheme. As an example, when we partition a dataset by year and then month, the directory layout would look like:

year=2016/month=01/
year=2016/month=02/

Partitioning is one of the most widely used techniques to optimize physical data layout. It provides a coarse-grained index for skipping unnecessary data reads when queries have predicates on the partitioned columns. In order for partitioning to work well, the number of distinct values in each column should typically be less than tens of thousands.

Parameters: colNames - (undocumented)
Returns: (undocumented)
Since: 2.0.0



So if you want to partition the data by year and month, Spark will save the data to folders like:
year=2019/month=01/05
year=2019/month=02/05
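
For the streaming case in the question, the day has to exist as a column before partitionBy can use it. A minimal sketch, continuing from the question's streamInputDf and assuming the Kafka-provided timestamp column is what should drive the date (the year/month/day column names are just illustrative):

import org.apache.spark.sql.functions.{col, date_format}

// The Kafka source attaches a `timestamp` column to every record; derive
// the partition columns from it (column names are illustrative, not required).
val streamSelectDf = streamInputDf
  .select(col("value").cast("string").as("value"), col("timestamp"))
  .withColumn("year", date_format(col("timestamp"), "yyyy"))
  .withColumn("month", date_format(col("timestamp"), "MM"))
  .withColumn("day", date_format(col("timestamp"), "dd"))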

Option 1 (direct write): you mentioned parquet, so you can save in parquet format with:

df.write.partitionBy("year", "month", "day").format("parquet").save(path)
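
Since the question is about a streaming query, the same partitionBy call is also available on the DataStreamWriter returned by writeStream. A sketch, assuming the output path and checkpoint location below are placeholders; with the derived year/month/day columns above, each new day simply lands in a new directory while the query keeps running:

// Streaming file sinks require a checkpoint location.
val query = streamSelectDf.writeStream
  .format("parquet")
  .option("path", "/data/events")                     // placeholder output path
  .option("checkpointLocation", "/data/events_chk")   // placeholder
  .partitionBy("year", "month", "day")
  .start()

query.awaitTermination()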

Option 2 (insert into Hive using the same partitionBy):

You can also insert into a Hive table, for example:
df.write.partitionBy("year", "month", "day").insertInto("tableName")
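
Note that insertInto belongs to the batch DataFrameWriter; for the streaming query in the question, one way to reach a partitioned Hive table is foreachBatch (Spark 2.4+). A sketch, assuming a pre-created partitioned table named events_by_day and that Hive dynamic partitioning is allowed; the table name, paths, and settings below are assumptions for illustration:

// Let Hive pick the partition from each row's year/month/day values
// (needed for Hive-format tables; assumption for this sketch).
spark.sql("SET hive.exec.dynamic.partition=true")
spark.sql("SET hive.exec.dynamic.partition.mode=nonstrict")

val hiveQuery = streamSelectDf.writeStream
  .foreachBatch { (batchDf: org.apache.spark.sql.DataFrame, batchId: Long) =>
    // Column order must match the table definition for insertInto.
    batchDf.write.mode("append").insertInto("events_by_day")  // hypothetical table
  }
  .option("checkpointLocation", "/data/events_hive_chk")      // placeholder
  .start()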

Getting all Hive partitions:

Spark SQL supports the Hive query language, so you can use SHOW PARTITIONS
to get the list of partitions in a specific table.
sparkSession.sql("SHOW PARTITIONS partitionedHiveParquetTable")
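
Once the table is partitioned, predicates on the partition columns prune which directories get read. A small sketch using the hypothetical events_by_day table and the example partition values from above:

// List the partitions that exist so far.
spark.sql("SHOW PARTITIONS events_by_day").show(false)

// Only the matching year=2019/month=02/day=05 folder is scanned here.
spark.sql("SELECT count(*) FROM events_by_day " +
  "WHERE year = '2019' AND month = '02' AND day = '05'").show()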

Conclusion:
I would suggest option 2, because the advantage comes later: you can query the data by partition (that is, query the raw data to see what you received), and the underlying files can be parquet or orc.

Note:

Just make sure you have .enableHiveSupport() when you create the session with the SparkSession builder, and make sure your hive-conf.xml and other configuration are set up correctly.
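
A minimal sketch of the session setup this note refers to (the application name is a placeholder):

import org.apache.spark.sql.SparkSession

// Hive support is needed for insertInto / SHOW PARTITIONS against the
// metastore; the Hive config (typically hive-site.xml) must be on the classpath.
val spark = SparkSession.builder()
  .appName("kafka-to-partitioned-files")  // placeholder name
  .enableHiveSupport()
  .getOrCreate()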

Regarding "apache-spark - Spark - Stream Kafka to a file that changes every day?", we found a similar question on Stack Overflow: https://stackoverflow.com/questions/55184770/
