
apache-kafka - Filebeat 5.0 output to Kafka with multiple topics


I have Filebeat 5.0 installed on my application server with 3 Filebeat prospectors, each pointing at a different log path and all outputting to one Kafka topic named myapp_applog. Everything works fine.

My Filebeat output configuration to a single topic - working

output.kafka:
  # initial brokers for reading cluster metadata
  hosts: ["broker.1.ip.address:9092", "broker.2.ip.address:9092", "broker.3.ip.address:9092"]

  # message topic selection + partitioning
  topic: 'myapp_applog'
  partition.round_robin:
    reachable_only: false

  required_acks: 1
  compression: gzip
  max_message_bytes: 1000000

What I want to do is send each log file to a separate topic based on a condition; see the documentation section on topics. I have tried this, but no data gets sent to any topic. Does anyone know why my condition doesn't match, or whether it is even correct? I can't seem to find any examples of how to use the topics conditions properly.

Here is my Kafka output configuration for multiple topics.

Not working
output.kafka:
  # initial brokers for reading cluster metadata
  hosts: ["broker.1.ip.address:9092", "broker.2.ip.address:9092", "broker.3.ip.address:9092"]

  # message topic selection + partitioning
  topics:
    - topic: 'myapp_applog'
      when:
        equals:
          document_type: applog_myappapi
    - topic: 'myapp_applog_stats'
      when:
        equals:
          document_type: applog_myappapi_stats
    - topic: 'myapp_elblog'
      when:
        equals:
          document_type: elblog_myappapi
  partition.round_robin:
    reachable_only: false

  required_acks: 1
  compression: gzip
  max_message_bytes: 1000000
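A likely cause (an assumption, not stated in the original post): in Filebeat 5.x the prospector-level document_type setting is published as the event's type field, not as a field named document_type, so a condition that tests document_type never matches. A minimal sketch of the same output with the conditions keyed on type instead:

output.kafka:
  hosts: ["broker.1.ip.address:9092", "broker.2.ip.address:9092", "broker.3.ip.address:9092"]

  # document_type is published as the event's `type` field,
  # so the conditions test `type`, not `document_type`
  topics:
    - topic: 'myapp_applog'
      when:
        equals:
          type: applog_myappapi
    - topic: 'myapp_applog_stats'
      when:
        equals:
          type: applog_myappapi_stats
    - topic: 'myapp_elblog'
      when:
        equals:
          type: elblog_myappapi
  partition.round_robin:
    reachable_only: false

  required_acks: 1
  compression: gzip
  max_message_bytes: 1000000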

Here is the full filebeat.yml configuration file.
################### Filebeat Configuration Example #########################
############################# Filebeat ######################################
filebeat.prospectors:
  # App logs - prospector
  - input_type: log
    paths:
      - /myapp/logs/myapp.log
    exclude_lines: [".+? INFO[^*].+", ".+? DEBUG[^*].+"]
    exclude_files: [".gz$", ".tmp"]
    fields:
      api: myappapi
      environment: STG
    ignore_older: 24h
    document_type: applog_myappapi
    scan_frequency: 1s

    # Multiline on timestamp, YYYY-MM-DD
    # https://www.elastic.co/guide/en/beats/filebeat/master/multiline-examples.html
    multiline:
      pattern: '^[0-9]{4}-[0-9]{2}-[0-9]{2}'
      negate: true
      match: after
      max_lines: 500
      timeout: 5s

  # Server stats - prospector
  - input_type: log
    paths:
      - /myapp/logs/serverstats.log

    # Exclude messages with log level
    exclude_lines: [".+? ERROR[^*].+", ".+? DEBUG[^*].+"]
    exclude_files: [".gz$", ".tmp"]
    fields:
      api: myappapi
      environment: STG
    ignore_older: 24h
    document_type: applog_myappapi_stats
    scan_frequency: 1s

  # ELB prospector
  - input_type: log
    paths:
      - /var/log/httpd/elasticbeanstalk-access_log
    document_type: elblog_myappapi
    fields:
      api: myappapi
      environment: STG
    exclude_lines: [".+? INFO[^*].+", ".+? DEBUG[^*].+"]
    exclude_files: [".gz$", ".tmp"]
    ignore_older: 24h

    # 0s, it is done as often as possible. Default: 10s
    scan_frequency: 1s

registry_file: /var/lib/filebeat/registry

############################# Output ##########################################
# Configure what outputs to use when sending the data collected by the beat.
# Multiple outputs may be used.
#----------------------------- Kafka output --------------------------------

output.kafka:
  # initial brokers for reading cluster metadata
  hosts: ["broker.1.ip.address:9092", "broker.2.ip.address:9092", "broker.3.ip.address:9092"]

  # message topic selection + partitioning
  topics:
    - topic: 'myapp_applog'
      when:
        equals:
          document_type: applog_myappapi
    - topic: 'myapp_applog_stats'
      when:
        equals:
          document_type: applog_myappapi_stats
    - topic: 'myapp_elblog'
      when:
        equals:
          document_type: elblog_myappapi
  partition.round_robin:
    reachable_only: false

  required_acks: 1
  compression: gzip
  max_message_bytes: 1000000

############################# Logging #########################################

# There are three options for the log output: syslog, file, stderr.
# Under Windows systems, the log files are per default sent to the file output,
# under all other systems per default to syslog.
logging:

  # Send all logging output to syslog. On Windows default is false, otherwise
  # default is true.
  to_syslog: true

  # Write all logging output to files. Beats automatically rotate files if the
  # rotateeverybytes limit is reached.
  to_files: true

  # To enable logging to files, the to_files option has to be set to true
  files:
    # The directory where the log files will be written to.
    path: /var/log/

    # The name of the files where the logs are written to.
    name: filebeats.log

    # Configure log file size limit. If the limit is reached, the log file will be
    # automatically rotated
    rotateeverybytes: 10485760 # = 10MB

    # Number of rotated log files to keep. Oldest files will be deleted first.
    keepfiles: 7

  # Enable debug output for selected components. To enable all selectors use ["*"]
  # Other available selectors are beat, publish, service
  # Multiple selectors can be chained.
  #selectors: ["*"]

  # Sets log level. The default log level is error.
  # Available log levels are: critical, error, warning, info, debug
  level: info

Best Answer

I ran into the same problem and solved it by defining the output as:

topics:
  - topic: '%{[type]}'
    use_type: true

For the inputs, you simply set document_type to the Kafka topic name:

- input_type: log
  paths:
    - /path/to/log/file
  document_type: "your_kafka_topic_1"
- input_type: log
  paths:
    - /path/to/another/log/file
  document_type: "your_another_kafka_topic_2"
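An equivalent approach (not part of the original answer, and assuming Filebeat 5.x's Kafka output, whose top-level topic setting accepts a format string) is to let the event's type field name the topic directly:

output.kafka:
  hosts: ["broker.1.ip.address:9092", "broker.2.ip.address:9092", "broker.3.ip.address:9092"]

  # '%{[type]}' resolves per event to the document_type value,
  # e.g. applog_myappapi, applog_myappapi_stats, elblog_myappapi;
  # topics with those names must already exist in Kafka (or
  # auto-creation must be enabled on the brokers)
  topic: '%{[type]}'

  partition.round_robin:
    reachable_only: false
  required_acks: 1
  compression: gzip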

Regarding apache-kafka - Filebeat 5.0 output to Kafka with multiple topics, we found a similar question on Stack Overflow: https://stackoverflow.com/questions/40952714/
