index.go 1.6 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273
  1. package rabbitmq
  2. import (
  3. "context"
  4. "fmt"
  5. "os"
  6. "time"
  7. jsoniter "github.com/json-iterator/go"
  8. "github.com/streadway/amqp"
  9. )
  10. type WebhookMessage struct {
  11. Body any `json:"body"`
  12. Config struct {
  13. Url string `json:"url"`
  14. } `json:"config"`
  15. }
  16. var json = jsoniter.ConfigCompatibleWithStandardLibrary
  17. func Webhook(webhookMessage WebhookMessage) error {
  18. conn, err := Init()
  19. if err != nil {
  20. return err
  21. }
  22. p, err := conn.Producer(&Exchange{
  23. Name: os.Getenv("RABBITMQ_WEBHOOK_EXCHANGE"),
  24. Kind: amqp.ExchangeDirect,
  25. Key: os.Getenv("RABBITMQ_WEBHOOK_ROUTING"),
  26. }, nil)
  27. if err != nil {
  28. return err
  29. }
  30. message, err := json.MarshalToString(webhookMessage)
  31. if err != nil {
  32. return err
  33. }
  34. fmt.Println("message", message)
  35. err = p.PublishExchange(amqp.Publishing{
  36. ContentType: "text/plain",
  37. DeliveryMode: amqp.Persistent,
  38. Body: []byte(message),
  39. })
  40. if err != nil {
  41. return err
  42. }
  43. // 关闭连接
  44. defer conn.conn.Close()
  45. return nil
  46. }
  47. func Init() (*Conn, error) {
  48. var broker2 = Broker{
  49. Ssl: false, // bool
  50. Username: os.Getenv("RABBITMQ_USER"), // string
  51. Password: os.Getenv("RABBITMQ_PASSWORD"), // string
  52. Server: os.Getenv("RABBITMQ_HOST"), // string
  53. Port: os.Getenv("RABBITMQ_PORT"), // string
  54. Vhost: os.Getenv("RABBITMQ_VHOST"), // string
  55. TSL: nil, // *tls.Config
  56. ProxyAddr: "", // string
  57. Beatime: 15 * time.Second, // time.Duration
  58. }
  59. conn, err := New(context.Background(), broker2)
  60. if err != nil {
  61. return nil, err
  62. }
  63. return conn, nil
  64. }