- c - 在位数组中找到第一个零
- linux - Unix 显示有关匹配两种模式之一的文件的信息
- 正则表达式替换多个文件
- linux - 隐藏来自 xtrace 的命令
我有一个 luigi python 任务,其中包括一些 pyspark 库。现在我想用 spark-submit 在 mesos 上提交这个任务。我应该怎么做才能运行它?下面是我的代码框架:
from pyspark.sql import functions as F
from pyspark import SparkContext
class myClass(SparkSubmitTask):
# date = luigi.DateParameter()
def __init__(self, date):
self.date = date # date is datetime.date.today().isoformat()
def output(self):
def input(self):
def run(self):
# Some functions are using pyspark libs
if __name__ == "__main__":
luigi.run()
没有 luigi,我将通过以下命令行提交此任务:
/opt/spark/bin/spark-submit --master mesos://host:port --deploy-mode cluster --total-executor-cores 1 --driver-cores 1 --executor-memory 1G --driver-memory 1G my_module.py
现在的问题是我如何激发提交包含 luigi 命令行的 luigi 任务,例如:
luigi --module my_module myClass --local-scheduler --date 2016-01
还有一个问题是,如果 my_module.py 有一个必须首先完成的任务,我是否需要为它做更多的事情,或者只是设置与当前命令行相同?
我非常感谢对此的任何提示或建议。非常感谢。
最佳答案
Luigi 有一些模板任务。其中之一称为 PySparkTask。您可以继承此类并覆盖属性:
https://github.com/spotify/luigi/blob/master/luigi/contrib/spark.py .
我还没有测试过,但根据我使用 luigi 的经验,我会尝试这个:
import my_module
class MyPySparkTask(PySparkTask):
date = luigi.DateParameter()
@property
def name(self):
return self.__class__.__name__
@property
def master(self):
return 'mesos://host:port'
@property
def deploy_mode(self):
return 'cluster'
@property
def total_executor_cores(self):
return 1
@property
def driver_cores(self):
return 1
@property
def executor-memory(self):
return 1G
@property
def driver-memory(self):
return 1G
def main(self, sc, *args):
my_module.run(sc)
def self.app_options():
return [date]
然后你可以运行它: luigi --module task_module MyPySparkTask --local-scheduler --date 2016-01
还有一个选项可以在 client.cfg 文件中设置属性,以使它们成为其他 PySparkTasks 的默认值:
[spark]
master: mesos://host:port
deploy_mode: cluster
total_executor_cores: 1
driver_cores: 1
executor-memory: 1G
driver-memory: 1G
关于python - 如何使用 spark-submit 和 pyspark 运行 luigi 任务,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/39244648/
文档说: If a parameter is created with significant=False, it is ignored as far as the Task signature is
用例:某些任务是耗时数小时的长批处理作业,需要在决定首先重新运行哪个日期之前查看给定日期的已完成内容和失败内容。 如何在不运行任何东西的情况下查看中央调度程序生成的依赖关系图?我确实意识到,我可以简单
我使用 Luigi Orchestrator 编写了以下 Python 代码。 class AggregateArtists(luigi.Task): date = luigi.DatePar
error: can't copy 'luigi\static\visualiser\lib\URI.js': doesn't exist or not a regular file --------
我是 Luigi 的新手,我创建了一个管道,它从数据库获取数据,转换数据,然后将其加载回数据库。我在其中创建了四个任务。但是,当我在 cmd 或 Pycharm 上执行任务时,它说它无法安排非任务。下
我在配置 luigi 每任务重试策略时遇到问题。我已按如下方式配置全局 luigi.cfg 文件: [scheduler] retry-delay: 1 retry_count: 5 [worker]
这是我第二次尝试了解如何在 Luigi 中将参数传递给依赖项。第一个是 here . 想法是:我有 TaskC,它依赖于 TaskB,它依赖于 TaskA,它依赖于 Task0。我希望整个序列始终完全
我试图用 luigi 建立一个管道。首先通过从 API 获取数据,转换然后将其保存到 mongo db。我还是 luigi 的新手,我的问题是如何实现 output() 函数,该函数指定输出到 mon
我想使用 Luigi 在我的 postgres 数据库上进行查询以获取纬度/经度坐标对,然后将该对传递给另一个任务。 我有一个任务 QueryPostgres() 将 SQL 字符串作为输入并执行查询
我有一个 luigi将我的原始数据拆分成较小文件的预处理任务。然后这些文件将由实际管道处理。 所以关于参数,我想要求每个管道都有一个预处理文件 ID 作为参数。但是,此文件 ID 仅在预处理步骤中生成
我对使用 Luigi 创建流程还比较陌生,我想了解为什么我的小工作流程会导致未实现的依赖关系。我正在尝试运行任务 StageProviders(),它有一个依赖项 ErrorsLogFile()。必须
我从 Luigi 开始,我想知道 Luigi 如何知道它不应该重新运行该任务,因为它已经使用相同的参数成功运行了。我通读了文档,但没有找到答案。 假设: Luigi 是否将状态(任务实例及其结果)存储
我正在研究一个 Luigi 管道,它检查是否存在手动创建的文件,如果存在,则继续执行下一个任务: import luigi, os class ExternalFileChecker(luigi.Ex
我已经通过 pip 命令安装了 luigi,我想更改 Web UI 的端口。我试图找到配置文件,但找不到。我需要创建一个吗? 最佳答案 您可以使用 --port 选项启动 luigid。 luigid
我正在尝试模拟为 luigi 参数提供默认值的东西。 一个愚蠢的例子展示了我想要完成的事情: 待测任务: import luigi from bar import Bar bar = Bar() cl
我正在使用 Luigi 启动一些管道。举个简单的例子 task = myTask() w = Worker(scheduler=CentralPlannerScheduler(), worker_pr
我正在编写一个管道,其中后面的任务需要读取前面任务的输出,这样他们就可以知道他们需要在他们的要求中传递哪些参数。 我在下面创建了一个简化的设置示例。 import random import pick
假设我有一个具有以下依赖结构的任务 class ParentTask(luigi.Task): def requires(self): return [ChildTask(cl
我正在为 Luigi Tasks 构建一个包装器,但遇到了 Register 的障碍。类实际上是一个 ABC 元类,当我创建动态 type 时不可选择. 以下代码或多或少是我用来开发动态类的代码。 c
我最近开始使用Luigi。当我运行工作流程时,所有任务都会显示在任务列表中,但依赖关系图仅显示一个空框架: 最佳答案 啊,刚刚发现它只显示特定任务的依赖关系图,而不是整个工作流程。因此,我必须选择一个
我是一名优秀的程序员,十分优秀!