使用Canal同步MySQL数据
使用Canal同步MySQL的数据可以直接使用Canal客户端API方式消费Canal同步的数据,详细api参照:
ClientAPI · alibaba/canal Wiki · GitHub
也可以直接通过Canal将数据写入Kafka。下面我们使用Canal同步MySQL数据到Kafka为例,学习下Canal如何同步MySQL数据。
(资料图片)
一、Canal架构原理
1、Canal Server结构
server 代表一个 canal 运行实例,对应于一个 jvm。instance 对应于一个数据队列 (1个 canal server 对应 1..n 个 instance )instance 下的子模块eventParser: 数据源接入,模拟 slave 协议和 master 进行交互,协议解析eventSink: Parser 和 Store 链接器,进行数据过滤,加工,分发的工作eventStore: 数据存储metaManager: 增量订阅 & 消费信息管理器2、Canal同步MySQL数据原理
EventParser在向mysql发送dump命令之前会先从Log Position中获取上次解析成功的位置(如果是第一次启动,则获取初始指定位置或者当前数据段binlog位点)。mysql接受到dump命令后,由EventParser从mysql上pull binlog数据进行解析并传递给EventSink(传递给EventSink模块进行数据存储,是一个阻塞操作,直到存储成功),传送成功之后更新Log Position。流程图如下:
EventSink起到一个类似channel的功能,可以对数据进行过滤、分发/路由(1:n)、归并(n:1)和加工。EventSink是连接EventParser和EventStore的桥梁。
EventStore实现模式是内存模式,内存结构为环形队列,由三个指针(Put、Get和Ack)标识数据存储和读取的位置。
MetaManager是增量订阅&消费信息管理器,增量订阅和消费之间的协议包括get/ack/rollback,分别为:
Message getWithoutAck(int batchSize),允许指定batchSize,一次可以获取多条,每次返回的对象为Message,包含的内容为:batch id[唯一标识]和entries[具体的数据对象]。void rollback(long batchId),顾名思义,回滚上次的get请求,重新获取数据。基于get获取的batchId进行提交,避免误操作。void ack(long batchId),顾名思义,确认已经消费成功,通知server删除数据。基于get获取的batchId进行提交,避免误操作。3、关于同步MySQL数据配置信息
首先Canal可以是一个集群,这里以Canal单机为例解释Canal同步MySQL数据配置文件配置原理。
首先需要在Canal中配置CanalServer 对应的canal.properties,这个文件中主要配置Canal对应的同步数据实例(Canal Instance)位置信息及数据导出的模式,例如:我们需要将某个mysql中的数据同步到Kafka中,那么就可以创建一个“数据同步实例”,导出到Kafka就是一种模式。
其次,需要配置Canal Instance 实例中的instance.properties文件,指定同步到MySQL数据源及管道信息。
二、配置步骤
1、配置“canal.properties”
进入“/software/canal/conf”目录下,编辑“canal.properties”文件:
#canal将数据写入Kafka,可配:tcp, kafka, RocketMQ,tcp就是使用canal代码接收canal.serverMode = kafka#配置canal写入Kafka地址canal.mq.servers = node1:9092,node2:9092,node3:9092
关于canal.properties更多参数参照:
AdminGuide · alibaba/canal Wiki · GitHub
2、配置mysql slave的权限
Canal的原理是模拟自己为mysql slave,所以这里一定需要做为mysql slave的相关权限 ,授权Canal连接MySQL具有作为MySQL slave的权限:
mysql> CREATE USER canal IDENTIFIED BY "canal"; mysql> GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO "canal"@"%"; mysql> FLUSH PRIVILEGES;mysql> show grants for "canal" ;
3、配置“instance.properties”
进入“/software/canal/conf/example/”下,编辑“instance.properties”文件:
#canal伪装为一个mysql的salve,配置其id,不要和真正mysql server-id冲突,这里也可以不配置,会自动生成canal.instance.mysql.slaveId=123456#配置mysql master 节点及端口canal.instance.master.address=node3:3306#配置连接mysql的用户名和密码,就是前面复制权限的用户名和密码canal.instance.dbUsername=canalcanal.instance.dbPassword=canal#配置Canal将数据导入到Kafka topiccanal.mq.topic=canal_topic
配置参照:
Canal Kafka RocketMQ QuickStart · alibaba/canal Wiki · GitHub
关于“instance.properties”更多参数介绍如下:AdminGuide · alibaba/canal Wiki · GitHub
4、启动Canal
进入“/software/canal/bin”,执行“startup.sh”脚本启动Canal。
#启动Canal[root@node3 ~]# cd /software/canal/bin/[root@node3 bin]# ./startup.sh [root@node3 bin]# jps68675 CanalLauncher #启动成功
5、启动zookeeper和Kafka,并监控Kafka中“canal_topic”的数据
注意:“canal_topic”不需要提前创建,默认创建就是1个分区。
[root@node2 bin]# ./kafka-console-consumer.sh --bootstrap-server node1:9092,node2:9092,node3:9092 --topic canal_topic
6、在MySQL中建表,插入语句
mysql> create database testdb;mysql> use testdb;mysql> create table person(id int ,name varchar(255),age int);mysql> insert into person values (1,"zs",18),(2,"ls",19),(3,"ww",20);
对应的在Kafka中有对应的数据日志写入
以上写入Kafka中json格式如下:
关于以上json字段解析如下:
data:最新的数据,为JSON数组,如果是插入则表示最新插入的数据,如果是更新,则表示更新后的最新数据,如果是删除,则表示被删除的数据。database:数据库名称。es:事件时间,13位的时间戳。id:事件操作的序列号,1,2,3...isDdl:是否是DDL操作。mysqlType:字段类型。old:旧数据。pkNames:主键名称。sql:SQL语句。sqlType:是经过canal转换处理的,比如unsigned int会被转化为Long,unsigned long会被转换为BigDecimal。table:表名。ts:日志时间。type:操作类型,比如DELETE,UPDATE,INSERT。关键词:
相关内容
- 大数据Canal(三):使用Canal同步MySQL数据 a>
- 消息!大众的新型小型SUV将被称为“陶斯” a>
- 湖北在北交所上市公司扩容 武汉蓝电IPO成功过会 a>
- ChatGPT激发AI热潮,制造业加速探索AI应用 a>
- 2023年2月汽车工业经济运行情况 a>
- 急速下滑之后,工程机械与商用车行业有望复苏 a>
- 环球观点:姜力洗发水怎么卖(姜力洗发水怎么样) a>
- 满圆_关于满圆介绍 a>
- 世联行:公司在横琴已有相关旅游项目 目前在管旅游项目有横琴星乐度·露营小镇 a>
- 环球今亮点!小鹏中型SUV来了 小鹏G6曝光 a>
- 多彩艺体课助力“双减” a>
- 天天消息!g100878hb14g海尔滚筒洗衣机_g100878hb14g a>
- 世界头条:警惕信用卡提额诈骗套路 a>
- 全球热推荐:IBM CEO:2022年斥资20多亿美元收购8家公司 派发60亿美元股息 a>
- 快讯2023-03-13 17:01:21 a>
- 当前观点:坚守武汉最北端山村学校40年 他想让更多山里孩子看看山外的世界 a>
- 焦点日报:贵州轮胎海外经销商会议取得圆满成功 a>
- 农业农村部南京农业机械化研究所参加首届中国(永康)国际农林装备博览会 a>
- 天天热议:鑫源动力获得行业肯定! a>
- 环球实时:鑫源农机参加重庆市春耕生产现场会 a>