
Airflow: pass {{ ds }} as param to PostgresOperator

I would like to use the execution date as a parameter in my SQL file.

I tried:

dt = '{{ ds }}'

s3_to_redshift = PostgresOperator(
    task_id='s3_to_redshift',
    postgres_conn_id='redshift',
    sql='s3_to_redshift.sql',
    params={'file': dt},
    dag=dag
)


But it doesn't work.

Answer

dt = '{{ ds }}'

This doesn't work because Jinja (the templating engine used within Airflow) does not process the entire DAG definition file.

Each operator declares a specific set of fields (its template_fields) that Jinja will process; these are part of the operator's definition.
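
As a quick, hedged check (the import path below is the Airflow 1.x one; in newer releases PostgresOperator lives in the Postgres provider package), you can print the operator's template_fields to see what Jinja will actually render:

from airflow.operators.postgres_operator import PostgresOperator

# Only the fields listed here are rendered by Jinja before execution.
# For PostgresOperator this is typically just ('sql',), which is why a
# literal '{{ ds }}' passed in any other argument stays a literal string.
print(PostgresOperator.template_fields)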

In this case, you can make the parameters field templated (note that on PostgresOperator the argument is called parameters, not params, so rename it) by extending PostgresOperator like this:

class MyPostgresOperator(PostgresOperator):
    template_fields = ('sql', 'parameters')

Now you should be able to do:

s3_to_redshift = MyPostgresOperator(
    task_id='s3_to_redshift',
    postgres_conn_id='redshift',
    sql='s3_to_redshift.sql',
    parameters={'file': '{{ ds }}'},
    dag=dag
)
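
For completeness, here is a hedged sketch of how the templated parameter is consumed on the SQL side; the table name and S3 path are invented for illustration. Because parameters is handed to the database driver as bound query parameters, the statement refers to the value with psycopg2's %(name)s placeholder syntax rather than Jinja syntax:

s3_to_redshift_inline = MyPostgresOperator(
    task_id='s3_to_redshift_inline',
    postgres_conn_id='redshift',
    # %(file)s is a psycopg2 named placeholder, filled at run time from the
    # rendered parameters dict ('{{ ds }}' is rendered first because
    # 'parameters' is now a template field)
    sql="INSERT INTO load_log (s3_path) VALUES (%(file)s);",
    parameters={'file': 's3://my-bucket/exports/{{ ds }}/data.csv'},
    dag=dag
)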