dataframe - Spark transformation for a time series / tick data set


We have a table in Hive that stores trading order data for the end of each day as order_date. The other important columns are product, contract, price (the price at which the order was placed), ttime (transaction time) and status (Insert, Update or Remove).

From the master table we have to build a tick-by-tick picture in which every row (order) carries the maximum and minimum price among the orders seen from the market open in the morning up to that row. That is, for a given order we populate four columns: maxPrice (the highest price so far), maxpriceOrderId (the order_id of that highest-priced order), minPrice and minPriceOrderId.
This has to be computed per product and contract, i.e. the maximum and minimum price within that product and contract.

While computing these values we have to exclude all closed orders from the aggregation. In other words: the maximum and minimum over all order prices so far, excluding orders whose status is 'Remove'.

We are using Spark 2.2 and the input data is in parquet format.

Input records: (screenshot in the original post)

Output records: (screenshot in the original post)

Expressed as a simple SQL view, the problem is solved with a self-join as follows: with the data ordered by ttime, for every row (order) we take the maximum and minimum price of that product and contract from the morning up to the time of that order. This would be run for every eod (order_date) dataset in the batch:

select mainSet.order_id, mainSet.product, mainSet.contract, mainSet.order_date, mainSet.price, mainSet.ttime, mainSet.status,
       max(aggSet.price) over (partition by mainSet.product, mainSet.contract, mainSet.ttime) as max_price,
       first_value(aggSet.order_id) over (partition by mainSet.product, mainSet.contract, mainSet.ttime
                                          order by aggSet.price desc, aggSet.ttime desc) as maxOrderId,
       min(aggSet.price) over (partition by mainSet.product, mainSet.contract, mainSet.ttime) as min_price,
       first_value(aggSet.order_id) over (partition by mainSet.product, mainSet.contract, mainSet.ttime
                                          order by aggSet.price, aggSet.ttime) as minOrderId
from order_table mainSet
join order_table aggSet
  on mainSet.product = aggSet.product
 and mainSet.contract = aggSet.contract
 and mainSet.ttime >= aggSet.ttime
 and aggSet.status <> 'Remove'

Writing it in Spark:

We started with Spark SQL like this:

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

val mainDF: DataFrame = sparkSession.sql("select * from order_table where order_date = 'eod_date'")

val ndf = mainDF.alias("mainSet").join(mainDF.alias("aggSet"),
    col("mainSet.product") === col("aggSet.product")
      && col("mainSet.contract") === col("aggSet.contract")
      && col("mainSet.ttime") >= col("aggSet.ttime")
      && col("aggSet.status") =!= "Remove",
    "inner")
  .select(col("mainSet.order_id"), col("mainSet.ttime"), col("mainSet.product"), col("mainSet.contract"),
          col("mainSet.order_date"), col("mainSet.price"), col("mainSet.status"),
          col("aggSet.order_id").as("agg_orderid"), col("aggSet.ttime").as("agg_ttime"), col("aggSet.price").as("agg_price")) // renaming of the aggSet columns

val max_window = Window.partitionBy(col("product"), col("contract"), col("ttime"))
val min_window = Window.partitionBy(col("product"), col("contract"), col("ttime"))
val maxPriceCol = max(col("agg_price")).over(max_window)
val minPriceCol = min(col("agg_price")).over(min_window)
val firstMaxorder = first(col("agg_orderid")).over(max_window.orderBy(col("agg_price").desc, col("agg_ttime").desc))
val firstMinorder = first(col("agg_orderid")).over(min_window.orderBy(col("agg_price"), col("agg_ttime")))


val priceDF= ndf.withColumn("max_price",maxPriceCol)
.withColumn("maxOrderId",firstMaxorder)
.withColumn("min_price",minPriceCol)
.withColumn("minOrderId",firstMinorder)

priceDF.show(20)

Volume statistics:

Average count: 7 million records per day. Average count per group (product, contract): 600K.

The job runs for hours and never finishes. I have tried increasing memory and other parameters but had no luck. The job gets stuck, and many times I run into memory problems: Container killed by YARN for exceeding memory limits. 4.9 GB of 4.5 GB physical memory used. Consider boosting spark.yarn.executor.memoryOverhead.
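For context, that is the overhead setting the YARN message refers to. A minimal sketch of raising it when building the session; the values are placeholders rather than a recommendation, and on Spark 2.3+ the key is spark.executor.memoryOverhead:

import org.apache.spark.sql.SparkSession

val sparkSession = SparkSession.builder()
  .appName("tick-min-max")                               // hypothetical app name
  .config("spark.executor.memory", "6g")                 // placeholder value
  .config("spark.yarn.executor.memoryOverhead", "2048")  // placeholder value in MB (Spark 2.2 key)
  .enableHiveSupport()
  .getOrCreate()

The same settings can equally be passed with --conf on spark-submit.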

Another approach:

Repartition on the lowest-level group columns (product and contract), then sort within each partition by ttime, so that the rows reach the mapPartitions function ordered by ttime.

Run mapPartitions while maintaining a collection at the partition level (order_id as key, price as value) to compute the maximum and minimum price and their order ids.

When we receive an order with status 'Remove', we drop it from the collection. Once the collection has been updated for a given row inside mapPartitions, we compute the max and min over the collection and return the updated row.

val mainDF: DataFrame = sparkSession.sql("select order_id, ttime, product, contract, order_date, price, status, null as maxPrice, null as maxPriceOrderId, null as minPrice, null as minPriceOrderId from order_table where order_date = 'eod_date'").repartitionByRange(col("product"), col("contract"))

case class summary(order_id: String, ttime: String, product: String, contract: String, order_date: String, price: BigDecimal, status: String, var maxPrice: BigDecimal, var maxPriceOrderId: String, var minPrice: BigDecimal, var minPriceOrderId: String)

implicit val summaryEncoder = Encoders.product[summary]
val priceDF = mainDF.as[summary](summaryEncoder).sortWithinPartitions(col("ttime")).mapPartitions(iter => {
  // collection maintained per partition: key is order_id, value is price
  var priceCollection = Map[String, BigDecimal]()

  iter.map(row => {
    val orderId = row.order_id
    val rowPrice = row.price

    row.status match {
      case "Remove" => if (priceCollection.contains(orderId)) priceCollection -= orderId
      case _        => priceCollection += (orderId -> rowPrice)
    }

    if (priceCollection.nonEmpty) {
      row.maxPrice = priceCollection.maxBy(_._2)._2        // value of the (order_id, price) entry with the max price
      row.maxPriceOrderId = priceCollection.maxBy(_._2)._1 // key (order_id) of that entry
      row.minPrice = priceCollection.minBy(_._2)._2        // value of the entry with the min price
      row.minPriceOrderId = priceCollection.minBy(_._2)._1 // key (order_id) of that entry
    }

    row
  })
})

priceDF.show(20)

For smaller datasets this runs well and finishes within 20 minutes, but I found that for 23 million records (with 17 different product-and-contract groups) the results seem incorrect. I can see that data belonging to one partition (input split) of the mapPartitions is ending up in another partition, which messes up the values.

--> Can we achieve a setup in which every mapPartitions task is guaranteed to receive all of the data for a functional key (product and contract)? As far as I know, mapPartitions runs the function on each Spark partition (similar to an input split in MapReduce), so how can we force Spark to create input splits/partitions that contain all the rows of a given product and contract group? (A sketch of one option follows after these questions.)

--> Is there any other approach to solve this problem?
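For reference, one way to get the co-location guarantee asked about in the first question is a hash repartition on exactly the grouping columns instead of the range repartition used above; all rows with the same (product, contract) then hash to the same partition. A minimal sketch under that assumption, reusing the names from the second approach:

// Hash partitioning by the grouping columns: every (product, contract) key maps to exactly one partition
// (replacing the repartitionByRange call above).
val partitionedDs = mainDF.as[summary](summaryEncoder)
  .repartition(col("product"), col("contract"))
  .sortWithinPartitions(col("ttime"))
// Note: one partition can still hold several (product, contract) groups, so the running price
// collection inside mapPartitions would need to be keyed per group rather than shared.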

Your help is really appreciated, since we are stuck here.

Best Answer

Edit: here is an article on why having many small files is bad:

Why is poorly compacted data bad? Poorly compacted data is bad for Spark applications in the sense that it is extremely slow to process. Continuing with our previous example, anytime we want to process a day’s worth of events we have to open up 86,400 files to get to the data. This slows down processing massively because our Spark application is effectively spending most of its time just opening and closing files. What we normally want is for our Spark application to spend most of its time actually processing the data. We’ll do some experiments next to show the difference in performance when using properly compacted data as compared to poorly compacted data.


I would bet that if you partitioned your source data correctly, in the way you want to join it, and got rid of all of those windows, you would end up in a much better place.

Every time you hit partitionBy you force a shuffle, and every time you hit orderBy you force an expensive sort.
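One quick way to see this is to print the physical plan of the priceDF you built with the window functions in your first approach; each distinct window spec shows up as its own Window operator sitting on a Sort, with Exchange (shuffle) nodes for the partitioning:

// Inspect the physical plan of the windowed query
priceDF.explain()
// look for Exchange hashpartitioning(product, contract, ttime, ...) and Sort [...] operators feeding the Window nodes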

I would suggest you look at the Dataset API and learn some groupBy and flatMapGroups/reduce/sliding for O(n)-time computation. You can get your min/max in one pass.

Furthermore, it sounds like your driver is running out of memory because of the many-small-files problem. Try to compact your source data as much as possible and partition your tables correctly. In this particular case I would suggest partitioning by order_date (maybe daily?) and then sub-partitioning by product and contract.
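A minimal sketch of that layout, assuming a plain parquet rewrite of the table; the target path is hypothetical and the column names come from the question:

// Compact the files and lay the table out partitioned by order_date, then product, then contract.
sparkSession.table("order_table")
  .repartition(col("order_date"), col("product"), col("contract"))  // fewer, larger files per partition
  .write
  .mode("overwrite")
  .partitionBy("order_date", "product", "contract")
  .parquet("/warehouse/order_table_compacted")                      // hypothetical target path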

Here is a snippet that took me about 30 minutes to write and will probably run much better than your window functions. It should run in O(n) time, but it does not make up for a many-small-files problem. Let me know if anything is missing.

import org.apache.spark.sql.{Dataset, Encoder, Encoders, SparkSession}
import scala.collection.mutable

case class Summary(
  order_id: String,
  ttime: String,
  product: String,
  contract: String,
  order_date: String,
  price: BigDecimal,
  status: String,
  maxPrice: BigDecimal = 0,
  maxPriceOrderId: String = null,
  minPrice: BigDecimal = 0,
  minPriceOrderId: String = null
)

class Workflow()(implicit spark: SparkSession) {

  import MinMaxer.summaryEncoder

  // the four running min/max columns are selected as nulls up front so that .as[Summary] can resolve them
  val mainDs: Dataset[Summary] =
    spark.sql(
      """
        select order_id, ttime, product, contract, order_date, price, status,
               null as maxPrice, null as maxPriceOrderId, null as minPrice, null as minPriceOrderId
        from order_table where order_date = 'eod_date'
      """
    ).as[Summary]

  MinMaxer.minMaxDataset(mainDs)
}

object MinMaxer {

  implicit val summaryEncoder: Encoder[Summary] = Encoders.product[Summary]
  implicit val groupEncoder: Encoder[(String, String)] = Encoders.product[(String, String)]

  object SummaryOrderer extends Ordering[Summary] {
    def compare(x: Summary, y: Summary): Int = x.ttime.compareTo(y.ttime)
  }

  def minMaxDataset(ds: Dataset[Summary]): Dataset[Summary] = {
    ds
      .groupByKey(x => (x.product, x.contract))
      .flatMapGroups({ case (_, t) =>
        val sortedRecords: Seq[Summary] = t.toSeq.sorted(SummaryOrderer)

        generateMinMax(sortedRecords)
      })
  }

  def generateMinMax(summaries: Seq[Summary]): Seq[Summary] = {
    summaries.foldLeft(mutable.ListBuffer[Summary]())({ case (b, summary) =>

      if (b.lastOption.nonEmpty) {
        val lastSummary: Summary = b.last

        var minPrice: BigDecimal = 0
        var minPriceOrderId: String = null
        var maxPrice: BigDecimal = 0
        var maxPriceOrderId: String = null

        if (summary.status != "Remove") {
          if (lastSummary.minPrice >= summary.price) {
            minPrice = summary.price
            minPriceOrderId = summary.order_id
          } else {
            minPrice = lastSummary.minPrice
            minPriceOrderId = lastSummary.minPriceOrderId
          }

          if (lastSummary.maxPrice <= summary.price) {
            maxPrice = summary.price
            maxPriceOrderId = summary.order_id
          } else {
            maxPrice = lastSummary.maxPrice
            maxPriceOrderId = lastSummary.maxPriceOrderId
          }

          b.append(
            summary.copy(
              maxPrice = maxPrice,
              maxPriceOrderId = maxPriceOrderId,
              minPrice = minPrice,
              minPriceOrderId = minPriceOrderId
            )
          )
        } else {
          b.append(
            summary.copy(
              maxPrice = lastSummary.maxPrice,
              maxPriceOrderId = lastSummary.maxPriceOrderId,
              minPrice = lastSummary.minPrice,
              minPriceOrderId = lastSummary.minPriceOrderId
            )
          )
        }
      } else {
        b.append(
          summary.copy(
            maxPrice = summary.price,
            maxPriceOrderId = summary.order_id,
            minPrice = summary.price,
            minPriceOrderId = summary.order_id
          )
        )
      }

      b
    })
  }
}
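For reference, a hypothetical way to run the snippet above end to end; the session setup and the final show are assumptions, not part of the original answer:

implicit val spark: SparkSession = SparkSession.builder().enableHiveSupport().getOrCreate()

val workflow = new Workflow()                                   // builds mainDs lazily from order_table
val result: Dataset[Summary] = MinMaxer.minMaxDataset(workflow.mainDs)
result.show(20)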

Regarding "dataframe - Spark transformation for a time series / tick data set", a similar question can be found on Stack Overflow: https://stackoverflow.com/questions/58027080/
