본문으로 건너뛰기
튜토리얼 시리즈 | Kubeflow 기반 LLM 워크플로우

3. LLM 모델 활용 Rag 구현

📘 업데이트된 LLM 모델 Endpoint를 통해 문서 기반 질의응답 시스템(RAG)을 구현합니다.

기본 정보
  • 예상 소요 시간: 60분
  • 권장 운영 체제: Ubuntu

시나리오 소개

Kubeflow 환경의 Jupyter Notebook에서 최신 LLM 모델인 카카오의 카나나(Kanana)Meta Llama 3.2을 활용해, 문서를 분할하고 벡터화한 뒤, RAG(Retrieval-Augmented Generation) 체계를 구성하는 전체 흐름을 안내합니다. 실습을 통해 로컬 문서 기반의 질문 응답 시스템을 구성하고, LangChain 기반 RAG 파이프라인을 완성할 수 있습니다.

주요 내용은 다음과 같습니다.

  • 문서 로드, 분할, 임베딩 및 벡터 저장
  • LangChain 기반 Retriever & Generator 구성
  • LLM 호출 및 RAG 질의 테스트

RAG 구현을 위한 프로세스

RAG의 전체 구성 흐름은 다음과 같이 전처리(1-4단계)와 질의 처리(5-8단계)로 나뉩니다.

단계작업 이름설명
1문서 로드(Load)PDF, 텍스트 파일, 웹페이지 등에서 문서 데이터를 불러옵니다.
2문서 분할(Split)LLM 입력 길이에 맞춰 chunk_size, overlap 기준으로 문서를 분할합니다.
3임베딩(Embedding)분할된 텍스트를 의미 기반 벡터로 변환합니다.
4벡터 저장소(VectorStore)임베딩된 벡터를 벡터 DB(예: FAISS)에 저장하여 검색 가능하게 구성합니다.
5검색(Retrieval)입력 질의와 유사한 문서를 검색하는 retriever 객체를 사용합니다.
6프롬프트 구성(Prompting)검색된 문서와 질의를 프롬프트로 구성하여 모델에 입력합니다.
7모델 호출(LLM)LLM을 통해 응답을 생성합니다.
8결과 반환(Output)최종 프롬프트 → 모델 호출 → 응답 결과를 하나의 체인으로 반환합니다.

지원 도구

도구버전설명
Jupyter Notebook4.2.1다양한 머신러닝 프레임워크와 Kubeflow SDK 연동을 지원하는 웹 기반 개발 환경
KServe0.15.0- 모델 서빙 도구로 빠른 모델 배포 및 업데이트 지원, 높은 가용성과 확장성을 제공
- 머신러닝 모델 서빙을 위한 일반적인 문제(로드 밸런싱, 모델 버전 관리, 실패 복구 등) 자동 처리

시작하기 전에

1. Kubeflow 환경 준비

Kubeflow에서 RAG 파이프라인을 안정적으로 구성하기 위해 아래와 같은 사양의 노드 풀 환경이 필요합니다. 사전 준비 사항을 참고하여 CPU 또는 GPU 노드 풀이 설정된 환경을 먼저 준비하세요.

2. 학습 데이터세트 준비

카카오클라우드 기술문서의 Kubeflow 서비스 가이드와 튜토리얼 텍스트를 학습 데이터로 활용하여 실습합니다. 아래에서 샘플 데이터를 다운로드해 주세요.

시작하기

이 실습에서는 최신 LLM 모델인 카카오의 Kanana-Nano-2.1B와 Meta의 Llama 3.2를 활용하여, 문서 기반 질의응답 시스템(RAG: Retrieval-Augmented Generation)을 구현합니다.

Step 1. Jupyter Notebook 인스턴스 생성

Kubeflow 대시보드에서 실습을 위한 GPU 기반 노트북 인스턴스를 생성합니다.

  1. Kubeflow 대시보드에서 Notebooks 탭을 선택합니다.

  2. 우측 상단의 [New Notebook] 버튼을 클릭하여 새 인스턴스를 생성합니다.

  3. New notebook 설정 화면에서 다음 정보를 입력합니다.

    • Notebook Imagekc-kubeflow/jupyter-pytorch-cuda-full:v1.8.0.py311.1a 선택
    • Notebook 최소 사양: vCPU 3개 이상, Memory 6GB 이상 입력
    • GPU 연결: Number of GPUs 1개, GPU Vendor NVIDIA MIG - 1g.10gb 선택
  4. 설정을 입력한 후, [LAUNCH] 버튼을 클릭하여 인스턴스를 생성합니다.

Step 2. 패키지 설치

RAG 구현에 필요한 Python 패키지를 설치합니다.

패키지 설치
! pip install langchain langchain-community langchain-openai transformers sentence-transformers faiss-cpu langgraph datasets accelerate langchain_huggingface

Step 3. 문서 로드 및 변환

CSV 파일로 제공된 문서를 로드하고, LangChain의 Document 객체로 변환합니다.

CSV 파일 로드
from datasets import Dataset

dataset = Dataset.from_csv('sample_rag_docs_dataset.csv')
dataset
출력 예시
Generating train split: 
 17/0 [00:00<00:00, 1236.38 examples/s]
Dataset({
features: ['source', 'page_content'],
num_rows: 17
})
LangChain Document 객체 변환
from langchain_core.documents import Document

docs = []

for _ea_doc_data in dataset:
docs.append(Document(
metadata={'source': _ea_doc_data['source']},
page_content=_ea_doc_data['page_content']
))

len(docs)
출력 예시
17

Step 4. 문서 분할

변환한 문서를 LLM 입력에 적합한 단위로 분할합니다. 이 분할 결과는 다음 단계인 임베딩 및 벡터 저장소 구성에 사용됩니다.

문서 분할
from langchain_text_splitters import RecursiveCharacterTextSplitter

text_splitter = RecursiveCharacterTextSplitter(
chunk_size=1000,
chunk_overlap=200
)
split_documents = text_splitter.split_documents(docs)
len(split_documents)
출력 예시
108

Step 5. 임베딩(Embedding) 및 벡터 스토어 생성

문서들을 고차원 벡터 형태로 변환하고, 이를 검색 가능한 구조로 저장합니다.

  1. 임베딩(Embedding)을 생성합니다. 문서 임베딩은 각 문서 조각(텍스트)을 벡터로 표현하는 과정입니다. HuggingFace에서 제공하는 BAAI/bge-m3 모델을 이용하며, HuggingFaceEmbeddings 클래스를 사용할 수 있습니다.

    Kanana-Nano-2.1B 모델은 LangChain의 내장 임베딩 인터페이스에서 직접 지원되지 않으므로, 아래와 같이 커스텀 래퍼 클래스를 구현해 사용합니다.

    Kanana 임베딩 모델 설정
        embeddings = AutoModel.from_pretrained(
    "kakaocorp/kanana-nano-2.1b-embedding",
    trust_remote_code=True,
    )
    디바이스 이동 및 임베딩 래퍼 정의
    def _move_to_device(maybe_tensor, device: torch.device):
    if torch.is_tensor(maybe_tensor):
    return maybe_tensor.to(device, non_blocking=device.type == "cuda")
    elif isinstance(maybe_tensor, dict):
    return {key: _move_to_device(value, device) for key, value in maybe_tensor.items()}
    elif isinstance(maybe_tensor, list):
    return [_move_to_device(x, device) for x in maybe_tensor]
    elif isinstance(maybe_tensor, tuple):
    return tuple([_move_to_device(x, device) for x in maybe_tensor])
    elif isinstance(maybe_tensor, Mapping):
    return type(maybe_tensor)({k: _move_to_device(v, device) for k, v in maybe_tensor.items()})
    else:
    return maybe_tensor

    def move_to_device(sample, device: torch.device):
    if device.type == "cpu":
    return sample

    if len(sample) == 0:
    return {}
    return _move_to_device(sample, device)

    class LangChainEmbeddingWrapper:
    def __init__(self, model):
    self.model = model


    def embed_documents(self, texts: List[str]) -> List[np.ndarray]:
    # 입력 텍스트를 모델의 입력 형식으로 변환
    batch_dict = self.model.tokenizer(
    texts,
    max_length=512,
    padding=True,
    return_tensors="pt",
    truncation=True
    )
    # pool_mask 생성 (예: attention_mask와 동일하게 설정)
    pool_mask = batch_dict['attention_mask'].clone()
    # 모델의 장치로 이동
    batch_dict = move_to_device(batch_dict, self.model.device)
    pool_mask = pool_mask.to(self.model.device)
    # 모델 실행
    with torch.no_grad():
    embeddings = self.model(
    input_ids=batch_dict['input_ids'],
    attention_mask=batch_dict['attention_mask'],
    pool_mask=pool_mask
    ).embedding
    # 결과를 numpy 배열로 변환하여 반환
    return embeddings.cpu().numpy().tolist()

    def embed_query(self, text: str) -> np.ndarray:
    return self.embed_documents([text])[0]

    # Wrapper 설정
    embeddings = LangChainEmbeddingWrapper(embeddings)
  2. 임베딩된 벡터들을 벡터 데이터베이스에 저장합니다. 이 데이터베이스는 이후 질의에 대한 유사 문서 검색을 수행하는 기반이 됩니다.

    벡터 저장
    vectorstore = FAISS.from_documents(documents=split_documents, embedding=embeddings)
    vectorstore.save_local('./db/kcdocs_faiss')

Step 6. LLM 모델 정의

KServe로 서빙 중인 LLM Endpoint를 LangChain에서 사용할 수 있도록 DNS 기반 URL로 접근합니다.

LLM Endpoint 설정
isvc_name="<YOUR_KSERVE_INFERENCE_SERVICE_NAME>" # inference service 이름
namespace="<YOUR_NAMESPACE>" # 사용자의 네임스페이스 입력
model_name= "<YOUR_MODEL_NAME>" # 모델 이름

llm_svc_url = f"http://{isvc_name}.{namespace}.svc.cluster.local/"

# OpenAI 기반 Chat 모델 설정
llm = ChatOpenAI(
model_name=model_name,
base_url=os.path.join(llm_svc_url,'openai','v1'),
openai_api_key="empty",
)

Step 7. RAG Graph 구성

문서 검색(Retrieve) → 응답 생성(Generate) 흐름으로 구성된 하나의 체인(Graph)을 구성합니다.

  1. Graph에서 각 노드(retrieve, generate)가 주고받는 데이터 형식을 정의합니다.

    상태 정의
    class State(TypedDict):
    question: str
    context: List[Document]
    answer: str
  2. 문서를 검색하는 Retriever와 답변을 생성하는 Generate 함수를 생성합니다.

    Retriever 및 Generator 함수
    # rag 에서 사용하는 prompt 를 가져옴, generate 함수에서 사용
    prompt = hub.pull("rlm/rag-prompt")

    # retrieve 함수: Vector Store에서 질문과 가장 유사한 문서를 검색하는 역할
    def retrieve(state: State):
    retrieved_docs = vectorstore.similarity_search(state["question"])
    return {"context": retrieved_docs}

    # generate 함수: 검색된 문서를 바탕으로 답변을 생성하는 역할
    def generate(state: State):
    docs_content = "\n\n".join(doc.page_content for doc in state["context"])
    messages = prompt.invoke({"question": state["question"], "context": docs_content})
    response = llm.invoke(messages)
    return {"answer": response.content}
  3. 정의된 상태(State)와 함수들을 기반으로 Graph를 시각적으로 연결합니다.

    Graph 빌드
    graph_builder = StateGraph(State).add_sequence([retrieve, generate])
    graph_builder.add_edge(START, "retrieve")
    graph = graph_builder.compile()

Step 8. RAG 테스트

앞에서 구성한 Graph(retrieve → generate)를 통해 사용자의 질문에 대해 실제로 문서를 검색하고 LLM으로부터 응답을 생성합니다.

RAG 테스트
response = graph.invoke({"question": "kubeflow 1.6 버전과 1.8 버전에서 지원하는 프레임워크 버전들에 대해서 알려줘."})
print(response)
출력 예시
Generating train split: 
 17/0 [00:00<00:00, 1236.38 examples/s]
Dataset({
features: ['source', 'page_content'],
num_rows: 17
})