gpt4 book ai didi

json - 不是 Kibana 上的索引,Elasticsearch

转载 作者:行者123 更新时间:2023-12-03 02:16:46 26 4
gpt4 key购买 nike

我像这样在 kafka 中将数据从 mqtt 发件人消费到我的代理 mqtt

mosquitto_pub -h 0.0.0.0 -p 1883 -t temperature -m '{"who":"ben", "timeepoc":1558212482, "lat":-33.87052833, "lon":151.21292, "alt":31.0, "batt":0, "speed":12.86}'
然后我在kafka中创建一个主题
kafka-topics --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic temperature
后来我用 ksql 中的表创建了我的流
create stream carsensor (who VARCHAR, batt INTEGER, lon DOUBLE, lat DOUBLE, timeepoc BIGINT, alt INTEGER, speed DOUBLE) 
with (kafka_topic = 'temperature',value_format='JSON');
CREATE table runner_status with (value_format='JSON') AS
select who
, min(speed) as min_speed
, max(speed) as max_speed
, min(GEO_DISTANCE(lat, lon, -33.87014, 151.211945, 'km')) as dist_to_finish
, count(*) as num_events
from carsensor WINDOW TUMBLING (size 5 minute)
group by who;
这是我的表格数据运行器
{
"ROWTIME": 1597418628366,
"ROWKEY": "ben",
"WINDOWSTART": 1597418400000,
"WINDOWEND": 1597418700000,
"WHO": "ben",
"MIN_SPEED": 12.91,
"MAX_SPEED": 12.91,
"DIST_TO_FINISH": 0.07441178137496719,
"NUM_EVENTS": 2
}
这是我的 table 汽车传感器
{
"ROWTIME": 1597418628366,
"ROWKEY": "temperature",
"WHO": "ben",
"BATT": 0,
"LON": 151.21273,
"LAT": -33.87029167,
"TIMEEPOC": 1558212492,
"ALT": 31,
"SPEED": 12.91
}
然后我创建一个到 elasticsearch 的连接器
 CREATE SINK CONNECTOR SINK_ELASTIC_TEST_02_M WITH (
'connector.class' = 'io.confluent.connect.elasticsearch.ElasticsearchSinkConnector',
'connection.url' = 'http://localhost:9200',
'key.converter' = 'org.apache.kafka.connect.storage.StringConverter',
'value.converter'= 'io.confluent.connect.avro.AvroConverter',
'value.converter.schema.registry.url'= 'http://schema-registry:8081',
'type.name' = '_doc',
'topics' = 'carsensor',
'key.ignore' = 'false',
'schema.ignore' = 'false',
'transforms' = 'ExtractTimestamp',
'transforms.ExtractTimestamp.type' = 'org.apache.kafka.connect.transforms.InsertField$Value',
'transforms.ExtractTimestamp.timestamp.field' = 'EVENT_TS'
);
但我可以在 kibana 上看到任何索引我想在 map 中绘制位置

最佳答案

我找到了解决方案jajaj,因为我的数据是我需要使用的 json
键忽略和模式忽略,我只根据温度而不是两个创建表

CREATE SINK CONNECTOR SINK_ELASTIC_TEST_JSON_x WITH (
'connector.class' = 'io.confluent.connect.elasticsearch.ElasticsearchSinkConnector',
'connection.url' = 'http://localhost:9200',
'key.converter' = 'org.apache.kafka.connect.storage.StringConverter',
'value.converter' = 'org.apache.kafka.connect.json.JsonConverter',
'value.converter.schemas.enable' = 'false',
'type.name' = '_doc',
'topics' = 'temperature',
'key.ignore' = 'true',
'schema.ignore' = 'true'
);

关于json - 不是 Kibana 上的索引,Elasticsearch,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/63415949/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com