
因为自己项目中需要用到mysql数据同步到es中,查找了相关资料最后决定用canal来做,所以便有了本文,下面一起来看如何使用canal吧
根据 https://github.com/alibaba/canal 上的原理解释,我们知道 canal 会模拟 mysql slave 的交互协议,伪装自己为 mysql slave,然后向 mysql master 发送 dump 协议。
mysql master 收到 dump 请求,开始推送 binary log 给 slave(也就是 canal),然后 canal 解析 binary log 对象(原始为 byte流)。
经 canal 解析过的对象,我们使用起来就非常的方便了。
再根据 https://github.com/alibaba/canal/releases 提供的版本信息,你会发现 canal 其实相当于一个中间件,专门用来解析 MySQL 的 binlog 日志。canal 解析好了之后,会封装成一个数据对象,通过 protobuf3.0 协议进行交互,让 canal 客户端进行消费。
根据上面的解释,以及 canal 提供的版本信息,我们在使用 canal 的时候,首选要安装一个 canal.deployer-1.1.4.tar.gz 进行解析 MySQL 的 binlog 日志。
下载后,复制 canal.deployer-1.1.4.tar.gz 到 MySQL 主机上,比如放在 /usr/local/soft/目录下。然后依次执行下面的命令:
然后修改 canal 的配置文件 vim conf/example/instance.properties
这三项改成你自己的,比如我的配置如下:
然后保存并退出。(VI 模式下,按 Esc 输入 :wq 回车退出。)
接着,我们检查一下 MySQL 的配置。确定版本和是否开启了 binlog 日志,以及日志格式。
canal 支持 binlog 格式为 ROW 的模式。如果你没开启 binlog,并且格式是非 row 的,建议修改一下 mysql 的配置文件。
执行 mysql –help | grep my.cnf 找到 mysql 的 my.cnf 文件。
执行 vim /etc/my.cnf 命令。添加下面 3 个配置。
然后保存并退出。
接着执行 sudo service mysqld restart 重启 MySQL。
需要注意的是你的 mysql 用户,必须要有 REPLICATION SLAVE 权限。该权限授予 slave 服务器以该账户连接 master 后可以执行 replicate *** 作的权利。
如果没有权限,则使用 root 账户登录进 MySQL,执行下面的语句,创建用户,分配权限。
MySQL 启动后,就可以开启 canal 服务了。
开启后,观察 canal 服务的日志,确保服务正常。
查看 canal 的日志
确定没有问题后,开始编写我们的测试程序。
pom.xml 中导入下面的依赖。
使用JAVA进行测试
然后执行 main 方法。你再修改修改 MySQL 中的数据,你会发现所有改变都同步过来了。上面是使用的Java代码进行运行,如果想用canal.adapter来进行运行可以下载
放入服务器中,依次执行下面命令
然后修改配置文件 :
然后将需要运行存储到es的的yml文件放入到
目录下。例如:
然后开启canal-adapter服务
/usr/local/soft/canal-adapter/bin/startup.sh
查看 canal-adapter 的日志,确定没有问题后修改数据 就可以同步到es了
注意:
1、canal-adapter自带mysql连接使用的5.x的,如果自己安装的是高版本的mysql需要自己去/usr/local/soft/canal-adapter/lib增加对应的jar包
2、因项目中同步es使用的sql中有数据库中没有的字段,导致原生程序一直报异常,后修改源码中
加了一个判断后才可以
3、es中使用的date字段类型和数据库中不一致,所以这里又修改了部分源码兼容我们项目中的类型
可以根据各自情况修改。
本文讲解如何通过一套开源日志存储和检索系统 ELK 构建 MySQL 慢日志收集及分析平台。
ELK、EFK 简介
想必你对 ELK、EFK 都不陌生,它们有一个共同的组件:Elasticsearch(简称ES),它是一个实时的全文搜索和分析引擎,可以提供日志数据的收集、分析、存储 3 大功能。另外一个组件 Kibana 是这套检索系统中的 Web 图形化界面系统,可视化展示在 Elasticsearch 的日志数据和结果。
ELF/EFK 工具集中还有 l 和 F 这两个名称的缩写,这两个缩写代表的工具根据不同的架构和使用方式而定。
L 通常是 Logstash 组件,它是一个用来搜集、分析、过滤日志的工具 。
F 代表 Beats 工具(它是一个轻量级的日志采集器),Beats 家族有 6 个成员,Filebeat 工具,它是一个用于在客户端收集日志的轻量级管理工具。
F 也可以代表工具 fluentd,它是这套架构里面常用的日志收集、处理转发的工具。
那么它们(Logstash VS Beats VS fluentd)有什么样的区别呢?Beats 里面是一个工具集,其中包含了 Filebeat 这样一个针对性的日志收集工具。Logstash 除了做日志的收集以外,还可以提供分析和过滤功能,所以它的功能会更加的强大。
Beats 和 fluentd 有一个共同的特点,就是轻量级,没有 Logstash 功能全面。但如果比较注重日志收集性能,Beats 里面的 Filebeat 和 fluentd 这两个工具会更有优势。
Kafka 是 ELK 和 EFK 里面一个附加的关键组件(缩写 K),它主要是在支持高并发的日志收集系统里面提供分布式的消息队列服务。
ELK 的优势
在此之前,先介绍 ELK 日志分析会有一些什么样的优势?主要有 3 点:
1、它是一套开源、完整的日志检索分析系统,包含收集、存储、分析、检索工具。我们不需要去开发一些额外的组件去完成这套功能,因为它默认的开源方式就提供了一整套组件,只要组合起来,就可以完成从日志收集、检索、存储、到整个展示的完整解决方案了。
2、支持可视化的数据浏览。运维人员只要在控制台里选择想关注的某一段时间内的数据,就可以查看相应的报表,非常快捷和方便。
3、它能广泛的支持一些架构平台,比如我们现在讲到的 K8s 或者是云原生的微服务架构。
Kafka 作为日志消息队列,客户端通过 Filebeat 收集数据(日志)后将其先存入 Kafka,然后由 Logstash 提取并消费,这套架构的好处是:当我们有海量日志同步情况下,直接存入服务端 ES 很难直接应承接海量流量,所以 Kafka 会进行临时性的存取和缓冲,再由 Logstash 进行提取、过滤,通过 Logstash 以后,再把满足条件的日志数据存入 ES。
ES 不再是以单实例的方部署,而是采用集群架构,考虑 Kafka 的集群模式, Logstash 也使用集群模式。
我们会看到这套架构稍微庞大,大中型的企业往往存储海量数据(上百 T 或 P 级)运维日志、或者是系统日志、业务日志。
完成ELK服务搭建后,首先我需要开启的是 MySQL 的慢查询配置,那么通过 set global slow_query_log=‘ON‘,这样就可以开启慢查询日志,还需要设置好慢查询日志标准是大于 1 秒的,那么同样是 set global long_query_time 大于或等于 1,它的意思是大于 1 秒的查询语句,才会认为是慢查询,并且做日志的记录。
那么另外还要设置慢查询日志的位置,通过 set global slow_query_log = 日志文件路径,这里设置到 filebeat 配置监听的路径下,就完成了慢查询日志的路径设置。
配置完成以后,需要在 MySQL 终端上,模拟执行一条执行时间较长的语句,比如执行 select sleep(5),这样就会模拟执行一条查询语句,并且会让它休眠 5 秒。接下来我们看到服务端窗口的 MySQL 这条 sleep 语句已经执行完毕了,同时我们可以再打开 filebeat 的推送窗口,发现这里产生了一条推送日志,表示成功地把这条日志推送给了 ES。
那么接下来我们就可以通过浏览器打开 Kibana 的管理后台,从界面里来看一看检索日志的记录和一些可视化展示的图表,我们可以点击界面上的 Discover 按钮,同时选择好对应的时间周期,然后可以增加一个 filter 过滤器,过滤器里面敲入对应的关键字来进行索引。
这里我敲入的是 slow.query 这个关键字,就会匹配出对应的可以检索的项目,点击想要查询的对应项目,展示出想检索的某一个时间周期内对应的一些日志记录,以及它的图表是什么样子的,同时在下方会有对应的 MySQL 的日志信息打印出来,通过 Kibana 这样的可视化界面就能够看到的相关信息了。
准备工作: mysql库的安装。
python中mysql库用的是mysql-connector,安装执行如下命令:
第一步: 连接mysql,读取数据。
通过执行sql语句,读取mysql数据。
至此,获得mysql的原始数据raw_data 。接下来对数据进行预处理,按日期进行分组聚合,然后重命名行和列名,得到dataFrame格式的数据。
第二步: 连接ES。
这步没有太多的可解释的地方,就是配置信息。
第三步: ES主键加密。
这步的目的是为了保持主键唯一性,防止重复写入。用的方法是md5加密。
第四步: 写入ES
至此,一切的准备工作都做好了,数据也有了,主键加密也做了,就开始写入了。
用main方法执行以上方法:
最后查看一下ES写的是否成功,用查询方法
如果返回以下信息,说明ES里成功插入了数据。
另外,ES删除索引的 *** 作:
欢迎分享,转载请注明来源:内存溢出
微信扫一扫
支付宝扫一扫
评论列表(0条)