- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
我们在 hive 中有一个表,它将每天结束时的交易订单数据存储为 order_date。其他重要的列是产品,契约(Contract),价格(下订单的价格),ttime(交易时间)状态(插入、更新或删除)价格(订单价格)
我们必须从主表中以逐笔数据的方式构建一个图表,其中包含从市场开盘的早上到当时每行(订单)的最大和最小价格订单。即对于给定的订单,我们将有 4 列填充为 maxPrice(到目前为止的最高价格)、maxpriceOrderId(最高价格的 orderid)、minPrice 和 minPriceOrderId
这必须适用于每个产品、契约(Contract),即该产品、契约(Contract)的最高和最低价格。
在计算这些值时,我们需要排除所有已平仓订单来自聚合。即到目前为止所有订单价格的最大值和最小值,不包括状态为“删除”的订单
我们使用的是:Spark 2.2,输入数据格式是 parquet。输入记录
输出记录
给出一个简单的 SQL View - 问题通过自连接解决,如下所示:通过 ttime 上的有序数据集,我们必须获取特定产品的最高和最低价格,从早上到该订单时间的每一行(订单)的契约(Contract)。这将针对批处理中的每个 eod (order_date) 数据集运行:
select mainSet.order_id, mainSet.product,mainSet.contract,mainSet.order_date,mainSet.price,mainSet.ttime,mainSet.status,
max(aggSet.price) over (partition by mainSet.product,mainSet.contract,mainSet.ttime) as max_price,
first_value(aggSet.order_id) over (partition by mainSet.product,mainSet.contract,mainSet.ttime) order by (aggSet.price desc,aggSet.ttime desc ) as maxOrderId
min(aggSet.price) over (partition by mainSet.product,mainSet.contract,mainSet.ttime) as min_price as min_price
first_value(aggSet.order_id) over (partition by mainSet.product,mainSet.contract,mainSet.ttime) order by (aggSet.price ,aggSet.ttime) as minOrderId
from order_table mainSet
join order_table aggSet
ON (mainSet.produuct=aggSet.product,
mainSet.contract=aggSet.contract,
mainSet.ttime>=aggSet.ttime,
aggSet.status <> 'Remove')
用 Spark 编写:
我们从 Spark sql 开始,如下所示:
val mainDF: DataFrame= sparkSession.sql("select * from order_table where order_date ='eod_date' ")
val ndf=mainDf.alias("mainSet").join(mainDf.alias("aggSet"),
(col("mainSet.product")===col("aggSet.product")
&& col("mainSet.contract")===col("aggSet.contract")
&& col("mainSet.ttime")>= col("aggSet.ttime")
&& col("aggSet.status") <> "Remove")
,"inner")
.select(mainSet.order_id,mainSet.ttime,mainSet.product,mainSet.contract,mainSet.order_date,mainSet.price,mainSet.status,aggSet.order_id as agg_orderid,aggSet.ttime as agg_ttime,price as agg_price) //Renaming of columns
val max_window = Window.partitionBy(col("product"),col("contract"),col("ttime"))
val min_window = Window.partitionBy(col("product"),col("contract"),col("ttime"))
val maxPriceCol = max(col("agg_price")).over(max_window)
val minPriceCol = min(col("agg_price")).over(min_window)
val firstMaxorder = first_value(col("agg_orderid")).over(max_window.orderBy(col("agg_price").desc, col("agg_ttime").desc))
val firstMinorder = first_value(col("agg_orderid")).over(min_window.orderBy(col("agg_price"), col("agg_ttime")))
val priceDF= ndf.withColumn("max_price",maxPriceCol)
.withColumn("maxOrderId",firstMaxorder)
.withColumn("min_price",minPriceCol)
.withColumn("minOrderId",firstMinorder)
priceDF.show(20)
成交量统计:
平均计数 700 万条记录每个组的平均计数(产品、契约(Contract))= 600K
该作业运行了几个小时,但还没有完成。我尝试过增加内存和其他参数,但没有成功。作业陷入困境,很多时候我遇到内存问题容器因超出内存限制而被 YARN 杀死。使用了 4.9 GB 或 4.5 GB 物理内存。考虑提升spark.yarn.executor.memoryOverhead
另一种方法:
对最低组列(产品和契约(Contract))进行重新分区,然后按时在分区内进行排序,以便我们按时收到为 mapPartition 函数排序的每一行。
执行mappartition,同时在分区级别维护一个集合(键为order_id,价格为值),以计算最高和最低价格及其orderid。
当我们收到状态为“删除”的订单时,我们将继续从集合中删除这些订单。一旦 Mapparition 中给定行的集合被更新,我们就可以计算集合中的最大值和最小值并返回更新的行。
val mainDF: DataFrame= sparkSession.sql("select order_id,product,contract,order_date,price,status,null as maxPrice,null as maxPriceOrderId,null as minPrice,null as minPriceOrderId from order_table where order_date ='eod_date' ").repartitionByRange(col("product"),col("contract"))
case class summary(order_id:String ,ttime:string,product:String,contract :String,order_date:String,price:BigDecimal,status :String,var maxPrice:BigDecimal,var maxPriceOrderId:String ,var minPrice:BigDecimal,var minPriceOrderId String)
val summaryEncoder = Encoders.product[summary]
val priceDF= mainDF.as[summary](summaryEncoder).sortWithinPartitions(col("ttime")).mapPartitions( iter => {
//collection at partition level
//key as order_id and value as price
var priceCollection = Map[String, BigDecimal]()
iter.map( row => {
val orderId= row.order_id
val rowprice= row.price
priceCollection = row.status match {
case "Remove" => if (priceCollection.contains(orderId)) priceCollection -= orderId
case _ => priceCollection += (orderId -> rowPrice)
}
row.maxPrice = if(priceCollection.size > 0) priceCollection.maxBy(_._2)._2 // Gives key,value tuple from collectin for max value )
row.maxPriceOrderId = if(priceCollection.size > 0) priceCollection.maxBy(_._2)._1
row.minPrice = if(priceCollection.size > 0) priceCollection.minBy(_._2)._2 // Gives key,value tuple from collectin for min value )
row.minPriceOrderId = if(priceCollection.size > 0) priceCollection.minBy(_._2)._1
row
})
}).show(20)
对于较小的数据集,这运行良好并在 20 分钟内完成,但我发现对于 23 个工厂记录(具有 17 个差异产品和契约(Contract)),结果似乎不正确。我可以看到来自 mappartition 的一个分区(输入拆分)的数据正在转到另一个分区,从而弄乱了值。
--> 我们能否实现这样一种情况,即我可以保证每个映射分区任务都能在此处获取功能键(产品和契约(Contract))的所有数据。。 据我所知,mappartition 在每个 Spark 分区上执行函数(类似于 Map Reduce 中的输入拆分),因此如何强制 Spark 创建具有该产品和契约(Contract)组的所有值的输入拆分/分区。
--> 还有其他方法可以解决这个问题吗?
非常感谢您的帮助,因为我们被困在这里。
最佳答案
编辑:这是一个 article为什么许多小文件不好
<小时/>Why is poorly compacted data bad? Poorly compacted data is bad for Spark applications in the sense that it is extremely slow to process. Continuing with our previous example, anytime we want to process a day’s worth of events we have to open up 86,400 files to get to the data. This slows down processing massively because our Spark application is effectively spending most of its time just opening and closing files. What we normally want is for our Spark application to spend most of its time actually processing the data. We’ll do some experiments next to show the difference in performance when using properly compacted data as compared to poorly compacted data.
我敢打赌,如果您正确地将源数据分区为您要加入的方式并摆脱所有这些窗口,您最终会处于一个更好的位置。
每次点击partitionBy时,都会强制进行洗牌,每次点击orderBy时,都会强制进行昂贵的排序。
我建议您查看 Dataset API 并学习一些 groupBy 和 flatMapGroups/reduce/sliding 以进行 O(n) 时间计算。您可以一次性获得最小值/最大值。
此外,听起来您的驱动程序由于许多小文件问题而耗尽了内存。尝试尽可能压缩源数据并正确分区表。在这种特殊情况下,我建议按 order_date 进行分区(也许每天?),然后对产品和契约(Contract)进行子分区。
这是我花了大约 30 分钟编写的代码片段,其运行速度可能比您的窗口函数好得多。它应该在 O(n) 时间内运行,但如果您有许多小文件问题,它并不能弥补。如果有什么遗漏请告诉我。
import org.apache.spark.sql.{Dataset, Encoder, Encoders, SparkSession}
import scala.collection.mutable
case class Summary(
order_id: String,
ttime: String,
product: String,
contract: String,
order_date: String,
price: BigDecimal,
status: String,
maxPrice: BigDecimal = 0,
maxPriceOrderId: String = null,
minPrice: BigDecimal = 0,
minPriceOrderId: String = null
)
class Workflow()(implicit spark: SparkSession) {
import MinMaxer.summaryEncoder
val mainDs: Dataset[Summary] =
spark.sql(
"""
select order_id, ttime, product, contract, order_date, price, status
from order_table where order_date ='eod_date'
"""
).as[Summary]
MinMaxer.minMaxDataset(mainDs)
}
object MinMaxer {
implicit val summaryEncoder: Encoder[Summary] = Encoders.product[Summary]
implicit val groupEncoder: Encoder[(String, String)] = Encoders.product[(String, String)]
object SummaryOrderer extends Ordering[Summary] {
def compare(x: Summary, y: Summary): Int = x.ttime.compareTo(y.ttime)
}
def minMaxDataset(ds: Dataset[Summary]): Dataset[Summary] = {
ds
.groupByKey(x => (x.product, x.contract))
.flatMapGroups({ case (_, t) =>
val sortedRecords: Seq[Summary] = t.toSeq.sorted(SummaryOrderer)
generateMinMax(sortedRecords)
})
}
def generateMinMax(summaries: Seq[Summary]): Seq[Summary] = {
summaries.foldLeft(mutable.ListBuffer[Summary]())({case (b, summary) =>
if (b.lastOption.nonEmpty) {
val lastSummary: Summary = b.last
var minPrice: BigDecimal = 0
var minPriceOrderId: String = null
var maxPrice: BigDecimal = 0
var maxPriceOrderId: String = null
if (summary.status != "remove") {
if (lastSummary.minPrice >= summary.price) {
minPrice = summary.price
minPriceOrderId = summary.order_id
} else {
minPrice = lastSummary.minPrice
minPriceOrderId = lastSummary.minPriceOrderId
}
if (lastSummary.maxPrice <= summary.price) {
maxPrice = summary.price
maxPriceOrderId = summary.order_id
} else {
maxPrice = lastSummary.maxPrice
maxPriceOrderId = lastSummary.maxPriceOrderId
}
b.append(
summary.copy(
maxPrice = maxPrice,
maxPriceOrderId = maxPriceOrderId,
minPrice = minPrice,
minPriceOrderId = minPriceOrderId
)
)
} else {
b.append(
summary.copy(
maxPrice = lastSummary.maxPrice,
maxPriceOrderId = lastSummary.maxPriceOrderId,
minPrice = lastSummary.minPrice,
minPriceOrderId = lastSummary.minPriceOrderId
)
)
}
} else {
b.append(
summary.copy(
maxPrice = summary.price,
maxPriceOrderId = summary.order_id,
minPrice = summary.price,
minPriceOrderId = summary.order_id
)
)
}
b
})
}
}
关于dataframe - 时间序列/报价数据集的 Spark 转换,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/58027080/
初学者 android 问题。好的,我已经成功写入文件。例如。 //获取文件名 String filename = getResources().getString(R.string.filename
我已经将相同的图像保存到/data/data/mypackage/img/中,现在我想显示这个全屏,我曾尝试使用 ACTION_VIEW 来显示 android 标准程序,但它不是从/data/dat
我正在使用Xcode 9,Swift 4。 我正在尝试使用以下代码从URL在ImageView中显示图像: func getImageFromUrl(sourceUrl: String) -> UII
我的 Ubuntu 安装 genymotion 有问题。主要是我无法调试我的数据库,因为通过 eclipse 中的 DBMS 和 shell 中的 adb 我无法查看/data/文件夹的内容。没有显示
我正在尝试用 PHP 发布一些 JSON 数据。但是出了点问题。 这是我的 html -- {% for x in sets %}
我观察到两种方法的结果不同。为什么是这样?我知道 lm 上发生了什么,但无法弄清楚 tslm 上发生了什么。 > library(forecast) > set.seed(2) > tts lm(t
我不确定为什么会这样!我有一个由 spring data elasticsearch 和 spring data jpa 使用的类,但是当我尝试运行我的应用程序时出现错误。 Error creatin
在 this vega 图表,如果我下载并转换 flare-dependencies.json使用以下 jq 到 csv命令, jq -r '(map(keys) | add | unique) as
我正在提交一个项目,我必须在其中创建一个带有表的 mysql 数据库。一切都在我这边进行,所以我只想检查如何将我所有的压缩文件发送给使用不同计算机的人。基本上,我如何为另一台计算机创建我的数据库文件,
我有一个应用程序可以将文本文件写入内部存储。我想仔细看看我的电脑。 我运行了 Toast.makeText 来显示路径,它说:/数据/数据/我的包 但是当我转到 Android Studio 的 An
我喜欢使用 Genymotion 模拟器以如此出色的速度加载 Android。它有非常好的速度,但仍然有一些不稳定的性能。 如何从 Eclipse 中的文件资源管理器访问 Genymotion 模拟器
我需要更改 Silverlight 中文本框的格式。数据通过 MVVM 绑定(bind)。 例如,有一个 int 属性,我将 1 添加到 setter 中的值并调用 OnPropertyChanged
我想向 Youtube Data API 提出请求,但我不需要访问任何用户信息。我只想浏览公共(public)视频并根据搜索词显示视频。 我可以在未经授权的情况下这样做吗? 最佳答案 YouTube
我已经设置了一个 Twilio 应用程序,我想向人们发送更新,但我不想回复单个文本。我只是想让他们在有问题时打电话。我一切正常,但我想在发送文本时显示传入文本,以确保我不会错过任何问题。我正在使用 p
我有一个带有表单的网站(目前它是纯 HTML,但我们正在切换到 JQuery)。流程是这样的: 接受用户的输入 --- 5 个整数 通过 REST 调用网络服务 在服务器端运行一些计算...并生成一个
假设我们有一个名为 configuration.js 的文件,当我们查看内部时,我们会看到: 'use strict'; var profile = { "project": "%Projec
这部分是对 Previous Question 的扩展我的: 我现在可以从我的 CI Controller 成功返回 JSON 数据,它返回: {"results":[{"id":"1","Sourc
有什么有效的方法可以删除 ios 中 CBL 的所有文档存储?我对此有疑问,或者,如果有人知道如何从本质上使该应用程序像刚刚安装一样,那也会非常有帮助。我们正在努力确保我们的注销实际上将应用程序设置为
我有一个 Rails 应用程序,它与其他 Rails 应用程序通信以进行数据插入。我使用 jQuery $.post 方法进行数据插入。对于插入,我的其他 Rails 应用程序显示 200 OK。但在
我正在为服务于发布请求的 API 调用运行单元测试。我正在传递请求正文,并且必须将响应作为帐户数据返回。但我只收到断言错误 注意:数据是从 Azure 中获取的 spec.js const accou
我是一名优秀的程序员,十分优秀!