SQL Server를 Kafka로 복제하는 방법 알아보기
SQL Server의 데이터를 Kafka Debezium Source Connector와 JDBC Sink Connector를 이용해 복제하는 방법에 대해 알아본 내용을 정리해본다. 아쉽게도 실제 고객 서비스에 적용해보면서 겪어보진 못 하였다..
다시 정리하는 과정에서 오타나 빠뜨린 부분이 있었을 수 있다.
Schema Registry의 적용 방법이나, 스키마 변경 시의 운영 방법, Connector/Transform의 커스터마이징 방법 등도 적어보고 싶은데, 이에 대해서는 추가적인 분석을 통해 별도로 자세히 정리하도록 한다.
Kafka Connect 설치
주키퍼 & 카프카는 설치되어 있다고 본다. 주석 표시한 부분들은 임의로 교체해 주도록 한다.
# prerequisite 설치
sudo apt update && sudo apt upgrade
sudo apt install openjdk-13-jre unzip
# kafka 다운로드
mkdir -p /var/opt/kafka
cd /var/opt/kafka
wget https://dlcdn.aapche.org/kafka/2.8.0/kafka_2.13-2.8.0.tgz
tar xvzf kafka_2.13-2.8.0.tgz --strip-components=1
# kafka connect 설정 편집
cat > config/connect.properties
bootstrap.servers=<kafka host>:9092 # 실제 카프카 주소
group.id=connect-cluster-id # 임의의 카프카 커넥트 클러스터 id 설정
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=true
value.converter.schemas.enable=true
config.storage.topic=connect-configs
config.storage.replication.factor=1 # 적절하게 조절
status.storage.topic=connect-status
status.storage.replication.factor=1 # 적절하게 조절
offset.storage.topic=connect-offset
offset.storage.replication.factor=1 # 적절하게 조절
offset.flush.interval.ms=10000
plugin.path=/var/opt/kafka/connect/plugins # 플러그인 주소를 적절하게 설정
rest.port=8083
# ^Z
# kafka connect 실행
bin/connect-distributed.sh config/connect.properties
# debezium, jdbc connector plugin 다운로드
curl https://repo1.maven.org/maven2/io/debezium/debezium-connector-sqlserver/1.7.0.Final/debezium-conenctor-sqlserver-1.7.0.Final-plugin.tar.gz -o /tmp/debezium-connector.tgz
curl https://d1i4a15mxbxib1.cloudfront.net/api/plugins/confluentinc/kafka-connect-jdbc/versions/10.2.4/confluentinc-kafka-connect-jdbc-10.2.4.zip -o /tmp/jdbc-connector.zip
mkdir -p /var/opt/kafka/connect/plugins
tar xvzf /tmp/debezium-connector.tgz -C /var/opt/kafka/connect/plugins
unzip /tmp/jdbc-connector.zip -d /var/opt/kafka/connect/plugins
설치 후, 로그는 /var/opt/kafka/logs
디렉토리에 남는다.
SQL Server에 CDC 준비
별로 그럴듯 하지 않은 예시이지만, 다음과 같이 게임 아이템 정보를 저장하고 있는 두 테이블이 있고, 이 중 아이템 소유자 이력과 관련된 컬럼들을 복제해서 아이템 소유자의 변경 이력을 따로 보려고 한다고 가정하자.
복제를 하기 위해, 원본 DB(혹은 이하 source db)와 대상 DB(혹은 이하 sink db)를 미리 셋팅해 두어야 한다. sink connector에 auto create 옵션이 있긴 하나, 데이터타입이 정확하게 동기화되는 것이 아니므로, 미리 sink db를 만들어 놓아 주는 것이 좋다.
-
ItemDb (source db)
- dbo.Items
- ItemId int PK
- ItemName nvarchar(64) not null
- ItemTypeCode int not null
- ItemEnchantLevel tinyint not null
- ItemBlessCode tinyint not null
- ItemOwnerCharacterId int null
- Modified datetimeoffset(3) null
- Registered datetimeoffset(3) not null
- dbo.ItemHistory
- ItemId int PK
- Registered datetimeoffset(3) not null PK
- ItemEnchantLevel tinyint null
- ItemBlessCode tinyint null
- ItemOwnerCharacterId int not null
- dbo.Items
-
ItemOwnerDb (sink db)
- dbo.Items
- ItemId int PK
- ItemName nvarchar(64) not null
- ItemTypeCode int not null
- ItemOwnerCharacterId int null
- Modified datetimeoffset(3) null
- Registered datetimeoffset(3) not null
- dbo.ItemHistory
- ItemId int PK
- Registered datetimeoffset(3) not null PK
- ItemOwnerCharacterId int not null
- dbo.Items
먼저 CDC를 구성한다.
use ItemDb
EXEC sys.sp_cdc_enable_db
GO
EXEC sys.sp_cdc_enable_table
@source_schema = N'dbo',
@source_name = N'Items',
@role_name = NULL,
@supports_net_changes = 1
GO
EXEC sys.sp_cdc_enable_table
@source_schema = N'dbo',
@source_name = N'ItemHistory',
@role_name = NULL,
@supports_net_changes = 1
GO
만일 CDC를 제거하거나 초기화 하고 싶다면, 아래와 같이 할 수 있다.
use ItemDb
EXEC sys.sp_cdc_disable_table
@source_schema = N'dbo',
@source_name = N'Items',
@capture_instance = 'all' -- CDC 테이블 이름에서 _CT postfix를 제거하면 capture instance 이다. 스키마 업데이트 등으로 인해 CDC를 업데이트하는 것이 아니면, 그냥 all로 지정해주면 된다.
GO
EXEC sys.sp_cdc_diable_table
@source_schema = N'dbo',
@source_name = N'ItemHistory',
@capture_instance = 'all'
GO
EXEC sys.sp_cdc_disable_db
GO
Connector 등록
설정 준비
source db의 connector는 데이터베이스 당 한 개, sink db의 connector는 테이블 당 한 개 씩, 총 3개의 설정파일이 필요하다.
cat > itemdb.json
{
"name": "itemdb",
"config": {
"connector.class": "io.debezium.connector.sqlserver.SqlServerConnector",
"tasks.max": 1, # 태스크는 1개 밖에 지정하지 못 한다..
"database.hostname": "<SQL Server IP>",
"database.port": "1433",
"database.user": "sa",
"database.password": "<password>",
"database.dbname": "ItemDb",
"table.include.list": "dbo.Items,dbo.ItemHistory",
"database.server.name": "itemdb",
"database.history.kafka.bootstrap.servers": "<kafka host>:9092",
"database.history.kafka.topic": "schema-changes.itemdb.history",
"transforms": "unwrap", # after 레코드들을 unwrap/flatten
"transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
"transforms.unwrap.drop.tombstones": false,
"transforms.unwrap.delete.handling.mode": "rewrite",
"transforms.unwrap.add.fields": "source.name",
"transforms.unwrap.add.fields.prefix": ""
}
}
cat > itemownerdb-items.json
{
"name": "itemownerdb-items",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
"tasks.max": 1,
"connection.url": "jdbc:sqlserver://<SQL Server IP>;Trusted_connection=yes;databaseName=ItemOnwerDb",
"connection.user": "sa",
"connection.password": "<password>",
"insert.mode": "upsert",
"db.timezone": "UTC",
"table.name.format": "Items",
"auto.create": false,
"auto.evolve": false,
"delete.enabled": true,
"pk.fields": "",
"pk.mode": "record_key",
"fields.whitelist": "ItemId,ItemName,ItemTypeCode,ItemOwnerCharacterId,Modified,Registered",
"topics": "itemdb.dbo.items", # 중간에 transform 이상으로 복잡한 데이터를 가공하는 처리를 끼워넣고 싶다면, topic 이름을 조정해주면 된다.
# 예를 들어, Kafka Streams 애플리케이션이 itemdb.dbo.items를 consume하여 가공한 뒤
# itemdb.dbo.items.sink를 produce하도록 작성하고,
# 이 부분에 itemdb.dbo.items 대신 itemdb.dbo.items.sink를 기재해 주면 된다.
"max.retries": 3
}
}
cat > itemownerdb-itemhistory.json
{
"name": "itemownerdb-items",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
"tasks.max": 1,
"connection.url": "jdbc:sqlserver://<SQL Server IP>;Trusted_connection=yes;databaseName=ItemOnwerDb",
"connection.user": "sa",
"connection.password": "<password>",
"insert.mode": "upsert",
"db.timezone": "UTC",
"table.name.format": "Items",
"auto.create": false,
"auto.evolve": false,
"delete.enabled": true,
"pk.fields": "",
"pk.mode": "record_key",
"fields.whitelist": "ItemId,Registered,ItemOwnerCharacterId",
"topics": "itemdb.dbo.itemhistory",
"max.retries": 3
}
}
Connector 등록
작성한 설정을 가지고, Connector를 등록해준다. 등록은, Kafka Connect Cluster에 REST API를 호출하여 등록할 수 있다.
curl -X POST -H "Accept: application/json" -H "Content-Type: application/json" -d @itemdb.json http://localhost:8083/connectors
curl -X POST -H "Accept: application/json" -H "Content-Type: application/json" -d @itemownerdb-items.json http://localhost:8083/connectors
curl -X POST -H "Accept: application/json" -H "Content-Type: application/json" -d @itemownerdb-itemhistory.json http://localhost:8083/connectors
잘 등록되었는지 다음과 같이 확인해 볼 수 있다.
curl -X GET http://localhost:8083/connectors
등록된 Connector를 삭제하려면 다음과 같이 호출해주면 된다.
curl -X DELETE http://localhost:8083/connectors/itemdb
curl -X DELETE http://localhost:8083/connectors/itemownerdb-items
curl -X DELETE http://localhost:8083/connectors/itemownerdb-itemhistory
이제 ItemDb에 데이터가 등록/변경/삭제되면, ItemOwnerDb에도 데이터가 복제되어 반영됨을 볼 수 있을 것이다.
참고 자료
작성하면서 찾아본 모든 글들 모음이다.
여기에는 적지 않았으나 원래 하고자 했던 목적이, SQL Server -> Kafka -> SQL Server로 거쳐가는 스트리밍 데이터를 중간에서 가공하는 처리가 필요했었기 때문에, Kafka Connector나 Transforms를 직접 만드는 방법에 대한 자료가 다수 포함되어 있다.
- SQL Server CDC
- Kafka
- Kafka Connect
- About Kafka Connect 이론
- Confluent - Kafka Connect Concepts
- Confluent - How to use Kafka Connect - Getting Started - Installing Connect Plugins
- Confluent - Using Kafka Connect with Schema Registry
- Confluent Blog - Kafka Connect Deep Dive - Converters and Serialization Explained
- Confluent Blog - Kafka Connect Deep Dive - Error Handling and Dead Letter Queues
- 꿩 이야기 - Kafka Connect 란?
- 꿩 이야기 - Kafka Connect 설정
- 토리맘의 한글라이즈 프로젝트 - Apache Kafka - Kafka Connect
- About Connectors 이론
- About Transforms & Predecates 이론
- Kafka Client & Connect API Library (java) - Connector 혹은 Transformation 작성 참고 샘플
- Github - confluent/kafka-connect-insert-uuid 예제 코드
- Github - lensesio/kafka-connect-kcql-smt 예제 코드(scala)
- Github - lensesio/kafka-connect-transformers 예제 코드(scala)
- Github - jcustenborder/kafka-connect-transform-common 예제 코드(java)
- Github - avien/transforms-for-apache-kafka-connect 예제 코드(java)
- kafka-clients 2.8.1 apidoc
- connect-api 2.8.1 apidoc
- Stackoverflow - Write a custom kafka connect single message transform
- About Kafka Connect 이론
- Debezium Connector