Airflow(2)
์ด์ ๊ธ
https://koreatstm.tistory.com/268
Airflow
MLOps Machine Learning Operations๋จธ์ ๋ฌ๋(ML) ๋ชจ๋ธ์ ๋ฐฐํฌ, ๋ชจ๋ํฐ๋ง, ์ ์ง ๊ด๋ฆฌํ๋ ์ผ๋ จ์ ๊ณผ์ ์ ์๋ฏธ๋ชฉํ: ML ๋ชจ๋ธ์ ๊ฐ๋ฐ ์๋ช ์ฃผ๊ธฐ๋ฅผ ์๋ํํ๊ณ ์ต์ ํํ๋ ๊ฒ์ด ๋ชฉํํ์์ฑ์ํฌํ๋ก์ฐ ์๋ํ:
koreatstm.tistory.com
์คํ(Execution or Trigger)
- DAG๋ฅผ ์ค์ ๋ก ์ฆ์ ์คํํ๊ฑฐ๋, ์์ฝ๋ ์ค์ผ์ค์ ๋ฐ๋ผ ์คํ๋๋ ํ์
- ์ง์ UI์์ ์๋ ์คํ(Trigger)ํ ์๋ ์๊ณ , ์ค์ผ์ค๋ฌ์ ์ํด ์๋ ์คํ๋ ์๋ ์์
- DAG๊ฐ ์คํ๋๋ฉด ํด๋น DAG Run์ด ๋ง๋ค์ด์ง
- ์คํ์ ์ข ๋ฅ: ์๋ ์คํ(ํธ๋ฆฌ๊ฑฐ), ์์ฝ ์คํ(์ค์ผ์ค)
์ค์ผ์ค(Schedule)
- DAG๊ฐ ์๋์ผ๋ก ์ธ์ ์คํ๋ ์ง ์ฃผ๊ธฐ๋ฅผ ์ค์ ํ๋ ๊ฒ
- ์: ๋งค์ผ ์์ ์ ์คํ, 5๋ถ๋ง๋ค ์คํ, ๋งค์ฃผ ์์์ผ ์คํ ๋ฑ
- Airflow ์ค์ผ์ค๋ฌ๊ฐ ์ด ์ฃผ๊ธฐ์ ๋ฐ๋ผ DAG ์คํ์ ์์ฝํ๊ณ ์๋์ผ๋ก ์คํ์ํด
๊ถ๊ธํ ๊ฒ!
- Q: task1 >> task2 >> task3 ์ด๋ ๊ฒ task๊ฐ ์์กด ๊ด๊ณ ์ค์ ์ํ๋ฉด ์์๊ฐ ์ด๋ป๊ฒ ๋ ๊น?
- ๋ช ์์ ์ผ๋ก ์์กด ๊ด๊ณ๋ฅผ ์ง์ ํ์ง ์์ผ๋ฉด, Airflow๋ ํด๋น ํ์คํฌ๋ค์ ๋ณ๋ ฌ(๋ ๋ฆฝ) ์์ ์ผ๋ก ์ธ์ํ๊ณ ์คํ ์์๋ฅผ ์์์ ์ ํ๋ค.
- ์ด ๊ฒฝ์ฐ ์คํ ์์๋ ์ค์ผ์ค๋ฌ์ ์์ปค์ ์ํ, ๋ฆฌ์์ค, ์ฐ์ ์์ ๋ฑ์ ๋ฐ๋ผ ๋ฌ๋ผ์ง๋ค. -> ์์ธก ๋ถ๊ฐ๋ฅํ ์คํ ์์
BashOperator
from airflow import DAG
from airflow.operators.bash import BashOperator
from datetime import datetime, timedelta

# DAG definition: the `with` block is a context manager, so every Task
# created inside it is registered as part of this DAG.
with DAG(
    dag_id='every_10_seconds',
    start_date=datetime(2025, 6, 29),
    schedule=timedelta(seconds=10),   # run every 10 seconds
    catchup=False,                    # do not backfill runs missed in the past
    max_active_runs=1,                # at most one DAG run at a time
    tags=['test'],                    # tags are labels used for filtering in the UI
) as dag:
    # First task: print the current timestamp.
    first = BashOperator(
        task_id='print_time',
        bash_command='date "+%Y-%m-%d %H:%M:%S"',
    )
    # Second task: print a greeting.
    second = BashOperator(
        task_id='say_hello',
        bash_command='echo "Hello Airflow!"',
    )
    # Dependency: run print_time first, then say_hello.
    first >> second

# Tasks belonging to a DAG: a DAG is composed of several Tasks.
- catchup: Airflow๋ start_date๋ถํฐ ํ์ฌ ์๊ฐ๊น์ง์ ๋ชจ๋ ์คํ ๋ ์ง์ ๋ํด DAG์ ์คํํ๋ ค๊ณ ํ๋ค.
- catchup=False : ๊ณผ๊ฑฐ ์คํ์ด ๋ฐ๋ ธ์ ๋ backlog๋ฅผ ๋ฐ๋ผ์ก์ง ์๊ณ ์ค๋๋ถํฐ ์คํ(์ค๋ ์ดํ๋ง ์คํํ๊ณ , ๊ณผ๊ฑฐ ๋ ์ง๋ ๋ฌด์ํ๋ค.)
PythonOperator
from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta


def my_python_function():
    """Callable executed by the PythonOperator task below."""
    print("๐ Hello from PythonOperator!")


# DAG definition
with DAG(
    dag_id='every_10_seconds_with_python',
    start_date=datetime(2025, 6, 29),
    schedule=timedelta(seconds=10),
    catchup=False,
    max_active_runs=1,
    tags=['test'],
) as dag:
    # Shell task that prints the current time.
    time_task = BashOperator(
        task_id='print_time',
        bash_command='date "+%Y-%m-%d %H:%M:%S"',
    )
    # Python task that runs the function defined above.
    hello_task = PythonOperator(
        task_id='python_hello',
        python_callable=my_python_function,
    )
    # python_hello runs only after print_time has succeeded.
    time_task >> hello_task
- DAG ์คํ ์ฃผ๊ธฐ: 10์ด๋ง๋ค ํ ๋ฒ์ฉ ์คํ
- Task ๊ตฌ์ฑ
- print_time (BashOperator) → ํ์ฌ ์๊ฐ์ ์ถ๋ ฅ (date "+%Y-%m-%d %H:%M:%S")
- python_hello (PythonOperator) → my_python_function() ํจ์ ์คํ (ํ๋ฆฐํธ ๋ฉ์์ง ์ถ๋ ฅ)
- ์์กด์ฑ: print_time >> python_hello
- ์ฆ, print_time์ด ์ฑ๊ณต์ ์ผ๋ก ๋๋์ผ๋ง python_hello๊ฐ ์คํ๋จ
TaskGroup
from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.utils.task_group import TaskGroup
from datetime import datetime, timedelta

with DAG(
    dag_id='taskgroup_example',
    start_date=datetime(2025, 6, 29),
    schedule=timedelta(seconds=30),
    catchup=False,
    max_active_runs=1,
    tags=['group', 'example'],
) as dag:
    # Entry task.
    start = BashOperator(
        task_id='start',
        bash_command='echo "Start DAG Execution"',
    )

    # First group: two tasks with no dependency between them.
    with TaskGroup("group1", tooltip="First Task Group") as group1:
        g1_first = BashOperator(
            task_id='task_1_1',
            bash_command='echo "Group1 - Task 1"',
        )
        g1_second = BashOperator(
            task_id='task_1_2',
            bash_command='echo "Group1 - Task 2"',
        )

    # Second group, same shape as the first.
    with TaskGroup("group2", tooltip="Second Task Group") as group2:
        g2_first = BashOperator(
            task_id='task_2_1',
            bash_command='echo "Group2 - Task 1"',
        )
        g2_second = BashOperator(
            task_id='task_2_2',
            bash_command='echo "Group2 - Task 2"',
        )

    # Exit task.
    end = BashOperator(
        task_id='end',
        bash_command='echo "End DAG Execution"',
    )

    # Overall flow: start -> group1 -> group2 -> end.
    start >> group1 >> group2 >> end
- TaskGroup์ ๋จ์ํ Task๋ค์ ๊ทธ๋ฃน์ผ๋ก ๋ฌถ๋ ์ปจํ ์ด๋์ด๋ค.
- ๋ด๋ถ Task๋ค์ ๋ณ๋ ฌ๋ก ์คํ๋จ. (๋ณ๋ ์์กด๊ด๊ณ ์์ ๊ฒฝ์ฐ)
- ์ฆ, >>์ ๋ฌ๋ฆฌ ๊ทธ๋ฃน ๋ด์ task๋ค์ ๋ณ๋ ฌ๋ก ์คํ๋๋ค. ๋ค์๋งํด, ๊ทธ๋ฃน๋ด์ task๊ฐ ์๋๋๋ผ๋, ๊ทธ๊ฒ๊ณผ ์๊ด์์ด task๋ ์งํ๋๋ค.
- TaskGroup์ ๋จ์ํ ๋ ผ๋ฆฌ์ ์ธ ๊ทธ๋ฃนํ๋ฅผ ์ ๊ณตํ์ง๋ง, ์ค์ ๋ก ๊ทธ๋ฃน ๋ด์ Task๋ค์ ๋ณ๋ ฌ๋ก ์คํ๋๋ค.
XCom
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta


def push_function(**context):
    """Push a sample value into XCom under key 'sample_key'."""
    task_instance = context['ti']  # TaskInstance injected by Airflow
    task_instance.xcom_push(key='sample_key', value='Hello from push_function!')


def pull_function(**context):
    """Pull the value pushed by 'push_task' from XCom and print it."""
    task_instance = context['ti']
    value = task_instance.xcom_pull(key='sample_key', task_ids='push_task')
    print(f"Pulled value from XCom: {value}")


with DAG(
    dag_id='xcom_example',
    start_date=datetime(2025, 6, 29),
    schedule=timedelta(seconds=30),
    catchup=False,
) as dag:
    push_task = PythonOperator(
        task_id='push_task',
        python_callable=push_function,
    )
    pull_task = PythonOperator(
        task_id='pull_task',
        python_callable=pull_function,
    )
    # Push first, then pull.
    push_task >> pull_task
- kwargs๋ Airflow๊ฐ ์๋์ผ๋ก ์ ๋ฌํ๋ ์ปจํ ์คํธ ์ ๋ณด ๋ด์
- ์ด์ค์์ ti(TaskInstance๊ฐ์ฒด)๋ฅผ ์ฌ์ฉ
- Airflow ๋ด๋ถ ์์ ์ ์ฅ์ XCom์ ๋ฌธ์์ด ์ ์ฅ ๋จ
Branch
from airflow import DAG
from airflow.operators.python import PythonOperator, BranchPythonOperator
from airflow.operators.empty import EmptyOperator
from datetime import datetime, timedelta
import pendulum


def decide_what_to_do():
    """Branch callable: return the task_id of the downstream task to follow."""
    # day_of_week assumed here: 0 = Monday ... 6 = Sunday — TODO confirm
    # against the installed pendulum version's convention.
    today = pendulum.now("Asia/Seoul").day_of_week
    if today == 6:  # Sunday
        return 'take_rest'
    return 'work_hard'


with DAG(
    dag_id='branching_example',
    start_date=datetime(2025, 6, 29),
    schedule=timedelta(minutes=1),
    catchup=False,
    max_active_runs=1,  # only one run of this DAG instance at a time
    tags=['branch', 'example'],
) as dag:
    start = EmptyOperator(task_id='start')

    branch = BranchPythonOperator(
        task_id='branch_decision',
        python_callable=decide_what_to_do,
    )

    rest = EmptyOperator(task_id='take_rest')
    work = EmptyOperator(task_id='work_hard')

    # trigger_rule='none_failed_min_one_success': run when no upstream task
    # failed and at least one succeeded — needed after a branch, where the
    # non-chosen path is skipped.
    end = EmptyOperator(task_id='end', trigger_rule='none_failed_min_one_success')

    # DAG flow
    start >> branch
    branch >> [rest, work] >> end
TaskGroup ์ค์ฒฉ
SubDAG
- ๊ธฐ์กด DAG ์์ ๋ ๋ค๋ฅธ DAG๋ฅผ ๋ฃ๋ ๋ฐฉ์
- ๋ ๋ฆฝ์ ์ธ DAG๋ฅผ ํ๋์ Task์ฒ๋ผ ์ฌ์ฉํ๋ ๊ฐ๋
- ๋ค๋ง Airflow์์๋ SubDAG์ ๋ณต์ก์ฑ ๋๋ฌธ์ ๊ถ์ฅํ์ง ์๋ ์ถ์ธ์ด๊ณ , ๋์ TaskGroup์ ์ฃผ๋ก ์ฌ์ฉ
from airflow import DAG
from airflow.operators.empty import EmptyOperator
from airflow.utils.task_group import TaskGroup
from datetime import datetime, timedelta

with DAG(
    dag_id='nested_taskgroup_example',
    start_date=datetime(2025, 6, 29),
    schedule=timedelta(minutes=5),
    catchup=False,
    max_active_runs=1,
    tags=['example', 'taskgroup'],
) as dag:
    start = EmptyOperator(task_id='start')

    # Top-level TaskGroup.
    with TaskGroup('group1') as group1:
        task_a = EmptyOperator(task_id='task_a')
        task_b = EmptyOperator(task_id='task_b')

        # Nested TaskGroup.
        with TaskGroup('subgroup1') as subgroup1:
            task_c = EmptyOperator(task_id='task_c')
            task_d = EmptyOperator(task_id='task_d')

        # Inside the group, subgroup1 participates in the flow like a task.
        task_a >> subgroup1 >> task_b

    end = EmptyOperator(task_id='end')

    start >> group1 >> end
- DAG ์์ → start task ์คํ
- group1 TaskGroup ์คํ
- task_a ์คํ ํ, ์ค์ฒฉ TaskGroup์ธ subgroup1 ์คํ (task_c์ task_d ํฌํจ)
- ๋ค์ task_b ์คํ
- group1 ์ข ๋ฃ ํ end task ์คํ
Sensor
Sensor๋ Airflow์์ ์ด๋ค ์กฐ๊ฑด์ด ์ถฉ์กฑ๋ ๋๊น์ง ๊ธฐ๋ค๋ฆฌ๋ Task
- ํ์ผ์ด ๋์ฐฉํ ๋๊น์ง ๊ธฐ๋ค๋ฆฌ๊ธฐ → FileSensor
- API ์๋ต์ด ํน์ ๊ฐ์ ๋ฐํํ ๋๊น์ง → HttpSensor
- ์ธ๋ถ ํ ์ด๋ธ์ด ์๊ธธ ๋๊น์ง → ExternalTaskSensor, SqlSensor
์๋๋ฐฉ์
๋ชจ๋ | ์ค๋ช |
poke | ๊ณ์ ์คํ ์ค์ธ ์ํ๋ก ๋๊ธฐํ๋ฉฐ ์ผ์ ์ฃผ๊ธฐ๋ง๋ค ์กฐ๊ฑด์ ์ฒดํฌ |
reschedule | ์คํ ์ค์ด ์๋๋ผ ์ฌ์๋ค๊ฐ ๋ค์ ์คํ (๋ฐ๋ผ์, ์์์ ์ ๊ฒ ์) |
from airflow import DAG
from airflow.operators.empty import EmptyOperator
from airflow.sensors.filesystem import FileSensor
from datetime import datetime, timedelta

with DAG(
    dag_id='sensor_example',
    start_date=datetime(2025, 6, 29),
    schedule=timedelta(minutes=5),
    catchup=False,
    max_active_runs=1,
    tags=['sensor', 'example'],
) as dag:
    start = EmptyOperator(task_id='start')

    # Block until the file exists, checking every 10 s for up to 60 s.
    wait_for_file = FileSensor(
        task_id='wait_for_file',
        filepath='/opt/airflow/dags/input_file.txt',
        poke_interval=10,  # check every 10 seconds
        timeout=60,        # fail if the file never appears within 60 seconds
        mode='poke',       # poke mode: the task keeps its slot between checks
    )

    done = EmptyOperator(task_id='done')

    start >> wait_for_file >> done
์ต์ด ์คํ์์๋ ํด๋น ํ์ผ์ด ์๊ธฐ์ ์๋ฌ๊ฐ ๋ฌ๋ค. ํฐ๋ฏธ๋์์ ์๋ ๋ช ๋ น์ด๋ก file์ ๋ง๋ค๊ณ , triggerํด๋ณด๋ฉด
echo "test" > ./dags/input_file.txt
TriggerDagRunOperator
ํ์ฌ DAG์์ ๋ค๋ฅธ DAG์ ์คํ(trigger)
- dags/ ํด๋์ ๋ ํ์ผ ์ ์ฅ
- Airflow UI์์ parent_dag ์คํ
- child_dag์ด ์๋์ผ๋ก ํธ๋ฆฌ๊ฑฐ๋๋์ง ํ์ธ
from airflow import DAG
from airflow.operators.empty import EmptyOperator
from datetime import datetime

# Child DAG: schedule=None means it never runs on its own — it only runs
# when triggered externally (here, by parent_dag's TriggerDagRunOperator).
with DAG(
    dag_id='child_dag',
    start_date=datetime(2025, 6, 29),
    schedule=None,
    catchup=False,
    tags=['TriggerDagRunOperator'],
) as dag:
    begin = EmptyOperator(task_id='child_start')
    finish = EmptyOperator(task_id='child_end')
    begin >> finish
# 8-2
from airflow import DAG
from airflow.operators.empty import EmptyOperator
from airflow.operators.trigger_dagrun import TriggerDagRunOperator
from datetime import datetime, timedelta

with DAG(
    dag_id='parent_dag',
    start_date=datetime(2025, 6, 29),
    schedule=timedelta(minutes=5),
    catchup=False,
    tags=['TriggerDagRunOperator'],
) as dag:
    start = EmptyOperator(task_id='start')

    # Fire a run of 'child_dag' from within this DAG.
    trigger_child = TriggerDagRunOperator(
        task_id='trigger_child_dag',
        trigger_dag_id='child_dag',
    )

    end = EmptyOperator(task_id='end')

    start >> trigger_child >> end
child_dag๋ฅผ ํ์ฑํํด ๋๊ณ parent_dag๋ฅผ ํ์ฑํํ๋ฉด, child_dag๋ฅผ ๋ณ๋๋ก ์ค์ผ์คํ์ง ์์๋ parent_dag์์ TriggerDagRunOperator๋ก ํธ๋ฆฌ๊ฑฐ๋์ด ์คํ๋๋ ๊ฒ์ ํ์ธํ ์ ์๋ค.
Variable
Airflow์ ์ ์ญ ์ค์ ๊ฐ ์ ์ฅ์์ธ Variable์ ์ฌ์ฉํด์ DAG ์์์ ๋์ ์ผ๋ก ๊ฐ์ ๊ฐ์ ธ์ค๊ธฐ
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.models import Variable
from datetime import datetime, timedelta


def print_variable():
    """Read the Airflow Variable 'filename' and print it.

    Falls back to 'default.txt' when no Variable with key 'filename'
    is registered (Admin > Variables in the UI).
    """
    filename = Variable.get("filename", default_var="default.txt")
    # Bug fix: the {filename} placeholder was missing from the f-string;
    # the run logs show the variable's value is printed after "is:".
    print(f"๐ Airflow Variable 'filename' is: {filename}")


with DAG(
    dag_id='variable_example',
    start_date=datetime(2025, 6, 29),
    schedule=timedelta(minutes=10),
    catchup=False,
    tags=['variable'],
) as dag:
    task = PythonOperator(
        task_id='print_airflow_variable',
        python_callable=print_variable,
    )
- Airflow UI ์ ์ → Admin > Variables ํด๋ฆญ
- Key = filename, Value = input_file.txt ์ถ๊ฐ
- DAG ์คํ → ๋ก๊ทธ์์ filename์ด ์ถ๋ ฅ๋จ
variable์ด ์๋ ๊ฒฝ์ฐ -> input_file.txt
Log message source details: sources=["/opt/airflow/logs/dag_id=variable_example/run_id=scheduled__2025-06-29T09:53:05.436680+00:00/task_id=print_airflow_variable/attempt=1.log"]
[2025-06-29, 18:53:15] INFO - DAG bundles loaded: dags-folder: source="airflow.dag_processing.bundles.manager.DagBundlesManager"
[2025-06-29, 18:53:15] INFO - Filling up the DagBag from /opt/airflow/dags/variable_example.py: source="airflow.models.dagbag.DagBag"
[2025-06-29, 18:53:15] WARNING - /home/airflow/.local/lib/python3.12/site-packages/airflow/models/variable.py:147: DeprecationWarning: Using Variable.get from `airflow.models` is deprecated.Please use `from airflow.sdk import Variable` instead
warnings.warn(
: source="py.warnings"
[2025-06-29, 18:53:15] INFO - Done. Returned value was: None: source="airflow.task.operators.airflow.providers.standard.operators.python.PythonOperator"
[2025-06-29, 18:53:15] INFO - ๐ Airflow Variable 'filename' is: input_file.txt: chan="stdout": source="task"
variable์ด ์๋ ๊ฒฝ์ฐ -> default์ธ default.txt
Log message source details: sources=["/opt/airflow/logs/dag_id=variable_example/run_id=manual__2025-06-29T09:53:54.881342+00:00/task_id=print_airflow_variable/attempt=1.log"]
[2025-06-29, 18:53:55] INFO - DAG bundles loaded: dags-folder: source="airflow.dag_processing.bundles.manager.DagBundlesManager"
[2025-06-29, 18:53:55] INFO - Filling up the DagBag from /opt/airflow/dags/variable_example.py: source="airflow.models.dagbag.DagBag"
[2025-06-29, 18:53:55] WARNING - /home/airflow/.local/lib/python3.12/site-packages/airflow/models/variable.py:147: DeprecationWarning: Using Variable.get from `airflow.models` is deprecated.Please use `from airflow.sdk import Variable` instead
warnings.warn(
: source="py.warnings"
[2025-06-29, 18:53:55] INFO - Done. Returned value was: None: source="airflow.task.operators.airflow.providers.standard.operators.python.PythonOperator"
[2025-06-29, 18:53:55] INFO - ๐ Airflow Variable 'filename' is: default.txt: chan="stdout": source="task"
๋ฉ์ผ๋ณด๋ด๊ธฐ
๋จผ์ ๋ฉ์ผ ์์ฒด๋ฅผ ํ์ธ ํด๋ณด์
import smtplib
from email.mime.text import MIMEText

# Standalone sanity check that the SMTP credentials work, before wiring
# the same settings into Airflow.
smtp_host = 'smtp.gmail.com'
smtp_port = 587
username = 'name@gmail.com'      # your Gmail address
password = '16์๋ฆฌ ๋น๋ฐ๋ฒํธ'  # Gmail app password (16 characters)

# Build the message before opening the connection.
msg = MIMEText('This is a test email from Python SMTP')
msg['Subject'] = 'Test Email'
msg['From'] = username
msg['To'] = 'name@naver.com'     # recipient address

server = smtplib.SMTP(smtp_host, smtp_port)
try:
    server.starttls()            # upgrade the connection to TLS
    server.login(username, password)
    server.sendmail(username, ['name@naver.com'], msg.as_string())
    print("Email sent successfully!")
finally:
    # Close the connection even when starttls/login/send fails
    # (the original leaked the socket on any exception).
    server.quit()
์ด์ DAG๋ก ์งํํด๋ณด์
from airflow import DAG
from airflow.operators.email import EmailOperator
from airflow.operators.empty import EmptyOperator
from datetime import datetime, timedelta

with DAG(
    dag_id='email_example',
    start_date=datetime(2025, 6, 29),
    schedule=timedelta(minutes=10),
    catchup=False,
    tags=['email'],
) as dag:
    start = EmptyOperator(task_id='start')

    # Requires an SMTP connection (smtp_default) registered in Airflow.
    send_email = EmailOperator(
        task_id='send_email_task',
        to='name@naver.com',
        subject='Airflow Email Test',
        html_content="""
<h3>๐ฌ This is a test email sent from Airflow DAG</h3>
<p>Hello! This email was sent automatically via EmailOperator.</p>
""",
        from_email='name@gmail.com',
    )

    end = EmptyOperator(task_id='end')

    start >> send_email >> end
# SMTP
AIRFLOW__SMTP__SMTP_HOST: smtp.gmail.com
AIRFLOW__SMTP__SMTP_PORT: 587
AIRFLOW__SMTP__SMTP_STARTTLS: 'True'
AIRFLOW__SMTP__SMTP_SSL: 'False'
AIRFLOW__SMTP__SMTP_USER: name@gmail.com
AIRFLOW__SMTP__SMTP_PASSWORD: <16์๋ฆฌ ๋น๋ฐ๋ฒํธ>
AIRFLOW__SMTP__SMTP_MAIL_FROM: name@gmail.com
๋์ปค ์ปดํฌ์ฆ ํ๊ฒฝ๋ณ์์ SMTP ์ค์ ์ ๋ฃ์๋๋ฐ๋ ์ค๋ฅ๊ฐ ๋๋ ์ด์ ๋, Airflow EmailOperator๊ฐ ์ค์ ๋ก ๋ฉ์ผ์ ๋ณด๋ด๊ธฐ ์ํด์๋ ‘Connection’์ด๋ผ๋ ๋ณ๋์ ์ค์ ์ด Airflow ๋ด๋ถ์ ๋ฐ๋์ ๋ฑ๋ก๋์ด ์์ด์ผ ํ๋ค.
- ๋์ปค ์ปดํฌ์ฆ ํ๊ฒฝ๋ณ์์ SMTP ์ ๋ณด ๋ฃ์ผ๋ฉด Airflow ์๋ฒ ํ๊ฒฝ์๋ ๋ฐ์๋์ง๋ง,
- Airflow ๋ด๋ถ์์ conn_id='smtp_default'๋ผ๋ Connection ์ค์ ์ด ๋ฐ๋ก ์์ด์ผ ์ด๋ฉ์ผ ๋ณด๋ด๊ธฐ๊ฐ ๊ฐ๋ฅํ๋ค.
- ์ด Connection ์ค์ ์ Airflow UI(Admin > Connections) ์์ ์ง์ ๋ฑ๋กํ๊ฑฐ๋, CLI ๋๋ ์ฝ๋๋ก ์์ฑํ๋ฉด ๋๋ค.
์ ์ด๊ฒ ํ์ํ๊ฐ?
- Airflow EmailOperator๋ ๋ด๋ถ์์ smtp_default๋ผ๋ Connection ์ ๋ณด๋ฅผ ์ฐพ๋๋ค.
- ๊ทธ Connection์ด ์์ผ๋ฉด "Connection not found" ์ค๋ฅ๊ฐ ๋ฐ์
- ๋์ปค ์ปดํฌ์ฆ์ ๋ฃ๋ ํ๊ฒฝ๋ณ์๋ SMTP ์๋ฒ ๊ธฐ๋ณธ ์ ๋ณด(ํธ์คํธ, ํฌํธ, ์ฌ์ฉ์ ๋ฑ)๋ฅผ ์ ๊ณตํ๋ ์ญํ
- Airflow UI์ Connections ๋ฉ๋ด์์ smtp_default๋ฅผ ๋ฑ๋กํ๋ ๊ฑด Airflow๊ฐ ๋ด๋ถ์ ์ผ๋ก ๋ฉ์ผ ์๋ฒ ์ ์ ์ ๋ณด๋ฅผ ๊ด๋ฆฌํ๋ ๋ฐฉ๋ฒ
Slack ์ฐ๋
๋ฉ์ผ ๋ณด๋ผ๋ ์ฌ๋์ผ๋ก ๋ฉ์ผ๋ณด๋๋ค๊ณ ์๋ฆผ ์ถ๊ฐํด๋ณด์
from airflow import DAG
from airflow.operators.email import EmailOperator
from airflow.operators.empty import EmptyOperator
from airflow.providers.slack.operators.slack_webhook import SlackWebhookOperator
from datetime import datetime, timedelta

with DAG(
    dag_id='email_with_slack_example',
    start_date=datetime(2025, 6, 29),
    schedule=timedelta(minutes=10),
    catchup=False,
    tags=['email', 'slack'],
) as dag:
    start = EmptyOperator(task_id='start')

    # Requires an SMTP connection (smtp_default) registered in Airflow.
    send_email = EmailOperator(
        task_id='send_email_task',
        to='name@naver.com',
        subject='Airflow Email Test',
        html_content="""
<h3>๐ฌ This is a test email sent from Airflow DAG</h3>
<p>Hello! This email was sent automatically via EmailOperator.</p>
""",
        from_email='name@gmail.com',
    )

    # Post a Slack message once the email task succeeds.  The message
    # literal was split across two lines in the original (a syntax error);
    # its leading character was presumably a garbled emoji — confirm
    # against the original post before relying on the exact text.
    notify_slack = SlackWebhookOperator(
        task_id='notify_slack_task',
        slack_webhook_conn_id="slack_conn",
        message="Email successfully sent to name@gmail.com from Airflow!",
        username='airflow',
        channel="#airflow_practice",
    )

    end = EmptyOperator(task_id='end')

    start >> send_email >> notify_slack >> end
docker compose run --rm airflow-cli connections add slack_conn \
--conn-type slackwebhook \
--conn-password 'https://hooks.slack.com/services/XXX/YYY/ZZZ'
Slack Webhook์ URL(=endpoint)์ Airflow ์ปจํ ์ด๋ ์์ Connection์ผ๋ก ๋ฑ๋กํ์ฌ DAG์์ ์ฌ๋์ผ๋ก ์๋ฆผ์ ๋ณด๋ผ ์ ์๋๋ก ํ๋ ๊ฒ
jinja
์ฐธ๊ณ : DAG์ ์ ๋ด์ฉ์ด ๊ธธ์ด์ง๋
# Two equivalent ways to configure a DAG once its definition grows.
# (Both snippets are illustrative fragments — the DAG bodies are omitted.)

# Way 1: no default_args — pass everything inside the `with DAG(...)` call.
# NOTE(review): DAG() itself does not accept `owner`, `retries` or
# `retry_delay` keyword arguments — those are task-level defaults that
# normally belong in `default_args`, so this call as written is expected
# to fail. Confirm against the installed Airflow version.
with DAG(
'jinja_template_example',
owner='airflow',
start_date=datetime(2025, 6, 30),
retries=1,
retry_delay=timedelta(minutes=5),
schedule='@daily',
catchup=False,
tags=['example'],
) as dag:
# Way 2: pull the shared task defaults out into a separate default_args dict.
default_args = {
'owner': 'airflow',
'start_date': datetime(2025, 6, 30),
'retries': 1,
'retry_delay': timedelta(minutes=5),
}
with DAG(
'jinja_template_example',
default_args=default_args,
schedule='@daily',
catchup=False,
tags=['example'],
) as dag:
from airflow import DAG
from airflow.operators.bash import BashOperator
from datetime import datetime, timedelta
# Task-level defaults applied to every task in this DAG.
default_args = {
'owner': 'airflow',
'start_date': datetime(2025, 6, 30),
'retries': 1,
'retry_delay': timedelta(minutes=5),
}
with DAG(
'jinja_template_example',
default_args=default_args,
schedule='@daily',
catchup=False,
tags=['example'],
) as dag:
# Bash command built from Jinja templates; Airflow renders the {{ ... }}
# placeholders at execution time:
#   {{ ds }}                    — execution date (YYYY-MM-DD)
#   {{ macros.ds_add(ds, 7) }}  — ds plus 7 days
#   {{ params.my_param }}       — value taken from the `params` dict below
# (The Korean text inside the string is kept byte-for-byte, including the
# mid-word line break, as it appears in the source.)
templated_command = """
echo "์ค๋ ๋ ์ง๋ {{ ds }} ์
๋๋ค."
echo "7์ผ ํ ๋ ์ง๋ {{ macros.ds_add(ds, 7) }}"
echo "ํ๋ผ๋ฏธํฐ๋ก ๋ฐ์ ๊ฐ: {{ params.my_param }}"
"""
run_bash = BashOperator(
task_id='print_dates',
bash_command=templated_command,
params={'my_param': 'Hello Airflow!'}
)
- {{ ds }} : Airflow ๋ด์ฅ ๋ณ์๋ก DAG ์คํ ๋ ์ง(yyyy-mm-dd) ๊ฐ ๋ค์ด๊ฐ
- {{ macros.ds_add(ds, 7) }} : ds ๋ ์ง์ 7์ผ์ ๋ํ ๋ ์ง๋ฅผ ์ถ๋ ฅ. macros๋ Airflow๊ฐ ์ ๊ณตํ๋ ์ฌ๋ฌ ์ ํธ ํจ์ ๋ชจ์
- {{ params.my_param }} : params ๋์ ๋๋ฆฌ์ ๋๊ธด ์ฌ์ฉ์ ์ง์ ํ๋ผ๋ฏธํฐ๋ฅผ ์ถ๋ ฅ
[2025-06-30, 11:16:24] INFO - Running command: ['/usr/bin/bash', '-c', '\n echo "์ค๋ ๋ ์ง๋ 2025-06-30 ์
๋๋ค."\n echo "7์ผ ํ ๋ ์ง๋ 2025-07-07"\n echo "ํ๋ผ๋ฏธํฐ๋ก ๋ฐ์ ๊ฐ: Hello Airflow!"\n ']: source="airflow.task.hooks.airflow.providers.standard.hooks.subprocess.SubprocessHook"
- Airflow์์ Jinja ํ ํ๋ฆฟ์ด๋?
- Airflow๋ Python์ผ๋ก DAG๋ฅผ ์ ์ํ๋ค. -> DAG ์์์ ํ๋ผ๋ฏธํฐ, ๋ช ๋ น์ด, SQL ๋ฑ์ ๋ฌธ์์ด๋ก ์์ฑ
- ์ด ๋ฌธ์์ด์ ์คํ ์์ ์ ๋ ์ง๋ ๋ณ์์ ๋ง๊ฒ ๋ฐ๊พธ๋(๋์ ์ฒ๋ฆฌ) ๊ธฐ๋ฅ์ด ํ์
- ๊ทธ๋์ Airflow๋ Python์์ ๋๋ฆฌ ์ฐ์ด๋ ํ ํ๋ฆฟ ์์ง์ธ Jinja2๋ฅผ ์ฌ์ฉ