gpt4 book ai didi

elasticsearch - 使用Elasticsearch + Logstash将汇总统计信息推送到Kafka

转载 作者:行者123 更新时间:2023-12-03 00:18:21 30 4
gpt4 key购买 nike

我们正在使用Kafka,Elasticsearch和Logstash。为了进行试验,我们想构建一个小型服务,该服务:

  • 当用户单击给定链接时,产生“单击”消息,并将其推送到Kafka
  • 将这些点击消息编入Elasticsearch
  • 汇总这些点击消息并将其推送到Kafka。

  • 至此,我们已经获得了生成点击消息并将其推送到Kafka主题中的服务,并且我们正在使用Logstash读取这些消息并将其推送到Elasticsearch中。

    我们现在需要的是一种生成包含汇总结果的新Kafka消息的方法。我们尝试使用以下Logstash配置文件(此时的聚合请求完全是随机的):
    input {
    elasticsearch {
    hosts => "localhost"
    query => '{
    "query": {
    "query_string": {
    "query": "*",
    "analyze_wildcard": true
    }
    },
    "size": 0,
    "aggs": {
    "messages": {
    "value_count": {
    "field": "message"
    }
    }
    }
    }'
    }
    }
    output {
    kafka {
    topic_id => 'aggregated_stats'
    }
    }

    不幸的是,Elasticsearch给我们以下错误消息:
    org.elasticsearch.ElasticsearchIllegalArgumentException: aggregations are not supported with search_type=scan

    而且,似乎Logstash在生成单个消息后立即退出,而我们希望它随着聚合统计信息的变化而不断生成新消息。

    这有意义吗?

    最佳答案

    在输出配置中添加“经纪人列表”时,它在我的服务器上工作。

    output {
    kafka {
    broker_list => "x.x.x.x:9092,y.y.y.y:9092"
    topic_id => "aggregated_stats"
    }
    }

    关于elasticsearch - 使用Elasticsearch + Logstash将汇总统计信息推送到Kafka,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/33876058/

    30 4 0
    Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
    广告合作:1813099741@qq.com 6ren.com