- c - 在位数组中找到第一个零
- linux - Unix 显示有关匹配两种模式之一的文件的信息
- 正则表达式替换多个文件
- linux - 隐藏来自 xtrace 的命令
我正在尝试设置动态序列 etl 作业,它将使用 XCOM 从运行的第一个任务中获取数据。这是当前代码:
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime as dt, timedelta as td, date
from airflow.models import BaseOperator
from airflow.operators.sensors import ExternalTaskSensor
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import PythonOperator
from airflow.models import Variable
START_DT = dt.combine(dt.today(), dt.min.time())
END_DT = dt.combine(dt.today(), dt.max.time())
NOW = dt.now()
CURRENT_EXEC = '{{ execution_date }}'
TODAY_MD = dt.today().strftime("%m%d")
def datetime_range(start, end, delta):
"""Generates the date range with time separation"""
current = start
if not isinstance(delta, td):
delta = td(**delta)
while current < end:
yield current
current += delta
default_args = {
'owner': 'test',
'depends_on_past': False,
'start_date': START_DT,
'email': ['test@test.com'],
'email_on_failure': False,
'email_on_retry': False,
'queue': 'etl',
'retries': 1,
'retry_delay': td(minutes=1),
}
dag_name = 'SEQ_TEST_01'
dag = DAG(dag_id=dag_name, default_args=default_args, schedule_interval=td(minutes=30))
def seq_job(sq_dt, **kwargs):
for count, dt_in in enumerate(datetime_range(START_DT, END_DT, {'minutes':30}), 1):
if sq_dt < str(dt_in):
curr_seq = count, dt_in, dt_in + td(minutes=29, seconds=59)
sequence = int(curr_seq[0])
return sequence
pycall = PythonOperator(
task_id='seq_sensor',
provide_context=True,
python_callable=seq_job,
op_kwargs={'sq_dt': CURRENT_EXEC},
dag=dag)
def group(grp, **context):
sequence = context['task_instance'].xcom_pull(task_ids='seq_sensor')
grp = '%0.2d' % grp
database = 'TEST'
today_date = '{{ ds_nodash }}'
return BashOperator(
task_id='ETL_GRP{}_{}_{}'.format(database, sequence, gap),
bash_command='script.sh {} {} {} {}'.format(today_date, sequence, database, grp),
dag=dag)
complete = DummyOperator(
task_id='All_Sequences_complete',
dag=dag)
pycall >> group(1) >> complete
pycall >> group(2) >> complete
pycall >> group(3) >> complete
问题是无论我尝试什么,我都会不断收到此错误:
Traceback (most recent call last):
File "/usr/local/lib/python2.7/site-packages/airflow/models.py", line 263, in process_file
m = imp.load_source(mod_name, filepath)
File "/opt/airflow/incubator-airflow/airflow/dags/new_dag_seq.py", line 66, in <module>
pycall >> group(1) >> complete
File "/opt/airflow/incubator-airflow/airflow/dags/new_dag_seq.py", line 56, in group
sequence = context['task_instance'].xcom_pull(task_ids='seq_sensor')
KeyError: 'task_instance'
不确定是不是我遗漏了一些小东西,还是我的一切都错了。仍然是 Airflow 的新手,并尝试将我们的 ETL 测试环境设置为每 30 分钟运行一次,并使用由 datetime_range
生成并基于 execution_date 变量的唯一序列号。
最佳答案
尝试使用 context['ti']
代替。
关于python - Airflow XCOM KeyError : 'task_instance' ,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/41254253/
对于我的家庭作业,如果用户输入的键(文本)包含任何非字母字符并重新提示,我被告知要引发一个键错误。到目前为止,我有这个似乎有效但显然没有使用预期的 try/except 结构 key=input("P
编写try/except语句时,是否使用 except KeyError: 或 except KeyError as e: 我得到了相同的结果。 两者有什么区别? KeyError as e 只是更具
我在使用 Python Flask 和 Flask-Mail 库时遇到问题。 我收到一个错误: KeyError: 'mail' 谁能帮我解决这个问题? 我的代码是: # -*- coding: ut
我正在尝试获取 Twitter 登录页面中的隐藏元素。我遵循的过程只是获取该页面中的隐藏元素。但问题是,当我尝试获取这些元素的值时,我遇到了关键错误。代码是: import requests,
我正在尝试将 atexit 处理程序添加到我的代码中。但我发现如果我导入了线程模块,它会给我一个 KeyError 异常。这是 python 线程模块中的错误吗? #!/usr/bin/python2
我正在从 Python2.7 numba 代码转换为 Python3.4。此函数pairwise_distance 将多维数组X 和Y 转换为距离矩阵。 但是,我使用 numba 装饰器 @jit 来
我有 2 个用于生产和开发的独立设置文件以及一个通用的 base.py 设置文件 base.py SECRET_KEY = r"!@#$%^&123456" prod.py from .base im
下面的代码 for k in list(g_score.keys()): print(g_score[k]) 返回 KeyError对我来说: Traceback (most recent c
我收到了一份。在Spyder中第二次从子文件夹导入库时出错,但第一次(重新启动Spyder后)或在Spyder外导入时工作正常。。代码是:。其中,test_lib.py只是。输出结果为:。当库不在子文
我希望以下列方式获取一个对象: Collection.objects.get(name='name', type='library', owner=owner, parent=parent) 不幸的是
如何加入这两个文本文档? 文档 1: 1000001 10:0.471669 250:0.127552 30:0.218773 64:0.249413 1000002 130:0.0839656 10
这段代码有什么问题? 这是我的 HTML: File: 这是我的 Python 脚本: #! /usr/bin/env python import os, sys; from mod_py
我正在尝试在 Linux 中使用 cron 运行一个 Python 脚本,它应该构建一个数据字典。我正在尝试使用 datetime().now().time() 作为字典中的键,但它似乎会引发错误。
我正在尝试更改列或处理列,但出现一些 keyError 错误。从事芝加哥犯罪数据分析工作。 例如当我尝试运行时 ds["DATE OF OCCURRENCE"] = pd.to_datetime([d
我有一个包含以下列的数据框,我只是想通过转换现有列来添加新列。我不明白为什么我会收到此错误,特别是考虑到数据框很好并且我可以在 Zip 上使用 groupby 而不会出现任何索引问题。 print(d
我正在尝试使用 ffmpeg 从视频文件中获取分辨率高度和音频比特率,但出现以下错误,但并不能告诉我太多信息: File "/home/user/code/python/reduce_video_si
我正在为每个单独的州分配区域。我的代码从一个 excel 文件中读取,大约有 30k 行。我建立了一个字典,将每个州分配给一个地区,并为每个州名称分配州缩写。我正在尝试创建一个列来填充每个行项目的区域
我仍在努力学习 Python 词典的来龙去脉。当我运行这个: #!/usr/bin/env python3 d = {} d['foo']['bar'] = 1 我收到 KeyError: 'foo'
我正在尝试使用 Tensorflow 训练线性回归器。 如果我通过自动确定实值列来实例化学习器,则拟合工作正常。 auto_feature_columns = tf.contrib.learn
我正在尝试编写一个可以从YouTube下载整个播放列表的代码。它适用于某些播放列表,但不适用于少数播放列表。我在下面的代码中显示的播放列表之一。也可以随时在此代码上添加更多功能。 如果已有下载该播放列
我是一名优秀的程序员,十分优秀!