123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103 |
- package rabbitmq
- import (
- "context"
- "errors"
- "fmt"
- "os"
- "time"
- "github.com/streadway/amqp"
- )
- var p *Producer
- type Producer struct {
- ctx context.Context
- msgIDPrefix string
- exchange *Exchange
- ch *amqp.Channel
- conn *Conn
- logger Logger
- queue *Queue
- }
- func Publish(data string) error {
- conn, err := New(context.Background(), Broker{
- Ssl: false, // bool
- Username: os.Getenv("RABBITMQ_USER"), // string
- Password: os.Getenv("RABBITMQ_PASSWORD"), // string
- Server: os.Getenv("RABBITMQ_HOST"), // string
- Port: os.Getenv("RABBITMQ_PORT"), // string
- Vhost: os.Getenv("RABBITMQ_VHOST"), // string
- TSL: nil, // *tls.Config
- ProxyAddr: "", // string
- Beatime: 15 * time.Second, // time.Duration
- })
- if err != nil {
- panic(err)
- }
- var exchange *Exchange
- var queue *Queue
- if os.Getenv("RABBITMQ_PDF_BUILDER_QUEUE") != "" {
- queue = &Queue{
- Name: os.Getenv("RABBITMQ_PDF_BUILDER_QUEUE"),
- Durable: true,
- AutoDelete: true,
- }
- }
- if exchange != nil || queue != nil {
- _, err = conn.Producer(exchange, queue)
- if err != nil {
- panic(err)
- }
- }
- if p == nil {
- return errors.New("nil producer")
- }
- // 关闭连接
- defer conn.conn.Close()
- return p.Publish(amqp.Publishing{
- ContentType: "text/plain",
- DeliveryMode: amqp.Persistent,
- Body: []byte(data),
- })
- }
- func PublishExchange(data string) error {
- if p == nil {
- return errors.New("nil producer")
- }
- return p.PublishExchange(amqp.Publishing{
- ContentType: "text/plain",
- DeliveryMode: amqp.Persistent,
- Body: []byte(data),
- })
- }
- func (p *Producer) Conn() *Conn {
- return p.conn
- }
- func (p *Producer) Publish(msg amqp.Publishing) error {
- msg.MessageId = fmt.Sprintf("%s:%d", p.msgIDPrefix, p.conn.MsgId())
- return p.ch.Publish(
- "",
- p.queue.Name, // routing key
- false, // mandatory
- false, // immediate
- msg,
- )
- }
- func (p *Producer) PublishExchange(msg amqp.Publishing) error {
- msg.MessageId = fmt.Sprintf("%s:%d", p.msgIDPrefix, p.conn.MsgId())
- return p.ch.Publish(
- p.exchange.Name, // exchange
- p.exchange.Key, // routing key
- false, // mandatory
- false, // immediate
- msg,
- )
- }
|