CDC-Maxwell搭建与测试
先下载Maxwell安装包:
链接:https://pan.baidu.com/s/1DzUsi7kCdVn7DqQgPRyjPA
提取码:w1tl
解压 maxwell-1.25.0.tar.gz 到/opt/module 目录:
1
| (base) [dw@hadoop116 module]$ tar -zxvf maxwell-1.25.0.tar.gz -C /opt/module/
|
初始化Maxwell 元数据库
在 MySQL 中建立一个 maxwell 库用于存储 Maxwell 的元数据:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21
| (base) [dw@hadoop116 module]$ mysql -uroot -p888888 Type 'help;' or '\h' for help. Type '\c' to clear the current input statement.
mysql> CREATE DATABASE maxwell; Query OK, 1 row affected (0.04 sec)
mysql> set global validate_password_length=4; Query OK, 0 rows affected (0.00 sec)
mysql> set global validate_password_policy=0; Query OK, 0 rows affected (0.00 sec)
mysql> GRANT ALL ON maxwell.* TO 'maxwell'@'%' IDENTIFIED BY '888888'; Query OK, 0 rows affected, 1 warning (0.14 sec)
mysql> GRANT SELECT ,REPLICATION SLAVE , REPLICATION CLIENT ON *.* TO maxwell@'%'; Query OK, 0 rows affected (0.00 sec)
|
Maxwell 监控抓取MySQL 数据
拷贝配置文件
1 2 3 4 5
| (base) [dw@hadoop116 maxwell-1.25.0]$ cp config.properties.example config.properties (base) [dw@hadoop116 maxwell-1.25.0]$ ls bin config.properties kinesis-producer-library.properties.example LICENSE quickstart.md config.md config.properties.example lib log4j2.xml README.md (base) [dw@hadoop116 maxwell-1.25.0]$ vim config.properties
|
修改配置文件
找到配置文件对应内容进行修改:
1 2 3 4 5 6 7 8 9 10 11 12 13 14
| # 直接发送到kafka producer=kafka # 对应的监听端口 kafka.bootstrap.servers=hadoop116:9092,hadoop117:9092,hadoop118:9092 # 对应主题 kafka_topic=ods_base_db_m
# mysql login info host=hadoop116 user=maxwell password=888888
# 设置节点id,只要跟mysql master的id不一样就可以了 client_id=maxwell_1
|
注意:默认还是输出到指定 Kafka 主题的一个 kafka 分区,因为多个分区并行可能会打乱binlog 的顺序
如果要提高并行度,首先设置 kafka 的分区数>1,然后设置 producer_partition_by 属性 可选值 producer_partition_by=database|table|primary_key|random| column (表示按照什么进行分区)
如果我们选的是column,则还需加上一条配置:
producer_partition_columns=id,foo,bar(即在column分区的前提上告知按照哪个字段进行分区)
我们这里按照主键(primary_key)进行分区,因为主键自增能够轮询,避免出现热点数据,负债均衡。
查看mysql主节点id:

启动kafka消费客户端
1 2 3
| (base) [dw@hadoop116 kafka]$ bin/kafka-console-consumer.sh --bootstrap-server hadoop116:9092 --topic ods_base_db_m
[2022-02-23 20:28:51,623] WARN [Consumer clientId=consumer-console-consumer-97946-1, groupId=console-consumer-97946] Error while fetching metadata with correlation id 2 : {ods_base_db_m=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient)
|
启动maxwell
1.直接命令行启动
1 2
| (base) [dw@hadoop116 maxwell-1.25.0]$ bin/maxwell --config ./config.properties Using kafka version: 1.0.0
|
补充: Maxwell有初始化的功能,可以读取mysql中的历史数据。
初始化脚本在Maxwell根目录bin目录下的maxwell-bootstrap中运行。
2.写脚本,使用脚本启动
1 2 3 4 5 6
| (base) [dw@hadoop116 maxwell-1.25.0]$ vim /home/dw/bin/maxwell.sh /opt/module/maxwell-1.25.0/bin/maxwell --config /opt/module/maxwell-1.25.0/config.properties >/dev/null 2>&1 &
(base) [dw@hadoop116 maxwell-1.25.0]$ sudo chmod +x /home/atguigu/bin/maxwell.sh
(base) [dw@hadoop116 maxwell-1.25.0]$ maxwell.sh
|
启动后默认从最新的binlog开始消费。
测试
由于maxwell配置的是监控所有表,所以我们随便在mysql中选中一张表修改数据:

此时查看kafka consumer输出:
1 2 3 4 5 6 7
| (base) [dw@hadoop116 kafka]$ bin/kafka-console-consumer.sh --bootstrap-server hadoop116:9092 --topic ods_base_db_m [2022-02-23 20:28:51,623] WARN [Consumer clientId=consumer-console-consumer-97946-1, groupId=console-consumer-97946] Error while fetching metadata with correlation id 2 : {ods_base_db_m=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient) {"database":"gmall2021-flink","table":"base_trademark","type":"insert","ts":1645619681,"xid":1782753,"commit":true,"data":{"id":12,"tm_name":"everweekup","logo_url":null}}
{"database":"gmall2021-flink","table":"base_trademark","type":"update","ts":1645619835,"xid":1784777,"commit":true,"data":{"id":12,"tm_name":"everweekup","logo_url":"/update"},"old":{"logo_url":null}}
{"database":"gmall2021-flink","table":"base_trademark","type":"delete","ts":1645619860,"xid":1785168,"commit":true,"data":{"id":12,"tm_name":"everweekup","logo_url":"/update"}}
|
查看日志可以发现,对比FlinkCDC,Maxwell对于更新的数据,只保存修改的那个字段先前的数据:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
| { "database":"gmall2021-flink", "table":"base_trademark", "type":"update", "ts":1645619835, "xid":1784777, "commit":true, "data":{ "id":12, "tm_name":"everweekup", "logo_url":"/update"}, "old":{ "logo_url":null } }
|
而FlinkCDC会保存修改字段原先所在行的所有字段数据:
1 2 3 4 5 6 7 8 9 10 11 12 13 14
| { "database":"gmall2021-flink", "before":{ "tm_name":"everweekup", "id":12 }, "opt_type":"update", "after":{ "tm_name":"everweekup", "logo_url":"aaa", "id":12 }, "tableName":"base_trademark" }
|
综上,Maxwell对于更新的数据表现没有FlinkCDC那么好。