欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

实时同步sqlserver数据,写入kafka

程序员文章站 2024-03-11 17:32:43
...

序言

本文使用 Debezium 的 SQL Server 连接器（connector）实时捕获 SQL Server 中的数据变更，并将其写入 Kafka 消息队列。

一、sqlserver开启CDC机制

1、启动 SQL Server Agent（SQL Server 代理）服务，并确保其正常运行：CDC 的捕获作业由代理服务执行，代理未运行时变更数据不会写入 CDC 变更表。
2、开启cdc机制

-- Run the following statements against the database to be captured.
-- CDCTest is the database.dbname used by the connector configuration below;
-- change it if your database has a different name.
USE [CDCTest]
GO

-- Create the source table.
-- The primary key is required: sp_cdc_enable_table below is called with
-- @supports_net_changes = 1, which fails unless the source table has a
-- primary key (or a unique index passed via @index_name).
CREATE TABLE [dbo].[student] (
  [name] char(25) COLLATE Chinese_PRC_CI_AS  NOT NULL,
  [sex] char(10) COLLATE Chinese_PRC_CI_AS  NOT NULL,
  [id] int  NOT NULL,
  [age] int  NOT NULL,
  CONSTRAINT [PK_student] PRIMARY KEY ([id])
)
GO

-- Enable CDC for the current database (requires sysadmin).
EXEC sys.sp_cdc_enable_db
GO

-- Enable CDC for dbo.student.
-- NOTE: @role_name is the gating role for reading the change data; the
-- account used as database.user by the connector should be a member of it
-- (or have sysadmin/db_owner), otherwise no changes can be read.
EXEC sys.sp_cdc_enable_table
    @source_schema = N'dbo'
  , @source_name = N'student'               -- table name
  , @role_name = N'cdc_admin'               -- gating role (created if it does not exist)
  , @capture_instance = N'student_instance' -- capture instance name
  , @supports_net_changes = 1
  , @filegroup_name = N'PRIMARY';
GO

-- Check whether CDC is enabled for the database
SELECT name, is_cdc_enabled FROM sys.databases WHERE is_cdc_enabled = 1;

-- Check whether CDC is enabled for the table
SELECT name, is_tracked_by_cdc FROM sys.tables WHERE is_tracked_by_cdc = 1;

-- If both queries return rows, CDC was enabled successfully.

二、配置connector

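1、下载 debezium-connector-sqlserver 插件包（下载地址：Maven Central 上的 io.debezium:debezium-connector-sqlserver；本文使用 1.4.1.Final 版本，即 debezium-connector-sqlserver-1.4.1.Final-plugin.tar.gz）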

2、配置 $KAFKA_HOME/config/connect-distributed.properties，详见上一篇《实时同步Oracle数据,写入Kafka》；注意其中的 plugin.path 需要包含下一步存放 connector 的 connectors 目录，否则 Worker 无法加载该插件。

3、将下载的 tar 包解压到 connectors 目录（即 plugin.path 所指向的目录）中
实时同步sqlserver数据,写入kafka

三、启动connector

cd /usr/software/kafka/kafka_2.12-2.4.0/config/

# Start the Kafka Connect worker in distributed mode.
# -daemon runs it in the background (console output goes to
# logs/connectDistributed.out under the Kafka installation by default), so the
# commands below can be issued from the same shell; without it the worker
# holds the terminal in the foreground.
# Assumes $KAFKA_HOME/bin is on PATH.
connect-distributed.sh -daemon ./connect-distributed.properties

# Verify that the connector plugin was loaded. The worker's REST API may need
# a few seconds to come up after starting; 18083 is assumed to be the REST
# listener port set in connect-distributed.properties.
curl -s node1:18083/connector-plugins | jq

# 输出中包含下面这一项，说明 SQL Server connector 插件已被正确加载（以下为插件列表中的片段）
{
  "class": "io.debezium.connector.sqlserver.SqlServerConnector",
  "type": "source",
  "version": "1.4.1.Final"
},

# Two equivalent ways to register the connector with the Kafka Connect REST API.
connect_url='http://node1:18083/connectors'

# Method 1: send the connector configuration inline as the request body.
connector_config='
{
 "name": "sqlserver-kafka-connector",
 "config": {
     "connector.class" : "io.debezium.connector.sqlserver.SqlServerConnector",
     "tasks.max" : "1",
     "database.server.name" : "server1",
     "database.hostname" : "hostname",
     "database.port" : "1433",
     "database.user" : "username",
     "database.password" : "password",
     "database.dbname" : "CDCTest",
     "database.history.kafka.bootstrap.servers" : "node1:9092,node2:9092,node3:9092",
     "database.history.kafka.topic": "schema-changes.inventory"
   }
}'
curl -s -X POST \
  -H "Content-Type: application/json" \
  --data "$connector_config" \
  "$connect_url" | jq

# Method 2: read the same configuration from register-sqlserver.json.
# curl resolves the @file path relative to the current directory.
curl -i -X POST \
  -H "Accept:application/json" \
  -H "Content-Type:application/json" \
  -d @register-sqlserver.json \
  "${connect_url}/"

# register-sqlserver.json 内容如下
{
  "name": "sqlserver-kafka-connector",
  "config": {
    "connector.class": "io.debezium.connector.sqlserver.SqlServerConnector",
    "tasks.max": "1",
    "database.server.name": "server1",
    "database.hostname": "hostname",
    "database.port": "1433",
    "database.user": "username",
    "database.password": "password",
    "database.dbname": "CDCTest",
    "database.history.kafka.bootstrap.servers": "node1:9092,node2:9092,node3:9092",
    "database.history.kafka.topic": "schema-changes.inventory"
  }
}
注意：方法2 需要在 register-sqlserver.json 所在的目录中执行，或者把 @ 后面的文件名改为该文件的完整路径。

# 启动后可以看到以下内容，说明 connector 注册并启动成功
curl -s node1:18083/connectors | jq
[
  "sqlserver-kafka-connector",
  "oracle-kafka-connector"
]
curl -s node1:18083/connectors/sqlserver-kafka-connector/status | jq
{
  "name": "sqlserver-kafka-connector",
  "connector": {
    "state": "RUNNING",
    "worker_id": "192.168.2.170:18083"
  },
  "tasks": [
    {
      "id": 0,
      "state": "RUNNING",
      "worker_id": "192.168.2.170:18083"
    }
  ],
  "type": "source"
}

四、同步sqlserver数据到kafka

-- Insert a test row into student.
-- The column list is explicit because the table's column order is
-- (name, sex, id, age): a positional VALUES (1, 'jack', 22, 'male') would
-- put 'male' into the int column age and fail with a conversion error.
INSERT INTO student (id, name, age, sex) VALUES (1, 'jack', 22, 'male');

# List the topics. server1.dbo.student (<database.server.name>.<schema>.<table>)
# is the topic carrying the DML change events we want to consume.
kafka-topics.sh --list --bootstrap-server node1:9092,node2:9092,node3:9092
TEST.KAFKA.STUDENT
TEST.KAFKA._GENERIC_DDL
__consumer_offsets
connect-configs
connect-offsets
connect-status
schema-changes.inventory
server1
server1.dbo.student
server1.dbo.teacher
shcema-changes.inventory

# Consume server1.dbo.student from the earliest offset and pretty-print each change event with jq.
kafka-console-consumer.sh --topic server1.dbo.student --bootstrap-server node1:9092,node2:9092,node3:9092 --from-beginning | jq
{
  "before": null,
  "after": {
    "id": 1,
    "name": "jack                     ",
    "age": 22,
    "sex": "male      "
  },
  "source": {
    "version": "1.4.1.Final",
    "connector": "sqlserver",
    "name": "server1",
    "ts_ms": 1612779384700,
    "snapshot": "false",
    "db": "CDCTest",
    "schema": "dbo",
    "table": "student",
    "change_lsn": "00000030:000002b8:0002",
    "commit_lsn": "00000030:000002b8:0003",
    "event_serial_no": 1
  },
  "op": "c",
  "ts_ms": 1612779385938,
  "transaction": null
}

如有问题,欢迎一起交流讨论。

上一篇:实时同步Oracle数据,写入Kafka