producer.go 2.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103
  1. package rabbitmq
  2. import (
  3. "context"
  4. "errors"
  5. "fmt"
  6. "os"
  7. "time"
  8. "github.com/streadway/amqp"
  9. )
  10. var p *Producer
  11. type Producer struct {
  12. ctx context.Context
  13. msgIDPrefix string
  14. exchange *Exchange
  15. ch *amqp.Channel
  16. conn *Conn
  17. logger Logger
  18. queue *Queue
  19. }
  20. func Publish(data string) error {
  21. conn, err := New(context.Background(), Broker{
  22. Ssl: false, // bool
  23. Username: os.Getenv("RABBITMQ_USER"), // string
  24. Password: os.Getenv("RABBITMQ_PASSWORD"), // string
  25. Server: os.Getenv("RABBITMQ_HOST"), // string
  26. Port: os.Getenv("RABBITMQ_PORT"), // string
  27. Vhost: os.Getenv("RABBITMQ_VHOST"), // string
  28. TSL: nil, // *tls.Config
  29. ProxyAddr: "", // string
  30. Beatime: 15 * time.Second, // time.Duration
  31. })
  32. if err != nil {
  33. panic(err)
  34. }
  35. var exchange *Exchange
  36. var queue *Queue
  37. if os.Getenv("RABBITMQ_PDF_BUILDER_QUEUE") != "" {
  38. queue = &Queue{
  39. Name: os.Getenv("RABBITMQ_PDF_BUILDER_QUEUE"),
  40. Durable: true,
  41. AutoDelete: true,
  42. }
  43. }
  44. if exchange != nil || queue != nil {
  45. _, err = conn.Producer(exchange, queue)
  46. if err != nil {
  47. panic(err)
  48. }
  49. }
  50. if p == nil {
  51. return errors.New("nil producer")
  52. }
  53. // 关闭连接
  54. defer conn.conn.Close()
  55. return p.Publish(amqp.Publishing{
  56. ContentType: "text/plain",
  57. DeliveryMode: amqp.Persistent,
  58. Body: []byte(data),
  59. })
  60. }
  61. func PublishExchange(data string) error {
  62. if p == nil {
  63. return errors.New("nil producer")
  64. }
  65. return p.PublishExchange(amqp.Publishing{
  66. ContentType: "text/plain",
  67. DeliveryMode: amqp.Persistent,
  68. Body: []byte(data),
  69. })
  70. }
  71. func (p *Producer) Conn() *Conn {
  72. return p.conn
  73. }
  74. func (p *Producer) Publish(msg amqp.Publishing) error {
  75. msg.MessageId = fmt.Sprintf("%s:%d", p.msgIDPrefix, p.conn.MsgId())
  76. return p.ch.Publish(
  77. "",
  78. p.queue.Name, // routing key
  79. false, // mandatory
  80. false, // immediate
  81. msg,
  82. )
  83. }
  84. func (p *Producer) PublishExchange(msg amqp.Publishing) error {
  85. msg.MessageId = fmt.Sprintf("%s:%d", p.msgIDPrefix, p.conn.MsgId())
  86. return p.ch.Publish(
  87. p.exchange.Name, // exchange
  88. p.exchange.Key, // routing key
  89. false, // mandatory
  90. false, // immediate
  91. msg,
  92. )
  93. }