From 41dab037ce43dc811c1d188914629ac514ad6f5a Mon Sep 17 00:00:00 2001 From: vinchent Date: Fri, 6 Sep 2024 20:16:04 +0200 Subject: [PATCH] Add a publisher in broker service and make everything work --- broker-service/cmd/api/event/emitter.go | 53 +++++++++++++++++++++++++ broker-service/cmd/api/event/event.go | 28 +++++++++++++ broker-service/cmd/api/handlers.go | 38 +++++++++++++++++- broker-service/cmd/api/main.go | 51 ++++++++++++++++++++++-- broker-service/go.mod | 2 + broker-service/go.sum | 4 ++ listener-service/event/consumer.go | 1 + 7 files changed, 173 insertions(+), 4 deletions(-) create mode 100644 broker-service/cmd/api/event/emitter.go create mode 100644 broker-service/cmd/api/event/event.go diff --git a/broker-service/cmd/api/event/emitter.go b/broker-service/cmd/api/event/emitter.go new file mode 100644 index 0000000..3329ec8 --- /dev/null +++ b/broker-service/cmd/api/event/emitter.go @@ -0,0 +1,53 @@ +package event + +import ( + "log" + + amqp "github.com/rabbitmq/amqp091-go" +) + +type Emitter struct { + conn *amqp.Connection +} + +func NewEmitter(conn *amqp.Connection) (Emitter, error) { + emitter := Emitter{ + conn: conn, + } + + err := emitter.setup() + if err != nil { + return Emitter{}, err + } + return emitter, err +} + +func (emitter *Emitter) setup() error { + channel, err := emitter.conn.Channel() + if err != nil { + return err + } + defer channel.Close() + + return declaireExchange(channel) +} + +func (emitter *Emitter) Push(event string, severity string) error { + ch, err := emitter.conn.Channel() + if err != nil { + return err + } + defer ch.Close() + + log.Println("Pushing to channel") + + err = ch.Publish("logs_topic", severity, false, false, amqp.Publishing{ + ContentType: "text/plain", + Body: []byte(event), + }) + if err != nil { + return err + } + + return nil +} diff --git a/broker-service/cmd/api/event/event.go b/broker-service/cmd/api/event/event.go new file mode 100644 index 0000000..56b67fd --- /dev/null +++ b/broker-service/cmd/api/event/event.go @@ -0,0 +1,28 @@ +package event + +import ( + amqp "github.com/rabbitmq/amqp091-go" +) + +func declaireExchange(ch *amqp.Channel) error { + return ch.ExchangeDeclare( + "logs_topic", // name + "topic", // type + true, // durable? + false, // auto-deleted? + false, // internal? + false, // no-wait? + nil, // arguments? + ) +} + +func declaireRandomQueue(ch *amqp.Channel) (amqp.Queue, error) { + return ch.QueueDeclare( + "", // name + false, // durable + false, // delete when unused + true, // exclusive + false, // nowait + nil, // table + ) +} diff --git a/broker-service/cmd/api/handlers.go b/broker-service/cmd/api/handlers.go index e012054..95eeda8 100644 --- a/broker-service/cmd/api/handlers.go +++ b/broker-service/cmd/api/handlers.go @@ -1,6 +1,7 @@ package main import ( + "broker/cmd/api/event" "bytes" "encoding/json" "errors" @@ -54,7 +55,8 @@ func (app *Config) HandleSubmission(w http.ResponseWriter, r *http.Request) { case "auth": app.authenticate(w, requestPayload.Auth) case "log": - app.LogItem(w, requestPayload.Log) + // app.LogItem(w, requestPayload.Log) + app.logEventViaRabbit(w, requestPayload.Log) case "mail": app.SendMail(w, requestPayload.Mail) default: @@ -176,3 +178,37 @@ func (app *Config) callService(w http.ResponseWriter, ms Microservice) { app.writeJSON(w, http.StatusOK, payload) } + +func (app *Config) logEventViaRabbit(w http.ResponseWriter, l LogPayload) { + err := app.pushToQueue(l.Name, l.Data) + if err != nil { + app.errorJSON(w, err) + return + } + + var payload jsonResponse + payload.Error = false + payload.Message = "logged via RabbitMQ" + + app.writeJSON(w, http.StatusOK, payload) +} + +func (app *Config) pushToQueue(name, msg string) error { + emitter, err := event.NewEmitter(app.Rabbit) + if err != nil { + return err + } + + payload := LogPayload{ + Name: name, + Data: msg, + } + + j, _ := json.MarshalIndent(&payload, "", "\t") + err = emitter.Push(string(j), "log.INFO") + if err != nil { + return err + } + + return nil +} diff --git a/broker-service/cmd/api/main.go b/broker-service/cmd/api/main.go index 9a8c3fe..116554a 100644 --- a/broker-service/cmd/api/main.go +++ b/broker-service/cmd/api/main.go @@ -3,15 +3,32 @@ package main import ( "fmt" "log" + "math" "net/http" + "os" + "time" + + amqp "github.com/rabbitmq/amqp091-go" ) const webPort = "80" -type Config struct{} +type Config struct { + Rabbit *amqp.Connection +} func main() { - app := Config{} + // try to connect to rabbitmq + rabbitConn, err := connect() + if err != nil { + log.Println(err) + os.Exit(1) + } + defer rabbitConn.Close() + log.Println("Connected to RabbitMQ") + app := Config{ + Rabbit: rabbitConn, + } log.Printf("Starting broker service on port %s\n", webPort) @@ -21,8 +38,36 @@ func main() { Handler: app.routes(), } - err := srv.ListenAndServe() + err = srv.ListenAndServe() if err != nil { log.Panic(err) } } + +func connect() (*amqp.Connection, error) { + var counts int64 + var connection *amqp.Connection + + // don't continue until rabbit is ready + for { + c, err := amqp.Dial("amqp://guest:guest@rabbitmq") // XXX: credentials + if err != nil { + fmt.Println("RabbitMQ not yet ready...") + counts++ + } else { + connection = c + break + } + + if counts > 5 { + fmt.Println(err) + return nil, err + } + + backOff := time.Duration(math.Pow(float64(counts), 2) * float64(time.Second)) + log.Println("backing off...") + time.Sleep(backOff) + } + + return connection, nil +} diff --git a/broker-service/go.mod b/broker-service/go.mod index 6af2f9b..ee5feca 100644 --- a/broker-service/go.mod +++ b/broker-service/go.mod @@ -6,3 +6,5 @@ require ( github.com/go-chi/chi/v5 v5.1.0 github.com/go-chi/cors v1.2.1 ) + +require github.com/rabbitmq/amqp091-go v1.10.0 diff --git a/broker-service/go.sum b/broker-service/go.sum index 13b5d44..bc4a9a5 100644 --- a/broker-service/go.sum +++ b/broker-service/go.sum @@ -2,3 +2,7 @@ github.com/go-chi/chi/v5 v5.1.0 h1:acVI1TYaD+hhedDJ3r54HyA6sExp3HfXq7QWEEY/xMw= github.com/go-chi/chi/v5 v5.1.0/go.mod h1:DslCQbL2OYiznFReuXYUmQ2hGd1aDpCnlMNITLSKoi8= github.com/go-chi/cors v1.2.1 h1:xEC8UT3Rlp2QuWNEr4Fs/c2EAGVKBwy/1vHx3bppil4= github.com/go-chi/cors v1.2.1/go.mod h1:sSbTewc+6wYHBBCW7ytsFSn836hqM7JxpglAy2Vzc58= +github.com/rabbitmq/amqp091-go v1.10.0 h1:STpn5XsHlHGcecLmMFCtg7mqq0RnD+zFr4uzukfVhBw= +github.com/rabbitmq/amqp091-go v1.10.0/go.mod h1:Hy4jKW5kQART1u+JkDTF9YYOQUHXqMuhrgxOEeS7G4o= +go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto= +go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE= diff --git a/listener-service/event/consumer.go b/listener-service/event/consumer.go index 99660b9..88abbab 100644 --- a/listener-service/event/consumer.go +++ b/listener-service/event/consumer.go @@ -33,6 +33,7 @@ func (consumer *Consumer) setup() error { if err != nil { return err } + defer channel.Close() return declaireExchange(channel) }