๊ณ„๋ž€์†Œ๋…„ 2025. 6. 29. 17:18

Previous post

https://koreatstm.tistory.com/268

 

Airflow


Execution (Trigger)

  • Running a DAG immediately, or having it run according to its schedule
  • A DAG can be triggered manually from the UI, or run automatically by the scheduler
  • Each execution of a DAG creates a DAG Run
  • Two kinds of execution: manual (trigger) and scheduled
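Besides the UI's Trigger button, a manual run can also be started from the CLI. A sketch, assuming the docker compose setup used later in this post and a hypothetical dag_id of example_dag:

```shell
# Trigger a manual run (the dag_id here is a placeholder)
docker compose run --rm airflow-cli dags trigger example_dag

# List that DAG's runs to confirm a manual run was created
docker compose run --rm airflow-cli dags list-runs -d example_dag
```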

 

Schedule

  • Sets when and how often a DAG runs automatically
  • e.g. every day at midnight, every 5 minutes, every Monday
  • The Airflow scheduler books DAG runs on this interval and starts them automatically
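The schedule argument used throughout this post accepts several forms. A small illustrative sketch (the dict and its keys are made up for this example):

```python
from datetime import timedelta

# Typical values accepted by DAG(schedule=...): cron strings,
# timedelta intervals, and Airflow preset aliases.
schedule_examples = {
    "daily_midnight": "0 0 * * *",            # cron: every day at 00:00
    "every_5_minutes": timedelta(minutes=5),  # fixed interval
    "weekly_monday": "0 0 * * 1",             # cron: every Monday at 00:00
    "daily_preset": "@daily",                 # preset alias
}

for name, value in schedule_examples.items():
    print(f"{name}: {value!r}")
```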

 

๊ถ๊ธˆํ•œ ๊ฒƒ!

  • Q: if you don't set dependencies between tasks like task1 >> task2 >> task3, what order do they run in?
  • Without explicit dependencies, Airflow treats the tasks as parallel (independent) work and decides the order itself.
  • The actual order then depends on scheduler and worker state, resources, priorities, etc. -> an unpredictable execution order.
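A minimal sketch of that situation (task names are hypothetical): three tasks defined without >> all become independent roots of the DAG, so the scheduler is free to run them in any order:

```python
from datetime import datetime

from airflow import DAG
from airflow.operators.bash import BashOperator

with DAG(
    dag_id='no_dependency_example',
    start_date=datetime(2025, 6, 29),
    schedule=None,
    catchup=False,
) as dag:
    # No a >> b >> c below, so Airflow sees three independent tasks
    # and may run them in parallel, given enough worker slots.
    a = BashOperator(task_id='a', bash_command='echo a')
    b = BashOperator(task_id='b', bash_command='echo b')
    c = BashOperator(task_id='c', bash_command='echo c')
```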

 

BashOperator

from airflow import DAG
from airflow.operators.bash import BashOperator
from datetime import datetime, timedelta
'''
Define the DAG itself.
The context block makes explicit that the tasks below belong to this DAG.
'''
with DAG(
    dag_id='every_10_seconds',
    start_date=datetime(2025, 6, 29),
    schedule=timedelta(seconds=10),
    catchup=False,
    max_active_runs=1,
    tags=['test']  # tags are labels used for filtering in the UI
) as dag:

    task1 = BashOperator(
        task_id='print_time',
        bash_command='date "+%Y-%m-%d %H:%M:%S"'
    )

    task2 = BashOperator(
        task_id='say_hello',
        bash_command='echo "Hello Airflow!"'
    )

    task1 >> task2  # run task2 only after task1 succeeds
'''
Define the tasks that belong to the DAG.
A DAG is composed of one or more tasks.
'''
  • catchup: by default, Airflow tries to run the DAG for every interval between start_date and the current time.
  • catchup=False: skip that backlog of past runs and execute only from now on (past intervals are ignored).
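For contrast, a hypothetical sketch with catchup=True: if this daily DAG is first enabled on 2025-06-29, Airflow queues a backfill run for every missed interval since start_date:

```python
from datetime import datetime

from airflow import DAG
from airflow.operators.empty import EmptyOperator

with DAG(
    dag_id='catchup_demo',
    start_date=datetime(2025, 6, 19),  # ten days before first enabling
    schedule='@daily',
    catchup=True,  # backfill every missed daily interval since start_date
) as dag:
    EmptyOperator(task_id='noop')
```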

 

PythonOperator

from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta

# Define a Python function
def my_python_function():
    print("👋 Hello from PythonOperator!")

# Define the DAG
with DAG(
    dag_id='every_10_seconds_with_python',
    start_date=datetime(2025, 6, 29),
    schedule=timedelta(seconds=10),
    catchup=False,
    max_active_runs=1,
    tags=['test']
) as dag:

    # BashOperator Task
    print_time = BashOperator(
        task_id='print_time',
        bash_command='date "+%Y-%m-%d %H:%M:%S"'
    )

    # PythonOperator Task
    python_hello = PythonOperator(
        task_id='python_hello',
        python_callable=my_python_function
    )

    # Set task dependencies
    print_time >> python_hello

 

  • Run interval: the DAG runs once every 10 seconds
  • Task composition
    1. print_time (BashOperator) → prints the current time (date "+%Y-%m-%d %H:%M:%S")
    2. python_hello (PythonOperator) → runs my_python_function() (prints a message)
  • Dependency: print_time >> python_hello
  • That is, python_hello runs only after print_time finishes successfully
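As a side note (not part of the original post), recent Airflow versions often write the same Python task with the TaskFlow API; a sketch:

```python
from datetime import datetime

from airflow.decorators import dag, task

@dag(
    dag_id='taskflow_hello',
    start_date=datetime(2025, 6, 29),
    schedule=None,
    catchup=False,
)
def taskflow_hello():
    @task
    def python_hello():
        print("Hello from the TaskFlow API!")

    python_hello()  # calling the decorated function registers the task

taskflow_hello()  # instantiate the DAG
```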

 

TaskGroup

from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.utils.task_group import TaskGroup
from datetime import datetime, timedelta

with DAG(
    dag_id='taskgroup_example',
    start_date=datetime(2025, 6, 29),
    schedule=timedelta(seconds=30),
    catchup=False,
    max_active_runs=1,
    tags=['group', 'example']
) as dag:

    start = BashOperator(
        task_id='start',
        bash_command='echo "Start DAG Execution"'
    )

    with TaskGroup("group1", tooltip="First Task Group") as group1:
        t1 = BashOperator(
            task_id='task_1_1',
            bash_command='echo "Group1 - Task 1"'
        )

        t2 = BashOperator(
            task_id='task_1_2',
            bash_command='echo "Group1 - Task 2"'
        )

    with TaskGroup("group2", tooltip="Second Task Group") as group2:
        t3 = BashOperator(
            task_id='task_2_1',
            bash_command='echo "Group2 - Task 1"'
        )

        t4 = BashOperator(
            task_id='task_2_2',
            bash_command='echo "Group2 - Task 2"'
        )

    end = BashOperator(
        task_id='end',
        bash_command='echo "End DAG Execution"'
    )

    # Define the DAG execution flow
    start >> group1 >> group2 >> end

 

  • A TaskGroup is simply a container that groups tasks together.
  • Tasks inside a group run in parallel when no dependencies are set between them.
  • Unlike chaining with >>, grouping alone imposes no order: even if one task in the group fails or lags, the other tasks in the group proceed regardless.
  • In other words, a TaskGroup provides logical grouping only; it does not serialize its members.
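When ordering inside a group is needed, dependencies can be declared inside the TaskGroup exactly as at the top level; a minimal sketch with made-up ids:

```python
from datetime import datetime

from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.utils.task_group import TaskGroup

with DAG(
    dag_id='ordered_group_example',
    start_date=datetime(2025, 6, 29),
    schedule=None,
    catchup=False,
) as dag:
    with TaskGroup('ordered_group') as ordered_group:
        t1 = BashOperator(task_id='first', bash_command='echo first')
        t2 = BashOperator(task_id='second', bash_command='echo second')
        t1 >> t2  # the group's tasks now run in sequence, not in parallel
```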

 

XCom

from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta

def push_function(**kwargs):
    # Push a value to XCom
    ti = kwargs['ti']
    ti.xcom_push(key='sample_key', value='Hello from push_function!')


def pull_function(**kwargs):
    # Pull a value from XCom
    ti = kwargs['ti']
    pulled_value = ti.xcom_pull(key='sample_key', task_ids='push_task')
    print(f"Pulled value from XCom: {pulled_value}")

with DAG(
    dag_id='xcom_example',
    start_date=datetime(2025, 6, 29),
    schedule=timedelta(seconds=30),
    catchup=False
) as dag:

    push_task = PythonOperator(
        task_id='push_task',
        python_callable=push_function,
    )

    pull_task = PythonOperator(
        task_id='pull_task',
        python_callable=pull_function,
    )

    push_task >> pull_task

 

 

  • kwargs contains the context information Airflow passes in automatically
  • From it we use ti (the TaskInstance object)
  • The string is stored in XCom, Airflow's internal cross-task storage
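Besides explicit xcom_push, a callable's return value is pushed to XCom automatically under the default key return_value; a sketch (dag_id and task ids are made up):

```python
from datetime import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator

def produce():
    return 'Hello via return value'  # auto-pushed to XCom as 'return_value'

def consume(ti):
    # With no key argument, xcom_pull reads the 'return_value' key
    value = ti.xcom_pull(task_ids='produce_task')
    print(f"Pulled: {value}")

with DAG(
    dag_id='xcom_return_example',
    start_date=datetime(2025, 6, 29),
    schedule=None,
    catchup=False,
) as dag:
    produce_task = PythonOperator(task_id='produce_task', python_callable=produce)
    consume_task = PythonOperator(task_id='consume_task', python_callable=consume)
    produce_task >> consume_task
```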

 

The print output can be checked on the last line of the task log.

 

Branch

from airflow import DAG
from airflow.operators.python import PythonOperator, BranchPythonOperator
from airflow.operators.empty import EmptyOperator 
from datetime import datetime, timedelta
import pendulum

# Branch decision function
def decide_what_to_do():
    today = pendulum.now("Asia/Seoul").day_of_week  # 0: Monday ~ 6: Sunday
    if today == 6:  # Sunday
        return 'take_rest'
    else:
        return 'work_hard'

with DAG(
    dag_id='branching_example',
    start_date=datetime(2025, 6, 29),
    schedule=timedelta(minutes=1),
    catchup=False,
    max_active_runs=1, # only one run of this DAG at a time
    tags=['branch', 'example'],
) as dag:

    start = EmptyOperator(task_id='start')

    branch = BranchPythonOperator(
        task_id='branch_decision',
        python_callable=decide_what_to_do,
    )

    rest = EmptyOperator(task_id='take_rest')
    work = EmptyOperator(task_id='work_hard')

    # trigger_rule='none_failed_min_one_success': run when no upstream task failed
    # and at least one succeeded (needed because branching skips one of the paths)
    end = EmptyOperator(task_id='end', trigger_rule='none_failed_min_one_success') 

    # Define the DAG flow
    start >> branch
    branch >> [rest, work] >> end

Since today is Sunday, the run branches to take_rest.
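Note that pendulum's day_of_week numbering depends on the pendulum version, so the "0: Monday ~ 6: Sunday" comment above is version-dependent. The same decision written against the standard library, where datetime.weekday() is always 0=Monday ... 6=Sunday:

```python
from datetime import datetime

def decide_for(date):
    # weekday(): 0 = Monday ... 6 = Sunday, stable across Python versions
    return 'take_rest' if date.weekday() == 6 else 'work_hard'

print(decide_for(datetime(2025, 6, 29)))  # a Sunday -> take_rest
print(decide_for(datetime(2025, 6, 30)))  # a Monday -> work_hard
```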

 

Nested TaskGroups

 

SubDAG

  • ๊ธฐ์กด DAG ์•ˆ์— ๋˜ ๋‹ค๋ฅธ DAG๋ฅผ ๋„ฃ๋Š” ๋ฐฉ์‹
  • ๋…๋ฆฝ์ ์ธ DAG๋ฅผ ํ•˜๋‚˜์˜ Task์ฒ˜๋Ÿผ ์‚ฌ์šฉํ•˜๋Š” ๊ฐœ๋…
  • ๋‹ค๋งŒ Airflow์—์„œ๋Š” SubDAG์€ ๋ณต์žก์„ฑ ๋•Œ๋ฌธ์— ๊ถŒ์žฅํ•˜์ง€ ์•Š๋Š” ์ถ”์„ธ์ด๊ณ , ๋Œ€์‹  TaskGroup์„ ์ฃผ๋กœ ์‚ฌ์šฉ

 

from airflow import DAG
from airflow.operators.empty import EmptyOperator
from airflow.utils.task_group import TaskGroup
from datetime import datetime, timedelta

with DAG(
    dag_id='nested_taskgroup_example',
    start_date=datetime(2025, 6, 29),
    schedule=timedelta(minutes=5),
    catchup=False,
    max_active_runs=1,
    tags=['example', 'taskgroup'],
) as dag:

    start = EmptyOperator(task_id='start')

    # Top-level TaskGroup
    with TaskGroup('group1') as group1:
        
        task_a = EmptyOperator(task_id='task_a')
        task_b = EmptyOperator(task_id='task_b')

        # Nested TaskGroup
        with TaskGroup('subgroup1') as subgroup1:
            task_c = EmptyOperator(task_id='task_c')
            task_d = EmptyOperator(task_id='task_d')

        # Inside the group, subgroup1 can be placed in the flow like a task
        task_a >> subgroup1 >> task_b

    end = EmptyOperator(task_id='end')

    start >> group1 >> end

 

  • DAG starts → the start task runs
  • The group1 TaskGroup runs
  • task_a runs, then the nested TaskGroup subgroup1 (task_c and task_d) runs
  • then task_b runs
  • after group1 finishes, the end task runs

 

 

Sensor

 

A Sensor is an Airflow task that waits until some condition is met.

  • Wait until a file arrives → FileSensor
  • Wait until an API returns a particular value → HttpSensor
  • Wait until an external task or table is ready → ExternalTaskSensor, SqlSensor

 

์ž‘๋™๋ฐฉ์‹

๋ชจ๋“œ ์„ค๋ช…
poke ๊ณ„์† ์‹คํ–‰ ์ค‘์ธ ์ƒํƒœ๋กœ ๋Œ€๊ธฐํ•˜๋ฉฐ ์ผ์ • ์ฃผ๊ธฐ๋งˆ๋‹ค ์กฐ๊ฑด์„ ์ฒดํฌ
reschedule ์‹คํ–‰ ์ค‘์ด ์•„๋‹ˆ๋ผ ์‰ฌ์—ˆ๋‹ค๊ฐ€ ๋‹ค์‹œ ์‹คํ–‰ (๋”ฐ๋ผ์„œ, ์ž์›์„ ์ ๊ฒŒ ์”€)
from airflow import DAG
from airflow.operators.empty import EmptyOperator
from airflow.sensors.filesystem import FileSensor
from datetime import datetime, timedelta

with DAG(
    dag_id='sensor_example',
    start_date=datetime(2025, 6, 29),
    schedule=timedelta(minutes=5),
    catchup=False,
    max_active_runs=1,
    tags=['sensor', 'example'],
) as dag:

    start = EmptyOperator(task_id='start')

    wait_for_file = FileSensor(
        task_id='wait_for_file',
        filepath='/opt/airflow/dags/input_file.txt',  
        poke_interval=10,     # check every 10 seconds
        timeout=60,           # wait up to 60 seconds total, then fail
        mode='poke',          # poke mode: hold the slot and check periodically
    )

    done = EmptyOperator(task_id='done')

    start >> wait_for_file >> done

On the first run the file does not exist, so the task fails with an error. Create the file from a terminal with the command below, then trigger the DAG again:

 echo "test" > ./dags/input_file.txt
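For longer waits, the same sensor can be switched to reschedule mode; a sketch of the changed arguments only:

```python
# Drop-in replacement for the sensor above: between checks the task is
# rescheduled instead of sleeping, freeing its worker slot.
from airflow.sensors.filesystem import FileSensor

wait_for_file = FileSensor(
    task_id='wait_for_file',
    filepath='/opt/airflow/dags/input_file.txt',
    poke_interval=60,      # check once a minute
    timeout=60 * 60,       # give up after an hour
    mode='reschedule',     # release the worker slot between checks
)
```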

 

TriggerDagRunOperator 

 

Triggering another DAG from the current DAG.

  1. Save the two files below in the dags/ folder
  2. Run parent_dag from the Airflow UI
  3. Check that child_dag is triggered automatically

 

from airflow import DAG
from airflow.operators.empty import EmptyOperator
from datetime import datetime

with DAG(
    dag_id='child_dag',
    start_date=datetime(2025, 6, 29),
    schedule=None,
    catchup=False,
    tags=['TriggerDagRunOperator'],
) as dag:
    start = EmptyOperator(task_id='child_start')
    end = EmptyOperator(task_id='child_end')

    start >> end
# Second file: the parent DAG
from airflow import DAG
from airflow.operators.empty import EmptyOperator
from airflow.operators.trigger_dagrun import TriggerDagRunOperator
from datetime import datetime, timedelta

with DAG(
    dag_id='parent_dag',
    start_date=datetime(2025, 6, 29),
    schedule=timedelta(minutes=5),
    catchup=False,
    tags=['TriggerDagRunOperator'],
) as dag:
    start = EmptyOperator(task_id='start')

    trigger_child = TriggerDagRunOperator(
        task_id='trigger_child_dag',
        trigger_dag_id='child_dag',
    )

    end = EmptyOperator(task_id='end')

    start >> trigger_child >> end

With child_dag enabled and parent_dag enabled, you can see child_dag run without any schedule of its own: parent_dag's TriggerDagRunOperator triggers it.
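TriggerDagRunOperator can also hand a payload to the child DAG via its conf parameter; a sketch (the dict keys are made up):

```python
from airflow.operators.trigger_dagrun import TriggerDagRunOperator

trigger_child = TriggerDagRunOperator(
    task_id='trigger_child_dag',
    trigger_dag_id='child_dag',
    conf={'source': 'parent_dag'},  # any JSON-serializable dict
    wait_for_completion=False,      # do not block until child_dag finishes
)

# Inside child_dag, templated fields can read the payload, e.g.:
#   bash_command='echo "triggered by {{ dag_run.conf.get(\'source\') }}"'
```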

 

Variable

 

Use Variable, Airflow's global key-value settings store, to fetch values dynamically inside a DAG.

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.models import Variable
from datetime import datetime, timedelta

def print_variable():
    filename = Variable.get("filename", default_var="default.txt")  # fall back to "default.txt" when the key is missing
    print(f"📂 Airflow Variable 'filename' is: {filename}")

with DAG(
    dag_id='variable_example',
    start_date=datetime(2025, 6, 29),
    schedule=timedelta(minutes=10),
    catchup=False,
    tags=['variable'],
) as dag:

    task = PythonOperator(
        task_id='print_airflow_variable',
        python_callable=print_variable,
    )

 

  1. Open the Airflow UI → Admin > Variables
  2. Add Key = filename, Value = input_file.txt
  3. Run the DAG → the log shows the filename
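The same Variable can also be set from the CLI instead of the UI, using the compose CLI service from this setup:

```shell
# Create (or overwrite) the Variable
docker compose run --rm airflow-cli variables set filename input_file.txt

# Read it back to verify
docker compose run --rm airflow-cli variables get filename
```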

 

 

When the Variable exists -> input_file.txt

Log message source details: sources=["/opt/airflow/logs/dag_id=variable_example/run_id=scheduled__2025-06-29T09:53:05.436680+00:00/task_id=print_airflow_variable/attempt=1.log"]
[2025-06-29, 18:53:15] INFO - DAG bundles loaded: dags-folder: source="airflow.dag_processing.bundles.manager.DagBundlesManager"
[2025-06-29, 18:53:15] INFO - Filling up the DagBag from /opt/airflow/dags/variable_example.py: source="airflow.models.dagbag.DagBag"
[2025-06-29, 18:53:15] WARNING - /home/airflow/.local/lib/python3.12/site-packages/airflow/models/variable.py:147: DeprecationWarning: Using Variable.get from `airflow.models` is deprecated.Please use `from airflow.sdk import Variable` instead
  warnings.warn(
: source="py.warnings"
[2025-06-29, 18:53:15] INFO - Done. Returned value was: None: source="airflow.task.operators.airflow.providers.standard.operators.python.PythonOperator"
[2025-06-29, 18:53:15] INFO - 📂 Airflow Variable 'filename' is: input_file.txt: chan="stdout": source="task"

 

When the Variable does not exist -> the default, default.txt

Log message source details: sources=["/opt/airflow/logs/dag_id=variable_example/run_id=manual__2025-06-29T09:53:54.881342+00:00/task_id=print_airflow_variable/attempt=1.log"]
[2025-06-29, 18:53:55] INFO - DAG bundles loaded: dags-folder: source="airflow.dag_processing.bundles.manager.DagBundlesManager"
[2025-06-29, 18:53:55] INFO - Filling up the DagBag from /opt/airflow/dags/variable_example.py: source="airflow.models.dagbag.DagBag"
[2025-06-29, 18:53:55] WARNING - /home/airflow/.local/lib/python3.12/site-packages/airflow/models/variable.py:147: DeprecationWarning: Using Variable.get from `airflow.models` is deprecated.Please use `from airflow.sdk import Variable` instead
  warnings.warn(
: source="py.warnings"
[2025-06-29, 18:53:55] INFO - Done. Returned value was: None: source="airflow.task.operators.airflow.providers.standard.operators.python.PythonOperator"
[2025-06-29, 18:53:55] INFO - 📂 Airflow Variable 'filename' is: default.txt: chan="stdout": source="task"

 

๋ฉ”์ผ๋ณด๋‚ด๊ธฐ

 

First, check that the mail itself can be sent:

import smtplib

smtp_host = 'smtp.gmail.com'
smtp_port = 587
username = 'name@gmail.com'            # your Gmail address
password = '16-character app password' # Gmail app password

server = smtplib.SMTP(smtp_host, smtp_port)
server.starttls()
server.login(username, password)

from email.mime.text import MIMEText

msg = MIMEText('This is a test email from Python SMTP')
msg['Subject'] = 'Test Email'
msg['From'] = username
msg['To'] = 'name@naver.com'   # ๋ฐ›๋Š” ์‚ฌ๋žŒ ์ด๋ฉ”์ผ ์ฃผ์†Œ

server.sendmail(username, ['name@naver.com'], msg.as_string())
server.quit()
print("Email sent successfully!")

๋ฉ”์ผ ์ž˜ ์˜จ๋‹ค.

 

Now do the same as a DAG:

from airflow import DAG
from airflow.operators.email import EmailOperator
from airflow.operators.empty import EmptyOperator
from datetime import datetime, timedelta

with DAG(
    dag_id='email_example',
    start_date=datetime(2025, 6, 29),
    schedule=timedelta(minutes=10),  
    catchup=False,
    tags=['email'],
) as dag:

    start = EmptyOperator(task_id='start')

    send_email = EmailOperator(
        task_id='send_email_task',
        to='name@naver.com',
        subject='Airflow Email Test',
        html_content="""
        <h3>📬 This is a test email sent from Airflow DAG</h3>
        <p>Hello! This email was sent automatically via EmailOperator.</p>
        """,
        from_email='name@gmail.com'
    )

    end = EmptyOperator(task_id='end')

    start >> send_email >> end

These SMTP settings belong in the Airflow environment (the environment section of docker-compose.yaml), not inside the DAG file:

# SMTP
AIRFLOW__SMTP__SMTP_HOST: smtp.gmail.com
AIRFLOW__SMTP__SMTP_PORT: 587
AIRFLOW__SMTP__SMTP_STARTTLS: 'True'
AIRFLOW__SMTP__SMTP_SSL: 'False'
AIRFLOW__SMTP__SMTP_USER: name@gmail.com
AIRFLOW__SMTP__SMTP_PASSWORD: <16-character app password>
AIRFLOW__SMTP__SMTP_MAIL_FROM: name@gmail.com


The reason errors still occur even after putting the SMTP settings into the docker compose environment variables is that, for Airflow's EmailOperator to actually send mail, a separate 'Connection' must also be registered inside Airflow.

  • ๋„์ปค ์ปดํฌ์ฆˆ ํ™˜๊ฒฝ๋ณ€์ˆ˜์— SMTP ์ •๋ณด ๋„ฃ์œผ๋ฉด Airflow ์„œ๋ฒ„ ํ™˜๊ฒฝ์—๋Š” ๋ฐ˜์˜๋˜์ง€๋งŒ,
  • Airflow ๋‚ด๋ถ€์—์„œ conn_id='smtp_default'๋ผ๋Š” Connection ์„ค์ •์ด ๋”ฐ๋กœ ์žˆ์–ด์•ผ ์ด๋ฉ”์ผ ๋ณด๋‚ด๊ธฐ๊ฐ€ ๊ฐ€๋Šฅํ•˜๋‹ค.
  • ์ด Connection ์„ค์ •์€ Airflow UI(Admin > Connections) ์—์„œ ์ง์ ‘ ๋“ฑ๋กํ•˜๊ฑฐ๋‚˜, CLI ๋˜๋Š” ์ฝ”๋“œ๋กœ ์ƒ์„ฑํ•˜๋ฉด ๋œ๋‹ค.
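A sketch of registering that Connection from the CLI instead of the UI; the values mirror the compose settings above, the password is a placeholder, and the conn-type name assumes the SMTP provider package is installed:

```shell
docker compose run --rm airflow-cli connections add smtp_default \
  --conn-type smtp \
  --conn-host smtp.gmail.com \
  --conn-port 587 \
  --conn-login name@gmail.com \
  --conn-password '<16-character app password>'
```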

 

Why is this needed?

  • EmailOperator internally looks up Connection information named smtp_default.
  • If that Connection does not exist, a "Connection not found" error occurs.
  • The environment variables in docker compose only supply the basic SMTP server settings (host, port, user, ...).
  • Registering smtp_default in the UI's Connections menu is how Airflow manages mail-server access credentials internally.

 

Because EmailOperator internally resolves its SMTP server settings through the connection ID smtp_default.
Success!!

 

Slack integration

 

๋ฉ”์ผ ๋ณด๋‚ผ๋•Œ ์Šฌ๋ž™์œผ๋กœ ๋ฉ”์ผ๋ณด๋ƒˆ๋‹ค๊ณ  ์•Œ๋ฆผ ์ถ”๊ฐ€ํ•ด๋ณด์ž

from airflow import DAG
from airflow.operators.email import EmailOperator
from airflow.operators.empty import EmptyOperator
from airflow.providers.slack.operators.slack_webhook import SlackWebhookOperator
from datetime import datetime, timedelta

with DAG(
    dag_id='email_with_slack_example',
    start_date=datetime(2025, 6, 29),
    schedule=timedelta(minutes=10),
    catchup=False,
    tags=['email', 'slack'],
) as dag:

    start = EmptyOperator(task_id='start')

    send_email = EmailOperator(
        task_id='send_email_task',
        to='name@naver.com',
        subject='Airflow Email Test',
        html_content="""
        <h3>📬 This is a test email sent from Airflow DAG</h3>
        <p>Hello! This email was sent automatically via EmailOperator.</p>
        """,
        from_email='name@gmail.com'
    )

    notify_slack = SlackWebhookOperator(
        task_id='notify_slack_task',
        slack_webhook_conn_id="slack_conn",
        message="✅ Email successfully sent to name@naver.com from Airflow!",
        username='airflow',
        channel="#airflow_practice",
    )

    end = EmptyOperator(task_id='end')

    start >> send_email >> notify_slack >> end

Register the webhook URL as an Airflow Connection from the terminal:

docker compose run --rm airflow-cli connections add slack_conn \
  --conn-type slackwebhook \
  --conn-password 'https://hooks.slack.com/services/XXX/YYY/ZZZ'

 

The Slack Webhook URL (endpoint) is registered inside the Airflow container as a Connection, so that the DAG can send notifications to Slack.

The mail arrives successfully.

 

Jinja

 

Note: when the DAG definition gets long

# Option 1: no separate default_args variable; everything passed inside with DAG(...)
with DAG(
    'jinja_template_example',
    # owner/retries/retry_delay are task-level settings, so even inline
    # they must go through default_args (DAG() itself does not accept them)
    default_args={
        'owner': 'airflow',
        'retries': 1,
        'retry_delay': timedelta(minutes=5),
    },
    start_date=datetime(2025, 6, 30),
    schedule='@daily',
    catchup=False,
    tags=['example'],
) as dag:

# Option 2: pulling default_args out as a separate dict
default_args = {
    'owner': 'airflow',
    'start_date': datetime(2025, 6, 30),
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

with DAG(
    'jinja_template_example',
    default_args=default_args,
    schedule='@daily',
    catchup=False,
    tags=['example'],
) as dag:

 

from airflow import DAG
from airflow.operators.bash import BashOperator
from datetime import datetime, timedelta


default_args = {
    'owner': 'airflow',
    'start_date': datetime(2025, 6, 30),
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

with DAG(
    'jinja_template_example',
    default_args=default_args,
    schedule='@daily',
    catchup=False,
    tags=['example'],
) as dag:

    # Use built-in template variables in a BashOperator command
    templated_command = """
    echo "Today is {{ ds }}."
    echo "The date 7 days later is {{ macros.ds_add(ds, 7) }}"
    echo "Received parameter: {{ params.my_param }}"
    """

    run_bash = BashOperator(
        task_id='print_dates',
        bash_command=templated_command,
        params={'my_param': 'Hello Airflow!'}
    )

 

  • {{ ds }} : a built-in Airflow variable holding the DAG run's logical date (yyyy-mm-dd)
  • {{ macros.ds_add(ds, 7) }} : the date 7 days after ds; macros is Airflow's bundle of utility functions
  • {{ params.my_param }} : prints a user-defined parameter passed through the params dictionary
[2025-06-30, 11:16:24] INFO - Running command: ['/usr/bin/bash', '-c', '\n    echo "Today is 2025-06-30."\n    echo "The date 7 days later is 2025-07-07"\n    echo "Received parameter: Hello Airflow!"\n    ']: source="airflow.task.hooks.airflow.providers.standard.hooks.subprocess.SubprocessHook"

 

  • What is a Jinja template in Airflow?
  • Airflow defines DAGs in Python -> parameters, commands, SQL, etc. are written as strings inside the DAG.
  • Those strings need to be rewritten at run time to match the run's date and variables (dynamic substitution).
  • So Airflow uses Jinja2, a template engine widely used in the Python world.
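Outside of Airflow, the same substitution can be reproduced with the jinja2 package directly; this is essentially what Airflow does to every templated field at run time (a sketch, assuming jinja2 is installed):

```python
from jinja2 import Template

template = Template('Today is {{ ds }}, param = {{ params.my_param }}')
rendered = template.render(ds='2025-06-30', params={'my_param': 'Hello Airflow!'})
print(rendered)  # Today is 2025-06-30, param = Hello Airflow!
```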
  • ๊ทธ๋ž˜์„œ Airflow๋Š” Python์—์„œ ๋„๋ฆฌ ์“ฐ์ด๋Š” ํ…œํ”Œ๋ฆฟ ์—”์ง„์ธ Jinja2๋ฅผ ์‚ฌ์šฉ