101 Guide on Apache Airflow Operators

What are Apache Airflow Operators?

In Airflow, an operator is a template for a single unit of work: every task in a DAG (Directed Acyclic Graph) is an instantiated operator. Airflow ships with operators for running shell commands, calling Python functions, sending emails, and executing SQL, and its provider packages add operators for external systems such as Postgres, Snowflake, Spark, Docker, and BigQuery. Sensors are a special kind of operator that wait for a condition to become true before letting downstream tasks run.

[Image: the general architecture of Apache Airflow]

Properties of Airflow Operators:

- Each operator defines a single, atomic task: one unit of work that either succeeds or fails as a whole.
- Operators are instantiated into tasks, and each task runs independently, potentially on a different worker machine.
- Because tasks are isolated, operators should not pass large data to each other directly; small pieces of metadata can be exchanged via XCom, as the sketch after this list shows.
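To illustrate that last point, here is a minimal sketch of two PythonOperator tasks exchanging a small value through XCom; it assumes Airflow 2.x, and the dag_id, function names, and the pushed value are illustrative placeholders, not from the original article:

from datetime import datetime
from airflow import DAG
from airflow.operators.python import PythonOperator

def _push(ti):
    # Push a small piece of metadata into XCom under an explicit key.
    ti.xcom_push(key='row_count', value=42)

def _pull(ti):
    # Pull the value the upstream task pushed.
    row_count = ti.xcom_pull(task_ids='push_task', key='row_count')
    print(f"row_count={row_count}")

with DAG(dag_id='xcom_example', start_date=datetime(2021, 1, 1),
         schedule_interval='@once', catchup=False) as dag:
    push_task = PythonOperator(task_id='push_task', python_callable=_push)
    pull_task = PythonOperator(task_id='pull_task', python_callable=_pull)
    push_task >> pull_task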

Types of Airflow Operators:

The sections below walk through commonly used operators with example code: BashOperator, PythonOperator, EmailOperator, PostgresOperator, SSHOperator, DockerOperator, HttpSensor, SnowflakeOperator, SparkJDBCOperator, FileSensor, and BigQueryCreateEmptyTableOperator.

[Image: the relationship between a DAG, its Tasks, and the Operators that define them]

The general pattern is the same for every operator: define a DAG, instantiate operators as tasks inside it, then chain the tasks with >>:
dag = DAG(
    dag_id='t0',
    schedule_interval='@daily',  # any cron expression or preset such as '@once'
    ...
)
t01 = op01(
    task_id='name_task_1',
    operator_params=...,
    dag=dag,
    ...
)
t02 = op02(
    task_id='name_task_2',
    operator_params=...,
    dag=dag,
    ...
)
# The >> operator declares dependencies: Task 1 -> Task 2 -> Task 3
t01 >> t02
t02 >> t03
...
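To make the skeleton concrete, here is a minimal runnable sketch (assuming Airflow 2.x; the dag_id and echo commands are illustrative placeholders):

from datetime import datetime
from airflow import DAG
from airflow.operators.bash import BashOperator

with DAG(
    dag_id='minimal_example',
    start_date=datetime(2021, 1, 1),
    schedule_interval='@daily',
    catchup=False,
) as dag:
    extract = BashOperator(task_id='extract', bash_command='echo "extract"')
    transform = BashOperator(task_id='transform', bash_command='echo "transform"')
    load = BashOperator(task_id='load', bash_command='echo "load"')
    # Chain the tasks: extract -> transform -> load
    extract >> transform >> load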
BashOperator: executes a Bash command in a subprocess.

t1 = BashOperator(
    task_id='t1',
    dag=dag,
    bash_command='echo "Text"'
)
PythonOperator: calls an arbitrary Python function.

def print_string():
    print("Test String")

t2 = PythonOperator(
    task_id='t2',
    dag=dag,
    python_callable=print_string,
)
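In Airflow 2.0+, the same task can also be written with the TaskFlow API, whose @task decorator wraps a plain function in a PythonOperator behind the scenes (a minimal sketch; the dag_id is an illustrative placeholder):

from datetime import datetime
from airflow import DAG
from airflow.decorators import task

with DAG(dag_id='taskflow_example', start_date=datetime(2021, 1, 1),
         schedule_interval='@once', catchup=False) as dag:

    @task(task_id='t2')
    def print_string():
        print("Test String")

    # Calling the decorated function registers the task in the DAG.
    print_string()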
EmailOperator: sends an email through the SMTP server configured for Airflow.

t4 = EmailOperator(
    task_id='t4',
    to='test@mail.com',
    subject='Alert Mail',
    html_content="""Mail Test""",
    dag=dag
)
PostgresOperator: executes SQL against a Postgres database (the postgres_default connection is used unless postgres_conn_id says otherwise).

import datetime

with DAG(
    dag_id="postgres_operator_dag",
    start_date=datetime.datetime(2021, 10, 11),
    schedule_interval="@once",
    catchup=False,
) as dag:
    t4 = PostgresOperator(
        task_id="t4",
        sql="""
            CREATE TABLE IF NOT EXISTS pet (
                table_id SERIAL PRIMARY KEY,
                name VARCHAR NOT NULL,
                table_type VARCHAR NOT NULL,
                birth_date DATE NOT NULL,
                OWNER VARCHAR NOT NULL);
        """,
    )
SSHOperator: runs a command on a remote host over SSH, using the given Airflow SSH connection.

t5 = SSHOperator(
    task_id='SSHOperator',
    ssh_conn_id='ssh_connectionid',
    command='echo "Text from SSH Operator"'
)
DockerOperator: runs a command inside a Docker container, pulling the image if necessary.

t6 = DockerOperator(
    task_id='docker_command',
    image='centos:latest',
    api_version='auto',
    auto_remove=True,
    command="/bin/sleep 30",
    docker_url="unix://var/run/docker.sock",  # local Docker daemon socket
    network_mode="bridge"
)
HttpSensor: polls an HTTP endpoint every poke_interval seconds until response_check returns True.

t7 = HttpSensor(
    task_id='t7',
    http_conn_id='http_default',
    endpoint='',
    request_params={},
    response_check=lambda response: "httpbin" in response.text,
    poke_interval=4,
    dag=dag,
)
SnowflakeOperator: executes SQL in Snowflake through a Snowflake connection.

from datetime import datetime

dag = DAG(
    'example_snowflake',
    start_date=datetime(2021, 11, 11),
    default_args={'snowflake_conn_id': SNOWFLAKE_CONN_ID},
    tags=['example'],
    catchup=False,
)
t8 = SnowflakeOperator(
    task_id='t8',
    dag=dag,
    sql=CREATE_TABLE_SQL_STRING,
    warehouse=SNOWFLAKE_WAREHOUSE,
    database=SNOWFLAKE_DATABASE,
    schema=SNOWFLAKE_SCHEMA,
    role=SNOWFLAKE_ROLE,
)
SparkJDBCOperator: submits a Spark job that copies data between Spark and a JDBC database; here, spark_to_jdbc writes a metastore table out to Postgres.

t9 = SparkJDBCOperator(
    cmd_type='spark_to_jdbc',
    jdbc_table="foo",
    spark_jars="${SPARK_HOME}/jars/postgresql-42.2.12.jar",
    jdbc_driver="org.postgresql.Driver",
    metastore_table="bar",
    save_mode="append",
    task_id="t9",
)
FileSensor: waits for a file to appear at a path. With mode="reschedule" the sensor frees its worker slot between pokes, and on_failure_callback fires when the sensor times out.

from airflow.exceptions import AirflowSensorTimeout

def _failure_callback(context):
    if isinstance(context['exception'], AirflowSensorTimeout):
        print("timed out message")

with DAG(dag_id='file_sensor_dag') as dag:  # dag_id is an illustrative placeholder
    t10 = FileSensor(
        task_id='t10',
        filepath='/tmp/target_file',  # illustrative placeholder path
        poke_interval=100,
        timeout=20,
        mode="reschedule",
        on_failure_callback=_failure_callback
    )
BigQueryCreateEmptyTableOperator: creates an empty table with the given schema in an existing BigQuery dataset.

create_table = BigQueryCreateEmptyTableOperator(
    task_id="t11",
    dataset_id=DATASET_NAME,
    table_id="test_table11",
    schema_fields=[
        {"name": "emp_name", "type": "STRING", "mode": "REQUIRED"},
        {"name": "salary", "type": "INTEGER", "mode": "NULLABLE"},
    ],
)

Apache Airflow Operators Best Practices

- Keep each task atomic and idempotent: an operator should do one thing, and re-running it should be safe.
- Do not move large datasets between tasks; use XCom only for small metadata and keep bulk data in external storage.
- Set shared settings such as retries and retry_delay in DAG-level default_args so every operator inherits them, as sketched below.
- Prefer mode="reschedule" for long-running sensors so they do not occupy a worker slot while waiting.
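A minimal sketch of DAG-level default_args (the owner and retry values are illustrative assumptions, not recommendations from the original article):

from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.bash import BashOperator

default_args = {
    'owner': 'data-team',                 # illustrative placeholder
    'retries': 2,                         # retry each task twice before failing it
    'retry_delay': timedelta(minutes=5),
}

with DAG(
    dag_id='best_practices_example',
    start_date=datetime(2021, 1, 1),
    schedule_interval='@daily',
    catchup=False,
    default_args=default_args,            # every task below inherits these settings
) as dag:
    t1 = BashOperator(task_id='t1', bash_command='echo "hello"')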

Conclusion

Operators are the building blocks of Airflow DAGs: every task is an operator instance, and chaining tasks with >> turns individual units of work into a pipeline. The examples above cover the operators most pipelines reach for first, from BashOperator and PythonOperator to provider operators for Postgres, Snowflake, Spark, Docker, and BigQuery.
