From the documentation, SpannerQueryDatabaseInstanceOperator accepts a query parameter. However, there is nothing as convenient as the PostgresOperator, which also accepts a parameters argument for using placeholders inside the query itself:
from airflow.providers.postgres.operators.postgres import PostgresOperator

get_birth_date = PostgresOperator(
    task_id="get_birth_date",
    postgres_conn_id="postgres_default",
    sql="SELECT * FROM pet WHERE birth_date BETWEEN SYMMETRIC %(begin_date)s AND %(end_date)s",
    parameters={"begin_date": "2020-01-01", "end_date": "2020-12-31"},
)
I am new to Airflow, but from reading a book on it and the documentation, the suggestion seems to be to avoid the PythonOperator where possible, as it can lead to defining business logic inside it rather than using Airflow for what it is designed for: orchestration.
So my questions are the following:
- How would you insert into Spanner values read from a previous task?
- I read that storing objects in XComs, or in Airflow itself, is not good practice for inter-task communication, but at the same time, if something has to be produced by one task and consumed by another, I don't see many alternatives to using XComs.
Thanks
Top answer:
Airflow leverages Jinja templating to parameterize queries. When you use Jinja, the parameterization is done by Airflow itself, and the rendered SQL statement is then submitted to the SQL engine for execution.
Some integrations/services have their own parameterization mechanisms; Airflow can support those as well, so the user can choose which one to use.
PostgresOperator can use the SQLAlchemy engine, so if you want that engine to render the statement, you can pass the variables to it using the parameters parameter. The answer at https://stackoverflow.com/a/72246305/14624409 shows how to use both options with a supported operator.
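For illustration, here is a minimal sketch of the two options side by side (the pet table, owner column, and values are made up for the example):

from airflow.providers.postgres.operators.postgres import PostgresOperator

# Option 1: the parameters argument; the database driver binds the
# placeholders, and the SQL text itself is never rewritten by Airflow.
get_pets_bound = PostgresOperator(
    task_id="get_pets_bound",
    postgres_conn_id="postgres_default",
    sql="SELECT * FROM pet WHERE owner = %(owner)s",
    parameters={"owner": "alice"},
)

# Option 2: Jinja templating; Airflow renders the value into the SQL
# text before submitting it to the engine (sql is a templated field).
get_pets_templated = PostgresOperator(
    task_id="get_pets_templated",
    postgres_conn_id="postgres_default",
    sql="SELECT * FROM pet WHERE owner = '{{ params.owner }}'",
    params={"owner": "alice"},
)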
In your case, SpannerQueryDatabaseInstanceOperator has query as a templated field, so you can simply use the Jinja engine with it.
For example:
from airflow.providers.google.cloud.operators.spanner import (
    SpannerQueryDatabaseInstanceOperator,
)

SpannerQueryDatabaseInstanceOperator(
    instance_id="my_instance",
    database_id="my_db",
    query="select {{ params.my_parameter }}",
    params={"my_parameter": 5},
    task_id="spanner_instance_query_task",
)
Which renders to select 5 when the task runs.
As for your questions:
How would you insert into Spanner values read from a previous task?
Simply use {{ ti.xcom_pull(task_ids='run_pod', key='return_value') }} in the SQL statement; it will be rendered by Jinja. task_ids is the task_id to pull the value from, and key is the identifier of the XCom (a task can push several XComs).
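Putting the pieces together, a minimal sketch of the two-task pattern (the instance, database, table name, and pushed value are hypothetical; the upstream task uses the TaskFlow API, so its return value lands in XCom automatically):

from datetime import datetime

from airflow import DAG
from airflow.decorators import task
from airflow.providers.google.cloud.operators.spanner import (
    SpannerQueryDatabaseInstanceOperator,
)

with DAG(dag_id="spanner_xcom_example", start_date=datetime(2023, 1, 1), schedule_interval=None):

    @task
    def compute_value():
        # Returned values are pushed to XCom under the key "return_value".
        return 42

    insert_row = SpannerQueryDatabaseInstanceOperator(
        task_id="insert_row",
        instance_id="my_instance",  # hypothetical instance
        database_id="my_db",        # hypothetical database
        # Jinja pulls the upstream XCom and renders it into the DML
        # statement before it is sent to Spanner.
        query=(
            "INSERT INTO my_table (id) "
            "VALUES ({{ ti.xcom_pull(task_ids='compute_value', key='return_value') }})"
        ),
    )

    compute_value() >> insert_row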
I read that storing objects in XComs, or in Airflow itself, is not good practice for inter-task communication, but at the same time, if something has to be produced by one task and consumed by another, I don't see many alternatives to using XComs.
XComs are meant to make small pieces of metadata accessible to other tasks. For example, you can pass along the count of records, but not the records themselves. If a downstream task needs access to a large dataset produced by an upstream task, store it in the cloud (S3, Google Cloud Storage, etc.): all tasks can access cloud storage, whereas the local disk of an Airflow worker is not shared between tasks, so you cannot rely on data stored on the Airflow disk being available to other tasks.
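A common way to reconcile the two points is to pass a reference through XCom instead of the data itself. A minimal sketch, with the bucket, path, and task names as placeholders:

from datetime import datetime

from airflow import DAG
from airflow.decorators import task

with DAG(dag_id="pass_reference_example", start_date=datetime(2023, 1, 1), schedule_interval=None):

    @task
    def extract():
        # Hypothetical: the task writes the full dataset to cloud storage
        # itself (e.g. with the google-cloud-storage client) and returns
        # only the object's URI.
        return "gs://my-bucket/exports/pets.csv"  # small string, fine for XCom

    @task
    def load(uri: str):
        # The downstream task receives only the URI through XCom and reads
        # the data directly from cloud storage, not from the worker's disk.
        print(f"Loading data from {uri}")

    load(extract())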