CDC-Canal 的搭建与测试

Canal 单机版测试

下载好canal解压放到指定目录:

1
2
3
4
5
6
(base) [dw@hadoop116 module]$ pwd
/opt/module
(base) [dw@hadoop116 module]$ ls
canal.deployer-1.1.4.tar.gz
(base) [dw@hadoop116 module]$ mkdir canal-1.1.4
(base) [dw@hadoop116 module]$ tar -zxvf canal.deployer-1.1.4.tar.gz -C /opt/module/canal-1.1.4/

MySql创建canal账户并赋权

1
2
3
4
5
6
7
8
9
10
(base) [dw@hadoop116 example]$ mysql -uroot -p888888
mysql: [Warning] Using a password on the command line interface can be insecure.
mysql> set global validate_password_length=4;
Query OK, 0 rows affected (0.00 sec)

mysql> set global validate_password_policy=0;
Query OK, 0 rows affected (0.00 sec)

mysql> GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%' IDENTIFIED BY '888888';
Query OK, 0 rows affected, 1 warning (0.01 sec)

修改配置文件

1
2
3
4
5
6
(base) [dw@hadoop116 canal-1.1.4]$ pwd
/opt/module/canal-1.1.4
(base) [dw@hadoop116 canal-1.1.4]$ ls
bin conf lib logs
# 这个文件是 canal 的基本通用配置,canal 端口号默认就是 11111
(base) [dw@hadoop116 canal-1.1.4]$ vim conf/canal.properties

修改配置文件canal.properties内容:

1
2
3
4
5
6
7
8
# tcp, kafka, RocketMQ
# 修改 canal 的输出 model,默认 tcp,改为输出到 kafka
# tcp 就是输出到 canal 客户端,通过编写 Java 代码处理
canal.serverMode = kafka
# 修改 Kafka 集群的地址
canal.mq.servers = hadoop116:9092,hadoop117:9092,hadoop118:9092
# 如果创建多个实例,通过前面 canal 架构,我们可以知道,一个 canal 服务中可以有多个 instance,conf/下的每一个 example 即是一个实例,每个实例下面都有独立的配置文件。默认只有一个实例 example,如果需要多个实例处理不同的 MySQL 数据的话,直接拷贝出多个 example,并对其重新命名,命名和配置文件中指定的名称一致,然后修改 canal.properties 中的canal.destinations=实例 1,实例 2,实例 3。
canal.destinations = example

修改配置文件instance.properties内容,我们这里只读取一个 MySQL 数据,所以只有一个实例,这个实例的配置文件在conf/example 目录下 :

1
2
3
4
5
(base) [dw@hadoop116 example]$ pwd
/opt/module/canal-1.1.4/conf/example
(base) [dw@hadoop116 example]$ ls
instance.properties
(base) [dw@hadoop116 example]$ vim instance.properties

修改以下内容:

1
2
3
4
5
6
7
8
# 配置 MySQL 服务器地址
canal.instance.master.address=hadoop116:3306
# 配置连接 MySQL 的用户名和密码,默认就是我们前面授权的 canal
canal.instance.dbUsername=canal
canal.instance.dbPassword=888888
# 修改输出到 Kafka 的主题以及分区数
canal.mq.topic=ods_base_db_c
canal.mq.partitionsNum=1

注意:默认还是输出到指定 Kafka 主题的一个 kafka 分区,因为多个分区并行可能会打乱binlog 的顺序。如果要提高并行度,首先设置 kafka 的分区数>1,然后设置 canal.mq.partitionHash 属性

单机Canal测试

canal也可以进行断点续传,断点续传的元数据信息存储在/opt/module/canal-1.1.4/conf/example下,一般命名为meta.dat,依赖于本地磁盘,里面存的是读取到的binlog位置信息。

Maxwell的断点续传功能存储的元数据信息放在MySql里;

Canal的断点续传功能存储的元数据信息放在根目录的/opt/module/canal-1.1.4/conf/example/meta.dat里;

FlinkCDC的断点续传功能存储的元数据信息放在Checkpoint;

开启Kafka消费者

1
(base) [dw@hadoop116 kafka]$ bin/kafka-console-consumer.sh --bootstrap-server hadoop116:9092 --topic ods_base_db_c

启动canal

1
2
3
4
5
6
7
8
(base) [dw@hadoop116 canal-1.1.4]$ bin/startup.sh
# 检验
(base) [dw@hadoop116 canal-1.1.4]$ jps
122548 Kafka
117030 ConsoleConsumer
117750 CanalLauncher <-- 启动成功
116285 QuorumPeerMain
117900 Jps

增删改测试

c u d数据进行测试:

kafka consumer输出:

1
2
3
4
5
6
(base) [dw@hadoop116 kafka]$ bin/kafka-console-consumer.sh --bootstrap-server hadoop116:9092 --topic ods_base_db_c


{"data":[{"id":"12","tm_name":"everweekup","logo_url":null}],"database":"gmall2021-flink","es":1645684967000,"id":2,"isDdl":false,"mysqlType":{"id":"bigint(20)","tm_name":"varchar(100)","logo_url":"varchar(200)"},"old":null,"pkNames":["id"],"sql":"","sqlType":{"id":-5,"tm_name":12,"logo_url":12},"table":"base_trademark","ts":1645684968109,"type":"INSERT"}
{"data":[{"id":"12","tm_name":"everweekup","logo_url":"/update"}],"database":"gmall2021-flink","es":1645684978000,"id":3,"isDdl":false,"mysqlType":{"id":"bigint(20)","tm_name":"varchar(100)","logo_url":"varchar(200)"},"old":[{"logo_url":null}],"pkNames":["id"],"sql":"","sqlType":{"id":-5,"tm_name":12,"logo_url":12},"table":"base_trademark","ts":1645684978347,"type":"UPDATE"}
{"data":[{"id":"12","tm_name":"everweekup","logo_url":"/update"}],"database":"gmall2021-flink","es":1645684981000,"id":4,"isDdl":false,"mysqlType":{"id":"bigint(20)","tm_name":"varchar(100)","logo_url":"varchar(200)"},"old":null,"pkNames":["id"],"sql":"","sqlType":{"id":-5,"tm_name":12,"logo_url":12},"table":"base_trademark","ts":1645684981721,"type":"DELETE"}

可以看到,Canal的输出和Maxwell相似。