Add a publisher in broker service and make everything work

This commit is contained in:
vinchent 2024-09-06 20:16:04 +02:00
parent e05aa83ff5
commit 41dab037ce
7 changed files with 173 additions and 4 deletions

View File

@ -0,0 +1,53 @@
package event
import (
"log"
amqp "github.com/rabbitmq/amqp091-go"
)
type Emitter struct {
conn *amqp.Connection
}
func NewEmitter(conn *amqp.Connection) (Emitter, error) {
emitter := Emitter{
conn: conn,
}
err := emitter.setup()
if err != nil {
return Emitter{}, err
}
return emitter, err
}
func (emitter *Emitter) setup() error {
channel, err := emitter.conn.Channel()
if err != nil {
return err
}
defer channel.Close()
return declaireExchange(channel)
}
func (emitter *Emitter) Push(event string, severity string) error {
ch, err := emitter.conn.Channel()
if err != nil {
return err
}
defer ch.Close()
log.Println("Pushing to channel")
err = ch.Publish("logs_topic", severity, false, false, amqp.Publishing{
ContentType: "text/plain",
Body: []byte(event),
})
if err != nil {
return err
}
return nil
}

View File

@ -0,0 +1,28 @@
package event
import (
amqp "github.com/rabbitmq/amqp091-go"
)
func declaireExchange(ch *amqp.Channel) error {
return ch.ExchangeDeclare(
"logs_topic", // name
"topic", // type
true, // durable?
false, // auto-deleted?
false, // internal?
false, // no-wait?
nil, // arguments?
)
}
func declaireRandomQueue(ch *amqp.Channel) (amqp.Queue, error) {
return ch.QueueDeclare(
"", // name
false, // durable
false, // delete when unused
true, // exclusive
false, // nowait
nil, // table
)
}

View File

@ -1,6 +1,7 @@
package main
import (
"broker/cmd/api/event"
"bytes"
"encoding/json"
"errors"
@ -54,7 +55,8 @@ func (app *Config) HandleSubmission(w http.ResponseWriter, r *http.Request) {
case "auth":
app.authenticate(w, requestPayload.Auth)
case "log":
app.LogItem(w, requestPayload.Log)
// app.LogItem(w, requestPayload.Log)
app.logEventViaRabbit(w, requestPayload.Log)
case "mail":
app.SendMail(w, requestPayload.Mail)
default:
@ -176,3 +178,37 @@ func (app *Config) callService(w http.ResponseWriter, ms Microservice) {
app.writeJSON(w, http.StatusOK, payload)
}
func (app *Config) logEventViaRabbit(w http.ResponseWriter, l LogPayload) {
err := app.pushToQueue(l.Name, l.Data)
if err != nil {
app.errorJSON(w, err)
return
}
var payload jsonResponse
payload.Error = false
payload.Message = "logged via RabbitMQ"
app.writeJSON(w, http.StatusOK, payload)
}
func (app *Config) pushToQueue(name, msg string) error {
emitter, err := event.NewEmitter(app.Rabbit)
if err != nil {
return err
}
payload := LogPayload{
Name: name,
Data: msg,
}
j, _ := json.MarshalIndent(&payload, "", "\t")
err = emitter.Push(string(j), "log.INFO")
if err != nil {
return err
}
return nil
}

View File

@ -3,15 +3,32 @@ package main
import (
"fmt"
"log"
"math"
"net/http"
"os"
"time"
amqp "github.com/rabbitmq/amqp091-go"
)
const webPort = "80"
type Config struct{}
type Config struct {
Rabbit *amqp.Connection
}
func main() {
app := Config{}
// try to connect to rabbitmq
rabbitConn, err := connect()
if err != nil {
log.Println(err)
os.Exit(1)
}
defer rabbitConn.Close()
log.Println("Connected to RabbitMQ")
app := Config{
Rabbit: rabbitConn,
}
log.Printf("Starting broker service on port %s\n", webPort)
@ -21,8 +38,36 @@ func main() {
Handler: app.routes(),
}
err := srv.ListenAndServe()
err = srv.ListenAndServe()
if err != nil {
log.Panic(err)
}
}
func connect() (*amqp.Connection, error) {
var counts int64
var connection *amqp.Connection
// don't continue until rabbit is ready
for {
c, err := amqp.Dial("amqp://guest:guest@rabbitmq") // XXX: credentials
if err != nil {
fmt.Println("RabbitMQ not yet ready...")
counts++
} else {
connection = c
break
}
if counts > 5 {
fmt.Println(err)
return nil, err
}
backOff := time.Duration(math.Pow(float64(counts), 2) * float64(time.Second))
log.Println("backing off...")
time.Sleep(backOff)
}
return connection, nil
}

View File

@ -6,3 +6,5 @@ require (
github.com/go-chi/chi/v5 v5.1.0
github.com/go-chi/cors v1.2.1
)
require github.com/rabbitmq/amqp091-go v1.10.0

View File

@ -2,3 +2,7 @@ github.com/go-chi/chi/v5 v5.1.0 h1:acVI1TYaD+hhedDJ3r54HyA6sExp3HfXq7QWEEY/xMw=
github.com/go-chi/chi/v5 v5.1.0/go.mod h1:DslCQbL2OYiznFReuXYUmQ2hGd1aDpCnlMNITLSKoi8=
github.com/go-chi/cors v1.2.1 h1:xEC8UT3Rlp2QuWNEr4Fs/c2EAGVKBwy/1vHx3bppil4=
github.com/go-chi/cors v1.2.1/go.mod h1:sSbTewc+6wYHBBCW7ytsFSn836hqM7JxpglAy2Vzc58=
github.com/rabbitmq/amqp091-go v1.10.0 h1:STpn5XsHlHGcecLmMFCtg7mqq0RnD+zFr4uzukfVhBw=
github.com/rabbitmq/amqp091-go v1.10.0/go.mod h1:Hy4jKW5kQART1u+JkDTF9YYOQUHXqMuhrgxOEeS7G4o=
go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto=
go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE=

View File

@ -33,6 +33,7 @@ func (consumer *Consumer) setup() error {
if err != nil {
return err
}
defer channel.Close()
return declaireExchange(channel)
}