gpt4 book ai didi

json - 使用 ClickHouse 使用来自 Kafka 的嵌套 JSON 消息

转载 作者:行者123 更新时间:2023-12-04 14:12:01 53 4
gpt4 key购买 nike

如果它们是平面 JSON 文档,Clickhouse 绝对可以从 Kafka 读取 JSON 消息。
我们在 Clickhouse 中用 kafka_format = 'JSONEachRow' 表示这一点。
这是我们目前使用它的方式:

CREATE TABLE topic1_kafka
(
ts Int64,
event String,
title String,
msg String
) ENGINE = Kafka
SETTINGS kafka_broker_list = 'kafka1test.intra:9092,kafka2test.intra:9092,kafka3test.intra:9092',
kafka_topic_list = 'topic1', kafka_num_consumers = 1, kafka_group_name = 'ch1',
kafka_format = 'JSONEachRow'
只要生产者将平面 JSON 发送到 topic1_kafka 就可以了。但并非所有生产者都发送平面 JSON,大多数应用程序生成嵌套的 JSON 文档,如下所示:
{
"ts": 1598033988,
"deviceId": "cf060111-dbe6-4aa8-a2d0-d5aa17f45663",
"location": [39.920515, 32.853708],
"stats": {
"temp": 71.2,
"total_memory": 32,
"used_memory": 21.2
}
}
不幸的是,上面的 JSON 文档与 JSONEachRow 不兼容,因此 ClickHouse 无法将 JSON 文档中的字段映射到表中的列。
有没有办法做这个映射?
EDIT :我们想将嵌套的 json 映射到这样的平面表:
CREATE TABLE topic1
(
ts Int64,
deviceId String,
location_1 Float64,
location_2 Float64,
stats_temp Float64,
stats_total_memory Float64,
stats_used_memory Float64
) ENGINE = MergeTree()

最佳答案

看起来曾经的方法是将“原始”数据作为字符串获取,然后在消费者物化 View 中使用 JSON functions 处理每一行。

WITH '{"ts": 1598033988, "deviceId": "cf060111-dbe6-4aa8-a2d0-d5aa17f45663", "location": [39.920515, 32.853708], "stats": { "temp": 71.2, "total_memory": 32, "used_memory": 21.2 }}' AS raw
SELECT
JSONExtractUInt(raw, 'ts') AS ts,
JSONExtractString(raw, 'deviceId') AS deviceId,
arrayMap(x -> toFloat32(x), JSONExtractArrayRaw(raw, 'location')) AS location,
JSONExtract(raw, 'stats', 'Tuple(temp Float64, total_memory Float64, used_memory Float64)') AS stats,
stats.1 AS temp,
stats.2 AS total_memory,
stats.3 AS used_memory;

/*
┌─────────ts─┬─deviceId─────────────────────────────┬─location──────────────┬─stats────────────────────────┬─temp─┬─total_memory─┬────────used_memory─┐
│ 1598033988 │ cf060111-dbe6-4aa8-a2d0-d5aa17f45663 │ [39.920513,32.853706] │ (71.2,32,21.200000000000003) │ 71.2 │ 32 │ 21.200000000000003 │
└────────────┴──────────────────────────────────────┴───────────────────────┴──────────────────────────────┴──────┴──────────────┴────────────────────┘
*/
备注:对于带有浮点数的数字,应使用 Float64 而不是 Float32(请参阅相关的 CH Issue 13962 )。

使用需要更改 JSON 架构的标准数据类型:
  • 将统计数据表示为 Tuple

  • CREATE TABLE test_tuple_field
    (
    ts Int64,
    deviceId String,
    location Array(Float32),
    stats Tuple(Float32, Float32, Float32)
    ) ENGINE = MergeTree()
    ORDER BY ts;


    INSERT INTO test_tuple_field FORMAT JSONEachRow
    { "ts": 1598033988, "deviceId": "cf060111-dbe6-4aa8-a2d0-d5aa17f45663", "location": [39.920515, 32.853708], "stats": [71.2, 32, 21.2]};
  • 将统计数据表示为 Nested Structure

  • CREATE TABLE test_nested_field
    (
    ts Int64,
    deviceId String,
    location Array(Float32),
    stats Nested (temp Float32, total_memory Float32, used_memory Float32)
    ) ENGINE = MergeTree()
    ORDER BY ts;


    SET input_format_import_nested_json=1;
    INSERT INTO test_nested_field FORMAT JSONEachRow
    { "ts": 1598033988, "deviceId": "cf060111-dbe6-4aa8-a2d0-d5aa17f45663", "location": [39.920515, 32.853708], "stats": { "temp": [71.2], "total_memory": [32], "used_memory": [21.2] }};

    请参阅相关答案 ClickHouse JSON parse exception: Cannot parse input: expected ',' before

    关于json - 使用 ClickHouse 使用来自 Kafka 的嵌套 JSON 消息,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/63528287/

    53 4 0
    Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
    广告合作:1813099741@qq.com 6ren.com