
java - Apache Spark SQL: Group By

Reposted. Author: 行者123. Updated: 2023-12-02 12:44:15

I have the following JSON data coming in from RabbitMQ:

{"DeviceId":"MACH-101","TimeStamp":"29-06-2017 15:21:30","data":{"RunStatus":1}}
{"DeviceId":"MACH-101","TimeStamp":"29-06-2017 15:21:35","data":{"RunStatus":3}}
{"DeviceId":"MACH-101","TimeStamp":"29-06-2017 15:21:40","data":{"RunStatus":2}}
{"DeviceId":"MACH-101","TimeStamp":"29-06-2017 15:21:45","data":{"RunStatus":3}}
{"DeviceId":"MACH-101","TimeStamp":"29-06-2017 15:21:50","data":{"RunStatus":2}}

{"DeviceId":"MACH-102","TimeStamp":"29-06-2017 15:21:35","data":{"RunStatus":1}}
{"DeviceId":"MACH-102","TimeStamp":"29-06-2017 15:21:45","data":{"RunStatus":3}}
{"DeviceId":"MACH-102","TimeStamp":"29-06-2017 15:21:50","data":{"RunStatus":2}}
{"DeviceId":"MACH-102","TimeStamp":"29-06-2017 15:21:55","data":{"RunStatus":3}}
{"DeviceId":"MACH-102","TimeStamp":"29-06-2017 15:22:00","data":{"RunStatus":2}}

I am trying to get the duration the device spent in each RunStatus, so for the MACH-101 data above, the RunStatus durations would look like this:

In RunStatus 1 the device was for 5 seconds (30 - 35)
In RunStatus 2 the device was for 5 seconds (40 - 45)
In RunStatus 3 the device was for 10 seconds (35 - 40 + 45 - 50)

The same logic applies to the second device's data as well.

Below is the Apache Spark SQL query I am trying, but I am not getting the required results. Please suggest some alternatives; I don't mind doing it in a non-SQL fashion either.

public static void main(String[] args) {

    try {

        mconf = new SparkConf();
        mconf.setAppName("RabbitMqReceiver");
        mconf.setMaster("local[*]");

        jssc = new JavaStreamingContext(mconf, Durations.seconds(10));

        SparkSession spksess = SparkSession
                .builder()
                .master("local[*]")
                .appName("RabbitMqReceiver2")
                .getOrCreate();

        SQLContext sqlctxt = new SQLContext(spksess);

        JavaDStream<String> strmData =
                jssc.receiverStream(new mqreceiver(StorageLevel.MEMORY_AND_DISK_2()));

        JavaDStream<String> machineData =
                strmData.window(Durations.minutes(1), Durations.seconds(10));

        sqlctxt.udf().register("custdatediff", new UDF2<String, String, String>() {

            @Override
            public String call(String argdt1, String argdt2) {

                DateTimeFormatter formatter = DateTimeFormat.forPattern("dd-MM-yyyy HH:mm:ss");
                DateTime dt1 = formatter.parseDateTime(argdt1);
                DateTime dt2 = formatter.parseDateTime(argdt2);

                Seconds retsec = org.joda.time.Seconds.secondsBetween(dt2, dt1);
                return retsec.toString();
            }
        }, DataTypes.StringType);

        machineData.foreachRDD(new VoidFunction<JavaRDD<String>>() {

            @Override
            public void call(JavaRDD<String> rdd) {
                if (!rdd.isEmpty()) {

                    Dataset<Row> df = sqlctxt.jsonRDD(rdd);
                    df.createOrReplaceTempView("DeviceData");

                    // I DON'T WANT to group by timestamp, but the query requires that I pass it.

                    Dataset<Row> searchResult = sqlctxt.sql("select t1.DeviceId, t1.data.runstatus,"
                            + " custdatediff(CAST((t1.timestamp) as STRING), CAST((t2.timestamp) as STRING)) as duration from DeviceData t1"
                            + " join DeviceData t2 on t1.DeviceId = t2.DeviceId group by t1.DeviceId, t1.data.runstatus, t1.timestamp, t2.timestamp");

                    searchResult.show();
                }
            }
        });

        jssc.start();
        jssc.awaitTermination();

    } catch (InterruptedException e) {
        // TODO Auto-generated catch block
        e.printStackTrace();
    }
}

A sample of the results produced by the above code/SQL is shown below:

+--------+---------+--------+
|DeviceId|runstatus|duration|
+--------+---------+--------+
| NTC-167| 2| PT0S|
| NTC-168| 2| PT0S|
| NTC-168| 2| PT-10S|
| NTC-168| 2| PT-15S|
| NTC-168| 1| PT10S|
| NTC-168| 1| PT0S|
| NTC-168| 1| PT-5S|
| NTC-168| 1| PT15S|
| NTC-168| 1| PT5S|
| NTC-168| 1| PT0S|
+--------+---------+--------+

So you can see that statuses are duplicated, and among the duplicated rows one of them has the correct result. The query I wrote also forces me to group by timestamp; I suspect that if I could avoid grouping by timestamp, the results might be correct... not sure about that.

Best Answer

You can try using DataFrames and window functions. Using the "lead" window function, you can compare the current row's timestamp with the next row's timestamp and find the difference for each device and run status, like this:

val windowSpec_wk = Window.partitionBy(df1("DeviceID")).orderBy(df1("timestamp"))
val df2 = df1.withColumn("period", lead(df1("timestamp"), 1).over(windowSpec_wk))
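To finish the computation after adding the "period" column, each row's duration is its lead timestamp minus its own timestamp, summed per device and run status. Setting Spark aside, the core of that lead-and-diff logic can be sketched in plain Java on the MACH-101 sample above (the class and method names here are illustrative, not part of any API):

```java
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.time.temporal.ChronoUnit;
import java.util.LinkedHashMap;
import java.util.Map;

public class StatusDurations {

    static final DateTimeFormatter FMT = DateTimeFormatter.ofPattern("dd-MM-yyyy HH:mm:ss");

    // rows must be sorted by timestamp; each element is {timestamp, runStatus}.
    static Map<String, Long> secondsPerStatus(String[][] rows) {
        Map<String, Long> totals = new LinkedHashMap<>();
        // Each row's status lasts until the next row's timestamp (the "lead" value);
        // the last row has no lead, so it contributes nothing.
        for (int i = 0; i < rows.length - 1; i++) {
            LocalDateTime current = LocalDateTime.parse(rows[i][0], FMT);
            LocalDateTime next = LocalDateTime.parse(rows[i + 1][0], FMT);
            long secs = ChronoUnit.SECONDS.between(current, next);
            totals.merge(rows[i][1], secs, Long::sum);
        }
        return totals;
    }

    public static void main(String[] args) {
        String[][] mach101 = {
            {"29-06-2017 15:21:30", "1"},
            {"29-06-2017 15:21:35", "3"},
            {"29-06-2017 15:21:40", "2"},
            {"29-06-2017 15:21:45", "3"},
            {"29-06-2017 15:21:50", "2"},
        };
        System.out.println(secondsPerStatus(mach101)); // prints {1=5, 3=10, 2=5}
    }
}
```

In Spark terms, the loop body corresponds to lead(...).over(windowSpec) followed by a timestamp difference, and the merge into the map corresponds to a groupBy on DeviceId and RunStatus with a sum over the differences.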

Regarding java - Apache Spark SQL: Group By, we found a similar question on Stack Overflow: https://stackoverflow.com/questions/44824504/

Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号