apache-kafka - Kafka topic with variable nested JSON object as KSQL DB stream

Reposted. Author: 行者123. Last updated: 2023-12-02 16:27:02

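I am trying to join two existing Kafka topics in KSQL. Here are some data samples from Kafka (the actual values are redacted because this is a corporate environment):

Devices topic: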

{
  "persistTime" : "2020-10-06T13:30:25.373Z",
  "previous" : {
    "device" : "REDACTED",
    "type" : "REDACTED",
    "group" : "REDACTED",
    "inventoryState" : "unknown",
    "managementState" : "registered",
    "communicationId" : "REDACTED",
    "manufacturer" : "",
    "description" : "",
    "model" : "",
    "location" : {
      "geo" : {
        "latitude" : "REDACTED",
        "longitude" : "REDACTED"
      },
      "address" : {
        "city" : "",
        "postalCode" : "",
        "street" : "",
        "houseNumber" : "",
        "floor" : "",
        "company" : "",
        "country" : "",
        "reference" : "",
        "timeZone" : "",
        "region" : "",
        "district" : ""
      },
      "logicalInstallationPoint" : ""
    },
    "tags" : [ ]
  },
  "current" : {
    "device" : "REDACTED",
    "type" : "REDACTED",
    "group" : "REDACTED",
    "inventoryState" : "unknown",
    "managementState" : "registered",
    "communicationId" : "REDACTED",
    "manufacturer" : "",
    "description" : "",
    "model" : "",
    "location" : {
      "geo" : {
        "latitude" : "REDACTED",
        "longitude" : "REDACTED"
      },
      "address" : {
        "city" : "",
        "postalCode" : "",
        "street" : "",
        "houseNumber" : "",
        "floor" : "",
        "company" : "",
        "country" : "",
        "reference" : "",
        "timeZone" : "",
        "region" : "",
        "district" : ""
      },
      "logicalInstallationPoint" : ""
    },
    "tags" : [ ]
  }
}

Device events topic (first sample):

{
  "device" : "REDACTED",
  "event" : "403151",
  "firstOccurrenceTime" : "2020-09-30T11:03:50.000Z",
  "lastOccurrenceTime" : "2020-09-30T11:03:50.000Z",
  "occurrenceCount" : 1,
  "receiveTime" : "2020-09-30T11:03:50.000Z",
  "persistTime" : "2020-09-30T14:32:59.580Z",
  "state" : "open",
  "context" : {
    "2" : "25",
    "3" : "0",
    "4" : "60",
    "1" : "REDACTED"
  }
}

Device events topic (second sample):

{
  "device" : "REDACTED",
  "event" : "402004",
  "firstOccurrenceTime" : "2020-10-07T07:02:48Z",
  "lastOccurrenceTime" : "2020-10-07T07:02:48Z",
  "occurrenceCount" : 1,
  "receiveTime" : "2020-10-07T07:02:48Z",
  "persistTime" : "2020-10-07T07:15:28.533Z",
  "state" : "open",
  "context" : {
    "2" : "2020-10-07T07:02:48.0000000Z",
    "1" : "REDACTED"
  }
}

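The problem I am facing is that the number of variables under context in the device events topic varies from message to message.

I have tried to create the events stream in ksqlDB with the following statements: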

CREATE STREAM "events"\
("device" VARCHAR, \
"event" VARCHAR, \
"firstOccurenceTime" VARCHAR, \
"lastOccurenceTime" VARCHAR, \
"occurenceCount" INTEGER, \
"receiveTime" VARCHAR, \
"persistTime" VARCHAR, \
"state" VARCHAR, \
"context" ARRAY<STRING>) \
WITH (KAFKA_TOPIC='device-event', VALUE_FORMAT='JSON');

CREATE STREAM "events"\
("device" VARCHAR, \
"event" VARCHAR, \
"firstOccurenceTime" VARCHAR, \
"lastOccurenceTime" VARCHAR, \
"occurenceCount" INTEGER, \
"receiveTime" VARCHAR, \
"persistTime" VARCHAR, \
"state" VARCHAR, \
"context" STRUCT\
<"1" VARCHAR, \
"2" VARCHAR, \
"3" VARCHAR, \
"4" VARCHAR>) \
WITH (KAFKA_TOPIC='ext_device-event_10195', VALUE_FORMAT='JSON');

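The second statement only brings in data that has all four context variables ("1", "2", "3" and "4").

How can I create the equivalent KSQL stream for the device events Kafka topic?

Best answer

You need to use MAP rather than STRUCT. A STRUCT declares a fixed set of named fields, while a MAP accepts whatever keys each message happens to carry.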

By the way, you no longer need the \ line-continuation character either :)

Here is a working example using ksqlDB 0.12.

  • Load the sample data into a topic:

    kafkacat -b localhost:9092 -P -t events <<EOF
    { "device" : "REDACTED", "event" : "403151", "firstOccurrenceTime" : "2020-09-30T11:03:50.000Z", "lastOccurrenceTime" : "2020-09-30T11:03:50.000Z", "occurrenceCount" : 1, "receiveTime" : "2020-09-30T11:03:50.000Z", "persistTime" : "2020-09-30T14:32:59.580Z", "state" : "open", "context" : { "2" : "25", "3" : "0", "4" : "60", "1" : "REDACTED" } }
    { "device" : "REDACTED", "event" : "402004", "firstOccurrenceTime" : "2020-10-07T07:02:48Z", "lastOccurrenceTime" : "2020-10-07T07:02:48Z", "occurrenceCount" : 1, "receiveTime" : "2020-10-07T07:02:48Z", "persistTime" : "2020-10-07T07:15:28.533Z", "state" : "open", "context" : { "2" : "2020-10-07T07:02:48.0000000Z", "1" : "REDACTED" } }
    EOF
  • In ksqlDB, declare the stream:

    -- Column names are spelled as in the JSON payload ("Occurrence", with a double r).
    CREATE STREAM "events" (
        "device" VARCHAR,
        "event" VARCHAR,
        "firstOccurrenceTime" VARCHAR,
        "lastOccurrenceTime" VARCHAR,
        "occurrenceCount" INTEGER,
        "receiveTime" VARCHAR,
        "persistTime" VARCHAR,
        "state" VARCHAR,
        "context" MAP<VARCHAR, VARCHAR>
    ) WITH (KAFKA_TOPIC = 'events', VALUE_FORMAT = 'JSON');
  • Query the stream to check that everything works:

    ksql> SET 'auto.offset.reset' = 'earliest';
    Successfully changed local property 'auto.offset.reset' to 'earliest'. Use the UNSET command to revert your change.

    ksql> SELECT "device", "event", "receiveTime", "state", "context" FROM "events" EMIT CHANGES;
    +----------+--------+--------------------------+--------+------------------------------------+
    |device    |event   |receiveTime               |state   |context                             |
    +----------+--------+--------------------------+--------+------------------------------------+
    |REDACTED  |403151  |2020-09-30T11:03:50.000Z  |open    |{1=REDACTED, 2=25, 3=0, 4=60}       |
    |REDACTED  |402004  |2020-10-07T07:02:48Z      |open    |{1=REDACTED, 2=2020-10-07T07:02:48.0|
    |          |        |                          |        |000000Z}                            |
  • Use the [''] syntax to access specific keys in the map:

    ksql> SELECT "device", "event", "context", "context"['1'] AS CONTEXT_1, "context"['3'] AS CONTEXT_3 FROM "events" EMIT CHANGES;
    +-----------+--------+------------------------------------+-----------+-----------+
    |device     |event   |context                             |CONTEXT_1  |CONTEXT_3  |
    +-----------+--------+------------------------------------+-----------+-----------+
    |REDACTED   |403151  |{1=REDACTED, 2=25, 3=0, 4=60}       |REDACTED   |0          |
    |REDACTED   |402004  |{1=REDACTED, 2=2020-10-07T07:02:48.0|REDACTED   |null       |
    |           |        |000000Z}                            |           |           |
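
To sanity-check the lookup semantics outside ksqlDB, here is a minimal Python sketch that mirrors the last query against the two sample events. It is only an illustration: the helper names context_value and project are hypothetical and not part of ksqlDB or any Kafka client, and the payloads are trimmed to the fields used. The point it shows is the one visible in the output above: a key that is missing from the map reads as null (None in Python).

```python
import json

# The two sample device-event payloads from the question, trimmed to the fields used here.
SAMPLE_EVENTS = [
    '{"device": "REDACTED", "event": "403151", '
    '"context": {"2": "25", "3": "0", "4": "60", "1": "REDACTED"}}',
    '{"device": "REDACTED", "event": "402004", '
    '"context": {"2": "2020-10-07T07:02:48.0000000Z", "1": "REDACTED"}}',
]


def context_value(raw_event, key):
    """Return context[key], or None when the key is absent from the map."""
    context = json.loads(raw_event).get("context") or {}
    return context.get(key)


def project(raw_event, keys=("1", "3")):
    """Hypothetical helper mimicking SELECT "event", "context"['1'], "context"['3']."""
    event = json.loads(raw_event)
    return (event["event"], *(context_value(raw_event, k) for k in keys))


if __name__ == "__main__":
    for raw in SAMPLE_EVENTS:
        print(project(raw))
```

Because every lookup goes through the map, an event with two context entries and one with four are handled by the same code path, which is the property the MAP column provides in the stream definition.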

Regarding apache-kafka - Kafka topic with variable nested JSON object as KSQL DB stream, a similar question can be found on Stack Overflow: https://stackoverflow.com/questions/64241285/
