replaced logger, fixed update queue mechanism

This commit is contained in:
Philipp Mieden 2019-05-23 14:20:38 +02:00
parent d9f6cc60c4
commit e9245a200c
15 changed files with 251 additions and 240 deletions

1
.gitignore vendored
View File

@ -1,3 +1,4 @@
*.log
*.test
cprof-*
var

View File

@ -8,7 +8,7 @@ import (
"time"
"github.com/foomo/contentserver/content"
"github.com/foomo/contentserver/log"
. "github.com/foomo/contentserver/logger"
"github.com/foomo/contentserver/repo/mock"
"github.com/foomo/contentserver/requests"
"github.com/foomo/contentserver/server"
@ -21,6 +21,10 @@ var (
testServerWebserverAddr string
)
func init() {
SetupLogging(true, "contentserver_client_test.log")
}
func dump(t *testing.T, v interface{}) {
jsonBytes, err := json.MarshalIndent(v, "", " ")
if err != nil {
@ -51,7 +55,7 @@ func initTestServer(t testing.TB) (socketAddr, webserverAddr string) {
socketAddr = getAvailableAddr()
webserverAddr = getAvailableAddr()
testServer, varDir := mock.GetMockData(t)
log.SelectedLevel = log.LevelError
go func() {
err := server.RunServerSocketAndWebServer(
testServer.URL+"/repo-two-dimensions.json",

View File

@ -3,19 +3,19 @@ package main
import (
"flag"
"fmt"
"net/http"
_ "net/http/pprof"
"os"
"runtime/debug"
"strings"
"time"
"github.com/apex/log"
. "github.com/foomo/contentserver/logger"
"github.com/foomo/contentserver/metrics"
"github.com/foomo/contentserver/status"
"net/http"
_ "net/http/pprof"
"github.com/foomo/contentserver/log"
"github.com/foomo/contentserver/server"
"github.com/foomo/contentserver/status"
"go.uber.org/zap"
)
const (
@ -38,6 +38,7 @@ var (
flagWebserverAddress = flag.String("webserver-address", "", "address to bind web server host:port, when empty no webserver will be spawned")
flagWebserverPath = flag.String("webserver-path", "/contentserver", "path to export the webserver on - useful when behind a proxy")
flagVarDir = flag.String("var-dir", "/var/lib/contentserver", "where to put my data")
flagDebug = flag.Bool("debug", true, "toggle debug mode")
// debugging / profiling
flagFreeOSMem = flag.Int("free-os-mem", 0, "free OS mem every X minutes")
@ -69,6 +70,8 @@ func exitUsage(code int) {
func main() {
flag.Parse()
SetupLogging(*flagDebug, "contentserver.log")
go func() {
fmt.Println(http.ListenAndServe("localhost:6060", nil))
}()
@ -79,12 +82,14 @@ func main() {
}
if *flagFreeOSMem > 0 {
log.Notice("[INFO] freeing OS memory every ", *flagFreeOSMem, " minutes!")
Log.Info("dumping heap every $interval minutes", zap.Int("interval", *flagHeapDump))
Log.Info("freeing OS memory every $interval minutes", zap.Int("interval", *flagFreeOSMem))
go func() {
for {
select {
case <-time.After(time.Duration(*flagFreeOSMem) * time.Minute):
log.Notice("FreeOSMemory")
Log.Info("dumping heap every $interval minutes", zap.Int("interval", *flagHeapDump))
log.Info("FreeOSMemory")
debug.FreeOSMemory()
}
}
@ -92,12 +97,12 @@ func main() {
}
if *flagHeapDump > 0 {
log.Notice("[INFO] dumping heap every ", *flagHeapDump, " minutes!")
Log.Info("dumping heap every $interval minutes", zap.Int("interval", *flagHeapDump))
go func() {
for {
select {
case <-time.After(time.Duration(*flagFreeOSMem) * time.Minute):
log.Notice("HeapDump")
log.Info("HeapDump")
f, err := os.Create("heapdump")
if err != nil {
panic("failed to create heap dump file")
@ -115,21 +120,6 @@ func main() {
if len(flag.Args()) == 1 {
fmt.Println(*flagAddress, flag.Arg(0))
level := log.LevelRecord
switch *logLevel {
case logLevelError:
level = log.LevelError
case logLevelRecord:
level = log.LevelRecord
case logLevelWarning:
level = log.LevelWarning
case logLevelNotice:
level = log.LevelNotice
case logLevelDebug:
level = log.LevelDebug
}
log.SelectedLevel = level
// kickoff metric handlers
go metrics.RunPrometheusHandler(DefaultPrometheusListener)
go status.RunHealthzHandlerListener(DefaultHealthzHandlerAddress, ServiceName)

View File

@ -1,79 +0,0 @@
package log
import (
"fmt"
"strings"
"time"
)
// Level logging level enum
type Level int
const (
// LevelError an error - as bad as it gets
LevelError Level = 0
// LevelRecord put this to the logs in any case
LevelRecord Level = 1
// LevelWarning not that bad
LevelWarning Level = 2
// LevelNotice almost on debug level
LevelNotice Level = 3
// LevelDebug we are debugging
LevelDebug Level = 4
)
// SelectedLevel selected log level
var SelectedLevel = LevelDebug
var prefices = map[Level]string{
LevelRecord: "record : ",
LevelError: "error : ",
LevelWarning: "warning : ",
LevelNotice: "notice : ",
LevelDebug: "debug : ",
}
func log(msg string, level Level) string {
if level <= SelectedLevel {
prefix := time.Now().Format(time.RFC3339Nano) + " " + prefices[level]
lines := strings.Split(msg, "\n")
for i := 0; i < len(lines); i++ {
fmt.Println(level, prefix+lines[i])
}
}
return msg
}
func logThings(msgs []interface{}, level Level) string {
r := ""
for _, msg := range msgs {
r += "\n" + fmt.Sprint(msg)
}
r = strings.Trim(r, "\n")
return log(r, level)
}
// Debug write debug messages to the log
func Debug(msgs ...interface{}) string {
return logThings(msgs, LevelDebug)
}
// Notice write notice messages to the log
func Notice(msgs ...interface{}) string {
return logThings(msgs, LevelNotice)
}
// Warning write warning messages to the log
func Warning(msgs ...interface{}) string {
return logThings(msgs, LevelWarning)
}
// Record write record messages to the log
func Record(msgs ...interface{}) string {
return logThings(msgs, LevelRecord)
}
// Error write error messages to the log
func Error(msgs ...interface{}) string {
return logThings(msgs, LevelError)
}

39
logger/log.go Normal file
View File

@ -0,0 +1,39 @@
package logger
import (
"log"
"os"
"go.uber.org/zap"
)
var (
// Log is the logger instance exposed by this package
// call Setup() prior to using it
// want JSON output? Set LOG_JSON env var to 1!
Log *zap.Logger
)
// SetupLogging configures the logger
func SetupLogging(debug bool, outputPath string) {
var err error
if debug {
zc := zap.NewDevelopmentConfig()
if os.Getenv("LOG_JSON") == "1" {
zc.Encoding = "json"
}
zc.OutputPaths = append(zc.OutputPaths, outputPath)
Log, err = zc.Build()
} else {
zc := zap.NewProductionConfig()
if os.Getenv("LOG_JSON") == "1" {
zc.Encoding = "json"
}
zc.OutputPaths = append(zc.OutputPaths, outputPath)
Log, err = zc.Build()
}
if err != nil {
log.Fatalf("can't initialize zap logger: %v", err)
}
}

View File

@ -1,11 +1,11 @@
package metrics
import (
"fmt"
"net/http"
"github.com/foomo/contentserver/log"
. "github.com/foomo/contentserver/logger"
"github.com/prometheus/client_golang/prometheus/promhttp"
"go.uber.org/zap"
)
func PrometheusHandler() http.Handler {
@ -15,6 +15,10 @@ func PrometheusHandler() http.Handler {
}
func RunPrometheusHandler(listener string) {
log.Notice(fmt.Sprintf("starting prometheus handler on address '%s'", listener))
log.Error(http.ListenAndServe(listener, PrometheusHandler()))
Log.Info("starting prometheus handler on",
zap.String("address", listener),
)
Log.Error("server failed: ",
zap.Error(http.ListenAndServe(listener, PrometheusHandler())),
)
}

View File

@ -8,33 +8,41 @@ import (
"time"
"github.com/foomo/contentserver/content"
"github.com/foomo/contentserver/log"
. "github.com/foomo/contentserver/logger"
jsoniter "github.com/json-iterator/go"
"go.uber.org/zap"
)
var json = jsoniter.ConfigCompatibleWithStandardLibrary
var (
json = jsoniter.ConfigCompatibleWithStandardLibrary
)
type updateResponse struct {
repoRuntime int64
jsonBytes []byte
err error
}
func (repo *Repo) updateRoutine() {
go func() {
for newDimension := range repo.updateChannel {
log.Notice("update routine received a new dimension: " + newDimension.Dimension)
for newDimension := range repo.updateChannel {
Log.Info("update routine received a new dimension", zap.String("dimension", newDimension.Dimension))
err := repo._updateDimension(newDimension.Dimension, newDimension.Node)
log.Notice("update routine received result")
if err != nil {
log.Debug(" update routine error: " + err.Error())
}
repo.updateDoneChannel <- err
repo.updateCompleteChannel <- true
err := repo._updateDimension(newDimension.Dimension, newDimension.Node)
Log.Info("update routine received result")
if err != nil {
Log.Debug("update dimension failed", zap.Error(err))
}
}()
repo.updateDoneChannel <- err
}
}
func (repo *Repo) updateDimension(dimension string, node *content.RepoNode) error {
Log.Debug("trying to push dimension into update channel", zap.String("dimension", dimension), zap.String("nodeName", node.Name))
repo.updateChannel <- &repoDimension{
Dimension: dimension,
Node: node,
}
Log.Debug("waiting for done signal")
return <-repo.updateDoneChannel
}
@ -71,7 +79,9 @@ func (repo *Repo) _updateDimension(dimension string, newNode *content.RepoNode)
}
func builDirectory(dirNode *content.RepoNode, directory map[string]*content.RepoNode, uRIDirectory map[string]*content.RepoNode) error {
log.Debug("repo.buildDirectory: " + dirNode.ID)
// Log.Debug("buildDirectory", zap.String("ID", dirNode.ID))
existingNode, ok := directory[dirNode.ID]
if ok {
return errors.New("duplicate node with id:" + existingNode.ID)
@ -111,19 +121,11 @@ func loadNodesFromJSON(jsonBytes []byte) (nodes map[string]*content.RepoNode, er
}
func (repo *Repo) tryToRestoreCurrent() error {
select {
case repo.updateInProgressChannel <- time.Now():
log.Notice("update request added to queue")
currentJSONBytes, err := repo.history.getCurrent()
if err != nil {
return err
}
return repo.loadJSONBytes(currentJSONBytes)
default:
log.Notice("update request ignored, queue seems to be full")
return errors.New("Update request queue is full. Please try again later.")
currentJSONBytes, err := repo.history.getCurrent()
if err != nil {
return err
}
return repo.loadJSONBytes(currentJSONBytes)
}
func get(URL string) (data []byte, err error) {
@ -144,10 +146,10 @@ func (repo *Repo) update() (repoRuntime int64, jsonBytes []byte, err error) {
repoRuntime = time.Now().UnixNano() - startTimeRepo
if err != nil {
// we have no json to load - the repo server did not reply
log.Debug("we have no json to load - the repo server did not reply", err)
Log.Debug("failed to load json", zap.Error(err))
return repoRuntime, jsonBytes, err
}
log.Debug("loading json from: "+repo.server, "length:", len(jsonBytes))
Log.Debug("loading json", zap.String("server", repo.server), zap.Int("length", len(jsonBytes)))
nodes, err := loadNodesFromJSON(jsonBytes)
if err != nil {
// could not load nodes from json
@ -163,12 +165,14 @@ func (repo *Repo) update() (repoRuntime int64, jsonBytes []byte, err error) {
// limit ressources and allow only one update request at once
func (repo *Repo) tryUpdate() (repoRuntime int64, jsonBytes []byte, err error) {
c := make(chan updateResponse)
select {
case repo.updateInProgressChannel <- time.Now():
log.Notice("update request added to queue")
return repo.update()
case repo.updateInProgressChannel <- c:
Log.Info("update request added to queue")
ur := <-c
return ur.repoRuntime, ur.jsonBytes, ur.err
default:
log.Notice("invalidation request ignored, queue seems to be full")
Log.Info("update request ignored, queue is full")
return 0, nil, errors.New("queue full")
}
}
@ -176,22 +180,22 @@ func (repo *Repo) tryUpdate() (repoRuntime int64, jsonBytes []byte, err error) {
func (repo *Repo) loadJSONBytes(jsonBytes []byte) error {
nodes, err := loadNodesFromJSON(jsonBytes)
if err != nil {
log.Debug("could not parse json", string(jsonBytes))
Log.Debug("could not parse json", zap.String("json", string(jsonBytes)))
return err
}
err = repo.loadNodes(nodes)
if err == nil {
historyErr := repo.history.add(jsonBytes)
if historyErr != nil {
log.Warning("could not add valid json to history:" + historyErr.Error())
Log.Error("could not add valid json to history", zap.Error(historyErr))
} else {
log.Record("added valid json to history")
Log.Info("added valid json to history")
}
cleanUpErr := repo.history.cleanup()
if cleanUpErr != nil {
log.Warning("an error occured while cleaning up my history:", cleanUpErr)
Log.Error("an error occured while cleaning up my history", zap.Error(cleanUpErr))
} else {
log.Record("cleaned up history")
Log.Info("cleaned up history")
}
}
return err
@ -201,10 +205,10 @@ func (repo *Repo) loadNodes(newNodes map[string]*content.RepoNode) error {
newDimensions := []string{}
for dimension, newNode := range newNodes {
newDimensions = append(newDimensions, dimension)
log.Debug("loading nodes for dimension " + dimension)
Log.Debug("loading nodes for dimension", zap.String("dimension", dimension))
loadErr := repo.updateDimension(dimension, newNode)
if loadErr != nil {
log.Debug(" failed to load " + dimension + ": " + loadErr.Error())
Log.Debug("failed to load", zap.String("dimension", dimension), zap.Error(loadErr))
return loadErr
}
}
@ -219,7 +223,7 @@ func (repo *Repo) loadNodes(newNodes map[string]*content.RepoNode) error {
// we need to throw away orphaned dimensions
for dimension := range repo.Directory {
if !dimensionIsValid(dimension) {
log.Notice("removing orphaned dimension:" + dimension)
Log.Info("removing orphaned dimension", zap.String("dimension", dimension))
delete(repo.Directory, dimension)
}
}

View File

@ -9,13 +9,12 @@ import (
"testing"
"time"
"github.com/foomo/contentserver/log"
"github.com/foomo/contentserver/requests"
)
// GetMockData mock data to run a repo
func GetMockData(t testing.TB) (server *httptest.Server, varDir string) {
log.SelectedLevel = log.LevelError
_, filename, _, _ := runtime.Caller(0)
mockDir := path.Dir(filename)

View File

@ -7,9 +7,10 @@ import (
"time"
"github.com/foomo/contentserver/content"
"github.com/foomo/contentserver/log"
. "github.com/foomo/contentserver/logger"
"github.com/foomo/contentserver/requests"
"github.com/foomo/contentserver/responses"
"go.uber.org/zap"
)
const maxGetURIForNodeRecursionLevel = 1000
@ -29,8 +30,7 @@ type Repo struct {
updateChannel chan *repoDimension
updateDoneChannel chan error
history *history
updateInProgressChannel chan time.Time
updateCompleteChannel chan bool
updateInProgressChannel chan chan updateResponse
}
type repoDimension struct {
@ -40,34 +40,45 @@ type repoDimension struct {
// NewRepo constructor
func NewRepo(server string, varDir string) *Repo {
log.Notice("creating new repo for " + server)
log.Notice(" using var dir:" + varDir)
Log.Info("creating new repo",
zap.String("server", server),
zap.String("varDir", varDir),
)
repo := &Repo{
server: server,
Directory: map[string]*Dimension{},
history: newHistory(varDir),
updateChannel: make(chan *repoDimension),
updateDoneChannel: make(chan error),
updateInProgressChannel: make(chan time.Time, 1),
updateCompleteChannel: make(chan bool),
updateInProgressChannel: make(chan chan updateResponse, 1),
}
go func() {
for {
select {
case t := <-repo.updateInProgressChannel:
log.Notice("got timestamp: ", t, "waiting for update to complete")
<-repo.updateCompleteChannel
log.Notice("update completed!")
case resChan := <-repo.updateInProgressChannel:
Log.Info("waiting for update to complete")
start := time.Now()
repoRuntime, jsonBytes, errUpdate := repo.update()
resChan <- updateResponse{
repoRuntime: repoRuntime,
jsonBytes: jsonBytes,
err: errUpdate,
}
Log.Info("update completed", zap.Duration("duration", time.Since(start)))
}
}
}()
go repo.updateRoutine()
log.Record("trying to restore previous state")
Log.Info("trying to restore previous state")
restoreErr := repo.tryToRestoreCurrent()
if restoreErr != nil {
log.Record(" could not restore previous repo content:" + restoreErr.Error())
Log.Error(" could not restore previous repo content", zap.Error(restoreErr))
} else {
log.Record(" restored previous repo content")
Log.Info("restored previous repo content")
}
return repo
}
@ -93,7 +104,7 @@ func (repo *Repo) getNodes(nodeRequests map[string]*requests.Node, env *requests
path = []*content.Item{}
)
for nodeName, nodeRequest := range nodeRequests {
log.Debug(" adding node " + nodeName + " " + nodeRequest.ID)
Log.Debug("adding node", zap.String("name", nodeName), zap.String("requestID", nodeRequest.ID))
groups := env.Groups
if len(nodeRequest.Groups) > 0 {
@ -104,26 +115,29 @@ func (repo *Repo) getNodes(nodeRequests map[string]*requests.Node, env *requests
nodes[nodeName] = nil
if !ok && nodeRequest.Dimension == "" {
log.Debug(" could not get dimension root node for dimension " + nodeRequest.Dimension)
Log.Debug("could not get dimension root node", zap.String("dimension", nodeRequest.Dimension))
for _, dimension := range env.Dimensions {
dimensionNode, ok = repo.Directory[dimension]
if ok {
log.Debug(" searched for root node in env.dimension " + dimension + " with success")
Log.Debug("found root node in env.Dimensions", zap.String("dimension", dimension))
break
}
log.Debug(" searched for root node in env.dimension " + dimension + " without success")
Log.Debug("could NOT find root node in env.Dimensions", zap.String("dimension", dimension))
}
}
if !ok {
log.Warning("could not get dimension root node for nodeRequest.Dimension: " + nodeRequest.Dimension)
Log.Error("could not get dimension root node", zap.String("nodeRequest.Dimension", nodeRequest.Dimension))
continue
}
treeNode, ok := dimensionNode.Directory[nodeRequest.ID]
if ok {
nodes[nodeName] = repo.getNode(treeNode, nodeRequest.Expand, nodeRequest.MimeTypes, path, 0, groups, nodeRequest.DataFields)
} else {
log.Warning("you are requesting an invalid tree node for " + nodeName + " : " + nodeRequest.ID)
Log.Error("an invalid tree node was requested",
zap.String("nodeName", nodeName),
zap.String("ID", nodeRequest.ID),
)
}
}
return nodes
@ -142,18 +156,18 @@ func (repo *Repo) GetContent(r *requests.Content) (c *content.SiteContent, err e
// add more input validation
err = repo.validateContentRequest(r)
if err != nil {
log.Debug("repo.GetContent invalid request", err)
Log.Error("repo.GetContent invalid request", zap.Error(err))
return
}
log.Debug("repo.GetContent: ", r.URI)
Log.Debug("repo.GetContent", zap.String("URI", r.URI))
c = content.NewSiteContent()
resolved, resolvedURI, resolvedDimension, node := repo.resolveContent(r.Env.Dimensions, r.URI)
if resolved {
if !node.CanBeAccessedByGroups(r.Env.Groups) {
log.Notice("401 for " + r.URI)
Log.Warn("resolvecontent got status 401", zap.String("URI", r.URI))
c.Status = content.StatusForbidden
} else {
log.Notice("200 for " + r.URI)
Log.Info("resolvecontent got status 200", zap.String("URI", r.URI))
c.Status = content.StatusOk
c.Data = node.Data
}
@ -169,15 +183,22 @@ func (repo *Repo) GetContent(r *requests.Content) (c *content.SiteContent, err e
}
c.URIs = uris
} else {
log.Notice("404 for " + r.URI)
Log.Info("resolvecontent got status 404", zap.String("URI", r.URI))
c.Status = content.StatusNotFound
c.Dimension = r.Env.Dimensions[0]
}
if log.SelectedLevel == log.LevelDebug {
log.Debug(fmt.Sprintf("resolved: %v, uri: %v, dim: %v, n: %v", resolved, resolvedURI, resolvedDimension, node))
}
Log.Debug("got content",
zap.Bool("resolved", resolved),
zap.String("resolvedURI", resolvedURI),
zap.String("resolvedDimension", resolvedDimension),
zap.String("nodeName", node.Name),
)
if !resolved {
log.Debug("repo.GetContent", r.URI, "could not be resolved falling back to default dimension", r.Env.Dimensions[0])
Log.Debug("failed to resolve, falling back to default dimension",
zap.String("URI", r.URI),
zap.String("defaultDimension", r.Env.Dimensions[0]),
)
// r.Env.Dimensions is validated => we can access it
resolvedDimension = r.Env.Dimensions[0]
}
@ -210,27 +231,27 @@ func (repo *Repo) Update() (updateResponse *responses.Update) {
updateResponse = &responses.Update{}
updateResponse.Stats.RepoRuntime = floatSeconds(updateRepotime)
log.Notice("Update triggered")
Log.Info("Update triggered")
if updateErr != nil {
updateResponse.Success = false
updateResponse.Stats.NumberOfNodes = -1
updateResponse.Stats.NumberOfURIs = -1
// let us try to restore the world from a file
log.Error("could not update repository:" + updateErr.Error())
Log.Error("could not update repository:" + updateErr.Error())
updateResponse.ErrorMessage = updateErr.Error()
restoreErr := repo.tryToRestoreCurrent()
if restoreErr != nil {
log.Error("failed to restore preceding repo version: " + restoreErr.Error())
Log.Error("failed to restore preceding repo version", zap.Error(restoreErr))
} else {
log.Record("restored current repo from local history")
Log.Info("restored current repo from local history")
}
} else {
updateResponse.Success = true
// persist the currently loaded one
historyErr := repo.history.add(jsonBytes)
if historyErr != nil {
log.Warning("could not persist current repo in history: " + historyErr.Error())
Log.Warn("could not persist current repo in history", zap.Error(historyErr))
}
// add some stats
for dimension := range repo.Directory {
@ -245,7 +266,7 @@ func (repo *Repo) Update() (updateResponse *responses.Update) {
// resolveContent find content in a repository
func (repo *Repo) resolveContent(dimensions []string, URI string) (resolved bool, resolvedURI string, resolvedDimension string, repoNode *content.RepoNode) {
parts := strings.Split(URI, content.PathSeparator)
log.Debug("repo.ResolveContent: " + URI)
Log.Debug("repo.ResolveContent", zap.String("URI", URI))
for i := len(parts); i > 0; i-- {
testURI := strings.Join(parts[0:i], content.PathSeparator)
if testURI == "" {
@ -253,11 +274,13 @@ func (repo *Repo) resolveContent(dimensions []string, URI string) (resolved bool
}
for _, dimension := range dimensions {
if d, ok := repo.Directory[dimension]; ok {
log.Debug(" testing[" + dimension + "]: " + testURI)
Log.Debug("checking",
zap.String("dimension", dimension),
zap.String("URI", testURI),
)
if repoNode, ok := d.URIDirectory[testURI]; ok {
resolved = true
log.Debug(" found => " + testURI)
log.Debug(" destination " + fmt.Sprint(repoNode.DestinationID))
Log.Debug("found node", zap.String("URI", testURI), zap.String("destination", repoNode.DestinationID))
if len(repoNode.DestinationID) > 0 {
if destionationNode, destinationNodeOk := d.Directory[repoNode.DestinationID]; destinationNodeOk {
repoNode = destionationNode
@ -279,7 +302,7 @@ func (repo *Repo) getURIForNode(dimension string, repoNode *content.RepoNode, re
linkedNode, ok := repo.Directory[dimension].Directory[repoNode.LinkID]
if ok {
if recursionLevel > maxGetURIForNodeRecursionLevel {
log.Error("maxGetURIForNodeRecursionLevel reached for", repoNode.ID, "link id", repoNode.LinkID, "in dimension", dimension)
Log.Error("maxGetURIForNodeRecursionLevel reached", zap.String("repoNode.ID", repoNode.ID), zap.String("linkID", repoNode.LinkID), zap.String("dimension", dimension))
return ""
}
return repo.getURIForNode(dimension, linkedNode, recursionLevel+1)
@ -298,7 +321,7 @@ func (repo *Repo) getURI(dimension string, id string) string {
func (repo *Repo) getNode(repoNode *content.RepoNode, expanded bool, mimeTypes []string, path []*content.Item, level int, groups []string, dataFields []string) *content.Node {
node := content.NewNode()
node.Item = repoNode.ToItem(dataFields)
log.Debug("repo.GetNode: " + repoNode.ID)
Log.Debug("getNode", zap.String("ID", repoNode.ID))
for _, childID := range repoNode.Index {
childNode := repoNode.Nodes[childID]
if (level == 0 || expanded || !expanded && childNode.InPath(path)) && !childNode.Hidden && childNode.CanBeAccessedByGroups(groups) && childNode.IsOneOfTheseMimeTypes(mimeTypes) {

View File

@ -4,10 +4,16 @@ import (
"strings"
"testing"
. "github.com/foomo/contentserver/logger"
_ "github.com/foomo/contentserver/logger"
"github.com/foomo/contentserver/repo/mock"
"github.com/foomo/contentserver/requests"
)
func init() {
SetupLogging(true, "contentserver_repo_test.log")
}
func assertRepoIsEmpty(t *testing.T, r *Repo, empty bool) {
if empty {
if len(r.Directory) > 0 {

View File

@ -1,10 +1,11 @@
package server
import (
"fmt"
"time"
"github.com/foomo/contentserver/log"
"go.uber.org/zap"
. "github.com/foomo/contentserver/logger"
"github.com/foomo/contentserver/repo"
"github.com/foomo/contentserver/requests"
"github.com/foomo/contentserver/responses"
@ -67,10 +68,10 @@ func handleRequest(r *repo.Repo, handler Handler, jsonBytes []byte, metrics *sta
// error handling
if jsonErr != nil {
log.Error(" could not read incoming json:", jsonErr)
Log.Error("could not read incoming json", zap.Error(jsonErr))
reply = responses.NewError(2, "could not read incoming json "+jsonErr.Error())
} else if apiErr != nil {
log.Error(" an API error occured:", apiErr)
Log.Error("an API error occured", zap.Error(apiErr))
reply = responses.NewError(3, "internal error "+apiErr.Error())
}
@ -107,7 +108,7 @@ func encodeReply(reply interface{}) (replyBytes []byte, err error) {
"reply": reply,
}, "", " ")
if err != nil {
log.Error(" could not encode reply " + fmt.Sprint(err))
Log.Error("could not encode reply", zap.Error(err))
}
return
}

View File

@ -7,12 +7,15 @@ import (
"net/http"
"os"
"github.com/foomo/contentserver/log"
. "github.com/foomo/contentserver/logger"
"github.com/foomo/contentserver/repo"
jsoniter "github.com/json-iterator/go"
"go.uber.org/zap"
)
var json = jsoniter.ConfigCompatibleWithStandardLibrary
var (
json = jsoniter.ConfigCompatibleWithStandardLibrary
)
// Handler type
type Handler string
@ -45,14 +48,21 @@ func RunServerSocketAndWebServer(
if address == "" && webserverAddress == "" {
return errors.New("one of the addresses needs to be set")
}
log.Record("building repo with content from " + server)
Log.Info("building repo with content", zap.String("server", server))
r := repo.NewRepo(server, varDir)
// start initial update and handle error
go func() {
resp := r.Update()
if !resp.Success {
log.Error("failed to update: ", resp)
Log.Error("failed to update",
zap.String("error", resp.ErrorMessage),
zap.Int("NumberOfNodes", resp.Stats.NumberOfNodes),
zap.Int("NumberOfURIs", resp.Stats.NumberOfURIs),
zap.Float64("OwnRuntime", resp.Stats.OwnRuntime),
zap.Float64("RepoRuntime", resp.Stats.RepoRuntime),
)
os.Exit(1)
}
}()
@ -61,11 +71,11 @@ func RunServerSocketAndWebServer(
chanErr := make(chan error)
if address != "" {
log.Notice("starting socketserver on: ", address)
Log.Info("starting socketserver", zap.String("address", address))
go runSocketServer(r, address, chanErr)
}
if webserverAddress != "" {
log.Notice("starting webserver on: ", webserverAddress)
Log.Info("starting webserver", zap.String("webserverAddress", webserverAddress))
go runWebserver(r, webserverAddress, webserverPath, chanErr)
}
return <-chanErr
@ -91,24 +101,26 @@ func runSocketServer(
// listen on socket
ln, errListen := net.Listen("tcp", address)
if errListen != nil {
errListenSocket := errors.New("RunSocketServer: could not start the on \"" + address + "\" - error: " + fmt.Sprint(errListen))
log.Error(errListenSocket)
chanErr <- errListenSocket
Log.Error("runSocketServer: could not start",
zap.String("address", address),
zap.Error(errListen),
)
chanErr <- errors.New("runSocketServer: could not start the on \"" + address + "\" - error: " + fmt.Sprint(errListen))
return
}
log.Record("RunSocketServer: started to listen on " + address)
Log.Info("runSocketServer: started listening", zap.String("address", address))
for {
// this blocks until connection or error
conn, err := ln.Accept()
if err != nil {
log.Error("RunSocketServer: could not accept connection" + fmt.Sprint(err))
Log.Error("runSocketServer: could not accept connection", zap.Error(err))
continue
}
log.Debug("new connection")
// a goroutine handles conn so that the loop can accept other connections
go func() {
log.Debug("accepted connection")
Log.Debug("accepted connection", zap.String("source", conn.RemoteAddr().String()))
s.handleConnection(conn)
conn.Close()
// log.Debug("connection closed")

View File

@ -7,7 +7,9 @@ import (
"strconv"
"strings"
"github.com/foomo/contentserver/log"
"go.uber.org/zap"
. "github.com/foomo/contentserver/logger"
"github.com/foomo/contentserver/repo"
"github.com/foomo/contentserver/responses"
"github.com/foomo/contentserver/status"
@ -39,13 +41,10 @@ func extractHandlerAndJSONLentgh(header string) (handler Handler, jsonLength int
}
func (s *socketServer) execute(handler Handler, jsonBytes []byte) (reply []byte) {
if log.SelectedLevel == log.LevelDebug {
log.Debug(" incoming json buffer of length: ", len(jsonBytes))
// log.Debug(" incoming json buffer:", string(jsonBytes))
}
Log.Debug("incoming json buffer", zap.Int("length", len(jsonBytes)))
reply, handlingError := handleRequest(s.repo, handler, jsonBytes, s.metrics)
if handlingError != nil {
log.Error("socketServer.execute handlingError :", handlingError)
Log.Error("socketServer.execute failed", zap.Error(handlingError))
}
return reply
}
@ -53,22 +52,24 @@ func (s *socketServer) execute(handler Handler, jsonBytes []byte) (reply []byte)
func (s *socketServer) writeResponse(conn net.Conn, reply []byte) {
headerBytes := []byte(strconv.Itoa(len(reply)))
reply = append(headerBytes, reply...)
log.Debug(" replying: " + string(reply))
Log.Debug("replying", zap.String("reply", string(reply)))
n, writeError := conn.Write(reply)
if writeError != nil {
log.Error("socketServer.writeResponse: could not write my reply: " + fmt.Sprint(writeError))
Log.Error("socketServer.writeResponse: could not write reply", zap.Error(writeError))
return
}
if n < len(reply) {
log.Error(fmt.Sprintf("socketServer.writeResponse: write too short %q instead of %q", n, len(reply)))
Log.Error("socketServer.writeResponse: write too short",
zap.Int("got", n),
zap.Int("expected", len(reply)),
)
return
}
log.Debug(" replied. waiting for next request on open connection")
Log.Debug("replied. waiting for next request on open connection")
}
func (s *socketServer) handleConnection(conn net.Conn) {
log.Debug("socketServer.handleConnection")
Log.Debug("socketServer.handleConnection")
var (
headerBuffer [1]byte
@ -81,7 +82,7 @@ func (s *socketServer) handleConnection(conn net.Conn) {
// let us read with 1 byte steps on conn until we find "{"
_, readErr := conn.Read(headerBuffer[0:])
if readErr != nil {
log.Debug(" looks like the client closed the connection: ", readErr)
Log.Debug("looks like the client closed the connection", zap.Error(readErr))
return
}
// read next byte
@ -92,16 +93,16 @@ func (s *socketServer) handleConnection(conn net.Conn) {
// reset header
header = ""
if headerErr != nil {
log.Error("invalid request could not read header", headerErr)
Log.Error("invalid request could not read header", zap.Error(headerErr))
encodedErr, encodingErr := encodeReply(responses.NewError(4, "invalid header "+headerErr.Error()))
if encodingErr == nil {
s.writeResponse(conn, encodedErr)
} else {
log.Error("could not respond to invalid request", encodingErr)
Log.Error("could not respond to invalid request", zap.Error(encodingErr))
}
return
}
log.Debug(fmt.Sprintf(" found json with %d bytes", jsonLength))
Log.Debug("found json", zap.Int("length", jsonLength))
if jsonLength > 0 {
var (
@ -120,22 +121,24 @@ func (s *socketServer) handleConnection(conn net.Conn) {
if jsonReadErr != nil {
//@fixme we need to force a read timeout (SetReadDeadline?), if expected jsonLength is lower than really sent bytes (e.g. if client implements protocol wrong)
//@todo should we check for io.EOF here
log.Error(" could not read json - giving up with this client connection" + fmt.Sprint(jsonReadErr))
Log.Error("could not read json - giving up with this client connection", zap.Error(jsonReadErr))
return
}
jsonLengthCurrent += readLength
log.Debug(fmt.Sprintf(" read so far %d of %d bytes in read cycle %d", jsonLengthCurrent, jsonLength, readRound))
Log.Debug("read cycle status",
zap.Int("jsonLengthCurrent", jsonLengthCurrent),
zap.Int("jsonLength", jsonLength),
zap.Int("readRound", readRound),
)
}
if log.SelectedLevel == log.LevelDebug {
log.Debug(" read json, length: ", len(jsonBytes))
// log.Debug(" read json: " + string(jsonBytes))
}
Log.Debug("read json", zap.Int("length", len(jsonBytes)))
s.writeResponse(conn, s.execute(handler, jsonBytes))
// note: connection remains open
continue
}
log.Error("can not read empty json")
Log.Error("can not read empty json")
return
}
// adding to header byte by byte

View File

@ -5,9 +5,10 @@ import (
"net/http"
"strings"
"github.com/foomo/contentserver/log"
"github.com/foomo/contentserver/status"
"go.uber.org/zap"
. "github.com/foomo/contentserver/logger"
"github.com/foomo/contentserver/repo"
)
@ -44,6 +45,6 @@ func (s *webServer) ServeHTTP(w http.ResponseWriter, r *http.Request) {
}
_, err := w.Write(reply)
if err != nil {
log.Error("failed to write webServer reply: ", err)
Log.Error("failed to write webServer reply", zap.Error(err))
}
}

View File

@ -4,15 +4,18 @@ import (
"fmt"
"net/http"
"github.com/foomo/contentserver/log"
. "github.com/foomo/contentserver/logger"
jsoniter "github.com/json-iterator/go"
"go.uber.org/zap"
)
var json = jsoniter.ConfigCompatibleWithStandardLibrary
var (
json = jsoniter.ConfigCompatibleWithStandardLibrary
)
func RunHealthzHandlerListener(address string, serviceName string) {
log.Notice(fmt.Sprintf("starting healthz handler on '%s'" + address))
log.Error(http.ListenAndServe(address, HealthzHandler(serviceName)))
Log.Info(fmt.Sprintf("starting healthz handler on '%s'" + address))
Log.Error("healthz server failed", zap.Error(http.ListenAndServe(address, HealthzHandler(serviceName))))
}
func HealthzHandler(serviceName string) http.Handler {
@ -26,7 +29,7 @@ func HealthzHandler(serviceName string) http.Handler {
h.Handle("/healthz", http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
_, err := w.Write(status)
if err != nil {
log.Error("failed to write healthz status: ", err)
Log.Error("failed to write healthz status", zap.Error(err))
}
}))