udemy-go-microservices/listener-service/event/consumer.go

177 lines
3.3 KiB
Go

package event
import (
"bytes"
"encoding/json"
"errors"
"fmt"
"log"
"net/http"
amqp "github.com/rabbitmq/amqp091-go"
)
// Consumer receives events from RabbitMQ over an already-established AMQP
// connection.
type Consumer struct {
	// conn is the connection every channel used by the consumer is opened on.
	conn *amqp.Connection
	// queueName is not assigned anywhere in the visible code.
	// NOTE(review): presumably reserved for a named queue — confirm or remove.
	queueName string
}
// NewConsumer builds a Consumer around conn and runs its setup step.
//
// When setup fails, the zero Consumer is returned together with the setup
// error; otherwise the configured Consumer and a nil error are returned.
func NewConsumer(conn *amqp.Connection) (Consumer, error) {
	c := Consumer{conn: conn}

	if setupErr := c.setup(); setupErr != nil {
		return Consumer{}, setupErr
	}

	return c, nil
}
// setup opens a channel on the consumer's connection and passes it to
// declaireExchange.
//
// The channel is only needed for that one call, so it is closed before
// returning instead of being left open for the lifetime of the connection.
// It returns an error if the channel cannot be opened, or whatever
// declaireExchange returns.
func (consumer *Consumer) setup() error {
	channel, err := consumer.conn.Channel()
	if err != nil {
		return fmt.Errorf("opening setup channel: %w", err)
	}
	// Release the channel once the exchange declaration is done; a Close
	// error is not actionable here, so it is deliberately not reported.
	defer channel.Close()

	return declaireExchange(channel)
}
// Payload is the JSON body of a message taken off the queue.
type Payload struct {
	// Name selects the handling branch in handlePayload; JSON key "name".
	Name string `json:"name"`
	// Data is the event content forwarded with the payload; JSON key "data".
	Data string `json:"data"`
}
// Listen consumes messages from the "logs_topic" exchange and hands each one
// to handlePayload.
//
// It opens its own channel, obtains a queue from declaireRandomQueue, binds
// that queue to "logs_topic" once per routing key in topics, and then starts
// an auto-acknowledging consumer on it. Every delivery body is decoded as a
// JSON Payload and processed in its own goroutine.
//
// Listen blocks while deliveries keep arriving. It returns an error when the
// channel, queue, binding or consumer cannot be set up, and also when the
// delivery channel is closed (for example because the connection dropped), so
// the caller can react instead of waiting forever on a dead consumer.
func (consumer *Consumer) Listen(topics []string) error {
	ch, err := consumer.conn.Channel()
	if err != nil {
		return fmt.Errorf("opening channel: %w", err)
	}
	defer ch.Close()

	q, err := declaireRandomQueue(ch)
	if err != nil {
		return fmt.Errorf("declaring queue: %w", err)
	}

	for _, topic := range topics {
		// A failed binding means messages for that topic would silently
		// never arrive, so treat it as fatal for this listener.
		if err := ch.QueueBind(q.Name, topic, "logs_topic", false, nil); err != nil {
			return fmt.Errorf("binding queue %q to topic %q: %w", q.Name, topic, err)
		}
	}

	messages, err := ch.Consume(q.Name, "", true, false, false, false, nil)
	if err != nil {
		return fmt.Errorf("consuming from queue %q: %w", q.Name, err)
	}

	fmt.Printf("Waiting for message [Exchange, Queue] [logs_topic, %s]\n", q.Name)

	// The range ends only when the delivery channel is closed.
	for d := range messages {
		var payload Payload
		if err := json.Unmarshal(d.Body, &payload); err != nil {
			// The message was already auto-acknowledged; drop it rather
			// than forwarding an empty payload downstream.
			log.Printf("discarding malformed message: %v", err)
			continue
		}

		go handlePayload(payload)
	}

	return errors.New("delivery channel closed")
}
// handlePayload routes a decoded payload according to its Name.
//
// "auth" payloads are currently ignored. Every other name, including "log",
// "event" and unrecognised ones, is passed to logEvent. A logEvent failure is
// written to the standard logger and not propagated.
func handlePayload(payload Payload) {
	var err error

	switch payload.Name {
	case "auth":
		// Placeholder for authentication handling; nothing is done yet.
		// Further cases can be added here once their logic exists.
	case "log", "event":
		// Log whatever we received.
		err = logEvent(payload)
	default:
		err = logEvent(payload)
	}

	if err != nil {
		log.Println(err)
	}
}
// logEvent prints entry to the standard logger and then submits it to the
// logger service at http://logger-service/log through callService.
//
// The request is described as expecting HTTP 202 Accepted. It returns
// whatever callService returns.
func logEvent(entry Payload) error {
	log.Println(entry)

	expectation := statusError{
		ExpectedCode: http.StatusAccepted,
		ErrCode:      http.StatusInternalServerError,
		Err:          errors.New("internal error"),
	}

	return callService(Microservice{
		Input:      entry,
		Addr:       "http://logger-service/log",
		ErrCode:    expectation,
		SuccessMsg: "Logged!",
	})
}
// func (app *Config) SendMail(w http.ResponseWriter, entry MailPayload) {
// log.Println(entry)
// mailService := Microservice{
// Input: entry,
// Addr: "http://mail-service/send-mail",
// ErrCode: statusError{
// ExpectedCode: http.StatusAccepted,
// ErrCode: http.StatusInternalServerError,
// Err: errors.New("internal error"),
// },
// SuccessMsg: "Mail sent!",
// }
// app.callService(w, mailService)
// }
// statusError describes the HTTP status a service call is expected to return
// and the error details associated with a failure.
type statusError struct {
	// ExpectedCode is the status code callService compares the response to.
	ExpectedCode int
	// ErrCode is a status code kept alongside Err.
	// NOTE(review): presumably the code to report to an upstream caller on
	// failure — confirm against the code that reads it.
	ErrCode int
	// Err is the error value associated with a failed call.
	Err error
}
// Microservice describes one JSON request that callService sends to another
// service.
type Microservice struct {
	// Input is the value that is JSON-encoded into the request body.
	Input any
	// Addr is the URL the request is POSTed to.
	Addr string
	// ErrCode holds the expected response status and failure details.
	ErrCode statusError
	// SuccessMsg is a message associated with a successful call.
	// NOTE(review): not read in the visible code — confirm where it is used.
	SuccessMsg string
}
// callService POSTs ms.Input, encoded as indented JSON, to ms.Addr and checks
// the response status against ms.ErrCode.ExpectedCode.
//
// It returns nil only when the response status equals the expected code. It
// returns an error when the input cannot be encoded, the request cannot be
// built or sent, or the service answers with a different status. In the last
// case the error wraps ms.ErrCode.Err when that is set, so callers can match
// it with errors.Is.
//
// NOTE(review): the HTTP client has no timeout, so a stalled service blocks
// the calling goroutine indefinitely — consider setting one.
func callService(ms Microservice) error {
	// Create the JSON we will send to the microservice.
	inputPayload, err := json.MarshalIndent(ms.Input, "", "\t")
	if err != nil {
		return fmt.Errorf("encoding payload for %s: %w", ms.Addr, err)
	}
	log.Println(ms.Input)

	// Call the service.
	req, err := http.NewRequest(http.MethodPost, ms.Addr, bytes.NewBuffer(inputPayload))
	if err != nil {
		return fmt.Errorf("building request for %s: %w", ms.Addr, err)
	}
	req.Header.Add("Content-Type", "application/json")

	client := &http.Client{}
	resp, err := client.Do(req)
	if err != nil {
		// The client's error already names the method and URL.
		return err
	}
	defer resp.Body.Close()

	// Log the status line; printing resp.Body itself only shows the reader's
	// internal representation.
	log.Println(resp.Status)

	// Make sure we got back the expected status code. err is nil at this
	// point, so an explicit error has to be built for the mismatch.
	if resp.StatusCode != ms.ErrCode.ExpectedCode {
		if ms.ErrCode.Err != nil {
			return fmt.Errorf("%s returned status %d, want %d: %w",
				ms.Addr, resp.StatusCode, ms.ErrCode.ExpectedCode, ms.ErrCode.Err)
		}
		return fmt.Errorf("%s returned status %d, want %d",
			ms.Addr, resp.StatusCode, ms.ErrCode.ExpectedCode)
	}

	return nil
}