生产环境遇到的问题
- 数据库更新数据后,缓存也要相应的更新
- 数据库更新后, elasticsearch , hbase 中的数据也要及时更新
- 数据库更新后, kafaka 消息队列中也要及时更新
也就是说,在数据库更新后,依赖这些数据的服务都需要做相应的变化,需要在相应的服务中写相应的逻辑,对原有代码侵入量比较大,也不利于后期的维护
什么canal
canal是用 java 开发的基于数据库增量日志解析,提供增量数据订阅&消费的中间件,canal通过binlog同步拿到变更数据,再发送到存储目的地,比如 mysql , Kafka ,Elastic Search等多源同步
测试环境
系统 : ubuntu:21
以下服务都在 docker 中运行
mysql:8.0
canal-server:latest
安装mysql步骤省略
1. 一定要开启binlog 日志
2. 设置binglog 行模式为row
3.设置一个全局唯一的 server _id
创建canal用户,并授予权限
创建用户
create user canal identified by ‘canal’;
授予权限
grant select ,replication slave ,replication client on *.* to ‘canal’@’%’;
刷新权限
flush privileges;
//如果是mysql8.0以上的版本,需要修改一下加密方式,否则同步时会有异常,我这里以前设置过,所以省略了
安装canal-server服务端
下载 canal-sever 镜像
docker pull canal/canal-server:latest
启动方式一,直接使用 docker run 在命令行运行
说明 :
mysql 和canal-server必须可以通信,由于我的mysql和canal-server在同一台机器上,但是mysql并没有默认的网络,这里使用的是自定义网络wwwdata_frontend,所以也需要把canal-server加入到这个网络中
docker run -d –name canal-server –network wwwdata_frontend -p 11111:11111 canal/canal-server:latest
启动方式二,使用 docker-compose 启动,方便多个服务管理
docker-compose.yml 部分配置如下
canal-server:
image: canal/canal-server:latest
container_name: canal-server
restart: always
ports:
– “9100:9100”
– “11111:11111”
– “11110:11110”
– “11112:11112”
networks:
– frontend
volumes:
– /wwwdata/canal/conf/instance.properties:/home/admin/canal-server/conf/example/instance.properties
– /wwwdata/canal/conf/canal.properties:/home/admin/canal-server/conf/example/canal.properties
需要完整的 docker- compose.yml (测试服务器上的服务只包含 nginx , redis , php , mysql , canal )可以私信我哈,有需要帮忙制作 dockerfile 或 docker-compose.yml 文件的,都可以私信我哈
修改配置文件instance.properties和canal.properties
我这里由于测试,大部分使用了默认的参数,有以下几个参数需要注意
instance.properties 文件中
canal.instance.master.address=mysql80:3306 我这里的 mysql80 是我的 docker 中运行的 mysql 服务名,当然也可以 mysql 对应的 ip ,( 两个服务必须可以 ping 通 )
这里是在 mysql 中创建的用户名
canal.instance.dbUsername=canal canal.instance.dbPassword=canal
canal.properties 文件中
canal端口 canal.port = 11111 canal.metrics.pull.port = 11112
canal.destinations = example //我这里用的是默认的
再重新启动canal-server,查看日志,如下则说明执行成功
编写客户端来测试一下
我这里使用 go 客户端来实现
package main
import (
"fmt"
"github.com/golang/protobuf/proto"
"github.com/withlin/canal-go/client"
pbe "github.com/withlin/canal-go/protocol/entry"
"log"
"os"
"time"
)
func main() {
//idletimeout 设置为0 表示不限制
connector := client.NewSimpleCanalConnector("canal-serve地址", 端口号默认11111, "用户名", "密码", "destination默认是example", sotimeOut, idletimeout设置为0表示不退出)
err := connector.Connect()
if err != nil {
log.Println(err)
os.Exit(1)
}
//订阅表,所有表
err = connector.Subscribe(".*\\..*")
if err != nil {
fmt.Println(err)
return
}
for {
message, err := connector.Get(100, nil , nil)
if err != nil {
log.Println(err)
os.Exit(1)
}
batch Id := message.Id
if batchId == -1 || len(message.Entries) <= 0 {
time.Sleep(3000 * time.Millisecond)
fmt.Println("===没有数据了===")
continue
}
printEntry(message.Entries)
}
}
func printEntry(entrys []pbe.Entry) {
for _, entry := range entrys {
if entry.GetEntryType() == pbe.EntryType_TRANSACTIONBEGIN || entry.GetEntryType() == pbe.EntryType_TRANSACTIONEND {
continue
}
rowChange := new(pbe.RowChange)
err := proto.Unmarshal(entry.GetStoreValue(), rowChange)
checkError(err)
if rowChange != nil {
eventType := rowChange.GetEventType()
Header := entry.GetHeader()
fmt.Println(fmt.Sprintf("================> binlog[%s : %d],name[%s,%s], eventType: %s", header.GetLogfileName(), header.GetLogfileOffset(), header.Get Schema Name(), header.GetTableName(), header.GetEventType()))
for _, rowData := range rowChange.GetRowDatas() {
if eventType == pbe.EventType_DELETE {
printColumn(rowData.GetBeforeColumns())
} else if eventType == pbe.EventType_INSERT {
printColumn(rowData.GetAfterColumns())
} else {
fmt.Println("-------> before")
printColumn(rowData.GetBeforeColumns())
fmt.Println("-------> after")
printColumn(rowData.GetAfterColumns())
}
}
}
}
}
func printColumn(columns []*pbe.Column) {
for _, col := range columns {
fmt.Println(fmt.Sprintf("%s : %s update= %t", col.GetName(), col.GetValue(), col.GetUpdated()))
}
}
func checkError(err error) {
if err != nil {
fmt.Fprintf(os.Stderr, "Fatal error: %s", err.Error())
os.Exit(1)
}
}
测试
mysql 添加数据
//创建表
CREATE TABLE `canal` (
`id` int NOT NULL AUTO_INCREMENT,
`name` varchar(255) DEFAULT NULL,
PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=12 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;
//添加数据
insert INTO canal (name) VALUE(“abcccc”)
在客户端可以看到,实时更新了
本次测试并没有实现 HA ,只有单机实例做的测试,生产中遇到啥坑,后面再填吧