七叶笔记 » golang编程 » golang实现rabbitmq消息队列消费失败尝试重试

golang实现rabbitmq消息队列消费失败尝试重试

一:消息重试机制

如是有了如下思路:

消息进入队列前, header 默认有参数 retry_num=0 表示尝试次数;

消费者在消费时候的, 如果消息失败,就把消息插入另外一个队列(队列abc);该队列abc 绑定一个死信队列(原始消费的队列),这样形成一个回路

当消息失败后,消息就进入队列abc,队列abc拥有ttl过期时间,ttl过期时间到了后,该消息进入死信队列(死信队列刚好是刚开始我们消费的队列);

这样消息就又回到原始消费队列尾部了;

最后可以通过队列消息头部的header参数retry_num 可以控制消息消费多少次后,直接插入db日志;

db日志可以记录 交换机 路由,queuename,这样,可以做一个后台管理,可以手动一次把消息重新放入队列,进行消息(因为有时间消费队列里面可能在请求其它服务,其它服务也可能会挂掉)

这时候消息无论你消费多少次都没有用,但是入库db后,可以一键重回队列消息(当我们知道服务已经正常后)

图解:

附上代码

git clone rabbitMQ

send.go 消费者

 package main

 import  (
    "fmt"
    _ "fmt"
    "#34;
)

func main() {


    for i := 0;i<20;i++{
        body := fmt.Sprintf("{\"order_id\":%d}",i)
        fmt.Println(body)

        /**
            使用默认的交换机
            如果是默认交换机
            type QueueExchange  struct  {
            QuName  string           // 队列名称
            RtKey   string           // key值
            ExName  string           // 交换机名称
            ExType  string           // 交换机类型
             dns      string              //链接地址
            }
            如果你喜欢使用默认交换机
             Rt Key  此处建议填写成 RtKey 和 QuName 一样的值
         */
        queueExchange := rabbitmq.QueueExchange{
            "a_test_0001",
            "a_test_0001",
            "hello_go",
            " direct ",
            "amqp://guest:guest@192.168.1.169:5672/",
        }

        _ =  rabbitmq .Send(queueExchange,body)

    }
}  

recv .go消费者

 package main

import (
    "fmt"
    "#34;
    "time"
)

type RecvPro struct {

}

//// 实现消费者 消费消息失败 自动进入延时尝试  尝试3次之后入库db
/*
返回值 error 为nil  则表示该消息消费成功
否则消息会进入ttl延时队列  重复尝试消费3次
3次后消息如果还是失败 消息就执行失败  进入告警 FailAction
 */func (t *RecvPro) Consumer(data byte  []byte) error {
    time.Sleep(time.Second*1)
    //return errors.New("顶顶顶顶")
    fmt.Println(string(dataByte))
    //time.Sleep(1*time.Second)
    //return errors.New("顶顶顶顶")
    return nil
}

//消息已经消费3次 失败了 请进行处理
/*
如果消息 消费3次后 仍然失败  此处可以根据情况 对消息进行告警提醒 或者 补偿  入库db  钉钉告警等等
 */func (t *RecvPro) FailAction(err error,dataByte []byte) error {
    fmt.Println(string(dataByte))
    fmt.Println(err)
    fmt.Println("任务处理失败了,我要进入db日志库了")
    fmt.Println("任务处理失败了,发送 钉钉 消息通知主人")
    return nil
}



func main() {
    processTask := &RecvPro{}

    /*
        runNums: 表示任务并发处理数量  一般建议 普通任务1-3    就可以了
        maxTryConnTimeFromMinute:表示最大尝试时间  分钟
     */    err := rabbitmq.Recv(rabbitmq.QueueExchange{
        "a_test_0001",
        "a_test_0001",
        "hello_go",
        "direct",
        "amqp://guest:guest@192.168.1.169:5672/",
    },
    processTask,4,2)
    if(err != nil){
        fmt.Println(err)
    }

}  

recv.go消费者

 package main

import (
    "fmt"
    "#34;
    "time"
)

type RecvPro struct {

}

//// 实现消费者 消费消息失败 自动进入延时尝试  尝试3次之后入库db
/*
返回值 error 为nil  则表示该消息消费成功
否则消息会进入ttl延时队列  重复尝试消费3次
3次后消息如果还是失败 消息就执行失败  进入告警 FailAction
 */func (t *RecvPro) Consumer(dataByte []byte) error {
    time.Sleep(time.Second*1)
    //return errors.New("顶顶顶顶")
    fmt.Println(string(dataByte))
    //time.Sleep(1*time.Second)
    //return errors.New("顶顶顶顶")
    return nil
}

//消息已经消费3次 失败了 请进行处理
/*
如果消息 消费3次后 仍然失败  此处可以根据情况 对消息进行告警提醒 或者 补偿  入库db  钉钉告警等等
 */func (t *RecvPro) FailAction(err error,dataByte []byte) error {
    fmt.Println(string(dataByte))
    fmt.Println(err)
    fmt.Println("任务处理失败了,我要进入db日志库了")
    fmt.Println("任务处理失败了,发送钉钉消息通知主人")
    return nil
}



func main() {
    processTask := &RecvPro{}

    /*
        runNums: 表示任务并发处理数量  一般建议 普通任务1-3    就可以了
        maxTryConnTimeFromMinute:表示最大尝试时间  分钟
     */    err := rabbitmq.Recv(rabbitmq.QueueExchange{
        "a_test_0001",
        "a_test_0001",
        "hello_go",
        "direct",
        "amqp://guest:guest@192.168.1.169:5672/",
    },
    processTask,4,2)
    if(err != nil){
        fmt.Println(err)
    }

}  

utils/rabbitmq包

 package rabbitmq

import (
    "errors"
    " strconv "
    "time"

    //"errors"
    "fmt"
    "github.com/streadway/amqp"
    "log"
)


// 定义 全局变量 ,指针类型
var  mq Conn *amqp.Connection
var mqChan *amqp.Channel

// 定义生产者接口
type Producer interface {
    MsgContent() string
}

// 定义生产者接口
type RetryProducer interface {
    MsgContent() string
}

// 定义接收者接口
type Receiver interface {
    Consumer([]byte)    error
    FailAction(error , []byte)  error
}

// 定义RabbitMQ对象
type RabbitMQ struct {
    connection *amqp.Connection
    Channel *amqp.Channel
    dns string
    QueueName   string            // 队列名称
    RoutingKey  string            // key名称
    ExchangeName string           // 交换机名称
    ExchangeType string           // 交换机类型
    producerList []Producer
    retryProducerList []RetryProducer
    receiverList []Receiver
}

// 定义队列交换机对象
type QueueExchange struct {
    QuName  string           // 队列名称
    RtKey   string           // key值
    ExName  string           // 交换机名称
    ExType  string           // 交换机类型
    Dns     string              //链接地址
}



// 链接rabbitMQ
func (r *RabbitMQ)MqConnect() (err error){

    mqConn, err = amqp.Dial(r.dns)
    r.connection = mqConn   // 赋值给RabbitMQ对象

    if err != nil {
        fmt.Printf("rbmq链接失败  :%s \n",  err )
    }

    return
}

// 关闭mq链接
func (r *RabbitMQ)CloseMqConnect() (err error){

    err = r.connection.Close()
    if err != nil{
        fmt.Printf("关闭mq链接失败  :%s \n", err)
    }
    return
}

// 链接rabbitMQ
func (r *RabbitMQ)MqOpenChannel() (err error){
    mqConn := r.connection
    r.Channel, err = mqConn.Channel()
    //defer mqChan.Close()
    if err != nil {
        fmt.Printf("MQ打开管道失败:%s \n", err)
    }
    return err
}

// 链接rabbitMQ
func (r *RabbitMQ)CloseMqChannel() (err error){
    r.Channel.Close()
    if err != nil {
        fmt.Printf("关闭mq链接失败  :%s \n", err)
    }
    return err
}




// 创建一个新的操作对象
func NewMq(q QueueExchange) RabbitMQ {
    return RabbitMQ{
        QueueName:q.QuName,
        RoutingKey:q.RtKey,
        ExchangeName: q.ExName,
        ExchangeType: q.ExType,
        dns:q.Dns,
    }
}

func (mq *RabbitMQ) sendMsg (body string) (err error)  {
    err = mq.MqOpenChannel()
    ch := mq.Channel
    if err != nil{
        log.Printf("Channel err  :%s \n", err)
    }

    defer func() {
        _ = mq.Channel.Close()
    }()
    if mq.ExchangeName != "" {
        if mq.ExchangeType == ""{
            mq.ExchangeType = "direct"
        }
        err =  ch.ExchangeDeclare(mq.ExchangeName, mq.ExchangeType, true, false, false, false,  nil )
        if err != nil {
            log.Printf("ExchangeDeclare err  :%s \n", err)
        }
    }


    // 用于检查队列是否存在,已经存在不需要重复声明
    _, err = ch.QueueDeclare(mq.QueueName, true, false, false, false, nil)
    if err != nil {
        log.Printf("QueueDeclare err :%s \n", err)
    }
    // 绑定任务
    if mq.RoutingKey != "" && mq.ExchangeName != "" {
        err = ch.QueueBind(mq.QueueName, mq.RoutingKey, mq.ExchangeName, false, nil)
        if err != nil {
            log.Printf("QueueBind err :%s \n", err)
        }
    }

    if mq.ExchangeName != "" && mq.RoutingKey != ""{
        err = mq.Channel.Publish(
            mq.ExchangeName,     // exchange
            mq.RoutingKey, // routing key
            false,  // mandatory
            false,  // immediate
            amqp.Publishing {
                ContentType: "text/plain",
                Body:        []byte(body),
                DeliveryMode: 2,
            })
    }else{
        err = mq.Channel.Publish(
            "",     // exchange
            mq.QueueName, // routing key
            false,  // mandatory
            false,  // immediate
            amqp.Publishing {
                ContentType: "text/plain",
                Body:        []byte(body),
                DeliveryMode: 2,
            })
    }
    return

}


/*
发送延时消息
 */func (mq *RabbitMQ)sendDelayMsg(body string,ttl int64) (err error){
    err =mq.MqOpenChannel()
    ch := mq.Channel
    if err != nil{
        log.Printf("Channel err  :%s \n", err)
    }
    defer mq.Channel.Close()

    if mq.ExchangeName != "" {
        if mq.ExchangeType == ""{
            mq.ExchangeType = "direct"
        }
        err =  ch.ExchangeDeclare(mq.ExchangeName, mq.ExchangeType, true, false, false, false, nil)
        if err != nil {
            return
        }
    }


    if ttl <= 0{
        return errors.New("发送延时消息,ttl参数是必须的")
    }

    table := make(map[string]interface{},3)
    table["x-dead-letter-routing-key"] = mq.RoutingKey
    table["x-dead-letter-exchange"] = mq.ExchangeName
    table["x-message-ttl"] = ttl*1000

    //fmt.Printf("%+v",table)
    //fmt.Printf("%+v",mq)
    // 用于检查队列是否存在,已经存在不需要重复声明
    ttlstring := strconv.FormatInt(ttl,10)
    queueName := fmt.Sprintf("%s_delay_%s",mq.QueueName ,ttlstring)
    routingKey := fmt.Sprintf("%s_delay_%s",mq.QueueName ,ttlstring)
    _, err = ch.QueueDeclare(queueName, true, false, false, false, table)
    if err != nil {
        return
    }
    // 绑定任务
    if routingKey != "" && mq.ExchangeName != "" {
        err = ch.QueueBind(queueName, routingKey, mq.ExchangeName, false, nil)
        if err != nil {
            return
        }
    }

    header := make(map[string]interface{},1)

    header["retry_nums"] = 0

    var ttl_exchange string
    var ttl_routkey string

    if(mq.ExchangeName != "" ){
        ttl_exchange = mq.ExchangeName
    }else{
        ttl_exchange = ""
    }


    if mq.RoutingKey != "" && mq.ExchangeName != ""{
        ttl_routkey = routingKey
    }else{
        ttl_routkey = queueName
    }

    err = mq.Channel.Publish(
        ttl_exchange,     // exchange
        ttl_routkey, // routing key
        false,  // mandatory
        false,  // immediate
        amqp.Publishing {
            ContentType: "text/plain",
            Body:        []byte(body),
            Headers:header,
        })
    if err != nil {
        return

    }
    return
}


func (mq *RabbitMQ) sendRetryMsg (body string,retry_nums int32,args ...string)  {
    err :=mq.MqOpenChannel()
    ch := mq.Channel
    if err != nil{
        log.Printf("Channel err  :%s \n", err)
    }
    defer mq.Channel.Close()

    if mq.ExchangeName != "" {
        if mq.ExchangeType == ""{
            mq.ExchangeType = "direct"
        }
        err =  ch.ExchangeDeclare(mq.ExchangeName, mq.ExchangeType, true, false, false, false, nil)
        if err != nil {
            log.Printf("ExchangeDeclare err  :%s \n", err)
        }
    }

    //原始路由key
    oldRoutingKey := args[0]
    //原始交换机名
    oldExchangeName := args[1]

    table := make(map[string]interface{},3)
    table["x-dead-letter-routing-key"] = oldRoutingKey
    if oldExchangeName != "" {
        table["x-dead-letter-exchange"] = oldExchangeName
    }else{
        mq.ExchangeName = ""
        table["x-dead-letter-exchange"] = ""
    }

    table["x-message-ttl"] = int64(20000)

    //fmt.Printf("%+v",table)
    //fmt.Printf("%+v",mq)
    // 用于检查队列是否存在,已经存在不需要重复声明
    _, err = ch.QueueDeclare(mq.QueueName, true, false, false, false, table)
    if err != nil {
        log.Printf("QueueDeclare err :%s \n", err)
    }
    // 绑定任务
    if mq.RoutingKey != "" && mq.ExchangeName != "" {
        err = ch.QueueBind(mq.QueueName, mq.RoutingKey, mq.ExchangeName, false, nil)
        if err != nil {
            log.Printf("QueueBind err :%s \n", err)
        }
    }

    header := make(map[string]interface{},1)

    header["retry_nums"] = retry_nums + int32(1)

    var ttl_exchange string
    var ttl_routkey string

    if(mq.ExchangeName != "" ){
        ttl_exchange = mq.ExchangeName
    }else{
        ttl_exchange = ""
    }


    if mq.RoutingKey != "" && mq.ExchangeName != ""{
        ttl_routkey = mq.RoutingKey
    }else{
        ttl_routkey = mq.QueueName
    }

    //fmt.Printf("ttl_exchange:%s,ttl_routkey:%s \n",ttl_exchange,ttl_routkey)
    err = mq.Channel.Publish(
        ttl_exchange,     // exchange
        ttl_routkey, // routing key
        false,  // mandatory
        false,  // immediate
        amqp.Publishing {
            ContentType: "text/plain",
            Body:        []byte(body),
            Headers:header,
        })
    if err != nil {
        fmt.Printf("MQ任务发送失败:%s \n", err)

    }

}


// 监听接收者接收任务 消费者
func (mq *RabbitMQ) ListenReceiver(receiver Receiver) {
    err :=mq.MqOpenChannel()
    ch := mq.Channel
    if err != nil{
        log.Printf("Channel err  :%s \n", err)
    }
    defer mq.Channel.Close()
    if mq.ExchangeName != "" {
        if mq.ExchangeType == ""{
            mq.ExchangeType = "direct"
        }
        err =  ch.ExchangeDeclare(mq.ExchangeName, mq.ExchangeType, true, false, false, false, nil)
        if err != nil {
            log.Printf("ExchangeDeclare err  :%s \n", err)
        }
    }


    // 用于检查队列是否存在,已经存在不需要重复声明
    _, err = ch.QueueDeclare(mq.QueueName, true, false, false, false, nil)
    if err != nil {
        log.Printf("QueueDeclare err :%s \n", err)
    }
    // 绑定任务
    if mq.RoutingKey != "" && mq.ExchangeName != "" {
        err = ch.QueueBind(mq.QueueName, mq.RoutingKey, mq.ExchangeName, false, nil)
        if err != nil {
            log.Printf("QueueBind err :%s \n", err)
        }
    }
    // 获取消费通道,确保rabbitMQ一个一个发送消息
    err =  ch.Qos(1, 0, false)
    msgList, err :=  ch.Consume(mq.QueueName, "", false, false, false, false, nil)
    if err != nil {
        log.Printf("Consume err :%s \n", err)
    }
    for msg := range msgList {
        retry_nums,ok := msg.Headers["retry_nums"].(int32)
        if(!ok){
            retry_nums = int32(0)
        }
        // 处理数据
        err := receiver.Consumer(msg.Body)
        if err!=nil {
            //消息处理失败 进入延时尝试机制
            if retry_nums < 3{
                fmt.Println(string(msg.Body))
                fmt.Printf("消息处理失败 消息开始进入尝试  ttl延时队列 \n")
                retry_msg(msg.Body,retry_nums,QueueExchange{
                        mq.QueueName,
                        mq.RoutingKey,
                        mq.ExchangeName,
                        mq.ExchangeType,
                        mq.dns,
                    })
            }else{
                //消息失败 入库db
                fmt.Printf("消息处理3次后还是失败了 入库db 钉钉告警 \n")
                receiver.FailAction(err,msg.Body)
            }
            err = msg.Ack(true)
            if err != nil {
                fmt.Printf("确认消息未完成异常:%s \n", err)
            }
        }else {
            // 确认消息,必须为false
            err = msg.Ack(true)

            if err != nil {
                fmt.Printf("消息消费ack失败 err :%s \n", err)
            }
        }

    }
}

//消息处理失败之后 延时尝试
func retry_msg(msg []byte,retry_nums int32,queueExchange QueueExchange){
    //原始队列名称 交换机名称
    oldQName := queueExchange.QuName
    oldExchangeName := queueExchange.ExName
    oldRoutingKey := queueExchange.RtKey
    if oldRoutingKey == "" || oldExchangeName == ""{
        oldRoutingKey = oldQName
    }

    if queueExchange.QuName != "" {
        queueExchange.QuName = queueExchange.QuName + "_retry_3";
    }

    if queueExchange.RtKey != "" {
        queueExchange.RtKey = queueExchange.RtKey + "_retry_3";
    }else{
        queueExchange.RtKey = queueExchange.QuName + "_retry_3";
    }

//fmt.Printf("%+v",queueExchange)

    mq := NewMq(queueExchange)
    _ = mq.MqConnect()

    defer func(){
        _ = mq.CloseMqConnect()
    }()
    //fmt.Printf("%+v",queueExchange)
    mq.sendRetryMsg(string(msg),retry_nums,oldRoutingKey,oldExchangeName)


}


func Send(queueExchange QueueExchange,msg string) (err error){
    mq := NewMq(queueExchange)
    err = mq.MqConnect()
    if err != nil{
        return
    }

    defer func(){
        _ = mq.CloseMqConnect()
    }()

    err = mq.sendMsg(msg)

    return
}

//发送延时消息
func SendDelay(queueExchange QueueExchange,msg string,ttl int64)(err error){
    mq := NewMq(queueExchange)
    err = mq.MqConnect()
    if err != nil{
        return
    }
    defer func(){
        _ = mq.CloseMqConnect()
    }()
    err = mq.sendDelayMsg(msg,ttl)
    return
}


/*
runNums  开启并发执行任务数量
 */func Recv(queueExchange QueueExchange,receiver Receiver,otherParams ...int) (err error){
    var (
        exitTask bool
        maxTryConnNums int  //rbmq链接失败后多久尝试一次
        runNums int
        maxTryConnTimeFromMinute int
    )

    if(len(otherParams) <= 0){
        runNums = 1
        maxTryConnTimeFromMinute = 0
    }else if(len(otherParams) == 1){
        runNums = otherParams[0]
        maxTryConnTimeFromMinute = 0
    }else if(len(otherParams) == 2){
        runNums = otherParams[0]
        maxTryConnTimeFromMinute = otherParams[1]
    }


    //maxTryConnNums := 360 //rbmq链接失败后最大尝试次数
    //maxTryConnTime := time.Duration(10) //rbmq链接失败后多久尝试一次
    maxTryConnNums = maxTryConnTimeFromMinute * 10 * maxTryConnTimeFromMinute//rbmq链接失败后最大尝试次数
    maxTryConnTime := time.Duration(6) //rbmq链接失败后多久尝试一次
    mq := NewMq(queueExchange)
    //链接rabbitMQ
    err = mq.MqConnect()
    if(err != nil){
        return
    }

    defer func() {
        if panicErr := recover(); panicErr != nil{
            fmt.Println(recover())
            err = errors.New(fmt.Sprintf("%s",panicErr))
        }
    }()

    //rbmq断开链接后 协程退出释放信号
    taskQuit:= make(chan struct{}, 1)
    //尝试链接rbmq
    tryToLinkC := make(chan struct{}, 1)

    //最大尝试次数
    tryToLinkMaxNums := make(chan struct{}, 1)

    maxTryNums := 0 //尝试重启次数

    //开始执行任务
    for i:=1;i<=runNums;i++{
        go Recv2(mq,receiver,taskQuit);
    }

    //如果rbmq断开连接后 尝试重新建立链接
    var tryToLink = func() {
        for {
            maxTryNums += 1
            err = mq.MqConnect()
            if(err == nil){
                tryToLinkC <- struct{}{}
                break
            }
            if(maxTryNums > maxTryConnNums){
                tryToLinkMaxNums <- struct{}{}
                break
            }
            //如果链接断开了 10秒重新尝试链接一次
            time.Sleep(time.Second * maxTryConnTime)
        }
        return
    }
    scheduleTimer := time.NewTimer(time.Millisecond*300)
    exitTask = true
    for{
        select {
        case <-tryToLinkC: //建立链接成功后 重新开启协程执行任务
            fmt.Println("重新开启新的协程执行任务")
            go Recv2(mq,receiver,taskQuit);
        case <-tryToLinkMaxNums://rbmq超出最大链接次数 退出任务
            fmt.Println("rbmq链接超过最大尝试次数!")
            exitTask = false
            err = errors.New("rbmq链接超过最大尝试次数!")
        case <- taskQuit ://rbmq断开连接后 开始尝试重新建立链接
            fmt.Println("rbmq断开连接后 开始尝试重新建立链接")
             go tryToLink()
        case <- scheduleTimer.C:
            //fmt.Println("~~~~~~~~~~~~~~~~~~~~~~~")
        }
        // 重置调度间隔
        scheduleTimer.Reset(time.Millisecond*300)
        if !exitTask{
            break
        }
    }
    fmt.Println("exit")
    return
}


func Recv2(mq RabbitMQ,receiver Receiver,taskQuit chan<- struct{}){
        defer func() {
            fmt.Println("rbmq链接失败,协程任务退出~~~~~~~~~~~~~~~~~~~~")
            taskQuit <- struct{}{}
            return
        }()
        // 验证链接是否正常
        err := mq.MqOpenChannel()
        if(err != nil){
            return
        }
        mq.ListenReceiver(receiver)
}


type retryPro struct {
    msgContent   string
}  

二,延时队列

场景一:物联网系统经常会遇到向终端下发命令,如果命令一段时间没有应答,就需要设置成超时。

场景二:订单下单之后30分钟后,如果用户没有付钱,则系统自动取消订单。

最近的一个项目遇到了这种情况,如果运单30分钟还没有被接单,则状态自动变为已取消。实现延迟消息原理如下,借用一张图:

实现方案

  1. 定时任务轮询数据库,看是否有产生新任务,如果产生则消费任务
  2. pcntl_alarm为进程设置一个闹钟信号
  3. swoole的异步高精度定时器:swoole_time_tick(类似javascript的setInterval)和swoole_time_after(相当于javascript的setTimeout)
  4. rabbitmq延迟任务

以上四种方案,如果生产环境有使用到swoole建议使用第三种方案。此篇文章重点讲述第四种方案实现

生产者:

 1 <?php
 2 require_once __DIR__ . '/../vendor/autoload.php';
 3 use PhpAmqpLib\Connection\AMQPStreamConnection;
 4 use PhpAmqpLib\Message\AMQPMessage;
 5 
 6 
 7 $queue = "test_ack_queue";
 8 $exchange = "test_ack_queue";
 9 //获取连接
10 $connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
11 //从连接中创建通道
12 $channel = $connection->channel();
13 
14 $channel->exchange_declare('delay_exchange', 'direct',false,true,false);
15 $channel->exchange_declare('cache_exchange', 'direct',false,true,false);
16 
17 $tale = new \PhpAmqpLib\Wire\AMQPTable();
18 $tale->set('x-dead-letter-exchange', 'delay_exchange');
19 $tale->set('x-dead-letter-routing-key','delay_exchange');
20 //$tale->set('x-message-ttl',10000);
21 
22 $channel->queue_declare('cache_queue',false,true,false,false,false,$tale);
23 $channel->queue_bind('cache_queue', 'cache_exchange','cache_exchange');
24 
25 $channel->queue_declare('delay_queue',false,true,false,false,false);
26 $channel->queue_bind('delay_queue', 'delay_exchange','delay_exchange');
27 
28 
29 $msg = new AMQPMessage('Hello World',array(
30     'expiration' => 10000,
31     'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT
32 
33 ));
34 
35 $channel->basic_publish($msg,'cache_exchange','cache_exchange');
36 echo date('Y-m-d H:i:s')." [x] Sent 'Hello World!' ".PHP_EOL;
37 
38 
39 
40 
41 //while ($wait) {
42 //    $channel->wait();
43 //}
44 
45 $channel->close();
46 $connection->close();

task  

消费者:

 1 <?php
 2 require_once __DIR__ . '/../vendor/autoload.php';
 3 use PhpAmqpLib\Connection\AMQPStreamConnection;
 4 use PhpAmqpLib\Message\AMQPMessage;
 5 
 6 
 7 //获取连接
 8 $connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
 9 //从连接中创建通道
10 $channel = $connection->channel();
11 
12 
13 //$channel->queue_declare($queue, false, true, false, false);
14 //$channel->exchange_declare($exchange, 'topic', false, true, false);
15 //$channel->queue_bind($queue, $exchange);
16 
17 
18 
19 $channel->exchange_declare('delay_exchange', 'direct',false,false,false);
20 $channel->queue_declare('delay_queue',false,true,false,false,false);
21 $channel->queue_bind('delay_queue', 'delay_exchange','delay_exchange');
22 
23 
24 
25 function process_message(AMQPMessage $message)
26 {
27     $headers = $message->get('application_headers');
28     $nativeData = $headers->getNativeData();
29 //    var_dump($nativeData['x-delay']);
30     echo date('Y-m-d H:i:s')." [x] Received",$message->body,PHP_EOL;
31     $message->delivery_info['channel']->basic_ack($message->delivery_info['delivery_tag']);
32 
33 }
34 
35 
36 $channel->basic_qos(null, 1, null);
37 $channel->basic_consume('delay_queue', '', false, false, false, false, 'process_message');
38 
39 function shutdown($channel, $connection)
40 {
41     $channel->close();
42     $connection->close();
43 }
44 register_shutdown_function('shutdown', $channel, $connection);
45 
46 while (count($channel->callbacks)) {
47     $channel->wait();
48 }
work  

延时队列实现和上面所讲的消息重试有异曲同工之处,都是利用了延时时间和死信队列这一特性实现

最新源码仓库地址:

其它:该rabbitmq包实现中包含了,rabbitmq断线重连,有兴趣的同学可以看看

  (重试和重连接是两个概念)

  重连接 :rabbitmq链接失败导致任务失败,此时要等待rabbitmq服务器恢复正常后才能再次启动协程处理任务

  重试:rabbitmq服务正常,消息消费进程也正常,但是消息处理失败。尝试多次消费消息后还是失败就ack消息,在整个重试过程中不会阻塞消费

golang监听rabbitmq消息队列任务断线自动重连接:

相关文章