FlinkCDC VS CanalCDC VS MaxwellCDC

Now let's compare FlinkCDC, Canal, and Maxwell side by side.

Start Maxwell and Canal

First, start both services:

# start Canal
(base) [dw@hadoop116 canal-1.1.4]$ pwd
/opt/module/canal-1.1.4
(base) [dw@hadoop116 canal-1.1.4]$ bin/startup.sh
# start Maxwell
(base) [dw@hadoop116 maxwell-1.25.0]$ bin/maxwell --config ./config.properties
Using kafka version: 1.0.0
15:06:48,763 INFO BinaryLogClient - Connected to hadoop116:3306 at mysql-bin.000001/6279 (sid:6379, cid:2678)
15:06:48,763 INFO BinlogConnectorLifecycleListener - Binlog connected.

Then start a Kafka console consumer for each of the corresponding topics:

# consumer for the Canal topic
(base) [dw@hadoop116 kafka]$ bin/kafka-console-consumer.sh --bootstrap-server hadoop116:9092 --topic ods_base_db_c
# consumer for the Maxwell topic
(base) [dw@hadoop116 kafka]$ bin/kafka-console-consumer.sh --bootstrap-server hadoop116:9092 --topic ods_base_db_m

Create a test table

First, create a table in the database for testing. The DDL:

CREATE TABLE cdc_test_user_info (
`id` varchar(255),
`user_name` varchar(255),
`tel` varchar(255)
);

At this point, Canal's consumer prints the CREATE TABLE statement, which shows that Canal also captures DDL statements.

(base) [dw@hadoop116 kafka]$ bin/kafka-console-consumer.sh --bootstrap-server hadoop116:9092 --topic ods_base_db_c
{"data":null,"database":"gmall2021-flink","es":1645686690000,"id":5,"isDdl":true,"mysqlType":null,"old":null,"pkNames":null,"sql":"/* ApplicationName=DBeaver 21.2.5 - SQLEditor <Script-3.sql> */ CREATE TABLE cdc_test_user_info (\r\n\t`id` varchar(255),\r\n\t`user_name` varchar(255),\r\n\t`tel` varchar(255)\r\n)","sqlType":null,"table":"cdc_test_user_info","ts":1645686690851,"type":"CREATE"}

Modify the FlinkCDC code

The only change needed is to point the monitored table list at the test table cdc_test_user_info created above.

package com.everweekup;

import com.alibaba.ververica.cdc.connectors.mysql.MySQLSource;
import com.alibaba.ververica.cdc.connectors.mysql.table.StartupOptions;
import com.alibaba.ververica.cdc.debezium.DebeziumSourceFunction;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
 * @author jabari
 * @date 2022-02-22 14:17
 */
public class FlinkCDCWithCustomerDserialization {
    public static void main(String[] args) throws Exception {

        // 1. Get the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // 2. Build a SourceFunction via FlinkCDC and read the data
        // builder() is generic over the output type
        DebeziumSourceFunction<String> sourceFunction = MySQLSource.<String>builder()
                .hostname("hadoop116")
                .port(3306)
                .username("root")
                .password("888888")
                // Monitor this database (several databases can be listed)
                .databaseList("gmall2021-flink")
                // To avoid monitoring the whole database, list specific tables as
                // db.table (the db prefix avoids clashes when tables with the same
                // name exist in different databases); without this parameter, all
                // tables in the listed databases are consumed
                .tableList("gmall2021-flink.cdc_test_user_info")
                .deserializer(new CustomerDserialization()) // custom deserializer
                .startupOptions(StartupOptions.initial()) // enum; reads a full snapshot first
                .build();

        DataStreamSource<String> streamSource = env.addSource(sourceFunction);

        // 3. Print the data
        streamSource.print();

        // 4. Start the job
        env.execute("FlinkCDCWithCustomerDserialization");
    }
}

Run the code; it outputs:

Feb 24, 2022 3:19:21 PM com.github.shyiko.mysql.binlog.BinaryLogClient connect
INFO: Connected to hadoop116:3306 at mysql-bin.000001/8819 (sid:5948, cid:2694)

Since the table was just created, there is no data yet.

Comparing the three

INSERT statement comparison

INSERT INTO cdc_test_user_info VALUES(30, '小红', '15511302837'), (31, '小明', '13711153734');

FlinkCDC output:

{"database":"gmall2021-flink","before":{},"opt_type":"insert","after":{"user_name":"小红","tel":"15511302837","id":"30"},"tableName":"cdc_test_user_info"}
{"database":"gmall2021-flink","before":{},"opt_type":"insert","after":{"user_name":"小明","tel":"13711153734","id":"31"},"tableName":"cdc_test_user_info"}

Maxwell output:

{"database":"gmall2021-flink","table":"cdc_test_user_info","type":"insert","ts":1645687439,"xid":1871100,"xoffset":0,"data":{"id":"30","user_name":"小红","tel":"15511302837"}}
{"database":"gmall2021-flink","table":"cdc_test_user_info","type":"insert","ts":1645687439,"xid":1871100,"commit":true,"data":{"id":"31","user_name":"小明","tel":"13711153734"}}

Canal output:

{"data":[{"id":"30","user_name":"小红","tel":"15511302837"},{"id":"31","user_name":"小明","tel":"13711153734"}],"database":"gmall2021-flink","es":1645687439000,"id":6,"isDdl":false,"mysqlType":{"id":"varchar(255)","user_name":"varchar(255)","tel":"varchar(255)"},"old":null,"pkNames":null,"sql":"","sqlType":{"id":12,"user_name":12,"tel":12},"table":"cdc_test_user_info","ts":1645687439203,"type":"INSERT"}

In summary, Maxwell and FlinkCDC each emitted two separate JSON records, one per row, while Canal packed both inserted rows into a single JSON message. This makes Canal less convenient: to process rows one by one downstream, its output first has to be exploded into individual records.
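That explode step can be sketched in a few lines of Python, splitting one Canal message into per-row records shaped like Maxwell's output (field names taken from the Canal envelope above; this is an illustration, not any library's API):

```python
import json

def explode_canal(raw: str):
    """Split one Canal message into one record per affected row."""
    event = json.loads(raw)
    rows = event.get("data") or []  # "data" is null for DDL events
    return [
        {"database": event["database"], "table": event["table"],
         "type": event["type"], "data": row}
        for row in rows
    ]

# Trimmed version of the Canal INSERT message shown above.
sample = ('{"data":[{"id":"30","user_name":"A"},{"id":"31","user_name":"B"}],'
          '"database":"gmall2021-flink","table":"cdc_test_user_info",'
          '"type":"INSERT"}')
records = explode_canal(sample)
print(len(records))  # 2
```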

UPDATE statement comparison

UPDATE cdc_test_user_info SET `user_name` = '老王' WHERE id IN (30, 31);

FlinkCDC output:

{"database":"gmall2021-flink","before":{"user_name":"小红","tel":"15511302837","id":"30"},"opt_type":"update","after":{"user_name":"老王","tel":"15511302837","id":"30"},"tableName":"cdc_test_user_info"}
{"database":"gmall2021-flink","before":{"user_name":"小明","tel":"13711153734","id":"31"},"opt_type":"update","after":{"user_name":"老王","tel":"13711153734","id":"31"},"tableName":"cdc_test_user_info"}

Maxwell output:

{"database":"gmall2021-flink","table":"cdc_test_user_info","type":"update","ts":1645687907,"xid":1877403,"xoffset":0,"data":{"id":"30","user_name":"老王","tel":"15511302837"},"old":{"user_name":"小红"}}
{"database":"gmall2021-flink","table":"cdc_test_user_info","type":"update","ts":1645687907,"xid":1877403,"commit":true,"data":{"id":"31","user_name":"老王","tel":"13711153734"},"old":{"user_name":"小明"}}

Canal output:

{"data":[{"id":"30","user_name":"老王","tel":"15511302837"},{"id":"31","user_name":"老王","tel":"13711153734"}],"database":"gmall2021-flink","es":1645687907000,"id":7,"isDdl":false,"mysqlType":{"id":"varchar(255)","user_name":"varchar(255)","tel":"varchar(255)"},"old":[{"user_name":"小红"},{"user_name":"小明"}],"pkNames":null,"sql":"","sqlType":{"id":12,"user_name":12,"tel":12},"table":"cdc_test_user_info","ts":1645687907461,"type":"UPDATE"}

Again, Canal packed both updated rows into a single JSON message. Note also that for updates, FlinkCDC emits the complete pre-update row, while Canal and Maxwell emit only the previous values of the columns that actually changed.
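Since Maxwell's `old` field carries only the changed columns while `data` holds the full after-image, the full before-image (what FlinkCDC prints directly) can be rebuilt by overlaying `old` onto `data`. A Python sketch under that assumption:

```python
import json

def maxwell_before_image(raw: str) -> dict:
    """Rebuild the full pre-update row from a Maxwell update message."""
    event = json.loads(raw)
    # "data" is the complete row after the update; "old" holds only the
    # previous values of the changed columns, so overlaying it restores
    # the complete row as it was before the update.
    return {**event["data"], **event.get("old", {})}

# Trimmed version of the first Maxwell update message shown above.
sample = ('{"type":"update","data":{"id":"30","user_name":"老王","tel":"15511302837"},'
          '"old":{"user_name":"小红"}}')
print(maxwell_before_image(sample))
# {'id': '30', 'user_name': '小红', 'tel': '15511302837'}
```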

DELETE statement comparison

DELETE FROM cdc_test_user_info WHERE id IN (30, 31);

FlinkCDC output:

{"database":"gmall2021-flink","before":{"user_name":"老王","tel":"15511302837","id":"30"},"opt_type":"delete","after":{},"tableName":"cdc_test_user_info"}
{"database":"gmall2021-flink","before":{"user_name":"老王","tel":"13711153734","id":"31"},"opt_type":"delete","after":{},"tableName":"cdc_test_user_info"}

Maxwell output:

{"database":"gmall2021-flink","table":"cdc_test_user_info","type":"delete","ts":1645688192,"xid":1881173,"xoffset":0,"data":{"id":"30","user_name":"老王","tel":"15511302837"}}
{"database":"gmall2021-flink","table":"cdc_test_user_info","type":"delete","ts":1645688192,"xid":1881173,"commit":true,"data":{"id":"31","user_name":"老王","tel":"13711153734"}}

Canal output:

{"data":[{"id":"30","user_name":"老王","tel":"15511302837"},{"id":"31","user_name":"老王","tel":"13711153734"}],"database":"gmall2021-flink","es":1645688192000,"id":8,"isDdl":false,"mysqlType":{"id":"varchar(255)","user_name":"varchar(255)","tel":"varchar(255)"},"old":null,"pkNames":null,"sql":"","sqlType":{"id":12,"user_name":12,"tel":12},"table":"cdc_test_user_info","ts":1645688192785,"type":"DELETE"}

Summary

Overall, across the insert, update, and delete tests, FlinkCDC behaves best and is the most flexible and convenient to use.

Note: Maxwell has a bootstrap feature that can read historical data from MySQL, but it can only initialize one table at a time. The bootstrap script, maxwell-bootstrap, lives under bin/ in the Maxwell root directory.
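Because the three envelopes differ only in field names and nesting, they can be normalized to a common shape downstream. A hedged Python sketch that reshapes one Maxwell message into the FlinkCDC-style envelope printed earlier (field names taken from the outputs above; this is an illustration, not any connector's API):

```python
import json

def maxwell_to_flink_style(raw: str) -> dict:
    """Reshape one Maxwell message into a FlinkCDC-style envelope."""
    event = json.loads(raw)
    op = event["type"]  # "insert" / "update" / "delete"
    # For a delete, "data" is the removed row; otherwise it is the after-image.
    after = {} if op == "delete" else event["data"]
    if op == "insert":
        before = {}
    elif op == "delete":
        before = event["data"]
    else:  # update: overlay the changed columns' old values on the after-image
        before = {**event["data"], **event.get("old", {})}
    return {"database": event["database"], "tableName": event["table"],
            "opt_type": op, "before": before, "after": after}

# Trimmed version of the first Maxwell insert message shown above.
sample = ('{"database":"gmall2021-flink","table":"cdc_test_user_info",'
          '"type":"insert","data":{"id":"30","user_name":"小红","tel":"15511302837"}}')
print(maxwell_to_flink_style(sample)["opt_type"])  # insert
```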

Feature comparison (FlinkCDC / Maxwell / Canal):

- SQL -> data (what one SQL statement produces): FlinkCDC and Maxwell are one-to-one, emitting one record per affected row; Canal is one-to-many, packing every row affected by one statement into a single JSON message that must be exploded downstream.
- Initialization (reading historical data): FlinkCDC has it (multiple databases and tables at once); Maxwell has it (single table only); Canal has none (requires a separate query).
- Resuming from a breakpoint: FlinkCDC via checkpoints; Maxwell via state stored in MySQL; Canal via local disk.
- Output format: FlinkCDC uses a custom deserializer; Maxwell emits JSON; Canal emits JSON (in client/server mode a custom format requires writing your own code).
- High availability: FlinkCDC only needs the running Flink cluster's HA; Canal requires configuring a Canal HA cluster (ZooKeeper).

As for Canal's missing initialization feature, it can be worked around by writing a one-off job that SELECTs the table's historical data directly, and then letting Canal take over the ongoing binlog monitoring.