diff --git a/listener-service/event/consumer.go b/listener-service/event/consumer.go new file mode 100644 index 0000000..951562a --- /dev/null +++ b/listener-service/event/consumer.go @@ -0,0 +1,99 @@ +package event + +import ( + "encoding/json" + "fmt" + "log" + + amqp "github.com/rabbitmq/amqp091-go" +) + +type Consumer struct { + conn *amqp.Connection + queueName string +} + +func NewConsumer(conn *amqp.Connection) (Consumer, error) { + consumer := Consumer{ + conn: conn, + } + + err := consumer.setup() + if err != nil { + return Consumer{}, err + } + return consumer, err +} + +func (consumer *Consumer) setup() error { + channel, err := consumer.conn.Channel() + if err != nil { + return err + } + + return declaireExchange(channel) +} + +type Payload struct { + Name string `json:"name"` + Data string `json:"data"` +} + +func (consumer *Consumer) Listen(topics []string) error { + ch, err := consumer.conn.Channel() + if err != nil { + return err + } + defer ch.Close() + + q, err := declaireRandomQueue(ch) + if err != nil { + return err + } + + for _, s := range topics { + ch.QueueBind(q.Name, s, "logs_topic", false, nil) + } + + messages, err := ch.Consume(q.Name, "", true, false, false, false, nil) + if err != nil { + return err + } + + forever := make(chan bool) + go func() { + for d := range messages { + var payload Payload + json.Unmarshal(d.Body, &payload) + + go handlePayload(payload) + } + }() + + fmt.Printf("Waiting for message [Exchange, Queue] [logs_topic, %s]\n", q.Name) + <-forever + return nil +} + +func handlePayload(payload Payload) { + switch payload.Name { + case "log", "event": + // log whatever we get + err := logEvent(payload) + if err != nil { + log.Println(err) + } + case "auth": + // authenticate + // You can have as many cases as you want, as long as you write the logic. + default: + err := logEvent(payload) + if err != nil { + log.Println(err) + } + } +} + +func logEvent(entry Payload) error { + return nil +} diff --git a/listener-service/event/event.go b/listener-service/event/event.go new file mode 100644 index 0000000..56b67fd --- /dev/null +++ b/listener-service/event/event.go @@ -0,0 +1,28 @@ +package event + +import ( + amqp "github.com/rabbitmq/amqp091-go" +) + +func declaireExchange(ch *amqp.Channel) error { + return ch.ExchangeDeclare( + "logs_topic", // name + "topic", // type + true, // durable? + false, // auto-deleted? + false, // internal? + false, // no-wait? + nil, // arguments? + ) +} + +func declaireRandomQueue(ch *amqp.Channel) (amqp.Queue, error) { + return ch.QueueDeclare( + "", // name + false, // durable + false, // delete when unused + true, // exclusive + false, // nowait + nil, // table + ) +} diff --git a/listener-service/go.mod b/listener-service/go.mod new file mode 100644 index 0000000..a78052f --- /dev/null +++ b/listener-service/go.mod @@ -0,0 +1,5 @@ +module listener + +go 1.22.5 + +require github.com/rabbitmq/amqp091-go v1.10.0 diff --git a/listener-service/go.sum b/listener-service/go.sum new file mode 100644 index 0000000..024eebe --- /dev/null +++ b/listener-service/go.sum @@ -0,0 +1,4 @@ +github.com/rabbitmq/amqp091-go v1.10.0 h1:STpn5XsHlHGcecLmMFCtg7mqq0RnD+zFr4uzukfVhBw= +github.com/rabbitmq/amqp091-go v1.10.0/go.mod h1:Hy4jKW5kQART1u+JkDTF9YYOQUHXqMuhrgxOEeS7G4o= +go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto= +go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE= diff --git a/listener-service/main.go b/listener-service/main.go new file mode 100644 index 0000000..c71b8dd --- /dev/null +++ b/listener-service/main.go @@ -0,0 +1,56 @@ +package main + +import ( + "fmt" + "log" + "math" + "os" + "time" + + amqp "github.com/rabbitmq/amqp091-go" +) + +func main() { + // try to connect to rabbitmq + rabbitConn, err := connect() + if err != nil { + log.Println(err) + os.Exit(1) + } + defer rabbitConn.Close() + log.Println("Connected to RabbitMQ") + + // start listening for messages + + // create consumer + + // watch the queue and consume events +} + +func connect() (*amqp.Connection, error) { + var counts int64 + var connection *amqp.Connection + + // don't continue until rabbit is ready + for { + c, err := amqp.Dial("amqp://guest:guest@localhost") // XXX: credentials + if err != nil { + fmt.Println("RabbitMQ not yet ready...") + counts++ + } else { + connection = c + break + } + + if counts > 5 { + fmt.Println(err) + return nil, err + } + + backOff := time.Duration(math.Pow(float64(counts), 2) * float64(time.Second)) + log.Println("backing off...") + time.Sleep(backOff) + } + + return connection, nil +}