Merge pull request #23 from foomo/feature/metrics-and-logging

feat: update metrics & logging
This commit is contained in:
Stefan Martinov 2020-11-23 12:38:02 +01:00 committed by GitHub
commit c9bf5666e5
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 121 additions and 96 deletions

View File

@ -3,7 +3,6 @@ package main
import ( import (
"flag" "flag"
"fmt" "fmt"
"net/http"
_ "net/http/pprof" _ "net/http/pprof"
"os" "os"
"runtime/debug" "runtime/debug"
@ -26,7 +25,7 @@ const (
ServiceName = "Content Server" ServiceName = "Content Server"
DefaultHealthzHandlerAddress = ":8080" DefaultHealthzHandlerAddress = ":8080"
DefaultPrometheusListener = "127.0.0.1:9111" DefaultPrometheusListener = "127.0.0.1:9200"
) )
var ( var (
@ -52,10 +51,6 @@ func main() {
SetupLogging(*flagDebug, "contentserver.log") SetupLogging(*flagDebug, "contentserver.log")
go func() {
fmt.Println(http.ListenAndServe("localhost:6060", nil))
}()
if *flagFreeOSMem > 0 { if *flagFreeOSMem > 0 {
Log.Info("freeing OS memory every $interval minutes", zap.Int("interval", *flagFreeOSMem)) Log.Info("freeing OS memory every $interval minutes", zap.Int("interval", *flagFreeOSMem))
go func() { go func() {

View File

@ -1,6 +1,7 @@
package repo package repo
import ( import (
"context"
"errors" "errors"
"fmt" "fmt"
"io" "io"
@ -8,7 +9,7 @@ import (
"time" "time"
"github.com/foomo/contentserver/content" "github.com/foomo/contentserver/content"
. "github.com/foomo/contentserver/logger" "github.com/foomo/contentserver/logger"
"github.com/foomo/contentserver/status" "github.com/foomo/contentserver/status"
jsoniter "github.com/json-iterator/go" jsoniter "github.com/json-iterator/go"
"go.uber.org/zap" "go.uber.org/zap"
@ -28,12 +29,16 @@ func (repo *Repo) updateRoutine() {
for { for {
select { select {
case resChan := <-repo.updateInProgressChannel: case resChan := <-repo.updateInProgressChannel:
Log.Info("waiting for update to complete", zap.String("chan", fmt.Sprintf("%p", resChan))) log := logger.Log.With(zap.String("chan", fmt.Sprintf("%p", resChan)))
log.Info("Waiting for update to complete")
start := time.Now() start := time.Now()
repoRuntime, errUpdate := repo.update() repoRuntime, errUpdate := repo.update(context.Background())
if errUpdate != nil { if errUpdate != nil {
log.Error("Failed to update content server from routine", zap.Error(errUpdate))
status.M.UpdatesFailedCounter.WithLabelValues(errUpdate.Error()).Inc() status.M.UpdatesFailedCounter.WithLabelValues(errUpdate.Error()).Inc()
} else {
status.M.UpdatesCompletedCounter.WithLabelValues().Inc()
} }
resChan <- updateResponse{ resChan <- updateResponse{
@ -42,8 +47,7 @@ func (repo *Repo) updateRoutine() {
} }
duration := time.Since(start) duration := time.Since(start)
Log.Info("update completed", zap.Duration("duration", duration), zap.String("chan", fmt.Sprintf("%p", resChan))) log.Info("Update completed", zap.Duration("duration", duration))
status.M.UpdatesCompletedCounter.WithLabelValues().Inc()
status.M.UpdateDuration.WithLabelValues().Observe(duration.Seconds()) status.M.UpdateDuration.WithLabelValues().Observe(duration.Seconds())
} }
} }
@ -51,24 +55,24 @@ func (repo *Repo) updateRoutine() {
func (repo *Repo) dimensionUpdateRoutine() { func (repo *Repo) dimensionUpdateRoutine() {
for newDimension := range repo.dimensionUpdateChannel { for newDimension := range repo.dimensionUpdateChannel {
Log.Info("dimensionUpdateRoutine received a new dimension", zap.String("dimension", newDimension.Dimension)) logger.Log.Info("dimensionUpdateRoutine received a new dimension", zap.String("dimension", newDimension.Dimension))
err := repo._updateDimension(newDimension.Dimension, newDimension.Node) err := repo._updateDimension(newDimension.Dimension, newDimension.Node)
Log.Info("dimensionUpdateRoutine received result") logger.Log.Info("dimensionUpdateRoutine received result")
if err != nil { if err != nil {
Log.Debug("update dimension failed", zap.Error(err)) logger.Log.Debug("update dimension failed", zap.Error(err))
} }
repo.dimensionUpdateDoneChannel <- err repo.dimensionUpdateDoneChannel <- err
} }
} }
func (repo *Repo) updateDimension(dimension string, node *content.RepoNode) error { func (repo *Repo) updateDimension(dimension string, node *content.RepoNode) error {
Log.Debug("trying to push dimension into update channel", zap.String("dimension", dimension), zap.String("nodeName", node.Name)) logger.Log.Debug("trying to push dimension into update channel", zap.String("dimension", dimension), zap.String("nodeName", node.Name))
repo.dimensionUpdateChannel <- &repoDimension{ repo.dimensionUpdateChannel <- &repoDimension{
Dimension: dimension, Dimension: dimension,
Node: node, Node: node,
} }
Log.Debug("waiting for done signal") logger.Log.Debug("waiting for done signal")
return <-repo.dimensionUpdateDoneChannel return <-repo.dimensionUpdateDoneChannel
} }
@ -161,7 +165,11 @@ func wireAliases(directory map[string]*content.RepoNode) error {
func (repo *Repo) loadNodesFromJSON() (nodes map[string]*content.RepoNode, err error) { func (repo *Repo) loadNodesFromJSON() (nodes map[string]*content.RepoNode, err error) {
nodes = make(map[string]*content.RepoNode) nodes = make(map[string]*content.RepoNode)
err = json.Unmarshal(repo.jsonBuf.Bytes(), &nodes) err = json.Unmarshal(repo.jsonBuf.Bytes(), &nodes)
return nodes, err if err != nil {
logger.Log.Error("Failed to deserialize nodes", zap.Error(err))
return nil, errors.New("failed to deserialize nodes")
}
return nodes, nil
} }
func (repo *Repo) tryToRestoreCurrent() (err error) { func (repo *Repo) tryToRestoreCurrent() (err error) {
@ -172,14 +180,17 @@ func (repo *Repo) tryToRestoreCurrent() (err error) {
return repo.loadJSONBytes() return repo.loadJSONBytes()
} }
func (repo *Repo) get(URL string) (err error) { func (repo *Repo) get(URL string) error {
response, err := http.Get(URL) response, err := repo.httpClient.Get(URL)
if err != nil { if err != nil {
return err logger.Log.Error("Failed to get", zap.Error(err))
return errors.New("failed to get repo")
} }
defer response.Body.Close() defer response.Body.Close()
if response.StatusCode != http.StatusOK { if response.StatusCode != http.StatusOK {
return fmt.Errorf("Bad HTTP Response: %q", response.Status) logger.Log.Error(fmt.Sprintf("Bad HTTP Response %q, want %q", response.Status, http.StatusOK))
return errors.New("bad response code")
} }
// Log.Info(ansi.Red + "RESETTING BUFFER" + ansi.Reset) // Log.Info(ansi.Red + "RESETTING BUFFER" + ansi.Reset)
@ -187,19 +198,24 @@ func (repo *Repo) get(URL string) (err error) {
// Log.Info(ansi.Green + "LOADING DATA INTO BUFFER" + ansi.Reset) // Log.Info(ansi.Green + "LOADING DATA INTO BUFFER" + ansi.Reset)
_, err = io.Copy(&repo.jsonBuf, response.Body) _, err = io.Copy(&repo.jsonBuf, response.Body)
return err if err != nil {
logger.Log.Error("Failed to copy IO stream", zap.Error(err))
return errors.New("failed to copy IO stream")
}
return nil
} }
func (repo *Repo) update() (repoRuntime int64, err error) { func (repo *Repo) update(ctx context.Context) (repoRuntime int64, err error) {
startTimeRepo := time.Now().UnixNano() startTimeRepo := time.Now().UnixNano()
err = repo.get(repo.server) err = repo.get(repo.server)
repoRuntime = time.Now().UnixNano() - startTimeRepo repoRuntime = time.Now().UnixNano() - startTimeRepo
if err != nil { if err != nil {
// we have no json to load - the repo server did not reply // we have no json to load - the repo server did not reply
Log.Debug("failed to load json", zap.Error(err)) logger.Log.Debug("Failed to load json", zap.Error(err))
return repoRuntime, err return repoRuntime, err
} }
Log.Debug("loading json", zap.String("server", repo.server), zap.Int("length", len(repo.jsonBuf.Bytes()))) logger.Log.Debug("loading json", zap.String("server", repo.server), zap.Int("length", len(repo.jsonBuf.Bytes())))
nodes, err := repo.loadNodesFromJSON() nodes, err := repo.loadNodesFromJSON()
if err != nil { if err != nil {
// could not load nodes from json // could not load nodes from json
@ -218,11 +234,11 @@ func (repo *Repo) tryUpdate() (repoRuntime int64, err error) {
c := make(chan updateResponse) c := make(chan updateResponse)
select { select {
case repo.updateInProgressChannel <- c: case repo.updateInProgressChannel <- c:
Log.Info("update request added to queue") logger.Log.Info("update request added to queue")
ur := <-c ur := <-c
return ur.repoRuntime, ur.err return ur.repoRuntime, ur.err
default: default:
Log.Info("update request accepted, will be processed after the previous update") logger.Log.Info("update request accepted, will be processed after the previous update")
return 0, errUpdateRejected return 0, errUpdateRejected
} }
} }
@ -233,7 +249,7 @@ func (repo *Repo) loadJSONBytes() error {
data := repo.jsonBuf.Bytes() data := repo.jsonBuf.Bytes()
if len(data) > 10 { if len(data) > 10 {
Log.Debug("could not parse json", logger.Log.Debug("could not parse json",
zap.String("jsonStart", string(data[:10])), zap.String("jsonStart", string(data[:10])),
zap.String("jsonStart", string(data[len(data)-10:])), zap.String("jsonStart", string(data[len(data)-10:])),
) )
@ -245,10 +261,10 @@ func (repo *Repo) loadJSONBytes() error {
if err == nil { if err == nil {
historyErr := repo.history.add(repo.jsonBuf.Bytes()) historyErr := repo.history.add(repo.jsonBuf.Bytes())
if historyErr != nil { if historyErr != nil {
Log.Error("could not add valid json to history", zap.Error(historyErr)) logger.Log.Error("could not add valid json to history", zap.Error(historyErr))
status.M.HistoryPersistFailedCounter.WithLabelValues(historyErr.Error()).Inc() status.M.HistoryPersistFailedCounter.WithLabelValues(historyErr.Error()).Inc()
} else { } else {
Log.Info("added valid json to history") logger.Log.Info("added valid json to history")
} }
} }
return err return err
@ -258,11 +274,11 @@ func (repo *Repo) loadNodes(newNodes map[string]*content.RepoNode) error {
newDimensions := []string{} newDimensions := []string{}
for dimension, newNode := range newNodes { for dimension, newNode := range newNodes {
newDimensions = append(newDimensions, dimension) newDimensions = append(newDimensions, dimension)
Log.Debug("loading nodes for dimension", zap.String("dimension", dimension)) logger.Log.Debug("loading nodes for dimension", zap.String("dimension", dimension))
loadErr := repo.updateDimension(dimension, newNode) loadErr := repo.updateDimension(dimension, newNode)
if loadErr != nil { if loadErr != nil {
Log.Debug("failed to load", zap.String("dimension", dimension), zap.Error(loadErr)) logger.Log.Error("Failed to update dimension", zap.String("dimension", dimension), zap.Error(loadErr))
return loadErr return errors.New("failed to update dimension")
} }
} }
dimensionIsValid := func(dimension string) bool { dimensionIsValid := func(dimension string) bool {
@ -276,7 +292,7 @@ func (repo *Repo) loadNodes(newNodes map[string]*content.RepoNode) error {
// we need to throw away orphaned dimensions // we need to throw away orphaned dimensions
for dimension := range repo.Directory { for dimension := range repo.Directory {
if !dimensionIsValid(dimension) { if !dimensionIsValid(dimension) {
Log.Info("removing orphaned dimension", zap.String("dimension", dimension)) logger.Log.Info("removing orphaned dimension", zap.String("dimension", dimension))
delete(repo.Directory, dimension) delete(repo.Directory, dimension)
} }
} }

View File

@ -2,9 +2,11 @@ package repo
import ( import (
"bytes" "bytes"
"crypto/tls"
"errors" "errors"
"fmt" "fmt"
"io" "io"
"net/http"
"os" "os"
"strings" "strings"
"time" "time"
@ -12,7 +14,7 @@ import (
"github.com/foomo/contentserver/status" "github.com/foomo/contentserver/status"
"github.com/foomo/contentserver/content" "github.com/foomo/contentserver/content"
. "github.com/foomo/contentserver/logger" "github.com/foomo/contentserver/logger"
"github.com/foomo/contentserver/requests" "github.com/foomo/contentserver/requests"
"github.com/foomo/contentserver/responses" "github.com/foomo/contentserver/responses"
"go.uber.org/zap" "go.uber.org/zap"
@ -41,6 +43,8 @@ type Repo struct {
// jsonBytes []byte // jsonBytes []byte
jsonBuf bytes.Buffer jsonBuf bytes.Buffer
httpClient *http.Client
} }
type repoDimension struct { type repoDimension struct {
@ -51,7 +55,7 @@ type repoDimension struct {
// NewRepo constructor // NewRepo constructor
func NewRepo(server string, varDir string) *Repo { func NewRepo(server string, varDir string) *Repo {
Log.Info("creating new repo", logger.Log.Info("creating new repo",
zap.String("server", server), zap.String("server", server),
zap.String("varDir", varDir), zap.String("varDir", varDir),
) )
@ -62,23 +66,37 @@ func NewRepo(server string, varDir string) *Repo {
history: newHistory(varDir), history: newHistory(varDir),
dimensionUpdateChannel: make(chan *repoDimension), dimensionUpdateChannel: make(chan *repoDimension),
dimensionUpdateDoneChannel: make(chan error), dimensionUpdateDoneChannel: make(chan error),
httpClient: getDefaultHTTPClient(2 * time.Minute),
updateInProgressChannel: make(chan chan updateResponse, 0), updateInProgressChannel: make(chan chan updateResponse, 0),
} }
go repo.updateRoutine() go repo.updateRoutine()
go repo.dimensionUpdateRoutine() go repo.dimensionUpdateRoutine()
Log.Info("trying to restore previous state") logger.Log.Info("trying to restore previous state")
restoreErr := repo.tryToRestoreCurrent() restoreErr := repo.tryToRestoreCurrent()
if restoreErr != nil { if restoreErr != nil {
Log.Error(" could not restore previous repo content", zap.Error(restoreErr)) logger.Log.Error(" could not restore previous repo content", zap.Error(restoreErr))
} else { } else {
repo.recovered = true repo.recovered = true
Log.Info("restored previous repo content") logger.Log.Info("restored previous repo content")
} }
return repo return repo
} }
func getDefaultHTTPClient(timeout time.Duration) *http.Client {
client := &http.Client{
Transport: &http.Transport{
DisableKeepAlives: true,
TLSClientConfig: &tls.Config{InsecureSkipVerify: true},
TLSHandshakeTimeout: 5 * time.Second,
},
Timeout: timeout,
}
return client
}
func (repo *Repo) Recovered() bool { func (repo *Repo) Recovered() bool {
return repo.recovered return repo.recovered
} }
@ -104,12 +122,11 @@ func (repo *Repo) getNodes(nodeRequests map[string]*requests.Node, env *requests
path = []*content.Item{} path = []*content.Item{}
) )
for nodeName, nodeRequest := range nodeRequests { for nodeName, nodeRequest := range nodeRequests {
if nodeName == "" || nodeRequest.ID == "" { if nodeName == "" || nodeRequest.ID == "" {
Log.Info("invalid node request", zap.Error(errors.New("nodeName or nodeRequest.ID empty"))) logger.Log.Info("invalid node request", zap.Error(errors.New("nodeName or nodeRequest.ID empty")))
continue continue
} }
Log.Debug("adding node", zap.String("name", nodeName), zap.String("requestID", nodeRequest.ID)) logger.Log.Debug("adding node", zap.String("name", nodeName), zap.String("requestID", nodeRequest.ID))
groups := env.Groups groups := env.Groups
if len(nodeRequest.Groups) > 0 { if len(nodeRequest.Groups) > 0 {
@ -120,30 +137,32 @@ func (repo *Repo) getNodes(nodeRequests map[string]*requests.Node, env *requests
nodes[nodeName] = nil nodes[nodeName] = nil
if !ok && nodeRequest.Dimension == "" { if !ok && nodeRequest.Dimension == "" {
Log.Debug("could not get dimension root node", zap.String("dimension", nodeRequest.Dimension)) logger.Log.Debug("Could not get dimension root node", zap.String("dimension", nodeRequest.Dimension))
for _, dimension := range env.Dimensions { for _, dimension := range env.Dimensions {
dimensionNode, ok = repo.Directory[dimension] dimensionNode, ok = repo.Directory[dimension]
if ok { if ok {
Log.Debug("found root node in env.Dimensions", zap.String("dimension", dimension)) logger.Log.Debug("Found root node in env.Dimensions", zap.String("dimension", dimension))
break break
} }
Log.Debug("could NOT find root node in env.Dimensions", zap.String("dimension", dimension)) logger.Log.Debug("Could NOT find root node in env.Dimensions", zap.String("dimension", dimension))
} }
} }
if !ok { if !ok {
Log.Error("could not get dimension root node", zap.String("nodeRequest.Dimension", nodeRequest.Dimension)) logger.Log.Error("could not get dimension root node", zap.String("nodeRequest.Dimension", nodeRequest.Dimension))
continue continue
} }
treeNode, ok := dimensionNode.Directory[nodeRequest.ID] treeNode, ok := dimensionNode.Directory[nodeRequest.ID]
if ok { if !ok {
nodes[nodeName] = repo.getNode(treeNode, nodeRequest.Expand, nodeRequest.MimeTypes, path, 0, groups, nodeRequest.DataFields, nodeRequest.ExposeHiddenNodes) logger.Log.Error("Invalid tree node requested",
} else {
Log.Error("an invalid tree node was requested",
zap.String("nodeName", nodeName), zap.String("nodeName", nodeName),
zap.String("ID", nodeRequest.ID), zap.String("nodeID", nodeRequest.ID),
) )
status.M.InvalidNodeTreeRequests.WithLabelValues(nodeRequest.ID).Inc()
continue
} }
nodes[nodeName] = repo.getNode(treeNode, nodeRequest.Expand, nodeRequest.MimeTypes, path, 0, groups, nodeRequest.DataFields, nodeRequest.ExposeHiddenNodes)
} }
return nodes return nodes
} }
@ -161,18 +180,18 @@ func (repo *Repo) GetContent(r *requests.Content) (c *content.SiteContent, err e
// add more input validation // add more input validation
err = repo.validateContentRequest(r) err = repo.validateContentRequest(r)
if err != nil { if err != nil {
Log.Error("repo.GetContent invalid request", zap.Error(err)) logger.Log.Error("repo.GetContent invalid request", zap.Error(err))
return return
} }
Log.Debug("repo.GetContent", zap.String("URI", r.URI)) logger.Log.Debug("repo.GetContent", zap.String("URI", r.URI))
c = content.NewSiteContent() c = content.NewSiteContent()
resolved, resolvedURI, resolvedDimension, node := repo.resolveContent(r.Env.Dimensions, r.URI) resolved, resolvedURI, resolvedDimension, node := repo.resolveContent(r.Env.Dimensions, r.URI)
if resolved { if resolved {
if !node.CanBeAccessedByGroups(r.Env.Groups) { if !node.CanBeAccessedByGroups(r.Env.Groups) {
Log.Warn("resolvecontent got status 401", zap.String("URI", r.URI)) logger.Log.Warn("Resolved content cannot be accessed by specified group", zap.String("URI", r.URI))
c.Status = content.StatusForbidden c.Status = content.StatusForbidden
} else { } else {
Log.Info("resolvecontent got status 200", zap.String("URI", r.URI)) logger.Log.Info("Content resolved", zap.String("URI", r.URI))
c.Status = content.StatusOk c.Status = content.StatusOk
c.Data = node.Data c.Data = node.Data
} }
@ -188,11 +207,11 @@ func (repo *Repo) GetContent(r *requests.Content) (c *content.SiteContent, err e
} }
c.URIs = uris c.URIs = uris
} else { } else {
Log.Info("resolvecontent got status 404", zap.String("URI", r.URI)) logger.Log.Info("Content not found", zap.String("URI", r.URI))
c.Status = content.StatusNotFound c.Status = content.StatusNotFound
c.Dimension = r.Env.Dimensions[0] c.Dimension = r.Env.Dimensions[0]
Log.Debug("failed to resolve, falling back to default dimension", logger.Log.Debug("Failed to resolve, falling back to default dimension",
zap.String("URI", r.URI), zap.String("URI", r.URI),
zap.String("defaultDimension", r.Env.Dimensions[0]), zap.String("defaultDimension", r.Env.Dimensions[0]),
) )
@ -226,13 +245,13 @@ func (repo *Repo) WriteRepoBytes(w io.Writer) {
f, err := os.Open(repo.history.getCurrentFilename()) f, err := os.Open(repo.history.getCurrentFilename())
if err != nil { if err != nil {
Log.Error("failed to serve Repo JSON", zap.Error(err)) logger.Log.Error("Failed to serve Repo JSON", zap.Error(err))
} }
w.Write([]byte("{\"reply\":")) w.Write([]byte("{\"reply\":"))
_, err = io.Copy(w, f) _, err = io.Copy(w, f)
if err != nil { if err != nil {
Log.Error("failed to serve Repo JSON", zap.Error(err)) logger.Log.Error("Failed to serve Repo JSON", zap.Error(err))
} }
w.Write([]byte("}")) w.Write([]byte("}"))
} }
@ -243,7 +262,7 @@ func (repo *Repo) Update() (updateResponse *responses.Update) {
return float64(float64(nanoSeconds) / float64(1000000000.0)) return float64(float64(nanoSeconds) / float64(1000000000.0))
} }
Log.Info("Update triggered") logger.Log.Info("Update triggered")
// Log.Info(ansi.Yellow + "BUFFER LENGTH BEFORE tryUpdate(): " + strconv.Itoa(len(repo.jsonBuf.Bytes())) + ansi.Reset) // Log.Info(ansi.Yellow + "BUFFER LENGTH BEFORE tryUpdate(): " + strconv.Itoa(len(repo.jsonBuf.Bytes())) + ansi.Reset)
startTime := time.Now().UnixNano() startTime := time.Now().UnixNano()
@ -262,13 +281,13 @@ func (repo *Repo) Update() (updateResponse *responses.Update) {
if updateErr != errUpdateRejected { if updateErr != errUpdateRejected {
updateResponse.ErrorMessage = updateErr.Error() updateResponse.ErrorMessage = updateErr.Error()
Log.Error("could not update repository:", zap.Error(updateErr)) logger.Log.Error("Failed to update repository", zap.Error(updateErr))
restoreErr := repo.tryToRestoreCurrent() restoreErr := repo.tryToRestoreCurrent()
if restoreErr != nil { if restoreErr != nil {
Log.Error("failed to restore preceding repo version", zap.Error(restoreErr)) logger.Log.Error("Failed to restore preceding repository version", zap.Error(restoreErr))
} else { } else {
Log.Info("restored current repo from local history") logger.Log.Info("Successfully restored current repository from local history")
} }
} }
} else { } else {
@ -276,7 +295,7 @@ func (repo *Repo) Update() (updateResponse *responses.Update) {
// persist the currently loaded one // persist the currently loaded one
historyErr := repo.history.add(repo.jsonBuf.Bytes()) historyErr := repo.history.add(repo.jsonBuf.Bytes())
if historyErr != nil { if historyErr != nil {
Log.Error("could not persist current repo in history", zap.Error(historyErr)) logger.Log.Error("Could not persist current repo in history", zap.Error(historyErr))
status.M.HistoryPersistFailedCounter.WithLabelValues(historyErr.Error()).Inc() status.M.HistoryPersistFailedCounter.WithLabelValues(historyErr.Error()).Inc()
} }
// add some stats // add some stats
@ -292,7 +311,7 @@ func (repo *Repo) Update() (updateResponse *responses.Update) {
// resolveContent find content in a repository // resolveContent find content in a repository
func (repo *Repo) resolveContent(dimensions []string, URI string) (resolved bool, resolvedURI string, resolvedDimension string, repoNode *content.RepoNode) { func (repo *Repo) resolveContent(dimensions []string, URI string) (resolved bool, resolvedURI string, resolvedDimension string, repoNode *content.RepoNode) {
parts := strings.Split(URI, content.PathSeparator) parts := strings.Split(URI, content.PathSeparator)
Log.Debug("repo.ResolveContent", zap.String("URI", URI)) logger.Log.Debug("repo.ResolveContent", zap.String("URI", URI))
for i := len(parts); i > 0; i-- { for i := len(parts); i > 0; i-- {
testURI := strings.Join(parts[0:i], content.PathSeparator) testURI := strings.Join(parts[0:i], content.PathSeparator)
if testURI == "" { if testURI == "" {
@ -300,13 +319,13 @@ func (repo *Repo) resolveContent(dimensions []string, URI string) (resolved bool
} }
for _, dimension := range dimensions { for _, dimension := range dimensions {
if d, ok := repo.Directory[dimension]; ok { if d, ok := repo.Directory[dimension]; ok {
Log.Debug("checking", logger.Log.Debug("Checking node",
zap.String("dimension", dimension), zap.String("dimension", dimension),
zap.String("URI", testURI), zap.String("URI", testURI),
) )
if repoNode, ok := d.URIDirectory[testURI]; ok { if repoNode, ok := d.URIDirectory[testURI]; ok {
resolved = true resolved = true
Log.Debug("found node", zap.String("URI", testURI), zap.String("destination", repoNode.DestinationID)) logger.Log.Debug("Node found", zap.String("URI", testURI), zap.String("destination", repoNode.DestinationID))
if len(repoNode.DestinationID) > 0 { if len(repoNode.DestinationID) > 0 {
if destionationNode, destinationNodeOk := d.Directory[repoNode.DestinationID]; destinationNodeOk { if destionationNode, destinationNodeOk := d.Directory[repoNode.DestinationID]; destinationNodeOk {
repoNode = destionationNode repoNode = destionationNode
@ -328,7 +347,7 @@ func (repo *Repo) getURIForNode(dimension string, repoNode *content.RepoNode, re
linkedNode, ok := repo.Directory[dimension].Directory[repoNode.LinkID] linkedNode, ok := repo.Directory[dimension].Directory[repoNode.LinkID]
if ok { if ok {
if recursionLevel > maxGetURIForNodeRecursionLevel { if recursionLevel > maxGetURIForNodeRecursionLevel {
Log.Error("maxGetURIForNodeRecursionLevel reached", zap.String("repoNode.ID", repoNode.ID), zap.String("linkID", repoNode.LinkID), zap.String("dimension", dimension)) logger.Log.Error("maxGetURIForNodeRecursionLevel reached", zap.String("repoNode.ID", repoNode.ID), zap.String("linkID", repoNode.LinkID), zap.String("dimension", dimension))
return "" return ""
} }
return repo.getURIForNode(dimension, linkedNode, recursionLevel+1) return repo.getURIForNode(dimension, linkedNode, recursionLevel+1)
@ -356,7 +375,7 @@ func (repo *Repo) getNode(
) *content.Node { ) *content.Node {
node := content.NewNode() node := content.NewNode()
node.Item = repoNode.ToItem(dataFields) node.Item = repoNode.ToItem(dataFields)
Log.Debug("getNode", zap.String("ID", repoNode.ID)) logger.Log.Debug("getNode", zap.String("ID", repoNode.ID))
for _, childID := range repoNode.Index { for _, childID := range repoNode.Index {
childNode := repoNode.Nodes[childID] childNode := repoNode.Nodes[childID]
if (level == 0 || expanded || !expanded && childNode.InPath(path)) && (!childNode.Hidden || exposeHiddenNodes) && childNode.CanBeAccessedByGroups(groups) && childNode.IsOneOfTheseMimeTypes(mimeTypes) { if (level == 0 || expanded || !expanded && childNode.InPath(path)) && (!childNode.Hidden || exposeHiddenNodes) && childNode.CanBeAccessedByGroups(groups) && childNode.IsOneOfTheseMimeTypes(mimeTypes) {

View File

@ -12,13 +12,27 @@ import (
"github.com/foomo/contentserver/status" "github.com/foomo/contentserver/status"
) )
func handleRequest(r *repo.Repo, handler Handler, jsonBytes []byte, source string) (replyBytes []byte, err error) { func handleRequest(r *repo.Repo, handler Handler, jsonBytes []byte, source string) ([]byte, error) {
start := time.Now()
reply, err := executeRequest(r, handler, jsonBytes, source)
result := "success"
if err != nil {
result = "error"
}
status.M.ServiceRequestCounter.WithLabelValues(string(handler), result, source).Inc()
status.M.ServiceRequestDuration.WithLabelValues(string(handler), result, source).Observe(time.Since(start).Seconds())
return reply, err
}
func executeRequest(r *repo.Repo, handler Handler, jsonBytes []byte, source string) (replyBytes []byte, err error) {
var ( var (
reply interface{} reply interface{}
apiErr error apiErr error
jsonErr error jsonErr error
start = time.Now()
processIfJSONIsOk = func(err error, processingFunc func()) { processIfJSONIsOk = func(err error, processingFunc func()) {
if err != nil { if err != nil {
jsonErr = err jsonErr = err
@ -57,7 +71,6 @@ func handleRequest(r *repo.Repo, handler Handler, jsonBytes []byte, source strin
default: default:
reply = responses.NewError(1, "unknown handler: "+string(handler)) reply = responses.NewError(1, "unknown handler: "+string(handler))
} }
addMetrics(handler, start, jsonErr, apiErr, source)
// error handling // error handling
if jsonErr != nil { if jsonErr != nil {
@ -71,20 +84,6 @@ func handleRequest(r *repo.Repo, handler Handler, jsonBytes []byte, source strin
return encodeReply(reply) return encodeReply(reply)
} }
func addMetrics(handlerName Handler, start time.Time, errJSON error, errAPI error, source string) {
var (
duration = time.Since(start)
s = "succeeded"
)
if errJSON != nil || errAPI != nil {
s = "failed"
}
status.M.ServiceRequestCounter.WithLabelValues(string(handlerName), s, source).Inc()
status.M.ServiceRequestDuration.WithLabelValues(string(handlerName), s, source).Observe(float64(duration.Seconds()))
}
// encodeReply takes an interface and encodes it as JSON // encodeReply takes an interface and encodes it as JSON
// it returns the resulting JSON and a marshalling error // it returns the resulting JSON and a marshalling error
func encodeReply(reply interface{}) (replyBytes []byte, err error) { func encodeReply(reply interface{}) (replyBytes []byte, err error) {

View File

@ -7,7 +7,6 @@ import (
"net" "net"
"strconv" "strconv"
"strings" "strings"
"time"
"go.uber.org/zap" "go.uber.org/zap"
@ -47,13 +46,10 @@ func (s *socketServer) execute(handler Handler, jsonBytes []byte) (reply []byte)
Log.Debug("incoming json buffer", zap.Int("length", len(jsonBytes))) Log.Debug("incoming json buffer", zap.Int("length", len(jsonBytes)))
if handler == HandlerGetRepo { if handler == HandlerGetRepo {
var ( var (
b bytes.Buffer b bytes.Buffer
start = time.Now()
) )
s.repo.WriteRepoBytes(&b) s.repo.WriteRepoBytes(&b)
addMetrics(handler, start, nil, nil, sourceSocketServer)
return b.Bytes() return b.Bytes()
} }

View File

@ -5,7 +5,6 @@ import (
"io/ioutil" "io/ioutil"
"net/http" "net/http"
"strings" "strings"
"time"
"go.uber.org/zap" "go.uber.org/zap"
@ -31,7 +30,7 @@ func NewWebServer(path string, r *repo.Repo) http.Handler {
func (s *webServer) ServeHTTP(w http.ResponseWriter, r *http.Request) { func (s *webServer) ServeHTTP(w http.ResponseWriter, r *http.Request) {
defer func() { defer func() {
if r := recover(); r != nil { if r := recover(); r != nil {
Log.Error("panic in handle connection", zap.String("error", fmt.Sprint(r))) Log.Error("Panic in handle connection", zap.String("error", fmt.Sprint(r)))
} }
}() }()
@ -40,17 +39,14 @@ func (s *webServer) ServeHTTP(w http.ResponseWriter, r *http.Request) {
return return
} }
jsonBytes, readErr := ioutil.ReadAll(r.Body) jsonBytes, readErr := ioutil.ReadAll(r.Body)
r.Body.Close()
if readErr != nil { if readErr != nil {
http.Error(w, "failed to read incoming request", http.StatusBadRequest) http.Error(w, "failed to read incoming request", http.StatusBadRequest)
return return
} }
h := Handler(strings.TrimPrefix(r.URL.Path, s.path+"/")) h := Handler(strings.TrimPrefix(r.URL.Path, s.path+"/"))
if h == HandlerGetRepo { if h == HandlerGetRepo {
start := time.Now()
s.r.WriteRepoBytes(w) s.r.WriteRepoBytes(w)
w.Header().Set("Content-Type", "application/json") w.Header().Set("Content-Type", "application/json")
addMetrics(h, start, nil, nil, sourceWebserver)
return return
} }
reply, errReply := handleRequest(s.r, h, jsonBytes, "webserver") reply, errReply := handleRequest(s.r, h, jsonBytes, "webserver")

View File

@ -28,6 +28,7 @@ type Metrics struct {
ContentRequestCounter *prometheus.CounterVec // count the total number of content requests ContentRequestCounter *prometheus.CounterVec // count the total number of content requests
NumSocketsGauge *prometheus.GaugeVec // keep track of the total number of open sockets NumSocketsGauge *prometheus.GaugeVec // keep track of the total number of open sockets
HistoryPersistFailedCounter *prometheus.CounterVec // count the number of failed attempts to persist the content history HistoryPersistFailedCounter *prometheus.CounterVec // count the number of failed attempts to persist the content history
InvalidNodeTreeRequests *prometheus.CounterVec // counts the number of invalid tree node requests
} }
// newMetrics can be used to instantiate a metrics instance // newMetrics can be used to instantiate a metrics instance
@ -36,6 +37,9 @@ type Metrics struct {
// the package exposes the initialized Metrics instance as the variable M. // the package exposes the initialized Metrics instance as the variable M.
func newMetrics() *Metrics { func newMetrics() *Metrics {
return &Metrics{ return &Metrics{
InvalidNodeTreeRequests: newCounterVec("invalid_node_tree_request_count",
"Counts the number of invalid tree nodes for a specific node ID",
"nodeID"),
ServiceRequestCounter: newCounterVec( ServiceRequestCounter: newCounterVec(
"service_request_count", "service_request_count",
"Count of requests for each handler", "Count of requests for each handler",