Listening to rabbitmq events
This commit is contained in:
		
							
								
								
									
										99
									
								
								listener-service/event/consumer.go
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										99
									
								
								listener-service/event/consumer.go
									
									
									
									
									
										Normal file
									
								
							@ -0,0 +1,99 @@
 | 
				
			|||||||
 | 
					package event
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					import (
 | 
				
			||||||
 | 
						"encoding/json"
 | 
				
			||||||
 | 
						"fmt"
 | 
				
			||||||
 | 
						"log"
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						amqp "github.com/rabbitmq/amqp091-go"
 | 
				
			||||||
 | 
					)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					type Consumer struct {
 | 
				
			||||||
 | 
						conn      *amqp.Connection
 | 
				
			||||||
 | 
						queueName string
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					func NewConsumer(conn *amqp.Connection) (Consumer, error) {
 | 
				
			||||||
 | 
						consumer := Consumer{
 | 
				
			||||||
 | 
							conn: conn,
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						err := consumer.setup()
 | 
				
			||||||
 | 
						if err != nil {
 | 
				
			||||||
 | 
							return Consumer{}, err
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						return consumer, err
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					func (consumer *Consumer) setup() error {
 | 
				
			||||||
 | 
						channel, err := consumer.conn.Channel()
 | 
				
			||||||
 | 
						if err != nil {
 | 
				
			||||||
 | 
							return err
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						return declaireExchange(channel)
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					type Payload struct {
 | 
				
			||||||
 | 
						Name string `json:"name"`
 | 
				
			||||||
 | 
						Data string `json:"data"`
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					func (consumer *Consumer) Listen(topics []string) error {
 | 
				
			||||||
 | 
						ch, err := consumer.conn.Channel()
 | 
				
			||||||
 | 
						if err != nil {
 | 
				
			||||||
 | 
							return err
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						defer ch.Close()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						q, err := declaireRandomQueue(ch)
 | 
				
			||||||
 | 
						if err != nil {
 | 
				
			||||||
 | 
							return err
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						for _, s := range topics {
 | 
				
			||||||
 | 
							ch.QueueBind(q.Name, s, "logs_topic", false, nil)
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						messages, err := ch.Consume(q.Name, "", true, false, false, false, nil)
 | 
				
			||||||
 | 
						if err != nil {
 | 
				
			||||||
 | 
							return err
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						forever := make(chan bool)
 | 
				
			||||||
 | 
						go func() {
 | 
				
			||||||
 | 
							for d := range messages {
 | 
				
			||||||
 | 
								var payload Payload
 | 
				
			||||||
 | 
								json.Unmarshal(d.Body, &payload)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
								go handlePayload(payload)
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
 | 
						}()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						fmt.Printf("Waiting for message [Exchange, Queue] [logs_topic, %s]\n", q.Name)
 | 
				
			||||||
 | 
						<-forever
 | 
				
			||||||
 | 
						return nil
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					func handlePayload(payload Payload) {
 | 
				
			||||||
 | 
						switch payload.Name {
 | 
				
			||||||
 | 
						case "log", "event":
 | 
				
			||||||
 | 
							// log whatever we get
 | 
				
			||||||
 | 
							err := logEvent(payload)
 | 
				
			||||||
 | 
							if err != nil {
 | 
				
			||||||
 | 
								log.Println(err)
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
 | 
						case "auth":
 | 
				
			||||||
 | 
							// authenticate
 | 
				
			||||||
 | 
							// You can have as many cases as you want, as long as you write the logic.
 | 
				
			||||||
 | 
						default:
 | 
				
			||||||
 | 
							err := logEvent(payload)
 | 
				
			||||||
 | 
							if err != nil {
 | 
				
			||||||
 | 
								log.Println(err)
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					func logEvent(entry Payload) error {
 | 
				
			||||||
 | 
						return nil
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
							
								
								
									
										28
									
								
								listener-service/event/event.go
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										28
									
								
								listener-service/event/event.go
									
									
									
									
									
										Normal file
									
								
							@ -0,0 +1,28 @@
 | 
				
			|||||||
 | 
					package event
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					import (
 | 
				
			||||||
 | 
						amqp "github.com/rabbitmq/amqp091-go"
 | 
				
			||||||
 | 
					)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					func declaireExchange(ch *amqp.Channel) error {
 | 
				
			||||||
 | 
						return ch.ExchangeDeclare(
 | 
				
			||||||
 | 
							"logs_topic", // name
 | 
				
			||||||
 | 
							"topic",      // type
 | 
				
			||||||
 | 
							true,         // durable?
 | 
				
			||||||
 | 
							false,        // auto-deleted?
 | 
				
			||||||
 | 
							false,        // internal?
 | 
				
			||||||
 | 
							false,        // no-wait?
 | 
				
			||||||
 | 
							nil,          // arguments?
 | 
				
			||||||
 | 
						)
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					func declaireRandomQueue(ch *amqp.Channel) (amqp.Queue, error) {
 | 
				
			||||||
 | 
						return ch.QueueDeclare(
 | 
				
			||||||
 | 
							"",    // name
 | 
				
			||||||
 | 
							false, // durable
 | 
				
			||||||
 | 
							false, // delete when unused
 | 
				
			||||||
 | 
							true,  // exclusive
 | 
				
			||||||
 | 
							false, // nowait
 | 
				
			||||||
 | 
							nil,   // table
 | 
				
			||||||
 | 
						)
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
							
								
								
									
										5
									
								
								listener-service/go.mod
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										5
									
								
								listener-service/go.mod
									
									
									
									
									
										Normal file
									
								
							@ -0,0 +1,5 @@
 | 
				
			|||||||
 | 
					module listener
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					go 1.22.5
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					require github.com/rabbitmq/amqp091-go v1.10.0
 | 
				
			||||||
							
								
								
									
										4
									
								
								listener-service/go.sum
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										4
									
								
								listener-service/go.sum
									
									
									
									
									
										Normal file
									
								
							@ -0,0 +1,4 @@
 | 
				
			|||||||
 | 
					github.com/rabbitmq/amqp091-go v1.10.0 h1:STpn5XsHlHGcecLmMFCtg7mqq0RnD+zFr4uzukfVhBw=
 | 
				
			||||||
 | 
					github.com/rabbitmq/amqp091-go v1.10.0/go.mod h1:Hy4jKW5kQART1u+JkDTF9YYOQUHXqMuhrgxOEeS7G4o=
 | 
				
			||||||
 | 
					go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto=
 | 
				
			||||||
 | 
					go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE=
 | 
				
			||||||
							
								
								
									
										56
									
								
								listener-service/main.go
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										56
									
								
								listener-service/main.go
									
									
									
									
									
										Normal file
									
								
							@ -0,0 +1,56 @@
 | 
				
			|||||||
 | 
					package main
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					import (
 | 
				
			||||||
 | 
						"fmt"
 | 
				
			||||||
 | 
						"log"
 | 
				
			||||||
 | 
						"math"
 | 
				
			||||||
 | 
						"os"
 | 
				
			||||||
 | 
						"time"
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						amqp "github.com/rabbitmq/amqp091-go"
 | 
				
			||||||
 | 
					)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					func main() {
 | 
				
			||||||
 | 
						// try to connect to rabbitmq
 | 
				
			||||||
 | 
						rabbitConn, err := connect()
 | 
				
			||||||
 | 
						if err != nil {
 | 
				
			||||||
 | 
							log.Println(err)
 | 
				
			||||||
 | 
							os.Exit(1)
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						defer rabbitConn.Close()
 | 
				
			||||||
 | 
						log.Println("Connected to RabbitMQ")
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						// start listening for messages
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						// create consumer
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						// watch the queue and consume events
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					func connect() (*amqp.Connection, error) {
 | 
				
			||||||
 | 
						var counts int64
 | 
				
			||||||
 | 
						var connection *amqp.Connection
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						// don't continue until rabbit is ready
 | 
				
			||||||
 | 
						for {
 | 
				
			||||||
 | 
							c, err := amqp.Dial("amqp://guest:guest@localhost") // XXX: credentials
 | 
				
			||||||
 | 
							if err != nil {
 | 
				
			||||||
 | 
								fmt.Println("RabbitMQ not yet ready...")
 | 
				
			||||||
 | 
								counts++
 | 
				
			||||||
 | 
							} else {
 | 
				
			||||||
 | 
								connection = c
 | 
				
			||||||
 | 
								break
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
							if counts > 5 {
 | 
				
			||||||
 | 
								fmt.Println(err)
 | 
				
			||||||
 | 
								return nil, err
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
							backOff := time.Duration(math.Pow(float64(counts), 2) * float64(time.Second))
 | 
				
			||||||
 | 
							log.Println("backing off...")
 | 
				
			||||||
 | 
							time.Sleep(backOff)
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						return connection, nil
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
		Reference in New Issue
	
	Block a user