2. Kafka 데이터의 Object Storage 적재
🗒️ Kafka 메시지를 카카오클라우드 Object Storage에 적재하는 실습입니다. 실시간 데이터 파이프라인의 저장 단계를 구성합니다.
- 예상 소요 시간: 30분
- 권장 운영 체제: MacOS, Ubuntu
- IAM 권한: Object Storage 관리자 또는 프로젝트 관리자 보유
- 사전 준비 사항
시나리오 소개
이번 튜토리얼에서는 1편에서 수집한 Kafka 메시지를 Object Storage에 저장하는 파이프라인을 구성합니다. 이 단계는 전체 아키텍처에서 데이터 저장(Storage) 에 해당하며, 적재된 데이터는 이후 분석 처리나 장기 보관의 기반이 됩니다.
주요 내용은 다음과 같습니다.
- Kafka Connector를 활용한 Object Storage Sink 설정
- 메시지 수신 및 적재 확인
- Object Storage에서 파일로 저장된 결과 확인
파이프라인 구성 아키텍처
시작하기 전에
이 실습은 Kafka 클러스터와 토픽이 사전에 구성되어 있어야 합니다. 구성이 완료되지 않았다면 먼저 Kafka를 통한 메시지 처리를 진행하세요.
Step 1. Object Storage 버킷 생성
Kafka 메시지를 저장할 Object Storage 버킷을 생성합니다.
-
카카오클라우드 콘솔 > Object Storage > 버킷 메뉴로 이동합니다.
-
[버킷 생성] 클릭 후 다음 값을 입력합니다.
항목 설정값 유형 Standard 이름 tutorial-kafka-bucket
암호화 미사용 -
[생성] 클릭 후 버킷이 목록에 나타나는지 확인합니다.
Step 2. Sink Connector 설정
Kafka에서 Object Storage로 데이터를 전송하는 Sink Connector를 설정합니다.
-
Kafka 클러스터 접근용 VM(
tutorial-amk-vm
)에 접속합니다. -
Sink Connector 설치에 필요한 환경 변수를 세팅합니다.
cat <<'EOF' > /tmp/env_vars.sh
export KAFKA_BOOTSTRAP_SERVER="${HOST:PORT}"
export TOPIC="${TOPIC_NAME}"
export AWS_ACCESS_KEY_ID_VALUE="${S3_ACCESS_KEY_ID}"
export AWS_SECRET_ACCESS_KEY_VALUE="${S3_ACCESS_SECRET_KEY}"
export BUCKET_NAME="${BUCKET_NAME}"
export AWS_DEFAULT_REGION_VALUE="kr-central-2"
export AWS_DEFAULT_OUTPUT_VALUE="json"
export LOGFILE="/home/ubuntu/setup.log"
EOF
source /tmp/env_vars.sh
echo "source /tmp/env_vars.sh" >> /home/ubuntu/.bashrc환경변수 설명 HOST:PORT🖌︎ Advanced Managed Kafka > 클러스터 메뉴에서 생성해 둔 `tutorial-amk-cluster` 클릭 후 부트스트랩 서버 정보 복사 가능 TOPIC_NAME🖌︎ 컨슈머가 데이터를 수신할 토픽 이름 / 예시: tutorial-topic S3_ACCESS_KEY_ID🖌︎ S3 액세스 키 ID 값 S3_ACCESS_SECRET_KEY🖌︎ S3 액세스 시크릿 키 값 BUCKET_NAME🖌︎ 버킷 이름 / 예시: tutorial-kafka-bucket -
Sink Connector 세팅을 위한 스크립트를 작성합니다.
sudo vi ./setting-script.sh
setting-script.sh 스크립트 예시
아래의 스크립트를
setting-script.sh
파일에 복사/붙여넣기 합니다.setting-script.sh#!/bin/bash
#------------------------------------------
# 설정 변수
#------------------------------------------
# Confluent Hub Client 설정
CONFLUENT_HUB_DIR="/confluent-hub"
CONFLUENT_HUB_URL="https://client.hub.confluent.io/confluent-hub-client-latest.tar.gz"
CONFLUENT_HUB_FILE="confluent-hub-client-latest.tar.gz"
# AWS CLI 설정
AWS_CLI_VERSION="2.22.0"
AWS_CLI_ZIP="awscliv2.zip"
AWS_CLI_DOWNLOAD_URL="https://awscli.amazonaws.com/awscli-exe-linux-x86_64-${AWS_CLI_VERSION}.zip"
# Kafka 설정
KAFKA_INSTALL_DIR="/home/ubuntu/kafka"
# 필수 환경변수 검증
required_variables=(
KAFKA_BOOTSTRAP_SERVER BUCKET_NAME
AWS_ACCESS_KEY_ID_VALUE AWS_SECRET_ACCESS_KEY_VALUE
AWS_DEFAULT_REGION_VALUE AWS_DEFAULT_OUTPUT_VALUE
)
echo "kakaocloud: 필수 환경변수 검증 시작"
for var in "${required_variables[@]}"; do
if [ -z "${!var}" ]; then
echo "kakaocloud: 필수 환경변수 $var 가 설정되지 않았습니다. 스크립트를 종료합니다."
exit 1
fi
done
################################################################################
# 0. apt 업데이트 및 필수 패키지 설치
############################################################f####################
echo "kakaocloud: 4. 시스템 업데이트 및 필수 패키지 설치 시작"
sudo apt-get update -y || { echo "kakaocloud: apt-get update 실패"; exit 1; }
sudo apt-get install -y python3 python3-pip openjdk-21-jdk unzip jq aria2 curl || { echo "kakaocloud: 필수 패키지 설치 실패"; exit 1; }
################################################################################
# 1. Confluent Hub Client 설치
################################################################################
echo "kakaocloud: Confluent Hub Client 설치 시작"
sudo mkdir -p "$CONFLUENT_HUB_DIR/plugins" || { echo "디렉토리 생성 실패"; exit 1; }
cd "$CONFLUENT_HUB_DIR" || { echo "디렉토리 이동 실패"; exit 1; }
aria2c -x 16 -s 16 -o "$CONFLUENT_HUB_FILE" "$CONFLUENT_HUB_URL" || { echo "다운로드 실패"; exit 1; }
sudo tar -xzf "$CONFLUENT_HUB_FILE" || { echo "압축 해제 실패"; exit 1; }
sudo rm "$CONFLUENT_HUB_FILE"
sudo chown -R ubuntu:ubuntu "$CONFLUENT_HUB_DIR"
# .bashrc PATH 설정
sed -i '/CONFLUENT_HOME=/d' /home/ubuntu/.bashrc
sed -i '/PATH=.*\$CONFLUENT_HOME\/bin/d' /home/ubuntu/.bashrc
cat <<EOF >> /home/ubuntu/.bashrc
# Confluent 설정
export CONFLUENT_HOME="$CONFLUENT_HUB_DIR"
export PATH="\$PATH:\$CONFLUENT_HOME/bin"
EOF
export CONFLUENT_HOME="$CONFLUENT_HUB_DIR"
export PATH="$PATH:$CONFLUENT_HUB_DIR/bin"
echo "kakaocloud: Confluent Hub Client 설치 완료"
################################################################################
# 2. .bashrc에 환경 변수 등록
################################################################################
echo "kakaocloud: 7. 환경 변수 등록 시작"
# 기존 관련 라인 제거
sed -i '/S3_ACCESS_KEY=/d' /home/ubuntu/.bashrc || { echo "kakaocloud: .bashrc 수정 실패"; exit 1; }
sed -i '/S3_SECRET_ACCESS_KEY=/d' /home/ubuntu/.bashrc || { echo "kakaocloud: .bashrc 수정 실패"; exit 1; }
sed -i '/AWS_DEFAULT_REGION=/d' /home/ubuntu/.bashrc || { echo "kakaocloud: .bashrc 수정 실패"; exit 1; }
sed -i '/AWS_DEFAULT_OUTPUT=/d' /home/ubuntu/.bashrc || { echo "kakaocloud: .bashrc 수정 실패"; exit 1; }
sed -i '/CONFLUENT_HOME=/d' /home/ubuntu/.bashrc || { echo "kakaocloud: .bashrc 수정 실패"; exit 1; }
sed -i '/JAVA_HOME=/d' /home/ubuntu/.bashrc || { echo "kakaocloud: .bashrc 수정 실패"; exit 1; }
cat <<EOF >> /home/ubuntu/.bashrc
# Kakao i Cloud S3 Credentials
export AWS_ACCESS_KEY_ID="$AWS_ACCESS_KEY_ID_VALUE"
export AWS_SECRET_ACCESS_KEY="$AWS_SECRET_ACCESS_KEY_VALUE"
export AWS_DEFAULT_REGION="$AWS_DEFAULT_REGION_VALUE"
export AWS_DEFAULT_OUTPUT="$AWS_DEFAULT_OUTPUT_VALUE"
# Confluent 설정
export CONFLUENT_HOME="/confluent-hub"
export PATH="\$PATH:\$CONFLUENT_HOME/bin"
# Java 설정
export JAVA_HOME="/usr/lib/jvm/java-21-openjdk-amd64"
export PATH="\$JAVA_HOME/bin:\$PATH"
EOF
if [ $? -ne 0 ]; then echo "kakaocloud: 환경 변수 등록 실패"; exit 1; fi
################################################################################
# 3. .bashrc 적용 (비인터랙티브 환경 대비)
################################################################################
source /home/ubuntu/.bashrc || { echo "kakaocloud: .bashrc 적용 실패"; exit 1; }
################################################################################
# 4. S3 Sink Connector 설치
################################################################################
echo "kakaocloud: 8. S3 Sink Connector 설치 시작"
sudo chown ubuntu:ubuntu /home/ubuntu/kafka/config/connect-standalone.properties 2>/dev/null
/confluent-hub/bin/confluent-hub install confluentinc/kafka-connect-s3:latest \
--component-dir /confluent-hub/plugins \
--worker-configs /home/ubuntu/kafka/config/connect-standalone.properties \
--no-prompt || { echo "kakaocloud: S3 Sink Connector 설치 실패"; exit 1; }
################################################################################
# 5. AWS CLI 설치
################################################################################
echo "kakaocloud: 9. AWS CLI 설치 시작"
cd /home/ubuntu || { echo "kakaocloud: 홈 디렉토리 이동 실패"; exit 1; }
curl "https://awscli.amazonaws.com/awscli-exe-linux-x86_64-${AWS_CLI_VERSION}.zip" -o "${AWS_CLI_ZIP}" || { echo "kakaocloud: AWS CLI 다운로드 실패"; exit 1; }
unzip "${AWS_CLI_ZIP}" || { echo "kakaocloud: AWS CLI 압축 해제 실패"; exit 1; }
sudo ./aws/install || { echo "kakaocloud: AWS CLI 설치 실패"; exit 1; }
rm -rf aws "${AWS_CLI_ZIP}" || { echo "kakaocloud: AWS CLI 설치 후 정리 실패"; exit 1; }
AWS_VERSION=$(aws --version 2>&1 || true)
################################################################################
# 6. AWS CLI configure 파일 설정
################################################################################
echo "kakaocloud: 10. AWS CLI 설정 시작"
sudo -u ubuntu -i aws configure set aws_access_key_id "$AWS_ACCESS_KEY_ID_VALUE" || { echo "kakaocloud: AWS CLI aws_access_key_id 설정 실패"; exit 1; }
sudo -u ubuntu -i aws configure set aws_secret_access_key "$AWS_SECRET_ACCESS_KEY_VALUE" || { echo "kakaocloud: AWS CLI aws_secret_access_key 설정 실패"; exit 1; }
sudo -u ubuntu -i aws configure set default.region "$AWS_DEFAULT_REGION_VALUE" || { echo "kakaocloud: AWS CLI default.region 설정 실패"; exit 1; }
sudo -u ubuntu -i aws configure set default.output "$AWS_DEFAULT_OUTPUT_VALUE" || { echo "kakaocloud: AWS CLI default.output 설정 실패"; exit 1; }
AWS_VERSION=$(aws --version 2>&1)
source /home/ubuntu/.bashrc || { echo "kakaocloud: .bashrc 재적용 실패"; exit 1; }
################################################################################
# 7. Kafka 설정 폴더 생성 및 권한 부여
################################################################################
echo "kakaocloud: 11. Kafka 설정 폴더 생성 및 권한 부여 시작"
sudo mkdir -p /opt/kafka/config || { echo "kakaocloud: Kafka 설정 폴더 생성 실패"; exit 1; }
sudo chown -R ubuntu:ubuntu /opt/kafka || { echo "kakaocloud: Kafka 설정 폴더 권한 변경 실패"; exit 1; }
################################################################################
# 8. 커스텀 파티셔너, 파일네임 플러그인 다운로드
################################################################################
echo "kakaocloud: 12. 커스텀 플러그인 다운로드 시작"
sudo wget -O /confluent-hub/plugins/confluentinc-kafka-connect-s3/lib/custom-partitioner-1.0-SNAPSHOT.jar \
"https://raw.githubusercontent.com/kakaocloud-edu/tutorial/main/DataAnalyzeCourse/src/day1/Lab03/kafka_connector/custom-partitioner-1.0-SNAPSHOT.jar" || { echo "kakaocloud: custom-partitioner 다운로드 실패"; exit 1; }
sudo wget -O /confluent-hub/plugins/confluentinc-kafka-connect-s3/lib/custom-filename-1.0-SNAPSHOT.jar \
"https://raw.githubusercontent.com/kakaocloud-edu/tutorial/main/DataAnalyzeCourse/src/day1/Lab03/kafka_connector/custom-filename-1.0-SNAPSHOT.jar" || { echo "kakaocloud: custom-filename 다운로드 실패"; exit 1; }
################################################################################
# 9. s3-sink-connector.properties 생성
################################################################################
echo "kakaocloud: 13. s3-sink-connector.properties 생성 시작"
cat <<EOF > /opt/kafka/config/s3-sink-connector.properties
name=s3-sink-connector
connector.class=io.confluent.connect.s3.S3SinkConnector
tasks.max=1
topics=${TOPIC}
s3.region=kr-central-2
s3.bucket.name=${BUCKET_NAME}
s3.part.size=5242880
aws.access.key.id=${AWS_ACCESS_KEY_ID_VALUE}
aws.secret.access.key=${AWS_SECRET_ACCESS_KEY_VALUE}
store.url=https://objectstorage.kr-central-2.kakaocloud.com
storage.class=io.confluent.connect.s3.storage.S3Storage
format.class=io.confluent.connect.s3.format.json.JsonFormat
parquet.codec=snappy
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.storage.StringConverter
key.converter.schemas.enable=false
value.converter.schemas.enable=false
flush.size=1
partitioner.class=com.mycompany.connect.FlexibleTimeBasedPartitioner
topics.dir=${TOPIC}
custom.topic.dir=topic
custom.partition.prefix=partition_
partition.duration.ms=3600000
path.format='year_'yyyy/'month_'MM/'day_'dd/'hour_'HH
locale=en-US
timezone=Asia/Seoul
timestamp.extractor=Wallclock
custom.replacements==:_
s3.bucket.create=false
bucket.exists.checking.enabled=false
EOF
if [ $? -ne 0 ]; then echo "kakaocloud: s3-sink-connector.properties 생성 실패"; exit 1; fi
################################################################################
# 10. worker.properties 생성
################################################################################
echo "kakaocloud: 14. worker.properties 생성 시작"
cat <<EOF > /opt/kafka/config/worker.properties
bootstrap.servers=${KAFKA_BOOTSTRAP_SERVER}
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
value.converter.schemas.enable=false
offset.storage.file.filename=/tmp/connect.offsets
offset.flush.interval.ms=10000
plugin.path=/confluent-hub/plugins/confluentinc-kafka-connect-s3
listeners=http://0.0.0.0:8084
EOF
if [ $? -ne 0 ]; then echo "kakaocloud: worker.properties 생성 실패"; exit 1; fi
################################################################################
# 11. kafka-connect systemd 서비스 등록
################################################################################
echo "kakaocloud: 15. Kafka Connect 서비스 등록 시작"
cat <<EOF | sudo tee /etc/systemd/system/kafka-connect.service
[Unit]
Description=Kafka Connect Standalone Service
After=network.target
[Service]
User=ubuntu
ExecStart=/home/ubuntu/kafka/bin/connect-standalone.sh \
/opt/kafka/config/worker.properties \
/opt/kafka/config/s3-sink-connector.properties
Restart=on-failure
RestartSec=5
[Install]
WantedBy=multi-user.target
EOF
if [ $? -ne 0 ]; then echo "kakaocloud: Kafka Connect 서비스 등록 실패"; exit 1; fi
################################################################################
# 12. Schema Registry 다운로드 및 설치
################################################################################
echo "kakaocloud: 16. Schema Registry 다운로드 및 설치 시작"
sudo wget https://packages.confluent.io/archive/7.5/confluent-7.5.3.tar.gz || { echo "kakaocloud: Schema Registry 다운로드 실패"; exit 1; }
sudo tar -xzvf confluent-7.5.3.tar.gz -C /confluent-hub/plugins || { echo "kakaocloud: Schema Registry 압축 해제 실패"; exit 1; }
sudo rm confluent-7.5.3.tar.gz || { echo "kakaocloud: Schema Registry 압축파일 삭제 실패"; exit 1; }
################################################################################
# 13. systemd 유닛 파일 생성 및 Schema Registry 서비스 등록
################################################################################
echo "kakaocloud: 17. systemd 유닛 파일 생성 및 Schema Registry 서비스 등록 시작"
cat <<EOF > /etc/systemd/system/schema-registry.service
[Unit]
Description=Confluent Schema Registry
After=network.target
[Service]
Type=simple
User=ubuntu
ExecStart=/confluent-hub/plugins/confluent-7.5.3/bin/schema-registry-start /confluent-hub/plugins/confluent-7.5.3/etc/schema-registry/schema-registry.properties
Restart=on-failure
RestartSec=5s
[Install]
WantedBy=multi-user.target
EOF
if [ $? -ne 0 ]; then echo "kakaocloud: Schema Registry Service 파일 작성 실패"; exit 1; fi
sudo systemctl daemon-reload || { echo "kakaocloud: daemon-reload 실패"; exit 1; }
sudo systemctl enable schema-registry.service || { echo "kakaocloud: schema-registry 서비스 생성 실패"; exit 1; }
sudo systemctl start schema-registry.service || { echo "kakaocloud: schema-registry 서비스 시작 실패"; exit 1; }
################################################################################
# 14. S3 커넥터 플러그인 경로에 Avro 컨버터 설치 및 설정
################################################################################
echo "kakaocloud: 18. Avro 컨버터 설치 및 설정 시작"
sudo wget https://github.com/kakaocloud-edu/tutorial/raw/refs/heads/main/DataAnalyzeCourse/src/day2/Lab01/confluentinc-kafka-connect-avro-converter-7.5.3.zip || { echo "kakaocloud: confluentinc-kafka-connect-avro-converter 다운로드 실패"; exit 1; }
unzip confluentinc-kafka-connect-avro-converter-7.5.3.zip || { echo "kakaocloud: confluentinc-kafka-connect-avro-converter 압축 해제 실패"; exit 1; }
sudo rm confluentinc-kafka-connect-avro-converter-7.5.3.zip || { echo "kakaocloud: confluentinc-kafka-connect-avro-converter 압축파일 삭제 실패"; exit 1; }
sudo mv confluentinc-kafka-connect-avro-converter-7.5.3/lib/*.jar /confluent-hub/plugins/confluentinc-kafka-connect-s3/lib || { echo "kakaocloud: confluentinc-kafka-connect-avro-converter 파일 이동 실패"; exit 1; }
sudo wget -P /confluent-hub/plugins/confluentinc-kafka-connect-s3/lib \
https://repo1.maven.org/maven2/com/google/guava/guava/30.1.1-jre/guava-30.1.1-jre.jar \
https://packages.confluent.io/maven/io/confluent/kafka-connect-protobuf-converter/7.5.3/kafka-connect-protobuf-converter-7.5.3.jar \
https://packages.confluent.io/maven/io/confluent/kafka-protobuf-serializer/7.5.3/kafka-protobuf-serializer-7.5.3.jar \
https://packages.confluent.io/maven/io/confluent/common-config/7.5.3/common-config-7.5.3.jar \
https://repo1.maven.org/maven2/com/google/protobuf/protobuf-java/3.25.1/protobuf-java-3.25.1.jar \
https://repo1.maven.org/maven2/com/google/guava/failureaccess/1.0.2/failureaccess-1.0.2.jar || { echo -e "\nERROR: S3 커넥터 추가 의존성 다운로드 실패"; exit 1; }
################################################################################
# 완료
################################################################################
echo "kakaocloud: Setup 완료" -
스크립트를 실행합니다.
스크립트 실행하기SCRIPT_FILE="./setting-script.sh"
sudo chmod +x "$SCRIPT_FILE"
sudo -E "$SCRIPT_FILE"
# 스크립트 실행에 시간이 다소 소요될 수 있습니다. -
Object Storage 버킷에 메시지 파일을 작성하고, 접근 권한을 설정합니다.
aws s3api put-bucket-acl \
--bucket "${BUCKET_NAME}" \
--grant-read 'uri="http://acs.amazonaws.com/groups/global/AllUsers"' \
--grant-write 'uri="http://acs.amazonaws.com/groups/global/AllUsers"' \
--endpoint-url https://objectstorage.kr-central-2.kakaocloud.com환경변수 설명 BUCKET_NAME🖌︎ 버킷 이름 - 예시: tutorial-kafka-bucket -
S3 Connector, Standard Worker 설정 파일이 정상 생성되었는지 확인합니다.
ls /opt/kafka/config
# 출력: s3-sink-connector.properties worker.properties -
kafka-connect 서비스 파일이 생성된 것을 확인합니다.
ls /etc/systemd/system | grep kafka-connect.service
# 출력: kafka-connect.service -
Schema Registry 설정 파일에서 Kafka 브로커 주소를 수정합니다.
sudo sed -i 's|PLAINTEXT://localhost:9092|${HOST:PORT}|' /confluent-hub/plugins/confluent-7.5.3/etc/schema-registry/schema-registry.properties
환경변수 설명 HOST:PORT🖌︎ Advanced Managed Kafka > `tutorial-amk-cluster` 상세 화면에서 부트스트랩 서버 정보 복사 가능
Step 3. kafka-connect 서비스 시작 및 Sink Connector 상태 확인
-
데몬 리로드 및 kafka-connect 서비스를 시작합니다.
데몬 Reloadsudo systemctl daemon-reload
schema-registry 재시작sudo systemctl restart schema-registry.service
Kafka Connect 부팅 시 자동 시작 등록sudo systemctl enable kafka-connect
Kafka Connect 서비스 시작sudo systemctl start kafka-connect
-
s3-sink-connector 상태 정보를 확인합니다.
watch -n 1 "curl -s http://localhost:8084/connectors/s3-sink-connector/status | jq"
# 출력 예시
{
"name": "s3-sink-connector",
"connector": {
"state": "RUNNING",
"worker_id": "0.0.0.0:8083"
},
"tasks": [
{
"id": 0,
"state": "RUNNING",
"worker_id": "0.0.0.0:8083"
}
],
"type": "sink"
}
Step 4. 메시지 발행 및 적재 확인
Kafka로 메시지를 발행하고, Object Storage 버킷에 정상적으로 적재되는지 확인합니다.
-
프로듀서를 클라이언트를 실행합니다.
cd kafka
bin/kafka-console-producer.sh --topic ${TOPIC_NAME} --bootstrap-server ${HOST:PORT}
환경변수 설명 TOPIC_NAME🖌︎ 프로듀서가 데이터를 송신할 토픽 이름 / 예시: tutorial-topic HOST:PORT🖌︎ Advanced Managed Kafka > `tutorial-amk-cluster` 상세 화면에서 부트스트랩 서버 정보 복사 가능 -
실행 결과로
>
가 출력되었다면 아래의 테스트 메시지를 입력하고 실행합니다.hello tester!
-
콘솔에서 Object Storage >
tutorial-kafka-bucket
으로 이동해 파일이 생성되었는지 확인합니다.- 저장 경로는 Kafka Connector 설정에 따라 달라지며, 기본적으로 날짜 또는 파티션 단위로 구분됩니다.
마무리 및 다음 단계
Kafka 메시지를 Object Storage에 저장하는 구조를 구성했습니다. 이제 저장된 데이터를 기반으로 Data Catalog에 메타데이터를 등록하고, Data Query를 활용해 분석 작업을 수행할 수 있습니다.
👉 다음 단계는 Data Catalog와 Data Query를 이용한 Kafka 메시지 분석 튜토리얼을 참고하세요.