七叶笔记 » golang编程 » canal中间件的使用

canal中间件的使用

生产环境遇到的问题

  • 数据库更新数据后,缓存也要相应的更新
  • 数据库更新后, elasticsearch , hbase 中的数据也要及时更新
  • 数据库更新后, kafaka 消息队列中也要及时更新

也就是说,在数据库更新后,依赖这些数据的服务都需要做相应的变化,需要在相应的服务中写相应的逻辑,对原有代码侵入量比较大,也不利于后期的维护

什么canal

canal是用 java 开发的基于数据库增量日志解析,提供增量数据订阅&消费的中间件,canal通过binlog同步拿到变更数据,再发送到存储目的地,比如 mysql Kafka ,Elastic Search等多源同步

测试环境

系统 : ubuntu:21

以下服务都在 docker 中运行

mysql:8.0

canal-server:latest

安装mysql步骤省略

  • 注意点如下
  • 1. 一定要开启binlog 日志

    2. 设置binglog 行模式为row

    3.设置一个全局唯一的 server _id

    创建canal用户,并授予权限

    创建用户

    create user canal identified by ‘canal’;

    授予权限

    grant select ,replication slave ,replication client on *.* to ‘canal’@’%’;

    刷新权限

    flush privileges;

    //如果是mysql8.0以上的版本,需要修改一下加密方式,否则同步时会有异常,我这里以前设置过,所以省略了

    安装canal-server服务端

    下载 canal-sever 镜像

    docker pull canal/canal-server:latest

    启动方式一,直接使用 docker run 在命令行运行

    说明 :

    mysql 和canal-server必须可以通信,由于我的mysql和canal-server在同一台机器上,但是mysql并没有默认的网络,这里使用的是自定义网络wwwdata_frontend,所以也需要把canal-server加入到这个网络中

    docker run -d –name canal-server –network wwwdata_frontend -p 11111:11111 canal/canal-server:latest

    启动方式二,使用 docker-compose 启动,方便多个服务管理

    docker-compose.yml 部分配置如下

    canal-server:

    image: canal/canal-server:latest

    container_name: canal-server

    restart: always

    ports:

    – “9100:9100”

    – “11111:11111”

    – “11110:11110”

    – “11112:11112”

    networks:

    – frontend

    volumes:

    – /wwwdata/canal/conf/instance.properties:/home/admin/canal-server/conf/example/instance.properties

    – /wwwdata/canal/conf/canal.properties:/home/admin/canal-server/conf/example/canal.properties

    需要完整的 docker- compose.yml (测试服务器上的服务只包含 nginx , redis , php , mysql , canal )可以私信我哈,有需要帮忙制作 dockerfile docker-compose.yml 文件的,都可以私信我哈

    修改配置文件instance.properties和canal.properties

    我这里由于测试,大部分使用了默认的参数,有以下几个参数需要注意

    instance.properties 文件中

    canal.instance.master.address=mysql80:3306 我这里的 mysql80 是我的 docker 中运行的 mysql 服务名,当然也可以 mysql 对应的 ip ,( 两个服务必须可以 ping )

    这里是在 mysql 中创建的用户名

    canal.instance.dbUsername=canal canal.instance.dbPassword=canal

    canal.properties 文件中

    canal端口 canal.port = 11111 canal.metrics.pull.port = 11112

    canal.destinations = example //我这里用的是默认的

    再重新启动canal-server,查看日志,如下则说明执行成功

    编写客户端来测试一下

    我这里使用 go 客户端来实现

     package main
    
     import  (
    "fmt"
    "github.com/golang/protobuf/proto"
    "github.com/withlin/canal-go/client"
    pbe "github.com/withlin/canal-go/protocol/entry"
    "log"
    "os"
    "time"
    )
    
    func main() {
    //idletimeout 设置为0 表示不限制
    connector := client.NewSimpleCanalConnector("canal-serve地址", 端口号默认11111, "用户名", "密码", "destination默认是example", sotimeOut, idletimeout设置为0表示不退出)
    err := connector.Connect()
    if err != nil {
    log.Println(err)
    os.Exit(1)
    }
    //订阅表,所有表
    err = connector.Subscribe(".*\\..*")
    if err != nil {
    fmt.Println(err)
    return
    }
    
    for {
    
    message, err := connector.Get(100,  nil , nil)
    if err != nil {
    log.Println(err)
    os.Exit(1)
    }
    
     batch Id := message.Id
    if batchId == -1 || len(message.Entries) <= 0 {
    time.Sleep(3000 * time.Millisecond)
    fmt.Println("===没有数据了===")
    continue
    }
    
    printEntry(message.Entries)
    
    }
    
    }
    func printEntry(entrys []pbe.Entry) {
    
    for _, entry := range entrys {
    if entry.GetEntryType() == pbe.EntryType_TRANSACTIONBEGIN || entry.GetEntryType() == pbe.EntryType_TRANSACTIONEND {
    continue
    }
    rowChange := new(pbe.RowChange)
    
    err := proto.Unmarshal(entry.GetStoreValue(), rowChange)
    checkError(err)
    if rowChange != nil {
    eventType := rowChange.GetEventType()
     Header  := entry.GetHeader()
    fmt.Println(fmt.Sprintf("================> binlog[%s : %d],name[%s,%s], eventType: %s", header.GetLogfileName(), header.GetLogfileOffset(), header.Get Schema Name(), header.GetTableName(), header.GetEventType()))
    
    for _, rowData := range rowChange.GetRowDatas() {
    if eventType == pbe.EventType_DELETE {
    printColumn(rowData.GetBeforeColumns())
    } else if eventType == pbe.EventType_INSERT {
    printColumn(rowData.GetAfterColumns())
    } else {
    fmt.Println("-------> before")
    printColumn(rowData.GetBeforeColumns())
    fmt.Println("-------> after")
    printColumn(rowData.GetAfterColumns())
    }
    }
    }
    }
    }
    
    func printColumn(columns []*pbe.Column) {
    for _, col := range columns {
    fmt.Println(fmt.Sprintf("%s : %s  update= %t", col.GetName(), col.GetValue(), col.GetUpdated()))
    }
    }
    
    func checkError(err error) {
    if err != nil {
    fmt.Fprintf(os.Stderr, "Fatal error: %s", err.Error())
    os.Exit(1)
    }
    }  

    测试

    mysql 添加数据

    //创建表

    CREATE TABLE `canal` (

    `id` int NOT NULL AUTO_INCREMENT,

    `name` varchar(255) DEFAULT NULL,

    PRIMARY KEY (`id`)

    ) ENGINE=InnoDB AUTO_INCREMENT=12 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;

    //添加数据

    insert INTO canal (name) VALUE(“abcccc”)

    在客户端可以看到,实时更新了

    本次测试并没有实现 HA ,只有单机实例做的测试,生产中遇到啥坑,后面再填吧

    相关文章