mirror of
https://github.com/foomo/contentserver.git
synced 2025-10-16 12:25:44 +00:00
implemented queuing update requests and canceling new ones when the queue is full
This commit is contained in:
parent
2decb53ec1
commit
e874cc2b16
2
Makefile
2
Makefile
@ -56,7 +56,7 @@ run-testserver:
|
|||||||
bin/testserver -json-file var/cse-globus-stage-b-with-main-section.json
|
bin/testserver -json-file var/cse-globus-stage-b-with-main-section.json
|
||||||
|
|
||||||
run-contentserver:
|
run-contentserver:
|
||||||
contentserver -var-dir var -webserver-address :9191 -address :9999 -log-level debug http://127.0.0.1:1234
|
contentserver -var-dir var -webserver-address :9191 -address :9999 -log-level notice http://127.0.0.1:1234
|
||||||
|
|
||||||
clean-var:
|
clean-var:
|
||||||
rm var/contentserver-repo-2019*
|
rm var/contentserver-repo-2019*
|
||||||
|
|||||||
@ -17,14 +17,15 @@ var json = jsoniter.ConfigCompatibleWithStandardLibrary
|
|||||||
func (repo *Repo) updateRoutine() {
|
func (repo *Repo) updateRoutine() {
|
||||||
go func() {
|
go func() {
|
||||||
for newDimension := range repo.updateChannel {
|
for newDimension := range repo.updateChannel {
|
||||||
log.Debug("update routine received a new dimension: " + newDimension.Dimension)
|
log.Notice("update routine received a new dimension: " + newDimension.Dimension)
|
||||||
|
|
||||||
err := repo._updateDimension(newDimension.Dimension, newDimension.Node)
|
err := repo._updateDimension(newDimension.Dimension, newDimension.Node)
|
||||||
log.Debug("update routine received result")
|
log.Notice("update routine received result")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Debug(" update routine error: " + err.Error())
|
log.Debug(" update routine error: " + err.Error())
|
||||||
}
|
}
|
||||||
repo.updateDoneChannel <- err
|
repo.updateDoneChannel <- err
|
||||||
|
repo.updateCompleteChannel <- true
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
}
|
}
|
||||||
@ -110,11 +111,19 @@ func loadNodesFromJSON(jsonBytes []byte) (nodes map[string]*content.RepoNode, er
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (repo *Repo) tryToRestoreCurrent() error {
|
func (repo *Repo) tryToRestoreCurrent() error {
|
||||||
currentJSONBytes, err := repo.history.getCurrent()
|
|
||||||
if err != nil {
|
select {
|
||||||
return err
|
case repo.updateInProgressChannel <- time.Now():
|
||||||
|
log.Notice("update request added to queue")
|
||||||
|
currentJSONBytes, err := repo.history.getCurrent()
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
return repo.loadJSONBytes(currentJSONBytes)
|
||||||
|
default:
|
||||||
|
log.Notice("invalidation request ignored, queue seems to be full")
|
||||||
|
return errors.New("queue full")
|
||||||
}
|
}
|
||||||
return repo.loadJSONBytes(currentJSONBytes)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func get(URL string) (data []byte, err error) {
|
func get(URL string) (data []byte, err error) {
|
||||||
@ -130,11 +139,6 @@ func get(URL string) (data []byte, err error) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (repo *Repo) update() (repoRuntime int64, jsonBytes []byte, err error) {
|
func (repo *Repo) update() (repoRuntime int64, jsonBytes []byte, err error) {
|
||||||
|
|
||||||
// limit ressources and allow only one update request at once
|
|
||||||
repo.updateLock.Lock()
|
|
||||||
defer repo.updateLock.Unlock()
|
|
||||||
|
|
||||||
startTimeRepo := time.Now().UnixNano()
|
startTimeRepo := time.Now().UnixNano()
|
||||||
jsonBytes, err = get(repo.server)
|
jsonBytes, err = get(repo.server)
|
||||||
repoRuntime = time.Now().UnixNano() - startTimeRepo
|
repoRuntime = time.Now().UnixNano() - startTimeRepo
|
||||||
@ -157,6 +161,18 @@ func (repo *Repo) update() (repoRuntime int64, jsonBytes []byte, err error) {
|
|||||||
return repoRuntime, jsonBytes, nil
|
return repoRuntime, jsonBytes, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// limit ressources and allow only one update request at once
|
||||||
|
func (repo *Repo) tryUpdate() (repoRuntime int64, jsonBytes []byte, err error) {
|
||||||
|
select {
|
||||||
|
case repo.updateInProgressChannel <- time.Now():
|
||||||
|
log.Notice("update request added to queue")
|
||||||
|
return repo.update()
|
||||||
|
default:
|
||||||
|
log.Notice("invalidation request ignored, queue seems to be full")
|
||||||
|
return 0, nil, errors.New("queue full")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func (repo *Repo) loadJSONBytes(jsonBytes []byte) error {
|
func (repo *Repo) loadJSONBytes(jsonBytes []byte) error {
|
||||||
nodes, err := loadNodesFromJSON(jsonBytes)
|
nodes, err := loadNodesFromJSON(jsonBytes)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|||||||
44
repo/repo.go
44
repo/repo.go
@ -4,7 +4,6 @@ import (
|
|||||||
"errors"
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
"strings"
|
"strings"
|
||||||
"sync"
|
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/foomo/contentserver/content"
|
"github.com/foomo/contentserver/content"
|
||||||
@ -24,12 +23,14 @@ type Dimension struct {
|
|||||||
|
|
||||||
// Repo content repositiory
|
// Repo content repositiory
|
||||||
type Repo struct {
|
type Repo struct {
|
||||||
server string
|
server string
|
||||||
Directory map[string]*Dimension
|
Directory map[string]*Dimension
|
||||||
updateLock *sync.Mutex
|
// updateLock sync.Mutex
|
||||||
updateChannel chan *repoDimension
|
updateChannel chan *repoDimension
|
||||||
updateDoneChannel chan error
|
updateDoneChannel chan error
|
||||||
history *history
|
history *history
|
||||||
|
updateInProgressChannel chan time.Time
|
||||||
|
updateCompleteChannel chan bool
|
||||||
}
|
}
|
||||||
|
|
||||||
type repoDimension struct {
|
type repoDimension struct {
|
||||||
@ -42,15 +43,26 @@ func NewRepo(server string, varDir string) *Repo {
|
|||||||
log.Notice("creating new repo for " + server)
|
log.Notice("creating new repo for " + server)
|
||||||
log.Notice(" using var dir:" + varDir)
|
log.Notice(" using var dir:" + varDir)
|
||||||
repo := &Repo{
|
repo := &Repo{
|
||||||
server: server,
|
server: server,
|
||||||
Directory: map[string]*Dimension{},
|
Directory: map[string]*Dimension{},
|
||||||
history: newHistory(varDir),
|
history: newHistory(varDir),
|
||||||
updateLock: &sync.Mutex{},
|
updateChannel: make(chan *repoDimension),
|
||||||
updateChannel: make(chan *repoDimension),
|
updateDoneChannel: make(chan error),
|
||||||
updateDoneChannel: make(chan error),
|
updateInProgressChannel: make(chan time.Time, 1),
|
||||||
|
updateCompleteChannel: make(chan bool),
|
||||||
}
|
}
|
||||||
|
go func() {
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case t := <-repo.updateInProgressChannel:
|
||||||
|
log.Notice("got timestamp: ", t, "waiting for update to complete")
|
||||||
|
<-repo.updateCompleteChannel
|
||||||
|
log.Notice("update completed!")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}()
|
||||||
go repo.updateRoutine()
|
go repo.updateRoutine()
|
||||||
log.Record("trying to restore pervious state")
|
log.Record("trying to restore previous state")
|
||||||
restoreErr := repo.tryToRestoreCurrent()
|
restoreErr := repo.tryToRestoreCurrent()
|
||||||
if restoreErr != nil {
|
if restoreErr != nil {
|
||||||
log.Record(" could not restore previous repo content:" + restoreErr.Error())
|
log.Record(" could not restore previous repo content:" + restoreErr.Error())
|
||||||
@ -194,10 +206,12 @@ func (repo *Repo) Update() (updateResponse *responses.Update) {
|
|||||||
return float64(float64(nanoSeconds) / float64(1000000000.0))
|
return float64(float64(nanoSeconds) / float64(1000000000.0))
|
||||||
}
|
}
|
||||||
startTime := time.Now().UnixNano()
|
startTime := time.Now().UnixNano()
|
||||||
updateRepotime, jsonBytes, updateErr := repo.update()
|
updateRepotime, jsonBytes, updateErr := repo.tryUpdate()
|
||||||
updateResponse = &responses.Update{}
|
updateResponse = &responses.Update{}
|
||||||
updateResponse.Stats.RepoRuntime = floatSeconds(updateRepotime)
|
updateResponse.Stats.RepoRuntime = floatSeconds(updateRepotime)
|
||||||
|
|
||||||
|
log.Notice("Update triggered")
|
||||||
|
|
||||||
if updateErr != nil {
|
if updateErr != nil {
|
||||||
updateResponse.Success = false
|
updateResponse.Success = false
|
||||||
updateResponse.Stats.NumberOfNodes = -1
|
updateResponse.Stats.NumberOfNodes = -1
|
||||||
|
|||||||
@ -5,6 +5,7 @@ import (
|
|||||||
"fmt"
|
"fmt"
|
||||||
"net"
|
"net"
|
||||||
"net/http"
|
"net/http"
|
||||||
|
"os"
|
||||||
|
|
||||||
"github.com/foomo/contentserver/log"
|
"github.com/foomo/contentserver/log"
|
||||||
"github.com/foomo/contentserver/repo"
|
"github.com/foomo/contentserver/repo"
|
||||||
@ -40,23 +41,35 @@ func Run(server string, address string, varDir string) error {
|
|||||||
func RunServerSocketAndWebServer(
|
func RunServerSocketAndWebServer(
|
||||||
server string,
|
server string,
|
||||||
address string,
|
address string,
|
||||||
webserverAdresss string,
|
webserverAddress string,
|
||||||
webserverPath string,
|
webserverPath string,
|
||||||
varDir string,
|
varDir string,
|
||||||
) error {
|
) error {
|
||||||
if address == "" && webserverAdresss == "" {
|
if address == "" && webserverAddress == "" {
|
||||||
return errors.New("one of the addresses needs to be set")
|
return errors.New("one of the addresses needs to be set")
|
||||||
}
|
}
|
||||||
log.Record("building repo with content from " + server)
|
log.Record("building repo with content from " + server)
|
||||||
r := repo.NewRepo(server, varDir)
|
r := repo.NewRepo(server, varDir)
|
||||||
go r.Update()
|
|
||||||
|
// start initial update and handle error
|
||||||
|
go func() {
|
||||||
|
resp := r.Update()
|
||||||
|
if !resp.Success {
|
||||||
|
log.Error("failed to update: ", resp)
|
||||||
|
os.Exit(1)
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
// update can run in bg
|
// update can run in bg
|
||||||
chanErr := make(chan error)
|
chanErr := make(chan error)
|
||||||
|
|
||||||
if address != "" {
|
if address != "" {
|
||||||
|
log.Notice("starting socketserver on: ", address)
|
||||||
go runSocketServer(r, address, chanErr)
|
go runSocketServer(r, address, chanErr)
|
||||||
}
|
}
|
||||||
if webserverAdresss != "" {
|
if webserverAddress != "" {
|
||||||
go runWebserver(r, webserverAdresss, webserverPath, chanErr)
|
log.Notice("starting webserver on: ", webserverAddress)
|
||||||
|
go runWebserver(r, webserverAddress, webserverPath, chanErr)
|
||||||
}
|
}
|
||||||
return <-chanErr
|
return <-chanErr
|
||||||
}
|
}
|
||||||
|
|||||||
@ -25,7 +25,7 @@ func main() {
|
|||||||
}
|
}
|
||||||
log.Println(num, "update done", resp)
|
log.Println(num, "update done", resp)
|
||||||
}(i)
|
}(i)
|
||||||
time.Sleep(5 * time.Second)
|
time.Sleep(1 * time.Second)
|
||||||
}
|
}
|
||||||
|
|
||||||
log.Println("done")
|
log.Println("done")
|
||||||
|
|||||||
Loading…
Reference in New Issue
Block a user