2. Load Kafka data into Object Storage
Practice loading messages received from Kafka into Object Storage.
- Estimated time: 30 minutes
- Recommended OS: macOS, Ubuntu
- IAM permission: Object Storage administrator or project administrator
- Prerequisites
About this scenario
In this tutorial, you configure a pipeline that stores the Kafka messages collected in Part 1 in Object Storage. This step corresponds to the Storage stage of the overall architecture, and the stored data becomes the foundation for later analytics processing or long-term retention.
You will cover the following:
- Configure an Object Storage sink connector using Kafka Connect
- Receive messages and verify loading
- Check results saved as files in Object Storage
Pipeline architecture
Before you start
This tutorial requires a Kafka cluster and topic to be configured in advance. If they are not configured yet, complete Message processing through Kafka first.
Step 1. Create an Object Storage bucket
Create an Object Storage bucket to store Kafka messages.
- Go to KakaoCloud console > Object Storage > Bucket.
- Click Create Bucket, then enter the following values.
  - Type: Standard
  - Name: tutorial-kafka-bucket
  - Encryption: Not used
- Click Create and verify that the bucket appears in the list.
Step 2. Configure Sink Connector
Configure a Sink Connector that sends data from Kafka to Object Storage.
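- Connect to the VM used to access the Kafka cluster (tutorial-amk-vm).
- Set the environment variables required to install the Sink Connector. Before running the commands, replace each ${...} placeholder with your own value, as described below.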
cat <<'EOF' > /tmp/env_vars.sh
export KAFKA_BOOTSTRAP_SERVER="${HOST:PORT}"
export TOPIC="${TOPIC_NAME}"
export AWS_ACCESS_KEY_ID_VALUE="${S3_ACCESS_KEY_ID}"
export AWS_SECRET_ACCESS_KEY_VALUE="${S3_ACCESS_SECRET_KEY}"
export BUCKET_NAME="${BUCKET_NAME}"
export AWS_DEFAULT_REGION_VALUE="kr-central-2"
export AWS_DEFAULT_OUTPUT_VALUE="json"
export LOGFILE="/home/ubuntu/setup.log"
EOF
source /tmp/env_vars.sh
echo "source /tmp/env_vars.sh" >> /home/ubuntu/.bashrc
Environment variables:
  - HOST:PORT: In Advanced Managed Kafka > Cluster, click the created `tutorial-amk-cluster` and copy the bootstrap server information.
  - TOPIC_NAME: Name of the topic from which the consumer receives data. Example: tutorial-topic
  - S3_ACCESS_KEY_ID: S3 access key ID
  - S3_ACCESS_SECRET_KEY: S3 secret access key
  - BUCKET_NAME: Bucket name. Example: tutorial-kafka-bucket
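The heredoc above uses a quoted 'EOF', so every ${...} placeholder is written to /tmp/env_vars.sh literally. If one is left unreplaced, it usually expands to an empty value when the file is sourced. The sketch below uses a hypothetical helper, check_placeholders, which is not part of the tutorial. It flags any line whose value is still a ${...} placeholder.

```shell
#!/usr/bin/env bash
# Hypothetical helper (not part of the tutorial): report lines in an env file
# whose value is still an unreplaced ${...} placeholder, e.g. export TOPIC="${TOPIC_NAME}".
check_placeholders() {
  local file="${1:-/tmp/env_vars.sh}"
  # grep -n prints offending lines with line numbers and exits 0 when it finds any
  if grep -nE '="\$\{[A-Za-z_:]+\}"' "$file"; then
    echo "Unreplaced placeholders found in $file" >&2
    return 1
  fi
  echo "OK: no placeholders left in $file"
}
```

For example, run `check_placeholders /tmp/env_vars.sh` before continuing. A non-zero exit status means at least one value still needs to be filled in.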
- Write the Sink Connector setup script.
sudo vi ./setting-script.sh
Example setting-script.sh script
Copy and paste the script below into the setting-script.sh file.
#!/bin/bash
#------------------------------------------
# Configuration variables
#------------------------------------------
# Confluent Hub Client settings
CONFLUENT_HUB_DIR="/confluent-hub"
CONFLUENT_HUB_URL="https://client.hub.confluent.io/confluent-hub-client-latest.tar.gz"
CONFLUENT_HUB_FILE="confluent-hub-client-latest.tar.gz"
# AWS CLI settings
AWS_CLI_VERSION="2.22.0"
AWS_CLI_ZIP="awscliv2.zip"
AWS_CLI_DOWNLOAD_URL="https://awscli.amazonaws.com/awscli-exe-linux-x86_64-${AWS_CLI_VERSION}.zip"
# Kafka settings
KAFKA_INSTALL_DIR="/home/ubuntu/kafka"
# Validate required environment variables (TOPIC is used in s3-sink-connector.properties)
required_variables=(
KAFKA_BOOTSTRAP_SERVER TOPIC BUCKET_NAME
AWS_ACCESS_KEY_ID_VALUE AWS_SECRET_ACCESS_KEY_VALUE
AWS_DEFAULT_REGION_VALUE AWS_DEFAULT_OUTPUT_VALUE
)
echo "kakaocloud: Starting validation of required environment variables"
for var in "${required_variables[@]}"; do
if [ -z "${!var}" ]; then
echo "kakaocloud: Required environment variable $var is not set. Exiting the script."
exit 1
fi
done
################################################################################
# 0. Update apt and install required packages
################################################################################
echo "kakaocloud: 0. Starting system update and required package installation"
sudo apt-get update -y || { echo "kakaocloud: apt-get update failed"; exit 1; }
sudo apt-get install -y python3 python3-pip openjdk-21-jdk unzip jq aria2 curl || { echo "kakaocloud: Failed to install required packages"; exit 1; }
################################################################################
# 1. Install Confluent Hub Client
################################################################################
echo "kakaocloud: 1. Starting Confluent Hub Client installation"
sudo mkdir -p "$CONFLUENT_HUB_DIR/plugins" || { echo "kakaocloud: Failed to create directory"; exit 1; }
cd "$CONFLUENT_HUB_DIR" || { echo "kakaocloud: Failed to change directory"; exit 1; }
aria2c -x 16 -s 16 -o "$CONFLUENT_HUB_FILE" "$CONFLUENT_HUB_URL" || { echo "kakaocloud: Download failed"; exit 1; }
sudo tar -xzf "$CONFLUENT_HUB_FILE" || { echo "kakaocloud: Failed to extract archive"; exit 1; }
sudo rm "$CONFLUENT_HUB_FILE"
sudo chown -R ubuntu:ubuntu "$CONFLUENT_HUB_DIR"
# Set PATH in .bashrc
sed -i '/CONFLUENT_HOME=/d' /home/ubuntu/.bashrc
sed -i '/PATH=.*\$CONFLUENT_HOME\/bin/d' /home/ubuntu/.bashrc
cat <<EOF >> /home/ubuntu/.bashrc
# Confluent settings
export CONFLUENT_HOME="$CONFLUENT_HUB_DIR"
export PATH="\$PATH:\$CONFLUENT_HOME/bin"
EOF
export CONFLUENT_HOME="$CONFLUENT_HUB_DIR"
export PATH="$PATH:$CONFLUENT_HUB_DIR/bin"
echo "kakaocloud: Confluent Hub Client installation complete"
################################################################################
# 2. Register environment variables in .bashrc
################################################################################
echo "kakaocloud: 2. Starting environment variable registration"
# Remove existing related lines so that re-running the script does not duplicate them
sed -i '/AWS_ACCESS_KEY_ID=/d' /home/ubuntu/.bashrc || { echo "kakaocloud: Failed to modify .bashrc"; exit 1; }
sed -i '/AWS_SECRET_ACCESS_KEY=/d' /home/ubuntu/.bashrc || { echo "kakaocloud: Failed to modify .bashrc"; exit 1; }
sed -i '/AWS_DEFAULT_REGION=/d' /home/ubuntu/.bashrc || { echo "kakaocloud: Failed to modify .bashrc"; exit 1; }
sed -i '/AWS_DEFAULT_OUTPUT=/d' /home/ubuntu/.bashrc || { echo "kakaocloud: Failed to modify .bashrc"; exit 1; }
sed -i '/CONFLUENT_HOME=/d' /home/ubuntu/.bashrc || { echo "kakaocloud: Failed to modify .bashrc"; exit 1; }
sed -i '/JAVA_HOME=/d' /home/ubuntu/.bashrc || { echo "kakaocloud: Failed to modify .bashrc"; exit 1; }
cat <<EOF >> /home/ubuntu/.bashrc
# KakaoCloud S3 Credentials
export AWS_ACCESS_KEY_ID="$AWS_ACCESS_KEY_ID_VALUE"
export AWS_SECRET_ACCESS_KEY="$AWS_SECRET_ACCESS_KEY_VALUE"
export AWS_DEFAULT_REGION="$AWS_DEFAULT_REGION_VALUE"
export AWS_DEFAULT_OUTPUT="$AWS_DEFAULT_OUTPUT_VALUE"
# Confluent settings
export CONFLUENT_HOME="/confluent-hub"
export PATH="\$PATH:\$CONFLUENT_HOME/bin"
# Java settings
export JAVA_HOME="/usr/lib/jvm/java-21-openjdk-amd64"
export PATH="\$JAVA_HOME/bin:\$PATH"
EOF
if [ $? -ne 0 ]; then echo "kakaocloud: Failed to register environment variables"; exit 1; fi
################################################################################
# 3. Apply .bashrc (for non-interactive environments)
################################################################################
source /home/ubuntu/.bashrc || { echo "kakaocloud: Failed to apply .bashrc"; exit 1; }
################################################################################
# 4. Install S3 Sink Connector
################################################################################
echo "kakaocloud: 4. Starting S3 Sink Connector installation"
sudo chown ubuntu:ubuntu /home/ubuntu/kafka/config/connect-standalone.properties 2>/dev/null
/confluent-hub/bin/confluent-hub install confluentinc/kafka-connect-s3:latest \
--component-dir /confluent-hub/plugins \
--worker-configs /home/ubuntu/kafka/config/connect-standalone.properties \
--no-prompt || { echo "kakaocloud: Failed to install S3 Sink Connector"; exit 1; }
################################################################################
# 5. Install AWS CLI
################################################################################
echo "kakaocloud: 5. Starting AWS CLI installation"
cd /home/ubuntu || { echo "kakaocloud: Failed to change to home directory"; exit 1; }
# -f makes curl return an error on HTTP failures so the check below catches them
curl -f "${AWS_CLI_DOWNLOAD_URL}" -o "${AWS_CLI_ZIP}" || { echo "kakaocloud: Failed to download AWS CLI"; exit 1; }
unzip "${AWS_CLI_ZIP}" || { echo "kakaocloud: Failed to extract AWS CLI"; exit 1; }
sudo ./aws/install || { echo "kakaocloud: Failed to install AWS CLI"; exit 1; }
rm -rf aws "${AWS_CLI_ZIP}" || { echo "kakaocloud: Failed to clean up after AWS CLI installation"; exit 1; }
AWS_VERSION=$(aws --version 2>&1 || true)
################################################################################
# 6. Configure AWS CLI settings
################################################################################
echo "kakaocloud: 6. Starting AWS CLI configuration"
sudo -u ubuntu -i aws configure set aws_access_key_id "$AWS_ACCESS_KEY_ID_VALUE" || { echo "kakaocloud: Failed to set AWS CLI aws_access_key_id"; exit 1; }
sudo -u ubuntu -i aws configure set aws_secret_access_key "$AWS_SECRET_ACCESS_KEY_VALUE" || { echo "kakaocloud: Failed to set AWS CLI aws_secret_access_key"; exit 1; }
sudo -u ubuntu -i aws configure set default.region "$AWS_DEFAULT_REGION_VALUE" || { echo "kakaocloud: Failed to set AWS CLI default.region"; exit 1; }
sudo -u ubuntu -i aws configure set default.output "$AWS_DEFAULT_OUTPUT_VALUE" || { echo "kakaocloud: Failed to set AWS CLI default.output"; exit 1; }
AWS_VERSION=$(aws --version 2>&1)
source /home/ubuntu/.bashrc || { echo "kakaocloud: Failed to re-apply .bashrc"; exit 1; }
################################################################################
# 7. Create the Kafka configuration folder and grant permissions
################################################################################
echo "kakaocloud: 7. Starting Kafka configuration folder creation and permission setup"
sudo mkdir -p /opt/kafka/config || { echo "kakaocloud: Failed to create Kafka configuration folder"; exit 1; }
sudo chown -R ubuntu:ubuntu /opt/kafka || { echo "kakaocloud: Failed to change Kafka configuration folder permissions"; exit 1; }
################################################################################
# 8. Download the custom partitioner and filename plugins
################################################################################
echo "kakaocloud: 8. Starting custom plugin download"
sudo wget -O /confluent-hub/plugins/confluentinc-kafka-connect-s3/lib/custom-partitioner-1.0-SNAPSHOT.jar \
"https://raw.githubusercontent.com/kakaocloud-edu/tutorial/main/DataAnalyzeCourse/src/day1/Lab03/kafka_connector/custom-partitioner-1.0-SNAPSHOT.jar" || { echo "kakaocloud: Failed to download custom-partitioner"; exit 1; }
sudo wget -O /confluent-hub/plugins/confluentinc-kafka-connect-s3/lib/custom-filename-1.0-SNAPSHOT.jar \
"https://raw.githubusercontent.com/kakaocloud-edu/tutorial/main/DataAnalyzeCourse/src/day1/Lab03/kafka_connector/custom-filename-1.0-SNAPSHOT.jar" || { echo "kakaocloud: Failed to download custom-filename"; exit 1; }
################################################################################
# 9. Create s3-sink-connector.properties
################################################################################
echo "kakaocloud: 9. Starting s3-sink-connector.properties creation"
cat <<EOF > /opt/kafka/config/s3-sink-connector.properties
name=s3-sink-connector
connector.class=io.confluent.connect.s3.S3SinkConnector
tasks.max=1
topics=${TOPIC}
s3.region=kr-central-2
s3.bucket.name=${BUCKET_NAME}
s3.part.size=5242880
aws.access.key.id=${AWS_ACCESS_KEY_ID_VALUE}
aws.secret.access.key=${AWS_SECRET_ACCESS_KEY_VALUE}
store.url=https://objectstorage.kr-central-2.kakaocloud.com
storage.class=io.confluent.connect.s3.storage.S3Storage
format.class=io.confluent.connect.s3.format.json.JsonFormat
parquet.codec=snappy
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.storage.StringConverter
key.converter.schemas.enable=false
value.converter.schemas.enable=false
flush.size=1
partitioner.class=com.mycompany.connect.FlexibleTimeBasedPartitioner
topics.dir=${TOPIC}
custom.topic.dir=topic
custom.partition.prefix=partition_
partition.duration.ms=3600000
path.format='year_'yyyy/'month_'MM/'day_'dd/'hour_'HH
locale=en-US
timezone=Asia/Seoul
timestamp.extractor=Wallclock
custom.replacements==:_
s3.bucket.create=false
bucket.exists.checking.enabled=false
EOF
if [ $? -ne 0 ]; then echo "kakaocloud: Failed to create s3-sink-connector.properties"; exit 1; fi
################################################################################
# 10. Create worker.properties
################################################################################
echo "kakaocloud: 10. Starting worker.properties creation"
cat <<EOF > /opt/kafka/config/worker.properties
bootstrap.servers=${KAFKA_BOOTSTRAP_SERVER}
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
value.converter.schemas.enable=false
offset.storage.file.filename=/tmp/connect.offsets
offset.flush.interval.ms=10000
plugin.path=/confluent-hub/plugins/confluentinc-kafka-connect-s3
listeners=http://0.0.0.0:8084
EOF
if [ $? -ne 0 ]; then echo "kakaocloud: Failed to create worker.properties"; exit 1; fi
################################################################################
# 11. Register the kafka-connect systemd service
################################################################################
echo "kakaocloud: 11. Starting Kafka Connect service registration"
cat <<EOF | sudo tee /etc/systemd/system/kafka-connect.service
[Unit]
Description=Kafka Connect Standalone Service
After=network.target
[Service]
User=ubuntu
ExecStart=/home/ubuntu/kafka/bin/connect-standalone.sh \
/opt/kafka/config/worker.properties \
/opt/kafka/config/s3-sink-connector.properties
Restart=on-failure
RestartSec=5
[Install]
WantedBy=multi-user.target
EOF
if [ $? -ne 0 ]; then echo "kakaocloud: Failed to register Kafka Connect service"; exit 1; fi
################################################################################
# 12. Download and install Schema Registry
################################################################################
echo "kakaocloud: 12. Starting Schema Registry download and installation"
sudo wget https://packages.confluent.io/archive/7.5/confluent-7.5.3.tar.gz || { echo "kakaocloud: Failed to download Schema Registry"; exit 1; }
sudo tar -xzvf confluent-7.5.3.tar.gz -C /confluent-hub/plugins || { echo "kakaocloud: Failed to extract Schema Registry"; exit 1; }
sudo rm confluent-7.5.3.tar.gz || { echo "kakaocloud: Failed to delete Schema Registry archive"; exit 1; }
################################################################################
# 13. Create the systemd unit file and register the Schema Registry service
################################################################################
echo "kakaocloud: 13. Starting systemd unit file creation and Schema Registry service registration"
cat <<EOF > /etc/systemd/system/schema-registry.service
[Unit]
Description=Confluent Schema Registry
After=network.target
[Service]
Type=simple
User=ubuntu
ExecStart=/confluent-hub/plugins/confluent-7.5.3/bin/schema-registry-start /confluent-hub/plugins/confluent-7.5.3/etc/schema-registry/schema-registry.properties
Restart=on-failure
RestartSec=5s
[Install]
WantedBy=multi-user.target
EOF
if [ $? -ne 0 ]; then echo "kakaocloud: Failed to write Schema Registry service file"; exit 1; fi
sudo systemctl daemon-reload || { echo "kakaocloud: daemon-reload failed"; exit 1; }
sudo systemctl enable schema-registry.service || { echo "kakaocloud: Failed to enable schema-registry service"; exit 1; }
sudo systemctl start schema-registry.service || { echo "kakaocloud: Failed to start schema-registry service"; exit 1; }
################################################################################
# 14. Install and configure the Avro converter in the S3 connector plugin path
################################################################################
echo "kakaocloud: 14. Starting Avro converter installation and configuration"
sudo wget https://github.com/kakaocloud-edu/tutorial/raw/refs/heads/main/DataAnalyzeCourse/src/day2/Lab01/confluentinc-kafka-connect-avro-converter-7.5.3.zip || { echo "kakaocloud: Failed to download confluentinc-kafka-connect-avro-converter"; exit 1; }
unzip confluentinc-kafka-connect-avro-converter-7.5.3.zip || { echo "kakaocloud: Failed to extract confluentinc-kafka-connect-avro-converter"; exit 1; }
sudo rm confluentinc-kafka-connect-avro-converter-7.5.3.zip || { echo "kakaocloud: Failed to delete confluentinc-kafka-connect-avro-converter archive"; exit 1; }
sudo mv confluentinc-kafka-connect-avro-converter-7.5.3/lib/*.jar /confluent-hub/plugins/confluentinc-kafka-connect-s3/lib || { echo "kakaocloud: Failed to move confluentinc-kafka-connect-avro-converter files"; exit 1; }
sudo wget -P /confluent-hub/plugins/confluentinc-kafka-connect-s3/lib \
https://repo1.maven.org/maven2/com/google/guava/guava/30.1.1-jre/guava-30.1.1-jre.jar \
https://packages.confluent.io/maven/io/confluent/kafka-connect-protobuf-converter/7.5.3/kafka-connect-protobuf-converter-7.5.3.jar \
https://packages.confluent.io/maven/io/confluent/kafka-protobuf-serializer/7.5.3/kafka-protobuf-serializer-7.5.3.jar \
https://packages.confluent.io/maven/io/confluent/common-config/7.5.3/common-config-7.5.3.jar \
https://repo1.maven.org/maven2/com/google/protobuf/protobuf-java/3.25.1/protobuf-java-3.25.1.jar \
https://repo1.maven.org/maven2/com/google/guava/failureaccess/1.0.2/failureaccess-1.0.2.jar || { echo -e "\nERROR: Failed to download additional S3 connector dependencies"; exit 1; }
################################################################################
# Done
################################################################################
echo "kakaocloud: Setup complete"
- Run the script.
SCRIPT_FILE="./setting-script.sh"
sudo chmod +x "$SCRIPT_FILE"
sudo -E "$SCRIPT_FILE"
# Running the script may take some time.
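After the script finishes, you can confirm that the custom partitioner and filename JARs from section 8 of setting-script.sh are in the S3 connector's lib directory. The sketch below is a minimal check. check_jars is a hypothetical helper, and the directory and file names are copied from setting-script.sh.

```shell
# Hypothetical helper: confirm that JARs downloaded by setting-script.sh exist
# in the S3 Sink Connector's lib directory. Returns non-zero if any are missing.
check_jars() {
  local lib_dir="${1:-/confluent-hub/plugins/confluentinc-kafka-connect-s3/lib}"
  local missing=0 jar
  for jar in custom-partitioner-1.0-SNAPSHOT.jar custom-filename-1.0-SNAPSHOT.jar; do
    if [ -f "$lib_dir/$jar" ]; then
      echo "found: $jar"
    else
      echo "missing: $jar" >&2
      missing=1
    fi
  done
  return "$missing"
}
```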
- Configure access permissions on the Object Storage bucket so that message files can be written to it. The command below grants read and write permissions on the bucket to all users (the AllUsers group).
aws s3api put-bucket-acl \
--bucket "${BUCKET_NAME}" \
--grant-read 'uri="http://acs.amazonaws.com/groups/global/AllUsers"' \
--grant-write 'uri="http://acs.amazonaws.com/groups/global/AllUsers"' \
--endpoint-url https://objectstorage.kr-central-2.kakaocloud.com
Environment variables:
  - BUCKET_NAME: Bucket name. Example: tutorial-kafka-bucket
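To check that the grants were applied, you can read the ACL back. This sketch assumes that KakaoCloud Object Storage accepts get-bucket-acl through the same S3-compatible endpoint that the put-bucket-acl command uses. OBJECT_STORAGE_ENDPOINT and show_bucket_acl are illustrative names.

```shell
# Illustrative names: OBJECT_STORAGE_ENDPOINT and show_bucket_acl are not part of the tutorial.
OBJECT_STORAGE_ENDPOINT="https://objectstorage.kr-central-2.kakaocloud.com"

# Print the bucket ACL; defaults to $BUCKET_NAME exported by /tmp/env_vars.sh.
# Assumes the endpoint supports get-bucket-acl as it does put-bucket-acl.
show_bucket_acl() {
  aws s3api get-bucket-acl \
    --bucket "${1:-${BUCKET_NAME:?BUCKET_NAME is not set}}" \
    --endpoint-url "$OBJECT_STORAGE_ENDPOINT"
}
```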
- Verify that the S3 Sink Connector and standalone worker configuration files were created correctly.
ls /opt/kafka/config
# Output: s3-sink-connector.properties worker.properties
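The script fills topics and topics.dir from $TOPIC and the bucket and keys from the other variables. If one of those variables was empty when the script ran, the matching value in s3-sink-connector.properties is also empty. The sketch below uses two hypothetical helpers, prop and check_connector_props, to read a few keys and report any that are empty. It handles only simple key=value lines, which is the format the script generates.

```shell
# Hypothetical helper: print the value of key $2 from the .properties file $1.
# Handles simple key=value lines only (first match wins); the value may itself contain '='.
prop() {
  awk -v k="$2" 'index($0, k "=") == 1 { print substr($0, length(k) + 2); exit }' "$1"
}

# Hypothetical check: report keys that ended up empty or missing in the generated connector config.
check_connector_props() {
  local file="${1:-/opt/kafka/config/s3-sink-connector.properties}" status=0 key
  for key in topics topics.dir s3.bucket.name aws.access.key.id aws.secret.access.key; do
    if [ -z "$(prop "$file" "$key")" ]; then
      echo "empty or missing: $key" >&2
      status=1
    fi
  done
  [ "$status" -eq 0 ] && echo "OK: $file"
  return "$status"
}
```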
- Verify that the kafka-connect service file was created.
ls /etc/systemd/system | grep kafka-connect.service
# Output: kafka-connect.service
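- Modify the Kafka broker address in the Schema Registry configuration file. Replace ${HOST:PORT} in the command with the actual value before running it. The sed expression is in single quotes, so the shell does not expand it.
sudo sed -i 's|PLAINTEXT://localhost:9092|${HOST:PORT}|' /confluent-hub/plugins/confluent-7.5.3/etc/schema-registry/schema-registry.properties
Environment variables:
  - HOST:PORT: In Advanced Managed Kafka > `tutorial-amk-cluster` details, copy the bootstrap server information.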
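KAFKA_BOOTSTRAP_SERVER is already exported by /tmp/env_vars.sh, so the substitution can also take its value from the environment instead of a hand-edited placeholder. The function below is a sketch, and set_schema_registry_brokers is a hypothetical name. Like the command above, it replaces the whole PLAINTEXT://localhost:9092 string. It then prints the resulting line, assuming the broker address is stored under kafkastore.bootstrap.servers in this file.

```shell
# Hypothetical helper: point Schema Registry at the brokers in $KAFKA_BOOTSTRAP_SERVER.
set_schema_registry_brokers() {
  local file="${1:-/confluent-hub/plugins/confluent-7.5.3/etc/schema-registry/schema-registry.properties}"
  local servers="${KAFKA_BOOTSTRAP_SERVER:?KAFKA_BOOTSTRAP_SERVER is not set}"
  # Double quotes let the shell expand $servers; '|' is the sed delimiter,
  # so the value must not contain '|' or '&' (special in sed replacements).
  sudo sed -i "s|PLAINTEXT://localhost:9092|${servers}|" "$file"
  # Show the resulting line (assumes the key is kafkastore.bootstrap.servers)
  grep '^kafkastore.bootstrap.servers=' "$file"
}
```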
Step 3. Start the kafka-connect service and check Sink Connector status
- Reload the daemon, restart Schema Registry, and start the kafka-connect service.
# Daemon reload
sudo systemctl daemon-reload
# Restart schema-registry
sudo systemctl restart schema-registry.service
# Enable Kafka Connect on boot
sudo systemctl enable kafka-connect
# Start Kafka Connect service
sudo systemctl start kafka-connect
- Check the status of s3-sink-connector.
watch -n 1 "curl -s http://localhost:8084/connectors/s3-sink-connector/status | jq"
# Example output
{
"name": "s3-sink-connector",
"connector": {
"state": "RUNNING",
"worker_id": "0.0.0.0:8083"
},
"tasks": [
{
"id": 0,
"state": "RUNNING",
"worker_id": "0.0.0.0:8083"
}
],
"type": "sink"
}
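The watch command above is interactive. To verify this step from a script instead, the sketch below adds two hypothetical helpers. check_services uses systemctl is-active to confirm that both units started. wait_for_connector polls the same status endpoint for a bounded number of attempts. It uses jq -e, which exits with status 0 only when the expression is true, to require that the connector and all of its tasks report RUNNING.

```shell
# Hypothetical helpers for checking Step 3 from a script (not part of the tutorial).

# Report whether both systemd units are active; systemctl is-active --quiet
# prints nothing and exits 0 only when the unit is active.
check_services() {
  local svc status=0
  for svc in schema-registry kafka-connect; do
    if systemctl is-active --quiet "$svc"; then
      echo "$svc: active"
    else
      echo "$svc: not active (check: journalctl -u $svc)" >&2
      status=1
    fi
  done
  return "$status"
}

# Poll the Kafka Connect REST API (port 8084, set by listeners= in worker.properties)
# until the connector is RUNNING and it has at least one task, all of them RUNNING.
wait_for_connector() {
  local name="${1:-s3-sink-connector}" attempts="${2:-30}" body i
  for ((i = 1; i <= attempts; i++)); do
    body=$(curl -s "http://localhost:8084/connectors/${name}/status")
    if printf '%s' "$body" | jq -e \
      '.connector.state == "RUNNING" and (.tasks | length > 0) and all(.tasks[]; .state == "RUNNING")' \
      >/dev/null 2>&1; then
      echo "${name}: RUNNING"
      return 0
    fi
    sleep 1
  done
  echo "${name}: not RUNNING after ${attempts} attempts" >&2
  return 1
}
```

For example, `check_services && wait_for_connector s3-sink-connector 60` waits up to about a minute before reporting failure.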
Step 4. Publish messages and verify loading
Publish messages to Kafka and verify that they are loaded into the Object Storage bucket correctly.
- Run the producer client.
cd kafka
bin/kafka-console-producer.sh --topic ${TOPIC_NAME} --bootstrap-server ${HOST:PORT}
Environment variables:
  - TOPIC_NAME: Name of the topic to which the producer sends data. Example: tutorial-topic
  - HOST:PORT: In Advanced Managed Kafka > `tutorial-amk-cluster` details, copy the bootstrap server information.
- When the > prompt appears, enter the test message below and press Enter.
hello tester!
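For a non-interactive check, you can pipe the message into the console producer instead of typing it at the > prompt. kafka-console-producer.sh sends each line it reads from standard input as one message. The sketch below is a hypothetical helper that reuses TOPIC and KAFKA_BOOTSTRAP_SERVER from /tmp/env_vars.sh. It assumes Kafka is installed in /home/ubuntu/kafka, as KAFKA_INSTALL_DIR in setting-script.sh indicates. KAFKA_DIR is an illustrative override.

```shell
# Hypothetical helper: publish one message without the interactive prompt.
# KAFKA_DIR is an illustrative override; the default matches KAFKA_INSTALL_DIR in setting-script.sh.
publish_test_message() {
  local kafka_dir="${KAFKA_DIR:-/home/ubuntu/kafka}"
  # Single quotes keep interactive bash from treating '!' as history expansion
  local default_msg='hello tester!'
  printf '%s\n' "${1:-$default_msg}" | "$kafka_dir/bin/kafka-console-producer.sh" \
    --topic "${TOPIC:?TOPIC is not set}" \
    --bootstrap-server "${KAFKA_BOOTSTRAP_SERVER:?KAFKA_BOOTSTRAP_SERVER is not set}"
}
```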
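- In the console, go to Object Storage > tutorial-kafka-bucket and verify that a file was created.
  - The storage path depends on the Kafka Connector settings (such as topics.dir, partitioner.class, and path.format in s3-sink-connector.properties) and is usually divided by date or partition. Because flush.size=1, each message is written as a separate object.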
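You can also check from the VM with the AWS CLI configured by setting-script.sh. This sketch lists every object in the bucket recursively rather than assuming a particular prefix, because the exact layout depends on the custom partitioner. list_loaded_objects is a hypothetical name.

```shell
# Hypothetical helper: list all objects in the bucket through the KakaoCloud S3-compatible endpoint.
list_loaded_objects() {
  aws s3 ls "s3://${1:-${BUCKET_NAME:?BUCKET_NAME is not set}}/" \
    --recursive \
    --endpoint-url https://objectstorage.kr-central-2.kakaocloud.com
}
```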
Wrap-up and next steps
You configured a pipeline that stores Kafka messages in Object Storage. You can now register metadata in Data Catalog based on the stored data and analyze it using Data Query.
Next, see Analyze Kafka messages using Data Catalog and Data Query.