canal

​ canal是阿里开源出的mysql binlog监听框架,说实话,坑挺多,而且github上维护的很不频繁,感觉又是无人维护的状态了,所以使用要慎重


canal搭建

前期准备

1.首先要开启mysql的binlog日志(阿里云上默认开启),可以使用以下命令查看binlog开启状态

1
2
show variables like 'log_bin';
show variables like 'binlog_format'

如果为mac环境,则需要在/etc新建my.conf(mac安装就自带了mysql),并在配置文件中写入如下

1
2
3
4
5
6
7
[mysqld]
# log_bin
# 开启binlog
log-bin = mysql-bin
# 选择row模式
binlog-format = ROW
server_id = 1

非mac环境,在mysql对应安装目录下conf文件增加上述配置即可。
要注意选择row模式,否者会拿不到变更sql的很多信息

2.获取canal部署包,可以在git上直接下载对应版本的canal.deployer,地址如下https://github.com/alibaba/canal/releases。

3.使用canal前,需要先创建一个mysql的对应账号,并赋予一定权限,让canal可以通过该账号拉取的binlog日志,canal配置文件中默认的账号名称为canal,即可以在mysql中运行一下命令,创建用户。

1
2
3
4
5
6
-- 创建用户
CREATE USER canal IDENTIFIED BY 'canal'
-- 赋权
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%'
-- 查看对应用户的所有权限
show grants for canal

如已存在对应权限的账号,不想新创建账号,也可以更改canal的配置文件,

在canal.deployer的conf/example/instance.properties文件中,找到以下语句,修改即可

1
2
canal.instance.dbUsername=canal
canal.instance.dbPassword=canal

单机模式部署

单机模式启动的话(本地调试或者测试用),只需要修改canal.deployer的conf/example/instance.properties文件中,链接数据库的地址即可

1
canal.instance.master.address=127.0.0.1:3306

其他的配置都无需更改,然后运行bin目录下的startup.sh命令即可启动canal(每次启动都要先运行一下stop.sh命令),随后在logs目录下,就能够看到canal的运行日志和canal实例的运行日志。

生产环境多节点部署

如果为多节点部署,最简单的一种,可以使用zk来帮canal实现选举和灾备切换,就需要在canal的配置文件中指定zk地址启动,更改conf/canal.properties文件,指定zk地址

1
canal.zkServers = zk1,zk2,zk3

再然后,修改canal实例的配置文件conf/example/instance.properties,让不同节点的slaveId保持不同

1
canal.instance.mysql.slaveId=10

我本地在运行单机伪集群时,发现同一台机器没有办法启动多个canal,启动命令会提示已有canal再运行,所以要测试集群模式的话,只能找不同的机器部署了。

canal客户端消费binlog

但canal部署完成后,同样的,在canal的Git上有客户端的example。在该工程下,包括了直连、集群模式连接的示例,建议直接使用该项目来本地调试用。

canal部署结构图

在canal中有以下几个概念,canal-service、canal-instance

canal-struct

其中,一个canal-instance对应了一个数据库的,一个canal-service下可以有多个canal-instance,即一个canal-service可以监听多个数据库的binlog(在canal内部,使用destination字段标识一个canal-instance)。

canal-service可以使用zk来保证高可用,但是同一时刻内,只有一个canal-service会工作,相当于主备结构

也就是说,哪怕你canal-ser-A下有i1、i2、i3三个实例,canal-ser-B下有i4、i5、i6实例,canal-ser-A和canal-ser-B配置zk高可用,最终,只会同时运行三个实例,而不是六个实例。

canal默认会从当前最新的binlog位点进行数据同步,而不是最老的binlog

在mysql中,可以使用以下命令,查看目前的binlog位点

1
show master status

线上问题总结

1.虽然官方提供了一套canal.adapter用于异构数据库的同步,但是有严重限制,同步的sql不支持增加where条件过滤。我现在业务库中的单张表是多个对象公用的,必须要增加where条件才可以,所以只能够自己开发同步代码了。canal.adapter慎用。

2.在canal的配置文件中,canal.instance.parser.parallel = true 指定并行解析是被注释掉的,如果这时候发生了解析错误,会导致整个binlog消费的阻塞,产生严重后果,建议并行解析打开。

在并行解析下,同样可以指定并行解析binlog的线程数,我碰到了一个问题就跟线程数有关,在测试环境指定了canal.instance.parser.parallelThreadSize = 16,即16个线程去并行解析,没有任何问题,但是一上线,会发现CPU暴涨。最后排查发现,线上环境每个解析线程都要用个7%、8%的cpu,16个线程加起来那就上百了,直接占掉了一个核。最后没有办法,只能调低线程数,cpu才降低了下来。

3.当客户端第一次连接canal时,会报错,重启一下即可(我也不知道这是什么骚操作。。)

4.当canal监听的相关表结构发生变更时,有可能在canal的日志中,会存在一下错误

1
com.alibaba.otter.canal.parse.exception.CanalParseException: column size is not match for table:xxx.xxx,36 vs 35

因为canal会对每条log解析的时候,会增加表结构的相关信息,如果表结构发生变化了,就有可能导致解析异常,canal官方是说了采用了tsdb–时序表功能解决了这个问题,但是和其他使用canal的人讨论后发现,在运行过程中还是有可能会发生这样的错误。反正就是超级坑,一但出现了,就只能重启canal,并且清空conf/example(对应canal实例名)/h2.mv.db文件,因为canal默认采用的h2数据库存放过往表结构。

5.canal消费binlog的位点信息存放,如果没有采用zk的话,canal消费的位点信息是存放于磁盘上的,也就是conf/example(对应canal实例名)/meta.dat文件。如果采用了zk,那么消费的位点信息就是放在zk里面的。这个也比较坑,如果不知道,你会发现位点信息为什么一直清不掉。。

6.当canal连接的数据库binlog发生了问题时,会发现canal的日志中,会疯狂打印以下错误

1
2
3
4
5
6
7
8
9
10
11
12
13
2020-12-15 11:12:55.553 [destination = qingka , address = /192.168.20.152:3306 , EventParser] WARN  c.a.o.c.p.inbound.mysql.rds.RdsBinlogEventParserProxy - ---> find start position successfully, EntryPosition[included=false,journalName=mysql-bin.000032,position=226723569,serverId=1,gtid=,timestamp=1606748470000] cost : 3ms , the next step is binlog dump
2020-12-15 11:12:55.601 [destination = qingka , address = /192.168.20.152:3306 , EventParser] ERROR c.a.o.canal.parse.inbound.mysql.dbsync.DirectLogFetcher - I/O error while reading from client socket
java.io.IOException: Received error packet: errno = 1236, sqlstate = HY000 errmsg = Could not open log file
at com.alibaba.otter.canal.parse.inbound.mysql.dbsync.DirectLogFetcher.fetch(DirectLogFetcher.java:102) ~[canal.parse-1.1.4.jar:na]
at com.alibaba.otter.canal.parse.inbound.mysql.MysqlConnection.dump(MysqlConnection.java:235) [canal.parse-1.1.4.jar:na]
at com.alibaba.otter.canal.parse.inbound.AbstractEventParser$3.run(AbstractEventParser.java:265) [canal.parse-1.1.4.jar:na]
at java.lang.Thread.run(Thread.java:745) [na:1.8.0_51]
2020-12-15 11:12:55.601 [destination = qingka , address = /192.168.20.152:3306 , EventParser] ERROR c.a.o.c.p.inbound.mysql.rds.RdsBinlogEventParserProxy - dump address /192.168.20.152:3306 has an error, retrying. caused by
java.io.IOException: Received error packet: errno = 1236, sqlstate = HY000 errmsg = Could not open log file
at com.alibaba.otter.canal.parse.inbound.mysql.dbsync.DirectLogFetcher.fetch(DirectLogFetcher.java:102) ~[canal.parse-1.1.4.jar:na]
at com.alibaba.otter.canal.parse.inbound.mysql.MysqlConnection.dump(MysqlConnection.java:235) ~[canal.parse-1.1.4.jar:na]
at com.alibaba.otter.canal.parse.inbound.AbstractEventParser$3.run(AbstractEventParser.java:265) ~[canal.parse-1.1.4.jar:na]
at java.lang.Thread.run(Thread.java:745) [na:1.8.0_51]

总的来讲,就是根据上一次的消费去dump mysql的binlog发生问题了,解决方法为清空zk中保存的位点信息。

如果想要找回故障阶段所丢失的数据,方案有以下两种

  1. 通过数据同步方式(logstash或者代码)把故障时间内的数据同步以下
  2. 更改zk中的消费位点到发生故障前,这样会把故障期间的binlog再重复消费一遍