Tutorial series | Build a real-time data pipeline

2. Load Kafka data into Object Storage

Practice loading messages received from Kafka into Object Storage.

Basic information

About this scenario

In this tutorial, you configure a pipeline that stores Kafka messages collected in Part 1 in Object Storage. This step corresponds to Storage in the overall architecture, and the loaded data becomes the foundation for later analytics processing or long-term retention.

You will cover the following:

  • Configure an Object Storage sink with Kafka Connect (S3 Sink Connector)
  • Receive messages and verify loading
  • Check results saved as files in Object Storage

Figure: Pipeline architecture

Before you start

This tutorial requires a Kafka cluster and topic to be configured in advance. If they are not configured yet, complete Message processing through Kafka first.
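
To check this prerequisite from the Kafka access VM (tutorial-amk-vm), you can list the cluster's topics and look for yours. The following is a minimal sketch, assuming the Kafka CLI tools are under /home/ubuntu/kafka (the same path the setup script in Step 2 uses); `topic_exists` and the `KAFKA_HOME` override are hypothetical names for illustration.

```bash
#!/usr/bin/env bash
# Hypothetical helper: succeeds if the topic exists on the cluster at the given bootstrap server.
# ASSUMPTION: Kafka CLI tools live under KAFKA_HOME (defaults to /home/ubuntu/kafka).
topic_exists() {
  local bootstrap="$1" topic="$2"
  local kafka_home="${KAFKA_HOME:-/home/ubuntu/kafka}"
  # kafka-topics.sh --list prints one topic name per line; match the whole line exactly
  # so that "tutorial" does not match "tutorial-topic".
  "$kafka_home/bin/kafka-topics.sh" --bootstrap-server "$bootstrap" --list \
    | grep -Fxq -- "$topic"
}
```

For example: `topic_exists "<bootstrap-server>" tutorial-topic && echo "topic found"`.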

Step 1. Create an Object Storage bucket

Create an Object Storage bucket to store Kafka messages.

  1. Go to KakaoCloud console > Object Storage > Bucket.

  2. Click Create Bucket, then enter the following values.

    Item | Value
    Type | Standard
    Name | tutorial-kafka-bucket
    Encryption | Not used
  3. Click Create and verify that the bucket appears in the list.
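
The bucket name you enter here is reused later as BUCKET_NAME, so a quick local sanity check can catch typos early. This is a minimal sketch that assumes KakaoCloud Object Storage applies S3-style naming rules (3 to 63 characters; lowercase letters, digits, hyphens, and dots; starting and ending with a letter or digit; no consecutive dots). Those rules and the function name `is_valid_bucket_name` are assumptions; the console's own validation is authoritative.

```bash
#!/usr/bin/env bash
# Hypothetical helper: returns 0 if the name passes S3-style bucket naming rules.
# ASSUMPTION: KakaoCloud Object Storage uses rules similar to S3; verify in the console.
is_valid_bucket_name() {
  local name="$1"
  # Length must be between 3 and 63 characters.
  (( ${#name} >= 3 && ${#name} <= 63 )) || return 1
  # Only lowercase letters, digits, hyphens, and dots; first and last characters alphanumeric.
  [[ "$name" =~ ^[a-z0-9][a-z0-9.-]*[a-z0-9]$ ]] || return 1
  # Reject consecutive dots.
  [[ "$name" != *..* ]]
}
```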

Step 2. Configure Sink Connector

Configure a Sink Connector that sends data from Kafka to Object Storage.
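
Every command in this step depends on the variables defined in item 2, and a placeholder left unreplaced there usually becomes an empty value rather than an error (for example, bash reads ${HOST:PORT} as a substring expansion of $HOST). The following is a minimal preflight sketch you could run after sourcing /tmp/env_vars.sh. `check_sink_env` is a hypothetical name, and the host:port check assumes the bootstrap value is a plain comma-separated list such as broker1:9092,broker2:9092.

```bash
#!/usr/bin/env bash
# Hypothetical preflight check for the variables exported by /tmp/env_vars.sh.
# Prints each problem to stderr and returns non-zero if anything is wrong.
check_sink_env() {
  local var entry status=0
  local -a entries
  for var in KAFKA_BOOTSTRAP_SERVER TOPIC BUCKET_NAME \
             AWS_ACCESS_KEY_ID_VALUE AWS_SECRET_ACCESS_KEY_VALUE AWS_DEFAULT_REGION_VALUE; do
    # ${!var} reads the variable whose name is stored in var.
    if [[ -z "${!var}" ]]; then
      echo "missing: $var" >&2
      status=1
    fi
  done
  # ASSUMPTION: the bootstrap value is a comma-separated list of host:port pairs.
  IFS=',' read -ra entries <<< "${KAFKA_BOOTSTRAP_SERVER}"
  for entry in "${entries[@]}"; do
    if [[ ! "$entry" =~ ^[A-Za-z0-9.-]+:[0-9]+$ ]]; then
      echo "not host:port: '$entry'" >&2
      status=1
    fi
  done
  return "$status"
}
```

For example: `source /tmp/env_vars.sh && check_sink_env && echo "environment OK"`.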

  1. Connect to the VM for Kafka cluster access (tutorial-amk-vm).

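  2. Set the environment variables required to install the Sink Connector. Before running the commands, replace each placeholder marked 🖌 in the table below with your own value.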

    cat <<'EOF' > /tmp/env_vars.sh
    export KAFKA_BOOTSTRAP_SERVER="${HOST:PORT}"
    export TOPIC="${TOPIC_NAME}"

    export AWS_ACCESS_KEY_ID_VALUE="${S3_ACCESS_KEY_ID}"
    export AWS_SECRET_ACCESS_KEY_VALUE="${S3_ACCESS_SECRET_KEY}"

    export BUCKET_NAME="${BUCKET_NAME}"
    export AWS_DEFAULT_REGION_VALUE="kr-central-2"
    export AWS_DEFAULT_OUTPUT_VALUE="json"

    export LOGFILE="/home/ubuntu/setup.log"
    EOF

    source /tmp/env_vars.sh
    echo "source /tmp/env_vars.sh" >> /home/ubuntu/.bashrc
    Environment variable | Description
    HOST:PORT🖌 | In Advanced Managed Kafka > Cluster, click the created `tutorial-amk-cluster` and copy the bootstrap server information
    TOPIC_NAME🖌 | Topic name where the consumer receives data / Example: tutorial-topic
    S3_ACCESS_KEY_ID🖌 | S3 access key ID value
    S3_ACCESS_SECRET_KEY🖌 | S3 secret access key value
    BUCKET_NAME🖌 | Bucket name / Example: tutorial-kafka-bucket
  3. Write a script for Sink Connector setup.

    sudo vi ./setting-script.sh
    Example setting-script.sh script

    Copy and paste the script below into the setting-script.sh file.

    setting-script.sh
    #!/bin/bash

    #------------------------------------------
    # Configuration variables
    #------------------------------------------
    # Confluent Hub Client settings
    CONFLUENT_HUB_DIR="/confluent-hub"
    CONFLUENT_HUB_URL="https://client.hub.confluent.io/confluent-hub-client-latest.tar.gz"
    CONFLUENT_HUB_FILE="confluent-hub-client-latest.tar.gz"

    # AWS CLI settings
    AWS_CLI_VERSION="2.22.0"
    AWS_CLI_ZIP="awscliv2.zip"
    AWS_CLI_DOWNLOAD_URL="https://awscli.amazonaws.com/awscli-exe-linux-x86_64-${AWS_CLI_VERSION}.zip"

    # Kafka settings
    KAFKA_INSTALL_DIR="/home/ubuntu/kafka"

    # Validate required environment variables
    # (TOPIC is included because the connector properties below use it for topics and topics.dir)
    required_variables=(
      KAFKA_BOOTSTRAP_SERVER TOPIC BUCKET_NAME
      AWS_ACCESS_KEY_ID_VALUE AWS_SECRET_ACCESS_KEY_VALUE
      AWS_DEFAULT_REGION_VALUE AWS_DEFAULT_OUTPUT_VALUE
    )

    echo "kakaocloud: Validating required environment variables"
    for var in "${required_variables[@]}"; do
      if [ -z "${!var}" ]; then
        echo "kakaocloud: Required environment variable $var is not set. Exiting."
        exit 1
      fi
    done
    ################################################################################
    # 0. Update apt and install required packages
    ################################################################################
    echo "kakaocloud: 0. Updating the system and installing required packages"
    sudo apt-get update -y || { echo "kakaocloud: apt-get update failed"; exit 1; }
    sudo apt-get install -y python3 python3-pip openjdk-21-jdk unzip jq aria2 curl || { echo "kakaocloud: Failed to install required packages"; exit 1; }

    ################################################################################
    # 1. Install Confluent Hub Client
    ################################################################################
    echo "kakaocloud: 1. Installing Confluent Hub Client"
    sudo mkdir -p "$CONFLUENT_HUB_DIR/plugins" || { echo "kakaocloud: Failed to create directory"; exit 1; }
    cd "$CONFLUENT_HUB_DIR" || { echo "kakaocloud: Failed to change directory"; exit 1; }
    aria2c -x 16 -s 16 -o "$CONFLUENT_HUB_FILE" "$CONFLUENT_HUB_URL" || { echo "kakaocloud: Download failed"; exit 1; }
    sudo tar -xzf "$CONFLUENT_HUB_FILE" || { echo "kakaocloud: Failed to extract archive"; exit 1; }
    sudo rm "$CONFLUENT_HUB_FILE"
    sudo chown -R ubuntu:ubuntu "$CONFLUENT_HUB_DIR"

    # Add Confluent Hub Client to PATH in .bashrc
    sed -i '/CONFLUENT_HOME=/d' /home/ubuntu/.bashrc
    sed -i '/PATH=.*\$CONFLUENT_HOME\/bin/d' /home/ubuntu/.bashrc
    cat <<EOF >> /home/ubuntu/.bashrc
    # Confluent settings
    export CONFLUENT_HOME="$CONFLUENT_HUB_DIR"
    export PATH="\$PATH:\$CONFLUENT_HOME/bin"
    EOF
    export CONFLUENT_HOME="$CONFLUENT_HUB_DIR"
    export PATH="$PATH:$CONFLUENT_HUB_DIR/bin"

    echo "kakaocloud: Confluent Hub Client installed"

    ################################################################################
    # 2. Register environment variables in .bashrc
    ################################################################################
    echo "kakaocloud: 2. Registering environment variables"
    # Remove previously registered lines
    sed -i '/S3_ACCESS_KEY=/d' /home/ubuntu/.bashrc || { echo "kakaocloud: Failed to modify .bashrc"; exit 1; }
    sed -i '/S3_SECRET_ACCESS_KEY=/d' /home/ubuntu/.bashrc || { echo "kakaocloud: Failed to modify .bashrc"; exit 1; }
    sed -i '/AWS_DEFAULT_REGION=/d' /home/ubuntu/.bashrc || { echo "kakaocloud: Failed to modify .bashrc"; exit 1; }
    sed -i '/AWS_DEFAULT_OUTPUT=/d' /home/ubuntu/.bashrc || { echo "kakaocloud: Failed to modify .bashrc"; exit 1; }
    sed -i '/CONFLUENT_HOME=/d' /home/ubuntu/.bashrc || { echo "kakaocloud: Failed to modify .bashrc"; exit 1; }
    sed -i '/JAVA_HOME=/d' /home/ubuntu/.bashrc || { echo "kakaocloud: Failed to modify .bashrc"; exit 1; }

    cat <<EOF >> /home/ubuntu/.bashrc
    # KakaoCloud S3 Credentials
    export AWS_ACCESS_KEY_ID="$AWS_ACCESS_KEY_ID_VALUE"
    export AWS_SECRET_ACCESS_KEY="$AWS_SECRET_ACCESS_KEY_VALUE"
    export AWS_DEFAULT_REGION="$AWS_DEFAULT_REGION_VALUE"
    export AWS_DEFAULT_OUTPUT="$AWS_DEFAULT_OUTPUT_VALUE"

    # Confluent settings
    export CONFLUENT_HOME="/confluent-hub"
    export PATH="\$PATH:\$CONFLUENT_HOME/bin"

    # Java settings
    export JAVA_HOME="/usr/lib/jvm/java-21-openjdk-amd64"
    export PATH="\$JAVA_HOME/bin:\$PATH"
    EOF
    if [ $? -ne 0 ]; then echo "kakaocloud: Failed to register environment variables"; exit 1; fi

    ################################################################################
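    # 3. Apply .bashrc (for non-interactive environments)
    ################################################################################
    # Note: Ubuntu's default .bashrc returns early in non-interactive shells, so sourcing it
    # here may have no effect; this is why step 1 also exports CONFLUENT_HOME and PATH directly.
    source /home/ubuntu/.bashrc || { echo "kakaocloud: Failed to apply .bashrc"; exit 1; }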

    ################################################################################
    # 4. Install the S3 Sink Connector
    ################################################################################
    echo "kakaocloud: 4. Installing the S3 Sink Connector"
    sudo chown ubuntu:ubuntu /home/ubuntu/kafka/config/connect-standalone.properties 2>/dev/null
    /confluent-hub/bin/confluent-hub install confluentinc/kafka-connect-s3:latest \
      --component-dir /confluent-hub/plugins \
      --worker-configs /home/ubuntu/kafka/config/connect-standalone.properties \
      --no-prompt || { echo "kakaocloud: Failed to install the S3 Sink Connector"; exit 1; }

    ################################################################################
    # 5. Install AWS CLI
    ################################################################################
    echo "kakaocloud: 5. Installing AWS CLI"
    cd /home/ubuntu || { echo "kakaocloud: Failed to change to the home directory"; exit 1; }
    # -f makes curl fail on HTTP errors instead of saving an error page as the zip file
    curl -fsSL "${AWS_CLI_DOWNLOAD_URL}" -o "${AWS_CLI_ZIP}" || { echo "kakaocloud: Failed to download AWS CLI"; exit 1; }
    unzip -q -o "${AWS_CLI_ZIP}" || { echo "kakaocloud: Failed to extract AWS CLI"; exit 1; }
    # --update lets the installer succeed when AWS CLI is already installed (for example, on a re-run)
    sudo ./aws/install --update || { echo "kakaocloud: Failed to install AWS CLI"; exit 1; }
    rm -rf aws "${AWS_CLI_ZIP}" || { echo "kakaocloud: Failed to clean up after AWS CLI installation"; exit 1; }
    AWS_VERSION=$(aws --version 2>&1 || true)

    ################################################################################
    # 6. Configure the AWS CLI
    ################################################################################
    echo "kakaocloud: 6. Configuring AWS CLI"
    sudo -u ubuntu -i aws configure set aws_access_key_id "$AWS_ACCESS_KEY_ID_VALUE" || { echo "kakaocloud: Failed to set AWS CLI aws_access_key_id"; exit 1; }
    sudo -u ubuntu -i aws configure set aws_secret_access_key "$AWS_SECRET_ACCESS_KEY_VALUE" || { echo "kakaocloud: Failed to set AWS CLI aws_secret_access_key"; exit 1; }
    sudo -u ubuntu -i aws configure set default.region "$AWS_DEFAULT_REGION_VALUE" || { echo "kakaocloud: Failed to set AWS CLI default.region"; exit 1; }
    sudo -u ubuntu -i aws configure set default.output "$AWS_DEFAULT_OUTPUT_VALUE" || { echo "kakaocloud: Failed to set AWS CLI default.output"; exit 1; }
    AWS_VERSION=$(aws --version 2>&1)
    source /home/ubuntu/.bashrc || { echo "kakaocloud: Failed to reapply .bashrc"; exit 1; }

    ################################################################################
    # 7. Create the Kafka config directory and grant permissions
    ################################################################################
    echo "kakaocloud: 7. Creating the Kafka config directory and granting permissions"
    sudo mkdir -p /opt/kafka/config || { echo "kakaocloud: Failed to create the Kafka config directory"; exit 1; }
    sudo chown -R ubuntu:ubuntu /opt/kafka || { echo "kakaocloud: Failed to change ownership of the Kafka config directory"; exit 1; }

    ################################################################################
    # 8. Download the custom partitioner and filename plugins
    ################################################################################
    echo "kakaocloud: 8. Downloading custom plugins"
    sudo wget -O /confluent-hub/plugins/confluentinc-kafka-connect-s3/lib/custom-partitioner-1.0-SNAPSHOT.jar \
    "https://raw.githubusercontent.com/kakaocloud-edu/tutorial/main/DataAnalyzeCourse/src/day1/Lab03/kafka_connector/custom-partitioner-1.0-SNAPSHOT.jar" || { echo "kakaocloud: Failed to download custom-partitioner"; exit 1; }
    sudo wget -O /confluent-hub/plugins/confluentinc-kafka-connect-s3/lib/custom-filename-1.0-SNAPSHOT.jar \
    "https://raw.githubusercontent.com/kakaocloud-edu/tutorial/main/DataAnalyzeCourse/src/day1/Lab03/kafka_connector/custom-filename-1.0-SNAPSHOT.jar" || { echo "kakaocloud: Failed to download custom-filename"; exit 1; }

    ################################################################################
    # 9. Create s3-sink-connector.properties
    ################################################################################
    echo "kakaocloud: 9. Creating s3-sink-connector.properties"
    cat <<EOF > /opt/kafka/config/s3-sink-connector.properties
    name=s3-sink-connector
    connector.class=io.confluent.connect.s3.S3SinkConnector
    tasks.max=1
    topics=${TOPIC}
    s3.region=kr-central-2
    s3.bucket.name=${BUCKET_NAME}
    s3.part.size=5242880
    aws.access.key.id=${AWS_ACCESS_KEY_ID_VALUE}
    aws.secret.access.key=${AWS_SECRET_ACCESS_KEY_VALUE}
    store.url=https://objectstorage.kr-central-2.kakaocloud.com
    storage.class=io.confluent.connect.s3.storage.S3Storage
    format.class=io.confluent.connect.s3.format.json.JsonFormat
    parquet.codec=snappy
    key.converter=org.apache.kafka.connect.storage.StringConverter
    value.converter=org.apache.kafka.connect.storage.StringConverter
    key.converter.schemas.enable=false
    value.converter.schemas.enable=false
    flush.size=1
    partitioner.class=com.mycompany.connect.FlexibleTimeBasedPartitioner
    topics.dir=${TOPIC}
    custom.topic.dir=topic
    custom.partition.prefix=partition_
    partition.duration.ms=3600000
    path.format='year_'yyyy/'month_'MM/'day_'dd/'hour_'HH
    locale=en-US
    timezone=Asia/Seoul
    timestamp.extractor=Wallclock
    custom.replacements==:_
    s3.bucket.create=false
    bucket.exists.checking.enabled=false
    EOF
    if [ $? -ne 0 ]; then echo "kakaocloud: Failed to create s3-sink-connector.properties"; exit 1; fi

    ################################################################################
    # 10. Create worker.properties
    ################################################################################
    echo "kakaocloud: 10. Creating worker.properties"
    cat <<EOF > /opt/kafka/config/worker.properties
    bootstrap.servers=${KAFKA_BOOTSTRAP_SERVER}
    key.converter=org.apache.kafka.connect.json.JsonConverter
    value.converter=org.apache.kafka.connect.json.JsonConverter
    key.converter.schemas.enable=false
    value.converter.schemas.enable=false
    offset.storage.file.filename=/tmp/connect.offsets
    offset.flush.interval.ms=10000
    plugin.path=/confluent-hub/plugins/confluentinc-kafka-connect-s3
    listeners=http://0.0.0.0:8084
    EOF
    if [ $? -ne 0 ]; then echo "kakaocloud: Failed to create worker.properties"; exit 1; fi

    ################################################################################
    # 11. Register the kafka-connect systemd service
    ################################################################################
    echo "kakaocloud: 11. Registering the Kafka Connect service"
    cat <<EOF | sudo tee /etc/systemd/system/kafka-connect.service
    [Unit]
    Description=Kafka Connect Standalone Service
    After=network.target

    [Service]
    User=ubuntu
    ExecStart=/home/ubuntu/kafka/bin/connect-standalone.sh \
    /opt/kafka/config/worker.properties \
    /opt/kafka/config/s3-sink-connector.properties
    Restart=on-failure
    RestartSec=5

    [Install]
    WantedBy=multi-user.target
    EOF
    if [ $? -ne 0 ]; then echo "kakaocloud: Failed to register the Kafka Connect service"; exit 1; fi

    ################################################################################
    # 12. Download and install Schema Registry
    ################################################################################
    echo "kakaocloud: 12. Downloading and installing Schema Registry"
    sudo wget https://packages.confluent.io/archive/7.5/confluent-7.5.3.tar.gz || { echo "kakaocloud: Failed to download Schema Registry"; exit 1; }
    sudo tar -xzvf confluent-7.5.3.tar.gz -C /confluent-hub/plugins || { echo "kakaocloud: Failed to extract Schema Registry"; exit 1; }
    sudo rm confluent-7.5.3.tar.gz || { echo "kakaocloud: Failed to delete the Schema Registry archive"; exit 1; }

    ################################################################################
    # 13. Create the systemd unit file and register the Schema Registry service
    ################################################################################
    echo "kakaocloud: 13. Creating the systemd unit file and registering the Schema Registry service"
    cat <<EOF > /etc/systemd/system/schema-registry.service
    [Unit]
    Description=Confluent Schema Registry
    After=network.target

    [Service]
    Type=simple
    User=ubuntu
    ExecStart=/confluent-hub/plugins/confluent-7.5.3/bin/schema-registry-start /confluent-hub/plugins/confluent-7.5.3/etc/schema-registry/schema-registry.properties
    Restart=on-failure
    RestartSec=5s

    [Install]
    WantedBy=multi-user.target
    EOF
    if [ $? -ne 0 ]; then echo "kakaocloud: Failed to write the Schema Registry service file"; exit 1; fi

    sudo systemctl daemon-reload || { echo "kakaocloud: daemon-reload failed"; exit 1; }
    sudo systemctl enable schema-registry.service || { echo "kakaocloud: Failed to enable the schema-registry service"; exit 1; }
    sudo systemctl start schema-registry.service || { echo "kakaocloud: Failed to start the schema-registry service"; exit 1; }

    ################################################################################
    # 14. Install and configure the Avro converter in the S3 connector plugin path
    ################################################################################
    echo "kakaocloud: 14. Installing and configuring the Avro converter"
    sudo wget https://github.com/kakaocloud-edu/tutorial/raw/refs/heads/main/DataAnalyzeCourse/src/day2/Lab01/confluentinc-kafka-connect-avro-converter-7.5.3.zip || { echo "kakaocloud: Failed to download confluentinc-kafka-connect-avro-converter"; exit 1; }
    unzip confluentinc-kafka-connect-avro-converter-7.5.3.zip || { echo "kakaocloud: Failed to extract confluentinc-kafka-connect-avro-converter"; exit 1; }
    sudo rm confluentinc-kafka-connect-avro-converter-7.5.3.zip || { echo "kakaocloud: Failed to delete the confluentinc-kafka-connect-avro-converter archive"; exit 1; }
    sudo mv confluentinc-kafka-connect-avro-converter-7.5.3/lib/*.jar /confluent-hub/plugins/confluentinc-kafka-connect-s3/lib || { echo "kakaocloud: Failed to move confluentinc-kafka-connect-avro-converter files"; exit 1; }
    sudo wget -P /confluent-hub/plugins/confluentinc-kafka-connect-s3/lib \
    https://repo1.maven.org/maven2/com/google/guava/guava/30.1.1-jre/guava-30.1.1-jre.jar \
    https://packages.confluent.io/maven/io/confluent/kafka-connect-protobuf-converter/7.5.3/kafka-connect-protobuf-converter-7.5.3.jar \
    https://packages.confluent.io/maven/io/confluent/kafka-protobuf-serializer/7.5.3/kafka-protobuf-serializer-7.5.3.jar \
    https://packages.confluent.io/maven/io/confluent/common-config/7.5.3/common-config-7.5.3.jar \
    https://repo1.maven.org/maven2/com/google/protobuf/protobuf-java/3.25.1/protobuf-java-3.25.1.jar \
    https://repo1.maven.org/maven2/com/google/guava/failureaccess/1.0.2/failureaccess-1.0.2.jar || { echo -e "\nERROR: Failed to download additional S3 connector dependencies"; exit 1; }

    ################################################################################
    # Done
    ################################################################################
    echo "kakaocloud: Setup complete"
  4. Run the script.

    Run script
    SCRIPT_FILE="./setting-script.sh"
    sudo chmod +x "$SCRIPT_FILE"
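    # -E asks sudo to preserve the variables exported from /tmp/env_vars.sh so the script can read them
    sudo -E "$SCRIPT_FILE"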
    # Running the script may take some time.
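  5. Grant access permissions so that message files can be written to the Object Storage bucket. The command below sets the bucket ACL to give all users (the AllUsers group) read and write access.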

    aws s3api put-bucket-acl \
    --bucket "${BUCKET_NAME}" \
    --grant-read 'uri="http://acs.amazonaws.com/groups/global/AllUsers"' \
    --grant-write 'uri="http://acs.amazonaws.com/groups/global/AllUsers"' \
    --endpoint-url https://objectstorage.kr-central-2.kakaocloud.com
    Environment variable | Description
    BUCKET_NAME🖌 | Bucket name / Example: tutorial-kafka-bucket
  6. Verify that the S3 Sink Connector and standalone worker configuration files were created.

    ls /opt/kafka/config
    # Output: s3-sink-connector.properties worker.properties
  7. Verify that the kafka-connect service file was created.

    ls /etc/systemd/system | grep kafka-connect.service
    # Output: kafka-connect.service
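  8. Update the Kafka broker address in the Schema Registry configuration file. The setup script starts Schema Registry with the default address (PLAINTEXT://localhost:9092); replace ${HOST:PORT} in the command below with your cluster's bootstrap server address.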

    sudo sed -i 's|PLAINTEXT://localhost:9092|${HOST:PORT}|' /confluent-hub/plugins/confluent-7.5.3/etc/schema-registry/schema-registry.properties
    Environment variable | Description
    HOST:PORT🖌 | In Advanced Managed Kafka > `tutorial-amk-cluster` details, copy the bootstrap server information
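
The connector properties written by the setup script include s3.bucket.create=false and bucket.exists.checking.enabled=false; judging by their names, the connector is not meant to create or check the bucket on its own. Once the script has installed and configured the AWS CLI, a head-bucket call against the KakaoCloud endpoint is one way to confirm that the bucket is reachable with your credentials. This is a minimal sketch; `bucket_reachable` and the `OBJECT_STORAGE_ENDPOINT` override are hypothetical names, and it assumes the endpoint supports the S3 HeadBucket operation.

```bash
#!/usr/bin/env bash
# Hypothetical helper: succeeds if the bucket is reachable with the configured credentials.
# ASSUMPTION: the KakaoCloud endpoint supports the S3 HeadBucket operation.
bucket_reachable() {
  local bucket="$1"
  local endpoint="${OBJECT_STORAGE_ENDPOINT:-https://objectstorage.kr-central-2.kakaocloud.com}"
  # head-bucket exits non-zero if the bucket does not exist or access is denied; output is discarded.
  aws s3api head-bucket --bucket "$bucket" --endpoint-url "$endpoint" >/dev/null 2>&1
}
```

For example: `bucket_reachable "$BUCKET_NAME" && echo "bucket OK" || echo "check the bucket name and S3 credentials"`.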

Step 3. Start the kafka-connect service and check Sink Connector status

  1. Reload the systemd daemon, restart Schema Registry so that it uses the updated broker address, and enable and start the kafka-connect service.

    Daemon reload
    sudo systemctl daemon-reload
    Restart schema-registry
    sudo systemctl restart schema-registry.service
    Enable Kafka Connect on boot
    sudo systemctl enable kafka-connect
    Start Kafka Connect service
    sudo systemctl start kafka-connect
  2. Check the status of s3-sink-connector. The connector and its task should both report RUNNING. Press Ctrl+C to exit watch.

    watch -n 1 "curl -s http://localhost:8084/connectors/s3-sink-connector/status | jq"

    # Example output
    {
      "name": "s3-sink-connector",
      "connector": {
        "state": "RUNNING",
        "worker_id": "0.0.0.0:8083"
      },
      "tasks": [
        {
          "id": 0,
          "state": "RUNNING",
          "worker_id": "0.0.0.0:8083"
        }
      ],
      "type": "sink"
    }
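
Instead of watching the output manually, a script can poll the same REST endpoint (port 8084, from listeners in worker.properties) until the connector and its tasks report RUNNING. The following is a minimal sketch with hypothetical function names. It checks the "state" fields with grep rather than jq so it has no extra dependency; that is enough for this response shape but less robust than a real JSON parser.

```bash
#!/usr/bin/env bash
# Hypothetical helpers for waiting on the s3-sink-connector status endpoint.

# Succeeds only if the status JSON has at least one "state" field and every one is RUNNING.
all_running() {
  local states
  states=$(printf '%s' "$1" | grep -o '"state": *"[A-Z_]*"' | sed -E 's/.*"([A-Z_]*)"$/\1/')
  [ -n "$states" ] || return 1
  # grep -v finds any line that is not RUNNING; negate so that "none found" means success.
  ! printf '%s\n' "$states" | grep -qv '^RUNNING$'
}

# Polls once per second until the connector and its tasks are RUNNING,
# giving up after the given number of attempts (default: 60).
wait_for_connector() {
  local url="${1:-http://localhost:8084/connectors/s3-sink-connector/status}"
  local attempts="${2:-60}" i body=""
  for ((i = 0; i < attempts; i++)); do
    body=$(curl -s "$url") && all_running "$body" && return 0
    sleep 1
  done
  echo "connector not RUNNING after ${attempts} attempts; last response: ${body}" >&2
  return 1
}
```

For example: `wait_for_connector && echo "s3-sink-connector is RUNNING"`. Bounding the number of attempts keeps the helper usable in scripts, where an open-ended watch would never return.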

Step 4. Publish messages and verify loading

Publish messages to Kafka and verify that they are loaded into the Object Storage bucket correctly.
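
The console producer sends each line it reads from standard input as a separate message, so the interactive steps below can also be scripted by piping a message in. The following is a minimal sketch, assuming Kafka is installed under /home/ubuntu/kafka; `publish_message` and the `KAFKA_HOME` override are hypothetical names.

```bash
#!/usr/bin/env bash
# Hypothetical helper: publishes one single-line message to a topic via the console producer.
# ASSUMPTION: Kafka CLI tools live under KAFKA_HOME (defaults to /home/ubuntu/kafka).
publish_message() {
  local bootstrap="$1" topic="$2" message="$3"
  local kafka_home="${KAFKA_HOME:-/home/ubuntu/kafka}"
  # Each input line becomes one record; the producer exits when its input ends.
  printf '%s\n' "$message" \
    | "$kafka_home/bin/kafka-console-producer.sh" --topic "$topic" --bootstrap-server "$bootstrap"
}
```

For example: `publish_message "$KAFKA_BOOTSTRAP_SERVER" "$TOPIC" "hello tester!"`.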

  1. Run the producer client.

    cd /home/ubuntu/kafka
    bin/kafka-console-producer.sh --topic ${TOPIC_NAME} --bootstrap-server ${HOST:PORT}
    Environment variable | Description
    TOPIC_NAME🖌 | Topic name where the producer sends data / Example: tutorial-topic
    HOST:PORT🖌 | In Advanced Managed Kafka > `tutorial-amk-cluster` details, copy the bootstrap server information
  2. When the > prompt appears, type the test message below and press Enter.

    hello tester!
  3. In the console, go to Object Storage > tutorial-kafka-bucket and verify that a file was created.

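    • The storage path depends on the Kafka Connector settings. With the configuration in this tutorial, files are divided by time (year_/month_/day_/hour_, based on Asia/Seoul time) and, through the custom partitioner settings, typically by topic and partition. Because flush.size=1, each message is committed as a separate file.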
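
The connector uses timestamp.extractor=Wallclock, timezone=Asia/Seoul, and path.format='year_'yyyy/'month_'MM/'day_'dd/'hour_'HH, so a record written now should land under a time segment derived from the current Seoul time. The sketch below builds that segment with GNU date (the default on Ubuntu) and lists matching objects with the AWS CLI. It assumes the custom partitioner (com.mycompany.connect.FlexibleTimeBasedPartitioner) renders path.format the way Confluent's TimeBasedPartitioner does; the topic and partition parts of the key come from that custom class and are not modeled, so the listing filters on the time segment only. Function names are hypothetical.

```bash
#!/usr/bin/env bash
# Hypothetical helpers for finding files written by the sink connector.
# ASSUMPTION: the custom partitioner renders path.format like Confluent's TimeBasedPartitioner.

# Prints the time segment for an epoch timestamp (default: now) in the given time zone
# (default: Asia/Seoul), matching path.format='year_'yyyy/'month_'MM/'day_'dd/'hour_'HH.
time_segment() {
  local epoch="${1:-$(date +%s)}" tz="${2:-Asia/Seoul}"
  TZ="$tz" date -d "@${epoch}" +'year_%Y/month_%m/day_%d/hour_%H'
}

# Lists objects in the bucket whose key contains the current hour's time segment.
list_current_hour_objects() {
  local bucket="$1" segment
  segment=$(time_segment)
  aws s3 ls "s3://${bucket}/" --recursive \
    --endpoint-url "${OBJECT_STORAGE_ENDPOINT:-https://objectstorage.kr-central-2.kakaocloud.com}" \
    | grep -F -- "$segment"
}
```

For example, right after publishing the test message: `list_current_hour_objects tutorial-kafka-bucket`.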

Wrap-up and next steps

You configured a pipeline that stores Kafka messages in Object Storage. You can now register metadata in Data Catalog based on the stored data and perform analysis using Data Query.

Next, see Analyze Kafka messages using Data Catalog and Data Query.