Add grpc client in broker service

This commit is contained in:
2024-09-06 23:11:49 +02:00
parent 29135938c2
commit 5825d0018d
9 changed files with 137 additions and 9 deletions

View File

@ -1,53 +0,0 @@
package event
import (
"log"
amqp "github.com/rabbitmq/amqp091-go"
)
type Emitter struct {
conn *amqp.Connection
}
func NewEmitter(conn *amqp.Connection) (Emitter, error) {
emitter := Emitter{
conn: conn,
}
err := emitter.setup()
if err != nil {
return Emitter{}, err
}
return emitter, err
}
func (emitter *Emitter) setup() error {
channel, err := emitter.conn.Channel()
if err != nil {
return err
}
defer channel.Close()
return declaireExchange(channel)
}
func (emitter *Emitter) Push(event string, severity string) error {
ch, err := emitter.conn.Channel()
if err != nil {
return err
}
defer ch.Close()
log.Println("Pushing to channel")
err = ch.Publish("logs_topic", severity, false, false, amqp.Publishing{
ContentType: "text/plain",
Body: []byte(event),
})
if err != nil {
return err
}
return nil
}

View File

@ -1,28 +0,0 @@
package event
import (
amqp "github.com/rabbitmq/amqp091-go"
)
func declaireExchange(ch *amqp.Channel) error {
return ch.ExchangeDeclare(
"logs_topic", // name
"topic", // type
true, // durable?
false, // auto-deleted?
false, // internal?
false, // no-wait?
nil, // arguments?
)
}
func declaireRandomQueue(ch *amqp.Channel) (amqp.Queue, error) {
return ch.QueueDeclare(
"", // name
false, // durable
false, // delete when unused
true, // exclusive
false, // nowait
nil, // table
)
}

View File

@ -1,8 +1,10 @@
package main
import (
"broker/cmd/api/event"
"broker/event"
"broker/logs"
"bytes"
"context"
"encoding/json"
"errors"
"fmt"
@ -10,13 +12,16 @@ import (
"net/http"
"net/rpc"
"time"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
)
type RequestPayload struct {
Action string `string:"action"`
Auth AuthPayload `json:"auth,omitempty"`
Log LogPayload `json:"log,omitempty"`
Mail MailPayload `json:"mail,omitempty"`
Auth AuthPayload ` json:"auth,omitempty"`
Log LogPayload ` json:"log,omitempty"`
Mail MailPayload ` json:"mail,omitempty"`
}
type AuthPayload struct {
@ -57,12 +62,14 @@ func (app *Config) HandleSubmission(w http.ResponseWriter, r *http.Request) {
switch requestPayload.Action {
case "auth":
app.authenticate(w, requestPayload.Auth)
case "logHttp": // 2729 us
case "logHttp": // 2336 us
app.LogItem(w, requestPayload.Log)
case "logRabbit": // 10007 us
case "logRabbit": // 7825 us
app.logEventViaRabbit(w, requestPayload.Log)
case "logRpc": // 555 us
case "logRpc": // 2097 us
app.logItemViaRPC(w, requestPayload.Log)
case "logGrpc": // 236882 us
app.LogViaGRPC(w, requestPayload.Log)
case "mail":
app.SendMail(w, requestPayload.Mail)
default:
@ -253,3 +260,39 @@ func (app *Config) logItemViaRPC(w http.ResponseWriter, l LogPayload) {
app.writeJSON(w, http.StatusOK, payload)
}
func (app *Config) LogViaGRPC(w http.ResponseWriter, l LogPayload) {
conn, err := grpc.NewClient(
"logger-service:50001",
grpc.WithTransportCredentials(insecure.NewCredentials()),
)
if err != nil {
app.errorJSON(w, err)
return
}
defer conn.Close()
now := time.Now()
l.Data += fmt.Sprintf("gRPC sent date %d", now.UnixMicro())
c := logs.NewLogServiceClient(conn)
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
_, err = c.WriteLog(ctx, &logs.LogRequest{
LogEntry: &logs.Log{
Name: l.Name,
Data: l.Data,
},
})
if err != nil {
app.errorJSON(w, err)
return
}
var payload jsonResponse
payload.Error = false
payload.Message = "logged via gRPC"
app.writeJSON(w, http.StatusOK, payload)
}