背景
先前开源了一个开源项目: 【阿里巴巴开源项目: 基于mysql数据库binlog的增量订阅&消费】
本文主要是介绍一下如何给canal贡献代码,介绍其设计思路和扩展方式
设计
说明:
- server代表一个canal运行实例,对应于一个jvm
- instance对应于一个数据队列 (1个server对应1..n个instance)
instance下的子模块:
- eventParser (数据源接入,模拟slave协议和master进行交互,协议解析)
- eventSink (Parser和Store链接器,进行数据过滤,加工,分发的工作)
- eventStore (数据存储)
- metaManager (增量订阅&消费信息管理器)
整体类图设计
说明:
- CanalLifeCycle为所有canal模块的生命周期接口
- CanalInstance组合parser,sink,store三个子模块,三个子模块的生命周期统一受CanalInstance管理
- CanalServer聚合了多个CanalInstance
EventParser类图设计和扩展
每个EventParser都会关联两个内部组件: CanalLogPositionManager , CanalHAController
- CanalLogPositionManager : 记录binlog最后一次解析成功位置信息,主要是描述下一次canal启动的位点
- CanalHAController:控制EventParser的链接主机管理,判断当前该链接哪个mysql数据库.
说明:
1. 目前开源版本只有支持mysql的协议(LocalBinlog就是类似于relay log的那种模式,直接根据relay log进行数据消费)
2. 内部版本会有OracleEventParser,获取oracle增量变更信息,因为涉及一些政治,商业和产品关系,没有随canal开源。(oracle增量解析目前为c语言开发,提供socket方式供canal接入)
CanalLogPositionManager类图设计
说明:
1. 如果CanalEventStore选择的是内存模式,可不保留解析位置,下一次canal启动时直接依赖CanalMetaManager记录的最后一次消费成功的位点即可. (最后一次ack提交的数据位点)
2. 如果CanalEventStore选择的是持久化模式,可通过zookeeper记录位点信息,canal instance发生failover切换到另一台机器,可通过读取zookeeper获取位点信息.
可公通过实现自己的CanalLogPositionManager,比如记录位点信息到本地文件/nas文件,简单可用的无HA的模式.
CanalHAController类图设计
说明:
1. 常见的就是基于心跳语句,定时请求当前链接的数据库,超过一定次数检测失败时,尝试切换到备机.
2. 比如阿里内部会有一套数据库主备信息管理系统,DBA做了数据库主备切换或者机器下线,推送配置到各个应用节点,HAController收到后,控制EventParser进行链接切换.
EventSink类图设计和扩展
说明:
1. 常见的sink业务有分1:n和n:1的业务,目前GroupEventSink主要是解决n:1的归并业务
关于1:n/n:1的介绍,可参见我的canal介绍的文章。
EventStore类图设计和扩展
说明:
1. 抽象了CanalStoreScavenge , 解决数据的清理,比如定时清理,满了之后清理,每次ack清理等
2. CanalEventStore接口,主要包含put/get/ack/rollback的相关接口. put/get操作会组成一个生产者/消费者模式,每个store都会有存储大小设计,存储满了,put操作会阻塞等待get获取数据,所以不会无线占用存储,比如内存大小
a. 目前EventStore主要实现了memory模式,支持按照内存大小和内存记录数进行存储大小限制.
b. 后续可开发基于本地文件的存储模式
c. 基于文件存储和内存存储,开发mixed模式,做成两级队列,内存buffer有空位时,将文件的数据读入到内存buffer中。
重要:实现基于mixed模式后,canal才可以说是完成真正的消费/订阅的模型 (取1份binlog数据,提供多个客户端消费,消费有快有慢,各自保留消费位点)
MetaManager类图设计和扩展
说明:
1. metaManager目前同样支持了多种模式,最顶层的就是memory和zookeeper的模式,还有就是mixed模式,先写内存,再写zookeeper.
可公通过实现自己的CanalMetaManager,比如记录位点信息到本地文件/nas文件,简单可用的无HA的模式.
应用扩展
上面介绍了相关模块的设计,这里介绍下如何将自己的扩展代码应用到canal中. 介绍之前首先需要了解instance的配置方式,可参见: AdminGuide 的spring配置这一章节
canal instance基于spring的管理方式,主要由两部分组成:
- xxx.properties
- xxx-instance.xml
xxx-instance.xml就是描述对应instance所使用的模块组件定义,比如默认的instance模块组件定义有:
- memory-instance.xml (选择了memory模式的组件,速度优先,简单)
- default-instance.xml (选择了mixed/preiodmixed模式的组件,可以提供HA的功能)
- group-instance.xml (提供了n:1的sink模式)
所以,如果要应用自己的组件,就只需要定义一份自己的instance.xml,比如custom-intance.xml
<!-- properties --> <bean class="com.alibaba.otter.canal.instance.spring.support.PropertyPlaceholderConfigurer" lazy-init="false"> <property name="ignoreResourceNotFound" value="true" /> <property name="systemPropertiesModeName" value="SYSTEM_PROPERTIES_MODE_OVERRIDE"/><!-- 允许system覆盖 --> <property name="locationNames"> <list> <value>classpath:canal.properties</value> <value>classpath:${canal.instance.destination:}/instance.properties</value> </list> </property> </bean> <bean id="instance" class="com.alibaba.otter.canal.instance.spring.CanalInstanceWithSpring"> <property name="destination" value="${canal.instance.destination}" /> <property name="eventParser"> <ref local="eventParser" /> </property> <property name="eventSink"> <ref local="eventSink" /> </property> <property name="eventStore"> <ref local="eventStore" /> </property> <property name="metaManager"> <ref local="metaManager" /> </property> <property name="alarmHandler"> <ref local="alarmHandler" /> </property> </bean> ......
instance.xml要满足一个基本元素:
1. 一份instance.xml中有一份或者多份instance定义,优先以destination名字查找对应的instance bean定义,如果没有,则按默认的名字“instance”查找instance对象
2. 一份instance bean定义,需要包含eventParser , evnetSink , evnetStore , metaManager,alarmHandler的5个模块定义,(alarmHandler主要是一些报警机制处理,因为简单没展开,可扩展)
完成custom-instance.xml定义后,可通过canal.properties配置中进行引入:
canal.instance.{通道名字}.spring.xml = classpath:spring/custom-instance.xml
到这里,就完成了扩展组件的应用,启动canal instance后,就会使用自定义的的组件 , just have fun .
相关推荐
canal-canal-1.0.22_源码 canal-canal-1.0.22_源码 canal-canal-1.0.22_源码
最新版阿里开源中间件canal实现mysql数据库同步,零侵入不写代码实现,也可以通过整合到项目程序实现更加灵活的控制。详细使用方法:https://blog.csdn.net/u014374009/category_9409106.html
canal.adapter-1.1.4.tar.gz canal.admin-1.1.4.tar.gz canal.deployer-1.1.4.tar.gz 文件比较大,分两部分分上传
canal.adapter-1.1.4.tar.gz canal.admin-1.1.4.tar.gz canal.deployer-1.1.4.tar.gz 文件比较大,分两部分分上传
mysql需开启binlog 查看是否开启binlog ... 新增队列:test.queue, 绑定canal.queue, RoutingKey:canal.routing.key canal下载及配置 https://github.com/alibaba/canal/releases/tag/canal-1.1.5 配置文件见附件
canal客户端-canal.deployer-1.1.7-SNAPSHOT.tar.gz
canal.deployer-1.1.4.tar ; canal.admin-1.1.4.tar.gz ; canal.adapter-1.1.4.tar.gz ; canal.example-1.1.4.tar.gz 官网下载非常不易。
包含canal全套资源包含以下 canal.adapter-1.1.5.tar.gz canal.admin-1.1.5.tar.gz canal.deployer-1.1.5.tar.gz canal.example-1.1.5.tar.gz
Canal 是mysql数据库binlog的增量订阅&消费组件。 基于日志增量订阅&消费支持的业务: 数据库镜像 数据库实时备份 多级索引 (卖家和买家各自分库索引) search build 业务cache刷新 价格变化等重要业务消息 关于 ...
canal.admin-1.1.7-SNAPSHOT.tar.gz canal.adapter-1.1.7-SNAPSHOT.tar.gz canal.deployer-1.1.7-SNAPSHOT.tar.gz
Canal是阿里巴巴的实时数据同步工具,基于数据库增量日志解析,提供增量数据订阅&消费,目前主要支持了mysql 详情查看 https://github.com/alibaba/canal/wiki/
github下载太慢,放一份到这方便大家 官方下载地址 https://github.com/alibaba/canal/releases 资源包括 canal.adapter-1.1.6.tar canal.admin-1.1.6.tar canal.deployer-1.1.6.tar
canal.deployer-1.1.6
深入浅出Otter与Canal.pdf深入浅出Otter与Canal.pdf深入浅出Otter与Canal.pdf深入浅出Otter与Canal.pdf
Canal-Admin-Guide Canal-Admin-Docker canal-server新增基于账号密码的ACL支持能力 canal-server新增admin动态运维指令,配合canal-admin工程动态管理订阅关系 多语言新增【Python客户端】 instance订阅表过滤,...
canal-1.1.6
Canal-Admin-Guide Canal-Admin-Docker canal-server新增基于账号密码的ACL支持能力 canal-server新增admin动态运维指令,配合canal-admin工程动态管理订阅关系 多语言新增【Python客户端】 instance订阅表过滤...
canal-server(canal-deploy):可以直接监听MySQL的binlog,把自己伪装成MySQL的从库,只负责接收数据,并不做处理。 canal-adapter:相当于canal的客户端,会从canal-server中获取数据,然后对数据进行同步,可以...
在springboot中整合与使用canal
canal.adapter-1.1.6.zip