๊ฐ๋
MinIO๋? MinIO๋ AWS S3์ ํธํ๋๋ ์ค๋ธ์ ํธ ์คํ ๋ฆฌ์ง ์์คํ ์ผ๋ก, ๋ฐ์ดํฐ๋ฅผ ์ ์ฅํ๊ณ ๊ด๋ฆฌํ๋ ์ญํ
์ค์ต
์์ฒญ์ฌํญ
- data๋ minio์ raw-data ๋ฒํท์์ ๊ฐ์ ธ์์ airflow๋ก ๋งค ์๊ฐ ์ ๊ฐ์ ์คํ๋๋, ๋ก๊ทธ๋ mlflow์ ๋จ๋ pipeline dag๋ฅผ ์์ฑ
- dag์ด๋ฆ์ 00_airflow_mlops.py๋ก ํฉ๋๋ค.
- experiment ์ด๋ฆ์ fisa-ml
์ฌ๊ณ ๊ณผ์
MinIO์์ ๋ฐ์ดํฐ๋ฅผ ๊ฐ์ ธ์์, ๋ฐ์ดํฐ๋ฅผ ์ฒ๋ฆฌํ ํ, MLflow์ ๋ก๊น ํ๋ ๋ฐ์ดํฐ ํ์ดํ๋ผ์ธ์ ๊ตฌ์ถํ๋ DAG๋ฅผ ๋ง๋ค๋ฉด ๋๋ค.
- MinIO์์ raw-data ๋ฒํท์ ์๋ data.csv ํ์ผ์ ๊ฐ์ ธ์ด
- ๊ฐ์ ธ์จ ๋ฐ์ดํฐ๋ฅผ ์ฒ๋ฆฌํ์ฌ ํต๊ณ๊ฐ(ํ๊ท )์ ๊ณ์ฐ
- MLflow์ ๋ก๊ทธ๋ฅผ ๊ธฐ๋กํ์ฌ ์คํ ๊ด๋ฆฌ
- ๋งค ์๊ฐ ์ ๊ฐ(0๋ถ)๋ง๋ค ์คํ๋๋ ์๋ํ๋ ๋ฐ์ดํฐ ํ์ดํ๋ผ์ธ
1. ์ค์
from datetime import datetime, timedelta
import os
import boto3
import pandas as pd
import mlflow
from airflow import DAG
from airflow.operators.python import PythonOperator
# ------------------------------
# 1. MinIO ์ค์
# ------------------------------
MINIO_ENDPOINT = "http://minio-server:9000"
MINIO_ACCESS_KEY = "minio"
MINIO_SECRET_KEY = "miniostorage"
BUCKET_NAME = "raw-data" # ์ฌ์ ์ ์์ฑ
FILE_NAME = "data.csv"
# ------------------------------
# 2. MLflow ์ค์
# ------------------------------
MLFLOW_TRACKING_URI = "http://mlflow:600"
EXPERIMENT_NAME = "fisa-ml" # MLflow์์ ์ฌ์ฉํ Experiment ์ด๋ฆ
# ------------------------------
# 3. MinIO ํด๋ผ์ด์ธํธ ์์ฑ
# ------------------------------
# boto3๋ฅผ ์ฌ์ฉํ์ฌ MinIO์ ์ฐ๊ฒฐ
s3_client = boto3.client(
"s3",
endpoint_url=MINIO_ENDPOINT,
aws_access_key_id=MINIO_ACCESS_KEY,
aws_secret_access_key=MINIO_SECRET_KEY,
)
MinIO ์ค์
- MinIO ์๋ฒ๋ http://minio-server:9000์์ ์คํ๋จ
- (localhost๋ก ํด๋ ๋์ง๋ง, ๋ ๋์ปค ์ปดํฌ์ฆ๋ก ๋์ด์ ์ปจํ ์ด๋๋ณ๋ก ๊ตฌ๋ถํ๊ธฐ ์ํด ํด๋น ์ฃผ์๋ฅผ ์ฌ์ฉํ๋ค.)
์ minio-server:9000 ์ ์ฌ์ฉํ๋๊ฐ
- Airflow ์ปจํ ์ด๋์์ MinIO ์ปจํ ์ด๋๋ฅผ ์ ๊ทผํด์ผ ํ๊ธฐ ๋๋ฌธ
- Docker Compose ๋ด์์ ์ปจํ ์ด๋ ๊ฐ ํต์ ์ ํ๊ธฐ ๋๋ฌธ
์ ๋ฆฌ
- ๋์ปค ์ปจํ ์ด๋๋ ๊ฐ์ ๋ ๋ฆฝ๋ ๋คํธ์ํฌ๋ฅผ ์ฌ์ฉ
- localhost๋ฅผ ์ฐ๋ฉด Airflow ์ปจํ ์ด๋ ๋ด๋ถ์์ ์คํ๋๋ localhost๋ฅผ ๊ฐ๋ฆฌํค๊ฒ ๋จ
- MinIO๋ Airflow ๋ด๋ถ์ localhost๊ฐ ์๋๋ผ, ๋ณ๋์ MinIO ์ปจํ ์ด๋์์ ์คํ ์ค
- ๋ฐ๋ผ์ MinIO์ ์ปจํ ์ด๋ ์ด๋ฆ(minio-server)์ ์ฌ์ฉํด์ผ ํจ
๊ฐ๋
- docker-compose.yml์์ ๊ฐ์ services ์๋ ์ ์๋ ์ปจํ ์ด๋๋ค์ ์๋์ผ๋ก ๊ฐ์ ๋คํธ์ํฌ์ ์ํจ
- ๋ฐ๋ผ์ ์ปจํ ์ด๋ ๊ฐ์๋ ์๋น์ค๋ช (์ปจํ ์ด๋๋ช )์ผ๋ก ์ ๊ทผ ๊ฐ๋ฅ
- minio-server๋ผ๋ ์๋น์ค๋ช ์ด ์ปจํ ์ด๋์ ํธ์คํธ ์ด๋ฆ ์ญํ ์ ํจ
- services ์๋ minio-server๋ก ์ ์ → ์ด๊ฒ ์ปจํ ์ด๋์ ๋คํธ์ํฌ ์ด๋ฆ์ด ๋จ
๋ด ๋์ปค ์ปดํฌ์ฆ minio ๋ถ๋ถ
services:
minio-server:
image: minio/minio
ports:
- "9000:9000"
- "9001:9001"
environment:
MINIO_ROOT_USER: minio
MINIO_ROOT_PASSWORD: miniostorage
command: server /data/minio --console-address :9001


- ์ ๊ทผ์ ์ํด minio(์์ด๋) / miniostorage(๋น๋ฐ๋ฒํธ)๋ฅผ ์ฌ์ฉ
- raw-data ๋ฒํท์์ data.csv ํ์ผ์ ๊ฐ์ ธ์ฌ ์์
MLflow ์ค์
- MLflow ์๋ฒ๋ http://mlflow:600์์ ์คํ๋จ
- fisa-ml์ด๋ผ๋ Experiment์ ๋ฐ์ดํฐ๋ฅผ ๊ธฐ๋กํ ์์
- ๋ก๊ทธํ ๋ฐ์ดํฐ๋ ํต๊ณ ์ ๋ณด(ex. ํ๊ท ๊ฐ)๋ก ์ ์ฅ๋จ
2. MinIO์์ ๋ฐ์ดํฐ ๊ฐ์ ธ์ค๊ธฐ
def fetch_data_from_minio():
local_file_path = f"/tmp/{FILE_NAME}" # ๋ฐ์ดํฐ๋ฅผ ์ ์ฅํ ๋ก์ปฌ ๊ฒฝ๋ก ์ง์
# MinIO์์ ํ์ผ ๋ค์ด๋ก๋
s3_client.download_file(BUCKET_NAME, FILE_NAME, local_file_path)
print(f"๐ฅ MinIO์์ {FILE_NAME} ๋ค์ด๋ก๋ ์๋ฃ: {local_file_path}")
# ๋ค์ด๋ก๋๋ ํ์ผ ๊ฒฝ๋ก๋ฅผ ๋ฐํ (XCom์ ํตํด ๋ค์ Task์์ ์ฌ์ฉ ๊ฐ๋ฅ)
return local_file_path
- MinIO์์ raw-data ๋ฒํท์ ์๋ data.csv ํ์ผ์ ๋ค์ด๋ก๋
- ๋ค์ด๋ก๋๋ ํ์ผ์ ๋ก์ปฌ /tmp/data.csv ๊ฒฝ๋ก์ ์ ์ฅ
- Airflow์ XCom ๊ธฐ๋ฅ์ ํ์ฉํ์ฌ ํ์ผ ๊ฒฝ๋ก๋ฅผ ๋ค์ Task์ ์ ๋ฌ
http://localhost:9001/browser ์ผ๋ก MinIO์ ์ ์
raw-data๋ผ๋ ๋ฒํท์์ data.csv ํ์ผ์ด ์์นํด ์์ด์ผ ํ๋ค.
3. ๋ฐ์ดํฐ ์ฒ๋ฆฌ & MLflow ๊ธฐ๋ก
def process_data(**kwargs):
# Airflow XCom์์ fetch_data Task๊ฐ ๋ฐํํ ํ์ผ ๊ฒฝ๋ก ๊ฐ์ ธ์ค๊ธฐ
ti = kwargs["ti"]
file_path = ti.xcom_pull(task_ids="fetch_data")
# CSV ํ์ผ ๋ก๋
df = pd.read_csv(file_path)
print(f"๐ ๋ฐ์ดํฐ ๋ก๋ ์๋ฃ: {df.shape}")
# ์์นํ ์ปฌ๋ผ ํ๊ท ๊ณ์ฐ
mean_values = df.mean().to_dict()
# MLflow ์ค์
mlflow.set_tracking_uri(MLFLOW_TRACKING_URI)
mlflow.set_experiment(EXPERIMENT_NAME)
# ์ปฌ๋ผ๋ช
๋ณํ (๊ณต๋ฐฑ, ๊ดํธ ๋ฑ ์ ๊ฑฐ)
cleaned_mean_values = {col.replace(" ", "_").replace("(", "").replace(")", ""): val for col, val in mean_values.items()}
# MLflow ์คํ
with mlflow.start_run():
mlflow.log_params({"source": "MinIO", "file": FILE_NAME})
mlflow.log_metrics(cleaned_mean_values)
print("โ
MLflow์ ๋ฐ์ดํฐ ๊ธฐ๋ก ์๋ฃ!")
๋ค์ด๋ก๋ํ ๋ฐ์ดํฐ๋ฅผ ์ฒ๋ฆฌํ๊ณ , MLflow์ ๋ก๊น ํ๋ ํจ์
- fetch_data Task์์ ๋ค์ด๋ก๋ํ ํ์ผ์ ๊ฒฝ๋ก๋ฅผ XCom์ ์ด์ฉํด ๊ฐ์ ธ์ด
- pandas๋ฅผ ์ฌ์ฉํ์ฌ CSV ๋ฐ์ดํฐ๋ฅผ ๋ก๋ํ๊ณ ์์นํ ์ปฌ๋ผ์ ํ๊ท ๊ฐ์ ๊ณ์ฐ
- mlflow.log_metrics(mean_values)๋ก MLflow์ ํ๊ท ๊ฐ์ ๊ธฐ๋ก
- ์ปฌ๋ผ๋ช ์ ๊ณต๋ฐฑ/ํน์๋ฌธ์๊ฐ ์์ผ๋ฉด ๋ณํํ์ฌ ์ ์ฅ (MLflow ๊ท์น ์ค์)
EXPERIMENT_NAME = "fisa-ml"๋ก ์ค์ ํ๊ธฐ์, http://localhost:600/ ์ผ๋ก ์ ์ ์ ๋ฌธ์ ๊ฐ ์๋ค๋ฉด ์๋์ ๊ฐ์ด MLflow๊ฐ ์คํ ์๋ฃ ๋ ๊ฒ์ ํ์ธํ ์ ์๋ค.
XCOM?
https://koreatstm.tistory.com/268#XCom-1
4. DAG ์ ์ ๋ฐ ์คํ ์์
default_args = {
"owner": "fisa",
"start_date": datetime(2025, 3, 19),
"retries": 1,
"retry_delay": timedelta(minutes=5),
}
dag = DAG(
"00_airflow_mlops",
default_args=default_args,
description="MinIO์์ ๋ฐ์ดํฐ๋ฅผ ๊ฐ์ ธ์ MLflow์ ๊ธฐ๋กํ๋ DAG",
schedule_interval="0 * * * *",
catchup=False,
tags=["minio", "mlflow", "airflow"],
)
- Task ์คํ ์คํจ ์, 5๋ถ ๊ฐ๊ฒฉ์ผ๋ก ์ต๋ 1๋ฒ ์ฌ์๋
- DAG๋ ๋งค ์๊ฐ ์ ๊ฐ(0๋ถ)์ ์คํ๋จ (crontab ํ์์ผ๋ก ์ ์)
๐ 5. DAG์ Task ์ ์
fetch_data_task = PythonOperator(
task_id="fetch_data",
python_callable=fetch_data_from_minio,
dag=dag,
)
process_data_task = PythonOperator(
task_id="process_data",
python_callable=process_data,
provide_context=True,
dag=dag,
)
fetch_data_task >> process_data_task
- fetch_data_task → MinIO์์ ๋ฐ์ดํฐ๋ฅผ ๊ฐ์ ธ์ค๋ Airflow Task
- process_data_task → ๊ฐ์ ธ์จ ๋ฐ์ดํฐ๋ฅผ MLflow์ ๊ธฐ๋กํ๋ Airflow Task
- Task ๊ฐ ์คํ ์์ ์ง์ → fetch_data_task >> process_data_task
fetch_data, process_data Task ์ฑ๊ณต์ ์ผ๋ก ์๋ฃ๋ ๊ฒ์ ํ์ธ
์ ์ฒด ์ฝ๋
from datetime import datetime, timedelta
import os
import boto3
import pandas as pd
import mlflow
from airflow import DAG
from airflow.operators.python import PythonOperator
# ------------------------------
# 1. MinIO ์ค์
# ------------------------------
MINIO_ENDPOINT = "http://minio-server:9000"
MINIO_ACCESS_KEY = "minio"
MINIO_SECRET_KEY = "miniostorage"
BUCKET_NAME = "raw-data" # ์ฌ์ ์ ์์ฑ
FILE_NAME = "data.csv"
# ------------------------------
# 2. MLflow ์ค์
# ------------------------------
MLFLOW_TRACKING_URI = "http://mlflow:600"
EXPERIMENT_NAME = "fisa-ml" # MLflow์์ ์ฌ์ฉํ Experiment ์ด๋ฆ
# ------------------------------
# 3. MinIO ํด๋ผ์ด์ธํธ ์์ฑ
# ------------------------------
# boto3๋ฅผ ์ฌ์ฉํ์ฌ MinIO์ ์ฐ๊ฒฐ
s3_client = boto3.client(
"s3",
endpoint_url=MINIO_ENDPOINT,
aws_access_key_id=MINIO_ACCESS_KEY,
aws_secret_access_key=MINIO_SECRET_KEY,
)
def fetch_data_from_minio():
"""
MinIO์์ ์ง์ ๋ ํ์ผ์ ๋ค์ด๋ก๋ํ์ฌ ๋ก์ปฌ `/tmp` ๋๋ ํ ๋ฆฌ์ ์ ์ฅํ๋ ํจ์
/tmp๋ Airflow๊ฐ ์คํ์ค์ธ ์๋ฒ์ ์์ ๋๋ ํ ๋ฆฌ
์ดํ XCOM์ ์ฌ์ฉํ์ฌ ๋ค์ Task์์ ์ด ํ์ผ ์ฌ์ฉ ๊ฐ๋ฅ
"""
local_file_path = f"/tmp/{FILE_NAME}" # ํ์ผ์ ์ ์ฅํ ๊ฒฝ๋ก ์ค์
# MinIO์์ ํ์ผ ๋ค์ด๋ก๋
s3_client.download_file(BUCKET_NAME, FILE_NAME, local_file_path)
print(f"๐ฅ MinIO์์ {FILE_NAME} ๋ค์ด๋ก๋ ์๋ฃ: {local_file_path}")
# ๋ค์ด๋ก๋๋ ํ์ผ ๊ฒฝ๋ก๋ฅผ ๋ฐํ
return local_file_path
def process_data(**kwargs):
"""
๋ค์ด๋ก๋ํ ๋ฐ์ดํฐ๋ฅผ ์ฒ๋ฆฌํ๊ณ , MLflow์ ๋ก๊น
ํ๋ ํจ์
"""
# Airflow XCom์ ์ฌ์ฉํ์ฌ ์ด์ task(fetch_data)์ ์ถ๋ ฅ ๊ฐ ๊ฐ์ ธ์ค๊ธฐ
ti = kwargs["ti"]
file_path = ti.xcom_pull(task_ids="fetch_data") # fetch_data์์ ๋ฐํ๋ ํ์ผ ๊ฒฝ๋ก ๊ฐ์ ธ์ด
# 1. CSV ๋ฐ์ดํฐ ๋ก๋
df = pd.read_csv(file_path) # Pandas๋ฅผ ์ด์ฉํด CSV ํ์ผ ์ฝ๊ธฐ
print(f"๐ ๋ฐ์ดํฐ ๋ก๋ ์๋ฃ: {df.shape}") # ๋ฐ์ดํฐ ๊ฐ์ ์ถ๋ ฅ
# 2. ๋ฐ์ดํฐ ์ฒ๋ฆฌ (ํ๊ท ๊ฐ ๊ณ์ฐ)
mean_values = df.mean().to_dict() # ๋ชจ๋ ์์นํ ์ปฌ๋ผ์ ํ๊ท ๊ฐ์ ๊ณ์ฐ
# ------------------------------
# 3. MLflow์ ๋ฐ์ดํฐ ๋ก๊น
(์ปฌ๋ผ๋ช
์ ๋ฆฌ)
# ------------------------------
mlflow.set_tracking_uri(MLFLOW_TRACKING_URI) # MLflow ์๋ฒ ์ฃผ์ ์ค์
mlflow.set_experiment(EXPERIMENT_NAME) # ์ฌ์ฉํ Experiment ์ค์
# ์ปฌ๋ผ๋ช
๋ณํ (๊ณต๋ฐฑ, ๊ดํธ ๋ฑ์ '_'๋ก ๋ณ๊ฒฝ)
cleaned_mean_values = {col.replace(" ", "_").replace("(", "").replace(")", ""): val for col, val in mean_values.items()}
# ์๋ก์ด Run์ ์์ํ๊ณ ๋ก๊ทธ ๊ธฐ๋ก
with mlflow.start_run():
mlflow.log_params({"source": "MinIO", "file": FILE_NAME}) # ๋ฐ์ดํฐ ์์ค ์ ๋ณด ๊ธฐ๋ก
mlflow.log_metrics(cleaned_mean_values) # ๋ณํ๋ ์ปฌ๋ผ๋ช
์ผ๋ก ๋ฉํธ๋ฆญ ๋ก๊น
print("โ
MLflow์ ๋ฐ์ดํฐ ๊ธฐ๋ก ์๋ฃ!") # ์ฑ๊ณต ๋ฉ์์ง ์ถ๋ ฅ
# ------------------------------
# 4. DAG ์ ์
# ------------------------------
# dag์ ๋จ์์ธ ๊ฐ task๋ฅผ 5๋ถ์ ํ๋ฒ์ฉ
default_args = {
"owner": "fisa", # DAG ์์ ์
"start_date": datetime(2025, 3, 19), # DAG ์์ ๋ ์ง
"retries": 1, # ์คํจ ์ ์ฌ์๋ ํ์
"retry_delay": timedelta(minutes=5), # ์ฌ์๋ ๊ฐ๊ฒฉ (5๋ถ)
}
# dag์์ฒด๋ฅผ 1์๊ฐ์ ํ๋ฒ์ฉ
dag = DAG(
"00_airflow_mlops", # DAG ์ด๋ฆ
default_args=default_args, # ๊ธฐ๋ณธ ์ธ์ ์ค์
description="MinIO์์ ๋ฐ์ดํฐ๋ฅผ ๊ฐ์ ธ์ MLflow์ ๊ธฐ๋กํ๋ DAG", # ์ค๋ช
schedule_interval="0 * * * *", # ๋งค ์๊ฐ ์ ๊ฐ์ ์คํ
catchup=False, # ๊ณผ๊ฑฐ ์คํ ์ ํจ
tags=["minio", "mlflow", "airflow"], # ํ๊ทธ ์ค์
)
# ------------------------------
# 5. Task ์ ์
# ------------------------------
# (1) MinIO์์ ๋ฐ์ดํฐ๋ฅผ ๊ฐ์ ธ์ค๋ Task
fetch_data_task = PythonOperator(
task_id="fetch_data", # Task ์ด๋ฆ
python_callable=fetch_data_from_minio, # ์คํํ ํจ์
dag=dag,
)
# (2) ๋ฐ์ดํฐ ์ฒ๋ฆฌ ๋ฐ MLflow ๊ธฐ๋ก Task
process_data_task = PythonOperator(
task_id="process_data", # Task ์ด๋ฆ
python_callable=process_data, # ์คํํ ํจ์
provide_context=True, # XCom์ ์ฌ์ฉํ๊ธฐ ์ํด ์ค์
dag=dag,
)
# ------------------------------
# 6. DAG ์คํ ์์ ์ ์
# ------------------------------
fetch_data_task >> process_data_task # fetch_data ์คํ ํ process_data ์คํ
'๐ฆญ AI&Big Data > ML' ์นดํ ๊ณ ๋ฆฌ์ ๋ค๋ฅธ ๊ธ
๋น์ง๋ํ์ต (0) | 2025.03.05 |
---|---|
๊ฒฐ์ ํธ๋ฆฌ & ์์๋ธ (0) | 2025.03.05 |
๋ก์ง์คํฑ ํ๊ท (0) | 2025.03.05 |
ํ๊ท (0) | 2025.02.28 |
K ์ต๊ทผ์ ์ด์ (0) | 2025.02.27 |