
azure - Removing duplicates from EventHub in Azure Stream Analytics


I have created an Azure Stream Analytics job that takes input data from an Event Hub and writes it to Cosmos DB and Blob storage.

I sometimes see duplicate data coming from the Event Hub, so the duplicates end up being written to Cosmos DB and Blob storage as well.

Below is sample input data sent from the Event Hub to Stream Analytics.

[
  {
    "idnum": "00011XXX01",
    "basetime": 0,
    "time": 189834,
    "sig1": 36.341587,
    "sig2": [
      { "sig3": "04XXX", "id": 1 },
      { "sig3": "000000", "id": 61 }
    ],
    "signals": [
      { "timestamp": 190915, "value": 45 },
      { "timestamp": 190915, "value": 10.2 },
      { "timestamp": 190915 }
    ],
    "sig3TriggeredDateTime": { "dateTime": { "date": { "year": 2019, "month": 5, "day": 16 }, "time": { "hour": 4, "minute": 56, "second": 19, "nano": 480389000 } }, "offset": { "totalSeconds": 0 } }
  },
  {
    "idnum": "00086XXX02",
    "basetime": 0,
    "time": 189834,
    "sig1": 36.341587,
    "sig2": [
      { "sig3": "03XXX", "id": 1 },
      { "sig3": "04XXX", "id": 1 }
    ],
    "signals": [
      { "timestamp": 190915, "value": 45 },
      { "timestamp": 190915, "value": 10.2 },
      { "timestamp": 190915 },
      { "timestamp": 190915, "value": 0 }
    ],
    "sig3TriggeredDateTime": { "dateTime": { "date": { "year": 2019, "month": 5, "day": 16 }, "time": { "hour": 4, "minute": 56, "second": 19, "nano": 480389000 } }, "offset": { "totalSeconds": 0 } }
  },
  {
    "idnum": "00086XXX02",
    "basetime": 0,
    "time": 189834,
    "sig1": 36.341587,
    "sig2": [
      { "sig3": "03XXX", "id": 1 },
      { "sig3": "04XXX", "id": 1 }
    ],
    "signals": [
      { "timestamp": 190915, "value": 45 },
      { "timestamp": 190915, "value": 10.2 },
      { "timestamp": 190915 },
      { "timestamp": 190915, "value": 0 }
    ],
    "sig3TriggeredDateTime": { "dateTime": { "date": { "year": 2019, "month": 5, "day": 16 }, "time": { "hour": 4, "minute": 56, "second": 19, "nano": 480389000 } }, "offset": { "totalSeconds": 0 } }
  },
  {
    "idnum": "00086XXX02",
    "basetime": 0,
    "time": 189834,
    "sig1": 36.341587,
    "sig2": [
      { "sig3": "03XXX", "id": 1 },
      { "sig3": "04XXX", "id": 1 }
    ],
    "signals": [
      { "timestamp": 190915, "value": 45 },
      { "timestamp": 190915, "value": 10.2 },
      { "timestamp": 190915 },
      { "timestamp": 190915, "value": 0 }
    ],
    "sig3TriggeredDateTime": { "dateTime": { "date": { "year": 2019, "month": 5, "day": 16 }, "time": { "hour": 4, "minute": 56, "second": 19, "nano": 480389000 } }, "offset": { "totalSeconds": 0 } }
  },
  {
    "idnum": "00086XXX02",
    "basetime": 0,
    "time": 189834,
    "sig1": 36.341587,
    "sig2": [
      { "sig3": "03XXX", "id": 1 },
      { "sig3": "04XXX", "id": 1 }
    ],
    "signals": [
      { "timestamp": 190915, "value": 45 },
      { "timestamp": 190915, "value": 10.2 },
      { "timestamp": 190915 },
      { "timestamp": 190915, "value": 0 }
    ],
    "sig3TriggeredDateTime": { "dateTime": { "date": { "year": 2019, "month": 5, "day": 16 }, "time": { "hour": 4, "minute": 56, "second": 19, "nano": 480389000 } }, "offset": { "totalSeconds": 0 } }
  },
  {
    "idnum": "00026XXX03",
    "basetime": 0,
    "time": 189834,
    "sig1": 36.341587,
    "sig2": [
      { "sig3": "03XXX", "id": 1 },
      { "sig3": "000000", "id": 61 }
    ],
    "signals": [
      { "timestamp": 190915, "value": 45 },
      { "timestamp": 190915, "value": 10.2 }
    ],
    "sig3TriggeredDateTime": { "dateTime": { "date": { "year": 2019, "month": 5, "day": 16 }, "time": { "hour": 4, "minute": 56, "second": 19, "nano": 480389000 } }, "offset": { "totalSeconds": 0 } }
  }
]

In the sample above, the event with idnum 00086XXX02 is duplicated three times.

I am running the following query and getting duplicated output.

WITH temp AS (
    SELECT
        input.idnum AS IDNUM,
        input.basetime AS BASETIME,
        input.time AS TIME,
        ROUND(input.sig1, 5) AS SIG1,
        flatArrayElement AS SIG2,
        udf.sgnlArrayMap(input.signals, input.basetime) AS SGNL  -- UDF to process the signals in the input
    FROM [input01] AS input
    CROSS APPLY GetArrayElements(input.sig2) AS flatArrayElement
    WHERE GetArrayLength(input.sig2) >= 1
),
SIGNALS AS (
    SELECT * FROM temp T JOIN master M ON T.SIG2.ArrayValue.sig3 = M.sig3
)

-- Insert SIG2 into the Cosmos DB container
SELECT
    t.IDNUM,
    t.BASETIME,
    t.TIME,
    t.SIG1,
    t.SIG2.ArrayValue.id AS ID,
    t.SIG2.ArrayValue.sig3 AS SIG3,
    t.SGNL
INTO [CosmosTbl]
FROM SIGNALS t PARTITION BY PartitionId

The output is shown below; it contains duplicate events for "idnum": "00086XXX02".

[
  {
    "idnum": "00011XXX01",
    "basetime": 0,
    "time": 189834,
    "sig1": 36.341587,
    "sig2": { "sig3": "04XXX", "id": 1 },
    "signals": [
      { "timestamp": 190915, "value": 45 },
      { "timestamp": 190915, "value": 10.2 },
      { "timestamp": 190915 }
    ],
    "sig3TriggeredDateTime": { "dateTime": { "date": { "year": 2019, "month": 5, "day": 16 }, "time": { "hour": 4, "minute": 56, "second": 19, "nano": 480389000 } }, "offset": { "totalSeconds": 0 } }
  },
  {
    "idnum": "00011XXX01",
    "basetime": 0,
    "time": 189834,
    "sig1": 36.341587,
    "sig2": { "sig3": "000000", "id": 61 },
    "signals": [
      { "timestamp": 190915, "value": 45 },
      { "timestamp": 190915, "value": 10.2 },
      { "timestamp": 190915 }
    ],
    "sig3TriggeredDateTime": { "dateTime": { "date": { "year": 2019, "month": 5, "day": 16 }, "time": { "hour": 4, "minute": 56, "second": 19, "nano": 480389000 } }, "offset": { "totalSeconds": 0 } }
  },
  {
    "idnum": "00086XXX02",
    "basetime": 0,
    "time": 189834,
    "sig1": 36.341587,
    "sig2": { "sig3": "03XXX", "id": 1 },
    "signals": [
      { "timestamp": 190915, "value": 45 },
      { "timestamp": 190915, "value": 10.2 },
      { "timestamp": 190915 },
      { "timestamp": 190915, "value": 0 }
    ],
    "sig3TriggeredDateTime": { "dateTime": { "date": { "year": 2019, "month": 5, "day": 16 }, "time": { "hour": 4, "minute": 56, "second": 19, "nano": 480389000 } }, "offset": { "totalSeconds": 0 } }
  },
  {
    "idnum": "00086XXX02",
    "basetime": 0,
    "time": 189834,
    "sig1": 36.341587,
    "sig2": { "sig3": "04XXX", "id": 1 },
    "signals": [
      { "timestamp": 190915, "value": 45 },
      { "timestamp": 190915, "value": 10.2 },
      { "timestamp": 190915 },
      { "timestamp": 190915, "value": 0 }
    ],
    "sig3TriggeredDateTime": { "dateTime": { "date": { "year": 2019, "month": 5, "day": 16 }, "time": { "hour": 4, "minute": 56, "second": 19, "nano": 480389000 } }, "offset": { "totalSeconds": 0 } }
  },
  {
    "idnum": "00086XXX02",
    "basetime": 0,
    "time": 189834,
    "sig1": 36.341587,
    "sig2": { "sig3": "03XXX", "id": 1 },
    "signals": [
      { "timestamp": 190915, "value": 45 },
      { "timestamp": 190915, "value": 10.2 },
      { "timestamp": 190915 },
      { "timestamp": 190915, "value": 0 }
    ],
    "sig3TriggeredDateTime": { "dateTime": { "date": { "year": 2019, "month": 5, "day": 16 }, "time": { "hour": 4, "minute": 56, "second": 19, "nano": 480389000 } }, "offset": { "totalSeconds": 0 } }
  },
  {
    "idnum": "00086XXX02",
    "basetime": 0,
    "time": 189834,
    "sig1": 36.341587,
    "sig2": { "sig3": "04XXX", "id": 1 },
    "signals": [
      { "timestamp": 190915, "value": 45 },
      { "timestamp": 190915, "value": 10.2 },
      { "timestamp": 190915 },
      { "timestamp": 190915, "value": 0 }
    ],
    "sig3TriggeredDateTime": { "dateTime": { "date": { "year": 2019, "month": 5, "day": 16 }, "time": { "hour": 4, "minute": 56, "second": 19, "nano": 480389000 } }, "offset": { "totalSeconds": 0 } }
  },
  {
    "idnum": "00086XXX02",
    "basetime": 0,
    "time": 189834,
    "sig1": 36.341587,
    "sig2": { "sig3": "03XXX", "id": 1 },
    "signals": [
      { "timestamp": 190915, "value": 45 },
      { "timestamp": 190915, "value": 10.2 },
      { "timestamp": 190915 },
      { "timestamp": 190915, "value": 0 }
    ],
    "sig3TriggeredDateTime": { "dateTime": { "date": { "year": 2019, "month": 5, "day": 16 }, "time": { "hour": 4, "minute": 56, "second": 19, "nano": 480389000 } }, "offset": { "totalSeconds": 0 } }
  },
  {
    "idnum": "00086XXX02",
    "basetime": 0,
    "time": 189834,
    "sig1": 36.341587,
    "sig2": { "sig3": "04XXX", "id": 1 },
    "signals": [
      { "timestamp": 190915, "value": 45 },
      { "timestamp": 190915, "value": 10.2 },
      { "timestamp": 190915 },
      { "timestamp": 190915, "value": 0 }
    ],
    "sig3TriggeredDateTime": { "dateTime": { "date": { "year": 2019, "month": 5, "day": 16 }, "time": { "hour": 4, "minute": 56, "second": 19, "nano": 480389000 } }, "offset": { "totalSeconds": 0 } }
  },
  {
    "idnum": "00086XXX02",
    "basetime": 0,
    "time": 189834,
    "sig1": 36.341587,
    "sig2": { "sig3": "03XXX", "id": 1 },
    "signals": [
      { "timestamp": 190915, "value": 45 },
      { "timestamp": 190915, "value": 10.2 },
      { "timestamp": 190915 },
      { "timestamp": 190915, "value": 0 }
    ],
    "sig3TriggeredDateTime": { "dateTime": { "date": { "year": 2019, "month": 5, "day": 16 }, "time": { "hour": 4, "minute": 56, "second": 19, "nano": 480389000 } }, "offset": { "totalSeconds": 0 } }
  },
  {
    "idnum": "00086XXX02",
    "basetime": 0,
    "time": 189834,
    "sig1": 36.341587,
    "sig2": { "sig3": "04XXX", "id": 1 },
    "signals": [
      { "timestamp": 190915, "value": 45 },
      { "timestamp": 190915, "value": 10.2 },
      { "timestamp": 190915 },
      { "timestamp": 190915, "value": 0 }
    ],
    "sig3TriggeredDateTime": { "dateTime": { "date": { "year": 2019, "month": 5, "day": 16 }, "time": { "hour": 4, "minute": 56, "second": 19, "nano": 480389000 } }, "offset": { "totalSeconds": 0 } }
  }
]

The expected output would be the events without duplicates (for the sample provided, there should be no duplicate events for "idnum": "00086XXX02").

I want to remove the duplicate events before the data is written to storage. Can this be done with Stream Analytics?

Creating the Cosmos DB collection with a unique key is one solution on the Cosmos side, but the container already exists here. Is there anything we can do on the Stream Analytics side?

Best answer

I simplified your test SQL as follows:

WITH t AS (
    SELECT
        flatArrayElement AS SIG2
    FROM fromblob AS input
    CROSS APPLY GetArrayElements(input.sig2) AS flatArrayElement
    WHERE GetArrayLength(input.sig2) >= 1
)
SELECT
    t.SIG2.ArrayValue.id AS ID,
    t.SIG2.ArrayValue.sig3 AS SIG3
FROM t PARTITION BY PartitionId

It produces repeated rows because of the GetArrayElements() method, and I think that is expected: the array is split into its elements, so the other columns are necessarily repeated once per element.
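
If you want to see that multiplication directly, a small diagnostic step like the one below counts the rows each idnum produces after the expansion (a sketch only; it reuses the fromblob test input from above, and the 1-minute tumbling window is an arbitrary choice of mine). For the sample input, 00086XXX02 appears 4 times with a 2-element sig2 array, so it yields 8 rows, which matches the eight 00086XXX02 entries in the output shown earlier.

WITH expanded AS (
    -- one row per element of sig2, exactly like the simplified query above
    SELECT
        input.idnum,
        flatArrayElement AS SIG2
    FROM fromblob AS input
    CROSS APPLY GetArrayElements(input.sig2) AS flatArrayElement
)
SELECT
    idnum,
    COUNT(*) AS RowsAfterExpansion  -- number of copies of the event * array length
FROM expanded
GROUP BY idnum, TumblingWindow(minute, 1)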

Based on my experience and my findings (plus this feedback item), there is no built-in way to do this in ASA. I think the reason is that ASA processes real-time streaming data rather than static data like a SQL table, so it cannot tell whether the data arriving in a given unit of time is a duplicate.
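
That said, if the duplicates always arrive close together in time, one workaround is to collapse them inside a time window before the rest of the query runs. The sketch below is only an illustration under that assumption: the dedup step name and the 5-minute TumblingWindow are my own choices rather than anything from the question, the window size has to cover the largest gap ever seen between duplicates, and it delays the output by up to one window. TopOne() keeps the full record of the earliest copy per (idnum, basetime, time) group, so the later steps could read from dedup instead of [input01].

-- Hypothetical dedup step: keep one event per (idnum, basetime, time) per window.
-- Duplicates that straddle a window boundary would still come through twice.
WITH dedup AS (
    SELECT
        TopOne() OVER (ORDER BY i.EventEnqueuedUtcTime ASC) AS ev  -- earliest copy in the group
    FROM [input01] AS i
    GROUP BY i.idnum, i.basetime, i.time, TumblingWindow(minute, 5)
)
SELECT
    dedup.ev.idnum    AS IDNUM,
    dedup.ev.basetime AS BASETIME,
    dedup.ev.time     AS TIME,
    dedup.ev.sig1     AS SIG1,
    dedup.ev.sig2     AS SIG2,
    dedup.ev.signals  AS SIGNALS
FROM dedup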

Combined with an earlier Cosmos DB case (How to find Duplicate documents in Cosmos DB), I think the key to a solution is finding the root cause: why the Event Hub is receiving duplicate source data in the first place. Of course, you could set up a Cosmos DB trigger to keep duplicate data from flowing into the database, but I don't think that is an efficient approach.
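
If recreating the container with a unique key is not an option, another mitigation on the output side is to let the Cosmos DB sink de-duplicate for you: the Stream Analytics Cosmos DB output upserts documents based on whichever column you configure as the Document ID in the output settings, so if the query emits a deterministic key built from the fields that define a duplicate, repeated events overwrite the same document instead of adding new ones. The sketch below reuses the SIGNALS step from the question; docId and its construction are my assumption about which fields make a row unique, and this only helps the Cosmos DB output, since the Blob output would still contain the repeated rows.

-- Hypothetical: emit a deterministic key and configure it as the "Document ID"
-- of the Cosmos DB output so that duplicates upsert the same document.
SELECT
    CONCAT(t.IDNUM, '-', CAST(t.TIME AS nvarchar(max)), '-', t.SIG2.ArrayValue.sig3) AS docId,
    t.IDNUM,
    t.BASETIME,
    t.TIME,
    t.SIG1,
    t.SIG2.ArrayValue.id   AS ID,
    t.SIG2.ArrayValue.sig3 AS SIG3,
    t.SGNL
INTO [CosmosTbl]
FROM SIGNALS t PARTITION BY PartitionId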

Original question on Stack Overflow: https://stackoverflow.com/questions/59408124/
