gpt4 book ai didi

java - Spark Structured Streaming - 有状态流处理中使用窗口操作进行事件处理

转载 作者:行者123 更新时间:2023-12-01 22:54:50 24 4
gpt4 key购买 nike

我是 Spark 结构化流处理的新手,目前正在研究一个用例,其中结构化流应用程序将从 Azure IoT 中心-事件中心获取事件(例如每 20 秒后)。

任务是消耗这些事件并实时处理它。为此,我在 Spark-Java 中编写了下面的 Spark 结构化流程序。

以下是要点

  1. 目前我已经应用了10分钟的窗口操作间隔和 5 分钟滑动窗口。
  2. 水印以 10 分钟的间隔应用于 eventDate 参数。
  3. 目前我没有执行任何其他操作,只是将其以 Parquet 格式存储在指定位置。
  4. 程序正在将一个事件存储在一个文件中。

问题:

  1. 是否可以在一个文件中以 parquet 格式存储多个事件基于窗口时间?
  2. 在这种情况下窗口操作如何工作?
  3. 此外,我想检查之前事件的事件状态,并根据一些计算(假设 5 分钟内未收到事件)我想更新状态。

...

public class EventSubscriber {

public static void main(String args[]) throws InterruptedException, StreamingQueryException {

String eventHubCompatibleEndpoint = "<My-EVENT HUB END POINT CONNECTION STRING>";

String connString = new ConnectionStringBuilder(eventHubCompatibleEndpoint).build();

EventHubsConf eventHubsConf = new EventHubsConf(connString).setConsumerGroup("$Default")
.setStartingPosition(EventPosition.fromEndOfStream()).setMaxRatePerPartition(100)
.setReceiverTimeout(java.time.Duration.ofMinutes(10));

SparkConf sparkConf = new SparkConf().setMaster("local[2]").setAppName("IoT Spark Streaming");

SparkSession spSession = SparkSession.builder()
.appName("IoT Spark Streaming")
.config(sparkConf).config("spark.sql.streaming.checkpointLocation", "<MY-CHECKPOINT-LOCATION>")
.getOrCreate();

Dataset<Row> inputStreamDF = spSession.readStream()
.format("eventhubs")
.options(eventHubsConf.toMap())
.load();

Dataset<Row> bodyRow = inputStreamDF.withColumn("body", new Column("body").cast(DataTypes.StringType)).select("body");

StructType jsonStruct = new StructType()
.add("eventType", DataTypes.StringType)
.add("payload", DataTypes.StringType);

Dataset<Row> messageRow = bodyRow.map((MapFunction<Row, Row>) value -> {
String valStr = value.getString(0).toString();

String payload = valStr;

Gson gson = new GsonBuilder().serializeNulls().setPrettyPrinting().create();

JsonObject jsonObj = gson.fromJson(valStr, JsonObject.class);

JsonElement methodName = jsonObj.get("method");

String eventType = null;
if(methodName != null) {
eventType = "OTHER_EVENT";
} else {
eventType = "DEVICE_EVENT";
}

Row jsonRow = RowFactory.create(eventType, payload);
return jsonRow;

}, RowEncoder.apply(jsonStruct));

messageRow.printSchema();

Dataset<Row> deviceEventRowDS = messageRow.filter("eventType = 'DEVICE_EVENT'");

deviceEventRowDS.printSchema();

Dataset<DeviceEvent> deviceEventDS = deviceEventRowDS.map((MapFunction<Row, DeviceEvent>) value -> {

String jsonString = value.getString(1).toString();

Gson gson = new GsonBuilder().serializeNulls().setPrettyPrinting().create();

DeviceMessage deviceMessage = gson.fromJson(jsonString, DeviceMessage.class);
DeviceEvent deviceEvent = deviceMessage.getDeviceEvent();
return deviceEvent;

}, Encoders.bean(DeviceEvent.class));

deviceEventDS.printSchema();

Dataset<Row> messageDataset = deviceEventDS.select(
functions.col("eventType"),
functions.col("deviceID"),
functions.col("description"),
functions.to_timestamp(functions.col("eventDate"), "yyyy-MM-dd hh:mm:ss").as("eventDate"),
functions.col("deviceModel"),
functions.col("pingRate"))
.select("eventType", "deviceID", "description", "eventDate", "deviceModel", "pingRate");

messageDataset.printSchema();

Dataset<Row> devWindowDataset = messageDataset.withWatermark("eventDate", "10 minutes")
.groupBy(functions.col("deviceID"),
functions.window(
functions.col("eventDate"), "10 minutes", "5 minutes"))
.count();

devWindowDataset.printSchema();

StreamingQuery query = devWindowDataset.writeStream().outputMode("append")
.format("parquet")
.option("truncate", "false")
.option("path", "<MY-PARQUET-FILE-OUTPUT-LOCATION>")
.start();

query.awaitTermination();
}}

...

与此相关的任何帮助或指示都会有用。

感谢和问候,

阿维纳什·德什穆克

最佳答案

Is it possible to store multiple events in parquet format in a file based on the window time?

是的。

How does the window operation works in this case?

以下代码是 Spark Structured Streaming 应用程序的主要部分:

Dataset<Row> devWindowDataset = messageDataset
.withWatermark("eventDate", "10 minutes")
.groupBy(
functions.col("deviceID"),
functions.window(functions.col("eventDate"), "10 minutes", "5 minutes"))
.count();

这表示底层状态存储应将每个 deviceIDeventDate 的状态保留 10 分钟以及额外的 10 分钟(每个 withWatermark)对于迟到的事件。换句话说,一旦事件的 eventDate 比流式查询开始时间晚 20 分钟,您就应该看到结果。

withWatermark 适用于后期事件,因此即使 groupBy 生成结果,只有在超过水印阈值后才会发出结果。

每 10 分钟(+ 10 分钟水印)应用相同的过程,并使用 5 分钟的窗口幻灯片。

将带有 window 运算符的 groupBy 视为多列聚合。

Also I would like to check the event state with previous event and based on some calculations (say event is not received by 5 minutes) I want to update the state.

这听起来像是 KeyValueGroupedDataset.flatMapGroupsWithState 的一个用例运算符(又名任意状态流聚合)。咨询Arbitrary Stateful Operations .

您也可能只想要众多 aggregation standard functions 之一。或 user-defined aggregation function (UDAF) .

关于java - Spark Structured Streaming - 有状态流处理中使用窗口操作进行事件处理,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/58444825/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com