MLOps
- Machine Learning Operations
- ๋จธ์ ๋ฌ๋(ML) ๋ชจ๋ธ์ ๋ฐฐํฌ, ๋ชจ๋ํฐ๋ง, ์ ์ง ๊ด๋ฆฌํ๋ ์ผ๋ จ์ ๊ณผ์ ์ ์๋ฏธ
- ๋ชฉํ: ML ๋ชจ๋ธ์ ๊ฐ๋ฐ ์๋ช ์ฃผ๊ธฐ๋ฅผ ์๋ํํ๊ณ ์ต์ ํํ๋ ๊ฒ์ด ๋ชฉํ
- ํ์์ฑ
- ์ํฌํ๋ก์ฐ ์๋ํ: ๋ชจ๋ธ ๊ฐ๋ฐ, ํ์ต, ๋ฐฐํฌ, ๋ชจ๋ํฐ๋ง ๋ฑ ์ ์ฒด ํ์ดํ๋ผ์ธ ์๋ํ
- ์์ ์ฑ: ๋ชจ๋ธ์ ํ๋ก๋์ ํ๊ฒฝ์ ์์ ์ ์ผ๋ก ๋ฐฐํฌํ๊ณ , ์ง์์ ์ธ ๋ชจ๋ํฐ๋ง์ ํตํด ์ด์ ์ํฉ ๊ฐ์ง ๋ฐ ๋์
- ํ์ฅ์ฑ: ๋ฐ์ดํฐ์ ํธ๋ํฝ์ ๋ง๊ฒ ์ธํ๋ผ ํ์ฅ
- ๋ชจ๋ํฐ๋ง: ์ค์๊ฐ ์ฑ๋ฅ ์งํ, ๋ก๊ทธ, ์๋ฆผ ํตํด ์ฑ๋ฅ ์ ํ, ์ค๋ฅ ๋ชจ๋ํฐ๋ง
- ์ฅ์
- ๋น ๋ฅธ ๋ฐฐํฌ: ๋จธ์ ๋ฌ๋ ๋ชจ๋ธ์ ๋ฐฐํฌ ๊ณผ์ ์ ์๋ํ
- ํ์ ํจ์จ์ฑ ํฅ์: ๋ฐ์ดํฐ ๊ณผํ์, ๊ฐ๋ฐ์, ์ด์ํ์ ํ์
- ๋ชจ๋ธ ์ฑ๋ฅ ํฅ์: ๋ชจ๋ํฐ๋ง&ํ์ ์ ์ ๋ฐ์ดํธ
- vs DevOps
- ๊ณตํต์ : ๋ ๋ค ์ํํธ์จ์ด ๊ฐ๋ฐ๊ณผ ์ด์์ ์๋ํํ๊ณ ํจ์จ์ ์ผ๋ก ๊ด๋ฆฌ
- ์ฐจ์ด์
- DevOps: ์ผ๋ฐ์ ์ธ ์ํํธ์จ์ด ๊ฐ๋ฐ ๋ฐ ์ด์์ ์๋ํํ์ฌ CI/CD ํ์ดํ๋ผ์ธ์ ๊ตฌ์ถ
- MLOps: ML ๋ชจ๋ธ์ ์ง์์ ์ธ ํ์ต, ๋ฐฐํฌ ์๋ํ, ๋ฐ์ดํฐ ๋ฐ ๋ชจ๋ธ ๊ด๋ฆฌ
- MLOps ๋๊ตฌ
- ์คํ ๊ด๋ฆฌ: MLflow, Weights & Biases
- ๋ชจ๋ธ ๋ฐฐํฌ: Docker, Kubernetes, FastAPI, TensorFlow Serving
- ๋ชจ๋ธ ๋ชจ๋ํฐ๋ง: Prometheus, Grafana
- CI/CD ํ์ดํ๋ผ์ธ: Jenkins, GitHub Actions, GitLab CI/CD
- ๋ฐ์ดํฐ ๋ฐ ๋ชจ๋ธ ๋ฒ์ ๊ด๋ฆฌ: DVC, MLflow, Pachyderm
MLOps์ ์๋ช ์ฃผ๊ธฐ
- ๋ชจ๋ธ ๊ฐ๋ฐ
- ์ฒซ ๋ฒ์งธ ๋จ๊ณ๋ ๋จธ์ ๋ฌ๋ ๋ชจ๋ธ์ ๊ฐ๋ฐ
- ๋ฐ์ดํฐ ์์ง, ๋ฐ์ดํฐ ์ ์ฒ๋ฆฌ, ๋ชจ๋ธ ํ์ต
- ๋ชจ๋ธ ๋ฐฐํฌ
- ๋ชจ๋ธ ์์ฑ ํ, ์๋ก์ด ๋ฐ์ดํฐ์ ๋ํด ์์ธก์ ์ํํ ์ ์๋๋ก ๋ฐฐํฌ
- ์๋ํ ์ฌ์ฉ
- ๋ชจ๋ํฐ๋ง
- ์ง์์ ์ผ๋ก ๋ชจ๋ํฐ๋ง์ ํตํด ๋ชจ๋ธ์ด ์ฌ๋ฐ๋ฅด๊ฒ ์๋ํ๋์ง, ๋ฐ์ดํฐ๊ฐ ํฌ๊ฒ ๋ณํ์ง ์์๋์ง ํ์ธ
- ๋ชจ๋ธ ์ ์ง๋ณด์
- ์๋ก์ด ๋ฐ์ดํฐ๊ฐ ์์ง -> ๋ชจ๋ธ์ ์ ๋ฐ์ดํธํ๊ฑฐ๋ ์ฌํ์ต
Airflow
- ๋ฐ์ดํฐ ์์ง๋์ด๋ง ๋ถ์ผ์์ ๊ฐ์ฅ ๋๋ฆฌ ์ฌ์ฉ๋๋ ์ํฌํ๋ก์ฐ ๊ด๋ฆฌ ๋๊ตฌ
- Airbnb๊ฐ ๋ด๋ถ ๋ฐ์ดํฐ ์์ง๋์ด๋ง ์ํฌํ๋ก์ฐ๋ฅผ ์๋ํํ๊ธฐ ์ํด Python ๊ธฐ๋ฐ์ผ๋ก ๊ฐ๋ฐํ ์ํฌํ๋ก์ฐ ์ค์ผ์คํธ๋ ์ด์ ๋๊ตฌ
- Flask ๊ธฐ๋ฐ์ผ๋ก ์์ฑ๋ ํ์คํฌ๋ค(๋ฐ์ดํฐ์ ์์ฑ, ๋ชจ๋ธ ํ์ต ๋ฑ)์ ์ผ๋ จ์ ๊ทธ๋ํ๋ก ์ฐ๊ฒฐํ๊ณ ์ค์ผ์ค๋ง, ๋ชจ๋ํฐ๋ง ๋ฑ ํ์ดํ๋ผ์ธ ๊ด๋ฆฌ
- DAG(Directed Acyclic Graph) ๊ฐ๋ ์ ์ฌ์ฉํ์ฌ ETL ์์ ์ ์ค์ผ์ค๋ง, ๋ชจ๋ํฐ๋ง, ์คํ
Airflow์์์ ETL
1. ๋ฐ์ดํฐ ์์ง (Extract)
- ๋ค์ํ ์์ค ํ์ฉ: ์ฌ์ฉ์ ๋ก๊ทธ, ์์ฝ ์ ๋ณด, ๊ฒฐ์ ๋ฐ์ดํฐ, ๊ณ ๊ฐ ๋ฆฌ๋ทฐ ๋ฑ ์ฌ๋ฌ ๋ฐ์ดํฐ ์์ค๋ก๋ถํฐ ๋ฐ์ดํฐ๋ฅผ ์ถ์ถ
- MySQL, PostgreSQL, Google BigQuery, AWS S3
2. ๋ฐ์ดํฐ ๋ณํ (Transform)
- ์ ์ ๋ฐ ๊ฐ๊ณต: ์์ ๋ฐ์ดํฐ์๋ ๋ ธ์ด์ฆ๋ ๊ฒฐ์ธก์น๊ฐ ์์ ์ ์์ผ๋ฏ๋ก, Pandas์ ๊ฐ์ ๋๊ตฌ๋ฅผ ํ์ฉํด ๋ฐ์ดํฐ๋ฅผ ์ ์ ํ๊ณ , ํฌ๋งท ๋ง์ถค
- ํผ์ฒ ์์ง๋์ด๋ง: ๋จธ์ ๋ฌ๋ ๋ชจ๋ธ ํ์ต์ ์ ํฉํ๋๋ก ๋ฐ์ดํฐ๋ฅผ ๋ณํํ๊ณ , ์๋ก์ด ํ์ ๋ณ์๋ฅผ ์์ฑ
- ๋ณต์กํ ์ฒ๋ฆฌ: ๋ก๊ทธ ๋ฐ์ดํฐ๋ฅผ ๊ธฐ๋ฐ์ผ๋ก ์ฌ์ฉ์ ํ๋ ํจํด์ ๋ถ์ํ๊ฑฐ๋, ์์ฝ ๋ฐ์ดํฐ๋ฅผ ํตํฉํ๋ ๋ฑ ๋ณต์กํ ๋ฐ์ดํฐ ๋ณํ ์์
3. ๋ฐ์ดํฐ ์ ์ฌ (Load)
- ๋ถ์์ฉ ๋ฐ์ดํฐ ์จ์ดํ์ฐ์ค ๊ตฌ์ถ: ์ ์ ๋ ๋ฐ์ดํฐ๋ฅผ Snowflake, Redshift, BigQuery ๋ฑ ๋๊ท๋ชจ ๋ฐ์ดํฐ ์จ์ดํ์ฐ์ค์ ๋ก๋
- ๋์๋ณด๋ ๋ฐ ๋ฆฌํฌํ ํ์ฉ: ์ ์ฌ๋ ๋ฐ์ดํฐ๋ฅผ Tableau, Superset ๋ฑ์ BI ๋๊ตฌ์์ ํ์ฉํ์ฌ, ์ค์๊ฐ ๋์๋ณด๋๋ ๋ฆฌํฌํธ๋ฅผ ๋ง๋ค์ด ๊ฒฝ์์ง๊ณผ ํ์๋ค์ด ์ธ์ฌ์ดํธ๋ฅผ ๋์ถํ ์ ์๋๋ก ์ง์
- ETL vs ELT
- ETL: ์ ํต์ ์ธ ๋ฐฉ์, ๋ฐ์ดํฐ๋ฅผ ์ถ์ถํ์ฌ ๋ณํํ ํ ์ ์ฌํ๋ ๋ฐฉ์
- ELT: ๋จผ์ ๋ฐ์ดํฐ๋ฅผ ์ถ์ถํ์ฌ ์ ์ฌํ ํ, ํ์์ ๋ฐ๋ผ ๋ณํํ๋ ํ๋์ ๋ฐฉ์
ETL ํ์ดํ๋ผ์ธ์์ ๋ฐ์ดํฐ๋ฅผ ์ ๋ฐ์ดํธํ๋ ์ ๊ทผ ๋ฐฉ์
- ๋ฐ์ดํฐ๊ฐ ์์ ๊ฒฝ์ฐ Full Refresh(ํต์งธ๋ก ์
๋ฐ์ดํธ)
- ํ ์ด๋ธ์ ์์ ํ ๋น์ฐ๊ณ (Truncate) ์๋ณธ ๋ฐ์ดํฐ๋ฅผ ๋ชจ๋ ์ ์ฌํ๋ ์์ (Task)์ DAG๋ก ๊ตฌ์ฑ
- ์์ด ๋ง์ ๊ฒฝ์ฐ Incremental update(๋ณ๊ฒฝ๋ ๊ฒ๋ง ์
๋ฐ์ดํธ)
- ๋ฐ์ดํฐ ์์ค์์ ๋ง์ง๋ง ์ ๋ฐ์ดํธ ์๊ฐ์ ๊ธฐ๋กํ๊ฑฐ๋, ๋ก๊ทธ ํ ์ด๋ธ ๋๋ CDC(Change Data Capture)๋ฅผ ํ์ฉํ์ฌ ๋ณ๊ฒฝ๋ ๋ฐ์ดํฐ๋ง ์ถ์ถ
- ์ด๋ฅผ ๊ธฐ๋ฐ์ผ๋ก, ๊ธฐ์กด ๋ฐ์ดํฐ์ ์ ๊ท ๋ฐ์ดํฐ๋ฅผ ์ถ๊ฐํ๊ฑฐ๋ ๋ณ๊ฒฝ๋ ๋ถ๋ถ๋ง ์ ๋ฐ์ดํธํ๋ ์์ (Task)์ DAG๋ก ๊ตฌ์ฑ
- Airflow ์์ฒด๋ ETL ์์ ์ ์ค์ผ์ค๋ง ๋ฐ ์ค์ผ์คํธ๋ ์ด์ ์ ๋ด๋น
- Full Refresh์ Incremental Update ์ ๋ต์ ์ฌ์ฉ์๊ฐ ์์ฑํ๋ ํ์คํฌ ๋ด์์ ๊ตฌํ
- Airflow DAG ๋ด์ ์๋ ๊ฐ ํ์คํฌ(์: PythonOperator, SQLExecuteQueryOperator ๋ฑ)๋ฅผ ์ฌ์ฉํด ์ด๋ค ๋ฐฉ์์ผ๋ก ๋ฐ์ดํฐ๋ฅผ ๋ก๋ํ ์ง ๊ฒฐ์ ํ๊ณ , ๊ทธ ์์ ์ ์คํํ๋๋ก ๊ตฌ์ฑ
๋ฉฑ๋ฑ์ฑ
- ํน์ ์ฐ์ฐ์ ์ฌ๋ฌ ๋ฒ ์ํํด๋ ๊ฒฐ๊ณผ๊ฐ ๋์ผํ๊ฒ ์ ์ง๋๋
- ๋ฉฑ๋ฑ์ฑ ๋ณด์ฅ: ํธ๋์ญ์ ์ด ์์
- ๋ฉฑ๋ฑ์ฑ์ด ๋ณด์ฅ๋์ด์ผ ํ ์คํฌ ์คํ๊ณผ ๊ด๊ณ์์ด ๋ฐ์ดํฐ๊ฐ ์ผ์ ํ๊ฒ ์ ์ง๋์ด์ผ ํจ
โ ๋ฉฑ๋ฑ์ฑ์ด ๋ณด์ฅ๋๋ ๋ฉ์๋
- GET: ๊ฐ์ ์์ฒญ์ ์ฌ๋ฌ ๋ฒ ๋ณด๋ด๋ ๋์ผํ ์๋ต์ด ๋ฐํ๋จ (์กฐํ ์ฐ์ฐ)
- PUT: ๋์ผํ ๋ฆฌ์์ค์ ๊ฐ์ ๋ฐ์ดํฐ๋ฅผ ์ฌ๋ฌ ๋ฒ ์ ๋ฐ์ดํธํด๋ ๊ฒฐ๊ณผ๊ฐ ๋ณํ์ง ์์
- DELETE: ๊ฐ์ ๋ฆฌ์์ค๋ฅผ ์ฌ๋ฌ ๋ฒ ์ญ์ ์์ฒญํด๋ ์ต์ข ๊ฒฐ๊ณผ๋ "์ญ์ ๋จ" ์ํ๋ก ์ ์ง
โ ๋ฉฑ๋ฑ์ฑ์ด ๋ณด์ฅ๋์ง ์๋ ๋ฉ์๋
- POST: ๊ฐ์ ์์ฒญ์ ์ฌ๋ฌ ๋ฒ ๋ณด๋ด๋ฉด ์๋ก์ด ๋ฆฌ์์ค๊ฐ ์ฌ๋ฌ ๊ฐ ์์ฑ๋ ์ ์์
- PATCH: ์ผ๋ถ ํ๋๋ง ๋ณ๊ฒฝํ๋ฏ๋ก, ์ฐ์ ์์ฒญ ์ ๊ฒฐ๊ณผ๊ฐ ๋ค๋ฅผ ๊ฐ๋ฅ์ฑ์ด ์์
๋ฐฐ์น ํ๋ก์ธ์ค์ ์ค์๊ฐ ํ๋ก์ธ์ค ์ฐจ์ด
๋ฐฐ์น ํ๋ก์ธ์ค

- ํน์ง:์ ํด์ง ์ฃผ๊ธฐ(์: ํ๋ฃจ, ๋งค์๊ฐ, ๋งค๋ถ) ๋๋ ์กฐ๊ฑด์ ๋ฐ๋ผ ๋ฐ์ดํฐ๋ฅผ ํ ๋ฒ์ ์ฒ๋ฆฌ
- ์์: ๋ฆฌ๋ ์ค์ Crontab, ์คํฌ๋ฆฝํธ๋ฅผ ์ฃผ๊ธฐ์ ์ผ๋ก ์คํํ๋ ๋ฐฉ์
- ๋ฌธ์ ์
- ์ฌ์คํ/์ฌ์๋ ๊ธฐ๋ฅ ๋ถ์ฌ: ์ค๋ฅ ๋ฐ์ ์ ์๋์ผ๋ก ์ฌ์คํํ๊ฑฐ๋ ์ฌ์๋๋ฅผ ์ํํ์ง ์๋๋ค.
- ์๋ ๋ฐ ๋ชจ๋ํฐ๋ง ๋ถ์ฌ: ์์ ์คํจ ์ ๋ณ๋์ ์๋์ด๋ ๋ก๊ทธ ํ์ธ ๊ธฐ๋ฅ์ด ์์ด์ ๋ฌธ์ ๊ฐ ๋ฐ์ํด๋ ์ธ์งํ๊ธฐ ์ด๋ ต๋ค.
- ์ค์ ๋ฐ ๋ณ์ ๊ด๋ฆฌ์ ์ด๋ ค์: ๋ณต์กํ ํ๋ผ๋ฏธํฐ๋ ์ค์ ๊ฐ์ ์ธ๋ถ์์ ๋ถ๋ฆฌํ์ฌ ๊ด๋ฆฌํ๊ธฐ ์ด๋ ต๋ค.
์ค์๊ฐ ํ๋ก์ธ์ค
- ํน์ง: ๋ฐ์ดํฐ๊ฐ ๋ฐ์ํ๋ ์ฆ์ ์ฒ๋ฆฌํ๋ ๋ฐฉ์์ผ๋ก, ์ง์ฐ ์์ด ์ค์๊ฐ์ ๊ฐ๊น์ด ๊ฒฐ๊ณผ๋ฅผ ์ ๊ณต
- ์์: ์คํธ๋ฆฌ๋ฐ ๋ฐ์ดํฐ ์ฒ๋ฆฌ, ์ค์๊ฐ ์ด๋ฒคํธ ์ฒ๋ฆฌ
- ๋ฌธ์ ์ : ์ค์๊ฐ ์ฒ๋ฆฌ๋ ๊ณ ๋์ ์ธํ๋ผ์ ๋ณต์กํ ์ฒ๋ฆฌ ๋ก์ง์ด ํ์ํ๋ฉฐ, ์ค๋ฅ ๋ฐ์ ์ ๋น ๋ฅด๊ฒ ๋ณต๊ตฌ ํ์
Airflow ์ ์ฅ์
- ์๋ ์ฌ์๋ ๋ฐ ์คํจ ์ฒ๋ฆฌ: ์์ ์ด ์คํจํ๋ฉด ์๋์ผ๋ก ์ฌ์๋ํ๊ณ , ์คํจ๊ฐ ์ง์๋ ๊ฒฝ์ฐ ์๋
- ์คํ ๋ก๊ทธ ๋ฐ ๋ชจ๋ํฐ๋ง: ๋ชจ๋ ์์ ์ ๋ก๊ทธ๋ฅผ ์ค์์์ ๊ด๋ฆฌํ๊ณ , ์น UI๋ฅผ ํตํด ๋ชจ๋ํฐ๋ง
- ๋์ ์คํ ์์ปค ๊ด๋ฆฌ: ์์ ๊ฐ์ ์์กด์ฑ์ ์ ์ํ๊ณ , ์์ปค ์๋ ๋ฆฌ์์ค ํ ๋น์ ์กฐ์ ํ์ฌ ํจ์จ์ ์ผ๋ก ๋์ ์คํ ๊ด๋ฆฌ
- ์ค์ ๋ฐ ๋ณ์ ๊ด๋ฆฌ: ์ธ๋ถ ๋ณ์์ ์ค์ ํ์ผ์ ๋ถ๋ฆฌํ์ฌ ๊ด๋ฆฌํ ์ ์์ด, ํ๊ฒฝ๋ณ ์ค์ ๋ณ๊ฒฝ ์ฉ์ด
XCom
- Airflow์ Task ๊ฐ ๋ฐ์ดํฐ ๊ณต์ ๋ฉ์ปค๋์ฆ
- ์ฆ, ํ๋์ Task์์ ์คํ๋ ๊ฒฐ๊ณผ๋ฅผ ๋ค๋ฅธ Task์์ ๋ฐ์์ ์ฌ์ฉํ ์ ์๋๋ก ํด์ฃผ๋ ๊ธฐ๋ฅ
์ฌ์ฉ ์ด์
DAG ๋ด์์ ์ฌ๋ฌ Task๊ฐ ์คํ๋ ๋
- Task A๊ฐ ์์ฑํ ๋ฐ์ดํฐ๋ฅผ B์์ ์ฌ์ฉํด์ผ ํ๋ ๊ฒฝ์ฐ
- Task A๊ฐ ์คํ๋ ํ, ๊ทธ ๊ฒฐ๊ณผ์ ๋ฐ๋ผ B์ ๋์์ ๊ฒฐ์ ํด์ผ ํ๋ ๊ฒฝ์ฐ
- Task๊ฐ ๋ฐ์ดํฐ๋ฅผ ์ ๋ฌํ๊ธฐ ์ํด ์ฌ์ฉ
๊ธฐ๋ฅ
- ๊ฐ ์ ์ฅ (push)
- Task๊ฐ ์คํ๋ ํ, return ๊ฐ์ ์๋์ผ๋ก XCom์ ์ ์ฅ (implicit push)
- ti.xcom_push(key, value)๋ฅผ ์ฌ์ฉํด ์๋์ผ๋ก ๊ฐ ์ ์ฅ (explicit push)
- ๊ฐ ์กฐํ (pull)
- ti.xcom_pull(task_ids='task_A')๋ก ํน์ Task์ ์ถ๋ ฅ์ ๊ฐ์ ธ์ฌ ์ ์์
์ฌ์ฉํ ์ํฉ
- ๋ฐ์ดํฐ ๋ค์ด๋ก๋ & ์ฒ๋ฆฌ: Task A์์ ๋ฐ์ดํฐ๋ฅผ ๊ฐ์ ธ์ค๊ณ , Task B์์ ์ด๋ฅผ ๋ณํ
- API ํธ์ถ & ์๋ต ์ฒ๋ฆฌ: Task A์์ API ํธ์ถ ํ ์๋ต์ ์ ์ฅํ๊ณ , Task B์์ ์๋ต์ ๋ถ์
- ๋ชจ๋ธ ํ์ต & ๊ฒฐ๊ณผ ์ ์ฅ: Task A์์ ๋ชจ๋ธ์ ํ์ตํ๊ณ , Task B์์ ๊ฒฐ๊ณผ๋ฅผ ํ๊ฐ
dag์ task ๊ฐ๋จํ ์์
DAG 1: ๋ฐ์ดํฐ ์์ง
โโโ Task 1: API์์ ๋ฐ์ดํฐ ๊ฐ์ ธ์ค๊ธฐ
โโโ Task 2: ๋ฐ์ดํฐ ์ ๋ฆฌ
โโโ Task 3: ๋ฐ์ดํฐ ์ ์ฅ
DAG 2: ๋ชจ๋ธ ํ์ต ๋ฐ ๋ฐฐํฌ
โโโ Task 1: ๋ฐ์ดํฐ ๋ถ๋ฌ์ค๊ธฐ
โโโ Task 2: ๋ชจ๋ธ ํ์ต
โโโ Task 3: ๋ชจ๋ธ ๋ฐฐํฌ
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime
def task_1():
print("Task 1 ์คํ")
def task_2():
print("Task 2 ์คํ")
with DAG(
"example_dag",
start_date=datetime(2025, 3, 19),
schedule_interval="@daily",
catchup=False
) as dag:
t1 = PythonOperator(
task_id="task_1",
python_callable=task_1
)
t2 = PythonOperator(
task_id="task_2",
python_callable=task_2
)
t1 >> t2 # Task ์คํ ์์ ์ง์ (t1 ์คํ ํ t2 ์คํ)
์ค์ต
from datetime import datetime, timedelta
import os
import requests
from dotenv import load_dotenv
from airflow import DAG
from airflow.operators.python import PythonOperator
# .env ํ์ผ์์ OPENWEATHER_API_KEY๋ฅผ ๊ฐ์ ธ์ด
load_dotenv()
API_KEY = os.getenv("OPENWEATHER_API_KEY")
# ๋์ ๋ฐ API ํธ์ถ URL
CITY = "Seoul"
URL = f"https://api.openweathermap.org/data/2.5/weather?q={CITY}&appid={API_KEY}&units=metric"
def get_weather():
"""OpenWeather API๋ฅผ ํธ์ถํ์ฌ ์์ธ์ ๋ ์จ ๋ฐ์ดํฐ๋ฅผ ๊ฐ์ ธ์ด"""
response = requests.get(URL)
if response.status_code == 200:
data = response.json()
temp = data["main"]["temp"]
weather = data["weather"][0]["description"]
humidity = data["main"]["humidity"]
return f"์์ธ์ ํ์ฌ ๋ ์จ: {weather}, ์จ๋: {temp}ยฐC, ์ต๋: {humidity}%"
else:
return "๋ ์จ ์ ๋ณด๋ฅผ ๊ฐ์ ธ์ค๋ ๋ฐ ์คํจํ์ต๋๋ค."
def append_weather_to_file():
"""data ํด๋์ weather.txt ํ์ผ์ ์คํ ์๊ฐ๊ณผ ํจ๊ป ๋ ์จ ์ ๋ณด๋ฅผ ์ถ๊ฐํจ"""
weather_info = get_weather()
now_str = datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S")
log_entry = f"{now_str} (UTC) -> {weather_info}\n"
# data ํด๋๊ฐ ์์ผ๋ฉด ์์ฑ
data_dir = os.path.join(os.getcwd(), 'data')
os.makedirs(data_dir, exist_ok=True)
# weather.txt ํ์ผ์ append ๋ชจ๋๋ก ๋ ์จ ์ ๋ณด ๊ธฐ๋ก
file_path = os.path.join(data_dir, "weather.txt")
with open(file_path, "a", encoding="utf-8") as file:
file.write(log_entry)
print(f"{now_str}์ ๋ ์จ ์ ๋ณด๋ฅผ weather.txt์ ์ถ๊ฐํ์ต๋๋ค.")
# ๊ธฐ๋ณธ DAG ์ธ์ ์ค์
default_args = {
'owner': 'airflow',
'start_date': datetime(2023, 1, 1),
'retries': 1,
'retry_delay': timedelta(minutes=5),
}
# DAG ์ ์: 1๋ถ๋ง๋ค ์คํ
dag = DAG(
"weather_to_file_dag",
default_args=default_args,
description="DAG to append weather info to a single file every minute",
schedule_interval=timedelta(minutes=1),
catchup=False,
tags=['weather', 'save'],
)
# PythonOperator: append_weather_to_file ํจ์๋ฅผ ์คํํ์ฌ weather.txt ํ์ผ์ ๋ฐ์ดํฐ๋ฅผ ์ถ๊ฐ
append_weather_task = PythonOperator(
task_id="append_weather_task",
python_callable=append_weather_to_file,
dag=dag,
)
append_weather_task
์คํ ๊ฒฐ๊ณผ
2025-03-18 03:01:18 (UTC) ->
AAPL ์ต์ ๊ฐ๊ฒฉ: 213.9225 USD (์๊ฐ: 2025-03-17 19:59:00)
GOOGL ์ต์ ๊ฐ๊ฒฉ: 164.4200 USD (์๊ฐ: 2025-03-17 19:59:00)
AMZN ์ต์ ๊ฐ๊ฒฉ: 196.0600 USD (์๊ฐ: 2025-03-17 19:59:00)
from datetime import datetime, timedelta
import os
import requests
from dotenv import load_dotenv
from airflow import DAG
from airflow.operators.python import PythonOperator
# .env ํ์ผ์์ OPENWEATHER_API_KEY๋ฅผ ๊ฐ์ ธ์ด
load_dotenv()
API_KEY = os.getenv("OPENWEATHER_API_KEY")
# ๋์ ๋ฐ API ํธ์ถ URL
CITY = "Seoul"
URL = f"https://api.openweathermap.org/data/2.5/weather?q={CITY}&appid={API_KEY}&units=metric"
def get_weather():
"""OpenWeather API๋ฅผ ํธ์ถํ์ฌ ์์ธ์ ๋ ์จ ๋ฐ์ดํฐ๋ฅผ ๊ฐ์ ธ์ด"""
response = requests.get(URL)
if response.status_code == 200:
data = response.json()
temp = data["main"]["temp"]
weather = data["weather"][0]["description"]
humidity = data["main"]["humidity"]
return f"์์ธ์ ํ์ฌ ๋ ์จ: {weather}, ์จ๋: {temp}ยฐC, ์ต๋: {humidity}%"
else:
return "๋ ์จ ์ ๋ณด๋ฅผ ๊ฐ์ ธ์ค๋ ๋ฐ ์คํจํ์ต๋๋ค."
def append_weather_to_file():
"""data ํด๋์ weather.txt ํ์ผ์ ์คํ ์๊ฐ๊ณผ ํจ๊ป ๋ ์จ ์ ๋ณด๋ฅผ ์ถ๊ฐํจ"""
weather_info = get_weather()
now_str = datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S")
log_entry = f"{now_str} (UTC) -> {weather_info}\n"
# data ํด๋๊ฐ ์์ผ๋ฉด ์์ฑ
data_dir = os.path.join(os.getcwd(), 'data')
os.makedirs(data_dir, exist_ok=True)
# weather.txt ํ์ผ์ append ๋ชจ๋๋ก ๋ ์จ ์ ๋ณด ๊ธฐ๋ก
file_path = os.path.join(data_dir, "weather.txt")
with open(file_path, "a", encoding="utf-8") as file:
file.write(log_entry)
print(f"{now_str}์ ๋ ์จ ์ ๋ณด๋ฅผ weather.txt์ ์ถ๊ฐํ์ต๋๋ค.")
# ๊ธฐ๋ณธ DAG ์ธ์ ์ค์
default_args = {
'owner': 'airflow',
'start_date': datetime(2023, 1, 1),
'retries': 1,
'retry_delay': timedelta(minutes=5),
}
# DAG ์ ์: 1๋ถ๋ง๋ค ์คํ
dag = DAG(
"weather_to_file_dag",
default_args=default_args,
description="DAG to append weather info to a single file every minute",
schedule_interval=timedelta(minutes=1),
catchup=False,
tags=['weather', 'save'],
)
# PythonOperator: append_weather_to_file ํจ์๋ฅผ ์คํํ์ฌ weather.txt ํ์ผ์ ๋ฐ์ดํฐ๋ฅผ ์ถ๊ฐ
append_weather_task = PythonOperator(
task_id="append_weather_task",
python_callable=append_weather_to_file,
dag=dag,
)
append_weather_task
์คํ ๊ฒฐ๊ณผ
2025-03-18 03:12:01 (UTC) -> ์์ธ์ ํ์ฌ ๋ ์จ: light snow, ์จ๋: 1.76ยฐC, ์ต๋: 87%
2025-03-18 03:13:01 (UTC) -> ์์ธ์ ํ์ฌ ๋ ์จ: light snow, ์จ๋: 1.43ยฐC, ์ต๋: 86%
2025-03-18 03:30:01 (UTC) -> ์์ธ์ ํ์ฌ ๋ ์จ: broken clouds, ์จ๋: 2.76ยฐC, ์ต๋: 81%
2025-03-18 03:41:01 (UTC) -> ์์ธ์ ํ์ฌ ๋ ์จ: mist, ์จ๋: 2.76ยฐC, ์ต๋: 81%
'๐ฆญ AI&Big Data > Big Data' ์นดํ ๊ณ ๋ฆฌ์ ๋ค๋ฅธ ๊ธ
Elastic Search (0) | 2025.02.15 |
---|---|
Pandas ์ ๋ฆฌ (0) | 2025.01.10 |
NumPy ์ ๋ฆฌ (0) | 2025.01.10 |
๋น ๋ฐ์ดํฐ ์ปค๋ฆฌ์ด ๊ฐ์ด๋๋ถ(2) (0) | 2024.12.24 |
๋น ๋ฐ์ดํฐ ์ปค๋ฆฌ์ด ๊ฐ์ด๋๋ถ(1) (2) | 2024.12.24 |
MLOps
- Machine Learning Operations
- ๋จธ์ ๋ฌ๋(ML) ๋ชจ๋ธ์ ๋ฐฐํฌ, ๋ชจ๋ํฐ๋ง, ์ ์ง ๊ด๋ฆฌํ๋ ์ผ๋ จ์ ๊ณผ์ ์ ์๋ฏธ
- ๋ชฉํ: ML ๋ชจ๋ธ์ ๊ฐ๋ฐ ์๋ช ์ฃผ๊ธฐ๋ฅผ ์๋ํํ๊ณ ์ต์ ํํ๋ ๊ฒ์ด ๋ชฉํ
- ํ์์ฑ
- ์ํฌํ๋ก์ฐ ์๋ํ: ๋ชจ๋ธ ๊ฐ๋ฐ, ํ์ต, ๋ฐฐํฌ, ๋ชจ๋ํฐ๋ง ๋ฑ ์ ์ฒด ํ์ดํ๋ผ์ธ ์๋ํ
- ์์ ์ฑ: ๋ชจ๋ธ์ ํ๋ก๋์ ํ๊ฒฝ์ ์์ ์ ์ผ๋ก ๋ฐฐํฌํ๊ณ , ์ง์์ ์ธ ๋ชจ๋ํฐ๋ง์ ํตํด ์ด์ ์ํฉ ๊ฐ์ง ๋ฐ ๋์
- ํ์ฅ์ฑ: ๋ฐ์ดํฐ์ ํธ๋ํฝ์ ๋ง๊ฒ ์ธํ๋ผ ํ์ฅ
- ๋ชจ๋ํฐ๋ง: ์ค์๊ฐ ์ฑ๋ฅ ์งํ, ๋ก๊ทธ, ์๋ฆผ ํตํด ์ฑ๋ฅ ์ ํ, ์ค๋ฅ ๋ชจ๋ํฐ๋ง
- ์ฅ์
- ๋น ๋ฅธ ๋ฐฐํฌ: ๋จธ์ ๋ฌ๋ ๋ชจ๋ธ์ ๋ฐฐํฌ ๊ณผ์ ์ ์๋ํ
- ํ์ ํจ์จ์ฑ ํฅ์: ๋ฐ์ดํฐ ๊ณผํ์, ๊ฐ๋ฐ์, ์ด์ํ์ ํ์
- ๋ชจ๋ธ ์ฑ๋ฅ ํฅ์: ๋ชจ๋ํฐ๋ง&ํ์ ์ ์ ๋ฐ์ดํธ
- vs DevOps
- ๊ณตํต์ : ๋ ๋ค ์ํํธ์จ์ด ๊ฐ๋ฐ๊ณผ ์ด์์ ์๋ํํ๊ณ ํจ์จ์ ์ผ๋ก ๊ด๋ฆฌ
- ์ฐจ์ด์
- DevOps: ์ผ๋ฐ์ ์ธ ์ํํธ์จ์ด ๊ฐ๋ฐ ๋ฐ ์ด์์ ์๋ํํ์ฌ CI/CD ํ์ดํ๋ผ์ธ์ ๊ตฌ์ถ
- MLOps: ML ๋ชจ๋ธ์ ์ง์์ ์ธ ํ์ต, ๋ฐฐํฌ ์๋ํ, ๋ฐ์ดํฐ ๋ฐ ๋ชจ๋ธ ๊ด๋ฆฌ
- MLOps ๋๊ตฌ
- ์คํ ๊ด๋ฆฌ: MLflow, Weights & Biases
- ๋ชจ๋ธ ๋ฐฐํฌ: Docker, Kubernetes, FastAPI, TensorFlow Serving
- ๋ชจ๋ธ ๋ชจ๋ํฐ๋ง: Prometheus, Grafana
- CI/CD ํ์ดํ๋ผ์ธ: Jenkins, GitHub Actions, GitLab CI/CD
- ๋ฐ์ดํฐ ๋ฐ ๋ชจ๋ธ ๋ฒ์ ๊ด๋ฆฌ: DVC, MLflow, Pachyderm
MLOps์ ์๋ช ์ฃผ๊ธฐ
- ๋ชจ๋ธ ๊ฐ๋ฐ
- ์ฒซ ๋ฒ์งธ ๋จ๊ณ๋ ๋จธ์ ๋ฌ๋ ๋ชจ๋ธ์ ๊ฐ๋ฐ
- ๋ฐ์ดํฐ ์์ง, ๋ฐ์ดํฐ ์ ์ฒ๋ฆฌ, ๋ชจ๋ธ ํ์ต
- ๋ชจ๋ธ ๋ฐฐํฌ
- ๋ชจ๋ธ ์์ฑ ํ, ์๋ก์ด ๋ฐ์ดํฐ์ ๋ํด ์์ธก์ ์ํํ ์ ์๋๋ก ๋ฐฐํฌ
- ์๋ํ ์ฌ์ฉ
- ๋ชจ๋ํฐ๋ง
- ์ง์์ ์ผ๋ก ๋ชจ๋ํฐ๋ง์ ํตํด ๋ชจ๋ธ์ด ์ฌ๋ฐ๋ฅด๊ฒ ์๋ํ๋์ง, ๋ฐ์ดํฐ๊ฐ ํฌ๊ฒ ๋ณํ์ง ์์๋์ง ํ์ธ
- ๋ชจ๋ธ ์ ์ง๋ณด์
- ์๋ก์ด ๋ฐ์ดํฐ๊ฐ ์์ง -> ๋ชจ๋ธ์ ์ ๋ฐ์ดํธํ๊ฑฐ๋ ์ฌํ์ต
Airflow
- ๋ฐ์ดํฐ ์์ง๋์ด๋ง ๋ถ์ผ์์ ๊ฐ์ฅ ๋๋ฆฌ ์ฌ์ฉ๋๋ ์ํฌํ๋ก์ฐ ๊ด๋ฆฌ ๋๊ตฌ
- Airbnb๊ฐ ๋ด๋ถ ๋ฐ์ดํฐ ์์ง๋์ด๋ง ์ํฌํ๋ก์ฐ๋ฅผ ์๋ํํ๊ธฐ ์ํด Python ๊ธฐ๋ฐ์ผ๋ก ๊ฐ๋ฐํ ์ํฌํ๋ก์ฐ ์ค์ผ์คํธ๋ ์ด์ ๋๊ตฌ
- Flask ๊ธฐ๋ฐ์ผ๋ก ์์ฑ๋ ํ์คํฌ๋ค(๋ฐ์ดํฐ์ ์์ฑ, ๋ชจ๋ธ ํ์ต ๋ฑ)์ ์ผ๋ จ์ ๊ทธ๋ํ๋ก ์ฐ๊ฒฐํ๊ณ ์ค์ผ์ค๋ง, ๋ชจ๋ํฐ๋ง ๋ฑ ํ์ดํ๋ผ์ธ ๊ด๋ฆฌ
- DAG(Directed Acyclic Graph) ๊ฐ๋ ์ ์ฌ์ฉํ์ฌ ETL ์์ ์ ์ค์ผ์ค๋ง, ๋ชจ๋ํฐ๋ง, ์คํ
Airflow์์์ ETL
1. ๋ฐ์ดํฐ ์์ง (Extract)
- ๋ค์ํ ์์ค ํ์ฉ: ์ฌ์ฉ์ ๋ก๊ทธ, ์์ฝ ์ ๋ณด, ๊ฒฐ์ ๋ฐ์ดํฐ, ๊ณ ๊ฐ ๋ฆฌ๋ทฐ ๋ฑ ์ฌ๋ฌ ๋ฐ์ดํฐ ์์ค๋ก๋ถํฐ ๋ฐ์ดํฐ๋ฅผ ์ถ์ถ
- MySQL, PostgreSQL, Google BigQuery, AWS S3
2. ๋ฐ์ดํฐ ๋ณํ (Transform)
- ์ ์ ๋ฐ ๊ฐ๊ณต: ์์ ๋ฐ์ดํฐ์๋ ๋ ธ์ด์ฆ๋ ๊ฒฐ์ธก์น๊ฐ ์์ ์ ์์ผ๋ฏ๋ก, Pandas์ ๊ฐ์ ๋๊ตฌ๋ฅผ ํ์ฉํด ๋ฐ์ดํฐ๋ฅผ ์ ์ ํ๊ณ , ํฌ๋งท ๋ง์ถค
- ํผ์ฒ ์์ง๋์ด๋ง: ๋จธ์ ๋ฌ๋ ๋ชจ๋ธ ํ์ต์ ์ ํฉํ๋๋ก ๋ฐ์ดํฐ๋ฅผ ๋ณํํ๊ณ , ์๋ก์ด ํ์ ๋ณ์๋ฅผ ์์ฑ
- ๋ณต์กํ ์ฒ๋ฆฌ: ๋ก๊ทธ ๋ฐ์ดํฐ๋ฅผ ๊ธฐ๋ฐ์ผ๋ก ์ฌ์ฉ์ ํ๋ ํจํด์ ๋ถ์ํ๊ฑฐ๋, ์์ฝ ๋ฐ์ดํฐ๋ฅผ ํตํฉํ๋ ๋ฑ ๋ณต์กํ ๋ฐ์ดํฐ ๋ณํ ์์
3. ๋ฐ์ดํฐ ์ ์ฌ (Load)
- ๋ถ์์ฉ ๋ฐ์ดํฐ ์จ์ดํ์ฐ์ค ๊ตฌ์ถ: ์ ์ ๋ ๋ฐ์ดํฐ๋ฅผ Snowflake, Redshift, BigQuery ๋ฑ ๋๊ท๋ชจ ๋ฐ์ดํฐ ์จ์ดํ์ฐ์ค์ ๋ก๋
- ๋์๋ณด๋ ๋ฐ ๋ฆฌํฌํ ํ์ฉ: ์ ์ฌ๋ ๋ฐ์ดํฐ๋ฅผ Tableau, Superset ๋ฑ์ BI ๋๊ตฌ์์ ํ์ฉํ์ฌ, ์ค์๊ฐ ๋์๋ณด๋๋ ๋ฆฌํฌํธ๋ฅผ ๋ง๋ค์ด ๊ฒฝ์์ง๊ณผ ํ์๋ค์ด ์ธ์ฌ์ดํธ๋ฅผ ๋์ถํ ์ ์๋๋ก ์ง์
- ETL vs ELT
- ETL: ์ ํต์ ์ธ ๋ฐฉ์, ๋ฐ์ดํฐ๋ฅผ ์ถ์ถํ์ฌ ๋ณํํ ํ ์ ์ฌํ๋ ๋ฐฉ์
- ELT: ๋จผ์ ๋ฐ์ดํฐ๋ฅผ ์ถ์ถํ์ฌ ์ ์ฌํ ํ, ํ์์ ๋ฐ๋ผ ๋ณํํ๋ ํ๋์ ๋ฐฉ์
ETL ํ์ดํ๋ผ์ธ์์ ๋ฐ์ดํฐ๋ฅผ ์ ๋ฐ์ดํธํ๋ ์ ๊ทผ ๋ฐฉ์
- ๋ฐ์ดํฐ๊ฐ ์์ ๊ฒฝ์ฐ Full Refresh(ํต์งธ๋ก ์
๋ฐ์ดํธ)
- ํ ์ด๋ธ์ ์์ ํ ๋น์ฐ๊ณ (Truncate) ์๋ณธ ๋ฐ์ดํฐ๋ฅผ ๋ชจ๋ ์ ์ฌํ๋ ์์ (Task)์ DAG๋ก ๊ตฌ์ฑ
- ์์ด ๋ง์ ๊ฒฝ์ฐ Incremental update(๋ณ๊ฒฝ๋ ๊ฒ๋ง ์
๋ฐ์ดํธ)
- ๋ฐ์ดํฐ ์์ค์์ ๋ง์ง๋ง ์ ๋ฐ์ดํธ ์๊ฐ์ ๊ธฐ๋กํ๊ฑฐ๋, ๋ก๊ทธ ํ ์ด๋ธ ๋๋ CDC(Change Data Capture)๋ฅผ ํ์ฉํ์ฌ ๋ณ๊ฒฝ๋ ๋ฐ์ดํฐ๋ง ์ถ์ถ
- ์ด๋ฅผ ๊ธฐ๋ฐ์ผ๋ก, ๊ธฐ์กด ๋ฐ์ดํฐ์ ์ ๊ท ๋ฐ์ดํฐ๋ฅผ ์ถ๊ฐํ๊ฑฐ๋ ๋ณ๊ฒฝ๋ ๋ถ๋ถ๋ง ์ ๋ฐ์ดํธํ๋ ์์ (Task)์ DAG๋ก ๊ตฌ์ฑ
- Airflow ์์ฒด๋ ETL ์์ ์ ์ค์ผ์ค๋ง ๋ฐ ์ค์ผ์คํธ๋ ์ด์ ์ ๋ด๋น
- Full Refresh์ Incremental Update ์ ๋ต์ ์ฌ์ฉ์๊ฐ ์์ฑํ๋ ํ์คํฌ ๋ด์์ ๊ตฌํ
- Airflow DAG ๋ด์ ์๋ ๊ฐ ํ์คํฌ(์: PythonOperator, SQLExecuteQueryOperator ๋ฑ)๋ฅผ ์ฌ์ฉํด ์ด๋ค ๋ฐฉ์์ผ๋ก ๋ฐ์ดํฐ๋ฅผ ๋ก๋ํ ์ง ๊ฒฐ์ ํ๊ณ , ๊ทธ ์์ ์ ์คํํ๋๋ก ๊ตฌ์ฑ
๋ฉฑ๋ฑ์ฑ
- ํน์ ์ฐ์ฐ์ ์ฌ๋ฌ ๋ฒ ์ํํด๋ ๊ฒฐ๊ณผ๊ฐ ๋์ผํ๊ฒ ์ ์ง๋๋
- ๋ฉฑ๋ฑ์ฑ ๋ณด์ฅ: ํธ๋์ญ์ ์ด ์์
- ๋ฉฑ๋ฑ์ฑ์ด ๋ณด์ฅ๋์ด์ผ ํ ์คํฌ ์คํ๊ณผ ๊ด๊ณ์์ด ๋ฐ์ดํฐ๊ฐ ์ผ์ ํ๊ฒ ์ ์ง๋์ด์ผ ํจ
โ ๋ฉฑ๋ฑ์ฑ์ด ๋ณด์ฅ๋๋ ๋ฉ์๋
- GET: ๊ฐ์ ์์ฒญ์ ์ฌ๋ฌ ๋ฒ ๋ณด๋ด๋ ๋์ผํ ์๋ต์ด ๋ฐํ๋จ (์กฐํ ์ฐ์ฐ)
- PUT: ๋์ผํ ๋ฆฌ์์ค์ ๊ฐ์ ๋ฐ์ดํฐ๋ฅผ ์ฌ๋ฌ ๋ฒ ์ ๋ฐ์ดํธํด๋ ๊ฒฐ๊ณผ๊ฐ ๋ณํ์ง ์์
- DELETE: ๊ฐ์ ๋ฆฌ์์ค๋ฅผ ์ฌ๋ฌ ๋ฒ ์ญ์ ์์ฒญํด๋ ์ต์ข ๊ฒฐ๊ณผ๋ "์ญ์ ๋จ" ์ํ๋ก ์ ์ง
โ ๋ฉฑ๋ฑ์ฑ์ด ๋ณด์ฅ๋์ง ์๋ ๋ฉ์๋
- POST: ๊ฐ์ ์์ฒญ์ ์ฌ๋ฌ ๋ฒ ๋ณด๋ด๋ฉด ์๋ก์ด ๋ฆฌ์์ค๊ฐ ์ฌ๋ฌ ๊ฐ ์์ฑ๋ ์ ์์
- PATCH: ์ผ๋ถ ํ๋๋ง ๋ณ๊ฒฝํ๋ฏ๋ก, ์ฐ์ ์์ฒญ ์ ๊ฒฐ๊ณผ๊ฐ ๋ค๋ฅผ ๊ฐ๋ฅ์ฑ์ด ์์
๋ฐฐ์น ํ๋ก์ธ์ค์ ์ค์๊ฐ ํ๋ก์ธ์ค ์ฐจ์ด
๋ฐฐ์น ํ๋ก์ธ์ค

- ํน์ง:์ ํด์ง ์ฃผ๊ธฐ(์: ํ๋ฃจ, ๋งค์๊ฐ, ๋งค๋ถ) ๋๋ ์กฐ๊ฑด์ ๋ฐ๋ผ ๋ฐ์ดํฐ๋ฅผ ํ ๋ฒ์ ์ฒ๋ฆฌ
- ์์: ๋ฆฌ๋ ์ค์ Crontab, ์คํฌ๋ฆฝํธ๋ฅผ ์ฃผ๊ธฐ์ ์ผ๋ก ์คํํ๋ ๋ฐฉ์
- ๋ฌธ์ ์
- ์ฌ์คํ/์ฌ์๋ ๊ธฐ๋ฅ ๋ถ์ฌ: ์ค๋ฅ ๋ฐ์ ์ ์๋์ผ๋ก ์ฌ์คํํ๊ฑฐ๋ ์ฌ์๋๋ฅผ ์ํํ์ง ์๋๋ค.
- ์๋ ๋ฐ ๋ชจ๋ํฐ๋ง ๋ถ์ฌ: ์์ ์คํจ ์ ๋ณ๋์ ์๋์ด๋ ๋ก๊ทธ ํ์ธ ๊ธฐ๋ฅ์ด ์์ด์ ๋ฌธ์ ๊ฐ ๋ฐ์ํด๋ ์ธ์งํ๊ธฐ ์ด๋ ต๋ค.
- ์ค์ ๋ฐ ๋ณ์ ๊ด๋ฆฌ์ ์ด๋ ค์: ๋ณต์กํ ํ๋ผ๋ฏธํฐ๋ ์ค์ ๊ฐ์ ์ธ๋ถ์์ ๋ถ๋ฆฌํ์ฌ ๊ด๋ฆฌํ๊ธฐ ์ด๋ ต๋ค.
์ค์๊ฐ ํ๋ก์ธ์ค
- ํน์ง: ๋ฐ์ดํฐ๊ฐ ๋ฐ์ํ๋ ์ฆ์ ์ฒ๋ฆฌํ๋ ๋ฐฉ์์ผ๋ก, ์ง์ฐ ์์ด ์ค์๊ฐ์ ๊ฐ๊น์ด ๊ฒฐ๊ณผ๋ฅผ ์ ๊ณต
- ์์: ์คํธ๋ฆฌ๋ฐ ๋ฐ์ดํฐ ์ฒ๋ฆฌ, ์ค์๊ฐ ์ด๋ฒคํธ ์ฒ๋ฆฌ
- ๋ฌธ์ ์ : ์ค์๊ฐ ์ฒ๋ฆฌ๋ ๊ณ ๋์ ์ธํ๋ผ์ ๋ณต์กํ ์ฒ๋ฆฌ ๋ก์ง์ด ํ์ํ๋ฉฐ, ์ค๋ฅ ๋ฐ์ ์ ๋น ๋ฅด๊ฒ ๋ณต๊ตฌ ํ์
Airflow ์ ์ฅ์
- ์๋ ์ฌ์๋ ๋ฐ ์คํจ ์ฒ๋ฆฌ: ์์ ์ด ์คํจํ๋ฉด ์๋์ผ๋ก ์ฌ์๋ํ๊ณ , ์คํจ๊ฐ ์ง์๋ ๊ฒฝ์ฐ ์๋
- ์คํ ๋ก๊ทธ ๋ฐ ๋ชจ๋ํฐ๋ง: ๋ชจ๋ ์์ ์ ๋ก๊ทธ๋ฅผ ์ค์์์ ๊ด๋ฆฌํ๊ณ , ์น UI๋ฅผ ํตํด ๋ชจ๋ํฐ๋ง
- ๋์ ์คํ ์์ปค ๊ด๋ฆฌ: ์์ ๊ฐ์ ์์กด์ฑ์ ์ ์ํ๊ณ , ์์ปค ์๋ ๋ฆฌ์์ค ํ ๋น์ ์กฐ์ ํ์ฌ ํจ์จ์ ์ผ๋ก ๋์ ์คํ ๊ด๋ฆฌ
- ์ค์ ๋ฐ ๋ณ์ ๊ด๋ฆฌ: ์ธ๋ถ ๋ณ์์ ์ค์ ํ์ผ์ ๋ถ๋ฆฌํ์ฌ ๊ด๋ฆฌํ ์ ์์ด, ํ๊ฒฝ๋ณ ์ค์ ๋ณ๊ฒฝ ์ฉ์ด
XCom
- Airflow์ Task ๊ฐ ๋ฐ์ดํฐ ๊ณต์ ๋ฉ์ปค๋์ฆ
- ์ฆ, ํ๋์ Task์์ ์คํ๋ ๊ฒฐ๊ณผ๋ฅผ ๋ค๋ฅธ Task์์ ๋ฐ์์ ์ฌ์ฉํ ์ ์๋๋ก ํด์ฃผ๋ ๊ธฐ๋ฅ
์ฌ์ฉ ์ด์
DAG ๋ด์์ ์ฌ๋ฌ Task๊ฐ ์คํ๋ ๋
- Task A๊ฐ ์์ฑํ ๋ฐ์ดํฐ๋ฅผ B์์ ์ฌ์ฉํด์ผ ํ๋ ๊ฒฝ์ฐ
- Task A๊ฐ ์คํ๋ ํ, ๊ทธ ๊ฒฐ๊ณผ์ ๋ฐ๋ผ B์ ๋์์ ๊ฒฐ์ ํด์ผ ํ๋ ๊ฒฝ์ฐ
- Task๊ฐ ๋ฐ์ดํฐ๋ฅผ ์ ๋ฌํ๊ธฐ ์ํด ์ฌ์ฉ
๊ธฐ๋ฅ
- ๊ฐ ์ ์ฅ (push)
- Task๊ฐ ์คํ๋ ํ, return ๊ฐ์ ์๋์ผ๋ก XCom์ ์ ์ฅ (implicit push)
- ti.xcom_push(key, value)๋ฅผ ์ฌ์ฉํด ์๋์ผ๋ก ๊ฐ ์ ์ฅ (explicit push)
- ๊ฐ ์กฐํ (pull)
- ti.xcom_pull(task_ids='task_A')๋ก ํน์ Task์ ์ถ๋ ฅ์ ๊ฐ์ ธ์ฌ ์ ์์
์ฌ์ฉํ ์ํฉ
- ๋ฐ์ดํฐ ๋ค์ด๋ก๋ & ์ฒ๋ฆฌ: Task A์์ ๋ฐ์ดํฐ๋ฅผ ๊ฐ์ ธ์ค๊ณ , Task B์์ ์ด๋ฅผ ๋ณํ
- API ํธ์ถ & ์๋ต ์ฒ๋ฆฌ: Task A์์ API ํธ์ถ ํ ์๋ต์ ์ ์ฅํ๊ณ , Task B์์ ์๋ต์ ๋ถ์
- ๋ชจ๋ธ ํ์ต & ๊ฒฐ๊ณผ ์ ์ฅ: Task A์์ ๋ชจ๋ธ์ ํ์ตํ๊ณ , Task B์์ ๊ฒฐ๊ณผ๋ฅผ ํ๊ฐ
dag์ task ๊ฐ๋จํ ์์
DAG 1: ๋ฐ์ดํฐ ์์ง
โโโ Task 1: API์์ ๋ฐ์ดํฐ ๊ฐ์ ธ์ค๊ธฐ
โโโ Task 2: ๋ฐ์ดํฐ ์ ๋ฆฌ
โโโ Task 3: ๋ฐ์ดํฐ ์ ์ฅ
DAG 2: ๋ชจ๋ธ ํ์ต ๋ฐ ๋ฐฐํฌ
โโโ Task 1: ๋ฐ์ดํฐ ๋ถ๋ฌ์ค๊ธฐ
โโโ Task 2: ๋ชจ๋ธ ํ์ต
โโโ Task 3: ๋ชจ๋ธ ๋ฐฐํฌ
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime
def task_1():
print("Task 1 ์คํ")
def task_2():
print("Task 2 ์คํ")
with DAG(
"example_dag",
start_date=datetime(2025, 3, 19),
schedule_interval="@daily",
catchup=False
) as dag:
t1 = PythonOperator(
task_id="task_1",
python_callable=task_1
)
t2 = PythonOperator(
task_id="task_2",
python_callable=task_2
)
t1 >> t2 # Task ์คํ ์์ ์ง์ (t1 ์คํ ํ t2 ์คํ)
์ค์ต
from datetime import datetime, timedelta
import os
import requests
from dotenv import load_dotenv
from airflow import DAG
from airflow.operators.python import PythonOperator
# .env ํ์ผ์์ OPENWEATHER_API_KEY๋ฅผ ๊ฐ์ ธ์ด
load_dotenv()
API_KEY = os.getenv("OPENWEATHER_API_KEY")
# ๋์ ๋ฐ API ํธ์ถ URL
CITY = "Seoul"
URL = f"https://api.openweathermap.org/data/2.5/weather?q={CITY}&appid={API_KEY}&units=metric"
def get_weather():
"""OpenWeather API๋ฅผ ํธ์ถํ์ฌ ์์ธ์ ๋ ์จ ๋ฐ์ดํฐ๋ฅผ ๊ฐ์ ธ์ด"""
response = requests.get(URL)
if response.status_code == 200:
data = response.json()
temp = data["main"]["temp"]
weather = data["weather"][0]["description"]
humidity = data["main"]["humidity"]
return f"์์ธ์ ํ์ฌ ๋ ์จ: {weather}, ์จ๋: {temp}ยฐC, ์ต๋: {humidity}%"
else:
return "๋ ์จ ์ ๋ณด๋ฅผ ๊ฐ์ ธ์ค๋ ๋ฐ ์คํจํ์ต๋๋ค."
def append_weather_to_file():
"""data ํด๋์ weather.txt ํ์ผ์ ์คํ ์๊ฐ๊ณผ ํจ๊ป ๋ ์จ ์ ๋ณด๋ฅผ ์ถ๊ฐํจ"""
weather_info = get_weather()
now_str = datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S")
log_entry = f"{now_str} (UTC) -> {weather_info}\n"
# data ํด๋๊ฐ ์์ผ๋ฉด ์์ฑ
data_dir = os.path.join(os.getcwd(), 'data')
os.makedirs(data_dir, exist_ok=True)
# weather.txt ํ์ผ์ append ๋ชจ๋๋ก ๋ ์จ ์ ๋ณด ๊ธฐ๋ก
file_path = os.path.join(data_dir, "weather.txt")
with open(file_path, "a", encoding="utf-8") as file:
file.write(log_entry)
print(f"{now_str}์ ๋ ์จ ์ ๋ณด๋ฅผ weather.txt์ ์ถ๊ฐํ์ต๋๋ค.")
# ๊ธฐ๋ณธ DAG ์ธ์ ์ค์
default_args = {
'owner': 'airflow',
'start_date': datetime(2023, 1, 1),
'retries': 1,
'retry_delay': timedelta(minutes=5),
}
# DAG ์ ์: 1๋ถ๋ง๋ค ์คํ
dag = DAG(
"weather_to_file_dag",
default_args=default_args,
description="DAG to append weather info to a single file every minute",
schedule_interval=timedelta(minutes=1),
catchup=False,
tags=['weather', 'save'],
)
# PythonOperator: append_weather_to_file ํจ์๋ฅผ ์คํํ์ฌ weather.txt ํ์ผ์ ๋ฐ์ดํฐ๋ฅผ ์ถ๊ฐ
append_weather_task = PythonOperator(
task_id="append_weather_task",
python_callable=append_weather_to_file,
dag=dag,
)
append_weather_task
์คํ ๊ฒฐ๊ณผ
2025-03-18 03:01:18 (UTC) ->
AAPL ์ต์ ๊ฐ๊ฒฉ: 213.9225 USD (์๊ฐ: 2025-03-17 19:59:00)
GOOGL ์ต์ ๊ฐ๊ฒฉ: 164.4200 USD (์๊ฐ: 2025-03-17 19:59:00)
AMZN ์ต์ ๊ฐ๊ฒฉ: 196.0600 USD (์๊ฐ: 2025-03-17 19:59:00)
from datetime import datetime, timedelta
import os
import requests
from dotenv import load_dotenv
from airflow import DAG
from airflow.operators.python import PythonOperator
# .env ํ์ผ์์ OPENWEATHER_API_KEY๋ฅผ ๊ฐ์ ธ์ด
load_dotenv()
API_KEY = os.getenv("OPENWEATHER_API_KEY")
# ๋์ ๋ฐ API ํธ์ถ URL
CITY = "Seoul"
URL = f"https://api.openweathermap.org/data/2.5/weather?q={CITY}&appid={API_KEY}&units=metric"
def get_weather():
"""OpenWeather API๋ฅผ ํธ์ถํ์ฌ ์์ธ์ ๋ ์จ ๋ฐ์ดํฐ๋ฅผ ๊ฐ์ ธ์ด"""
response = requests.get(URL)
if response.status_code == 200:
data = response.json()
temp = data["main"]["temp"]
weather = data["weather"][0]["description"]
humidity = data["main"]["humidity"]
return f"์์ธ์ ํ์ฌ ๋ ์จ: {weather}, ์จ๋: {temp}ยฐC, ์ต๋: {humidity}%"
else:
return "๋ ์จ ์ ๋ณด๋ฅผ ๊ฐ์ ธ์ค๋ ๋ฐ ์คํจํ์ต๋๋ค."
def append_weather_to_file():
"""data ํด๋์ weather.txt ํ์ผ์ ์คํ ์๊ฐ๊ณผ ํจ๊ป ๋ ์จ ์ ๋ณด๋ฅผ ์ถ๊ฐํจ"""
weather_info = get_weather()
now_str = datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S")
log_entry = f"{now_str} (UTC) -> {weather_info}\n"
# data ํด๋๊ฐ ์์ผ๋ฉด ์์ฑ
data_dir = os.path.join(os.getcwd(), 'data')
os.makedirs(data_dir, exist_ok=True)
# weather.txt ํ์ผ์ append ๋ชจ๋๋ก ๋ ์จ ์ ๋ณด ๊ธฐ๋ก
file_path = os.path.join(data_dir, "weather.txt")
with open(file_path, "a", encoding="utf-8") as file:
file.write(log_entry)
print(f"{now_str}์ ๋ ์จ ์ ๋ณด๋ฅผ weather.txt์ ์ถ๊ฐํ์ต๋๋ค.")
# ๊ธฐ๋ณธ DAG ์ธ์ ์ค์
default_args = {
'owner': 'airflow',
'start_date': datetime(2023, 1, 1),
'retries': 1,
'retry_delay': timedelta(minutes=5),
}
# DAG ์ ์: 1๋ถ๋ง๋ค ์คํ
dag = DAG(
"weather_to_file_dag",
default_args=default_args,
description="DAG to append weather info to a single file every minute",
schedule_interval=timedelta(minutes=1),
catchup=False,
tags=['weather', 'save'],
)
# PythonOperator: append_weather_to_file ํจ์๋ฅผ ์คํํ์ฌ weather.txt ํ์ผ์ ๋ฐ์ดํฐ๋ฅผ ์ถ๊ฐ
append_weather_task = PythonOperator(
task_id="append_weather_task",
python_callable=append_weather_to_file,
dag=dag,
)
append_weather_task
์คํ ๊ฒฐ๊ณผ
2025-03-18 03:12:01 (UTC) -> ์์ธ์ ํ์ฌ ๋ ์จ: light snow, ์จ๋: 1.76ยฐC, ์ต๋: 87%
2025-03-18 03:13:01 (UTC) -> ์์ธ์ ํ์ฌ ๋ ์จ: light snow, ์จ๋: 1.43ยฐC, ์ต๋: 86%
2025-03-18 03:30:01 (UTC) -> ์์ธ์ ํ์ฌ ๋ ์จ: broken clouds, ์จ๋: 2.76ยฐC, ์ต๋: 81%
2025-03-18 03:41:01 (UTC) -> ์์ธ์ ํ์ฌ ๋ ์จ: mist, ์จ๋: 2.76ยฐC, ์ต๋: 81%
'๐ฆญ AI&Big Data > Big Data' ์นดํ ๊ณ ๋ฆฌ์ ๋ค๋ฅธ ๊ธ
Elastic Search (0) | 2025.02.15 |
---|---|
Pandas ์ ๋ฆฌ (0) | 2025.01.10 |
NumPy ์ ๋ฆฌ (0) | 2025.01.10 |
๋น ๋ฐ์ดํฐ ์ปค๋ฆฌ์ด ๊ฐ์ด๋๋ถ(2) (0) | 2024.12.24 |
๋น ๋ฐ์ดํฐ ์ปค๋ฆฌ์ด ๊ฐ์ด๋๋ถ(1) (2) | 2024.12.24 |