FlinkCDC、CanalCDC、MaxwellCDC三者的对比
FlinkCDC VS CanalCDC VS MaxwellCDC
现在,我们对FlinkCDC、CanalCDC和MaxwellCDC三者进行比较。
分别开启Maxwell、Canal
启动Maxwell、Canal:
1 | # 启动canal |
分别启动Kafka Consumer消费对应主题:
1 | # 启动canal的consumer |
新建测试表
首先,我们在库表下新建一张表用于测试,建表语句:
1 | CREATE TABLE cdc_test_user_info ( |
此时,可以看到canal的consumer输出了建表语句,说明canal会对建表语句做监控。
1 | (base) [dw@hadoop116 kafka]$ bin/kafka-console-consumer.sh --bootstrap-server hadoop116:9092 --topic ods_base_db_c |
修改FlinkCDC代码
主要是修改监控的表为先前创建的测试表cdc_test_user_info
即可。
1 | package com.everweekup; |
运行代码输出:
1 | 二月 24, 2022 3:19:21 下午 com.github.shyiko.mysql.binlog.BinaryLogClient connect |
由于是新建的表,所以没有数据。
三者对比测试
执行insert语句对比
1 | INSERT INTO cdc_test_user_info VALUES(30, '小红', '15511302837'), (31, '小明', '13711153734'); |
FlinkCDC监控输出结果:
1 | {"database":"gmall2021-flink","before":{},"opt_type":"insert","after":{"user_name":"小红","tel":"15511302837","id":"30"},"tableName":"cdc_test_user_info"} |
MaxwellCDC监控输出结果:
1 | {"database":"gmall2021-flink","table":"cdc_test_user_info","type":"insert","ts":1645687439,"xid":1871100,"xoffset":0,"data":{"id":"30","user_name":"小红","tel":"15511302837"}} |
CanalCDC监控输出结果:
1 | {"data":[{"id":"30","user_name":"小红","tel":"15511302837"},{"id":"31","user_name":"小明","tel":"13711153734"}],"database":"gmall2021-flink","es":1645687439000,"id":6,"isDdl":false,"mysqlType":{"id":"varchar(255)","user_name":"varchar(255)","tel":"varchar(255)"},"old":null,"pkNames":null,"sql":"","sqlType":{"id":12,"user_name":12,"tel":12},"table":"cdc_test_user_info","ts":1645687439203,"type":"INSERT"} |
综上,在MaxwellCDC和FlinkCDC都分别输出了两条json记录,而Canal将两条insert记录放在一个json里输出。可以看到Canal不太方便,后续要一条条处理数据需要将Canal的输出结果进行explode成单条进行处理。
执行update语句对比
1 | UPDATE cdc_test_user_info SET `user_name` = '老王' WHERE id IN (30, 31); |
FlinkCDC监控输出结果:
1 | {"database":"gmall2021-flink","before":{"user_name":"小红","tel":"15511302837","id":"30"},"opt_type":"update","after":{"user_name":"老王","tel":"15511302837","id":"30"},"tableName":"cdc_test_user_info"} |
MaxwellCDC监控输出结果:
1 | {"database":"gmall2021-flink","table":"cdc_test_user_info","type":"update","ts":1645687907,"xid":1877403,"xoffset":0,"data":{"id":"30","user_name":"老王","tel":"15511302837"},"old":{"user_name":"小红"}} |
CanalCDC监控输出结果:
1 | {"data":[{"id":"30","user_name":"老王","tel":"15511302837"},{"id":"31","user_name":"老王","tel":"13711153734"}],"database":"gmall2021-flink","es":1645687907000,"id":7,"isDdl":false,"mysqlType":{"id":"varchar(255)","user_name":"varchar(255)","tel":"varchar(255)"},"old":[{"user_name":"小红"},{"user_name":"小明"}],"pkNames":null,"sql":"","sqlType":{"id":12,"user_name":12,"tel":12},"table":"cdc_test_user_info","ts":1645687907461,"type":"UPDATE"} |
综上,对比三者,Canal依旧是将两条update记录放在一个json输出。此外,对于update操作,FlinkCDC将update前的字段所在行的记录整行输出,而Canal和Maxwell只输出了update字段在更新前的原始值记录。
执行delete语句对比
1 | DELETE FROM cdc_test_user_info WHERE id IN (30, 31); |
FlinkCDC监控输出结果:
1 | {"database":"gmall2021-flink","before":{"user_name":"老王","tel":"15511302837","id":"30"},"opt_type":"delete","after":{},"tableName":"cdc_test_user_info"} |
MaxwellCDC监控输出结果:
1 | {"database":"gmall2021-flink","table":"cdc_test_user_info","type":"delete","ts":1645688192,"xid":1881173,"xoffset":0,"data":{"id":"30","user_name":"老王","tel":"15511302837"}} |
CanalCDC监控输出结果:
1 | {"data":[{"id":"30","user_name":"老王","tel":"15511302837"},{"id":"31","user_name":"老王","tel":"13711153734"}],"database":"gmall2021-flink","es":1645688192000,"id":8,"isDdl":false,"mysqlType":{"id":"varchar(255)","user_name":"varchar(255)","tel":"varchar(255)"},"old":null,"pkNames":null,"sql":"","sqlType":{"id":12,"user_name":12,"tel":12},"table":"cdc_test_user_info","ts":1645688192785,"type":"DELETE"} |
总结
综上,对比三者在C U D操作中,FlinkCDC表现最好,使用起来更灵活和方便。
补充: Maxwell有初始化的功能,可以读取mysql中的历史数据。(但是只能单表做初始化)
初始化脚本在Maxwell根目录bin目录下的maxwell-bootstrap中运行。
FlinkCDC | Maxwell | Canal | |
---|---|---|---|
SQl->数据(即执行一条SQL对监控数据的影响) | 无 | 无 | 一对一(一条SQL对应只产生一条记录数据,如果一条SQL里影响多条数据记录,则canal也只会将多条影响的记录放在一条json数据里,这样需要后续对数据进行explode处理) |
初始化功能(访问历史数据) | 有(多库多表同时做初始化) | 有(单表) | 无(单独查询) |
断点续传 | CheckPoint | MySQL | 本地磁盘 |
封装格式 | 自定义反序列化器 | JSON | JSON(c/s自定义:需要自己写代码定义) |
高可用 | 运行集群高可用即可 | 无 | 配置Canal高可用集群(ZK) |
对于canal,要实现初始化功能,其实也可以自己写一个单独的查询任务,canal直接select该表的历史数据,再接着监控即可。