Compare commits
No commits in common. "a0ce8098699fe362dcf822c89484459e6ee55879" and "fc8e500c5b9a261ef3d93bf200523a995d238254" have entirely different histories.
a0ce809869
...
fc8e500c5b
@ -1,53 +0,0 @@
|
|||||||
package event
|
|
||||||
|
|
||||||
import (
|
|
||||||
"log"
|
|
||||||
|
|
||||||
amqp "github.com/rabbitmq/amqp091-go"
|
|
||||||
)
|
|
||||||
|
|
||||||
type Emitter struct {
|
|
||||||
conn *amqp.Connection
|
|
||||||
}
|
|
||||||
|
|
||||||
func NewEmitter(conn *amqp.Connection) (Emitter, error) {
|
|
||||||
emitter := Emitter{
|
|
||||||
conn: conn,
|
|
||||||
}
|
|
||||||
|
|
||||||
err := emitter.setup()
|
|
||||||
if err != nil {
|
|
||||||
return Emitter{}, err
|
|
||||||
}
|
|
||||||
return emitter, err
|
|
||||||
}
|
|
||||||
|
|
||||||
func (emitter *Emitter) setup() error {
|
|
||||||
channel, err := emitter.conn.Channel()
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
defer channel.Close()
|
|
||||||
|
|
||||||
return declaireExchange(channel)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (emitter *Emitter) Push(event string, severity string) error {
|
|
||||||
ch, err := emitter.conn.Channel()
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
defer ch.Close()
|
|
||||||
|
|
||||||
log.Println("Pushing to channel")
|
|
||||||
|
|
||||||
err = ch.Publish("logs_topic", severity, false, false, amqp.Publishing{
|
|
||||||
ContentType: "text/plain",
|
|
||||||
Body: []byte(event),
|
|
||||||
})
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
return nil
|
|
||||||
}
|
|
@ -1,28 +0,0 @@
|
|||||||
package event
|
|
||||||
|
|
||||||
import (
|
|
||||||
amqp "github.com/rabbitmq/amqp091-go"
|
|
||||||
)
|
|
||||||
|
|
||||||
func declaireExchange(ch *amqp.Channel) error {
|
|
||||||
return ch.ExchangeDeclare(
|
|
||||||
"logs_topic", // name
|
|
||||||
"topic", // type
|
|
||||||
true, // durable?
|
|
||||||
false, // auto-deleted?
|
|
||||||
false, // internal?
|
|
||||||
false, // no-wait?
|
|
||||||
nil, // arguments?
|
|
||||||
)
|
|
||||||
}
|
|
||||||
|
|
||||||
func declaireRandomQueue(ch *amqp.Channel) (amqp.Queue, error) {
|
|
||||||
return ch.QueueDeclare(
|
|
||||||
"", // name
|
|
||||||
false, // durable
|
|
||||||
false, // delete when unused
|
|
||||||
true, // exclusive
|
|
||||||
false, // nowait
|
|
||||||
nil, // table
|
|
||||||
)
|
|
||||||
}
|
|
@ -1,15 +1,11 @@
|
|||||||
package main
|
package main
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"broker/cmd/api/event"
|
|
||||||
"bytes"
|
"bytes"
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
"errors"
|
"errors"
|
||||||
"fmt"
|
|
||||||
"log"
|
"log"
|
||||||
"net/http"
|
"net/http"
|
||||||
"net/rpc"
|
|
||||||
"time"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
type RequestPayload struct {
|
type RequestPayload struct {
|
||||||
@ -57,12 +53,8 @@ func (app *Config) HandleSubmission(w http.ResponseWriter, r *http.Request) {
|
|||||||
switch requestPayload.Action {
|
switch requestPayload.Action {
|
||||||
case "auth":
|
case "auth":
|
||||||
app.authenticate(w, requestPayload.Auth)
|
app.authenticate(w, requestPayload.Auth)
|
||||||
case "logHttp": // 2729 us
|
case "log":
|
||||||
app.LogItem(w, requestPayload.Log)
|
app.LogItem(w, requestPayload.Log)
|
||||||
case "logRabbit": // 10007 us
|
|
||||||
app.logEventViaRabbit(w, requestPayload.Log)
|
|
||||||
case "logRpc": // 555 us
|
|
||||||
app.logItemViaRPC(w, requestPayload.Log)
|
|
||||||
case "mail":
|
case "mail":
|
||||||
app.SendMail(w, requestPayload.Mail)
|
app.SendMail(w, requestPayload.Mail)
|
||||||
default:
|
default:
|
||||||
@ -86,8 +78,6 @@ func (app *Config) authenticate(w http.ResponseWriter, a AuthPayload) {
|
|||||||
|
|
||||||
func (app *Config) LogItem(w http.ResponseWriter, entry LogPayload) {
|
func (app *Config) LogItem(w http.ResponseWriter, entry LogPayload) {
|
||||||
log.Println(entry)
|
log.Println(entry)
|
||||||
now := time.Now()
|
|
||||||
entry.Data += fmt.Sprintf("HTTP sent date %d", now.UnixMicro())
|
|
||||||
loggerService := Microservice{
|
loggerService := Microservice{
|
||||||
Input: entry,
|
Input: entry,
|
||||||
Addr: "http://logger-service/log",
|
Addr: "http://logger-service/log",
|
||||||
@ -186,70 +176,3 @@ func (app *Config) callService(w http.ResponseWriter, ms Microservice) {
|
|||||||
|
|
||||||
app.writeJSON(w, http.StatusOK, payload)
|
app.writeJSON(w, http.StatusOK, payload)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (app *Config) logEventViaRabbit(w http.ResponseWriter, l LogPayload) {
|
|
||||||
now := time.Now()
|
|
||||||
l.Data += fmt.Sprintf("Rabbit sent date %d", now.UnixMicro())
|
|
||||||
err := app.pushToQueue(l.Name, l.Data)
|
|
||||||
if err != nil {
|
|
||||||
app.errorJSON(w, err)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
var payload jsonResponse
|
|
||||||
payload.Error = false
|
|
||||||
payload.Message = "logged via RabbitMQ"
|
|
||||||
|
|
||||||
app.writeJSON(w, http.StatusOK, payload)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (app *Config) pushToQueue(name, msg string) error {
|
|
||||||
emitter, err := event.NewEmitter(app.Rabbit)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
payload := LogPayload{
|
|
||||||
Name: name,
|
|
||||||
Data: msg,
|
|
||||||
}
|
|
||||||
|
|
||||||
j, _ := json.MarshalIndent(&payload, "", "\t")
|
|
||||||
err = emitter.Push(string(j), "log.INFO")
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
type RPCPayload struct {
|
|
||||||
Name string
|
|
||||||
Data string
|
|
||||||
}
|
|
||||||
|
|
||||||
func (app *Config) logItemViaRPC(w http.ResponseWriter, l LogPayload) {
|
|
||||||
client, err := rpc.Dial("tcp", "logger-service:5001")
|
|
||||||
if err != nil {
|
|
||||||
app.errorJSON(w, err)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
defer client.Close()
|
|
||||||
|
|
||||||
now := time.Now()
|
|
||||||
l.Data += fmt.Sprintf("RPC sent date %d", now.UnixMicro())
|
|
||||||
rpcPayload := RPCPayload(l)
|
|
||||||
|
|
||||||
var result string
|
|
||||||
err = client.Call("RPCServer.LogInfo", rpcPayload, &result)
|
|
||||||
if err != nil {
|
|
||||||
app.errorJSON(w, err)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
var payload jsonResponse
|
|
||||||
payload.Error = false
|
|
||||||
payload.Message = "logged via RPC"
|
|
||||||
|
|
||||||
app.writeJSON(w, http.StatusOK, payload)
|
|
||||||
}
|
|
||||||
|
@ -3,32 +3,15 @@ package main
|
|||||||
import (
|
import (
|
||||||
"fmt"
|
"fmt"
|
||||||
"log"
|
"log"
|
||||||
"math"
|
|
||||||
"net/http"
|
"net/http"
|
||||||
"os"
|
|
||||||
"time"
|
|
||||||
|
|
||||||
amqp "github.com/rabbitmq/amqp091-go"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
const webPort = "80"
|
const webPort = "80"
|
||||||
|
|
||||||
type Config struct {
|
type Config struct{}
|
||||||
Rabbit *amqp.Connection
|
|
||||||
}
|
|
||||||
|
|
||||||
func main() {
|
func main() {
|
||||||
// try to connect to rabbitmq
|
app := Config{}
|
||||||
rabbitConn, err := connect()
|
|
||||||
if err != nil {
|
|
||||||
log.Println(err)
|
|
||||||
os.Exit(1)
|
|
||||||
}
|
|
||||||
defer rabbitConn.Close()
|
|
||||||
log.Println("Connected to RabbitMQ")
|
|
||||||
app := Config{
|
|
||||||
Rabbit: rabbitConn,
|
|
||||||
}
|
|
||||||
|
|
||||||
log.Printf("Starting broker service on port %s\n", webPort)
|
log.Printf("Starting broker service on port %s\n", webPort)
|
||||||
|
|
||||||
@ -38,36 +21,8 @@ func main() {
|
|||||||
Handler: app.routes(),
|
Handler: app.routes(),
|
||||||
}
|
}
|
||||||
|
|
||||||
err = srv.ListenAndServe()
|
err := srv.ListenAndServe()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Panic(err)
|
log.Panic(err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func connect() (*amqp.Connection, error) {
|
|
||||||
var counts int64
|
|
||||||
var connection *amqp.Connection
|
|
||||||
|
|
||||||
// don't continue until rabbit is ready
|
|
||||||
for {
|
|
||||||
c, err := amqp.Dial("amqp://guest:guest@rabbitmq") // XXX: credentials
|
|
||||||
if err != nil {
|
|
||||||
fmt.Println("RabbitMQ not yet ready...")
|
|
||||||
counts++
|
|
||||||
} else {
|
|
||||||
connection = c
|
|
||||||
break
|
|
||||||
}
|
|
||||||
|
|
||||||
if counts > 5 {
|
|
||||||
fmt.Println(err)
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
backOff := time.Duration(math.Pow(float64(counts), 2) * float64(time.Second))
|
|
||||||
log.Println("backing off...")
|
|
||||||
time.Sleep(backOff)
|
|
||||||
}
|
|
||||||
|
|
||||||
return connection, nil
|
|
||||||
}
|
|
||||||
|
@ -6,5 +6,3 @@ require (
|
|||||||
github.com/go-chi/chi/v5 v5.1.0
|
github.com/go-chi/chi/v5 v5.1.0
|
||||||
github.com/go-chi/cors v1.2.1
|
github.com/go-chi/cors v1.2.1
|
||||||
)
|
)
|
||||||
|
|
||||||
require github.com/rabbitmq/amqp091-go v1.10.0
|
|
||||||
|
@ -2,7 +2,3 @@ github.com/go-chi/chi/v5 v5.1.0 h1:acVI1TYaD+hhedDJ3r54HyA6sExp3HfXq7QWEEY/xMw=
|
|||||||
github.com/go-chi/chi/v5 v5.1.0/go.mod h1:DslCQbL2OYiznFReuXYUmQ2hGd1aDpCnlMNITLSKoi8=
|
github.com/go-chi/chi/v5 v5.1.0/go.mod h1:DslCQbL2OYiznFReuXYUmQ2hGd1aDpCnlMNITLSKoi8=
|
||||||
github.com/go-chi/cors v1.2.1 h1:xEC8UT3Rlp2QuWNEr4Fs/c2EAGVKBwy/1vHx3bppil4=
|
github.com/go-chi/cors v1.2.1 h1:xEC8UT3Rlp2QuWNEr4Fs/c2EAGVKBwy/1vHx3bppil4=
|
||||||
github.com/go-chi/cors v1.2.1/go.mod h1:sSbTewc+6wYHBBCW7ytsFSn836hqM7JxpglAy2Vzc58=
|
github.com/go-chi/cors v1.2.1/go.mod h1:sSbTewc+6wYHBBCW7ytsFSn836hqM7JxpglAy2Vzc58=
|
||||||
github.com/rabbitmq/amqp091-go v1.10.0 h1:STpn5XsHlHGcecLmMFCtg7mqq0RnD+zFr4uzukfVhBw=
|
|
||||||
github.com/rabbitmq/amqp091-go v1.10.0/go.mod h1:Hy4jKW5kQART1u+JkDTF9YYOQUHXqMuhrgxOEeS7G4o=
|
|
||||||
go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto=
|
|
||||||
go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE=
|
|
||||||
|
@ -9,9 +9,7 @@
|
|||||||
|
|
||||||
<a id="brokerBtn" class="btn btn-outline-secondary" href="javascript:void(0);">Test Broker</a>
|
<a id="brokerBtn" class="btn btn-outline-secondary" href="javascript:void(0);">Test Broker</a>
|
||||||
<a id="authBrokerBtn" class="btn btn-outline-secondary" href="javascript:void(0);">Test Auth</a>
|
<a id="authBrokerBtn" class="btn btn-outline-secondary" href="javascript:void(0);">Test Auth</a>
|
||||||
<a id="logHttpBtn" class="btn btn-outline-secondary" href="javascript:void(0);">Test HTTP Log</a>
|
<a id="logBtn" class="btn btn-outline-secondary" href="javascript:void(0);">Test Log</a>
|
||||||
<a id="logRabbitBtn" class="btn btn-outline-secondary" href="javascript:void(0);">Test Rabbit Log</a>
|
|
||||||
<a id="logRpcBtn" class="btn btn-outline-secondary" href="javascript:void(0);">Test RPC Log</a>
|
|
||||||
<a id="mailBtn" class="btn btn-outline-secondary" href="javascript:void(0);">Test Mail</a>
|
<a id="mailBtn" class="btn btn-outline-secondary" href="javascript:void(0);">Test Mail</a>
|
||||||
|
|
||||||
<div id="output" class="mt-5" style="outline: 1px solid silver; padding: 2em;">
|
<div id="output" class="mt-5" style="outline: 1px solid silver; padding: 2em;">
|
||||||
@ -41,9 +39,6 @@
|
|||||||
let brokerBtn = document.getElementById("brokerBtn");
|
let brokerBtn = document.getElementById("brokerBtn");
|
||||||
let authBrokerBtn = document.getElementById("authBrokerBtn");
|
let authBrokerBtn = document.getElementById("authBrokerBtn");
|
||||||
let mailBtn = document.getElementById("mailBtn");
|
let mailBtn = document.getElementById("mailBtn");
|
||||||
let logHttpBtn = document.getElementById("logHttpBtn");
|
|
||||||
let logRabbitBtn = document.getElementById("logRabbitBtn");
|
|
||||||
let logRpcBtn = document.getElementById("logRpcBtn");
|
|
||||||
let output = document.getElementById("output");
|
let output = document.getElementById("output");
|
||||||
let sent = document.getElementById("payload");
|
let sent = document.getElementById("payload");
|
||||||
let received = document.getElementById("received");
|
let received = document.getElementById("received");
|
||||||
@ -100,73 +95,9 @@
|
|||||||
});
|
});
|
||||||
});
|
});
|
||||||
|
|
||||||
logHttpBtn.addEventListener("click", () => {
|
logBtn.addEventListener("click", () => {
|
||||||
const payload = {
|
const payload = {
|
||||||
action: "logHttp",
|
action: "log",
|
||||||
log: {
|
|
||||||
name: "event",
|
|
||||||
data: "some kind of data",
|
|
||||||
}
|
|
||||||
};
|
|
||||||
const headers = new Headers();
|
|
||||||
headers.append("Content-Type", "application/json");
|
|
||||||
const body = {
|
|
||||||
method: 'POST',
|
|
||||||
body: JSON.stringify(payload),
|
|
||||||
headers: headers,
|
|
||||||
}
|
|
||||||
fetch("http:\/\/localhost:8080/handle", body)
|
|
||||||
.then((response) => response.json())
|
|
||||||
.then((data) => {
|
|
||||||
sent.innerHTML = JSON.stringify(payload, undefined, 4);
|
|
||||||
received.innerHTML = JSON.stringify(data, undefined, 4);
|
|
||||||
if (data.error) {
|
|
||||||
console.log(data.message);
|
|
||||||
output.innerHTML += `<br><strong>Error:</strong>: ${data.message}`;
|
|
||||||
} else {
|
|
||||||
output.innerHTML += `<br><strong>Response from broker service</strong>: ${data.message}`;
|
|
||||||
}
|
|
||||||
})
|
|
||||||
.catch((error) => {
|
|
||||||
output.innerHTML += "<br><br>Error: " + error;
|
|
||||||
});
|
|
||||||
});
|
|
||||||
|
|
||||||
logRabbitBtn.addEventListener("click", () => {
|
|
||||||
const payload = {
|
|
||||||
action: "logRabbit",
|
|
||||||
log: {
|
|
||||||
name: "event",
|
|
||||||
data: "some kind of data",
|
|
||||||
}
|
|
||||||
};
|
|
||||||
const headers = new Headers();
|
|
||||||
headers.append("Content-Type", "application/json");
|
|
||||||
const body = {
|
|
||||||
method: 'POST',
|
|
||||||
body: JSON.stringify(payload),
|
|
||||||
headers: headers,
|
|
||||||
}
|
|
||||||
fetch("http:\/\/localhost:8080/handle", body)
|
|
||||||
.then((response) => response.json())
|
|
||||||
.then((data) => {
|
|
||||||
sent.innerHTML = JSON.stringify(payload, undefined, 4);
|
|
||||||
received.innerHTML = JSON.stringify(data, undefined, 4);
|
|
||||||
if (data.error) {
|
|
||||||
console.log(data.message);
|
|
||||||
output.innerHTML += `<br><strong>Error:</strong>: ${data.message}`;
|
|
||||||
} else {
|
|
||||||
output.innerHTML += `<br><strong>Response from broker service</strong>: ${data.message}`;
|
|
||||||
}
|
|
||||||
})
|
|
||||||
.catch((error) => {
|
|
||||||
output.innerHTML += "<br><br>Error: " + error;
|
|
||||||
});
|
|
||||||
});
|
|
||||||
|
|
||||||
logRpcBtn.addEventListener("click", () => {
|
|
||||||
const payload = {
|
|
||||||
action: "logRpc",
|
|
||||||
log: {
|
log: {
|
||||||
name: "event",
|
name: "event",
|
||||||
data: "some kind of data",
|
data: "some kind of data",
|
||||||
|
@ -33,7 +33,6 @@ func (consumer *Consumer) setup() error {
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
defer channel.Close()
|
|
||||||
|
|
||||||
return declaireExchange(channel)
|
return declaireExchange(channel)
|
||||||
}
|
}
|
||||||
|
@ -1,8 +0,0 @@
|
|||||||
FROM alpine:latest
|
|
||||||
|
|
||||||
RUN mkdir /app
|
|
||||||
|
|
||||||
COPY listenerApp /app
|
|
||||||
|
|
||||||
CMD ["/app/listenerApp"]
|
|
||||||
|
|
@ -2,7 +2,6 @@ package main
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"fmt"
|
"fmt"
|
||||||
"listener/event"
|
|
||||||
"log"
|
"log"
|
||||||
"math"
|
"math"
|
||||||
"os"
|
"os"
|
||||||
@ -22,19 +21,10 @@ func main() {
|
|||||||
log.Println("Connected to RabbitMQ")
|
log.Println("Connected to RabbitMQ")
|
||||||
|
|
||||||
// start listening for messages
|
// start listening for messages
|
||||||
log.Println("Listening for and consuming RabbitMQ messages...")
|
|
||||||
// create consumer
|
// create consumer
|
||||||
|
|
||||||
consumer, err := event.NewConsumer(rabbitConn)
|
|
||||||
if err != nil {
|
|
||||||
log.Panic(err)
|
|
||||||
}
|
|
||||||
|
|
||||||
// watch the queue and consume events
|
// watch the queue and consume events
|
||||||
err = consumer.Listen([]string{"log.INFO", "log.WARNING", "log.ERROR"})
|
|
||||||
if err != nil {
|
|
||||||
log.Println(err)
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func connect() (*amqp.Connection, error) {
|
func connect() (*amqp.Connection, error) {
|
||||||
@ -43,7 +33,7 @@ func connect() (*amqp.Connection, error) {
|
|||||||
|
|
||||||
// don't continue until rabbit is ready
|
// don't continue until rabbit is ready
|
||||||
for {
|
for {
|
||||||
c, err := amqp.Dial("amqp://guest:guest@rabbitmq") // XXX: credentials
|
c, err := amqp.Dial("amqp://guest:guest@localhost") // XXX: credentials
|
||||||
if err != nil {
|
if err != nil {
|
||||||
fmt.Println("RabbitMQ not yet ready...")
|
fmt.Println("RabbitMQ not yet ready...")
|
||||||
counts++
|
counts++
|
||||||
|
@ -1,11 +1,9 @@
|
|||||||
package main
|
package main
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"fmt"
|
|
||||||
"log"
|
"log"
|
||||||
"logger/data"
|
"logger/data"
|
||||||
"net/http"
|
"net/http"
|
||||||
"time"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
type JSONPayload struct {
|
type JSONPayload struct {
|
||||||
@ -29,9 +27,6 @@ func (app *Config) WriteLog(w http.ResponseWriter, r *http.Request) {
|
|||||||
Data: requestPayload.Data,
|
Data: requestPayload.Data,
|
||||||
}
|
}
|
||||||
|
|
||||||
now := time.Now()
|
|
||||||
event.Data += fmt.Sprintf(" received date %d", now.UnixMicro())
|
|
||||||
|
|
||||||
log.Println("event", event)
|
log.Println("event", event)
|
||||||
|
|
||||||
err = app.Models.LogEntry.Insert(event)
|
err = app.Models.LogEntry.Insert(event)
|
||||||
|
@ -5,9 +5,7 @@ import (
|
|||||||
"fmt"
|
"fmt"
|
||||||
"log"
|
"log"
|
||||||
"logger/data"
|
"logger/data"
|
||||||
"net"
|
|
||||||
"net/http"
|
"net/http"
|
||||||
"net/rpc"
|
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"go.mongodb.org/mongo-driver/bson"
|
"go.mongodb.org/mongo-driver/bson"
|
||||||
@ -51,14 +49,6 @@ func main() {
|
|||||||
Models: data.New(client),
|
Models: data.New(client),
|
||||||
}
|
}
|
||||||
|
|
||||||
// Register the RPC server
|
|
||||||
err = rpc.Register(new(RPCServer))
|
|
||||||
if err != nil {
|
|
||||||
panic(err)
|
|
||||||
}
|
|
||||||
|
|
||||||
go app.rpcListen()
|
|
||||||
|
|
||||||
// start web server
|
// start web server
|
||||||
log.Println("Starting service on port", webPort)
|
log.Println("Starting service on port", webPort)
|
||||||
app.serve()
|
app.serve()
|
||||||
@ -98,20 +88,3 @@ func connectToMongo() (*mongo.Client, error) {
|
|||||||
|
|
||||||
return client, nil
|
return client, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (app *Config) rpcListen() error {
|
|
||||||
log.Println("Starting RPC server on port ", rpcPort)
|
|
||||||
listen, err := net.Listen("tcp", fmt.Sprintf("0.0.0.0:%s", rpcPort))
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
defer listen.Close()
|
|
||||||
for {
|
|
||||||
rpcConn, err := listen.Accept()
|
|
||||||
if err != nil {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
go rpc.ServeConn(rpcConn)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
@ -1,37 +0,0 @@
|
|||||||
package main
|
|
||||||
|
|
||||||
import (
|
|
||||||
"context"
|
|
||||||
"fmt"
|
|
||||||
"log"
|
|
||||||
"logger/data"
|
|
||||||
"time"
|
|
||||||
)
|
|
||||||
|
|
||||||
type RPCServer struct{}
|
|
||||||
|
|
||||||
type RPCPayload struct {
|
|
||||||
Name string
|
|
||||||
Data string
|
|
||||||
}
|
|
||||||
|
|
||||||
func (r *RPCServer) LogInfo(payload RPCPayload, resp *string) error {
|
|
||||||
collection := client.Database("logs").Collection("logs")
|
|
||||||
|
|
||||||
now := time.Now()
|
|
||||||
payload.Data += fmt.Sprintf(" received date %d", now.UnixMicro())
|
|
||||||
|
|
||||||
_, err := collection.InsertOne(context.TODO(), data.LogEntry{
|
|
||||||
Name: payload.Name,
|
|
||||||
Data: payload.Data,
|
|
||||||
CreatedAt: time.Now(),
|
|
||||||
})
|
|
||||||
if err != nil {
|
|
||||||
log.Println("error writing to mongo", err)
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
*resp = "Processed payload via RPC:" + payload.Name
|
|
||||||
|
|
||||||
return nil
|
|
||||||
}
|
|
@ -3,7 +3,6 @@ BROKER_BINARY=brokerApp
|
|||||||
AUTH_BINARY=authApp
|
AUTH_BINARY=authApp
|
||||||
LOGGER_BINARY=loggerApp
|
LOGGER_BINARY=loggerApp
|
||||||
MAIL_BINARY=mailApp
|
MAIL_BINARY=mailApp
|
||||||
LISTENER_BINARY=listenerApp
|
|
||||||
|
|
||||||
## up: starts all containers in the background without forcing build
|
## up: starts all containers in the background without forcing build
|
||||||
up:
|
up:
|
||||||
@ -12,7 +11,7 @@ up:
|
|||||||
@echo "Docker images started!"
|
@echo "Docker images started!"
|
||||||
|
|
||||||
## up_build: stops docker-compose (if running), builds all projects and starts docker compose
|
## up_build: stops docker-compose (if running), builds all projects and starts docker compose
|
||||||
up_build: build_broker build_auth build_logger build_mail build_listener
|
up_build: build_broker build_auth build_logger build_mail
|
||||||
@echo "Stopping docker images (if running...)"
|
@echo "Stopping docker images (if running...)"
|
||||||
docker compose down
|
docker compose down
|
||||||
@echo "Building (when required) and starting docker images..."
|
@echo "Building (when required) and starting docker images..."
|
||||||
@ -49,12 +48,6 @@ build_mail:
|
|||||||
cd ../mail-service && env GOOS=linux CGO_ENABLED=0 go build -o ${MAIL_BINARY} ./cmd/api
|
cd ../mail-service && env GOOS=linux CGO_ENABLED=0 go build -o ${MAIL_BINARY} ./cmd/api
|
||||||
@echo "Done!"
|
@echo "Done!"
|
||||||
|
|
||||||
## build_listener: builds the listener binary as a linux executable
|
|
||||||
build_listener:
|
|
||||||
@echo "Building listener binary..."
|
|
||||||
cd ../listener-service && env GOOS=linux CGO_ENABLED=0 go build -o ${LISTENER_BINARY} .
|
|
||||||
@echo "Done!"
|
|
||||||
|
|
||||||
## build_front: builds the frone end binary
|
## build_front: builds the frone end binary
|
||||||
build_front:
|
build_front:
|
||||||
@echo "Building front end binary..."
|
@echo "Building front end binary..."
|
||||||
|
@ -50,15 +50,6 @@ services:
|
|||||||
FROM_ADDR: me@here.com
|
FROM_ADDR: me@here.com
|
||||||
|
|
||||||
|
|
||||||
listener-service:
|
|
||||||
build:
|
|
||||||
context: ./../listener-service/
|
|
||||||
dockerfile: ./../listener-service/listener-service.dockerfile
|
|
||||||
restart: always
|
|
||||||
deploy:
|
|
||||||
mode: replicated
|
|
||||||
replicas: 1
|
|
||||||
|
|
||||||
postgres:
|
postgres:
|
||||||
image: postgres:16.4-alpine
|
image: postgres:16.4-alpine
|
||||||
ports:
|
ports:
|
||||||
|
Loading…
x
Reference in New Issue
Block a user