package rabbitmq import ( "context" "crypto/tls" "net" "strconv" "strings" "sync" "sync/atomic" "time" "github.com/streadway/amqp" ) type Broker struct { Ssl bool Username string Password string Server string Port string Vhost string TSL *tls.Config ProxyAddr string Beatime time.Duration } type Conn struct { ctx context.Context msgID uint64 broker Broker logger Logger l sync.Mutex conn *amqp.Connection } type Exchange struct { Key string // 生产者routingKey, 消费者bindingKey Name string // 交换机名称 Kind string // fanout(广播) direct(直接交换)比fanout多加了一层密码限制(routingKey) topic(主题) headers(首部) Durable bool // 是否持久化 建议true, RabbitMQ关闭后,没有持久化的Exchange将被清除 AutoDelete bool // 是否自动删除 建议false, 如果没有与之绑定的Queue,直接删除 Internal bool // 是否内置的 建议false,如果为true,只能通过Exchange到Exchange Delay bool // 是否是延迟队列 NoWait bool // 是否非阻塞 建议false, true表示阻塞,创建交换器的请求发送后,阻塞等待RMQ Server返回信息。false非阻塞:不会阻塞等待RMQ Server的返回信息,而RMQ Server也不会返回信息。(不推荐使用) Args amqp.Table // 其他参数, 死信就是通过该参数设置 } type Queue struct { Name string Durable bool // 是否持久化 建议true, RabbitMQ关闭后,没有持久化的Exchange将被清除 AutoDelete bool // 是否自动删除 建议false, 如果没有与之绑定的Queue,直接删除 Exclusive bool // 是否排外的 建议false, 当连接关闭时connection.close()该队列是否会自动删除。 该队列是否是私有的private,如果不是排外的,可以使用两个消费者都访问同一个队列,没有任何问题,如果是排外的,会对当前队列加锁,其他通道channel是不能访问的,如果强制访问会报异常:com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method(reply-code=405, reply-text=RESOURCE_LOCKED - cannot obtain exclusive access to locked queue 'queue_name' in vhost '/', class-id=50, method-id=20)。 应用于一个队列只能有一个消费者来消费的场景。 NoWait bool // 是否非阻塞 建议false, true表示阻塞,创建交换器的请求发送后,阻塞等待RMQ Server返回信息。false非阻塞:不会阻塞等待RMQ Server的返回信息,而RMQ Server也不会返回信息。(不推荐使用) Args amqp.Table // 其他参数, 延迟就是通过该参数设置 PrefetchCount int // 会告诉RabbitMQ不要同时给一个消费者推送多于N个消息,即一旦有N个消息还没有ack,则该Consumer将block掉,直到有消息ack PrefetchSize int // prefetchSize:最多传输的内容的大小的限制,0 为不限制,但据说prefetchSize参数,rabbitmq没有实现 Global bool // 是否将上面设置应用于channel,简单点说,就是上面限制是channel级别的还是Consumer级别) AutoAck bool // 自动确认 } func (broker Broker) DSN() string { protocol := "amqp" if broker.Ssl { protocol = "amqps" } builder := strings.Builder{} builder.WriteString(protocol) builder.WriteString("://") if broker.Username != `` { builder.WriteString(broker.Username) builder.WriteString(":") builder.WriteString(broker.Password) builder.WriteString("@") } builder.WriteString(broker.Server) if broker.Port != `` { builder.WriteString(":") builder.WriteString(broker.Port) } builder.WriteString("/") if broker.Vhost != `` { builder.WriteString(broker.Vhost) } return builder.String() } func New(ctx context.Context, broker Broker) (*Conn, error) { c := &Conn{ ctx: ctx, broker: broker, } //尝试连接 if err := c.connect(); err != nil { return nil, err } return c, nil } // Producer 生成一个生产者 func (c *Conn) Producer(exchange *Exchange, queue *Queue) (*Producer, error) { p = &Producer{ ctx: c.ctx, msgIDPrefix: strconv.FormatInt(time.Now().UnixNano()/int64(time.Millisecond), 10), exchange: exchange, conn: c, logger: c.logger, queue: queue, } ch, err := c.channel(exchange, queue) if err != nil { return nil, err } p.ch = ch return p, nil } func (c *Conn) MsgId() uint64 { return atomic.AddUint64(&c.msgID, 1) } // 初始化channel func (c *Conn) channel(exchange *Exchange, queue *Queue) (*amqp.Channel, error) { ch, err := c.conn.Channel() if err != nil { return nil, err } if exchange != nil { err := ch.ExchangeDeclarePassive(exchange.Name, exchange.Kind, exchange.Durable, exchange.AutoDelete, exchange.Internal, exchange.NoWait, nil) if err != nil { return nil, err } } if queue != nil { _, err = ch.QueueDeclarePassive( queue.Name, queue.Durable, // durable queue.AutoDelete, // auto_delete queue.Exclusive, // exclusive queue.NoWait, // no_wait nil, ) } if err != nil { return nil, err } return ch, nil } func (c *Conn) connect() error { c.l.Lock() if c.conn != nil && !c.conn.IsClosed() { c.l.Unlock() return nil } var err error if c.broker.TSL != nil { c.conn, err = amqp.DialTLS(c.broker.DSN(), c.broker.TSL) } else if c.broker.ProxyAddr != `` { c.conn, err = amqp.DialConfig(c.broker.DSN(), amqp.Config{ Dial: func(network, addr string) (net.Conn, error) { return net.Dial("tcp", c.broker.ProxyAddr) }, }) } else { c.conn, err = amqp.Dial(c.broker.DSN()) } if err == nil { c.conn.Config.Heartbeat = c.broker.Beatime c.conn.Config.Locale = "en_US" } c.l.Unlock() return err }