rabbitmq.go 5.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177
  1. package rabbitmq
  2. import (
  3. "context"
  4. "crypto/tls"
  5. "net"
  6. "strconv"
  7. "strings"
  8. "sync"
  9. "sync/atomic"
  10. "time"
  11. "github.com/streadway/amqp"
  12. )
  13. type Broker struct {
  14. Ssl bool
  15. Username string
  16. Password string
  17. Server string
  18. Port string
  19. Vhost string
  20. TSL *tls.Config
  21. ProxyAddr string
  22. Beatime time.Duration
  23. }
  24. type Conn struct {
  25. ctx context.Context
  26. msgID uint64
  27. broker Broker
  28. logger Logger
  29. l sync.Mutex
  30. conn *amqp.Connection
  31. }
  32. type Exchange struct {
  33. Key string // 生产者routingKey, 消费者bindingKey
  34. Name string // 交换机名称
  35. Kind string // fanout(广播) direct(直接交换)比fanout多加了一层密码限制(routingKey) topic(主题) headers(首部)
  36. Durable bool // 是否持久化 建议true, RabbitMQ关闭后,没有持久化的Exchange将被清除
  37. AutoDelete bool // 是否自动删除 建议false, 如果没有与之绑定的Queue,直接删除
  38. Internal bool // 是否内置的 建议false,如果为true,只能通过Exchange到Exchange
  39. Delay bool // 是否是延迟队列
  40. NoWait bool // 是否非阻塞 建议false, true表示阻塞,创建交换器的请求发送后,阻塞等待RMQ Server返回信息。false非阻塞:不会阻塞等待RMQ Server的返回信息,而RMQ Server也不会返回信息。(不推荐使用)
  41. Args amqp.Table // 其他参数, 死信就是通过该参数设置
  42. }
  43. type Queue struct {
  44. Name string
  45. Durable bool // 是否持久化 建议true, RabbitMQ关闭后,没有持久化的Exchange将被清除
  46. AutoDelete bool // 是否自动删除 建议false, 如果没有与之绑定的Queue,直接删除
  47. Exclusive bool // 是否排外的 建议false, 当连接关闭时connection.close()该队列是否会自动删除。 该队列是否是私有的private,如果不是排外的,可以使用两个消费者都访问同一个队列,没有任何问题,如果是排外的,会对当前队列加锁,其他通道channel是不能访问的,如果强制访问会报异常:com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method(reply-code=405, reply-text=RESOURCE_LOCKED - cannot obtain exclusive access to locked queue 'queue_name' in vhost '/', class-id=50, method-id=20)。 应用于一个队列只能有一个消费者来消费的场景。
  48. NoWait bool // 是否非阻塞 建议false, true表示阻塞,创建交换器的请求发送后,阻塞等待RMQ Server返回信息。false非阻塞:不会阻塞等待RMQ Server的返回信息,而RMQ Server也不会返回信息。(不推荐使用)
  49. Args amqp.Table // 其他参数, 延迟就是通过该参数设置
  50. PrefetchCount int // 会告诉RabbitMQ不要同时给一个消费者推送多于N个消息,即一旦有N个消息还没有ack,则该Consumer将block掉,直到有消息ack
  51. PrefetchSize int // prefetchSize:最多传输的内容的大小的限制,0 为不限制,但据说prefetchSize参数,rabbitmq没有实现
  52. Global bool // 是否将上面设置应用于channel,简单点说,就是上面限制是channel级别的还是Consumer级别)
  53. AutoAck bool // 自动确认
  54. }
  55. func (broker Broker) DSN() string {
  56. protocol := "amqp"
  57. if broker.Ssl {
  58. protocol = "amqps"
  59. }
  60. builder := strings.Builder{}
  61. builder.WriteString(protocol)
  62. builder.WriteString("://")
  63. if broker.Username != `` {
  64. builder.WriteString(broker.Username)
  65. builder.WriteString(":")
  66. builder.WriteString(broker.Password)
  67. builder.WriteString("@")
  68. }
  69. builder.WriteString(broker.Server)
  70. if broker.Port != `` {
  71. builder.WriteString(":")
  72. builder.WriteString(broker.Port)
  73. }
  74. builder.WriteString("/")
  75. if broker.Vhost != `` {
  76. builder.WriteString(broker.Vhost)
  77. }
  78. return builder.String()
  79. }
  80. func New(ctx context.Context, broker Broker) (*Conn, error) {
  81. c := &Conn{
  82. ctx: ctx,
  83. broker: broker,
  84. }
  85. //尝试连接
  86. if err := c.connect(); err != nil {
  87. return nil, err
  88. }
  89. return c, nil
  90. }
  91. // Producer 生成一个生产者
  92. func (c *Conn) Producer(exchange *Exchange, queue *Queue) (*Producer, error) {
  93. p = &Producer{
  94. ctx: c.ctx,
  95. msgIDPrefix: strconv.FormatInt(time.Now().UnixNano()/int64(time.Millisecond), 10),
  96. exchange: exchange,
  97. conn: c,
  98. logger: c.logger,
  99. queue: queue,
  100. }
  101. ch, err := c.channel(exchange, queue)
  102. if err != nil {
  103. return nil, err
  104. }
  105. p.ch = ch
  106. return p, nil
  107. }
  108. func (c *Conn) MsgId() uint64 {
  109. return atomic.AddUint64(&c.msgID, 1)
  110. }
  111. // 初始化channel
  112. func (c *Conn) channel(exchange *Exchange, queue *Queue) (*amqp.Channel, error) {
  113. ch, err := c.conn.Channel()
  114. if err != nil {
  115. return nil, err
  116. }
  117. if exchange != nil {
  118. err := ch.ExchangeDeclarePassive(exchange.Name, exchange.Kind, exchange.Durable, exchange.AutoDelete, exchange.Internal, exchange.NoWait, nil)
  119. if err != nil {
  120. return nil, err
  121. }
  122. }
  123. if queue != nil {
  124. _, err = ch.QueueDeclarePassive(
  125. queue.Name,
  126. queue.Durable, // durable
  127. queue.AutoDelete, // auto_delete
  128. queue.Exclusive, // exclusive
  129. queue.NoWait, // no_wait
  130. nil,
  131. )
  132. }
  133. if err != nil {
  134. return nil, err
  135. }
  136. return ch, nil
  137. }
  138. func (c *Conn) connect() error {
  139. c.l.Lock()
  140. if c.conn != nil && !c.conn.IsClosed() {
  141. c.l.Unlock()
  142. return nil
  143. }
  144. var err error
  145. if c.broker.TSL != nil {
  146. c.conn, err = amqp.DialTLS(c.broker.DSN(), c.broker.TSL)
  147. } else if c.broker.ProxyAddr != `` {
  148. c.conn, err = amqp.DialConfig(c.broker.DSN(), amqp.Config{
  149. Dial: func(network, addr string) (net.Conn, error) {
  150. return net.Dial("tcp", c.broker.ProxyAddr)
  151. },
  152. })
  153. } else {
  154. c.conn, err = amqp.Dial(c.broker.DSN())
  155. }
  156. if err == nil {
  157. c.conn.Config.Heartbeat = c.broker.Beatime
  158. c.conn.Config.Locale = "en_US"
  159. }
  160. c.l.Unlock()
  161. return err
  162. }