gpt4 book ai didi

json - 如何使用 flink 流式传输 json?

转载 作者:行者123 更新时间:2023-12-05 06:36:02 25 4
gpt4 key购买 nike

我实际上正在处理一个流,接收一堆字符串并需要对所有字符串进行计数。总和被加总,这意味着对于第二条记录,总和被添加到前一天输出必须是一些类似 json 的文件

{
"aggregationType" : "day",
"days before" : 2,
"aggregates" : [
{"date" : "2018-03-03",
"sum" : 120},
{"date" :"2018-03-04",
"sum" : 203}
]
}

我创建了一个看起来像这样的流:

val eventStream : DataStream [String] = 
eventStream
.addSource(source)
.keyBy("")
.TimeWindow(Time.days(1), Time.days(1))
.trigger(new MyTriggerFunc)
.aggregation(new MyAggregationFunc)
.addSink(sink)

提前感谢您的帮助:)

最佳答案

在 Flink 中使用 JSON 的注意事项:

使用 JSONDeserializationSchema 反序列化事件,这将生成 ObjectNode。为方便起见,您可以将 ObjectNode 映射到 YourObject 或继续使用 ObjectNode

使用 ObjectNode 的教程:http://www.baeldung.com/jackson-json-node-tree-model

回到你的案例,你可以像下面那样做:

val eventStream : DataStream [ObjectNode] = 
oneMinuteAgg
.addSource(source)
.windowAll()
.TimeWindow(Time.minutes(1))
.trigger(new MyTriggerFunc)
.aggregation(new MyAggregationFunc)

将输出 1 分钟的聚合流

[     
{
"date" :2018-03-03
"sum" : 120
},
{
"date" :2018-03-03
"sum" : 120
}
]

然后将另一个运算符链接到“oneMinuteAgg”,它将 1 分钟聚合添加到 1 天聚合中:

[...]
oneMinuteAgg
.windowAll()
.TimeWindow(Time.days(1))
.trigger(new Whatever)
.aggregation(new YourDayAggF)

这将输出你所需要的

{
"aggregationType" : "day"
"days before" : 4
"aggregates : [{
"date" :2018-03-03
"sum" : 120
},
{
"date" :2018-03-03
"sum" : 120
}]
}

我使用 windowAll() 假设您不需要为流设置 key 。

关于json - 如何使用 flink 流式传输 json?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/49380778/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com