gpt4 book ai didi

python - 如果我使用 celery 作为我的任务调度程序,我如何从我的 python 应用程序登录到 splunk?

转载 作者:太空宇宙 更新时间:2023-11-04 04:22:42 27 4
gpt4 key购买 nike

我有一个在服务器上运行的 python 脚本, celery 调度程序应该每天执行一次。我想将我的日志直接从脚本发送到 splunk。我正在尝试使用这个 splunk_handler图书馆。如果我在本地运行没有 celery 的 splunk_handler,它似乎可以工作。但是,如果我将它与 celery 一起运行,似乎没有日志到达 splunk_handler。控制台日志:

[SplunkHandler DEBUG] Timer thread executed but no payload was available to send

如何正确设置记录器,以便所有日志都进入 splunk_handler?

显然,celery 设置了自己的记录器并覆盖了 python 的根记录器。我尝试了几件事,包括连接 celery 的 setup_logging 信号以防止它覆盖记录器或在此信号中设置记录器。

import logging
import os

from splunk_handler import SplunkHandler

这就是我在文件开头设置记录器的方式

logger = logging.getLogger(__name__)
splunk_handler = SplunkHandler(
host=os.getenv('SPLUNK_HTTP_COLLECTOR_URL'),
port=os.getenv('SPLUNK_HTTP_COLLECTOR_PORT'),
token=os.getenv('SPLUNK_TOKEN'),
index=os.getenv('SPLUNK_INDEX'),
debug=True)

splunk_handler.setFormatter(logging.BASIC_FORMAT)
splunk_handler.setLevel(os.getenv('LOGGING_LEVEL', 'DEBUG'))
logger.addHandler(splunk_handler)

Celery 初始化(不确定,如果 worker_hijack_root_logger 需要设置为 False...)

app = Celery('name_of_the_application', broker=CELERY_BROKER_URL)
app.conf.timezone = 'Europe/Berlin'
app.conf.update({
'worker_hijack_root_logger': False,
})

这里我连接到来自 celery 的 setup_logging 信号

@setup_logging.connect()
def config_loggers(*args, **kwags):
pass
# logger = logging.getLogger(__name__)
# splunk_handler = SplunkHandler(
# host=os.getenv('SPLUNK_HTTP_COLLECTOR_URL'),
# port=os.getenv('SPLUNK_HTTP_COLLECTOR_PORT'),
# token=os.getenv('SPLUNK_TOKEN'),
# index=os.getenv('SPLUNK_INDEX'),
# debug=True)
#
# splunk_handler.setFormatter(logging.BASIC_FORMAT)
# splunk_handler.setLevel(os.getenv('LOGGING_LEVEL', 'DEBUG'))
# logger.addHandler(splunk_handler)

日志语句

logger.info("ARBITRARY LOG MESSAGE")

当在 splunk 处理程序上激活调试(设置为 True)时,splunk 处理程序会注销上面已经发布的没有可用的有效载荷。有人知道我的代码有什么问题吗?

最佳答案

经过几个小时找出我的代码最终可能出错的地方,我现在得到了令我满意的结果。首先,我创建了一个文件 loggingsetup.py,我在其中使用 dictConfig 配置了我的 python 记录器:

LOGGING = {
'version': 1,
'disable_existing_loggers': True,
'formatters': { # Sets up the format of the logging output
'simple': {
'format': '%(asctime)s - %(name)s - %(levelname)s - %(message)s',
'datefmt': '%y %b %d, %H:%M:%S',
},
},
'filters': {
'filterForSplunk': { # custom loggingFilter, to not have Logs logged to Splunk that have the word celery in the name
'()': 'loggingsetup.RemoveCeleryLogs', # class on top of this file
'logsToSkip': 'celery' # word that it is filtered for
},
},
'handlers': {
'splunk': { # handler for splunk, level Warning. to not have many logs sent to splunk
'level': 'WARNING',
'class': 'splunk_logging_handler.SplunkLoggingHandler',
'url': os.getenv('SPLUNK_HTTP_COLLECTOR_URL'),
'splunk_key': os.getenv('SPLUNK_TOKEN'),
'splunk_index': os.getenv('SPLUNK_INDEX'),
'formatter': 'simple',
'filters': ['filterForSplunk']
},
'console': {
'level': 'DEBUG',
'class': 'logging.StreamHandler',
'stream': 'ext://sys.stdout',
'formatter': 'simple',
},
},
'loggers': { # the logger, root is used
'': {
'handlers': ['console', 'splunk'],
'level': 'DEBUG',
'propagate': 'False', # does not give logs to other logers
}
}
}

对于日志过滤器,我必须创建一个继承自 logging.Filter 类的类。该类还依赖于文件 loggingsetup.py

class RemoveCeleryLogs(logging.Filter): # custom class to filter for celery logs (to not send them to Splunk)
def __init__(self, logsToSkip=None):
self.logsToSkip = logsToSkip

def filter(self, record):
if self.logsToSkip == None:
allow = True
else:
allow = self.logsToSkip not in record.name
return allow

之后,您可以像这样配置记录器:

logging.config.dictConfig(loggingsetup.LOGGING)
logger = logging.getLogger('')

并且因为 celery 重定向了它的日志并且日志加倍了,所以我不得不更新 app.conf:

app.conf.update({
'worker_hijack_root_logger': False, # so celery does not set up its loggers
'worker_redirect_stdouts': False, # so celery does not redirect its logs
})

我面临的下一个问题是,我选择的 Splunk_Logging 库与 url 混淆了一些东西。所以我必须创建自己的 splunk_handler 类,它继承自 logging.Handler 类。这里的重要行如下(来 self 的自定义记录器类 splunk_logging_class.py):

auth_header = {'Authorization': 'Splunk {0}'.format(self.splunk_key)}
json_message = {"index": str(self.splunk_index), "event": data}
r = requests.post(self.url, headers=auth_header, json=json_message)

我希望我能帮助那些在 python、splunk 和 celery 日志记录方面面临类似问题的人! :)

关于python - 如果我使用 celery 作为我的任务调度程序,我如何从我的 python 应用程序登录到 splunk?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/54144150/

27 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com