
2.Canal工作原理 2.1 Mysql主从复制原理阿里巴巴 B2B 公司,因为业务的特性,卖家主要集中在国内,买家主要集中在国外,所以衍生出了同步杭州和美国异地机房的需求,从 2010 年开始,阿里系公司开始逐步的尝试基于数据库的日志解析,获取增量变更进行同步,由此衍生出了增量订阅&消费的业务。Canal 是用 Java 开发的基于数据库增量日志解析,提供增量数据订阅&消费的中间件。目前。Canal 主要支持了 MySQL 的 Binlog 解析,解析完成后才利用 Canal Client 来处理获得的相关数据。(数据库同步需要阿里的 Otter 中间件,基于 Canal)
1)Master 主库将改变记录,写到二进制日志(Binary Log)中;
2)Slave 从库向 MySQL Master 发送 dump 协议,将 Master 主库的 binary log events 拷贝
到它的中继日志(relay log);
3)Slave 从库读取并重做中继日志中的事件,将改变的数据同步到自己的数据库。
略
详情见Maxwell简单使用2.2.1章节
3 Canal使用 3.1 Canal部署Canal需要部署到linux服务器上
- 上传canal.deployer-1.1.2.tar.gz包到服务器的/opt/software下
- 解压 tar -zxvf canal.deployer-1.1.2.tar.gz -C ./canal/ 目录下
- 配置canal.properties和instance.properties文件中内容
cancal.properties
instance.properties
4. 启动canal ./bin/startup.sh
- 创建一个java的maven工程
- pom.xml中添加如下配置
com.alibaba.otter canal.client1.1.2
- 创建一个CanalClient.java文件
- 内容如下
//连接服务端canal
CanalConnector example = CanalConnectors.newSingleConnector(new InetSocketAddress("192.168.10.130", 11111), "example", "", "");
//开始循环订阅数据
while (true) {
//开始连接
example.connect();
//订阅某个库或者表数据
example.subscribe("test-canal.*");
//获取变化数据
Message message = example.get(100);
//拿到数据
List entries = message.getEntries();
//判断是否存在数据
if (entries.size() <= 0) {
log.info("没有数据");
Thread.sleep(1000L);
} else {
//开始遍历数据
for (CanalEntry.Entry e : entries) {
String tableName = e.getHeader().getTableName();
CanalEntry.EntryType entryType = e.getEntryType();
if (CanalEntry.EntryType.ROWDATA.equals(entryType)) {
ByteString storevalue = e.getStorevalue();
CanalEntry.RowChange rowChange = CanalEntry.RowChange.parseFrom(storevalue);
List rowDatasList = rowChange.getRowDatasList();
for (CanalEntry.RowData row : rowDatasList) {
Map beforeMap = new HashMap<>(1 << 4);
List beforeColumnsList = row.getBeforeColumnsList();
for (CanalEntry.Column c : beforeColumnsList) {
beforeMap.put(c.getName(), c.getValue());
}
Map afterMap = new HashMap<>(1 << 4);
List afterColumnsList = row.getAfterColumnsList();
for (CanalEntry.Column c : afterColumnsList) {
afterMap.put(c.getName(), c.getValue());
}
log.info("表名:{},之前数据:{},之后数据:{}", tableName, JSON.toJSON(beforeMap), JSON.toJSON(afterMap));
}
}
}
}
}
- 数据变化格式
上面为一个canal的简单使用。具体详细使用可以查看Canal的gitHub项目地址
欢迎分享,转载请注明来源:内存溢出
微信扫一扫
支付宝扫一扫
评论列表(0条)