refactored repo to reuse a bytes.Buffer for updates, to reduce the number of allocations

This commit is contained in:
Philipp Mieden 2019-05-24 13:00:28 +02:00
parent 5cff674940
commit 69dec41605
5 changed files with 139 additions and 83 deletions

View File

@ -58,6 +58,9 @@ run-testserver:
run-contentserver: run-contentserver:
contentserver -var-dir var -webserver-address :9191 -address :9999 http://127.0.0.1:1234 contentserver -var-dir var -webserver-address :9191 -address :9999 http://127.0.0.1:1234
run-contentserver-freeosmem:
contentserver -var-dir var -webserver-address :9191 -address :9999 -free-os-mem 1 http://127.0.0.1:1234
run-prometheus: run-prometheus:
prometheus --config.file=prometheus/prometheus.yml prometheus --config.file=prometheus/prometheus.yml

View File

@ -1,8 +1,10 @@
package repo package repo
import ( import (
"bytes"
"errors" "errors"
"fmt" "fmt"
"io"
"io/ioutil" "io/ioutil"
"os" "os"
"path" "path"
@ -96,6 +98,9 @@ func (h *history) getCurrentFilename() string {
return path.Join(h.varDir, historyRepoJSONPrefix+"current"+historyRepoJSONSuffix) return path.Join(h.varDir, historyRepoJSONPrefix+"current"+historyRepoJSONSuffix)
} }
func (h *history) getCurrent() (jsonBytes []byte, err error) { func (h *history) getCurrent(buf *bytes.Buffer) (err error) {
return ioutil.ReadFile(h.getCurrentFilename()) f, err := os.Open(h.getCurrentFilename())
defer f.Close()
_, err = io.Copy(buf, f)
return err
} }

View File

@ -3,10 +3,12 @@ package repo
import ( import (
"errors" "errors"
"fmt" "fmt"
"io/ioutil" "io"
"net/http" "net/http"
"time" "time"
"github.com/mgutz/ansi"
"github.com/foomo/contentserver/content" "github.com/foomo/contentserver/content"
. "github.com/foomo/contentserver/logger" . "github.com/foomo/contentserver/logger"
"github.com/foomo/contentserver/status" "github.com/foomo/contentserver/status"
@ -15,17 +17,42 @@ import (
) )
var ( var (
json = jsoniter.ConfigCompatibleWithStandardLibrary json = jsoniter.ConfigCompatibleWithStandardLibrary
errUpdateRejected = errors.New("update rejected: queue full")
) )
// updateResponse carries the outcome of a single update run from the update
// routine back to the caller that requested it.
type updateResponse struct {
	// repoRuntime is the repoRuntime value reported by update(): the
	// nanoseconds spent fetching the JSON from the repo server.
	repoRuntime int64
	// err is the error returned by update(); nil when the update succeeded.
	err error
}
func (repo *Repo) updateRoutine() { func (repo *Repo) updateRoutine() {
for newDimension := range repo.updateChannel { for {
select {
case resChan := <-repo.updateInProgressChannel:
Log.Info("waiting for update to complete", zap.String("chan", fmt.Sprintf("%p", resChan)))
start := time.Now()
repoRuntime, errUpdate := repo.update()
if errUpdate != nil {
status.M.UpdatesFailedCounter.WithLabelValues(errUpdate.Error()).Inc()
}
resChan <- updateResponse{
repoRuntime: repoRuntime,
err: errUpdate,
}
duration := time.Since(start)
Log.Info("update completed", zap.Duration("duration", duration), zap.String("chan", fmt.Sprintf("%p", resChan)))
status.M.UpdatesCompletedCounter.WithLabelValues().Inc()
status.M.UpdateDuration.WithLabelValues().Observe(duration.Seconds())
}
}
}
func (repo *Repo) dimensionUpdateRoutine() {
for newDimension := range repo.dimensionUpdateChannel {
Log.Info("update routine received a new dimension", zap.String("dimension", newDimension.Dimension)) Log.Info("update routine received a new dimension", zap.String("dimension", newDimension.Dimension))
err := repo._updateDimension(newDimension.Dimension, newDimension.Node) err := repo._updateDimension(newDimension.Dimension, newDimension.Node)
@ -33,18 +60,18 @@ func (repo *Repo) updateRoutine() {
if err != nil { if err != nil {
Log.Debug("update dimension failed", zap.Error(err)) Log.Debug("update dimension failed", zap.Error(err))
} }
repo.updateDoneChannel <- err repo.dimensionUpdateDoneChannel <- err
} }
} }
func (repo *Repo) updateDimension(dimension string, node *content.RepoNode) error { func (repo *Repo) updateDimension(dimension string, node *content.RepoNode) error {
Log.Debug("trying to push dimension into update channel", zap.String("dimension", dimension), zap.String("nodeName", node.Name)) Log.Debug("trying to push dimension into update channel", zap.String("dimension", dimension), zap.String("nodeName", node.Name))
repo.updateChannel <- &repoDimension{ repo.dimensionUpdateChannel <- &repoDimension{
Dimension: dimension, Dimension: dimension,
Node: node, Node: node,
} }
Log.Debug("waiting for done signal") Log.Debug("waiting for done signal")
return <-repo.updateDoneChannel return <-repo.dimensionUpdateDoneChannel
} }
// do not call directly, but only through channel // do not call directly, but only through channel
@ -54,7 +81,7 @@ func (repo *Repo) _updateDimension(dimension string, newNode *content.RepoNode)
var ( var (
newDirectory = make(map[string]*content.RepoNode) newDirectory = make(map[string]*content.RepoNode)
newURIDirectory = make(map[string]*content.RepoNode) newURIDirectory = make(map[string]*content.RepoNode)
err = builDirectory(newNode, newDirectory, newURIDirectory) err = buildDirectory(newNode, newDirectory, newURIDirectory)
) )
if err != nil { if err != nil {
return errors.New("update dimension \"" + dimension + "\" failed when building its directory:: " + err.Error()) return errors.New("update dimension \"" + dimension + "\" failed when building its directory:: " + err.Error())
@ -64,22 +91,39 @@ func (repo *Repo) _updateDimension(dimension string, newNode *content.RepoNode)
return err return err
} }
// ---------------------------------------------
// collect other dimension in the Directory
newRepoDirectory := map[string]*Dimension{} newRepoDirectory := map[string]*Dimension{}
for d, D := range repo.Directory { for d, D := range repo.Directory {
if d != dimension { if d != dimension {
newRepoDirectory[d] = D newRepoDirectory[d] = D
} }
} }
// add the new dimension
newRepoDirectory[dimension] = &Dimension{ newRepoDirectory[dimension] = &Dimension{
Node: newNode, Node: newNode,
Directory: newDirectory, Directory: newDirectory,
URIDirectory: newURIDirectory, URIDirectory: newURIDirectory,
} }
repo.Directory = newRepoDirectory repo.Directory = newRepoDirectory
// ---------------------------------------------
// @TODO: why not update only the dimension that has changed instead?
// repo.Directory[dimension] = &Dimension{
// Node: newNode,
// Directory: newDirectory,
// URIDirectory: newURIDirectory,
// }
// ---------------------------------------------
return nil return nil
} }
func builDirectory(dirNode *content.RepoNode, directory map[string]*content.RepoNode, uRIDirectory map[string]*content.RepoNode) error { func buildDirectory(dirNode *content.RepoNode, directory map[string]*content.RepoNode, uRIDirectory map[string]*content.RepoNode) error {
// Log.Debug("buildDirectory", zap.String("ID", dirNode.ID)) // Log.Debug("buildDirectory", zap.String("ID", dirNode.ID))
@ -94,7 +138,7 @@ func builDirectory(dirNode *content.RepoNode, directory map[string]*content.Repo
} }
uRIDirectory[dirNode.URI] = dirNode uRIDirectory[dirNode.URI] = dirNode
for _, childNode := range dirNode.Nodes { for _, childNode := range dirNode.Nodes {
err := builDirectory(childNode, directory, uRIDirectory) err := buildDirectory(childNode, directory, uRIDirectory)
if err != nil { if err != nil {
return err return err
} }
@ -115,79 +159,93 @@ func wireAliases(directory map[string]*content.RepoNode) error {
return nil return nil
} }
func loadNodesFromJSON(jsonBytes []byte) (nodes map[string]*content.RepoNode, err error) { func (repo *Repo) loadNodesFromJSON() (nodes map[string]*content.RepoNode, err error) {
nodes = make(map[string]*content.RepoNode) nodes = make(map[string]*content.RepoNode)
err = json.Unmarshal(jsonBytes, &nodes) err = json.Unmarshal(repo.jsonBuf.Bytes(), &nodes)
return nodes, err return nodes, err
} }
func (repo *Repo) tryToRestoreCurrent() error { func (repo *Repo) tryToRestoreCurrent() (err error) {
currentJSONBytes, err := repo.history.getCurrent() err = repo.history.getCurrent(&repo.jsonBuf)
if err != nil { if err != nil {
return err return err
} }
return repo.loadJSONBytes(currentJSONBytes) return repo.loadJSONBytes()
} }
func get(URL string) (data []byte, err error) { func (repo *Repo) get(URL string) (err error) {
response, err := http.Get(URL) response, err := http.Get(URL)
if err != nil { if err != nil {
return data, err return err
} }
defer response.Body.Close() defer response.Body.Close()
if response.StatusCode != http.StatusOK { if response.StatusCode != http.StatusOK {
return data, fmt.Errorf("Bad HTTP Response: %q", response.Status) return fmt.Errorf("Bad HTTP Response: %q", response.Status)
} }
return ioutil.ReadAll(response.Body)
Log.Info(ansi.Red + "RESETTING BUFFER" + ansi.Reset)
repo.jsonBuf.Reset()
Log.Info(ansi.Green + "LOADING DATA INTO BUFFER" + ansi.Reset)
_, err = io.Copy(&repo.jsonBuf, response.Body)
return err
} }
func (repo *Repo) update() (repoRuntime int64, jsonBytes []byte, err error) { func (repo *Repo) update() (repoRuntime int64, err error) {
startTimeRepo := time.Now().UnixNano() startTimeRepo := time.Now().UnixNano()
jsonBytes, err = get(repo.server) err = repo.get(repo.server)
repoRuntime = time.Now().UnixNano() - startTimeRepo repoRuntime = time.Now().UnixNano() - startTimeRepo
if err != nil { if err != nil {
// we have no json to load - the repo server did not reply // we have no json to load - the repo server did not reply
Log.Debug("failed to load json", zap.Error(err)) Log.Debug("failed to load json", zap.Error(err))
return repoRuntime, jsonBytes, err return repoRuntime, err
} }
Log.Debug("loading json", zap.String("server", repo.server), zap.Int("length", len(jsonBytes))) Log.Debug("loading json", zap.String("server", repo.server), zap.Int("length", len(repo.jsonBuf.Bytes())))
nodes, err := loadNodesFromJSON(jsonBytes) nodes, err := repo.loadNodesFromJSON()
if err != nil { if err != nil {
// could not load nodes from json // could not load nodes from json
return repoRuntime, jsonBytes, err return repoRuntime, err
} }
err = repo.loadNodes(nodes) err = repo.loadNodes(nodes)
if err != nil { if err != nil {
// repo failed to load nodes // repo failed to load nodes
return repoRuntime, jsonBytes, err return repoRuntime, err
} }
return repoRuntime, jsonBytes, nil return repoRuntime, nil
} }
// limit ressources and allow only one update request at once // limit ressources and allow only one update request at once
func (repo *Repo) tryUpdate() (repoRuntime int64, jsonBytes []byte, err error) { func (repo *Repo) tryUpdate() (repoRuntime int64, err error) {
c := make(chan updateResponse) c := make(chan updateResponse)
select { select {
case repo.updateInProgressChannel <- c: case repo.updateInProgressChannel <- c:
Log.Info("update request added to queue") Log.Info("update request added to queue")
ur := <-c ur := <-c
return ur.repoRuntime, ur.jsonBytes, ur.err return ur.repoRuntime, ur.err
default: default:
Log.Info("update request rejected, queue is full") Log.Info("update request rejected, queue is full")
status.M.UpdatesRejectedCounter.WithLabelValues().Inc() status.M.UpdatesRejectedCounter.WithLabelValues().Inc()
return 0, nil, errors.New("update rejected: queue full") return 0, errUpdateRejected
} }
} }
func (repo *Repo) loadJSONBytes(jsonBytes []byte) error { func (repo *Repo) loadJSONBytes() error {
nodes, err := loadNodesFromJSON(jsonBytes) nodes, err := repo.loadNodesFromJSON()
if err != nil { if err != nil {
Log.Debug("could not parse json", zap.String("json", string(jsonBytes))) data := repo.jsonBuf.Bytes()
if len(data) > 10 {
Log.Debug("could not parse json",
zap.String("jsonStart", string(data[:10])),
zap.String("jsonStart", string(data[len(data)-10:])),
)
}
return err return err
} }
err = repo.loadNodes(nodes) err = repo.loadNodes(nodes)
if err == nil { if err == nil {
historyErr := repo.history.add(jsonBytes) historyErr := repo.history.add(repo.jsonBuf.Bytes())
if historyErr != nil { if historyErr != nil {
Log.Error("could not add valid json to history", zap.Error(historyErr)) Log.Error("could not add valid json to history", zap.Error(historyErr))
} else { } else {

View File

@ -1,12 +1,14 @@
package repo package repo
import ( import (
"bytes"
"errors" "errors"
"fmt" "fmt"
"strconv"
"strings" "strings"
"time" "time"
"github.com/foomo/contentserver/status" "github.com/mgutz/ansi"
"github.com/foomo/contentserver/content" "github.com/foomo/contentserver/content"
. "github.com/foomo/contentserver/logger" . "github.com/foomo/contentserver/logger"
@ -29,10 +31,14 @@ type Repo struct {
server string server string
Directory map[string]*Dimension Directory map[string]*Dimension
// updateLock sync.Mutex // updateLock sync.Mutex
updateChannel chan *repoDimension dimensionUpdateChannel chan *repoDimension
updateDoneChannel chan error dimensionUpdateDoneChannel chan error
history *history history *history
updateInProgressChannel chan chan updateResponse updateInProgressChannel chan chan updateResponse
// jsonBytes []byte
jsonBuf bytes.Buffer
} }
type repoDimension struct { type repoDimension struct {
@ -48,40 +54,17 @@ func NewRepo(server string, varDir string) *Repo {
zap.String("varDir", varDir), zap.String("varDir", varDir),
) )
repo := &Repo{ repo := &Repo{
server: server, server: server,
Directory: map[string]*Dimension{}, Directory: map[string]*Dimension{},
history: newHistory(varDir), history: newHistory(varDir),
updateChannel: make(chan *repoDimension), dimensionUpdateChannel: make(chan *repoDimension),
updateDoneChannel: make(chan error), dimensionUpdateDoneChannel: make(chan error),
updateInProgressChannel: make(chan chan updateResponse, 1), updateInProgressChannel: make(chan chan updateResponse, 0),
} }
go func() {
for {
select {
case resChan := <-repo.updateInProgressChannel:
Log.Info("waiting for update to complete")
start := time.Now()
repoRuntime, jsonBytes, errUpdate := repo.update()
if errUpdate != nil {
status.M.UpdatesFailedCounter.WithLabelValues(errUpdate.Error()).Inc()
}
resChan <- updateResponse{
repoRuntime: repoRuntime,
jsonBytes: jsonBytes,
err: errUpdate,
}
duration := time.Since(start)
Log.Info("update completed", zap.Duration("duration", duration))
status.M.UpdatesCompletedCounter.WithLabelValues().Inc()
status.M.UpdateDuration.WithLabelValues().Observe(duration.Seconds())
}
}
}()
go repo.updateRoutine() go repo.updateRoutine()
go repo.dimensionUpdateRoutine()
Log.Info("trying to restore previous state") Log.Info("trying to restore previous state")
restoreErr := repo.tryToRestoreCurrent() restoreErr := repo.tryToRestoreCurrent()
if restoreErr != nil { if restoreErr != nil {
@ -235,30 +218,37 @@ func (repo *Repo) Update() (updateResponse *responses.Update) {
floatSeconds := func(nanoSeconds int64) float64 { floatSeconds := func(nanoSeconds int64) float64 {
return float64(float64(nanoSeconds) / float64(1000000000.0)) return float64(float64(nanoSeconds) / float64(1000000000.0))
} }
startTime := time.Now().UnixNano()
updateRepotime, jsonBytes, updateErr := repo.tryUpdate()
updateResponse = &responses.Update{}
updateResponse.Stats.RepoRuntime = floatSeconds(updateRepotime)
Log.Info("Update triggered") Log.Info("Update triggered")
Log.Info(ansi.Yellow + "BUFFER LENGTH BEFORE tryUpdate(): " + strconv.Itoa(len(repo.jsonBuf.Bytes())) + ansi.Reset)
startTime := time.Now().UnixNano()
updateRepotime, updateErr := repo.tryUpdate()
updateResponse = &responses.Update{}
updateResponse.Stats.RepoRuntime = floatSeconds(updateRepotime)
if updateErr != nil { if updateErr != nil {
updateResponse.Success = false updateResponse.Success = false
updateResponse.Stats.NumberOfNodes = -1 updateResponse.Stats.NumberOfNodes = -1
updateResponse.Stats.NumberOfURIs = -1 updateResponse.Stats.NumberOfURIs = -1
// let us try to restore the world from a file // let us try to restore the world from a file
Log.Error("could not update repository:" + updateErr.Error()) Log.Error("could not update repository:", zap.Error(updateErr))
Log.Info(ansi.Yellow + "BUFFER LENGTH AFTER ERROR: " + strconv.Itoa(len(repo.jsonBuf.Bytes())) + ansi.Reset)
updateResponse.ErrorMessage = updateErr.Error() updateResponse.ErrorMessage = updateErr.Error()
restoreErr := repo.tryToRestoreCurrent()
if restoreErr != nil { // only try to restore if the update failed during processing
Log.Error("failed to restore preceding repo version", zap.Error(restoreErr)) if updateErr != errUpdateRejected {
} else { restoreErr := repo.tryToRestoreCurrent()
Log.Info("restored current repo from local history") if restoreErr != nil {
Log.Error("failed to restore preceding repo version", zap.Error(restoreErr))
} else {
Log.Info("restored current repo from local history")
}
} }
} else { } else {
updateResponse.Success = true updateResponse.Success = true
// persist the currently loaded one // persist the currently loaded one
historyErr := repo.history.add(jsonBytes) historyErr := repo.history.add(repo.jsonBuf.Bytes())
if historyErr != nil { if historyErr != nil {
Log.Warn("could not persist current repo in history", zap.Error(historyErr)) Log.Warn("could not persist current repo in history", zap.Error(historyErr))
} }

View File

@ -15,7 +15,7 @@ func main() {
log.Fatal(errClient) log.Fatal(errClient)
} }
for i := 1; i <= 50; i++ { for i := 1; i <= 150; i++ {
go func(num int) { go func(num int) {
log.Println("start update") log.Println("start update")
resp, errUpdate := c.Update() resp, errUpdate := c.Update()
@ -25,7 +25,7 @@ func main() {
} }
log.Println(num, "update done", resp) log.Println(num, "update done", resp)
}(i) }(i)
time.Sleep(1 * time.Second) time.Sleep(2 * time.Second)
} }
log.Println("done") log.Println("done")