From e9245a200c9de25607df180b095234ec92852633 Mon Sep 17 00:00:00 2001 From: Philipp Mieden Date: Thu, 23 May 2019 14:20:38 +0200 Subject: [PATCH] replaced logger, fixed update queue mechanism --- .gitignore | 1 + client/client_test.go | 8 +++- contentserver.go | 40 ++++++---------- log/log.go | 79 ------------------------------ logger/log.go | 39 +++++++++++++++ metrics/prometheus.go | 12 +++-- repo/loader.go | 84 ++++++++++++++++---------------- repo/mock/mock.go | 3 +- repo/repo.go | 103 ++++++++++++++++++++++++---------------- repo/repo_test.go | 6 +++ server/handlerequest.go | 11 +++-- server/server.go | 38 ++++++++++----- server/socketserver.go | 49 ++++++++++--------- server/webserver.go | 5 +- status/healthz.go | 13 +++-- 15 files changed, 251 insertions(+), 240 deletions(-) delete mode 100644 log/log.go create mode 100644 logger/log.go diff --git a/.gitignore b/.gitignore index 4230dfc..8a71575 100644 --- a/.gitignore +++ b/.gitignore @@ -1,3 +1,4 @@ +*.log *.test cprof-* var diff --git a/client/client_test.go b/client/client_test.go index bf34007..d7e46ee 100644 --- a/client/client_test.go +++ b/client/client_test.go @@ -8,7 +8,7 @@ import ( "time" "github.com/foomo/contentserver/content" - "github.com/foomo/contentserver/log" + . "github.com/foomo/contentserver/logger" "github.com/foomo/contentserver/repo/mock" "github.com/foomo/contentserver/requests" "github.com/foomo/contentserver/server" @@ -21,6 +21,10 @@ var ( testServerWebserverAddr string ) +func init() { + SetupLogging(true, "contentserver_client_test.log") +} + func dump(t *testing.T, v interface{}) { jsonBytes, err := json.MarshalIndent(v, "", " ") if err != nil { @@ -51,7 +55,7 @@ func initTestServer(t testing.TB) (socketAddr, webserverAddr string) { socketAddr = getAvailableAddr() webserverAddr = getAvailableAddr() testServer, varDir := mock.GetMockData(t) - log.SelectedLevel = log.LevelError + go func() { err := server.RunServerSocketAndWebServer( testServer.URL+"/repo-two-dimensions.json", diff --git a/contentserver.go b/contentserver.go index 6aff610..abd3d6c 100644 --- a/contentserver.go +++ b/contentserver.go @@ -3,19 +3,19 @@ package main import ( "flag" "fmt" + "net/http" + _ "net/http/pprof" "os" "runtime/debug" "strings" "time" + "github.com/apex/log" + . "github.com/foomo/contentserver/logger" "github.com/foomo/contentserver/metrics" - "github.com/foomo/contentserver/status" - - "net/http" - _ "net/http/pprof" - - "github.com/foomo/contentserver/log" "github.com/foomo/contentserver/server" + "github.com/foomo/contentserver/status" + "go.uber.org/zap" ) const ( @@ -38,6 +38,7 @@ var ( flagWebserverAddress = flag.String("webserver-address", "", "address to bind web server host:port, when empty no webserver will be spawned") flagWebserverPath = flag.String("webserver-path", "/contentserver", "path to export the webserver on - useful when behind a proxy") flagVarDir = flag.String("var-dir", "/var/lib/contentserver", "where to put my data") + flagDebug = flag.Bool("debug", true, "toggle debug mode") // debugging / profiling flagFreeOSMem = flag.Int("free-os-mem", 0, "free OS mem every X minutes") @@ -69,6 +70,8 @@ func exitUsage(code int) { func main() { flag.Parse() + SetupLogging(*flagDebug, "contentserver.log") + go func() { fmt.Println(http.ListenAndServe("localhost:6060", nil)) }() @@ -79,12 +82,14 @@ func main() { } if *flagFreeOSMem > 0 { - log.Notice("[INFO] freeing OS memory every ", *flagFreeOSMem, " minutes!") + Log.Info("dumping heap every $interval minutes", zap.Int("interval", *flagHeapDump)) + Log.Info("freeing OS memory every $interval minutes", zap.Int("interval", *flagFreeOSMem)) go func() { for { select { case <-time.After(time.Duration(*flagFreeOSMem) * time.Minute): - log.Notice("FreeOSMemory") + Log.Info("dumping heap every $interval minutes", zap.Int("interval", *flagHeapDump)) + log.Info("FreeOSMemory") debug.FreeOSMemory() } } @@ -92,12 +97,12 @@ func main() { } if *flagHeapDump > 0 { - log.Notice("[INFO] dumping heap every ", *flagHeapDump, " minutes!") + Log.Info("dumping heap every $interval minutes", zap.Int("interval", *flagHeapDump)) go func() { for { select { case <-time.After(time.Duration(*flagFreeOSMem) * time.Minute): - log.Notice("HeapDump") + log.Info("HeapDump") f, err := os.Create("heapdump") if err != nil { panic("failed to create heap dump file") @@ -115,21 +120,6 @@ func main() { if len(flag.Args()) == 1 { fmt.Println(*flagAddress, flag.Arg(0)) - level := log.LevelRecord - switch *logLevel { - case logLevelError: - level = log.LevelError - case logLevelRecord: - level = log.LevelRecord - case logLevelWarning: - level = log.LevelWarning - case logLevelNotice: - level = log.LevelNotice - case logLevelDebug: - level = log.LevelDebug - } - log.SelectedLevel = level - // kickoff metric handlers go metrics.RunPrometheusHandler(DefaultPrometheusListener) go status.RunHealthzHandlerListener(DefaultHealthzHandlerAddress, ServiceName) diff --git a/log/log.go b/log/log.go deleted file mode 100644 index 892d737..0000000 --- a/log/log.go +++ /dev/null @@ -1,79 +0,0 @@ -package log - -import ( - "fmt" - "strings" - "time" -) - -// Level logging level enum -type Level int - -const ( - // LevelError an error - as bad as it gets - LevelError Level = 0 - // LevelRecord put this to the logs in any case - LevelRecord Level = 1 - // LevelWarning not that bad - LevelWarning Level = 2 - // LevelNotice almost on debug level - LevelNotice Level = 3 - // LevelDebug we are debugging - LevelDebug Level = 4 -) - -// SelectedLevel selected log level -var SelectedLevel = LevelDebug - -var prefices = map[Level]string{ - LevelRecord: "record : ", - LevelError: "error : ", - LevelWarning: "warning : ", - LevelNotice: "notice : ", - LevelDebug: "debug : ", -} - -func log(msg string, level Level) string { - if level <= SelectedLevel { - prefix := time.Now().Format(time.RFC3339Nano) + " " + prefices[level] - lines := strings.Split(msg, "\n") - for i := 0; i < len(lines); i++ { - fmt.Println(level, prefix+lines[i]) - } - } - return msg -} - -func logThings(msgs []interface{}, level Level) string { - r := "" - for _, msg := range msgs { - r += "\n" + fmt.Sprint(msg) - } - r = strings.Trim(r, "\n") - return log(r, level) -} - -// Debug write debug messages to the log -func Debug(msgs ...interface{}) string { - return logThings(msgs, LevelDebug) -} - -// Notice write notice messages to the log -func Notice(msgs ...interface{}) string { - return logThings(msgs, LevelNotice) -} - -// Warning write warning messages to the log -func Warning(msgs ...interface{}) string { - return logThings(msgs, LevelWarning) -} - -// Record write record messages to the log -func Record(msgs ...interface{}) string { - return logThings(msgs, LevelRecord) -} - -// Error write error messages to the log -func Error(msgs ...interface{}) string { - return logThings(msgs, LevelError) -} diff --git a/logger/log.go b/logger/log.go new file mode 100644 index 0000000..df0d920 --- /dev/null +++ b/logger/log.go @@ -0,0 +1,39 @@ +package logger + +import ( + "log" + "os" + + "go.uber.org/zap" +) + +var ( + // Log is the logger instance exposed by this package + // call Setup() prior to using it + // want JSON output? Set LOG_JSON env var to 1! + Log *zap.Logger +) + +// SetupLogging configures the logger +func SetupLogging(debug bool, outputPath string) { + + var err error + if debug { + zc := zap.NewDevelopmentConfig() + if os.Getenv("LOG_JSON") == "1" { + zc.Encoding = "json" + } + zc.OutputPaths = append(zc.OutputPaths, outputPath) + Log, err = zc.Build() + } else { + zc := zap.NewProductionConfig() + if os.Getenv("LOG_JSON") == "1" { + zc.Encoding = "json" + } + zc.OutputPaths = append(zc.OutputPaths, outputPath) + Log, err = zc.Build() + } + if err != nil { + log.Fatalf("can't initialize zap logger: %v", err) + } +} diff --git a/metrics/prometheus.go b/metrics/prometheus.go index 4ae5eaa..919b92a 100644 --- a/metrics/prometheus.go +++ b/metrics/prometheus.go @@ -1,11 +1,11 @@ package metrics import ( - "fmt" "net/http" - "github.com/foomo/contentserver/log" + . "github.com/foomo/contentserver/logger" "github.com/prometheus/client_golang/prometheus/promhttp" + "go.uber.org/zap" ) func PrometheusHandler() http.Handler { @@ -15,6 +15,10 @@ func PrometheusHandler() http.Handler { } func RunPrometheusHandler(listener string) { - log.Notice(fmt.Sprintf("starting prometheus handler on address '%s'", listener)) - log.Error(http.ListenAndServe(listener, PrometheusHandler())) + Log.Info("starting prometheus handler on", + zap.String("address", listener), + ) + Log.Error("server failed: ", + zap.Error(http.ListenAndServe(listener, PrometheusHandler())), + ) } diff --git a/repo/loader.go b/repo/loader.go index 3839d8d..4d7f1c8 100644 --- a/repo/loader.go +++ b/repo/loader.go @@ -8,33 +8,41 @@ import ( "time" "github.com/foomo/contentserver/content" - "github.com/foomo/contentserver/log" + . "github.com/foomo/contentserver/logger" jsoniter "github.com/json-iterator/go" + "go.uber.org/zap" ) -var json = jsoniter.ConfigCompatibleWithStandardLibrary +var ( + json = jsoniter.ConfigCompatibleWithStandardLibrary +) + +type updateResponse struct { + repoRuntime int64 + jsonBytes []byte + err error +} func (repo *Repo) updateRoutine() { - go func() { - for newDimension := range repo.updateChannel { - log.Notice("update routine received a new dimension: " + newDimension.Dimension) + for newDimension := range repo.updateChannel { + Log.Info("update routine received a new dimension", zap.String("dimension", newDimension.Dimension)) - err := repo._updateDimension(newDimension.Dimension, newDimension.Node) - log.Notice("update routine received result") - if err != nil { - log.Debug(" update routine error: " + err.Error()) - } - repo.updateDoneChannel <- err - repo.updateCompleteChannel <- true + err := repo._updateDimension(newDimension.Dimension, newDimension.Node) + Log.Info("update routine received result") + if err != nil { + Log.Debug("update dimension failed", zap.Error(err)) } - }() + repo.updateDoneChannel <- err + } } func (repo *Repo) updateDimension(dimension string, node *content.RepoNode) error { + Log.Debug("trying to push dimension into update channel", zap.String("dimension", dimension), zap.String("nodeName", node.Name)) repo.updateChannel <- &repoDimension{ Dimension: dimension, Node: node, } + Log.Debug("waiting for done signal") return <-repo.updateDoneChannel } @@ -71,7 +79,9 @@ func (repo *Repo) _updateDimension(dimension string, newNode *content.RepoNode) } func builDirectory(dirNode *content.RepoNode, directory map[string]*content.RepoNode, uRIDirectory map[string]*content.RepoNode) error { - log.Debug("repo.buildDirectory: " + dirNode.ID) + + // Log.Debug("buildDirectory", zap.String("ID", dirNode.ID)) + existingNode, ok := directory[dirNode.ID] if ok { return errors.New("duplicate node with id:" + existingNode.ID) @@ -111,19 +121,11 @@ func loadNodesFromJSON(jsonBytes []byte) (nodes map[string]*content.RepoNode, er } func (repo *Repo) tryToRestoreCurrent() error { - - select { - case repo.updateInProgressChannel <- time.Now(): - log.Notice("update request added to queue") - currentJSONBytes, err := repo.history.getCurrent() - if err != nil { - return err - } - return repo.loadJSONBytes(currentJSONBytes) - default: - log.Notice("update request ignored, queue seems to be full") - return errors.New("Update request queue is full. Please try again later.") + currentJSONBytes, err := repo.history.getCurrent() + if err != nil { + return err } + return repo.loadJSONBytes(currentJSONBytes) } func get(URL string) (data []byte, err error) { @@ -144,10 +146,10 @@ func (repo *Repo) update() (repoRuntime int64, jsonBytes []byte, err error) { repoRuntime = time.Now().UnixNano() - startTimeRepo if err != nil { // we have no json to load - the repo server did not reply - log.Debug("we have no json to load - the repo server did not reply", err) + Log.Debug("failed to load json", zap.Error(err)) return repoRuntime, jsonBytes, err } - log.Debug("loading json from: "+repo.server, "length:", len(jsonBytes)) + Log.Debug("loading json", zap.String("server", repo.server), zap.Int("length", len(jsonBytes))) nodes, err := loadNodesFromJSON(jsonBytes) if err != nil { // could not load nodes from json @@ -163,12 +165,14 @@ func (repo *Repo) update() (repoRuntime int64, jsonBytes []byte, err error) { // limit ressources and allow only one update request at once func (repo *Repo) tryUpdate() (repoRuntime int64, jsonBytes []byte, err error) { + c := make(chan updateResponse) select { - case repo.updateInProgressChannel <- time.Now(): - log.Notice("update request added to queue") - return repo.update() + case repo.updateInProgressChannel <- c: + Log.Info("update request added to queue") + ur := <-c + return ur.repoRuntime, ur.jsonBytes, ur.err default: - log.Notice("invalidation request ignored, queue seems to be full") + Log.Info("update request ignored, queue is full") return 0, nil, errors.New("queue full") } } @@ -176,22 +180,22 @@ func (repo *Repo) tryUpdate() (repoRuntime int64, jsonBytes []byte, err error) { func (repo *Repo) loadJSONBytes(jsonBytes []byte) error { nodes, err := loadNodesFromJSON(jsonBytes) if err != nil { - log.Debug("could not parse json", string(jsonBytes)) + Log.Debug("could not parse json", zap.String("json", string(jsonBytes))) return err } err = repo.loadNodes(nodes) if err == nil { historyErr := repo.history.add(jsonBytes) if historyErr != nil { - log.Warning("could not add valid json to history:" + historyErr.Error()) + Log.Error("could not add valid json to history", zap.Error(historyErr)) } else { - log.Record("added valid json to history") + Log.Info("added valid json to history") } cleanUpErr := repo.history.cleanup() if cleanUpErr != nil { - log.Warning("an error occured while cleaning up my history:", cleanUpErr) + Log.Error("an error occured while cleaning up my history", zap.Error(cleanUpErr)) } else { - log.Record("cleaned up history") + Log.Info("cleaned up history") } } return err @@ -201,10 +205,10 @@ func (repo *Repo) loadNodes(newNodes map[string]*content.RepoNode) error { newDimensions := []string{} for dimension, newNode := range newNodes { newDimensions = append(newDimensions, dimension) - log.Debug("loading nodes for dimension " + dimension) + Log.Debug("loading nodes for dimension", zap.String("dimension", dimension)) loadErr := repo.updateDimension(dimension, newNode) if loadErr != nil { - log.Debug(" failed to load " + dimension + ": " + loadErr.Error()) + Log.Debug("failed to load", zap.String("dimension", dimension), zap.Error(loadErr)) return loadErr } } @@ -219,7 +223,7 @@ func (repo *Repo) loadNodes(newNodes map[string]*content.RepoNode) error { // we need to throw away orphaned dimensions for dimension := range repo.Directory { if !dimensionIsValid(dimension) { - log.Notice("removing orphaned dimension:" + dimension) + Log.Info("removing orphaned dimension", zap.String("dimension", dimension)) delete(repo.Directory, dimension) } } diff --git a/repo/mock/mock.go b/repo/mock/mock.go index 54b7736..2b1ae02 100644 --- a/repo/mock/mock.go +++ b/repo/mock/mock.go @@ -9,13 +9,12 @@ import ( "testing" "time" - "github.com/foomo/contentserver/log" "github.com/foomo/contentserver/requests" ) // GetMockData mock data to run a repo func GetMockData(t testing.TB) (server *httptest.Server, varDir string) { - log.SelectedLevel = log.LevelError + _, filename, _, _ := runtime.Caller(0) mockDir := path.Dir(filename) diff --git a/repo/repo.go b/repo/repo.go index a654162..85a0ed3 100644 --- a/repo/repo.go +++ b/repo/repo.go @@ -7,9 +7,10 @@ import ( "time" "github.com/foomo/contentserver/content" - "github.com/foomo/contentserver/log" + . "github.com/foomo/contentserver/logger" "github.com/foomo/contentserver/requests" "github.com/foomo/contentserver/responses" + "go.uber.org/zap" ) const maxGetURIForNodeRecursionLevel = 1000 @@ -29,8 +30,7 @@ type Repo struct { updateChannel chan *repoDimension updateDoneChannel chan error history *history - updateInProgressChannel chan time.Time - updateCompleteChannel chan bool + updateInProgressChannel chan chan updateResponse } type repoDimension struct { @@ -40,34 +40,45 @@ type repoDimension struct { // NewRepo constructor func NewRepo(server string, varDir string) *Repo { - log.Notice("creating new repo for " + server) - log.Notice(" using var dir:" + varDir) + + Log.Info("creating new repo", + zap.String("server", server), + zap.String("varDir", varDir), + ) repo := &Repo{ server: server, Directory: map[string]*Dimension{}, history: newHistory(varDir), updateChannel: make(chan *repoDimension), updateDoneChannel: make(chan error), - updateInProgressChannel: make(chan time.Time, 1), - updateCompleteChannel: make(chan bool), + updateInProgressChannel: make(chan chan updateResponse, 1), } go func() { for { select { - case t := <-repo.updateInProgressChannel: - log.Notice("got timestamp: ", t, "waiting for update to complete") - <-repo.updateCompleteChannel - log.Notice("update completed!") + case resChan := <-repo.updateInProgressChannel: + Log.Info("waiting for update to complete") + start := time.Now() + + repoRuntime, jsonBytes, errUpdate := repo.update() + + resChan <- updateResponse{ + repoRuntime: repoRuntime, + jsonBytes: jsonBytes, + err: errUpdate, + } + + Log.Info("update completed", zap.Duration("duration", time.Since(start))) } } }() go repo.updateRoutine() - log.Record("trying to restore previous state") + Log.Info("trying to restore previous state") restoreErr := repo.tryToRestoreCurrent() if restoreErr != nil { - log.Record(" could not restore previous repo content:" + restoreErr.Error()) + Log.Error(" could not restore previous repo content", zap.Error(restoreErr)) } else { - log.Record(" restored previous repo content") + Log.Info("restored previous repo content") } return repo } @@ -93,7 +104,7 @@ func (repo *Repo) getNodes(nodeRequests map[string]*requests.Node, env *requests path = []*content.Item{} ) for nodeName, nodeRequest := range nodeRequests { - log.Debug(" adding node " + nodeName + " " + nodeRequest.ID) + Log.Debug("adding node", zap.String("name", nodeName), zap.String("requestID", nodeRequest.ID)) groups := env.Groups if len(nodeRequest.Groups) > 0 { @@ -104,26 +115,29 @@ func (repo *Repo) getNodes(nodeRequests map[string]*requests.Node, env *requests nodes[nodeName] = nil if !ok && nodeRequest.Dimension == "" { - log.Debug(" could not get dimension root node for dimension " + nodeRequest.Dimension) + Log.Debug("could not get dimension root node", zap.String("dimension", nodeRequest.Dimension)) for _, dimension := range env.Dimensions { dimensionNode, ok = repo.Directory[dimension] if ok { - log.Debug(" searched for root node in env.dimension " + dimension + " with success") + Log.Debug("found root node in env.Dimensions", zap.String("dimension", dimension)) break } - log.Debug(" searched for root node in env.dimension " + dimension + " without success") + Log.Debug("could NOT find root node in env.Dimensions", zap.String("dimension", dimension)) } } if !ok { - log.Warning("could not get dimension root node for nodeRequest.Dimension: " + nodeRequest.Dimension) + Log.Error("could not get dimension root node", zap.String("nodeRequest.Dimension", nodeRequest.Dimension)) continue } treeNode, ok := dimensionNode.Directory[nodeRequest.ID] if ok { nodes[nodeName] = repo.getNode(treeNode, nodeRequest.Expand, nodeRequest.MimeTypes, path, 0, groups, nodeRequest.DataFields) } else { - log.Warning("you are requesting an invalid tree node for " + nodeName + " : " + nodeRequest.ID) + Log.Error("an invalid tree node was requested", + zap.String("nodeName", nodeName), + zap.String("ID", nodeRequest.ID), + ) } } return nodes @@ -142,18 +156,18 @@ func (repo *Repo) GetContent(r *requests.Content) (c *content.SiteContent, err e // add more input validation err = repo.validateContentRequest(r) if err != nil { - log.Debug("repo.GetContent invalid request", err) + Log.Error("repo.GetContent invalid request", zap.Error(err)) return } - log.Debug("repo.GetContent: ", r.URI) + Log.Debug("repo.GetContent", zap.String("URI", r.URI)) c = content.NewSiteContent() resolved, resolvedURI, resolvedDimension, node := repo.resolveContent(r.Env.Dimensions, r.URI) if resolved { if !node.CanBeAccessedByGroups(r.Env.Groups) { - log.Notice("401 for " + r.URI) + Log.Warn("resolvecontent got status 401", zap.String("URI", r.URI)) c.Status = content.StatusForbidden } else { - log.Notice("200 for " + r.URI) + Log.Info("resolvecontent got status 200", zap.String("URI", r.URI)) c.Status = content.StatusOk c.Data = node.Data } @@ -169,15 +183,22 @@ func (repo *Repo) GetContent(r *requests.Content) (c *content.SiteContent, err e } c.URIs = uris } else { - log.Notice("404 for " + r.URI) + Log.Info("resolvecontent got status 404", zap.String("URI", r.URI)) c.Status = content.StatusNotFound c.Dimension = r.Env.Dimensions[0] } - if log.SelectedLevel == log.LevelDebug { - log.Debug(fmt.Sprintf("resolved: %v, uri: %v, dim: %v, n: %v", resolved, resolvedURI, resolvedDimension, node)) - } + + Log.Debug("got content", + zap.Bool("resolved", resolved), + zap.String("resolvedURI", resolvedURI), + zap.String("resolvedDimension", resolvedDimension), + zap.String("nodeName", node.Name), + ) if !resolved { - log.Debug("repo.GetContent", r.URI, "could not be resolved falling back to default dimension", r.Env.Dimensions[0]) + Log.Debug("failed to resolve, falling back to default dimension", + zap.String("URI", r.URI), + zap.String("defaultDimension", r.Env.Dimensions[0]), + ) // r.Env.Dimensions is validated => we can access it resolvedDimension = r.Env.Dimensions[0] } @@ -210,27 +231,27 @@ func (repo *Repo) Update() (updateResponse *responses.Update) { updateResponse = &responses.Update{} updateResponse.Stats.RepoRuntime = floatSeconds(updateRepotime) - log.Notice("Update triggered") + Log.Info("Update triggered") if updateErr != nil { updateResponse.Success = false updateResponse.Stats.NumberOfNodes = -1 updateResponse.Stats.NumberOfURIs = -1 // let us try to restore the world from a file - log.Error("could not update repository:" + updateErr.Error()) + Log.Error("could not update repository:" + updateErr.Error()) updateResponse.ErrorMessage = updateErr.Error() restoreErr := repo.tryToRestoreCurrent() if restoreErr != nil { - log.Error("failed to restore preceding repo version: " + restoreErr.Error()) + Log.Error("failed to restore preceding repo version", zap.Error(restoreErr)) } else { - log.Record("restored current repo from local history") + Log.Info("restored current repo from local history") } } else { updateResponse.Success = true // persist the currently loaded one historyErr := repo.history.add(jsonBytes) if historyErr != nil { - log.Warning("could not persist current repo in history: " + historyErr.Error()) + Log.Warn("could not persist current repo in history", zap.Error(historyErr)) } // add some stats for dimension := range repo.Directory { @@ -245,7 +266,7 @@ func (repo *Repo) Update() (updateResponse *responses.Update) { // resolveContent find content in a repository func (repo *Repo) resolveContent(dimensions []string, URI string) (resolved bool, resolvedURI string, resolvedDimension string, repoNode *content.RepoNode) { parts := strings.Split(URI, content.PathSeparator) - log.Debug("repo.ResolveContent: " + URI) + Log.Debug("repo.ResolveContent", zap.String("URI", URI)) for i := len(parts); i > 0; i-- { testURI := strings.Join(parts[0:i], content.PathSeparator) if testURI == "" { @@ -253,11 +274,13 @@ func (repo *Repo) resolveContent(dimensions []string, URI string) (resolved bool } for _, dimension := range dimensions { if d, ok := repo.Directory[dimension]; ok { - log.Debug(" testing[" + dimension + "]: " + testURI) + Log.Debug("checking", + zap.String("dimension", dimension), + zap.String("URI", testURI), + ) if repoNode, ok := d.URIDirectory[testURI]; ok { resolved = true - log.Debug(" found => " + testURI) - log.Debug(" destination " + fmt.Sprint(repoNode.DestinationID)) + Log.Debug("found node", zap.String("URI", testURI), zap.String("destination", repoNode.DestinationID)) if len(repoNode.DestinationID) > 0 { if destionationNode, destinationNodeOk := d.Directory[repoNode.DestinationID]; destinationNodeOk { repoNode = destionationNode @@ -279,7 +302,7 @@ func (repo *Repo) getURIForNode(dimension string, repoNode *content.RepoNode, re linkedNode, ok := repo.Directory[dimension].Directory[repoNode.LinkID] if ok { if recursionLevel > maxGetURIForNodeRecursionLevel { - log.Error("maxGetURIForNodeRecursionLevel reached for", repoNode.ID, "link id", repoNode.LinkID, "in dimension", dimension) + Log.Error("maxGetURIForNodeRecursionLevel reached", zap.String("repoNode.ID", repoNode.ID), zap.String("linkID", repoNode.LinkID), zap.String("dimension", dimension)) return "" } return repo.getURIForNode(dimension, linkedNode, recursionLevel+1) @@ -298,7 +321,7 @@ func (repo *Repo) getURI(dimension string, id string) string { func (repo *Repo) getNode(repoNode *content.RepoNode, expanded bool, mimeTypes []string, path []*content.Item, level int, groups []string, dataFields []string) *content.Node { node := content.NewNode() node.Item = repoNode.ToItem(dataFields) - log.Debug("repo.GetNode: " + repoNode.ID) + Log.Debug("getNode", zap.String("ID", repoNode.ID)) for _, childID := range repoNode.Index { childNode := repoNode.Nodes[childID] if (level == 0 || expanded || !expanded && childNode.InPath(path)) && !childNode.Hidden && childNode.CanBeAccessedByGroups(groups) && childNode.IsOneOfTheseMimeTypes(mimeTypes) { diff --git a/repo/repo_test.go b/repo/repo_test.go index 89a48e3..4c0d96e 100644 --- a/repo/repo_test.go +++ b/repo/repo_test.go @@ -4,10 +4,16 @@ import ( "strings" "testing" + . "github.com/foomo/contentserver/logger" + _ "github.com/foomo/contentserver/logger" "github.com/foomo/contentserver/repo/mock" "github.com/foomo/contentserver/requests" ) +func init() { + SetupLogging(true, "contentserver_repo_test.log") +} + func assertRepoIsEmpty(t *testing.T, r *Repo, empty bool) { if empty { if len(r.Directory) > 0 { diff --git a/server/handlerequest.go b/server/handlerequest.go index ccac8b7..bf24297 100644 --- a/server/handlerequest.go +++ b/server/handlerequest.go @@ -1,10 +1,11 @@ package server import ( - "fmt" "time" - "github.com/foomo/contentserver/log" + "go.uber.org/zap" + + . "github.com/foomo/contentserver/logger" "github.com/foomo/contentserver/repo" "github.com/foomo/contentserver/requests" "github.com/foomo/contentserver/responses" @@ -67,10 +68,10 @@ func handleRequest(r *repo.Repo, handler Handler, jsonBytes []byte, metrics *sta // error handling if jsonErr != nil { - log.Error(" could not read incoming json:", jsonErr) + Log.Error("could not read incoming json", zap.Error(jsonErr)) reply = responses.NewError(2, "could not read incoming json "+jsonErr.Error()) } else if apiErr != nil { - log.Error(" an API error occured:", apiErr) + Log.Error("an API error occured", zap.Error(apiErr)) reply = responses.NewError(3, "internal error "+apiErr.Error()) } @@ -107,7 +108,7 @@ func encodeReply(reply interface{}) (replyBytes []byte, err error) { "reply": reply, }, "", " ") if err != nil { - log.Error(" could not encode reply " + fmt.Sprint(err)) + Log.Error("could not encode reply", zap.Error(err)) } return } diff --git a/server/server.go b/server/server.go index 6de2063..82108e1 100644 --- a/server/server.go +++ b/server/server.go @@ -7,12 +7,15 @@ import ( "net/http" "os" - "github.com/foomo/contentserver/log" + . "github.com/foomo/contentserver/logger" "github.com/foomo/contentserver/repo" jsoniter "github.com/json-iterator/go" + "go.uber.org/zap" ) -var json = jsoniter.ConfigCompatibleWithStandardLibrary +var ( + json = jsoniter.ConfigCompatibleWithStandardLibrary +) // Handler type type Handler string @@ -45,14 +48,21 @@ func RunServerSocketAndWebServer( if address == "" && webserverAddress == "" { return errors.New("one of the addresses needs to be set") } - log.Record("building repo with content from " + server) + Log.Info("building repo with content", zap.String("server", server)) + r := repo.NewRepo(server, varDir) // start initial update and handle error go func() { resp := r.Update() if !resp.Success { - log.Error("failed to update: ", resp) + Log.Error("failed to update", + zap.String("error", resp.ErrorMessage), + zap.Int("NumberOfNodes", resp.Stats.NumberOfNodes), + zap.Int("NumberOfURIs", resp.Stats.NumberOfURIs), + zap.Float64("OwnRuntime", resp.Stats.OwnRuntime), + zap.Float64("RepoRuntime", resp.Stats.RepoRuntime), + ) os.Exit(1) } }() @@ -61,11 +71,11 @@ func RunServerSocketAndWebServer( chanErr := make(chan error) if address != "" { - log.Notice("starting socketserver on: ", address) + Log.Info("starting socketserver", zap.String("address", address)) go runSocketServer(r, address, chanErr) } if webserverAddress != "" { - log.Notice("starting webserver on: ", webserverAddress) + Log.Info("starting webserver", zap.String("webserverAddress", webserverAddress)) go runWebserver(r, webserverAddress, webserverPath, chanErr) } return <-chanErr @@ -91,24 +101,26 @@ func runSocketServer( // listen on socket ln, errListen := net.Listen("tcp", address) if errListen != nil { - errListenSocket := errors.New("RunSocketServer: could not start the on \"" + address + "\" - error: " + fmt.Sprint(errListen)) - log.Error(errListenSocket) - chanErr <- errListenSocket + Log.Error("runSocketServer: could not start", + zap.String("address", address), + zap.Error(errListen), + ) + chanErr <- errors.New("runSocketServer: could not start the on \"" + address + "\" - error: " + fmt.Sprint(errListen)) return } - log.Record("RunSocketServer: started to listen on " + address) + Log.Info("runSocketServer: started listening", zap.String("address", address)) for { // this blocks until connection or error conn, err := ln.Accept() if err != nil { - log.Error("RunSocketServer: could not accept connection" + fmt.Sprint(err)) + Log.Error("runSocketServer: could not accept connection", zap.Error(err)) continue } - log.Debug("new connection") + // a goroutine handles conn so that the loop can accept other connections go func() { - log.Debug("accepted connection") + Log.Debug("accepted connection", zap.String("source", conn.RemoteAddr().String())) s.handleConnection(conn) conn.Close() // log.Debug("connection closed") diff --git a/server/socketserver.go b/server/socketserver.go index f667e3e..3adacf1 100644 --- a/server/socketserver.go +++ b/server/socketserver.go @@ -7,7 +7,9 @@ import ( "strconv" "strings" - "github.com/foomo/contentserver/log" + "go.uber.org/zap" + + . "github.com/foomo/contentserver/logger" "github.com/foomo/contentserver/repo" "github.com/foomo/contentserver/responses" "github.com/foomo/contentserver/status" @@ -39,13 +41,10 @@ func extractHandlerAndJSONLentgh(header string) (handler Handler, jsonLength int } func (s *socketServer) execute(handler Handler, jsonBytes []byte) (reply []byte) { - if log.SelectedLevel == log.LevelDebug { - log.Debug(" incoming json buffer of length: ", len(jsonBytes)) - // log.Debug(" incoming json buffer:", string(jsonBytes)) - } + Log.Debug("incoming json buffer", zap.Int("length", len(jsonBytes))) reply, handlingError := handleRequest(s.repo, handler, jsonBytes, s.metrics) if handlingError != nil { - log.Error("socketServer.execute handlingError :", handlingError) + Log.Error("socketServer.execute failed", zap.Error(handlingError)) } return reply } @@ -53,22 +52,24 @@ func (s *socketServer) execute(handler Handler, jsonBytes []byte) (reply []byte) func (s *socketServer) writeResponse(conn net.Conn, reply []byte) { headerBytes := []byte(strconv.Itoa(len(reply))) reply = append(headerBytes, reply...) - log.Debug(" replying: " + string(reply)) + Log.Debug("replying", zap.String("reply", string(reply))) n, writeError := conn.Write(reply) if writeError != nil { - log.Error("socketServer.writeResponse: could not write my reply: " + fmt.Sprint(writeError)) + Log.Error("socketServer.writeResponse: could not write reply", zap.Error(writeError)) return } if n < len(reply) { - log.Error(fmt.Sprintf("socketServer.writeResponse: write too short %q instead of %q", n, len(reply))) + Log.Error("socketServer.writeResponse: write too short", + zap.Int("got", n), + zap.Int("expected", len(reply)), + ) return } - log.Debug(" replied. waiting for next request on open connection") - + Log.Debug("replied. waiting for next request on open connection") } func (s *socketServer) handleConnection(conn net.Conn) { - log.Debug("socketServer.handleConnection") + Log.Debug("socketServer.handleConnection") var ( headerBuffer [1]byte @@ -81,7 +82,7 @@ func (s *socketServer) handleConnection(conn net.Conn) { // let us read with 1 byte steps on conn until we find "{" _, readErr := conn.Read(headerBuffer[0:]) if readErr != nil { - log.Debug(" looks like the client closed the connection: ", readErr) + Log.Debug("looks like the client closed the connection", zap.Error(readErr)) return } // read next byte @@ -92,16 +93,16 @@ func (s *socketServer) handleConnection(conn net.Conn) { // reset header header = "" if headerErr != nil { - log.Error("invalid request could not read header", headerErr) + Log.Error("invalid request could not read header", zap.Error(headerErr)) encodedErr, encodingErr := encodeReply(responses.NewError(4, "invalid header "+headerErr.Error())) if encodingErr == nil { s.writeResponse(conn, encodedErr) } else { - log.Error("could not respond to invalid request", encodingErr) + Log.Error("could not respond to invalid request", zap.Error(encodingErr)) } return } - log.Debug(fmt.Sprintf(" found json with %d bytes", jsonLength)) + Log.Debug("found json", zap.Int("length", jsonLength)) if jsonLength > 0 { var ( @@ -120,22 +121,24 @@ func (s *socketServer) handleConnection(conn net.Conn) { if jsonReadErr != nil { //@fixme we need to force a read timeout (SetReadDeadline?), if expected jsonLength is lower than really sent bytes (e.g. if client implements protocol wrong) //@todo should we check for io.EOF here - log.Error(" could not read json - giving up with this client connection" + fmt.Sprint(jsonReadErr)) + Log.Error("could not read json - giving up with this client connection", zap.Error(jsonReadErr)) return } jsonLengthCurrent += readLength - log.Debug(fmt.Sprintf(" read so far %d of %d bytes in read cycle %d", jsonLengthCurrent, jsonLength, readRound)) + Log.Debug("read cycle status", + zap.Int("jsonLengthCurrent", jsonLengthCurrent), + zap.Int("jsonLength", jsonLength), + zap.Int("readRound", readRound), + ) } - if log.SelectedLevel == log.LevelDebug { - log.Debug(" read json, length: ", len(jsonBytes)) - // log.Debug(" read json: " + string(jsonBytes)) - } + Log.Debug("read json", zap.Int("length", len(jsonBytes))) + s.writeResponse(conn, s.execute(handler, jsonBytes)) // note: connection remains open continue } - log.Error("can not read empty json") + Log.Error("can not read empty json") return } // adding to header byte by byte diff --git a/server/webserver.go b/server/webserver.go index 4c5470a..61481b4 100644 --- a/server/webserver.go +++ b/server/webserver.go @@ -5,9 +5,10 @@ import ( "net/http" "strings" - "github.com/foomo/contentserver/log" "github.com/foomo/contentserver/status" + "go.uber.org/zap" + . "github.com/foomo/contentserver/logger" "github.com/foomo/contentserver/repo" ) @@ -44,6 +45,6 @@ func (s *webServer) ServeHTTP(w http.ResponseWriter, r *http.Request) { } _, err := w.Write(reply) if err != nil { - log.Error("failed to write webServer reply: ", err) + Log.Error("failed to write webServer reply", zap.Error(err)) } } diff --git a/status/healthz.go b/status/healthz.go index dba2936..077aa11 100644 --- a/status/healthz.go +++ b/status/healthz.go @@ -4,15 +4,18 @@ import ( "fmt" "net/http" - "github.com/foomo/contentserver/log" + . "github.com/foomo/contentserver/logger" jsoniter "github.com/json-iterator/go" + "go.uber.org/zap" ) -var json = jsoniter.ConfigCompatibleWithStandardLibrary +var ( + json = jsoniter.ConfigCompatibleWithStandardLibrary +) func RunHealthzHandlerListener(address string, serviceName string) { - log.Notice(fmt.Sprintf("starting healthz handler on '%s'" + address)) - log.Error(http.ListenAndServe(address, HealthzHandler(serviceName))) + Log.Info(fmt.Sprintf("starting healthz handler on '%s'" + address)) + Log.Error("healthz server failed", zap.Error(http.ListenAndServe(address, HealthzHandler(serviceName)))) } func HealthzHandler(serviceName string) http.Handler { @@ -26,7 +29,7 @@ func HealthzHandler(serviceName string) http.Handler { h.Handle("/healthz", http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { _, err := w.Write(status) if err != nil { - log.Error("failed to write healthz status: ", err) + Log.Error("failed to write healthz status", zap.Error(err)) } }))