
Build CDC pipeline with Kafka

Describes how to set up a CDC (Change Data Capture) environment that tracks data changes from the KakaoCloud MySQL service in real time.

info
  • Estimated time: 60 minutes
  • User environment
    • Recommended OS: macOS, Ubuntu
    • Region: kr-central-2
  • Prerequisites

About this scenario

This tutorial guides you through building a CDC (Change Data Capture) environment that tracks data changes from the KakaoCloud MySQL service in real time and streams them through Advanced Managed Kafka. This setup lets you deliver database changes in real time to various analysis and data processing systems, in fields such as e-commerce, financial services, and data warehousing.

Key highlights include:

  • Detect data changes (insert, update, delete) from MySQL using Debezium
  • Stream detected changes to Kafka for real-time delivery
  • Store and analyze received data in Druid
  • Visualize change history in real time using Superset

[Image] Scenario architecture

Terminology
  • CDC (Change Data Capture): A technique to detect and track database changes in real time
  • Debezium: An open-source CDC platform that detects changes in databases like MySQL and sends them to Kafka
  • Kafka Connect: A data integration framework within Kafka supporting real-time data streaming from various sources

Before you start

1. Set up VPC and subnets

Configure an isolated network environment to ensure secure communication between CDC pipeline components. Create VPC and subnets to protect against external access while enabling seamless internal communication.

  1. Navigate to KakaoCloud Console > Beyond Networking Service > VPC.

  2. Click the [Create VPC] button to create a new VPC.

    VPC: tutorial-cdc-vpc

    Category | Item | Value
    VPC info | VPC name | tutorial-cdc-vpc
    VPC info | VPC IP CIDR block | 10.0.0.0/16
    Availability zone | Number of availability zones | 2
    Availability zone | First AZ | kr-central-2-a
    Availability zone | Second AZ | kr-central-2-b
    Subnet setup | Public subnets per AZ | 1
    Subnet setup | Private subnets per AZ | 0
    Subnet setup | Public subnet IPv4 CIDR block (kr-central-2-a) | 10.0.0.0/20
    Subnet setup | Public subnet IPv4 CIDR block (kr-central-2-b) | 10.0.16.0/20
  3. Click the [Create] button at the bottom.

2. Configure security groups

Configure network access policies to ensure secure communication within the CDC pipeline. Allow internal communication between MySQL, Advanced Managed Kafka, and Hadoop Eco components, while restricting external access to specific IPs for management purposes.

  1. Navigate to KakaoCloud Console > Beyond Networking Service > VPC > Security Groups.

  2. Click [Create security group] to create a new security group.

  3. Configure the security group as follows:

    Item | Value
    Name | tutorial-cdc-sg
    Description | CDC Pipeline security policy
  4. Configure inbound rules to allow access from your Public IP.

    Check my public IP

    Confirm your current public IP address (for example, with an online IP lookup service) and use it as the Source value in the rule below.

    Item | Value | Note
    Protocol | ALL | Allow all protocols
    Source | {Your Public IP}/32 |
    Port | ALL |
    Policy description (optional) | Allow Access from User Public IP |
  5. After creating the security group, click Manage inbound rules and configure the following rule:

    Item | Value | Note
    Protocol | ALL | Allow all protocols
    Source | @tutorial-cdc-sg | Allow internal communication within the same security group
    Port | ALL |
    Policy description (optional) | Internal SG Access |

3. Create MySQL instance group

Set up the source database for the CDC pipeline. Create tables to manage user information and configure MySQL to allow Debezium to detect changes (INSERT, UPDATE, DELETE).

  1. Navigate to KakaoCloud Console > Data Store > MySQL > Instance Group.

  2. Click [Create instance group] and configure as follows:

    Item | Value
    Instance group name | tutorial-cdc-mysql
    Instance availability | Single
    MySQL username | admin
    MySQL password | admin123
    VPC | tutorial-cdc-vpc
    Subnet | main
    Auto backup option | Enabled

4. Set up VM for Debezium Connector

Create a VM instance to run the Debezium Connector, which detects MySQL data changes and sends them to Kafka.

Key pair setup

  1. Navigate to KakaoCloud Console > Beyond Compute Service > Virtual Machine > Key pair.

  2. Click Create key pair:

    • Name: tutorial-cdc-keypair
warning

The generated key pair (.pem file) can only be downloaded once. Store it securely; a lost key cannot be recovered and must be reissued.

Create VM instance

  1. Navigate to KakaoCloud Console > Beyond Compute Service > Virtual Machine > Instance.

  2. Click [Create instance] and configure as follows:

    Category | Item | Value | Note
    Basic info | Name | tutorial-cdc-vm |
    Image | OS | Ubuntu 22.04 |
    Instance type | - | t1i.small |
    Volume | Root volume | 30 GB | SSD
    Key pair | - | tutorial-cdc-keypair |
    Network | VPC | tutorial-cdc-vpc |
    Network | Security group | tutorial-cdc-sg |
    Network | Subnet | main |

5. Create Advanced Managed Kafka cluster

Configure an Advanced Managed Kafka cluster to stream and deliver data changes captured by CDC to other systems in real time.

  1. Navigate to KakaoCloud Console > Analytics > Advanced Managed Kafka > Cluster.

  2. Click the [Create cluster] button and configure it as follows. Keep default settings for unspecified options.

    Category | Item | Value | Note
    Basic settings | Cluster name | tutorial-cdc-kafka |
    Basic settings | Instance type | r2a.2xlarge |
    Network | VPC | tutorial-cdc-vpc |
    Network | Subnet | main, {VPC_ID}_sn_1 (10.0.16.0/20) |
    Network | Security group | tutorial-cdc-sg |
    Broker configuration | Number of brokers | 2 | One per availability zone

6. Create Hadoop Eco cluster

Set up a Hadoop Eco Dataflow-type cluster to store and analyze data changes streamed from Kafka. This environment includes Druid for storage and analysis and Superset for visualization.

  1. Navigate to KakaoCloud Console > Analytics > Hadoop Eco > Cluster.

  2. Click the [Create cluster] button and configure as follows:

Step 1: Cluster settings

Category | Item | Value
Basic info | Cluster name | tutorial-cdc-dataflow
Cluster configuration | Cluster type | Dataflow
Admin settings | Admin ID | admin
Admin settings | Admin password | admin123!
VPC settings | VPC | tutorial-cdc-vpc
VPC settings | Subnet | main
VPC settings | Security group | Select an existing security group
VPC settings | Security group name | tutorial-cdc-sg

Step 2: Instance settings

Category | Item | Value
Master node settings | Master node instance type | m2a.xlarge
Worker node settings | Worker node instance type | m2a.xlarge
Key pair | - | tutorial-cdc-keypair

Step 3: Detailed settings

No changes are needed in detailed settings. Click [Create] to launch the Hadoop Eco Dataflow cluster.

Getting started

Step 1. Configure public IP

For the CDC pipeline setup, the VM instance and the Hadoop Eco cluster need to be externally accessible. Assign a public IP to each instance for SSH access and web interface connections.

Assign public IP to VM instance

  1. Go to KakaoCloud Console > Beyond Compute Service > Virtual Machine.

  2. In the Instance tab, click the tutorial-cdc-vm instance name.

  3. Click [Instance actions] > [Associate public IP].

  4. In the public IP association window, click [OK] without making any changes.

Assign public IP to Hadoop Eco Master Node

  1. Go to KakaoCloud Console > Beyond Compute Service > Virtual Machine.

  2. In the Instance tab, click the HadoopMST-tutorial-cdc-dataflow-1 instance name.

  3. Click [Instance actions] > [Associate public IP].

  4. In the public IP association window, click [OK] without making any changes.

info

The assigned public IP can be viewed in the Instance List or on the Network tab of the instance details page.

Step 2. Configure Debezium server environment

Set up the essential software and tools required for the CDC pipeline, including Java, MySQL Client, and Kafka.

  1. Move to the directory where the key pair file is stored in your local terminal.

    cd ~/Downloads  # Or the directory where the key pair file is saved
  2. Set permissions for the key pair file.

    sudo chmod 400 tutorial-cdc-keypair.pem
  3. Connect to the VM via SSH.

    ssh -i tutorial-cdc-keypair.pem ubuntu@${VM_PUBLIC_IP}
    Environment variable | Description
    VM_PUBLIC_IP | Check Public IP in VM Instance > Network tab
  4. Install the Java runtime environment and configure environment variables.

    sudo apt update
    sudo apt install -y openjdk-21-jdk

    # Configure Java environment variables
    cat << EOF | sudo tee -a /etc/profile
    export JAVA_HOME=/usr/lib/jvm/java-21-openjdk-amd64
    export PATH=\$JAVA_HOME/bin:\$PATH
    export CLASSPATH=\$CLASSPATH:\$JAVA_HOME/lib/ext:\$JAVA_HOME/lib/tools.jar
    EOF

    source /etc/profile
  5. Install MySQL client tools for database management.

    sudo apt install -y mysql-client
  6. Install tools to manage the Kafka cluster. Ensure the Kafka tool version matches your cluster version. This tutorial uses version 3.7.1.

    info

    Kafka version downloads can be found at https://archive.apache.org/dist/kafka/.

    cd ~
    curl https://archive.apache.org/dist/kafka/3.7.1/kafka_2.13-3.7.1.tgz -o kafka_2.13-3.7.1.tgz
    tar -xzf kafka_2.13-3.7.1.tgz
    rm kafka_2.13-3.7.1.tgz
    mv kafka_2.13-3.7.1 kafka
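
    With the CLI tools in place, you can optionally confirm that the VM can reach the Kafka cluster before continuing. This is a minimal sketch: the bootstrap server address comes from the Advanced Managed Kafka console (the value below is only a placeholder), and it assumes the cluster accepts plain connections from within the VPC, as the Kafka Connect configuration in the next step also assumes.

    # Set the bootstrap server address (placeholder value; use the address from the console)
    export KAFKA_BOOTSTRAP_SERVERS="<bootstrap-server-1>:9092,<bootstrap-server-2>:9092"

    # List topics; completing without a connection error means the brokers are reachable
    ~/kafka/bin/kafka-topics.sh --bootstrap-server ${KAFKA_BOOTSTRAP_SERVERS} --list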

Step 3. Configure Debezium

Debezium is a CDC tool that reads MySQL binary logs to detect data changes. In this step, you will install Debezium and configure it to send MySQL changes to Kafka, enabling real-time capture of all database changes.

  1. Download and install the Debezium MySQL connector plugin.

    mkdir -p ~/kafka/plugins
    wget https://repo1.maven.org/maven2/io/debezium/debezium-connector-mysql/3.0.2.Final/debezium-connector-mysql-3.0.2.Final-plugin.tar.gz
    tar -xzf debezium-connector-mysql-3.0.2.Final-plugin.tar.gz -C ~/kafka/plugins/
    rm debezium-connector-mysql-3.0.2.Final-plugin.tar.gz
  2. Configure Kafka Connect in distributed mode.

    cat << EOF > /home/ubuntu/kafka/config/connect-distributed.properties
    bootstrap.servers=${KAFKA_BOOTSTRAP_SERVERS}
    group.id=connect-cluster
    key.converter=org.apache.kafka.connect.json.JsonConverter
    value.converter=org.apache.kafka.connect.json.JsonConverter
    key.converter.schemas.enable=false
    value.converter.schemas.enable=false
    plugin.path=/home/ubuntu/kafka/plugins
    offset.storage.topic=connect-offsets
    offset.storage.replication.factor=1
    config.storage.topic=connect-configs
    config.storage.replication.factor=1
    status.storage.topic=connect-statuses
    status.storage.replication.factor=1
    auto.create.topics.enable=true
    EOF
    Environment variable | Description
    KAFKA_BOOTSTRAP_SERVERS | Advanced Managed Kafka > Cluster bootstrap server address

    Because the heredoc delimiter above is unquoted, the shell substitutes ${KAFKA_BOOTSTRAP_SERVERS} when the command runs. Export the variable in your shell first (for example, export KAFKA_BOOTSTRAP_SERVERS=<bootstrap server address>) so the real address is written into the file.
  3. Register Kafka Connect as a system service to ensure it runs continuously.

    sudo sh -c 'cat << EOF > /etc/systemd/system/kafka-connect.service
    [Unit]
    Description=Kafka Connect Distributed
    Documentation=http://kafka.apache.org/
    After=network.target kafka.service

    [Service]
    Type=simple
    User=ubuntu
    Environment="KAFKA_HEAP_OPTS=-Xms128M -Xmx512M"
    ExecStart=/home/ubuntu/kafka/bin/connect-distributed.sh /home/ubuntu/kafka/config/connect-distributed.properties
    Restart=always
    RestartSec=10

    [Install]
    WantedBy=multi-user.target
    EOF'

    sudo systemctl daemon-reload
    sudo systemctl start kafka-connect
    sudo systemctl enable kafka-connect
  4. Verify that the kafka-connect service is running. After checking the status, press Ctrl + C to exit.

    sudo systemctl status kafka-connect
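
    As an additional check (a minimal sketch, assuming Kafka Connect is listening on its default REST port 8083, the same port used when registering the connector later), you can query the Connect REST API to confirm that the worker is up and that the Debezium MySQL connector plugin was loaded from the plugin path:

    # Worker metadata (version, commit, Kafka cluster ID)
    curl -s http://localhost:8083/

    # Installed connector plugins; the list should include io.debezium.connector.mysql.MySqlConnector
    curl -s http://localhost:8083/connector-plugins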

Step 4. Configure MySQL database

Create the MySQL database and table for CDC as the data source, and set up initial data. This step establishes the foundation for tracking data changes; an optional check of the binary log settings that Debezium relies on follows the numbered steps below.

  1. Connect to the MySQL server remotely.

    mysql -h ${MYSQL_ENDPOINT} -u admin -padmin123
    Environment variable | Description
    MYSQL_ENDPOINT | MySQL instance endpoint
  2. Create a database and table for CDC testing.

    CREATE DATABASE `cdc-database`;
    USE `cdc-database`;

    CREATE TABLE `cdc-table` (
      id BIGINT NOT NULL AUTO_INCREMENT,
      name VARCHAR(100),
      email VARCHAR(200),
      status ENUM('active', 'inactive') DEFAULT 'active',
      created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
      updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
      PRIMARY KEY (id)
    );
  3. Exit the MySQL session.

    exit
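
    Debezium captures changes by reading the MySQL binary log, so the source instance must have binary logging enabled in ROW format. The managed MySQL service is expected to provide this by default; the following optional one-off check (a minimal sketch reusing the connection values above) lets you confirm it before registering the connector in the next step.

    # Confirm binary logging is enabled and uses the ROW format required by Debezium
    mysql -h ${MYSQL_ENDPOINT} -u admin -padmin123 \
      -e "SHOW VARIABLES WHERE Variable_name IN ('log_bin', 'binlog_format');"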

Step 5. Configure Debezium connector and generate MySQL test data

Configure the Debezium connector to capture changes from MySQL and send them to Kafka. This setup enables real-time capture of data changes.

  1. Create a connector configuration directory.

    sudo mkdir -p /home/ubuntu/kafka/config/connectors
  2. Create the connector configuration file and set the required parameters.

    sudo tee /home/ubuntu/kafka/config/connectors/mysql-connector.json << 'EOF'
    {
      "name": "mysql-connector",
      "config": {
        "connector.class": "io.debezium.connector.mysql.MySqlConnector",
        "tasks.max": "1",
        "database.hostname": "${MYSQL_ENDPOINT}",
        "database.port": "3306",
        "database.user": "admin",
        "database.password": "admin123",
        "database.server.id": "1",
        "topic.prefix": "mysql-server",
        "database.include.list": "cdc-database",
        "table.include.list": "cdc-database.cdc-table",
        "schema.history.internal.kafka.bootstrap.servers": "${KAFKA_BOOTSTRAP_SERVERS}",
        "schema.history.internal.kafka.topic": "schema-changes.mysql",
        "topic.creation.enable": "true",
        "topic.creation.default.replication.factor": "1",
        "topic.creation.default.partitions": "1",
        "tombstones.on.delete": "true"
      }
    }
    EOF
    Environment variable | Description
    MYSQL_ENDPOINT | MySQL instance endpoint
    KAFKA_BOOTSTRAP_SERVERS | Kafka cluster bootstrap server address

    Because the heredoc delimiter here is quoted ('EOF'), the placeholders are written to the file literally. Replace ${MYSQL_ENDPOINT} and ${KAFKA_BOOTSTRAP_SERVERS} in mysql-connector.json with the actual values before registering the connector.
  3. Create the connector via REST API.

    curl -X POST -H "Content-Type: application/json" \
    --data @/home/ubuntu/kafka/config/connectors/mysql-connector.json \
    http://localhost:8083/connectors
  4. Verify the status of the mysql-connector.

    curl -X GET http://localhost:8083/connectors/mysql-connector/status

    If the connector is operating correctly, it will display "state": "RUNNING".

  5. If the connector is running, reconnect to the MySQL server remotely.

    mysql -h ${MYSQL_ENDPOINT} -u admin -padmin123
    Environment variable | Description
    MYSQL_ENDPOINT | MySQL instance endpoint
  6. Generate test data to verify the Kafka topic and Debezium integration. You can inspect the resulting change events with the console consumer command shown after this list.

    USE `cdc-database`;

    INSERT INTO `cdc-table` (name, email) VALUES
    ('John Doe', 'john.doe@example.com'),
    ('Jane Smith', 'jane.smith@example.com'),
    ('Bob Johnson', 'bob.johnson@example.com'),
    ('Alice Brown', 'alice.brown@example.com'),
    ('Charlie Wilson', 'charlie.wilson@example.com');
  7. Exit the MySQL session.

    exit
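
    To inspect the change events Debezium produced for these inserts, read the table's topic directly. This is a minimal sketch: the topic name follows the <topic.prefix>.<database>.<table> pattern from the connector configuration, and KAFKA_BOOTSTRAP_SERVERS is assumed to be exported as in the earlier steps. Each message is a Debezium change envelope whose before, after, and op fields are what the Druid ingestion spec in the next step flattens.

    # Read the CDC topic from the beginning; press Ctrl + C to stop
    ~/kafka/bin/kafka-console-consumer.sh \
      --bootstrap-server ${KAFKA_BOOTSTRAP_SERVERS} \
      --topic mysql-server.cdc-database.cdc-table \
      --from-beginning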

Step 6. Integrate Druid and execute MySQL queries

Configure Druid to store and analyze real-time data changes sent through Kafka. This setup enables real-time processing and analysis of the data.

  1. Access Druid via Hadoop Eco > Cluster > tutorial-cdc-dataflow > Druid URL.

    http://${MASTER_NODE_PUBLIC_IP}:3008
    Environment variable | Description
    MASTER_NODE_PUBLIC_IP | Public IP of Hadoop Eco master node
  2. On the main screen, click Load Data > Streaming. Then click Edit Spec in the top-right corner.

  3. In the JSON configuration, paste the Kafka cluster's bootstrap server address into bootstrap.servers, and then click Submit.

    {
      "type": "kafka",
      "dataSchema": {
        "dataSource": "user_changes",
        "timestampSpec": {
          "column": "created_at",
          "format": "iso"
        },
        "dimensionsSpec": {
          "dimensions": [
            "id",
            "name",
            "email",
            "status",
            "operation_type",
            "updated_at",
            {
              "name": "__deleted",
              "type": "boolean"
            }
          ]
        },
        "granularitySpec": {
          "type": "uniform",
          "segmentGranularity": "DAY",
          "queryGranularity": "MINUTE",
          "rollup": false
        }
      },
      "tuningConfig": {
        "type": "kafka",
        "maxRowsPerSegment": 5000000,
        "maxBytesInMemory": 25000000
      },
      "ioConfig": {
        "topic": "mysql-server.cdc-database.cdc-table",
        "consumerProperties": {
          "bootstrap.servers": "${KAFKA_BOOTSTRAP_SERVERS}",
          "group.id": "druid-user-changes"
        },
        "taskCount": 1,
        "replicas": 1,
        "taskDuration": "PT1H",
        "completionTimeout": "PT20M",
        "inputFormat": {
          "type": "json",
          "flattenSpec": {
            "useFieldDiscovery": false,
            "fields": [
              {
                "type": "jq",
                "name": "id",
                "expr": ".before.id // .after.id"
              },
              {
                "type": "jq",
                "name": "name",
                "expr": ".before.name // .after.name"
              },
              {
                "type": "jq",
                "name": "email",
                "expr": ".before.email // .after.email"
              },
              {
                "type": "jq",
                "name": "status",
                "expr": ".before.status // .after.status"
              },
              {
                "type": "jq",
                "name": "created_at",
                "expr": ".before.created_at // .after.created_at"
              },
              {
                "type": "jq",
                "name": "updated_at",
                "expr": ".before.updated_at // .after.updated_at"
              },
              {
                "type": "jq",
                "name": "operation_type",
                "expr": ".op"
              },
              {
                "type": "jq",
                "name": "__deleted",
                "expr": ".op == \"d\""
              }
            ]
          }
        }
      }
    }
    Environment variable | Description
    KAFKA_BOOTSTRAP_SERVERS | Bootstrap server address of the Kafka cluster
  4. In the Ingestion tab of the Druid console, verify that Supervisors > Datasource > user_changes > Status shows RUNNING.

  5. Connect to the MySQL server and trigger data changes to ensure Druid can capture them.

    mysql -h ${MYSQL_ENDPOINT} -u admin -padmin123
    Environment variable | Description
    MYSQL_ENDPOINT | MySQL instance endpoint
  6. Execute the following queries to test if Druid is correctly capturing data changes:

    USE `cdc-database`;

    UPDATE `cdc-table` SET status = 'inactive' WHERE id IN (2, 4);
    INSERT INTO `cdc-table` (name, email) VALUES ('David Park', 'david.park@example.com');
    DELETE FROM `cdc-table` WHERE id = 3;
  7. Exit the MySQL session.

    exit
  8. In the Datasources tab of the Druid console, verify the newly created user_changes datasource.
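
    If you prefer a command-line check, the ingested rows can also be queried through Druid's SQL API. This is a minimal sketch, assuming the endpoint that serves the Druid console also exposes the standard /druid/v2/sql route (the default for a Druid router); adjust the host or port if your cluster is set up differently.

    # Count ingested change events per operation type (c = insert, u = update, d = delete, r = snapshot read)
    curl -s -X POST http://${MASTER_NODE_PUBLIC_IP}:3008/druid/v2/sql \
      -H 'Content-Type: application/json' \
      -d '{"query": "SELECT operation_type, COUNT(*) AS events FROM user_changes GROUP BY operation_type"}'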

Step 7. Integrate Superset

Configure a Superset dashboard to visualize and monitor the collected data. This enables intuitive analysis and tracking of data changes.

  1. Access Superset via Hadoop Eco Cluster > Cluster information > Superset. Log in using the admin ID and password specified during cluster creation.

    http://${MASTER_NODE_PUBLIC_IP}:4000
    Environment variable | Description
    MASTER_NODE_PUBLIC_IP | Public IP of Hadoop Eco master node
  2. Click the [Datasets] button in the top menu. To import datasets from Druid, click the [+ DATASET] button in the top-right corner.

  3. Configure the database and schema as follows, then click the [CREATE DATASET AND CREATE CHART] button:

    Item | Value
    DATABASE | druid
    SCHEMA | druid
    TABLE | user_changes
  4. Select the desired chart type and click the [CREATE NEW CHART] button.

  5. Enter the data and settings you want to visualize, then click the [CREATE CHART] button. Save the chart by clicking the [SAVE] button in the top-right corner.