From e874cc2b168712c26a39bdc85bb01e19fce53a32 Mon Sep 17 00:00:00 2001 From: Philipp Mieden Date: Thu, 23 May 2019 10:35:30 +0200 Subject: [PATCH] implemented queuing update requests and canceling new ones when the queue is full --- Makefile | 2 +- repo/loader.go | 38 ++++++++++++++++++++++++---------- repo/repo.go | 44 ++++++++++++++++++++++++++-------------- server/server.go | 23 ++++++++++++++++----- testing/client/client.go | 2 +- 5 files changed, 76 insertions(+), 33 deletions(-) diff --git a/Makefile b/Makefile index 6089af2..1e079d9 100644 --- a/Makefile +++ b/Makefile @@ -56,7 +56,7 @@ run-testserver: bin/testserver -json-file var/cse-globus-stage-b-with-main-section.json run-contentserver: - contentserver -var-dir var -webserver-address :9191 -address :9999 -log-level debug http://127.0.0.1:1234 + contentserver -var-dir var -webserver-address :9191 -address :9999 -log-level notice http://127.0.0.1:1234 clean-var: rm var/contentserver-repo-2019* diff --git a/repo/loader.go b/repo/loader.go index 783bd95..a151efc 100644 --- a/repo/loader.go +++ b/repo/loader.go @@ -17,14 +17,15 @@ var json = jsoniter.ConfigCompatibleWithStandardLibrary func (repo *Repo) updateRoutine() { go func() { for newDimension := range repo.updateChannel { - log.Debug("update routine received a new dimension: " + newDimension.Dimension) + log.Notice("update routine received a new dimension: " + newDimension.Dimension) err := repo._updateDimension(newDimension.Dimension, newDimension.Node) - log.Debug("update routine received result") + log.Notice("update routine received result") if err != nil { log.Debug(" update routine error: " + err.Error()) } repo.updateDoneChannel <- err + repo.updateCompleteChannel <- true } }() } @@ -110,11 +111,19 @@ func loadNodesFromJSON(jsonBytes []byte) (nodes map[string]*content.RepoNode, er } func (repo *Repo) tryToRestoreCurrent() error { - currentJSONBytes, err := repo.history.getCurrent() - if err != nil { - return err + + select { + case repo.updateInProgressChannel <- time.Now(): + log.Notice("update request added to queue") + currentJSONBytes, err := repo.history.getCurrent() + if err != nil { + return err + } + return repo.loadJSONBytes(currentJSONBytes) + default: + log.Notice("invalidation request ignored, queue seems to be full") + return errors.New("queue full") } - return repo.loadJSONBytes(currentJSONBytes) } func get(URL string) (data []byte, err error) { @@ -130,11 +139,6 @@ func get(URL string) (data []byte, err error) { } func (repo *Repo) update() (repoRuntime int64, jsonBytes []byte, err error) { - - // limit ressources and allow only one update request at once - repo.updateLock.Lock() - defer repo.updateLock.Unlock() - startTimeRepo := time.Now().UnixNano() jsonBytes, err = get(repo.server) repoRuntime = time.Now().UnixNano() - startTimeRepo @@ -157,6 +161,18 @@ func (repo *Repo) update() (repoRuntime int64, jsonBytes []byte, err error) { return repoRuntime, jsonBytes, nil } +// limit ressources and allow only one update request at once +func (repo *Repo) tryUpdate() (repoRuntime int64, jsonBytes []byte, err error) { + select { + case repo.updateInProgressChannel <- time.Now(): + log.Notice("update request added to queue") + return repo.update() + default: + log.Notice("invalidation request ignored, queue seems to be full") + return 0, nil, errors.New("queue full") + } +} + func (repo *Repo) loadJSONBytes(jsonBytes []byte) error { nodes, err := loadNodesFromJSON(jsonBytes) if err != nil { diff --git a/repo/repo.go b/repo/repo.go index b2cb75b..a654162 100644 --- a/repo/repo.go +++ b/repo/repo.go @@ -4,7 +4,6 @@ import ( "errors" "fmt" "strings" - "sync" "time" "github.com/foomo/contentserver/content" @@ -24,12 +23,14 @@ type Dimension struct { // Repo content repositiory type Repo struct { - server string - Directory map[string]*Dimension - updateLock *sync.Mutex - updateChannel chan *repoDimension - updateDoneChannel chan error - history *history + server string + Directory map[string]*Dimension + // updateLock sync.Mutex + updateChannel chan *repoDimension + updateDoneChannel chan error + history *history + updateInProgressChannel chan time.Time + updateCompleteChannel chan bool } type repoDimension struct { @@ -42,15 +43,26 @@ func NewRepo(server string, varDir string) *Repo { log.Notice("creating new repo for " + server) log.Notice(" using var dir:" + varDir) repo := &Repo{ - server: server, - Directory: map[string]*Dimension{}, - history: newHistory(varDir), - updateLock: &sync.Mutex{}, - updateChannel: make(chan *repoDimension), - updateDoneChannel: make(chan error), + server: server, + Directory: map[string]*Dimension{}, + history: newHistory(varDir), + updateChannel: make(chan *repoDimension), + updateDoneChannel: make(chan error), + updateInProgressChannel: make(chan time.Time, 1), + updateCompleteChannel: make(chan bool), } + go func() { + for { + select { + case t := <-repo.updateInProgressChannel: + log.Notice("got timestamp: ", t, "waiting for update to complete") + <-repo.updateCompleteChannel + log.Notice("update completed!") + } + } + }() go repo.updateRoutine() - log.Record("trying to restore pervious state") + log.Record("trying to restore previous state") restoreErr := repo.tryToRestoreCurrent() if restoreErr != nil { log.Record(" could not restore previous repo content:" + restoreErr.Error()) @@ -194,10 +206,12 @@ func (repo *Repo) Update() (updateResponse *responses.Update) { return float64(float64(nanoSeconds) / float64(1000000000.0)) } startTime := time.Now().UnixNano() - updateRepotime, jsonBytes, updateErr := repo.update() + updateRepotime, jsonBytes, updateErr := repo.tryUpdate() updateResponse = &responses.Update{} updateResponse.Stats.RepoRuntime = floatSeconds(updateRepotime) + log.Notice("Update triggered") + if updateErr != nil { updateResponse.Success = false updateResponse.Stats.NumberOfNodes = -1 diff --git a/server/server.go b/server/server.go index bf7e249..59b1b73 100644 --- a/server/server.go +++ b/server/server.go @@ -5,6 +5,7 @@ import ( "fmt" "net" "net/http" + "os" "github.com/foomo/contentserver/log" "github.com/foomo/contentserver/repo" @@ -40,23 +41,35 @@ func Run(server string, address string, varDir string) error { func RunServerSocketAndWebServer( server string, address string, - webserverAdresss string, + webserverAddress string, webserverPath string, varDir string, ) error { - if address == "" && webserverAdresss == "" { + if address == "" && webserverAddress == "" { return errors.New("one of the addresses needs to be set") } log.Record("building repo with content from " + server) r := repo.NewRepo(server, varDir) - go r.Update() + + // start initial update and handle error + go func() { + resp := r.Update() + if !resp.Success { + log.Error("failed to update: ", resp) + os.Exit(1) + } + }() + // update can run in bg chanErr := make(chan error) + if address != "" { + log.Notice("starting socketserver on: ", address) go runSocketServer(r, address, chanErr) } - if webserverAdresss != "" { - go runWebserver(r, webserverAdresss, webserverPath, chanErr) + if webserverAddress != "" { + log.Notice("starting webserver on: ", webserverAddress) + go runWebserver(r, webserverAddress, webserverPath, chanErr) } return <-chanErr } diff --git a/testing/client/client.go b/testing/client/client.go index eb31011..1023dce 100644 --- a/testing/client/client.go +++ b/testing/client/client.go @@ -25,7 +25,7 @@ func main() { } log.Println(num, "update done", resp) }(i) - time.Sleep(5 * time.Second) + time.Sleep(1 * time.Second) } log.Println("done")