From 69dec4160553cfc98f4ceba97db748692240967e Mon Sep 17 00:00:00 2001 From: Philipp Mieden Date: Fri, 24 May 2019 13:00:28 +0200 Subject: [PATCH] refactored repo to reuse a bytes.Buffer for updates, to reduce the number of allocations --- Makefile | 3 + repo/history.go | 9 ++- repo/loader.go | 126 ++++++++++++++++++++++++++++----------- repo/repo.go | 80 +++++++++++-------------- testing/client/client.go | 4 +- 5 files changed, 139 insertions(+), 83 deletions(-) diff --git a/Makefile b/Makefile index c667b88..500b18a 100644 --- a/Makefile +++ b/Makefile @@ -58,6 +58,9 @@ run-testserver: run-contentserver: contentserver -var-dir var -webserver-address :9191 -address :9999 http://127.0.0.1:1234 +run-contentserver-freeosmem: + contentserver -var-dir var -webserver-address :9191 -address :9999 -free-os-mem 1 http://127.0.0.1:1234 + run-prometheus: prometheus --config.file=prometheus/prometheus.yml diff --git a/repo/history.go b/repo/history.go index 8d7f579..2b3d67a 100644 --- a/repo/history.go +++ b/repo/history.go @@ -1,8 +1,10 @@ package repo import ( + "bytes" "errors" "fmt" + "io" "io/ioutil" "os" "path" @@ -96,6 +98,9 @@ func (h *history) getCurrentFilename() string { return path.Join(h.varDir, historyRepoJSONPrefix+"current"+historyRepoJSONSuffix) } -func (h *history) getCurrent() (jsonBytes []byte, err error) { - return ioutil.ReadFile(h.getCurrentFilename()) +func (h *history) getCurrent(buf *bytes.Buffer) (err error) { + f, err := os.Open(h.getCurrentFilename()) + defer f.Close() + _, err = io.Copy(buf, f) + return err } diff --git a/repo/loader.go b/repo/loader.go index c8aebe4..ac42e78 100644 --- a/repo/loader.go +++ b/repo/loader.go @@ -3,10 +3,12 @@ package repo import ( "errors" "fmt" - "io/ioutil" + "io" "net/http" "time" + "github.com/mgutz/ansi" + "github.com/foomo/contentserver/content" . "github.com/foomo/contentserver/logger" "github.com/foomo/contentserver/status" @@ -15,17 +17,42 @@ import ( ) var ( - json = jsoniter.ConfigCompatibleWithStandardLibrary + json = jsoniter.ConfigCompatibleWithStandardLibrary + errUpdateRejected = errors.New("update rejected: queue full") ) type updateResponse struct { repoRuntime int64 - jsonBytes []byte err error } func (repo *Repo) updateRoutine() { - for newDimension := range repo.updateChannel { + for { + select { + case resChan := <-repo.updateInProgressChannel: + Log.Info("waiting for update to complete", zap.String("chan", fmt.Sprintf("%p", resChan))) + start := time.Now() + + repoRuntime, errUpdate := repo.update() + if errUpdate != nil { + status.M.UpdatesFailedCounter.WithLabelValues(errUpdate.Error()).Inc() + } + + resChan <- updateResponse{ + repoRuntime: repoRuntime, + err: errUpdate, + } + + duration := time.Since(start) + Log.Info("update completed", zap.Duration("duration", duration), zap.String("chan", fmt.Sprintf("%p", resChan))) + status.M.UpdatesCompletedCounter.WithLabelValues().Inc() + status.M.UpdateDuration.WithLabelValues().Observe(duration.Seconds()) + } + } +} + +func (repo *Repo) dimensionUpdateRoutine() { + for newDimension := range repo.dimensionUpdateChannel { Log.Info("update routine received a new dimension", zap.String("dimension", newDimension.Dimension)) err := repo._updateDimension(newDimension.Dimension, newDimension.Node) @@ -33,18 +60,18 @@ func (repo *Repo) updateRoutine() { if err != nil { Log.Debug("update dimension failed", zap.Error(err)) } - repo.updateDoneChannel <- err + repo.dimensionUpdateDoneChannel <- err } } func (repo *Repo) updateDimension(dimension string, node *content.RepoNode) error { Log.Debug("trying to push dimension into update channel", zap.String("dimension", dimension), zap.String("nodeName", node.Name)) - repo.updateChannel <- &repoDimension{ + repo.dimensionUpdateChannel <- &repoDimension{ Dimension: dimension, Node: node, } Log.Debug("waiting for done signal") - return <-repo.updateDoneChannel + return <-repo.dimensionUpdateDoneChannel } // do not call directly, but only through channel @@ -54,7 +81,7 @@ func (repo *Repo) _updateDimension(dimension string, newNode *content.RepoNode) var ( newDirectory = make(map[string]*content.RepoNode) newURIDirectory = make(map[string]*content.RepoNode) - err = builDirectory(newNode, newDirectory, newURIDirectory) + err = buildDirectory(newNode, newDirectory, newURIDirectory) ) if err != nil { return errors.New("update dimension \"" + dimension + "\" failed when building its directory:: " + err.Error()) @@ -64,22 +91,39 @@ func (repo *Repo) _updateDimension(dimension string, newNode *content.RepoNode) return err } + // --------------------------------------------- + + // collect other dimension in the Directory newRepoDirectory := map[string]*Dimension{} for d, D := range repo.Directory { if d != dimension { newRepoDirectory[d] = D } } + + // add the new dimension newRepoDirectory[dimension] = &Dimension{ Node: newNode, Directory: newDirectory, URIDirectory: newURIDirectory, } repo.Directory = newRepoDirectory + + // --------------------------------------------- + + // @TODO: why not update only the dimension that has changed instead? + // repo.Directory[dimension] = &Dimension{ + // Node: newNode, + // Directory: newDirectory, + // URIDirectory: newURIDirectory, + // } + + // --------------------------------------------- + return nil } -func builDirectory(dirNode *content.RepoNode, directory map[string]*content.RepoNode, uRIDirectory map[string]*content.RepoNode) error { +func buildDirectory(dirNode *content.RepoNode, directory map[string]*content.RepoNode, uRIDirectory map[string]*content.RepoNode) error { // Log.Debug("buildDirectory", zap.String("ID", dirNode.ID)) @@ -94,7 +138,7 @@ func builDirectory(dirNode *content.RepoNode, directory map[string]*content.Repo } uRIDirectory[dirNode.URI] = dirNode for _, childNode := range dirNode.Nodes { - err := builDirectory(childNode, directory, uRIDirectory) + err := buildDirectory(childNode, directory, uRIDirectory) if err != nil { return err } @@ -115,79 +159,93 @@ func wireAliases(directory map[string]*content.RepoNode) error { return nil } -func loadNodesFromJSON(jsonBytes []byte) (nodes map[string]*content.RepoNode, err error) { +func (repo *Repo) loadNodesFromJSON() (nodes map[string]*content.RepoNode, err error) { nodes = make(map[string]*content.RepoNode) - err = json.Unmarshal(jsonBytes, &nodes) + err = json.Unmarshal(repo.jsonBuf.Bytes(), &nodes) return nodes, err } -func (repo *Repo) tryToRestoreCurrent() error { - currentJSONBytes, err := repo.history.getCurrent() +func (repo *Repo) tryToRestoreCurrent() (err error) { + err = repo.history.getCurrent(&repo.jsonBuf) if err != nil { return err } - return repo.loadJSONBytes(currentJSONBytes) + return repo.loadJSONBytes() } -func get(URL string) (data []byte, err error) { +func (repo *Repo) get(URL string) (err error) { response, err := http.Get(URL) if err != nil { - return data, err + return err } defer response.Body.Close() if response.StatusCode != http.StatusOK { - return data, fmt.Errorf("Bad HTTP Response: %q", response.Status) + return fmt.Errorf("Bad HTTP Response: %q", response.Status) } - return ioutil.ReadAll(response.Body) + + Log.Info(ansi.Red + "RESETTING BUFFER" + ansi.Reset) + repo.jsonBuf.Reset() + + Log.Info(ansi.Green + "LOADING DATA INTO BUFFER" + ansi.Reset) + _, err = io.Copy(&repo.jsonBuf, response.Body) + return err } -func (repo *Repo) update() (repoRuntime int64, jsonBytes []byte, err error) { +func (repo *Repo) update() (repoRuntime int64, err error) { startTimeRepo := time.Now().UnixNano() - jsonBytes, err = get(repo.server) + err = repo.get(repo.server) repoRuntime = time.Now().UnixNano() - startTimeRepo if err != nil { // we have no json to load - the repo server did not reply Log.Debug("failed to load json", zap.Error(err)) - return repoRuntime, jsonBytes, err + return repoRuntime, err } - Log.Debug("loading json", zap.String("server", repo.server), zap.Int("length", len(jsonBytes))) - nodes, err := loadNodesFromJSON(jsonBytes) + Log.Debug("loading json", zap.String("server", repo.server), zap.Int("length", len(repo.jsonBuf.Bytes()))) + nodes, err := repo.loadNodesFromJSON() if err != nil { // could not load nodes from json - return repoRuntime, jsonBytes, err + return repoRuntime, err } err = repo.loadNodes(nodes) if err != nil { // repo failed to load nodes - return repoRuntime, jsonBytes, err + return repoRuntime, err } - return repoRuntime, jsonBytes, nil + return repoRuntime, nil } // limit ressources and allow only one update request at once -func (repo *Repo) tryUpdate() (repoRuntime int64, jsonBytes []byte, err error) { +func (repo *Repo) tryUpdate() (repoRuntime int64, err error) { c := make(chan updateResponse) select { case repo.updateInProgressChannel <- c: Log.Info("update request added to queue") ur := <-c - return ur.repoRuntime, ur.jsonBytes, ur.err + return ur.repoRuntime, ur.err default: Log.Info("update request rejected, queue is full") status.M.UpdatesRejectedCounter.WithLabelValues().Inc() - return 0, nil, errors.New("update rejected: queue full") + return 0, errUpdateRejected } } -func (repo *Repo) loadJSONBytes(jsonBytes []byte) error { - nodes, err := loadNodesFromJSON(jsonBytes) +func (repo *Repo) loadJSONBytes() error { + nodes, err := repo.loadNodesFromJSON() if err != nil { - Log.Debug("could not parse json", zap.String("json", string(jsonBytes))) + data := repo.jsonBuf.Bytes() + + if len(data) > 10 { + Log.Debug("could not parse json", + zap.String("jsonStart", string(data[:10])), + zap.String("jsonStart", string(data[len(data)-10:])), + ) + } return err } + err = repo.loadNodes(nodes) if err == nil { - historyErr := repo.history.add(jsonBytes) + historyErr := repo.history.add(repo.jsonBuf.Bytes()) if historyErr != nil { Log.Error("could not add valid json to history", zap.Error(historyErr)) } else { diff --git a/repo/repo.go b/repo/repo.go index c676d41..9acbe89 100644 --- a/repo/repo.go +++ b/repo/repo.go @@ -1,12 +1,14 @@ package repo import ( + "bytes" "errors" "fmt" + "strconv" "strings" "time" - "github.com/foomo/contentserver/status" + "github.com/mgutz/ansi" "github.com/foomo/contentserver/content" . "github.com/foomo/contentserver/logger" @@ -29,10 +31,14 @@ type Repo struct { server string Directory map[string]*Dimension // updateLock sync.Mutex - updateChannel chan *repoDimension - updateDoneChannel chan error + dimensionUpdateChannel chan *repoDimension + dimensionUpdateDoneChannel chan error + history *history updateInProgressChannel chan chan updateResponse + + // jsonBytes []byte + jsonBuf bytes.Buffer } type repoDimension struct { @@ -48,40 +54,17 @@ func NewRepo(server string, varDir string) *Repo { zap.String("varDir", varDir), ) repo := &Repo{ - server: server, - Directory: map[string]*Dimension{}, - history: newHistory(varDir), - updateChannel: make(chan *repoDimension), - updateDoneChannel: make(chan error), - updateInProgressChannel: make(chan chan updateResponse, 1), + server: server, + Directory: map[string]*Dimension{}, + history: newHistory(varDir), + dimensionUpdateChannel: make(chan *repoDimension), + dimensionUpdateDoneChannel: make(chan error), + updateInProgressChannel: make(chan chan updateResponse, 0), } - go func() { - for { - select { - case resChan := <-repo.updateInProgressChannel: - Log.Info("waiting for update to complete") - start := time.Now() - repoRuntime, jsonBytes, errUpdate := repo.update() - - if errUpdate != nil { - status.M.UpdatesFailedCounter.WithLabelValues(errUpdate.Error()).Inc() - } - - resChan <- updateResponse{ - repoRuntime: repoRuntime, - jsonBytes: jsonBytes, - err: errUpdate, - } - - duration := time.Since(start) - Log.Info("update completed", zap.Duration("duration", duration)) - status.M.UpdatesCompletedCounter.WithLabelValues().Inc() - status.M.UpdateDuration.WithLabelValues().Observe(duration.Seconds()) - } - } - }() go repo.updateRoutine() + go repo.dimensionUpdateRoutine() + Log.Info("trying to restore previous state") restoreErr := repo.tryToRestoreCurrent() if restoreErr != nil { @@ -235,30 +218,37 @@ func (repo *Repo) Update() (updateResponse *responses.Update) { floatSeconds := func(nanoSeconds int64) float64 { return float64(float64(nanoSeconds) / float64(1000000000.0)) } - startTime := time.Now().UnixNano() - updateRepotime, jsonBytes, updateErr := repo.tryUpdate() - updateResponse = &responses.Update{} - updateResponse.Stats.RepoRuntime = floatSeconds(updateRepotime) Log.Info("Update triggered") + Log.Info(ansi.Yellow + "BUFFER LENGTH BEFORE tryUpdate(): " + strconv.Itoa(len(repo.jsonBuf.Bytes())) + ansi.Reset) + + startTime := time.Now().UnixNano() + updateRepotime, updateErr := repo.tryUpdate() + updateResponse = &responses.Update{} + updateResponse.Stats.RepoRuntime = floatSeconds(updateRepotime) if updateErr != nil { updateResponse.Success = false updateResponse.Stats.NumberOfNodes = -1 updateResponse.Stats.NumberOfURIs = -1 // let us try to restore the world from a file - Log.Error("could not update repository:" + updateErr.Error()) + Log.Error("could not update repository:", zap.Error(updateErr)) + Log.Info(ansi.Yellow + "BUFFER LENGTH AFTER ERROR: " + strconv.Itoa(len(repo.jsonBuf.Bytes())) + ansi.Reset) updateResponse.ErrorMessage = updateErr.Error() - restoreErr := repo.tryToRestoreCurrent() - if restoreErr != nil { - Log.Error("failed to restore preceding repo version", zap.Error(restoreErr)) - } else { - Log.Info("restored current repo from local history") + + // only try to restore if the update failed during processing + if updateErr != errUpdateRejected { + restoreErr := repo.tryToRestoreCurrent() + if restoreErr != nil { + Log.Error("failed to restore preceding repo version", zap.Error(restoreErr)) + } else { + Log.Info("restored current repo from local history") + } } } else { updateResponse.Success = true // persist the currently loaded one - historyErr := repo.history.add(jsonBytes) + historyErr := repo.history.add(repo.jsonBuf.Bytes()) if historyErr != nil { Log.Warn("could not persist current repo in history", zap.Error(historyErr)) } diff --git a/testing/client/client.go b/testing/client/client.go index 1023dce..4e8b830 100644 --- a/testing/client/client.go +++ b/testing/client/client.go @@ -15,7 +15,7 @@ func main() { log.Fatal(errClient) } - for i := 1; i <= 50; i++ { + for i := 1; i <= 150; i++ { go func(num int) { log.Println("start update") resp, errUpdate := c.Update() @@ -25,7 +25,7 @@ func main() { } log.Println(num, "update done", resp) }(i) - time.Sleep(1 * time.Second) + time.Sleep(2 * time.Second) } log.Println("done")