gpt4 book ai didi

scala - Spark Streaming - 根据按键分组的键值对计算统计信息

转载 作者:行者123 更新时间:2023-12-02 22:54:26 31 4
gpt4 key购买 nike

背景:我正在使用 Spark Streaming 从 Kafka 流式传输事件,这些事件采用逗号分隔键值对的形式以下是事件如何流入我的 Spark 应用程序的示例。

Key1=Value1, Key2=Value2, Key3=Value3, Key4=Value4,responseTime=200
Key1=Value5, Key2=Value6, Key3=Value7, Key4=Value8,responseTime=150
Key1=Value9, Key2=Value10, Key3=Value11, Key4=Value12,responseTime=100

输出:

我想计算给定批处理间隔内按流中不同键分组的不同指标(平均值、计数等),例如

  1. 按 Key1、Key2 计算的平均响应时间(响应时间是每个事件中的关键之一)
  2. 按 Key1、Key2 计数

到目前为止我的尝试:

val stream = KafkaUtils
.createDirectStream[String, String, StringDecoder, StringDecoder](
ssc, kafkaParams, topicsSet)

val pStream = stream.persist()

val events: DStream[String] = pStream.flatMap(_._2.split(","))
val pairs= events.map(data => data.split("=")).map(array => (array(0), array(1)))
// pairs results in tuples of (Key1, Value1), (Key2, Value2) and so on.

更新 - 03/04键 Key1、Key2...在传入流中可能会乱序到达。

感谢您的输入/提示。

最佳答案

一种可能的解决方案是这样的:

  • 创建一个代表每条记录的案例类,这样我们就不必处​​理元组:

    case class Record(
    key1: String, key2: String, key3: String, key4: String, rt: Double)
  • 使用正则表达式解析记录并删除格式错误的条目:

    import scala.util.matching.Regex

    val recordPattern = new Regex(
    "^Key1=(.*?), ?Key2=(.*?), ?Key3=(.*?), ?Key4=(.*?), ?" ++
    "responseTime=(0-9+)$"
    )

    val records = pStream.map {
    case recordPattern(key1, key2, key3, key4, rt) =>
    Some(Record(key1, key2, key3, key4, rt.toDouble))
    case _ => None
    }.flatMap(x => x) // Drop malformed
  • 将数据 reshape 为键值对:

    val pairs = records.map(r => ((r.key1, r.key2), r.rt))
  • 创建分区器并使用 StatCounter 聚合统计信息:

    import org.apache.spark.util.StatCounter
    import org.apache.spark.HashPartitioner

    val paritioner: HashPartitioner = ???

    pairs.combineByKey[StatCounter](
    StatCounter(_), _ merge _, _ merge _, paritioner
    )
  • 提取感兴趣的字段:

    stats.mapValues(s => (s.count, s.mean))

您也可以对无序数据尝试类似的操作,尽管我强烈建议修复上游的问题:

val kvPattern = "(\\w+)=(\\w+)".r
val pairs = pStream.map(line => {
val kvs = kvPattern.findAllMatchIn(line)
.map(m => (m.group(1), m.group(2))).toMap

// This will discard any malformed lines
// (lack of key1, key2, lack or invalid format of responseTime)
Try((
(kvs("Key1"), kvs("Key2")),
kvs("responseTime").toDouble
))

}).flatMap(_.toOption)

然后像以前一样继续。

关于scala - Spark Streaming - 根据按键分组的键值对计算统计信息,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/35780331/

31 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com