From ddb4adf571858de44eb84f74f940df09c9b8e60f Mon Sep 17 00:00:00 2001 From: Stefan Martinov Date: Sun, 22 Nov 2020 18:53:16 +0100 Subject: [PATCH 1/3] feat: add update request queue test --- contentserver.go | 7 +--- repo/loader.go | 44 +++++++++++++------------ repo/repo.go | 71 ++++++++++++++++++++--------------------- server/handlerequest.go | 33 ++++++++++--------- server/socketserver.go | 6 +--- server/webserver.go | 6 +--- status/metrics.go | 5 ++- 7 files changed, 80 insertions(+), 92 deletions(-) diff --git a/contentserver.go b/contentserver.go index 3d45927..40110b7 100644 --- a/contentserver.go +++ b/contentserver.go @@ -3,7 +3,6 @@ package main import ( "flag" "fmt" - "net/http" _ "net/http/pprof" "os" "runtime/debug" @@ -26,7 +25,7 @@ const ( ServiceName = "Content Server" DefaultHealthzHandlerAddress = ":8080" - DefaultPrometheusListener = "127.0.0.1:9111" + DefaultPrometheusListener = "127.0.0.1:9200" ) var ( @@ -52,10 +51,6 @@ func main() { SetupLogging(*flagDebug, "contentserver.log") - go func() { - fmt.Println(http.ListenAndServe("localhost:6060", nil)) - }() - if *flagFreeOSMem > 0 { Log.Info("freeing OS memory every $interval minutes", zap.Int("interval", *flagFreeOSMem)) go func() { diff --git a/repo/loader.go b/repo/loader.go index e483cd7..6e0c8f9 100644 --- a/repo/loader.go +++ b/repo/loader.go @@ -8,7 +8,7 @@ import ( "time" "github.com/foomo/contentserver/content" - . "github.com/foomo/contentserver/logger" + "github.com/foomo/contentserver/logger" "github.com/foomo/contentserver/status" jsoniter "github.com/json-iterator/go" "go.uber.org/zap" @@ -28,12 +28,15 @@ func (repo *Repo) updateRoutine() { for { select { case resChan := <-repo.updateInProgressChannel: - Log.Info("waiting for update to complete", zap.String("chan", fmt.Sprintf("%p", resChan))) + logger.Log.Info("waiting for update to complete", zap.String("chan", fmt.Sprintf("%p", resChan))) start := time.Now() repoRuntime, errUpdate := repo.update() if errUpdate != nil { - status.M.UpdatesFailedCounter.WithLabelValues(errUpdate.Error()).Inc() + logger.Log.Error("Failed to update contentserver", zap.Error(errUpdate)) + status.M.UpdatesFailedCounter.WithLabelValues().Inc() + } else { + status.M.UpdatesCompletedCounter.WithLabelValues().Inc() } resChan <- updateResponse{ @@ -42,8 +45,7 @@ func (repo *Repo) updateRoutine() { } duration := time.Since(start) - Log.Info("update completed", zap.Duration("duration", duration), zap.String("chan", fmt.Sprintf("%p", resChan))) - status.M.UpdatesCompletedCounter.WithLabelValues().Inc() + logger.Log.Info("update completed", zap.Duration("duration", duration), zap.String("chan", fmt.Sprintf("%p", resChan))) status.M.UpdateDuration.WithLabelValues().Observe(duration.Seconds()) } } @@ -51,24 +53,24 @@ func (repo *Repo) updateRoutine() { func (repo *Repo) dimensionUpdateRoutine() { for newDimension := range repo.dimensionUpdateChannel { - Log.Info("dimensionUpdateRoutine received a new dimension", zap.String("dimension", newDimension.Dimension)) + logger.Log.Info("dimensionUpdateRoutine received a new dimension", zap.String("dimension", newDimension.Dimension)) err := repo._updateDimension(newDimension.Dimension, newDimension.Node) - Log.Info("dimensionUpdateRoutine received result") + logger.Log.Info("dimensionUpdateRoutine received result") if err != nil { - Log.Debug("update dimension failed", zap.Error(err)) + logger.Log.Debug("update dimension failed", zap.Error(err)) } repo.dimensionUpdateDoneChannel <- err } } func (repo *Repo) updateDimension(dimension string, node *content.RepoNode) error { - Log.Debug("trying to push dimension into update channel", zap.String("dimension", dimension), zap.String("nodeName", node.Name)) + logger.Log.Debug("trying to push dimension into update channel", zap.String("dimension", dimension), zap.String("nodeName", node.Name)) repo.dimensionUpdateChannel <- &repoDimension{ Dimension: dimension, Node: node, } - Log.Debug("waiting for done signal") + logger.Log.Debug("waiting for done signal") return <-repo.dimensionUpdateDoneChannel } @@ -179,7 +181,7 @@ func (repo *Repo) get(URL string) (err error) { } defer response.Body.Close() if response.StatusCode != http.StatusOK { - return fmt.Errorf("Bad HTTP Response: %q", response.Status) + return fmt.Errorf("bad HTTP Response: %q", response.Status) } // Log.Info(ansi.Red + "RESETTING BUFFER" + ansi.Reset) @@ -196,10 +198,10 @@ func (repo *Repo) update() (repoRuntime int64, err error) { repoRuntime = time.Now().UnixNano() - startTimeRepo if err != nil { // we have no json to load - the repo server did not reply - Log.Debug("failed to load json", zap.Error(err)) + logger.Log.Debug("Failed to load json", zap.Error(err)) return repoRuntime, err } - Log.Debug("loading json", zap.String("server", repo.server), zap.Int("length", len(repo.jsonBuf.Bytes()))) + logger.Log.Debug("loading json", zap.String("server", repo.server), zap.Int("length", len(repo.jsonBuf.Bytes()))) nodes, err := repo.loadNodesFromJSON() if err != nil { // could not load nodes from json @@ -218,11 +220,11 @@ func (repo *Repo) tryUpdate() (repoRuntime int64, err error) { c := make(chan updateResponse) select { case repo.updateInProgressChannel <- c: - Log.Info("update request added to queue") + logger.Log.Info("update request added to queue") ur := <-c return ur.repoRuntime, ur.err default: - Log.Info("update request accepted, will be processed after the previous update") + logger.Log.Info("update request accepted, will be processed after the previous update") return 0, errUpdateRejected } } @@ -233,7 +235,7 @@ func (repo *Repo) loadJSONBytes() error { data := repo.jsonBuf.Bytes() if len(data) > 10 { - Log.Debug("could not parse json", + logger.Log.Debug("could not parse json", zap.String("jsonStart", string(data[:10])), zap.String("jsonStart", string(data[len(data)-10:])), ) @@ -245,10 +247,10 @@ func (repo *Repo) loadJSONBytes() error { if err == nil { historyErr := repo.history.add(repo.jsonBuf.Bytes()) if historyErr != nil { - Log.Error("could not add valid json to history", zap.Error(historyErr)) + logger.Log.Error("could not add valid json to history", zap.Error(historyErr)) status.M.HistoryPersistFailedCounter.WithLabelValues(historyErr.Error()).Inc() } else { - Log.Info("added valid json to history") + logger.Log.Info("added valid json to history") } } return err @@ -258,10 +260,10 @@ func (repo *Repo) loadNodes(newNodes map[string]*content.RepoNode) error { newDimensions := []string{} for dimension, newNode := range newNodes { newDimensions = append(newDimensions, dimension) - Log.Debug("loading nodes for dimension", zap.String("dimension", dimension)) + logger.Log.Debug("loading nodes for dimension", zap.String("dimension", dimension)) loadErr := repo.updateDimension(dimension, newNode) if loadErr != nil { - Log.Debug("failed to load", zap.String("dimension", dimension), zap.Error(loadErr)) + logger.Log.Debug("failed to load", zap.String("dimension", dimension), zap.Error(loadErr)) return loadErr } } @@ -276,7 +278,7 @@ func (repo *Repo) loadNodes(newNodes map[string]*content.RepoNode) error { // we need to throw away orphaned dimensions for dimension := range repo.Directory { if !dimensionIsValid(dimension) { - Log.Info("removing orphaned dimension", zap.String("dimension", dimension)) + logger.Log.Info("removing orphaned dimension", zap.String("dimension", dimension)) delete(repo.Directory, dimension) } } diff --git a/repo/repo.go b/repo/repo.go index 7bf302b..2b89dfd 100644 --- a/repo/repo.go +++ b/repo/repo.go @@ -12,7 +12,7 @@ import ( "github.com/foomo/contentserver/status" "github.com/foomo/contentserver/content" - . "github.com/foomo/contentserver/logger" + "github.com/foomo/contentserver/logger" "github.com/foomo/contentserver/requests" "github.com/foomo/contentserver/responses" "go.uber.org/zap" @@ -51,7 +51,7 @@ type repoDimension struct { // NewRepo constructor func NewRepo(server string, varDir string) *Repo { - Log.Info("creating new repo", + logger.Log.Info("creating new repo", zap.String("server", server), zap.String("varDir", varDir), ) @@ -68,13 +68,13 @@ func NewRepo(server string, varDir string) *Repo { go repo.updateRoutine() go repo.dimensionUpdateRoutine() - Log.Info("trying to restore previous state") + logger.Log.Info("trying to restore previous state") restoreErr := repo.tryToRestoreCurrent() if restoreErr != nil { - Log.Error(" could not restore previous repo content", zap.Error(restoreErr)) + logger.Log.Error(" could not restore previous repo content", zap.Error(restoreErr)) } else { repo.recovered = true - Log.Info("restored previous repo content") + logger.Log.Info("restored previous repo content") } return repo } @@ -104,12 +104,11 @@ func (repo *Repo) getNodes(nodeRequests map[string]*requests.Node, env *requests path = []*content.Item{} ) for nodeName, nodeRequest := range nodeRequests { - if nodeName == "" || nodeRequest.ID == "" { - Log.Info("invalid node request", zap.Error(errors.New("nodeName or nodeRequest.ID empty"))) + logger.Log.Info("invalid node request", zap.Error(errors.New("nodeName or nodeRequest.ID empty"))) continue } - Log.Debug("adding node", zap.String("name", nodeName), zap.String("requestID", nodeRequest.ID)) + logger.Log.Debug("adding node", zap.String("name", nodeName), zap.String("requestID", nodeRequest.ID)) groups := env.Groups if len(nodeRequest.Groups) > 0 { @@ -120,30 +119,28 @@ func (repo *Repo) getNodes(nodeRequests map[string]*requests.Node, env *requests nodes[nodeName] = nil if !ok && nodeRequest.Dimension == "" { - Log.Debug("could not get dimension root node", zap.String("dimension", nodeRequest.Dimension)) + logger.Log.Debug("Could not get dimension root node", zap.String("dimension", nodeRequest.Dimension)) for _, dimension := range env.Dimensions { dimensionNode, ok = repo.Directory[dimension] if ok { - Log.Debug("found root node in env.Dimensions", zap.String("dimension", dimension)) + logger.Log.Debug("Found root node in env.Dimensions", zap.String("dimension", dimension)) break } - Log.Debug("could NOT find root node in env.Dimensions", zap.String("dimension", dimension)) + logger.Log.Debug("Could NOT find root node in env.Dimensions", zap.String("dimension", dimension)) } } if !ok { - Log.Error("could not get dimension root node", zap.String("nodeRequest.Dimension", nodeRequest.Dimension)) + logger.Log.Error("could not get dimension root node", zap.String("nodeRequest.Dimension", nodeRequest.Dimension)) continue } + treeNode, ok := dimensionNode.Directory[nodeRequest.ID] - if ok { - nodes[nodeName] = repo.getNode(treeNode, nodeRequest.Expand, nodeRequest.MimeTypes, path, 0, groups, nodeRequest.DataFields, nodeRequest.ExposeHiddenNodes) - } else { - Log.Error("an invalid tree node was requested", - zap.String("nodeName", nodeName), - zap.String("ID", nodeRequest.ID), - ) + if !ok { + status.M.InvalidNodeTreeRequests.WithLabelValues(nodeRequest.ID).Inc() + continue } + nodes[nodeName] = repo.getNode(treeNode, nodeRequest.Expand, nodeRequest.MimeTypes, path, 0, groups, nodeRequest.DataFields, nodeRequest.ExposeHiddenNodes) } return nodes } @@ -161,18 +158,18 @@ func (repo *Repo) GetContent(r *requests.Content) (c *content.SiteContent, err e // add more input validation err = repo.validateContentRequest(r) if err != nil { - Log.Error("repo.GetContent invalid request", zap.Error(err)) + logger.Log.Error("repo.GetContent invalid request", zap.Error(err)) return } - Log.Debug("repo.GetContent", zap.String("URI", r.URI)) + logger.Log.Debug("repo.GetContent", zap.String("URI", r.URI)) c = content.NewSiteContent() resolved, resolvedURI, resolvedDimension, node := repo.resolveContent(r.Env.Dimensions, r.URI) if resolved { if !node.CanBeAccessedByGroups(r.Env.Groups) { - Log.Warn("resolvecontent got status 401", zap.String("URI", r.URI)) + logger.Log.Warn("Resolved content cannot be accessed by specified group", zap.String("URI", r.URI)) c.Status = content.StatusForbidden } else { - Log.Info("resolvecontent got status 200", zap.String("URI", r.URI)) + logger.Log.Info("Content resolved", zap.String("URI", r.URI)) c.Status = content.StatusOk c.Data = node.Data } @@ -188,11 +185,11 @@ func (repo *Repo) GetContent(r *requests.Content) (c *content.SiteContent, err e } c.URIs = uris } else { - Log.Info("resolvecontent got status 404", zap.String("URI", r.URI)) + logger.Log.Info("Content not found", zap.String("URI", r.URI)) c.Status = content.StatusNotFound c.Dimension = r.Env.Dimensions[0] - Log.Debug("failed to resolve, falling back to default dimension", + logger.Log.Debug("Failed to resolve, falling back to default dimension", zap.String("URI", r.URI), zap.String("defaultDimension", r.Env.Dimensions[0]), ) @@ -226,13 +223,13 @@ func (repo *Repo) WriteRepoBytes(w io.Writer) { f, err := os.Open(repo.history.getCurrentFilename()) if err != nil { - Log.Error("failed to serve Repo JSON", zap.Error(err)) + logger.Log.Error("Failed to serve Repo JSON", zap.Error(err)) } w.Write([]byte("{\"reply\":")) _, err = io.Copy(w, f) if err != nil { - Log.Error("failed to serve Repo JSON", zap.Error(err)) + logger.Log.Error("Failed to serve Repo JSON", zap.Error(err)) } w.Write([]byte("}")) } @@ -243,7 +240,7 @@ func (repo *Repo) Update() (updateResponse *responses.Update) { return float64(float64(nanoSeconds) / float64(1000000000.0)) } - Log.Info("Update triggered") + logger.Log.Info("Update triggered") // Log.Info(ansi.Yellow + "BUFFER LENGTH BEFORE tryUpdate(): " + strconv.Itoa(len(repo.jsonBuf.Bytes())) + ansi.Reset) startTime := time.Now().UnixNano() @@ -262,13 +259,13 @@ func (repo *Repo) Update() (updateResponse *responses.Update) { if updateErr != errUpdateRejected { updateResponse.ErrorMessage = updateErr.Error() - Log.Error("could not update repository:", zap.Error(updateErr)) + logger.Log.Error("Failed to update repository", zap.Error(updateErr)) restoreErr := repo.tryToRestoreCurrent() if restoreErr != nil { - Log.Error("failed to restore preceding repo version", zap.Error(restoreErr)) + logger.Log.Error("Failed to restore preceding repository version", zap.Error(restoreErr)) } else { - Log.Info("restored current repo from local history") + logger.Log.Info("Successfully restored current repository from local history") } } } else { @@ -276,7 +273,7 @@ func (repo *Repo) Update() (updateResponse *responses.Update) { // persist the currently loaded one historyErr := repo.history.add(repo.jsonBuf.Bytes()) if historyErr != nil { - Log.Error("could not persist current repo in history", zap.Error(historyErr)) + logger.Log.Error("Could not persist current repo in history", zap.Error(historyErr)) status.M.HistoryPersistFailedCounter.WithLabelValues(historyErr.Error()).Inc() } // add some stats @@ -292,7 +289,7 @@ func (repo *Repo) Update() (updateResponse *responses.Update) { // resolveContent find content in a repository func (repo *Repo) resolveContent(dimensions []string, URI string) (resolved bool, resolvedURI string, resolvedDimension string, repoNode *content.RepoNode) { parts := strings.Split(URI, content.PathSeparator) - Log.Debug("repo.ResolveContent", zap.String("URI", URI)) + logger.Log.Debug("repo.ResolveContent", zap.String("URI", URI)) for i := len(parts); i > 0; i-- { testURI := strings.Join(parts[0:i], content.PathSeparator) if testURI == "" { @@ -300,13 +297,13 @@ func (repo *Repo) resolveContent(dimensions []string, URI string) (resolved bool } for _, dimension := range dimensions { if d, ok := repo.Directory[dimension]; ok { - Log.Debug("checking", + logger.Log.Debug("Checking node", zap.String("dimension", dimension), zap.String("URI", testURI), ) if repoNode, ok := d.URIDirectory[testURI]; ok { resolved = true - Log.Debug("found node", zap.String("URI", testURI), zap.String("destination", repoNode.DestinationID)) + logger.Log.Debug("Node found", zap.String("URI", testURI), zap.String("destination", repoNode.DestinationID)) if len(repoNode.DestinationID) > 0 { if destionationNode, destinationNodeOk := d.Directory[repoNode.DestinationID]; destinationNodeOk { repoNode = destionationNode @@ -328,7 +325,7 @@ func (repo *Repo) getURIForNode(dimension string, repoNode *content.RepoNode, re linkedNode, ok := repo.Directory[dimension].Directory[repoNode.LinkID] if ok { if recursionLevel > maxGetURIForNodeRecursionLevel { - Log.Error("maxGetURIForNodeRecursionLevel reached", zap.String("repoNode.ID", repoNode.ID), zap.String("linkID", repoNode.LinkID), zap.String("dimension", dimension)) + logger.Log.Error("maxGetURIForNodeRecursionLevel reached", zap.String("repoNode.ID", repoNode.ID), zap.String("linkID", repoNode.LinkID), zap.String("dimension", dimension)) return "" } return repo.getURIForNode(dimension, linkedNode, recursionLevel+1) @@ -356,7 +353,7 @@ func (repo *Repo) getNode( ) *content.Node { node := content.NewNode() node.Item = repoNode.ToItem(dataFields) - Log.Debug("getNode", zap.String("ID", repoNode.ID)) + logger.Log.Debug("getNode", zap.String("ID", repoNode.ID)) for _, childID := range repoNode.Index { childNode := repoNode.Nodes[childID] if (level == 0 || expanded || !expanded && childNode.InPath(path)) && (!childNode.Hidden || exposeHiddenNodes) && childNode.CanBeAccessedByGroups(groups) && childNode.IsOneOfTheseMimeTypes(mimeTypes) { diff --git a/server/handlerequest.go b/server/handlerequest.go index 3562ad6..2c72e68 100644 --- a/server/handlerequest.go +++ b/server/handlerequest.go @@ -12,13 +12,27 @@ import ( "github.com/foomo/contentserver/status" ) -func handleRequest(r *repo.Repo, handler Handler, jsonBytes []byte, source string) (replyBytes []byte, err error) { +func handleRequest(r *repo.Repo, handler Handler, jsonBytes []byte, source string) ([]byte, error) { + start := time.Now() + + reply, err := executeRequest(r, handler, jsonBytes, source) + result := "success" + if err != nil { + result = "error" + } + + status.M.ServiceRequestCounter.WithLabelValues(string(handler), result, source).Inc() + status.M.ServiceRequestDuration.WithLabelValues(string(handler), result, source).Observe(time.Since(start).Seconds()) + + return reply, err +} + +func executeRequest(r *repo.Repo, handler Handler, jsonBytes []byte, source string) (replyBytes []byte, err error) { var ( reply interface{} apiErr error jsonErr error - start = time.Now() processIfJSONIsOk = func(err error, processingFunc func()) { if err != nil { jsonErr = err @@ -57,7 +71,6 @@ func handleRequest(r *repo.Repo, handler Handler, jsonBytes []byte, source strin default: reply = responses.NewError(1, "unknown handler: "+string(handler)) } - addMetrics(handler, start, jsonErr, apiErr, source) // error handling if jsonErr != nil { @@ -71,20 +84,6 @@ func handleRequest(r *repo.Repo, handler Handler, jsonBytes []byte, source strin return encodeReply(reply) } -func addMetrics(handlerName Handler, start time.Time, errJSON error, errAPI error, source string) { - - var ( - duration = time.Since(start) - s = "succeeded" - ) - if errJSON != nil || errAPI != nil { - s = "failed" - } - - status.M.ServiceRequestCounter.WithLabelValues(string(handlerName), s, source).Inc() - status.M.ServiceRequestDuration.WithLabelValues(string(handlerName), s, source).Observe(float64(duration.Seconds())) -} - // encodeReply takes an interface and encodes it as JSON // it returns the resulting JSON and a marshalling error func encodeReply(reply interface{}) (replyBytes []byte, err error) { diff --git a/server/socketserver.go b/server/socketserver.go index eb89906..d376e86 100644 --- a/server/socketserver.go +++ b/server/socketserver.go @@ -7,7 +7,6 @@ import ( "net" "strconv" "strings" - "time" "go.uber.org/zap" @@ -47,13 +46,10 @@ func (s *socketServer) execute(handler Handler, jsonBytes []byte) (reply []byte) Log.Debug("incoming json buffer", zap.Int("length", len(jsonBytes))) if handler == HandlerGetRepo { - var ( - b bytes.Buffer - start = time.Now() + b bytes.Buffer ) s.repo.WriteRepoBytes(&b) - addMetrics(handler, start, nil, nil, sourceSocketServer) return b.Bytes() } diff --git a/server/webserver.go b/server/webserver.go index f58546f..689956c 100644 --- a/server/webserver.go +++ b/server/webserver.go @@ -5,7 +5,6 @@ import ( "io/ioutil" "net/http" "strings" - "time" "go.uber.org/zap" @@ -31,7 +30,7 @@ func NewWebServer(path string, r *repo.Repo) http.Handler { func (s *webServer) ServeHTTP(w http.ResponseWriter, r *http.Request) { defer func() { if r := recover(); r != nil { - Log.Error("panic in handle connection", zap.String("error", fmt.Sprint(r))) + Log.Error("Panic in handle connection", zap.String("error", fmt.Sprint(r))) } }() @@ -40,17 +39,14 @@ func (s *webServer) ServeHTTP(w http.ResponseWriter, r *http.Request) { return } jsonBytes, readErr := ioutil.ReadAll(r.Body) - r.Body.Close() if readErr != nil { http.Error(w, "failed to read incoming request", http.StatusBadRequest) return } h := Handler(strings.TrimPrefix(r.URL.Path, s.path+"/")) if h == HandlerGetRepo { - start := time.Now() s.r.WriteRepoBytes(w) w.Header().Set("Content-Type", "application/json") - addMetrics(h, start, nil, nil, sourceWebserver) return } reply, errReply := handleRequest(s.r, h, jsonBytes, "webserver") diff --git a/status/metrics.go b/status/metrics.go index 61cc44b..521335e 100644 --- a/status/metrics.go +++ b/status/metrics.go @@ -28,6 +28,7 @@ type Metrics struct { ContentRequestCounter *prometheus.CounterVec // count the total number of content requests NumSocketsGauge *prometheus.GaugeVec // keep track of the total number of open sockets HistoryPersistFailedCounter *prometheus.CounterVec // count the number of failed attempts to persist the content history + InvalidNodeTreeRequests *prometheus.CounterVec // counts the number of invalid tree node requests } // newMetrics can be used to instantiate a metrics instance @@ -36,6 +37,9 @@ type Metrics struct { // the package exposes the initialized Metrics instance as the variable M. func newMetrics() *Metrics { return &Metrics{ + InvalidNodeTreeRequests: newCounterVec("invalid_node_tree_request_count", + "Counts the number of invalid tree nodes for a specific node ID", + "nodeID"), ServiceRequestCounter: newCounterVec( "service_request_count", "Count of requests for each handler", @@ -57,7 +61,6 @@ func newMetrics() *Metrics { UpdatesFailedCounter: newCounterVec( "updates_failed_count", "Number of updates that failed due to an error", - metricLabelError, ), UpdateDuration: newSummaryVec( "update_duration_seconds", From 24c65ba8df93e0c5456e79247076dae28703963d Mon Sep 17 00:00:00 2001 From: Stefan Martinov Date: Mon, 23 Nov 2020 09:29:47 +0100 Subject: [PATCH 2/3] chore: return logging of error requested --- repo/repo.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/repo/repo.go b/repo/repo.go index 2b89dfd..53c19c8 100644 --- a/repo/repo.go +++ b/repo/repo.go @@ -137,6 +137,10 @@ func (repo *Repo) getNodes(nodeRequests map[string]*requests.Node, env *requests treeNode, ok := dimensionNode.Directory[nodeRequest.ID] if !ok { + logger.Log.Error("Invalid tree node requested", + zap.String("nodeName", nodeName), + zap.String("nodeID", nodeRequest.ID), + ) status.M.InvalidNodeTreeRequests.WithLabelValues(nodeRequest.ID).Inc() continue } From ffa04ace3681a032f7c246aec778a4bfea423632 Mon Sep 17 00:00:00 2001 From: Stefan Martinov Date: Mon, 23 Nov 2020 11:33:36 +0100 Subject: [PATCH 3/3] feat: add better metrics for errors --- repo/loader.go | 42 ++++++++++++++++++++++++++++-------------- repo/repo.go | 18 ++++++++++++++++++ status/metrics.go | 1 + 3 files changed, 47 insertions(+), 14 deletions(-) diff --git a/repo/loader.go b/repo/loader.go index 6e0c8f9..9833d3f 100644 --- a/repo/loader.go +++ b/repo/loader.go @@ -1,6 +1,7 @@ package repo import ( + "context" "errors" "fmt" "io" @@ -28,13 +29,14 @@ func (repo *Repo) updateRoutine() { for { select { case resChan := <-repo.updateInProgressChannel: - logger.Log.Info("waiting for update to complete", zap.String("chan", fmt.Sprintf("%p", resChan))) + log := logger.Log.With(zap.String("chan", fmt.Sprintf("%p", resChan))) + log.Info("Waiting for update to complete") start := time.Now() - repoRuntime, errUpdate := repo.update() + repoRuntime, errUpdate := repo.update(context.Background()) if errUpdate != nil { - logger.Log.Error("Failed to update contentserver", zap.Error(errUpdate)) - status.M.UpdatesFailedCounter.WithLabelValues().Inc() + log.Error("Failed to update content server from routine", zap.Error(errUpdate)) + status.M.UpdatesFailedCounter.WithLabelValues(errUpdate.Error()).Inc() } else { status.M.UpdatesCompletedCounter.WithLabelValues().Inc() } @@ -45,7 +47,7 @@ func (repo *Repo) updateRoutine() { } duration := time.Since(start) - logger.Log.Info("update completed", zap.Duration("duration", duration), zap.String("chan", fmt.Sprintf("%p", resChan))) + log.Info("Update completed", zap.Duration("duration", duration)) status.M.UpdateDuration.WithLabelValues().Observe(duration.Seconds()) } } @@ -163,7 +165,11 @@ func wireAliases(directory map[string]*content.RepoNode) error { func (repo *Repo) loadNodesFromJSON() (nodes map[string]*content.RepoNode, err error) { nodes = make(map[string]*content.RepoNode) err = json.Unmarshal(repo.jsonBuf.Bytes(), &nodes) - return nodes, err + if err != nil { + logger.Log.Error("Failed to deserialize nodes", zap.Error(err)) + return nil, errors.New("failed to deserialize nodes") + } + return nodes, nil } func (repo *Repo) tryToRestoreCurrent() (err error) { @@ -174,14 +180,17 @@ func (repo *Repo) tryToRestoreCurrent() (err error) { return repo.loadJSONBytes() } -func (repo *Repo) get(URL string) (err error) { - response, err := http.Get(URL) +func (repo *Repo) get(URL string) error { + response, err := repo.httpClient.Get(URL) if err != nil { - return err + logger.Log.Error("Failed to get", zap.Error(err)) + return errors.New("failed to get repo") } defer response.Body.Close() + if response.StatusCode != http.StatusOK { - return fmt.Errorf("bad HTTP Response: %q", response.Status) + logger.Log.Error(fmt.Sprintf("Bad HTTP Response %q, want %q", response.Status, http.StatusOK)) + return errors.New("bad response code") } // Log.Info(ansi.Red + "RESETTING BUFFER" + ansi.Reset) @@ -189,10 +198,15 @@ func (repo *Repo) get(URL string) (err error) { // Log.Info(ansi.Green + "LOADING DATA INTO BUFFER" + ansi.Reset) _, err = io.Copy(&repo.jsonBuf, response.Body) - return err + if err != nil { + logger.Log.Error("Failed to copy IO stream", zap.Error(err)) + return errors.New("failed to copy IO stream") + } + + return nil } -func (repo *Repo) update() (repoRuntime int64, err error) { +func (repo *Repo) update(ctx context.Context) (repoRuntime int64, err error) { startTimeRepo := time.Now().UnixNano() err = repo.get(repo.server) repoRuntime = time.Now().UnixNano() - startTimeRepo @@ -263,8 +277,8 @@ func (repo *Repo) loadNodes(newNodes map[string]*content.RepoNode) error { logger.Log.Debug("loading nodes for dimension", zap.String("dimension", dimension)) loadErr := repo.updateDimension(dimension, newNode) if loadErr != nil { - logger.Log.Debug("failed to load", zap.String("dimension", dimension), zap.Error(loadErr)) - return loadErr + logger.Log.Error("Failed to update dimension", zap.String("dimension", dimension), zap.Error(loadErr)) + return errors.New("failed to update dimension") } } dimensionIsValid := func(dimension string) bool { diff --git a/repo/repo.go b/repo/repo.go index 53c19c8..37d5ecd 100644 --- a/repo/repo.go +++ b/repo/repo.go @@ -2,9 +2,11 @@ package repo import ( "bytes" + "crypto/tls" "errors" "fmt" "io" + "net/http" "os" "strings" "time" @@ -41,6 +43,8 @@ type Repo struct { // jsonBytes []byte jsonBuf bytes.Buffer + + httpClient *http.Client } type repoDimension struct { @@ -62,6 +66,7 @@ func NewRepo(server string, varDir string) *Repo { history: newHistory(varDir), dimensionUpdateChannel: make(chan *repoDimension), dimensionUpdateDoneChannel: make(chan error), + httpClient: getDefaultHTTPClient(2 * time.Minute), updateInProgressChannel: make(chan chan updateResponse, 0), } @@ -76,9 +81,22 @@ func NewRepo(server string, varDir string) *Repo { repo.recovered = true logger.Log.Info("restored previous repo content") } + return repo } +func getDefaultHTTPClient(timeout time.Duration) *http.Client { + client := &http.Client{ + Transport: &http.Transport{ + DisableKeepAlives: true, + TLSClientConfig: &tls.Config{InsecureSkipVerify: true}, + TLSHandshakeTimeout: 5 * time.Second, + }, + Timeout: timeout, + } + return client +} + func (repo *Repo) Recovered() bool { return repo.recovered } diff --git a/status/metrics.go b/status/metrics.go index 521335e..a8031f6 100644 --- a/status/metrics.go +++ b/status/metrics.go @@ -61,6 +61,7 @@ func newMetrics() *Metrics { UpdatesFailedCounter: newCounterVec( "updates_failed_count", "Number of updates that failed due to an error", + metricLabelError, ), UpdateDuration: newSummaryVec( "update_duration_seconds",