CDC-Maxwell搭建与测试

先下载Maxwell安装包:

链接:https://pan.baidu.com/s/1DzUsi7kCdVn7DqQgPRyjPA
提取码:w1tl

解压 maxwell-1.25.0.tar.gz 到/opt/module 目录:

1
(base) [dw@hadoop116 module]$ tar -zxvf maxwell-1.25.0.tar.gz -C /opt/module/

初始化Maxwell 元数据库

在 MySQL 中建立一个 maxwell 库用于存储 Maxwell 的元数据:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
(base) [dw@hadoop116 module]$ mysql -uroot -p888888
Type 'help;' or '\h' for help. Type '\c' to clear the current input statement.

# 在 MySQL 中建立一个 maxwell 库用于存储 Maxwell 的元数据
mysql> CREATE DATABASE maxwell;
Query OK, 1 row affected (0.04 sec)

# 设置安全级别
mysql> set global validate_password_length=4;
Query OK, 0 rows affected (0.00 sec)

mysql> set global validate_password_policy=0;
Query OK, 0 rows affected (0.00 sec)

# 分配一个账号可以操作该数据库
mysql> GRANT ALL ON maxwell.* TO 'maxwell'@'%' IDENTIFIED BY '888888';
Query OK, 0 rows affected, 1 warning (0.14 sec)

# 分配这个账号可以监控其他数据库的权限
mysql> GRANT SELECT ,REPLICATION SLAVE , REPLICATION CLIENT ON *.* TO maxwell@'%';
Query OK, 0 rows affected (0.00 sec)

Maxwell 监控抓取MySQL 数据

拷贝配置文件

1
2
3
4
5
(base) [dw@hadoop116 maxwell-1.25.0]$ cp config.properties.example config.properties
(base) [dw@hadoop116 maxwell-1.25.0]$ ls
bin config.properties kinesis-producer-library.properties.example LICENSE quickstart.md
config.md config.properties.example lib log4j2.xml README.md
(base) [dw@hadoop116 maxwell-1.25.0]$ vim config.properties

修改配置文件

找到配置文件对应内容进行修改:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
# 直接发送到kafka
producer=kafka
# 对应的监听端口
kafka.bootstrap.servers=hadoop116:9092,hadoop117:9092,hadoop118:9092
# 对应主题
kafka_topic=ods_base_db_m

# mysql login info
host=hadoop116
user=maxwell
password=888888

# 设置节点id,只要跟mysql master的id不一样就可以了
client_id=maxwell_1

注意:默认还是输出到指定 Kafka 主题的一个 kafka 分区,因为多个分区并行可能会打乱binlog 的顺序
如果要提高并行度,首先设置 kafka 的分区数>1,然后设置 producer_partition_by 属性 可选值 producer_partition_by=database|table|primary_key|random| column (表示按照什么进行分区)

如果我们选的是column,则还需加上一条配置:

producer_partition_columns=id,foo,bar(即在column分区的前提上告知按照哪个字段进行分区)

我们这里按照主键(primary_key)进行分区,因为主键自增能够轮询,避免出现热点数据,负债均衡。

查看mysql主节点id:

启动kafka消费客户端

1
2
3
(base) [dw@hadoop116 kafka]$ bin/kafka-console-consumer.sh --bootstrap-server hadoop116:9092 --topic ods_base_db_m
# 由于没创建主题,第一次启动消费者会报错
[2022-02-23 20:28:51,623] WARN [Consumer clientId=consumer-console-consumer-97946-1, groupId=console-consumer-97946] Error while fetching metadata with correlation id 2 : {ods_base_db_m=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient)

启动maxwell

1.直接命令行启动

1
2
(base) [dw@hadoop116 maxwell-1.25.0]$ bin/maxwell --config ./config.properties
Using kafka version: 1.0.0

补充: Maxwell有初始化的功能,可以读取mysql中的历史数据。
初始化脚本在Maxwell根目录bin目录下的maxwell-bootstrap中运行。

2.写脚本,使用脚本启动

1
2
3
4
5
6
(base) [dw@hadoop116 maxwell-1.25.0]$ vim /home/dw/bin/maxwell.sh
/opt/module/maxwell-1.25.0/bin/maxwell --config /opt/module/maxwell-1.25.0/config.properties >/dev/null 2>&1 &
# 授予执行权限
(base) [dw@hadoop116 maxwell-1.25.0]$ sudo chmod +x /home/atguigu/bin/maxwell.sh
# 运行启动程序
(base) [dw@hadoop116 maxwell-1.25.0]$ maxwell.sh

启动后默认从最新的binlog开始消费。

测试

由于maxwell配置的是监控所有表,所以我们随便在mysql中选中一张表修改数据:

此时查看kafka consumer输出:

1
2
3
4
5
6
7
(base) [dw@hadoop116 kafka]$ bin/kafka-console-consumer.sh --bootstrap-server hadoop116:9092 --topic ods_base_db_m
[2022-02-23 20:28:51,623] WARN [Consumer clientId=consumer-console-consumer-97946-1, groupId=console-consumer-97946] Error while fetching metadata with correlation id 2 : {ods_base_db_m=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient)
{"database":"gmall2021-flink","table":"base_trademark","type":"insert","ts":1645619681,"xid":1782753,"commit":true,"data":{"id":12,"tm_name":"everweekup","logo_url":null}}

{"database":"gmall2021-flink","table":"base_trademark","type":"update","ts":1645619835,"xid":1784777,"commit":true,"data":{"id":12,"tm_name":"everweekup","logo_url":"/update"},"old":{"logo_url":null}}

{"database":"gmall2021-flink","table":"base_trademark","type":"delete","ts":1645619860,"xid":1785168,"commit":true,"data":{"id":12,"tm_name":"everweekup","logo_url":"/update"}}

查看日志可以发现,对比FlinkCDC,Maxwell对于更新的数据,只保存修改的那个字段先前的数据:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
{
"database":"gmall2021-flink",
"table":"base_trademark",
"type":"update",
"ts":1645619835,
"xid":1784777,
"commit":true,
"data":{
"id":12,
"tm_name":"everweekup",
"logo_url":"/update"},
"old":{
"logo_url":null
}
}

而FlinkCDC会保存修改字段原先所在行的所有字段数据:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
{
"database":"gmall2021-flink",
"before":{
"tm_name":"everweekup",
"id":12
},
"opt_type":"update",
"after":{
"tm_name":"everweekup",
"logo_url":"aaa",
"id":12
},
"tableName":"base_trademark"
}

综上,Maxwell对于更新的数据表现没有FlinkCDC那么好。