4. 모델 파이프라인 구성
📈 트래픽 예측 모델 실습 중 모델 자동화 파이프라인을 구성하는 과정을 설명합니다.
- 예상 소요 시간: 60분
- 권장 운영 체제: MacOS, Ubuntu
시나리오 소개
이 튜토리얼에서는 Kubeflow Pipeline(KFP) 를 이용하여 전체 머신러닝 파이프라인 워크플로우를 자동화합니다. 데이터 로드 → 피처 엔지니어링 → 하이퍼파라미터 튜닝 → 모델 평가와 등록 - 서빙 업데이트의 전 과정을 하나의 파이프라인으로 구성하고 실행하는 방식을 소개합니다.
KFP 주요 개념
Kubeflow Pipeline(KFP)은 Docker 컨테이너 기반의 머신러닝 워크플로우를 정의하고 자동화할 수 있는 프레임워크입니다. Python SDK를 사용하여 파이프라인을 정의하고, KFP 백엔드를 통해 실행할 수 있습니다.
주요 개념 용어 | 설명 |
---|---|
파이프라인(Pipeline) | 머신러닝 전체 워크플로우를 정의하는 단위로, 여러 개의 컴포넌트로 구성 |
컴포넌트(Component) | 워크플로우의 한 단계를 수행하는 실행 단위로 데이터 전처리, 모델 학습 등 각 작업이 해당 |
실험(Experiment) | 작성한 파이프라인을 다양한 설정으로 반복 실행하는 논리적 워크스페이스 |
런(Run) | Experiment 내에서 수행되는 단일 파이프라인 실행 인스턴스 |
아티팩트(Artifact) | 각 컴포넌트가 실행된 후 생성되는 출력 결과 (예: 모델 파일, 평가 점수, 로그 등) |
자세한 개념 설명은 Kubeflow 공식 문서를 참고하세요.
시작하기
⚠️ 이 실습은 앞선 트래픽 예측 튜토리얼(1, 2, 3편) 을 완료한 상태를 기준으로 작성되었습니다.
Step 1. Workflow 설계
데이터 수집부터 모델 서빙까지 이어지는 머신러닝 전체 흐름을 Kubeflow Pipeline으로 구성합니다. 단, 이번 실습에서는 ML 모델의 연구개발이 이미 완료된 상태를 가정하므로 데이터 분석과 모델 개발 단계는 생략합니다.
튜토리얼에 포함된 워크플로우 단계
단계 | 작업 | 설명 |
---|---|---|
1 | 데이터 전처리(preprocessing) | 원본 데이터를 집계하고 CSV로 저장 |
2 | 피처 엔지니어링(featurization) | 시간대·요일 정보를 사이클릭 인코딩하여 입력 피처 생성 |
3 | 하이퍼파라미터 튜닝(tuning with Katib) | Katib을 통해 최적의 모델 파라미터 탐색 |
4 | 모델 평가(evaluate-model) | 테스트셋으로 예측 정확도(MAE) 평가 및 결과 저장 |
5 | 모델 등록(push to registry) | 모델 파일을 Model Registry에 등록 |
6 | 모델 배포(update KServe) | InferenceService를 통해 모델 서빙 |
실행 주기 설정
현재 사용하는 모델은 시간 및 요일 단위의 트래픽을 예측하도록 설계되어 있습니다. 워크플로우의 실행 주기는 1주일로 설정하며, 파이프라인 실행 시점 기준 최근 5주간 데이터 중 4주는 학습, 1주는 평가용으로 사용됩니다.
- 파이프라인 실행일: 2025년 5월 6일 00시
- 학습 + 검증 데이터셋: 2024년 4월 1일~28일 (4주)
- 평가 데이터셋: 2024년 4월 29일~5월 5일 (1주)
Step 2. 파이프라인 구성
Kubeflow Pipeline(KFP)의 Python SDK를 활용해 JupyterLab 환경에서 파이프라인을 구성하고 실행합니다.
1. 사전 설정
- Jupyter Notebook에서 다음 명령어를 실행하여 KFP SDK를 설치합니다.
!pip install kfp==2.9.0 --quiet
- 사용할 실습 데이터셋을 다운로드 및 업로드합니다.
- nlb-raw.txt
- 파일을
dataset-pvc
로 마운트된 디렉터리(/home/jovyan/dataset/
)에 업로드
2. 컴포넌트 작성
KFP의 컴포넌트는 dsl
패키지의 데코레이터를 사용하여 작성할 수 있습니다. 각 컴포넌트는 하나의 함수로 구성되며, 해당 함수 내에서 필요한 라이브러리는 반드시 함수 내부에서 import해야 합니다.
이전 설계에서 작성한 단계에 맞춰 JupyterLab 환경에서 컴포넌트를 작성합니다. 작성에 앞서 모듈을 import 하고 컴포넌트의 기본 이미지를 다음과 같이 설정합니다.
import kfp
from kfp import kubernetes
from kfp import dsl
from kfp.dsl import Output, Input, Dataset
LB_BASE_IMAGE="bigdata-150.kr-central-2.kcr.dev/kc-kubeflow/jupyter-scipy:v1.8.0.py311.1a"
-
데이터 전처리 (preprocessing): 원본 로그 데이터를 30분 단위로 집계하여 정형화된 데이터셋 생성
@dsl.component(base_image=LB_BASE_IMAGE)
def preprocessing(
source_data: str,
data_mnt_path: str,
start_date: str,
eval_date: str,
raw_data: Output[Dataset]
):
import json
import pandas as pd
import os
# file path
file_path = os.path.join(data_mnt_path, source_data)
json_data = []
# load nlb dataset
with open(file_path) as f:
for line in f: # 각 line 에 있는 JSON형식 텍스트를 읽음
# load json data
data = json.loads(line) # JSON 형식 문자열을 python dict 타입으로 변환
# append to raw_data
json_data.append(data)
raw_df = pd.json_normalize(json_data) # key:value 형식의 tabular 데이터로 변환, pandas의 Dataframe 객체 형식
# 시간을 30분 간격으로 변경
log_time_sr = pd.to_datetime(raw_df['time'],format='%Y/%m/%d %H:%M:%S:%f').dt.floor('30min')
# 30분 간격으로 로그의 수를 세고, 각 시간 별 로그 수를 dictionary 타입으로 저장
log_count_dict = log_time_sr.dt.floor('30min').value_counts(dropna=False).to_dict()
# 데이터셋 시간 범위를 생성 (30분 간격)
# collect_date = pd.to_datetime(collect_date)
time_range = pd.date_range(start=start_date, end=eval_date , freq='30min')
# 시간과 로그 수를 칼럼으로 갖는 데이터프레임 생성
df = pd.DataFrame({'datetime': time_range})
df['count'] = df['datetime'].apply(lambda x: log_count_dict.get(x, 0))
df.to_csv(raw_data.path, index=False)- 원본 데이터는 하나의 파일로 구성되었다고 가정합니다.
- 전처리된 데이터의 저장 경로를 전달하기 위해서, KFP의 Output 객체를 사용합니다.
- Output 객체인 raw_data의 경로에 저장하여, 이를 입력으로 받는 컴포넌트에서 해당 파일을 사용할 수 있도록 작성합니다.
-
피처 엔지니어링 (featurization): 시간/요일 특성을 주기 함수로 인코딩해 피처셋 구성
@dsl.component(base_image=LB_BASE_IMAGE)
def featurization(
data_version: str,
eval_date: str,
data_mnt_path: str,
raw_data: Input[Dataset]
):
import pandas as pd
import numpy as np
import os
df = pd.read_csv(raw_data.path,parse_dates=['datetime'])
time_sr = df['datetime'].apply(lambda x: x.hour * 2 + x.minute // 30)
dow_sr = df['datetime'].dt.dayofweek
dataset = pd.DataFrame()
dataset['datetime'] = df['datetime'] # 이후 작업 편의를 위해 설정
# 시간 관련 feature
dataset['x1'] = np.sin(2*np.pi*time_sr/48)
dataset['x2'] = np.cos(2*np.pi*time_sr/48)
# 요일 관련 feature
dataset['x3'] = np.sin(2*np.pi*dow_sr/7)
dataset['x4'] = np.cos(2*np.pi*dow_sr/7)
# 예측하고자 하는 대상, label
dataset['y'] = df['count']
train_df = dataset[dataset['datetime'] < eval_date]
test_df = dataset[dataset['datetime'] >= eval_date]
train_df.to_csv(os.path.join(data_mnt_path,f'nlb-{data_version}.csv'), index=False)
test_df.to_csv(os.path.join(data_mnt_path,f'nlb-{data_version}-test.csv'), index=False)- 변환된 데이터셋은 날짜 기준으로 학습+검증셋(train_df)과 평가셋(test_df)으로 나눕니다.
- 데이터셋은 별도의 볼륨을 사용해 저장하며(볼륨 마운트 부분은 pipeline 코드에서 설정), 파라미터로 주어진 데이터 버전 이름의 파일로 저장합니다.
-
하이퍼파라미터 튜닝 (tuning_with_katib): Katib을 활용한 최적 하이퍼파라미터 탐색 및 모델 학습
@dsl.component(base_image=LB_BASE_IMAGE, packages_to_install=['kubeflow-katib', 'scikit-learn==1.3.0'])
def tunning_with_katib(
data_mnt_path: str,
model_mnt_path: str,
artifact_mnt_path: str,
katib_exp_name: str,
model_version: str,
data_version: str
):
import os
import ast
import joblib,json
import pandas as pd
from sklearn.ensemble import GradientBoostingRegressor
import kubeflow.katib as katib
katib_client = katib.KatibClient()
katib_exp = katib_client.get_experiment(katib_exp_name)
katib_exp.metadata.name += f'-d{data_version}'
exp_name = katib_exp.metadata.name
katib_exp.metadata.resource_version = None
container_spec = katib_exp.spec.trial_template.trial_spec['spec']['template']['spec']['containers'][0]
container_spec['args'].append('--data_version')
container_spec['args'].append(data_version)
katib_client.create_experiment(katib_exp)
katib_client.wait_for_experiment_condition(name=exp_name)
optim_params = katib_client.get_optimal_hyperparameters(exp_name)
print(optim_params)
params = { param.name: ast.literal_eval(param.value) for param in optim_params.parameter_assignments}
df = pd.read_csv(os.path.join(data_mnt_path,f'nlb-{data_version}.csv'))
print("read df")
X = df[['x1', 'x2', 'x3', 'x4']]
y = df['y']
model = GradientBoostingRegressor(**params)
model.fit(X, y)
print("Model train")
model_dir = os.path.join(model_mnt_path, model_version)
os.makedirs(model_dir, exist_ok=True)
joblib.dump(model, os.path.join(model_dir,'model.joblib'))
print("Dump to model")
artifact_dir = os.path.join(artifact_mnt_path, model_version)
os.makedirs(artifact_dir, exist_ok=True)
with open(os.path.join(artifact_mnt_path, model_version,'param.json'),'w') as f:
json.dump(params,f)- 하이퍼파라미터 튜닝은 컴포넌트 내부에서 진행하지 않고, Katib 클라이언트를 통해 사전에 생성했던 Experiment(Katib 의 AutoML)을 실행합니다.
- 데이터셋 버전 정보를 추가하여 Experiment를 설정하여 실행합니다. (container_spec args 부분)
- 학습+검증 셋을 사용하여 최적 하이퍼파라미터를 찾고, 해당 기준으로 모델을 학습합니다.
- 학습한 모델은 모델 볼륨에, 파라미터 정보는 아티펙트 볼륨에 저장합니다.
-
모델 평가 (evaluate-model): 테스트 데이터셋을 사용한 MAE 기반 평가
@dsl.component(base_image=LB_BASE_IMAGE, packages_to_install=['scikit-learn==1.3.0'])
def evaluate_model(
data_mnt_path: str,
model_mnt_path: str,
artifact_mnt_path: str,
model_version: str,
data_version: str
):
import os
import joblib
import json
import pandas as pd
from sklearn.metrics import mean_absolute_error
df = pd.read_csv(os.path.join(data_mnt_path,f'nlb-{data_version}-test.csv'))
X = df[['x1', 'x2', 'x3', 'x4']]
real = df['y']
model_dir = os.path.join(model_mnt_path, model_version)
model = joblib.load(f'/{model_dir}/model.joblib')
pred = model.predict(X)
mae = mean_absolute_error(real,pred)
# you can add more metrics (e.g. mse, r2, etc.)
metrics = {'mae': mae}
print(f"MAE: {mae}")
artifacts_path = os.path.join(artifact_mnt_path, model_version)
with open(os.path.join(artifacts_path,'score.json'),'w') as f:
json.dump(metrics,f)- 모델 평가 점수를 아티펙트 볼륨에 저장합니다.
-
모델 등록 (push_to_registry): 모델 파일을 Model Registry에 등록
@dsl.component(base_image=LB_BASE_IMAGE, packages_to_install=['model-registry==0.2.7a1'])
def push_to_registry(
model_registry_host: str,
model_registry_port: int,
mr_author: str,
model_name: str,
model_version: str,
model_pvc: str
):
from model_registry import ModelRegistry
if not model_registry_host.startswith("http"):
model_registry_host = f"http://{model_registry_host}"
registry = ModelRegistry(
server_address=model_registry_host, # default: http://model-registry-service.kubeflow.svc.cluster.local
port=model_registry_port,
author=mr_author,
is_secure=False
)
registry.register_model(
model_name,
f"pvc://{model_pvc}/{model_version}/model.joblib",
model_format_name="joblib",
model_format_version="1",
version=model_version
)- 이전 단계에서 모델을 저장한 PVC 경로를 파라미터로 사용하여 등록합니다.
-
모델 배포 (update_kserve): KServe를 통한 InferenceService 생성 또는 업데이트
@dsl.component(base_image=LB_BASE_IMAGE, packages_to_install=['model-registry==0.2.7a1', 'kserve==0.13.1'])
def update_kserve(
model_registry_host: str,
model_registry_port: int,
mr_author: str,
model_name: str,
model_version: str
):
from model_registry import ModelRegistry
from kubernetes import client
import kserve
if not model_registry_host.startswith("http"):
model_registry_host = f"http://{model_registry_host}"
registry = ModelRegistry(
server_address=model_registry_host, # default: http://model-registry-service.kubeflow.svc.cluster.local
port=model_registry_port,
author=mr_author,
is_secure=False
)
model = registry.get_registered_model(model_name)
print("Registered Model:", model, "with ID", model.id)
version = registry.get_model_version(model_name, model_version)
print("Model Version:", version, "with ID", version.id)
art = registry.get_model_artifact(model_name, model_version)
print("Model Artifact:", art, "with ID", art.id)
isvc = kserve.V1beta1InferenceService(
api_version=kserve.constants.KSERVE_GROUP + "/v1beta1",
kind=kserve.constants.KSERVE_KIND,
metadata=client.V1ObjectMeta(
name=model_name,
labels={
"modelregistry/registered-model-id": model.id,
"modelregistry/model-version-id": version.id,
},
),
spec=kserve.V1beta1InferenceServiceSpec(
predictor=kserve.V1beta1PredictorSpec(
sklearn=kserve.V1beta1SKLearnSpec(
args=[f'--model_name={model_name}'],
storage_uri=art.uri
)
)
),
)
ks_client = kserve.KServeClient()
try:
ks_client.get(model_name)
ks_client.replace(model_name,isvc)
except: # assumption: not found err
ks_client.create(isvc)- 모델 버전 정보를 입력 받아 등록된 레지스트리에서 관련 정보를 가져옵니다.
- KServe SDK를 사용해 InferenceService를 생성합니다.
- 이미 생성된 경우, 새로 학습된 모델로 변경하여 생성합니다.(replace 부분)
3. 파이프라인 정의
모든 컴포넌트를 조합하여 단일 파이프라인으로 정의합니다. 컴포넌트 간 실행 순서를 after()
메서드를 이용해 지정하고, 필요한 PVC는 각 컴포넌트에 마운트합니다.
@dsl.pipeline(name="Load Prediction Model Pipeline")
def lb_model_pipeline(
source_data: str,
start_date: str,
eval_date: str,
dataset_pvc: str='dataset-pvc',
# 아래처럼 mount path는 변수로 받지 않고 상수로 사용
model_pvc: str='model-pvc',
artifact_pvc: str='artifact-pvc',
katib_exp_name: str='lb-gbr-tune',
mr_host: str='model-registry-service.kubeflow.svc.cluster.local',
mr_port: int=8080,
mr_author: str='user',
model_name: str='lb-predictor'
):
# step1: Load dataset
load_dataset_task = preprocessing(
source_data=source_data,
data_mnt_path='/dataset',
start_date=start_date,
eval_date=eval_date
)
kubernetes.mount_pvc(
load_dataset_task,
pvc_name=dataset_pvc,
mount_path='/dataset'
)
# step2: Featurization
featurization_task = featurization(
data_version=start_date,
eval_date=eval_date,
data_mnt_path='/dataset',
raw_data=load_dataset_task.outputs['raw_data']
)
featurization_task.after(load_dataset_task)
kubernetes.mount_pvc(
featurization_task,
pvc_name=dataset_pvc,
mount_path='/dataset'
)
# step3: Tunning
tunning_model_task = tunning_with_katib(
katib_exp_name=katib_exp_name,
model_version=start_date,
data_version=start_date,
data_mnt_path='/dataset',
model_mnt_path='/model',
artifact_mnt_path='/artifact'
)
tunning_model_task.after(featurization_task)
kubernetes.mount_pvc(
tunning_model_task,
pvc_name=dataset_pvc,
mount_path='/dataset'
)
kubernetes.mount_pvc(
tunning_model_task,
pvc_name=model_pvc,
mount_path='/model'
)
kubernetes.mount_pvc(
tunning_model_task,
pvc_name=artifact_pvc,
mount_path='/artifact'
)
# step4: Evaluation
evaluate_model_task = evaluate_model(
model_version=start_date,
data_version=start_date,
data_mnt_path='/dataset',
model_mnt_path='/model',
artifact_mnt_path='/artifact'
)
evaluate_model_task.after(tunning_model_task)
kubernetes.mount_pvc(
evaluate_model_task,
pvc_name=dataset_pvc,
mount_path='/dataset'
)
kubernetes.mount_pvc(
evaluate_model_task,
pvc_name=model_pvc,
mount_path='/model'
)
kubernetes.mount_pvc(
evaluate_model_task,
pvc_name=artifact_pvc,
mount_path='/artifact'
)
# step5: Regist model
regist_model_task = push_to_registry(
model_name=model_name,
model_version=start_date,
model_pvc=model_pvc,
model_registry_host=mr_host,
model_registry_port=8080,
mr_author=mr_author
)
regist_model_task.after(evaluate_model_task)
kubernetes.mount_pvc(
regist_model_task,
pvc_name=artifact_pvc,
mount_path='/artifact'
)
# step6: Update svc
update_task = update_kserve(
model_name=model_name,
model_version=start_date,
model_registry_host=mr_host,
model_registry_port=mr_port,
mr_author=mr_author,
)
update_task.after(regist_model_task)
- Pipeline은 dsl 의 pipeline 데코레이터를 사용해 정의합니다.
- 함수 내부에 컴포넌트를 생성하고 종속성을 설정(after 메소드 사용)하여 파이프라인을 정의할 수 있습니다.
- PVC를 사용하는 일부 컴포넌트들은 실습 개요에서 생성한 PVC들을 마운트합니다.
4. 파이프라인 컴파일
이제 사전에 정의한 파이프라인을 실행합니다. 아래 코드를 실행하여 파이프라인 코드를 Yaml 파일로 컴파일합니다. 컴파일된 YAML 파일은 KFP UI를 통해 업로드하여 Experiment와 Run을 생성하는 데 사용됩니다.
import kfp
kfp.compiler.Compiler().compile(lb_model_pipeline, "lb-pipeline.yaml")
Step 3. 파이프라인 실행
Kubeflow 대시보드에서 컴파일된 YAML 파일을 업로드하고 Experiment 및 Run을 생성하여 파이프라인을 실행합니다.
-
Kubeflow 대시보드에 접속한 뒤, 왼쪽 Pipelines 메뉴를 선택합니다.
-
[+ Upload pipeline] 버튼을 클릭하고, [Upload a file] 버튼을 눌러 생성한 파이프라인 yaml 파일을 업로드 합니다.
-
왼쪽 Experiments(KFP) 메뉴를 선택합니다.
-
[+Create experiment] 버튼을 클릭하고, experiment 이름을 입력하여 새로 생성합니다.
-
왼쪽 Runs 메뉴를 선택합니다.
-
[+Create run] 버튼을 클릭하고, 이전 단계에서 설정한 파이프라인과 experiment 이름을 넣어서 생성합니다.
항목 설정값 source_data nlb-raw.txt start_date 2024-04-01 eval_date 2024-04-29 -
[Start] 버튼을 눌러 파이프라인을 실행합니다.
-
Run 상세 페이지에서 각 컴포넌트의 실행 상태, 로그, 입력 및 출력 결과를 확인할 수 있습니다.