gpt4 book ai didi

带有 Docker 的 Python Luigi - 线程/信号问题

转载 作者:行者123 更新时间:2023-11-28 19:02:23 25 4
gpt4 key购买 nike

概述

我们正在使用 Luigi 在 Docker 容器内构建管道.这
这是我第一次使用 Luigi,我正试图让它运行,但我遇到了 Python 线程/信号错误。

我们正在 build 的
我们有一个运行 setup.py 脚本作为入口点的容器。这个脚本导入我的 Luigi 任务,但它的主要功能是打开一个 PubSub channel 到谷歌云服务。当它在该 channel 上接收到消息时,它会启动一系列任务。

错误 .
我直接从 Python 调用 Luigi,试验了这个命令的变体:
luigi.build([GetImageSet()], workers=2, local_scheduler=True, no_lock=True)
并收到此错误:
ValueError: signal only works in main thread
Signal和Luigi的背景

来自 Python Signal 模块文档:
signal.signal:这个函数只能从主线程调用;尝试从其他线程调用它会导致引发 ValueError 异常。

来自 Luigi worker.py 脚本 here
Luigi 提供 no_install_shutdown_handler 标志(默认为 false)。 '如果为真,SIGUSR1 关闭处理程序将不会安装在工作人员上'。这也是发生错误的地方(第 538 行)。该脚本在运行 signal.signal() 之前检查 no_install_shutdown_handler 的配置标志是否(默认)为 false。到目前为止,我未能让 Luigi 读取我的 client.cfg 文件并将该标志设置为 true,而 Docker 可能是罪魁祸首。

来自 Luigi interface.py 脚本 here
如果您不想从命令行运行 luigi。您可以使用此模块中定义的方法以编程方式运行 luigi。在这个脚本中,我可以提供一个自定义的 worker 调度工厂,但我还不能解决这个问题。

本地与全局 Luigi 调度程序
Luigi 为运行任务提供了两个调度程序选项。本地的

Dockerfile 问题 :在这个容器的 Dockerfile 中,我正在通过 pip 安装 Luigi,但没有做太多其他事情。审核后thisthis github 上的 docker/luigi 实现,我开始担心我在 Dockerfile 中做得不够。

我认为发生错误的可能原因

  • pub-sub channel 订阅者是非阻塞的,所以我正在做一些可能很糟糕的事情来阻止主线程在我们在后台等待消息时退出。这似乎是我的线程问题的可能来源。
  • no_install_shutdown_handler 标志未成功设置为 True,这有望规避错误,但不一定是我想要的
  • 本地任务调度程序。我应该使用全局调度程序而不是本地调度程序。无论如何,我最终将不得不让这个工作用于生产...
  • 从 Python 而不是命令行运行脚本
  • 使用 luigi.build .相反,我应该使用 luigi.run ,但基于 Running from Python 的文档页面build “如果您想从另一个来源(例如数据库)获取一些动态参数,或者在开始任务之前提供额外的逻辑,这很有用。”这听起来很适合我的用例(在从发布-订阅 channel 接收消息后触发任务,该消息传递了运行第一个任务所需的变量)

  • 反正我做错了吗?
    如果您对实现我所描述的系统有任何建议,请告诉我。我还将根据要求发布我的 Dockerfile 和 setup.py 尝试。

    一些代码示例

    这是 Dockerfile
    # ./Dockerfile
    # sfm-base:test is the container with tensorflow & our python sfm-library. It installs Ubuntu, Python, pip etc.
    FROM sfm-base:test
    LABEL maintainer "---@---.io"

    # Install luigi, google-cloud w/ pip in module mode
    RUN python -m pip install luigi && \
    python -m pip install --upgrade google-cloud

    # for now at least, start.sh just calls setup.py and sets google credentials. Ignore that chmod bit if it's bad I don't know.
    COPY start.sh /usr/local/bin
    RUN chmod -R 755 "/usr/local/bin/start.sh"
    ENTRYPOINT [ "start.sh" ]

    WORKDIR /src
    COPY . .

    # this was my attempt at setting the Luigi client.cfg in the container
    # all I'm having the config do for now is set [core] no_install_shutdown_handler: true
    ENV LUIGI_CONFIG_PATH /src/client.cfg

    这是 setup.py (针对 SO 进行了编辑)
    # setup.py
    from google.cloud import pubsub_v1
    from time import sleep
    import luigitasks
    import luigi
    import logging
    import json

    subscriber = pubsub_v1.SubscriberClient()
    subscription_path = subscriber.subscription_path(
    'servicename', 'pubsubcommand')

    # Example task. These are actually in luigitasks.py
    class GetImageSet(luigi.Task):
    uri = luigi.Parameter(default='')

    def requires(self):
    return []

    def output(self):
    # write zip to local
    return

    def run(self):
    # use the URI to retrieve the ImageSet.zip from the bucket
    logging.info('Running getImageSet')

    # Pubsub message came in
    def onMessageReceived(message):
    print('Received message: {}'.format(message))

    if message.attributes:
    for key in message.attributes:
    if key == 'ImageSetUri':
    value = message.attributes.get(key)
    # Kick off the pipeline starting with the GetImageSet Task
    # I've tried setting no_lock, take_lock, local_scheduler...
    # General flags to try and prevent the thread issues
    luigi.build([GetImageSet()], workers=3, local_scheduler=True, no_lock=False)
    message.ack()

    subscriber.subscribe(subscription_path, callback=onMessageReceived)

    # The subscriber is non-blocking, so I keep the main thread from
    # exiting to allow it to process messages in the background. Is this
    # the cause of my woes?
    print('Listening for messages on {}'.format(subscription_path))
    while True:
    sleep(60)

    最佳答案

    发生这种情况是因为 subscriber.subscribe启动后台线程。当该线程调用 luigi.build抛出异常。

    这里最好的解决方案是使用 subscriber.pull 从主线程读取发布-订阅消息。 .见示例 in the docs .

    关于带有 Docker 的 Python Luigi - 线程/信号问题,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/51295521/

    25 4 0
    Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
    广告合作:1813099741@qq.com 6ren.com