Build CDC pipeline with Kafka
Describes how to set up a CDC (Change Data Capture) environment that tracks data changes from the KakaoCloud MySQL service in real time.
- Estimated time: 60 minutes
- User environment
  - Recommended OS: macOS, Ubuntu
  - Region: kr-central-2
- Prerequisites
About this scenario
This tutorial guides you through building a CDC (Change Data Capture) environment that tracks data changes from the KakaoCloud MySQL service in real time and streams them through Advanced Managed Kafka. This setup allows you to deliver database changes to various analysis and data processing systems in real time, in fields such as e-commerce, financial services, and data warehousing.
Key highlights include:
- Detect data changes (insert, update, delete) from MySQL using Debezium
- Stream detected changes to Kafka for real-time delivery
- Store and analyze received data in Druid
- Visualize change history in real-time using Superset
Scenario architecture
- CDC (Change Data Capture): A technique for detecting and tracking database changes in real time
- Debezium: An open-source CDC platform that detects changes in databases like MySQL and sends them to Kafka
- Kafka Connect: A data integration framework within Kafka supporting real-time data streaming from various sources
Before you start
1. Set up VPC and subnets
Configure an isolated network environment to ensure secure communication between CDC pipeline components. Create VPC and subnets to protect against external access while enabling seamless internal communication.
- Navigate to KakaoCloud Console > Beyond Networking Service > VPC.
- Click the [Create VPC] button to create a new VPC.

VPC: tutorial-cdc-vpc

Category | Item | Value |
---|---|---|
VPC info | VPC name | tutorial-cdc-vpc |
 | VPC IP CIDR block | 10.0.0.0/16 |
Availability zone | Number of availability zones | 2 |
 | First AZ | kr-central-2-a |
 | Second AZ | kr-central-2-b |
Subnet setup | Public subnets per AZ | 1 |
 | Private subnets per AZ | 0 |
 | Public subnet IPv4 CIDR block (kr-central-2-a) | 10.0.0.0/20 |
 | Public subnet IPv4 CIDR block (kr-central-2-b) | 10.0.16.0/20 |

- Click the [Create] button at the bottom.
2. Configure security groups
Configure network access policies to ensure secure communication within the CDC pipeline. Allow internal communication between MySQL, Advanced Managed Kafka, and Hadoop Eco components, while restricting external access to specific IPs for management purposes.
- Navigate to KakaoCloud Console > Beyond Networking Service > VPC > Security Groups.
- Click [Create security group] to create a new security group.
- Configure the security group as follows:

Item | Value |
---|---|
Name | tutorial-cdc-sg |
Description | CDC Pipeline security policy |

- Configure inbound rules to allow access from your public IP (see the sketch after this list for one way to check your current public IP):

Item | Value | Note |
---|---|---|
Protocol | ALL | Allow all protocols |
Source | {Your Public IP}/32 | |
Port | ALL | |
Policy description (optional) | Allow Access from User Public IP | |

- After creating the security group, click Manage inbound rules and add the following rule:

Item | Value | Note |
---|---|---|
Protocol | ALL | Allow all protocols |
Source | @tutorial-cdc-sg | Allow internal communication within the same security group |
Port | ALL | |
Policy description (optional) | Internal SG Access | |
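If you need to look up your current public IP from a terminal, one simple option (an external echo service, not a KakaoCloud tool) is:

# Print the public IP address as seen from the internet
curl -s ifconfig.me
# Use the printed address as {Your Public IP}/32 in the inbound rule above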
3. Create MySQL instance group
Set up the source database for the CDC pipeline. Create tables to manage user information and configure MySQL to allow Debezium to detect changes (INSERT, UPDATE, DELETE).
- Navigate to KakaoCloud Console > Data Store > MySQL > Instance Group.
- Click [Create instance group] and configure as follows:

Item | Value |
---|---|
Instance group name | tutorial-cdc-mysql |
Instance availability | Single |
MySQL username | admin |
MySQL password | admin123 |
VPC | tutorial-cdc-vpc |
Subnet | main |
Auto backup option | Enabled |
4. Set up VM for Debezium Connector
Create a VM instance to run the Debezium Connector, which detects MySQL data changes and sends them to Kafka.
Key pair setup
- Navigate to KakaoCloud Console > Beyond Compute Service > Virtual Machine > Key Pair.
- Click Create key pair and enter the following:
  - Name: tutorial-cdc-keypair

The generated key pair (.pem file) can only be downloaded once. Store it securely. Lost keys cannot be recovered and require re-issuance.
Create VM instance
- Navigate to KakaoCloud Console > Beyond Compute Service > Virtual Machine > Instance.
- Click [Create instance] and configure as follows:

Category | Item | Value | Note |
---|---|---|---|
Basic info | Name | tutorial-cdc-vm | |
 | Image | OS Ubuntu 22.04 | |
 | Instance type | t1i.small | |
 | Volume | Root volume 30 GB SSD | |
 | Key pair | tutorial-cdc-keypair | |
Network | VPC | tutorial-cdc-vpc | |
 | Security group | tutorial-cdc-sg | |
 | Subnet | main | |
5. Create Advanced Managed Kafka cluster
Configure an Advanced Managed Kafka cluster to stream and deliver data changes captured by CDC to other systems in real-time.
- Navigate to KakaoCloud Console > Analytics > Advanced Managed Kafka > Cluster.
- Click the [Create cluster] button and configure it as follows. Keep default settings for unspecified options.

Category | Item | Value | Note |
---|---|---|---|
Basic settings | Cluster name | tutorial-cdc-kafka | |
 | Instance type | r2a.2xlarge | |
Network | VPC | tutorial-cdc-vpc | |
 | Subnet | main, {VPC_ID}_sn_1 (10.0.16.0/20) | |
 | Security group | tutorial-cdc-sg | |
Broker configuration | Number of brokers | 2 | One per availability zone |
6. Create Hadoop Eco cluster
Set up a Hadoop Eco Dataflow-type cluster to store and analyze data changes streamed from Kafka. This environment includes Druid for storage and analysis and Superset for visualization.
- Navigate to KakaoCloud Console > Analytics > Hadoop Eco > Cluster.
- Click the [Create cluster] button and configure as follows:
Step 1: Cluster settings
Category | Item | Value |
---|---|---|
Basic info | Cluster name | tutorial-cdc-dataflow |
Cluster configuration | Cluster type | Dataflow |
Admin settings | Admin ID | admin |
 | Admin password | admin123! |
VPC settings | VPC | tutorial-cdc-vpc |
 | Subnet | main |
 | Security group | Select an existing security group |
 | Security group name | tutorial-cdc-sg |
Step 2: Instance settings
Category | Item | Value |
---|---|---|
Master node settings | Master node instance type | m2a.xlarge |
Worker node settings | Worker node instance type | m2a.xlarge |
 | Key pair | tutorial-cdc-keypair |
Step 3: Detailed settings
No changes are needed in detailed settings. Click [Create] to launch the Hadoop Eco Dataflow cluster.
Getting started
Step 1. Configure public IP
For CDC pipeline setup, VM instances and Hadoop Eco clusters need to be externally accessible. Assign a public IP to each instance for SSH access and web interface connections.
Assign public IP to VM instance
- Go to KakaoCloud Console > Beyond Compute Service > Virtual Machine.
- In the Instance tab, click the tutorial-cdc-vm instance name.
- Click [Instance actions] > [Associate public IP].
- In the public IP association window, click [OK] without making any changes.
Assign public IP to Hadoop Eco Master Node
- Go to KakaoCloud Console > Beyond Compute Service > Virtual Machine.
- In the Instance tab, click the HadoopMST-tutorial-cdc-dataflow-1 instance name.
- Click [Instance actions] > [Associate public IP].
- In the public IP association window, click [OK] without making any changes.
The assigned public IP can be viewed in the Instance List or on the Network tab of the instance details page.
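The steps below use ${...} placeholders such as VM_PUBLIC_IP, MYSQL_ENDPOINT, and KAFKA_BOOTSTRAP_SERVERS. You can replace them by hand, or export them as shell variables in whichever terminal (local or on the VM) runs the corresponding commands. The values in the sketch below are hypothetical examples; note that heredocs with a quoted delimiter (<< 'EOF') do not expand shell variables, so files created that way still need the placeholders edited manually.

# Optional: export placeholder values before running later commands (example values are hypothetical)
export VM_PUBLIC_IP="210.109.1.23"                               # VM instance > Network tab
export MASTER_NODE_PUBLIC_IP="210.109.4.56"                      # Hadoop Eco master node public IP
export MYSQL_ENDPOINT="tutorial-cdc-mysql.example.com"           # MySQL instance group endpoint
export KAFKA_BOOTSTRAP_SERVERS="10.0.1.10:9092,10.0.17.10:9092"  # Kafka bootstrap server addresses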
Step 2. Configure Debezium server environment
Set up the essential software and tools required for the CDC pipeline, including Java, MySQL Client, and Kafka.
- Move to the directory where the key pair file is stored in your local terminal.

cd ~/Downloads # Or the directory where the key pair file is saved

- Set permissions for the key pair file.

sudo chmod 400 tutorial-cdc-keypair.pem

- Connect to the VM via SSH.

ssh -i tutorial-cdc-keypair.pem ubuntu@${VM_PUBLIC_IP}

Environment Variable | Description |
---|---|
VM_PUBLIC_IP | Check Public IP in VM Instance > Network tab |

- Install the Java runtime environment and configure environment variables.
sudo apt update
sudo apt install -y openjdk-21-jdk
# Configure Java environment variables
cat << EOF | sudo tee -a /etc/profile
export JAVA_HOME=/usr/lib/jvm/java-21-openjdk-amd64
export PATH=\$JAVA_HOME/bin:\$PATH
export CLASSPATH=\$CLASSPATH:\$JAVA_HOME/lib/ext:\$JAVA_HOME/lib/tools.jar
EOF
source /etc/profile

- Install MySQL client tools for database management.

sudo apt install -y mysql-client
- Install tools to manage the Kafka cluster. Ensure the Kafka tool version matches your cluster version. This tutorial uses version 3.7.1.

Info: Kafka version downloads can be found at https://archive.apache.org/dist/kafka/.
cd ~
curl https://archive.apache.org/dist/kafka/3.7.1/kafka_2.13-3.7.1.tgz -o kafka_2.13-3.7.1.tgz
tar -xzf kafka_2.13-3.7.1.tgz
rm kafka_2.13-3.7.1.tgz
mv kafka_2.13-3.7.1 kafka
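Before moving on, you can optionally confirm that Java and the Kafka CLI work and that the VM can reach the Kafka cluster. This is a minimal sanity check, assuming ${KAFKA_BOOTSTRAP_SERVERS} holds the cluster's bootstrap server address and the security group rules above are in place.

# Check the Java installation
java -version

# List topics on the cluster to confirm connectivity from the VM
~/kafka/bin/kafka-topics.sh --bootstrap-server ${KAFKA_BOOTSTRAP_SERVERS} --list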
Step 3. Configure Debezium
Debezium is a CDC tool that reads MySQL binary logs to detect data changes. In this step, you will install Debezium and configure it to send MySQL changes to Kafka, enabling real-time capture of all database changes.
- Download and install the Debezium MySQL connector plugin.

mkdir -p ~/kafka/plugins
wget https://repo1.maven.org/maven2/io/debezium/debezium-connector-mysql/3.0.2.Final/debezium-connector-mysql-3.0.2.Final-plugin.tar.gz
tar -xzf debezium-connector-mysql-3.0.2.Final-plugin.tar.gz -C ~/kafka/plugins/
rm debezium-connector-mysql-3.0.2.Final-plugin.tar.gz

- Configure Kafka Connect in distributed mode.
cat << EOF > /home/ubuntu/kafka/config/connect-distributed.properties
bootstrap.servers=${KAFKA_BOOTSTRAP_SERVERS}
group.id=connect-cluster
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
value.converter.schemas.enable=false
plugin.path=/home/ubuntu/kafka/plugins
offset.storage.topic=connect-offsets
offset.storage.replication.factor=1
config.storage.topic=connect-configs
config.storage.replication.factor=1
status.storage.topic=connect-statuses
status.storage.replication.factor=1
auto.create.topics.enable=true
EOF

Environment Variable | Description |
---|---|
KAFKA_BOOTSTRAP_SERVERS | Bootstrap server address from Advanced Managed Kafka > Cluster |

- Register Kafka Connect as a system service to ensure it runs continuously.
sudo sh -c 'cat << EOF > /etc/systemd/system/kafka-connect.service
[Unit]
Description=Kafka Connect Distributed
Documentation=http://kafka.apache.org/
After=network.target kafka.service
[Service]
Type=simple
User=ubuntu
Environment="KAFKA_HEAP_OPTS=-Xms128M -Xmx512M"
ExecStart=/home/ubuntu/kafka/bin/connect-distributed.sh /home/ubuntu/kafka/config/connect-distributed.properties
Restart=always
RestartSec=10
[Install]
WantedBy=multi-user.target
EOF'
sudo systemctl daemon-reload
sudo systemctl start kafka-connect
sudo systemctl enable kafka-connect

- Verify that the kafka-connect service is running. After checking the status, press Ctrl + C to exit.

sudo systemctl status kafka-connect
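You can also confirm through the Kafka Connect REST API that the worker is up and that the Debezium plugin was loaded from plugin.path. A minimal check, assuming Kafka Connect listens on its default port 8083:

# The plugin list should include io.debezium.connector.mysql.MySqlConnector
curl -s http://localhost:8083/connector-plugins

# The connector list stays empty until the connector is registered in Step 5
curl -s http://localhost:8083/connectors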
Step 4. Configure MySQL database
Create the MySQL database and table for CDC as the data source, and set up initial data. This step establishes the foundation for tracking data changes.
- Connect to the MySQL server remotely.

mysql -h ${MYSQL_ENDPOINT} -u admin -padmin123

Environment Variable | Description |
---|---|
MYSQL_ENDPOINT | MySQL instance endpoint |

- Create a database and table for CDC testing.
CREATE DATABASE `cdc-database`;
USE `cdc-database`;
CREATE TABLE `cdc-table` (
id BIGINT NOT NULL AUTO_INCREMENT,
name VARCHAR(100),
email VARCHAR(200),
status ENUM('active', 'inactive') DEFAULT 'active',
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
PRIMARY KEY (id)
);

- Exit the MySQL session.

exit
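Debezium reads MySQL's row-based binary log, so the source database must have it enabled. If you want a quick sanity check (assuming the managed service lets the admin account read these variables), run:

# Expect log_bin = ON and binlog_format = ROW for Debezium to capture changes
mysql -h ${MYSQL_ENDPOINT} -u admin -padmin123 \
  -e "SHOW VARIABLES LIKE 'log_bin'; SHOW VARIABLES LIKE 'binlog_format';"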
Step 5. Configure Debezium connector and generate MySQL test data
Configure the Debezium connector to capture changes from MySQL and send them to Kafka. This setup enables real-time capture of data changes.
- Create a connector configuration directory.

sudo mkdir -p /home/ubuntu/kafka/config/connectors

- Create the connector configuration file and set the required parameters. Because the heredoc delimiter is quoted ('EOF'), the ${MYSQL_ENDPOINT} and ${KAFKA_BOOTSTRAP_SERVERS} placeholders are not expanded by the shell, so replace them with actual values in the resulting file.
sudo tee /home/ubuntu/kafka/config/connectors/mysql-connector.json << 'EOF'
{
"name": "mysql-connector",
"config": {
"connector.class": "io.debezium.connector.mysql.MySqlConnector",
"tasks.max": "1",
"database.hostname": "${MYSQL_ENDPOINT}",
"database.port": "3306",
"database.user": "admin",
"database.password": "admin123",
"database.server.id": "1",
"topic.prefix": "mysql-server",
"database.include.list": "cdc-database",
"table.include.list": "cdc-database.cdc-table",
"schema.history.internal.kafka.bootstrap.servers": "${KAFKA_BOOTSTRAP_SERVERS}",
"schema.history.internal.kafka.topic": "schema-changes.mysql",
"topic.creation.enable": "true",
"topic.creation.default.replication.factor": "1",
"topic.creation.default.partitions": "1",
"tombstones.on.delete": "true"
}
}
EOF

Environment Variable | Description |
---|---|
MYSQL_ENDPOINT | MySQL instance endpoint |
KAFKA_BOOTSTRAP_SERVERS | Kafka cluster bootstrap server address |

- Create the connector via the Kafka Connect REST API.
curl -X POST -H "Content-Type: application/json" \
--data @/home/ubuntu/kafka/config/connectors/mysql-connector.json \
http://localhost:8083/connectors

- Verify the status of the mysql-connector.

curl -X GET http://localhost:8083/connectors/mysql-connector/status

If the connector is operating correctly, the response includes "state": "RUNNING".

- If the connector is running, reconnect to the MySQL server remotely.

mysql -h ${MYSQL_ENDPOINT} -u admin -padmin123

Environment Variable | Description |
---|---|
MYSQL_ENDPOINT | MySQL instance endpoint |

- Generate test data to verify the Kafka topic and Debezium integration.
USE `cdc-database`;
INSERT INTO `cdc-table` (name, email) VALUES
('John Doe', 'john.doe@example.com'),
('Jane Smith', 'jane.smith@example.com'),
('Bob Johnson', 'bob.johnson@example.com'),
('Alice Brown', 'alice.brown@example.com'),
('Charlie Wilson', 'charlie.wilson@example.com');

- Exit the MySQL session.

exit
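To see the raw change events Debezium publishes, you can read the table's topic with the Kafka console consumer installed in Step 2. A minimal sketch, using the topic name produced by the topic.prefix, database, and table names configured above:

# Read change events from the beginning of the topic; press Ctrl + C to stop
~/kafka/bin/kafka-console-consumer.sh \
  --bootstrap-server ${KAFKA_BOOTSTRAP_SERVERS} \
  --topic mysql-server.cdc-database.cdc-table \
  --from-beginning

# Each message is a JSON envelope with "before" and "after" row images and an "op" field
# ("c" = insert, "u" = update, "d" = delete), which the Druid spec in Step 6 flattens.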
Step 6. Integrate Druid and execute MySQL queries
Configure Druid to store and analyze real-time data changes sent through Kafka. This setup enables real-time processing and analysis of the data.
- Access Druid via Hadoop Eco > Cluster > tutorial-cdc-dataflow > Druid URL.

http://${MASTER_NODE_PUBLIC_IP}:3008

Environment Variable | Description |
---|---|
MASTER_NODE_PUBLIC_IP | Public IP of Hadoop Eco master node |

- On the main screen, click Load Data > Streaming, then click Edit Spec in the top-right corner.
- In the JSON configuration, paste the Kafka cluster's bootstrap server address into bootstrap.servers, and then click Submit.

{
"type": "kafka",
"dataSchema": {
"dataSource": "user_changes",
"timestampSpec": {
"column": "created_at",
"format": "iso"
},
"dimensionsSpec": {
"dimensions": [
"id",
"name",
"email",
"status",
"operation_type",
"updated_at",
{
"name": "__deleted",
"type": "boolean"
}
]
},
"granularitySpec": {
"type": "uniform",
"segmentGranularity": "DAY",
"queryGranularity": "MINUTE",
"rollup": false
}
},
"tuningConfig": {
"type": "kafka",
"maxRowsPerSegment": 5000000,
"maxBytesInMemory": 25000000
},
"ioConfig": {
"topic": "mysql-server.cdc-database.cdc-table",
"consumerProperties": {
"bootstrap.servers": "${KAFKA_BOOTSTRAP_SERVERS}",
"group.id": "druid-user-changes"
},
"taskCount": 1,
"replicas": 1,
"taskDuration": "PT1H",
"completionTimeout": "PT20M",
"inputFormat": {
"type": "json",
"flattenSpec": {
"useFieldDiscovery": false,
"fields": [
{
"type": "jq",
"name": "id",
"expr": ".before.id // .after.id"
},
{
"type": "jq",
"name": "name",
"expr": ".before.name // .after.name"
},
{
"type": "jq",
"name": "email",
"expr": ".before.email // .after.email"
},
{
"type": "jq",
"name": "status",
"expr": ".before.status // .after.status"
},
{
"type": "jq",
"name": "created_at",
"expr": ".before.created_at // .after.created_at"
},
{
"type": "jq",
"name": "updated_at",
"expr": ".before.updated_at // .after.updated_at"
},
{
"type": "jq",
"name": "operation_type",
"expr": ".op"
},
{
"type": "jq",
"name": "__deleted",
"expr": ".op == \"d\""
}
]
}
}
}
}

Environment Variable | Description |
---|---|
KAFKA_BOOTSTRAP_SERVERS | Bootstrap server address of the Kafka cluster |

- In the Ingestion tab of the Druid console, verify that Supervisors > Datasource > user_changes > Status shows RUNNING.
- Connect to the MySQL server and trigger data changes so that Druid can capture them.

mysql -h ${MYSQL_ENDPOINT} -u admin -padmin123

Environment Variable | Description |
---|---|
MYSQL_ENDPOINT | MySQL instance endpoint |

- Execute the following queries to test whether Druid correctly captures the data changes:
USE `cdc-database`;
UPDATE `cdc-table` SET status = 'inactive' WHERE id IN (2, 4);
INSERT INTO `cdc-table` (name, email) VALUES ('David Park', 'david.park@example.com');
DELETE FROM `cdc-table` WHERE id = 3;

- Exit the MySQL session.

exit

- In the Datasources tab of the Druid console, verify that the user_changes datasource has been created.
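Besides the console, Druid exposes a SQL API you can query over HTTP to confirm that the change events were ingested. A minimal sketch, assuming the router that serves the console on port 3008 also serves the SQL endpoint at /druid/v2/sql (adjust the port if your deployment differs):

# Count ingested change events per operation type (c = insert, u = update, d = delete)
curl -s -X POST http://${MASTER_NODE_PUBLIC_IP}:3008/druid/v2/sql \
  -H "Content-Type: application/json" \
  -d '{"query": "SELECT operation_type, COUNT(*) AS cnt FROM user_changes GROUP BY operation_type"}'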
Step 7. Integrate Superset
Configure a Superset dashboard to visualize and monitor the collected data. This enables intuitive analysis and tracking of data changes.
- Access Superset via Hadoop Eco Cluster > Cluster information > Superset. Log in using the admin ID and password specified during cluster creation.

http://${MASTER_NODE_PUBLIC_IP}:4000

Environment Variable | Description |
---|---|
MASTER_NODE_PUBLIC_IP | Public IP of Hadoop Eco master node |

- Click the [Datasets] button in the top menu. To import datasets from Druid, click the [+ DATASET] button in the top-right corner.
- Configure the database and schema as follows, then click the [CREATE DATASET AND CREATE CHART] button:

Item | Value |
---|---|
DATABASE | druid |
SCHEMA | druid |
TABLE | user_changes |

- Select the desired chart type and click the [CREATE NEW CHART] button.
- Enter the data and settings you want to visualize, then click the [CREATE CHART] button. Save the chart by clicking the [SAVE] button in the top-right corner.