gpt4 book ai didi

apache-spark - 使用Spark从Elasticsearch获取最新的N条记录

转载 作者:行者123 更新时间:2023-12-02 22:35:19 24 4
gpt4 key购买 nike

我想检索插入到Elasticsearch中的最后50条记录,以找出它们在异常检测项目中的平均值。
这就是我从ES检索数据的方式。但是,它将获取整个数据,而不是最后50条记录。有什么办法吗?

edf = spark \
.read \
.format("org.elasticsearch.spark.sql") \
.option("es.read.metadata", "false") \
.option("es.nodes.wan.only","true") \
.option("es.port","9200")\
.option("es.net.ssl","false")\
.option("es.nodes", "http://localhost") \
.load("anomaly_detection/data")

# GroupBy based on the `sender` column
df3 = edf.groupBy("sender") \
.agg(expr("avg(amount)").alias("avg_amount"))

这里的 sender列正在获取整个行数据,如何仅获取最后50个 DataFrame行数据?

输入数据模式格式:
|sender|receiver|amount|

最佳答案

您还可以在读取数据时添加查询,如下所示:

query='{"query": {"match_all": {}}, "size": 50, "sort": [{"_timestamp": {"order": "desc"}}]}'

并通过
edf = spark \
.read \
.format("org.elasticsearch.spark.sql") \
.option("es.read.metadata", "false") \
.option("es.nodes.wan.only","true") \
.option("es.port","9200")\
.option("es.net.ssl","false")\
.option("es.nodes", "http://localhost") \
.option("query", query)
.load("anomaly_detection/data")

关于apache-spark - 使用Spark从Elasticsearch获取最新的N条记录,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/57140273/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com