package rabbitmq

import (
	"context"
	"errors"
	"fmt"
	"os"
	"time"

	"github.com/streadway/amqp"
)

// p is the package-level producer used by the Publish and PublishExchange
// helper functions. It is not assigned anywhere in this part of the file.
// NOTE(review): presumably Conn.Producer stores the producer it creates
// here — confirm against its definition.
var p *Producer

// Producer publishes messages over a single AMQP channel, either directly
// to its queue (Publish) or to its exchange (PublishExchange).
type Producer struct {
	ctx         context.Context
	msgIDPrefix string        // prefix of every generated MessageId
	exchange    *Exchange     // target of PublishExchange; may be nil
	ch          *amqp.Channel // channel all messages are published on
	conn        *Conn         // owning connection; also supplies message ids
	logger      Logger
	queue       *Queue // target of Publish; may be nil
}

// Publish opens a new broker connection configured from the RABBITMQ_*
// environment variables, sets up a producer for the queue named by
// RABBITMQ_PDF_BUILDER_QUEUE (when that variable is non-empty), publishes
// data as a persistent text/plain message through the package-level
// producer, and closes the connection before returning.
//
// It returns an error when the connection or the producer cannot be set up,
// when no package-level producer is available ("nil producer"), or when the
// publish itself fails.
func Publish(data string) error {
	conn, err := New(context.Background(), Broker{
		Ssl:       false,
		Username:  os.Getenv("RABBITMQ_USER"),
		Password:  os.Getenv("RABBITMQ_PASSWORD"),
		Server:    os.Getenv("RABBITMQ_HOST"),
		Port:      os.Getenv("RABBITMQ_PORT"),
		Vhost:     os.Getenv("RABBITMQ_VHOST"),
		TSL:       nil,
		ProxyAddr: "",
		Beatime:   15 * time.Second,
	})
	if err != nil {
		return fmt.Errorf("rabbitmq: connecting to broker: %w", err)
	}
	// Close the connection on every exit path from here on, so that a failed
	// setup or a missing producer does not leak it. The Close result is
	// deliberately ignored: the publish outcome is what the caller needs.
	defer conn.conn.Close()

	// No exchange is configured by this helper; only a queue, if named.
	var exchange *Exchange
	var queue *Queue
	if name := os.Getenv("RABBITMQ_PDF_BUILDER_QUEUE"); name != "" {
		queue = &Queue{
			Name:       name,
			Durable:    true,
			AutoDelete: true,
		}
	}

	if exchange != nil || queue != nil {
		if _, err = conn.Producer(exchange, queue); err != nil {
			return fmt.Errorf("rabbitmq: creating producer: %w", err)
		}
	}

	if p == nil {
		return errors.New("nil producer")
	}

	return p.Publish(amqp.Publishing{
		ContentType:  "text/plain",
		DeliveryMode: amqp.Persistent,
		Body:         []byte(data),
	})
}

// PublishExchange publishes data as a persistent text/plain message to the
// exchange of the package-level producer. It returns an error if no
// package-level producer is available or if the publish fails.
func PublishExchange(data string) error {
	if p == nil {
		return errors.New("nil producer")
	}

	return p.PublishExchange(amqp.Publishing{
		ContentType:  "text/plain",
		DeliveryMode: amqp.Persistent,
		Body:         []byte(data),
	})
}

// Conn returns the connection this producer belongs to.
func (p *Producer) Conn() *Conn {
	return p.conn
}

// Publish sends msg through the default ("") exchange using the producer's
// queue name as the routing key. msg.MessageId is overwritten with
// "<msgIDPrefix>:<id>", where id comes from the connection's MsgId.
//
// It returns an error if the producer has no queue, or the error reported by
// the channel's Publish call.
func (p *Producer) Publish(msg amqp.Publishing) error {
	// A producer may have been created with an exchange only; fail cleanly
	// instead of dereferencing a nil queue.
	if p.queue == nil {
		return errors.New("producer has no queue")
	}

	msg.MessageId = fmt.Sprintf("%s:%d", p.msgIDPrefix, p.conn.MsgId())

	return p.ch.Publish(
		"",           // exchange (default)
		p.queue.Name, // routing key
		false,        // mandatory
		false,        // immediate
		msg,
	)
}

// PublishExchange sends msg to the producer's exchange using the exchange's
// configured routing key. msg.MessageId is overwritten with
// "<msgIDPrefix>:<id>", where id comes from the connection's MsgId.
//
// It returns an error if the producer has no exchange, or the error reported
// by the channel's Publish call.
func (p *Producer) PublishExchange(msg amqp.Publishing) error {
	// A producer may have been created with a queue only (as the
	// package-level Publish helper does); fail cleanly instead of
	// dereferencing a nil exchange.
	if p.exchange == nil {
		return errors.New("producer has no exchange")
	}

	msg.MessageId = fmt.Sprintf("%s:%d", p.msgIDPrefix, p.conn.MsgId())

	return p.ch.Publish(
		p.exchange.Name, // exchange
		p.exchange.Key,  // routing key
		false,           // mandatory
		false,           // immediate
		msg,
	)
}