mirror of
https://github.com/foomo/contentserver.git
synced 2025-10-16 12:25:44 +00:00
add prometheus request metrics
This commit is contained in:
parent
ff28d6670f
commit
b7f10ed673
@ -4,19 +4,26 @@ import (
|
|||||||
"encoding/json"
|
"encoding/json"
|
||||||
"errors"
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"time"
|
||||||
|
|
||||||
"github.com/foomo/contentserver/log"
|
"github.com/foomo/contentserver/log"
|
||||||
"github.com/foomo/contentserver/repo"
|
"github.com/foomo/contentserver/repo"
|
||||||
"github.com/foomo/contentserver/requests"
|
"github.com/foomo/contentserver/requests"
|
||||||
"github.com/foomo/contentserver/responses"
|
"github.com/foomo/contentserver/responses"
|
||||||
|
"github.com/foomo/contentserver/status"
|
||||||
|
"github.com/prometheus/client_golang/prometheus"
|
||||||
)
|
)
|
||||||
|
|
||||||
func handleRequest(r *repo.Repo, handler Handler, jsonBytes []byte) (replyBytes []byte, err error) {
|
func handleRequest(r *repo.Repo, handler Handler, jsonBytes []byte, metrics *status.Metrics) (replyBytes []byte, err error) {
|
||||||
|
|
||||||
|
// variables
|
||||||
var reply interface{}
|
var reply interface{}
|
||||||
var apiErr error
|
var apiErr error
|
||||||
var jsonErr error
|
var jsonErr error
|
||||||
|
|
||||||
|
start := time.Now()
|
||||||
|
|
||||||
|
// helper processor
|
||||||
processIfJSONIsOk := func(err error, processingFunc func()) {
|
processIfJSONIsOk := func(err error, processingFunc func()) {
|
||||||
if err != nil {
|
if err != nil {
|
||||||
jsonErr = err
|
jsonErr = err
|
||||||
@ -25,37 +32,46 @@ func handleRequest(r *repo.Repo, handler Handler, jsonBytes []byte) (replyBytes
|
|||||||
processingFunc()
|
processingFunc()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// handle and process
|
||||||
switch handler {
|
switch handler {
|
||||||
case HandlerGetURIs:
|
case HandlerGetURIs:
|
||||||
getURIRequest := &requests.URIs{}
|
getURIRequest := &requests.URIs{}
|
||||||
processIfJSONIsOk(json.Unmarshal(jsonBytes, &getURIRequest), func() {
|
processIfJSONIsOk(json.Unmarshal(jsonBytes, &getURIRequest), func() {
|
||||||
reply = r.GetURIs(getURIRequest.Dimension, getURIRequest.IDs)
|
reply = r.GetURIs(getURIRequest.Dimension, getURIRequest.IDs)
|
||||||
})
|
})
|
||||||
|
addMetrics(metrics, HandlerGetURIs, start, jsonErr, apiErr)
|
||||||
case HandlerGetContent:
|
case HandlerGetContent:
|
||||||
contentRequest := &requests.Content{}
|
contentRequest := &requests.Content{}
|
||||||
processIfJSONIsOk(json.Unmarshal(jsonBytes, &contentRequest), func() {
|
processIfJSONIsOk(json.Unmarshal(jsonBytes, &contentRequest), func() {
|
||||||
reply, apiErr = r.GetContent(contentRequest)
|
reply, apiErr = r.GetContent(contentRequest)
|
||||||
})
|
})
|
||||||
|
addMetrics(metrics, HandlerGetContent, start, jsonErr, apiErr)
|
||||||
case HandlerGetNodes:
|
case HandlerGetNodes:
|
||||||
nodesRequest := &requests.Nodes{}
|
nodesRequest := &requests.Nodes{}
|
||||||
processIfJSONIsOk(json.Unmarshal(jsonBytes, &nodesRequest), func() {
|
processIfJSONIsOk(json.Unmarshal(jsonBytes, &nodesRequest), func() {
|
||||||
reply = r.GetNodes(nodesRequest)
|
reply = r.GetNodes(nodesRequest)
|
||||||
})
|
})
|
||||||
|
addMetrics(metrics, HandlerGetNodes, start, jsonErr, apiErr)
|
||||||
case HandlerUpdate:
|
case HandlerUpdate:
|
||||||
updateRequest := &requests.Update{}
|
updateRequest := &requests.Update{}
|
||||||
processIfJSONIsOk(json.Unmarshal(jsonBytes, &updateRequest), func() {
|
processIfJSONIsOk(json.Unmarshal(jsonBytes, &updateRequest), func() {
|
||||||
reply = r.Update()
|
reply = r.Update()
|
||||||
})
|
})
|
||||||
|
addMetrics(metrics, HandlerUpdate, start, jsonErr, apiErr)
|
||||||
case HandlerGetRepo:
|
case HandlerGetRepo:
|
||||||
repoRequest := &requests.Repo{}
|
repoRequest := &requests.Repo{}
|
||||||
processIfJSONIsOk(json.Unmarshal(jsonBytes, &repoRequest), func() {
|
processIfJSONIsOk(json.Unmarshal(jsonBytes, &repoRequest), func() {
|
||||||
reply = r.GetRepo()
|
reply = r.GetRepo()
|
||||||
})
|
})
|
||||||
|
addMetrics(metrics, HandlerGetRepo, start, jsonErr, apiErr)
|
||||||
default:
|
default:
|
||||||
err = errors.New(log.Error(" can not handle this one " + handler))
|
err = errors.New(log.Error(" can not handle this one " + handler))
|
||||||
errorResponse := responses.NewError(1, "unknown handler")
|
errorResponse := responses.NewError(1, "unknown handler")
|
||||||
reply = errorResponse
|
reply = errorResponse
|
||||||
|
addMetrics(metrics, "default", start, jsonErr, apiErr)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// error handling
|
||||||
if jsonErr != nil {
|
if jsonErr != nil {
|
||||||
err = jsonErr
|
err = jsonErr
|
||||||
log.Error(" could not read incoming json:", jsonErr)
|
log.Error(" could not read incoming json:", jsonErr)
|
||||||
@ -66,9 +82,30 @@ func handleRequest(r *repo.Repo, handler Handler, jsonBytes []byte) (replyBytes
|
|||||||
err = apiErr
|
err = apiErr
|
||||||
reply = responses.NewError(3, "internal error "+apiErr.Error())
|
reply = responses.NewError(3, "internal error "+apiErr.Error())
|
||||||
}
|
}
|
||||||
|
|
||||||
return encodeReply(reply)
|
return encodeReply(reply)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func addMetrics(metrics *status.Metrics, handlerName Handler, start time.Time, errJSON error, errAPI error) {
|
||||||
|
|
||||||
|
duration := time.Since(start)
|
||||||
|
|
||||||
|
s := "succeeded"
|
||||||
|
if errJSON != nil || errAPI != nil {
|
||||||
|
s = "failed"
|
||||||
|
}
|
||||||
|
|
||||||
|
metrics.ServiceRequestCounter.With(prometheus.Labels{
|
||||||
|
status.MetricLabelHandler: string(handlerName),
|
||||||
|
status.MetricLabelStatus: s,
|
||||||
|
}).Inc()
|
||||||
|
|
||||||
|
metrics.ServiceRequestDuration.With(prometheus.Labels{
|
||||||
|
status.MetricLabelHandler: string(handlerName),
|
||||||
|
status.MetricLabelStatus: s,
|
||||||
|
}).Observe(float64(duration.Nanoseconds()))
|
||||||
|
}
|
||||||
|
|
||||||
func encodeReply(reply interface{}) (replyBytes []byte, err error) {
|
func encodeReply(reply interface{}) (replyBytes []byte, err error) {
|
||||||
encodedBytes, jsonReplyErr := json.MarshalIndent(map[string]interface{}{
|
encodedBytes, jsonReplyErr := json.MarshalIndent(map[string]interface{}{
|
||||||
"reply": reply,
|
"reply": reply,
|
||||||
|
|||||||
@ -74,19 +74,23 @@ func runSocketServer(
|
|||||||
address string,
|
address string,
|
||||||
chanErr chan error,
|
chanErr chan error,
|
||||||
) {
|
) {
|
||||||
s := &socketServer{
|
// create socket server
|
||||||
stats: newStats(),
|
s, errSocketServer := newSocketServer(repo)
|
||||||
repo: repo,
|
if errSocketServer != nil {
|
||||||
}
|
log.Error(errSocketServer)
|
||||||
ln, err := net.Listen("tcp", address)
|
chanErr <- errSocketServer
|
||||||
if err != nil {
|
|
||||||
err = errors.New("RunSocketServer: could not start the on \"" + address + "\" - error: " + fmt.Sprint(err))
|
|
||||||
// failed to create socket
|
|
||||||
log.Error(err)
|
|
||||||
chanErr <- err
|
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
// there we go
|
|
||||||
|
// listen on socket
|
||||||
|
ln, errListen := net.Listen("tcp", address)
|
||||||
|
if errListen != nil {
|
||||||
|
errListenSocket := errors.New("RunSocketServer: could not start the on \"" + address + "\" - error: " + fmt.Sprint(errListen))
|
||||||
|
log.Error(errListenSocket)
|
||||||
|
chanErr <- errListenSocket
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
log.Record("RunSocketServer: started to listen on " + address)
|
log.Record("RunSocketServer: started to listen on " + address)
|
||||||
for {
|
for {
|
||||||
// this blocks until connection or error
|
// this blocks until connection or error
|
||||||
|
|||||||
@ -10,39 +10,21 @@ import (
|
|||||||
"github.com/foomo/contentserver/log"
|
"github.com/foomo/contentserver/log"
|
||||||
"github.com/foomo/contentserver/repo"
|
"github.com/foomo/contentserver/repo"
|
||||||
"github.com/foomo/contentserver/responses"
|
"github.com/foomo/contentserver/responses"
|
||||||
|
"github.com/foomo/contentserver/status"
|
||||||
)
|
)
|
||||||
|
|
||||||
// simple internal request counter
|
|
||||||
type stats struct {
|
|
||||||
requests int64
|
|
||||||
chanCount chan int
|
|
||||||
}
|
|
||||||
|
|
||||||
func newStats() *stats {
|
|
||||||
s := &stats{
|
|
||||||
requests: 0,
|
|
||||||
chanCount: make(chan int),
|
|
||||||
}
|
|
||||||
go func() {
|
|
||||||
for {
|
|
||||||
select {
|
|
||||||
case <-s.chanCount:
|
|
||||||
s.requests++
|
|
||||||
s.chanCount <- 1
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}()
|
|
||||||
return s
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *stats) countRequest() {
|
|
||||||
s.chanCount <- 1
|
|
||||||
<-s.chanCount
|
|
||||||
}
|
|
||||||
|
|
||||||
type socketServer struct {
|
type socketServer struct {
|
||||||
stats *stats
|
repo *repo.Repo
|
||||||
repo *repo.Repo
|
metrics *status.Metrics
|
||||||
|
}
|
||||||
|
|
||||||
|
// newSocketServer returns a shiny new socket server
|
||||||
|
func newSocketServer(repo *repo.Repo) (s *socketServer, err error) {
|
||||||
|
s = &socketServer{
|
||||||
|
repo: repo,
|
||||||
|
metrics: status.NewMetrics("socketserver"),
|
||||||
|
}
|
||||||
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
func extractHandlerAndJSONLentgh(header string) (handler Handler, jsonLength int, err error) {
|
func extractHandlerAndJSONLentgh(header string) (handler Handler, jsonLength int, err error) {
|
||||||
@ -58,12 +40,10 @@ func extractHandlerAndJSONLentgh(header string) (handler Handler, jsonLength int
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (s *socketServer) execute(handler Handler, jsonBytes []byte) (reply []byte) {
|
func (s *socketServer) execute(handler Handler, jsonBytes []byte) (reply []byte) {
|
||||||
s.stats.countRequest()
|
|
||||||
log.Notice("socketServer.execute: ", s.stats.requests, ", ", handler)
|
|
||||||
if log.SelectedLevel == log.LevelDebug {
|
if log.SelectedLevel == log.LevelDebug {
|
||||||
log.Debug(" incoming json buffer:", string(jsonBytes))
|
log.Debug(" incoming json buffer:", string(jsonBytes))
|
||||||
}
|
}
|
||||||
reply, handlingError := handleRequest(s.repo, handler, jsonBytes)
|
reply, handlingError := handleRequest(s.repo, handler, jsonBytes, s.metrics)
|
||||||
if handlingError != nil {
|
if handlingError != nil {
|
||||||
log.Error("socketServer.execute handlingError :", handlingError)
|
log.Error("socketServer.execute handlingError :", handlingError)
|
||||||
}
|
}
|
||||||
|
|||||||
@ -5,19 +5,23 @@ import (
|
|||||||
"net/http"
|
"net/http"
|
||||||
"strings"
|
"strings"
|
||||||
|
|
||||||
|
"github.com/foomo/contentserver/status"
|
||||||
|
|
||||||
"github.com/foomo/contentserver/repo"
|
"github.com/foomo/contentserver/repo"
|
||||||
)
|
)
|
||||||
|
|
||||||
type webServer struct {
|
type webServer struct {
|
||||||
r *repo.Repo
|
r *repo.Repo
|
||||||
path string
|
path string
|
||||||
|
metrics *status.Metrics
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewWebServer returns a shiny new web server
|
// NewWebServer returns a shiny new web server
|
||||||
func NewWebServer(path string, r *repo.Repo) (s http.Handler, err error) {
|
func NewWebServer(path string, r *repo.Repo) (s http.Handler, err error) {
|
||||||
s = &webServer{
|
s = &webServer{
|
||||||
r: r,
|
r: r,
|
||||||
path: path,
|
path: path,
|
||||||
|
metrics: status.NewMetrics("webserver"),
|
||||||
}
|
}
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
@ -33,7 +37,7 @@ func (s *webServer) ServeHTTP(w http.ResponseWriter, r *http.Request) {
|
|||||||
http.Error(w, "failed to read incoming request", http.StatusBadRequest)
|
http.Error(w, "failed to read incoming request", http.StatusBadRequest)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
reply, errReply := handleRequest(s.r, Handler(strings.TrimPrefix(r.URL.Path, s.path+"/")), jsonBytes)
|
reply, errReply := handleRequest(s.r, Handler(strings.TrimPrefix(r.URL.Path, s.path+"/")), jsonBytes, s.metrics)
|
||||||
if errReply != nil {
|
if errReply != nil {
|
||||||
http.Error(w, errReply.Error(), http.StatusInternalServerError)
|
http.Error(w, errReply.Error(), http.StatusInternalServerError)
|
||||||
return
|
return
|
||||||
|
|||||||
43
status/metrics.go
Normal file
43
status/metrics.go
Normal file
@ -0,0 +1,43 @@
|
|||||||
|
package status
|
||||||
|
|
||||||
|
import (
|
||||||
|
"github.com/prometheus/client_golang/prometheus"
|
||||||
|
)
|
||||||
|
|
||||||
|
const MetricLabelHandler = "handler"
|
||||||
|
const MetricLabelStatus = "status"
|
||||||
|
|
||||||
|
type Metrics struct {
|
||||||
|
ServiceRequestCounter *prometheus.CounterVec // count the number of requests for each service function
|
||||||
|
ServiceRequestDuration *prometheus.SummaryVec // count the duration of requests for each service function
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewMetrics(namespace string) *Metrics {
|
||||||
|
return &Metrics{
|
||||||
|
ServiceRequestCounter: serviceRequestCounter("api", namespace),
|
||||||
|
ServiceRequestDuration: serviceRequestDuration("api", namespace),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func serviceRequestCounter(subsystem, namespace string) *prometheus.CounterVec {
|
||||||
|
vec := prometheus.NewCounterVec(
|
||||||
|
prometheus.CounterOpts{
|
||||||
|
Namespace: namespace,
|
||||||
|
Subsystem: subsystem,
|
||||||
|
Name: "count_service_requests",
|
||||||
|
Help: "count of requests per func",
|
||||||
|
}, []string{MetricLabelHandler, MetricLabelStatus})
|
||||||
|
prometheus.MustRegister(vec)
|
||||||
|
return vec
|
||||||
|
}
|
||||||
|
|
||||||
|
func serviceRequestDuration(subsystem, namespace string) *prometheus.SummaryVec {
|
||||||
|
vec := prometheus.NewSummaryVec(prometheus.SummaryOpts{
|
||||||
|
Namespace: namespace,
|
||||||
|
Subsystem: subsystem,
|
||||||
|
Name: "time_nanoseconds",
|
||||||
|
Help: "nanoseconds to unmarshal requests, execute a service function and marshal its reponses",
|
||||||
|
}, []string{MetricLabelHandler, MetricLabelStatus})
|
||||||
|
prometheus.MustRegister(vec)
|
||||||
|
return vec
|
||||||
|
}
|
||||||
Loading…
Reference in New Issue
Block a user