mirror of
https://github.com/foomo/contentserver.git
synced 2025-10-16 12:25:44 +00:00
feat: add update request queue test
This commit is contained in:
parent
0ab7935a05
commit
ddb4adf571
@ -3,7 +3,6 @@ package main
|
||||
import (
|
||||
"flag"
|
||||
"fmt"
|
||||
"net/http"
|
||||
_ "net/http/pprof"
|
||||
"os"
|
||||
"runtime/debug"
|
||||
@ -26,7 +25,7 @@ const (
|
||||
|
||||
ServiceName = "Content Server"
|
||||
DefaultHealthzHandlerAddress = ":8080"
|
||||
DefaultPrometheusListener = "127.0.0.1:9111"
|
||||
DefaultPrometheusListener = "127.0.0.1:9200"
|
||||
)
|
||||
|
||||
var (
|
||||
@ -52,10 +51,6 @@ func main() {
|
||||
|
||||
SetupLogging(*flagDebug, "contentserver.log")
|
||||
|
||||
go func() {
|
||||
fmt.Println(http.ListenAndServe("localhost:6060", nil))
|
||||
}()
|
||||
|
||||
if *flagFreeOSMem > 0 {
|
||||
Log.Info("freeing OS memory every $interval minutes", zap.Int("interval", *flagFreeOSMem))
|
||||
go func() {
|
||||
|
||||
@ -8,7 +8,7 @@ import (
|
||||
"time"
|
||||
|
||||
"github.com/foomo/contentserver/content"
|
||||
. "github.com/foomo/contentserver/logger"
|
||||
"github.com/foomo/contentserver/logger"
|
||||
"github.com/foomo/contentserver/status"
|
||||
jsoniter "github.com/json-iterator/go"
|
||||
"go.uber.org/zap"
|
||||
@ -28,12 +28,15 @@ func (repo *Repo) updateRoutine() {
|
||||
for {
|
||||
select {
|
||||
case resChan := <-repo.updateInProgressChannel:
|
||||
Log.Info("waiting for update to complete", zap.String("chan", fmt.Sprintf("%p", resChan)))
|
||||
logger.Log.Info("waiting for update to complete", zap.String("chan", fmt.Sprintf("%p", resChan)))
|
||||
start := time.Now()
|
||||
|
||||
repoRuntime, errUpdate := repo.update()
|
||||
if errUpdate != nil {
|
||||
status.M.UpdatesFailedCounter.WithLabelValues(errUpdate.Error()).Inc()
|
||||
logger.Log.Error("Failed to update contentserver", zap.Error(errUpdate))
|
||||
status.M.UpdatesFailedCounter.WithLabelValues().Inc()
|
||||
} else {
|
||||
status.M.UpdatesCompletedCounter.WithLabelValues().Inc()
|
||||
}
|
||||
|
||||
resChan <- updateResponse{
|
||||
@ -42,8 +45,7 @@ func (repo *Repo) updateRoutine() {
|
||||
}
|
||||
|
||||
duration := time.Since(start)
|
||||
Log.Info("update completed", zap.Duration("duration", duration), zap.String("chan", fmt.Sprintf("%p", resChan)))
|
||||
status.M.UpdatesCompletedCounter.WithLabelValues().Inc()
|
||||
logger.Log.Info("update completed", zap.Duration("duration", duration), zap.String("chan", fmt.Sprintf("%p", resChan)))
|
||||
status.M.UpdateDuration.WithLabelValues().Observe(duration.Seconds())
|
||||
}
|
||||
}
|
||||
@ -51,24 +53,24 @@ func (repo *Repo) updateRoutine() {
|
||||
|
||||
func (repo *Repo) dimensionUpdateRoutine() {
|
||||
for newDimension := range repo.dimensionUpdateChannel {
|
||||
Log.Info("dimensionUpdateRoutine received a new dimension", zap.String("dimension", newDimension.Dimension))
|
||||
logger.Log.Info("dimensionUpdateRoutine received a new dimension", zap.String("dimension", newDimension.Dimension))
|
||||
|
||||
err := repo._updateDimension(newDimension.Dimension, newDimension.Node)
|
||||
Log.Info("dimensionUpdateRoutine received result")
|
||||
logger.Log.Info("dimensionUpdateRoutine received result")
|
||||
if err != nil {
|
||||
Log.Debug("update dimension failed", zap.Error(err))
|
||||
logger.Log.Debug("update dimension failed", zap.Error(err))
|
||||
}
|
||||
repo.dimensionUpdateDoneChannel <- err
|
||||
}
|
||||
}
|
||||
|
||||
func (repo *Repo) updateDimension(dimension string, node *content.RepoNode) error {
|
||||
Log.Debug("trying to push dimension into update channel", zap.String("dimension", dimension), zap.String("nodeName", node.Name))
|
||||
logger.Log.Debug("trying to push dimension into update channel", zap.String("dimension", dimension), zap.String("nodeName", node.Name))
|
||||
repo.dimensionUpdateChannel <- &repoDimension{
|
||||
Dimension: dimension,
|
||||
Node: node,
|
||||
}
|
||||
Log.Debug("waiting for done signal")
|
||||
logger.Log.Debug("waiting for done signal")
|
||||
return <-repo.dimensionUpdateDoneChannel
|
||||
}
|
||||
|
||||
@ -179,7 +181,7 @@ func (repo *Repo) get(URL string) (err error) {
|
||||
}
|
||||
defer response.Body.Close()
|
||||
if response.StatusCode != http.StatusOK {
|
||||
return fmt.Errorf("Bad HTTP Response: %q", response.Status)
|
||||
return fmt.Errorf("bad HTTP Response: %q", response.Status)
|
||||
}
|
||||
|
||||
// Log.Info(ansi.Red + "RESETTING BUFFER" + ansi.Reset)
|
||||
@ -196,10 +198,10 @@ func (repo *Repo) update() (repoRuntime int64, err error) {
|
||||
repoRuntime = time.Now().UnixNano() - startTimeRepo
|
||||
if err != nil {
|
||||
// we have no json to load - the repo server did not reply
|
||||
Log.Debug("failed to load json", zap.Error(err))
|
||||
logger.Log.Debug("Failed to load json", zap.Error(err))
|
||||
return repoRuntime, err
|
||||
}
|
||||
Log.Debug("loading json", zap.String("server", repo.server), zap.Int("length", len(repo.jsonBuf.Bytes())))
|
||||
logger.Log.Debug("loading json", zap.String("server", repo.server), zap.Int("length", len(repo.jsonBuf.Bytes())))
|
||||
nodes, err := repo.loadNodesFromJSON()
|
||||
if err != nil {
|
||||
// could not load nodes from json
|
||||
@ -218,11 +220,11 @@ func (repo *Repo) tryUpdate() (repoRuntime int64, err error) {
|
||||
c := make(chan updateResponse)
|
||||
select {
|
||||
case repo.updateInProgressChannel <- c:
|
||||
Log.Info("update request added to queue")
|
||||
logger.Log.Info("update request added to queue")
|
||||
ur := <-c
|
||||
return ur.repoRuntime, ur.err
|
||||
default:
|
||||
Log.Info("update request accepted, will be processed after the previous update")
|
||||
logger.Log.Info("update request accepted, will be processed after the previous update")
|
||||
return 0, errUpdateRejected
|
||||
}
|
||||
}
|
||||
@ -233,7 +235,7 @@ func (repo *Repo) loadJSONBytes() error {
|
||||
data := repo.jsonBuf.Bytes()
|
||||
|
||||
if len(data) > 10 {
|
||||
Log.Debug("could not parse json",
|
||||
logger.Log.Debug("could not parse json",
|
||||
zap.String("jsonStart", string(data[:10])),
|
||||
zap.String("jsonStart", string(data[len(data)-10:])),
|
||||
)
|
||||
@ -245,10 +247,10 @@ func (repo *Repo) loadJSONBytes() error {
|
||||
if err == nil {
|
||||
historyErr := repo.history.add(repo.jsonBuf.Bytes())
|
||||
if historyErr != nil {
|
||||
Log.Error("could not add valid json to history", zap.Error(historyErr))
|
||||
logger.Log.Error("could not add valid json to history", zap.Error(historyErr))
|
||||
status.M.HistoryPersistFailedCounter.WithLabelValues(historyErr.Error()).Inc()
|
||||
} else {
|
||||
Log.Info("added valid json to history")
|
||||
logger.Log.Info("added valid json to history")
|
||||
}
|
||||
}
|
||||
return err
|
||||
@ -258,10 +260,10 @@ func (repo *Repo) loadNodes(newNodes map[string]*content.RepoNode) error {
|
||||
newDimensions := []string{}
|
||||
for dimension, newNode := range newNodes {
|
||||
newDimensions = append(newDimensions, dimension)
|
||||
Log.Debug("loading nodes for dimension", zap.String("dimension", dimension))
|
||||
logger.Log.Debug("loading nodes for dimension", zap.String("dimension", dimension))
|
||||
loadErr := repo.updateDimension(dimension, newNode)
|
||||
if loadErr != nil {
|
||||
Log.Debug("failed to load", zap.String("dimension", dimension), zap.Error(loadErr))
|
||||
logger.Log.Debug("failed to load", zap.String("dimension", dimension), zap.Error(loadErr))
|
||||
return loadErr
|
||||
}
|
||||
}
|
||||
@ -276,7 +278,7 @@ func (repo *Repo) loadNodes(newNodes map[string]*content.RepoNode) error {
|
||||
// we need to throw away orphaned dimensions
|
||||
for dimension := range repo.Directory {
|
||||
if !dimensionIsValid(dimension) {
|
||||
Log.Info("removing orphaned dimension", zap.String("dimension", dimension))
|
||||
logger.Log.Info("removing orphaned dimension", zap.String("dimension", dimension))
|
||||
delete(repo.Directory, dimension)
|
||||
}
|
||||
}
|
||||
|
||||
71
repo/repo.go
71
repo/repo.go
@ -12,7 +12,7 @@ import (
|
||||
"github.com/foomo/contentserver/status"
|
||||
|
||||
"github.com/foomo/contentserver/content"
|
||||
. "github.com/foomo/contentserver/logger"
|
||||
"github.com/foomo/contentserver/logger"
|
||||
"github.com/foomo/contentserver/requests"
|
||||
"github.com/foomo/contentserver/responses"
|
||||
"go.uber.org/zap"
|
||||
@ -51,7 +51,7 @@ type repoDimension struct {
|
||||
// NewRepo constructor
|
||||
func NewRepo(server string, varDir string) *Repo {
|
||||
|
||||
Log.Info("creating new repo",
|
||||
logger.Log.Info("creating new repo",
|
||||
zap.String("server", server),
|
||||
zap.String("varDir", varDir),
|
||||
)
|
||||
@ -68,13 +68,13 @@ func NewRepo(server string, varDir string) *Repo {
|
||||
go repo.updateRoutine()
|
||||
go repo.dimensionUpdateRoutine()
|
||||
|
||||
Log.Info("trying to restore previous state")
|
||||
logger.Log.Info("trying to restore previous state")
|
||||
restoreErr := repo.tryToRestoreCurrent()
|
||||
if restoreErr != nil {
|
||||
Log.Error(" could not restore previous repo content", zap.Error(restoreErr))
|
||||
logger.Log.Error(" could not restore previous repo content", zap.Error(restoreErr))
|
||||
} else {
|
||||
repo.recovered = true
|
||||
Log.Info("restored previous repo content")
|
||||
logger.Log.Info("restored previous repo content")
|
||||
}
|
||||
return repo
|
||||
}
|
||||
@ -104,12 +104,11 @@ func (repo *Repo) getNodes(nodeRequests map[string]*requests.Node, env *requests
|
||||
path = []*content.Item{}
|
||||
)
|
||||
for nodeName, nodeRequest := range nodeRequests {
|
||||
|
||||
if nodeName == "" || nodeRequest.ID == "" {
|
||||
Log.Info("invalid node request", zap.Error(errors.New("nodeName or nodeRequest.ID empty")))
|
||||
logger.Log.Info("invalid node request", zap.Error(errors.New("nodeName or nodeRequest.ID empty")))
|
||||
continue
|
||||
}
|
||||
Log.Debug("adding node", zap.String("name", nodeName), zap.String("requestID", nodeRequest.ID))
|
||||
logger.Log.Debug("adding node", zap.String("name", nodeName), zap.String("requestID", nodeRequest.ID))
|
||||
|
||||
groups := env.Groups
|
||||
if len(nodeRequest.Groups) > 0 {
|
||||
@ -120,30 +119,28 @@ func (repo *Repo) getNodes(nodeRequests map[string]*requests.Node, env *requests
|
||||
nodes[nodeName] = nil
|
||||
|
||||
if !ok && nodeRequest.Dimension == "" {
|
||||
Log.Debug("could not get dimension root node", zap.String("dimension", nodeRequest.Dimension))
|
||||
logger.Log.Debug("Could not get dimension root node", zap.String("dimension", nodeRequest.Dimension))
|
||||
for _, dimension := range env.Dimensions {
|
||||
dimensionNode, ok = repo.Directory[dimension]
|
||||
if ok {
|
||||
Log.Debug("found root node in env.Dimensions", zap.String("dimension", dimension))
|
||||
logger.Log.Debug("Found root node in env.Dimensions", zap.String("dimension", dimension))
|
||||
break
|
||||
}
|
||||
Log.Debug("could NOT find root node in env.Dimensions", zap.String("dimension", dimension))
|
||||
logger.Log.Debug("Could NOT find root node in env.Dimensions", zap.String("dimension", dimension))
|
||||
}
|
||||
}
|
||||
|
||||
if !ok {
|
||||
Log.Error("could not get dimension root node", zap.String("nodeRequest.Dimension", nodeRequest.Dimension))
|
||||
logger.Log.Error("could not get dimension root node", zap.String("nodeRequest.Dimension", nodeRequest.Dimension))
|
||||
continue
|
||||
}
|
||||
|
||||
treeNode, ok := dimensionNode.Directory[nodeRequest.ID]
|
||||
if ok {
|
||||
nodes[nodeName] = repo.getNode(treeNode, nodeRequest.Expand, nodeRequest.MimeTypes, path, 0, groups, nodeRequest.DataFields, nodeRequest.ExposeHiddenNodes)
|
||||
} else {
|
||||
Log.Error("an invalid tree node was requested",
|
||||
zap.String("nodeName", nodeName),
|
||||
zap.String("ID", nodeRequest.ID),
|
||||
)
|
||||
if !ok {
|
||||
status.M.InvalidNodeTreeRequests.WithLabelValues(nodeRequest.ID).Inc()
|
||||
continue
|
||||
}
|
||||
nodes[nodeName] = repo.getNode(treeNode, nodeRequest.Expand, nodeRequest.MimeTypes, path, 0, groups, nodeRequest.DataFields, nodeRequest.ExposeHiddenNodes)
|
||||
}
|
||||
return nodes
|
||||
}
|
||||
@ -161,18 +158,18 @@ func (repo *Repo) GetContent(r *requests.Content) (c *content.SiteContent, err e
|
||||
// add more input validation
|
||||
err = repo.validateContentRequest(r)
|
||||
if err != nil {
|
||||
Log.Error("repo.GetContent invalid request", zap.Error(err))
|
||||
logger.Log.Error("repo.GetContent invalid request", zap.Error(err))
|
||||
return
|
||||
}
|
||||
Log.Debug("repo.GetContent", zap.String("URI", r.URI))
|
||||
logger.Log.Debug("repo.GetContent", zap.String("URI", r.URI))
|
||||
c = content.NewSiteContent()
|
||||
resolved, resolvedURI, resolvedDimension, node := repo.resolveContent(r.Env.Dimensions, r.URI)
|
||||
if resolved {
|
||||
if !node.CanBeAccessedByGroups(r.Env.Groups) {
|
||||
Log.Warn("resolvecontent got status 401", zap.String("URI", r.URI))
|
||||
logger.Log.Warn("Resolved content cannot be accessed by specified group", zap.String("URI", r.URI))
|
||||
c.Status = content.StatusForbidden
|
||||
} else {
|
||||
Log.Info("resolvecontent got status 200", zap.String("URI", r.URI))
|
||||
logger.Log.Info("Content resolved", zap.String("URI", r.URI))
|
||||
c.Status = content.StatusOk
|
||||
c.Data = node.Data
|
||||
}
|
||||
@ -188,11 +185,11 @@ func (repo *Repo) GetContent(r *requests.Content) (c *content.SiteContent, err e
|
||||
}
|
||||
c.URIs = uris
|
||||
} else {
|
||||
Log.Info("resolvecontent got status 404", zap.String("URI", r.URI))
|
||||
logger.Log.Info("Content not found", zap.String("URI", r.URI))
|
||||
c.Status = content.StatusNotFound
|
||||
c.Dimension = r.Env.Dimensions[0]
|
||||
|
||||
Log.Debug("failed to resolve, falling back to default dimension",
|
||||
logger.Log.Debug("Failed to resolve, falling back to default dimension",
|
||||
zap.String("URI", r.URI),
|
||||
zap.String("defaultDimension", r.Env.Dimensions[0]),
|
||||
)
|
||||
@ -226,13 +223,13 @@ func (repo *Repo) WriteRepoBytes(w io.Writer) {
|
||||
|
||||
f, err := os.Open(repo.history.getCurrentFilename())
|
||||
if err != nil {
|
||||
Log.Error("failed to serve Repo JSON", zap.Error(err))
|
||||
logger.Log.Error("Failed to serve Repo JSON", zap.Error(err))
|
||||
}
|
||||
|
||||
w.Write([]byte("{\"reply\":"))
|
||||
_, err = io.Copy(w, f)
|
||||
if err != nil {
|
||||
Log.Error("failed to serve Repo JSON", zap.Error(err))
|
||||
logger.Log.Error("Failed to serve Repo JSON", zap.Error(err))
|
||||
}
|
||||
w.Write([]byte("}"))
|
||||
}
|
||||
@ -243,7 +240,7 @@ func (repo *Repo) Update() (updateResponse *responses.Update) {
|
||||
return float64(float64(nanoSeconds) / float64(1000000000.0))
|
||||
}
|
||||
|
||||
Log.Info("Update triggered")
|
||||
logger.Log.Info("Update triggered")
|
||||
// Log.Info(ansi.Yellow + "BUFFER LENGTH BEFORE tryUpdate(): " + strconv.Itoa(len(repo.jsonBuf.Bytes())) + ansi.Reset)
|
||||
|
||||
startTime := time.Now().UnixNano()
|
||||
@ -262,13 +259,13 @@ func (repo *Repo) Update() (updateResponse *responses.Update) {
|
||||
|
||||
if updateErr != errUpdateRejected {
|
||||
updateResponse.ErrorMessage = updateErr.Error()
|
||||
Log.Error("could not update repository:", zap.Error(updateErr))
|
||||
logger.Log.Error("Failed to update repository", zap.Error(updateErr))
|
||||
|
||||
restoreErr := repo.tryToRestoreCurrent()
|
||||
if restoreErr != nil {
|
||||
Log.Error("failed to restore preceding repo version", zap.Error(restoreErr))
|
||||
logger.Log.Error("Failed to restore preceding repository version", zap.Error(restoreErr))
|
||||
} else {
|
||||
Log.Info("restored current repo from local history")
|
||||
logger.Log.Info("Successfully restored current repository from local history")
|
||||
}
|
||||
}
|
||||
} else {
|
||||
@ -276,7 +273,7 @@ func (repo *Repo) Update() (updateResponse *responses.Update) {
|
||||
// persist the currently loaded one
|
||||
historyErr := repo.history.add(repo.jsonBuf.Bytes())
|
||||
if historyErr != nil {
|
||||
Log.Error("could not persist current repo in history", zap.Error(historyErr))
|
||||
logger.Log.Error("Could not persist current repo in history", zap.Error(historyErr))
|
||||
status.M.HistoryPersistFailedCounter.WithLabelValues(historyErr.Error()).Inc()
|
||||
}
|
||||
// add some stats
|
||||
@ -292,7 +289,7 @@ func (repo *Repo) Update() (updateResponse *responses.Update) {
|
||||
// resolveContent find content in a repository
|
||||
func (repo *Repo) resolveContent(dimensions []string, URI string) (resolved bool, resolvedURI string, resolvedDimension string, repoNode *content.RepoNode) {
|
||||
parts := strings.Split(URI, content.PathSeparator)
|
||||
Log.Debug("repo.ResolveContent", zap.String("URI", URI))
|
||||
logger.Log.Debug("repo.ResolveContent", zap.String("URI", URI))
|
||||
for i := len(parts); i > 0; i-- {
|
||||
testURI := strings.Join(parts[0:i], content.PathSeparator)
|
||||
if testURI == "" {
|
||||
@ -300,13 +297,13 @@ func (repo *Repo) resolveContent(dimensions []string, URI string) (resolved bool
|
||||
}
|
||||
for _, dimension := range dimensions {
|
||||
if d, ok := repo.Directory[dimension]; ok {
|
||||
Log.Debug("checking",
|
||||
logger.Log.Debug("Checking node",
|
||||
zap.String("dimension", dimension),
|
||||
zap.String("URI", testURI),
|
||||
)
|
||||
if repoNode, ok := d.URIDirectory[testURI]; ok {
|
||||
resolved = true
|
||||
Log.Debug("found node", zap.String("URI", testURI), zap.String("destination", repoNode.DestinationID))
|
||||
logger.Log.Debug("Node found", zap.String("URI", testURI), zap.String("destination", repoNode.DestinationID))
|
||||
if len(repoNode.DestinationID) > 0 {
|
||||
if destionationNode, destinationNodeOk := d.Directory[repoNode.DestinationID]; destinationNodeOk {
|
||||
repoNode = destionationNode
|
||||
@ -328,7 +325,7 @@ func (repo *Repo) getURIForNode(dimension string, repoNode *content.RepoNode, re
|
||||
linkedNode, ok := repo.Directory[dimension].Directory[repoNode.LinkID]
|
||||
if ok {
|
||||
if recursionLevel > maxGetURIForNodeRecursionLevel {
|
||||
Log.Error("maxGetURIForNodeRecursionLevel reached", zap.String("repoNode.ID", repoNode.ID), zap.String("linkID", repoNode.LinkID), zap.String("dimension", dimension))
|
||||
logger.Log.Error("maxGetURIForNodeRecursionLevel reached", zap.String("repoNode.ID", repoNode.ID), zap.String("linkID", repoNode.LinkID), zap.String("dimension", dimension))
|
||||
return ""
|
||||
}
|
||||
return repo.getURIForNode(dimension, linkedNode, recursionLevel+1)
|
||||
@ -356,7 +353,7 @@ func (repo *Repo) getNode(
|
||||
) *content.Node {
|
||||
node := content.NewNode()
|
||||
node.Item = repoNode.ToItem(dataFields)
|
||||
Log.Debug("getNode", zap.String("ID", repoNode.ID))
|
||||
logger.Log.Debug("getNode", zap.String("ID", repoNode.ID))
|
||||
for _, childID := range repoNode.Index {
|
||||
childNode := repoNode.Nodes[childID]
|
||||
if (level == 0 || expanded || !expanded && childNode.InPath(path)) && (!childNode.Hidden || exposeHiddenNodes) && childNode.CanBeAccessedByGroups(groups) && childNode.IsOneOfTheseMimeTypes(mimeTypes) {
|
||||
|
||||
@ -12,13 +12,27 @@ import (
|
||||
"github.com/foomo/contentserver/status"
|
||||
)
|
||||
|
||||
func handleRequest(r *repo.Repo, handler Handler, jsonBytes []byte, source string) (replyBytes []byte, err error) {
|
||||
func handleRequest(r *repo.Repo, handler Handler, jsonBytes []byte, source string) ([]byte, error) {
|
||||
start := time.Now()
|
||||
|
||||
reply, err := executeRequest(r, handler, jsonBytes, source)
|
||||
result := "success"
|
||||
if err != nil {
|
||||
result = "error"
|
||||
}
|
||||
|
||||
status.M.ServiceRequestCounter.WithLabelValues(string(handler), result, source).Inc()
|
||||
status.M.ServiceRequestDuration.WithLabelValues(string(handler), result, source).Observe(time.Since(start).Seconds())
|
||||
|
||||
return reply, err
|
||||
}
|
||||
|
||||
func executeRequest(r *repo.Repo, handler Handler, jsonBytes []byte, source string) (replyBytes []byte, err error) {
|
||||
|
||||
var (
|
||||
reply interface{}
|
||||
apiErr error
|
||||
jsonErr error
|
||||
start = time.Now()
|
||||
processIfJSONIsOk = func(err error, processingFunc func()) {
|
||||
if err != nil {
|
||||
jsonErr = err
|
||||
@ -57,7 +71,6 @@ func handleRequest(r *repo.Repo, handler Handler, jsonBytes []byte, source strin
|
||||
default:
|
||||
reply = responses.NewError(1, "unknown handler: "+string(handler))
|
||||
}
|
||||
addMetrics(handler, start, jsonErr, apiErr, source)
|
||||
|
||||
// error handling
|
||||
if jsonErr != nil {
|
||||
@ -71,20 +84,6 @@ func handleRequest(r *repo.Repo, handler Handler, jsonBytes []byte, source strin
|
||||
return encodeReply(reply)
|
||||
}
|
||||
|
||||
func addMetrics(handlerName Handler, start time.Time, errJSON error, errAPI error, source string) {
|
||||
|
||||
var (
|
||||
duration = time.Since(start)
|
||||
s = "succeeded"
|
||||
)
|
||||
if errJSON != nil || errAPI != nil {
|
||||
s = "failed"
|
||||
}
|
||||
|
||||
status.M.ServiceRequestCounter.WithLabelValues(string(handlerName), s, source).Inc()
|
||||
status.M.ServiceRequestDuration.WithLabelValues(string(handlerName), s, source).Observe(float64(duration.Seconds()))
|
||||
}
|
||||
|
||||
// encodeReply takes an interface and encodes it as JSON
|
||||
// it returns the resulting JSON and a marshalling error
|
||||
func encodeReply(reply interface{}) (replyBytes []byte, err error) {
|
||||
|
||||
@ -7,7 +7,6 @@ import (
|
||||
"net"
|
||||
"strconv"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"go.uber.org/zap"
|
||||
|
||||
@ -47,13 +46,10 @@ func (s *socketServer) execute(handler Handler, jsonBytes []byte) (reply []byte)
|
||||
Log.Debug("incoming json buffer", zap.Int("length", len(jsonBytes)))
|
||||
|
||||
if handler == HandlerGetRepo {
|
||||
|
||||
var (
|
||||
b bytes.Buffer
|
||||
start = time.Now()
|
||||
b bytes.Buffer
|
||||
)
|
||||
s.repo.WriteRepoBytes(&b)
|
||||
addMetrics(handler, start, nil, nil, sourceSocketServer)
|
||||
return b.Bytes()
|
||||
}
|
||||
|
||||
|
||||
@ -5,7 +5,6 @@ import (
|
||||
"io/ioutil"
|
||||
"net/http"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"go.uber.org/zap"
|
||||
|
||||
@ -31,7 +30,7 @@ func NewWebServer(path string, r *repo.Repo) http.Handler {
|
||||
func (s *webServer) ServeHTTP(w http.ResponseWriter, r *http.Request) {
|
||||
defer func() {
|
||||
if r := recover(); r != nil {
|
||||
Log.Error("panic in handle connection", zap.String("error", fmt.Sprint(r)))
|
||||
Log.Error("Panic in handle connection", zap.String("error", fmt.Sprint(r)))
|
||||
}
|
||||
}()
|
||||
|
||||
@ -40,17 +39,14 @@ func (s *webServer) ServeHTTP(w http.ResponseWriter, r *http.Request) {
|
||||
return
|
||||
}
|
||||
jsonBytes, readErr := ioutil.ReadAll(r.Body)
|
||||
r.Body.Close()
|
||||
if readErr != nil {
|
||||
http.Error(w, "failed to read incoming request", http.StatusBadRequest)
|
||||
return
|
||||
}
|
||||
h := Handler(strings.TrimPrefix(r.URL.Path, s.path+"/"))
|
||||
if h == HandlerGetRepo {
|
||||
start := time.Now()
|
||||
s.r.WriteRepoBytes(w)
|
||||
w.Header().Set("Content-Type", "application/json")
|
||||
addMetrics(h, start, nil, nil, sourceWebserver)
|
||||
return
|
||||
}
|
||||
reply, errReply := handleRequest(s.r, h, jsonBytes, "webserver")
|
||||
|
||||
@ -28,6 +28,7 @@ type Metrics struct {
|
||||
ContentRequestCounter *prometheus.CounterVec // count the total number of content requests
|
||||
NumSocketsGauge *prometheus.GaugeVec // keep track of the total number of open sockets
|
||||
HistoryPersistFailedCounter *prometheus.CounterVec // count the number of failed attempts to persist the content history
|
||||
InvalidNodeTreeRequests *prometheus.CounterVec // counts the number of invalid tree node requests
|
||||
}
|
||||
|
||||
// newMetrics can be used to instantiate a metrics instance
|
||||
@ -36,6 +37,9 @@ type Metrics struct {
|
||||
// the package exposes the initialized Metrics instance as the variable M.
|
||||
func newMetrics() *Metrics {
|
||||
return &Metrics{
|
||||
InvalidNodeTreeRequests: newCounterVec("invalid_node_tree_request_count",
|
||||
"Counts the number of invalid tree nodes for a specific node ID",
|
||||
"nodeID"),
|
||||
ServiceRequestCounter: newCounterVec(
|
||||
"service_request_count",
|
||||
"Count of requests for each handler",
|
||||
@ -57,7 +61,6 @@ func newMetrics() *Metrics {
|
||||
UpdatesFailedCounter: newCounterVec(
|
||||
"updates_failed_count",
|
||||
"Number of updates that failed due to an error",
|
||||
metricLabelError,
|
||||
),
|
||||
UpdateDuration: newSummaryVec(
|
||||
"update_duration_seconds",
|
||||
|
||||
Loading…
Reference in New Issue
Block a user