diff --git a/.gitignore b/.gitignore index a81a4ad..284502e 100644 --- a/.gitignore +++ b/.gitignore @@ -1,6 +1,6 @@ .* *~ +/bin/* /main /*.sublime-* -/buildOnLinux.sh !.git* diff --git a/Makefile b/Makefile new file mode 100644 index 0000000..581f8b0 --- /dev/null +++ b/Makefile @@ -0,0 +1,11 @@ +SHELL := /bin/bash + +options: + echo "you can clean | test | build | run" +clean: + rm -f bin/contentserver +build: + make clean + go build -o bin/contentserver +test: + go test -v github.com/foomo/contentserver/server/repo diff --git a/bin/.gitkeep b/bin/.gitkeep new file mode 100644 index 0000000..e69de29 diff --git a/main.go b/contentserver.go similarity index 80% rename from main.go rename to contentserver.go index e7fb49e..0549328 100644 --- a/main.go +++ b/contentserver.go @@ -10,8 +10,7 @@ import ( ) const ( - PROTOCOL_TCP = "tcp" - PROTOCOL_HTTP = "http" + PROTOCOL_TCP = "tcp" ) type ExitCode int @@ -25,6 +24,7 @@ var contentServer string var protocol = flag.String("protocol", PROTOCOL_TCP, "what protocol to server for") var address = flag.String("address", "127.0.0.1:8081", "address to bind host:port") +var varDir = flag.String("vardir", "127.0.0.1:8081", "where to put my data") var logLevelOptions = []string{ log.LOG_LEVEL_NAME_ERROR, log.LOG_LEVEL_NAME_RECORD, @@ -52,11 +52,7 @@ func main() { log.SetLogLevel(log.GetLogLevelByName(*logLevel)) switch *protocol { case PROTOCOL_TCP: - server.RunSocketServer(flag.Arg(0), *address) - break - case PROTOCOL_HTTP: - //server.Run(":8080", "http://test.bestbytes/foomo/modules/Foomo.Page.Content/services/content.php") - fmt.Println("http server does not work yet - use tcp instead") + server.RunSocketServer(flag.Arg(0), *address, *varDir) break default: exitUsage(EXIT_CODE_INSUFFICIENT_ARGS) diff --git a/server/repo/history.go b/server/repo/history.go new file mode 100644 index 0000000..6c00bda --- /dev/null +++ b/server/repo/history.go @@ -0,0 +1,39 @@ +package repo + +import ( + "io/ioutil" + "path" + "time" +) + +const historyRepoJSONPrefix = "contentserver-repo-" +const historyRepoJSONSuffix = ".json" + +type history struct { + varDir string +} + +func newHistory(varDir string) *history { + return &history{ + varDir: varDir, + } +} + +func (h *history) add(jsonBytes []byte) error { + // historic file name + filename := path.Join(h.varDir, historyRepoJSONPrefix+time.Now().Format(time.RFC3339Nano)+historyRepoJSONSuffix) + err := ioutil.WriteFile(filename, jsonBytes, 0644) + if err != nil { + return err + } + // current filename + return ioutil.WriteFile(h.getCurrentFilename(), jsonBytes, 0644) +} + +func (h *history) getCurrentFilename() string { + return historyRepoJSONPrefix + "current" + historyRepoJSONSuffix +} + +func (h *history) getCurrent() (jsonBytes []byte, err error) { + return ioutil.ReadFile(h.getCurrentFilename()) +} diff --git a/server/repo/loader.go b/server/repo/loader.go new file mode 100644 index 0000000..15f8f69 --- /dev/null +++ b/server/repo/loader.go @@ -0,0 +1,226 @@ +package repo + +import ( + "encoding/json" + "errors" + "fmt" + "github.com/foomo/contentserver/server/log" + "github.com/foomo/contentserver/server/repo/content" + "github.com/foomo/contentserver/server/responses" + "github.com/foomo/contentserver/server/utils" + "time" + //golog "log" +) + +func (repo *Repo) updateRoutine() { + repo.updateChannel = make(chan *RepoDimension) + repo.updateDoneChannel = make(chan error) + go func() { + for { + select { + case newDimension := <-repo.updateChannel: + repo.updateDoneChannel <- repo.updateDimension(newDimension.Dimension, newDimension.Node) + } + } + }() +} + +func (repo *Repo) UpdateDimension(dimension string, node *content.RepoNode) error { + repo.updateChannel <- &RepoDimension{ + Dimension: dimension, + Node: node, + } + return <-repo.updateDoneChannel +} + +func (repo *Repo) updateDimension(dimension string, newNode *content.RepoNode) error { + newNode.WireParents() + newDirectory := make(map[string]*content.RepoNode) + newURIDirectory := make(map[string]*content.RepoNode) + + err := builDirectory(newNode, newDirectory, newURIDirectory) + if err != nil { + return err + } + err = wireAliases(newDirectory) + if err != nil { + return err + } + repo.Directory[dimension] = &Dimension{ + Node: newNode, + Directory: newDirectory, + URIDirectory: newURIDirectory, + } + return nil +} + +func builDirectory(dirNode *content.RepoNode, directory map[string]*content.RepoNode, uRIDirectory map[string]*content.RepoNode) error { + log.Debug("repo.buildDirectory: " + dirNode.Id) + existingNode, ok := directory[dirNode.Id] + if ok { + return errors.New("duplicate node with id:" + existingNode.Id) + } + directory[dirNode.Id] = dirNode + //todo handle duplicate uris + if _, thereIsAnExistingUriNode := uRIDirectory[dirNode.URI]; thereIsAnExistingUriNode { + return errors.New("duplicate node with uri: " + dirNode.URI) + } + uRIDirectory[dirNode.URI] = dirNode + for _, childNode := range dirNode.Nodes { + err := builDirectory(childNode, directory, uRIDirectory) + if err != nil { + return err + } + } + return nil +} + +func wireAliases(directory map[string]*content.RepoNode) error { + for _, repoNode := range directory { + if len(repoNode.LinkId) > 0 { + if destinationNode, ok := directory[repoNode.LinkId]; ok { + repoNode.URI = destinationNode.URI + } else { + return errors.New("that link id points nowhere " + repoNode.LinkId + " from " + repoNode.Id) + } + } + } + return nil +} + +func loadNodesFromJSON(jsonBytes []byte) (nodes map[string]*content.RepoNode, err error) { + nodes = make(map[string]*content.RepoNode) + err = json.Unmarshal(jsonBytes, &nodes) + return nodes, err +} + +func (repo *Repo) Update() (updateResponse *responses.Update) { + floatSeconds := func(nanoSeconds int64) float64 { + return float64(nanoSeconds / 1000000) + } + startTime := time.Now().UnixNano() + updateRepotime, updateErr := repo.update() + updateResponse = responses.NewUpdate() + updateResponse.Stats.RepoRuntime = floatSeconds(updateRepotime) + if updateErr != nil { + // let us try to restore the world from a file + log.Error("could not update repository:" + updateErr.Error()) + restoreErr := repo.tryToRestoreCurrent() + if restoreErr == nil { + log.Error("failed to restore preceding repo version") + } else { + log.Record("restored current repo from local history") + } + } else { + // add some stats + for dimension, _ := range repo.Directory { + updateResponse.Stats.NumberOfNodes += len(repo.Directory[dimension].Directory) + updateResponse.Stats.NumberOfURIs += len(repo.Directory[dimension].URIDirectory) + } + + } + updateResponse.Stats.OwnRuntime = floatSeconds(startTime-time.Now().UnixNano()) - updateResponse.Stats.RepoRuntime + return updateResponse +} + +func (repo *Repo) tryToRestoreCurrent() error { + currentJsonBytes, err := repo.history.getCurrent() + if err != nil { + return err + } + return repo.loadJSONBytes(currentJsonBytes) +} + +func (repo *Repo) update() (repoRuntime int64, err error) { + startTimeRepo := time.Now().UnixNano() + jsonBytes, err := utils.Get(repo.server) + repoRuntime = time.Now().UnixNano() - startTimeRepo + if err != nil { + // we have no json to load - the repo server did not reply + return + } else { + nodes, err := loadNodesFromJSON(jsonBytes) + if err != nil { + // could not load nodes from json + return repoRuntime, err + } + err = repo.loadNodes(nodes) + if err != nil { + // repo failed to load nodes + return repoRuntime, err + } + } + return repoRuntime, nil + + /* + + log.Debug("loaded nodes for dimension " + dimension) + _, dimensionOk := repo.Directory[dimension] + if dimensionOk { + updateResponse.Stats.NumberOfNodes += len(repo.Directory[dimension].Directory) + updateResponse.Stats.NumberOfURIs += len(repo.Directory[dimension].URIDirectory) + } + + + + jsonBytes + newNodes := loadNodesFromJSON(jsonBytes) + + data, err := utils.GetRepo(repo.server, newNodes) + updateResponse.Stats.RepoRuntime = + startTimeOwn := time.Now() + if err == nil { + } + updateResponse.Success = (err != nil) + + doneHandler := func() *responses.Update { + updateResponse.Stats.OwnRuntime = time.Now().Sub(startTimeOwn).Seconds() + return updateResponse + } + + if updateResponse.Success { + log.Debug("going to load dimensions from" + utils.ToJSON(newNodes)) + */ +} + +func updateErrorHandler(err error, updateResponse *responses.Update) *responses.Update { + log.Error(fmt.Sprintf("update error: %", err)) + if updateResponse == nil { + updateResponse = responses.NewUpdate() + } + updateResponse.Success = false + updateResponse.ErrorMessage = fmt.Sprintf("%", err) + updateResponse.Stats.NumberOfNodes = -1 + updateResponse.Stats.NumberOfURIs = -1 + return updateResponse +} + +func (repo *Repo) loadJSONBytes(jsonBytes []byte) error { + nodes, err := loadNodesFromJSON(jsonBytes) + if err != nil { + return err + } + err = repo.loadNodes(nodes) + if err == nil { + historyErr := repo.history.add(jsonBytes) + if historyErr != nil { + log.Warning("could not add valid json to history:" + historyErr.Error()) + } else { + log.Record("added valid json to history") + } + } + return err +} + +func (repo *Repo) loadNodes(newNodes map[string]*content.RepoNode) error { + for dimension, newNode := range newNodes { + log.Debug("loading nodes for dimension " + dimension) + loadErr := repo.UpdateDimension(dimension, newNode) + if loadErr != nil { + log.Debug(" failed to load " + dimension + ": " + loadErr.Error()) + return loadErr + } + } + // we need to throw away orphaned nodes + return nil +} diff --git a/server/repo/repo.go b/server/repo/repo.go index 5e81d6e..8f58b42 100644 --- a/server/repo/repo.go +++ b/server/repo/repo.go @@ -1,16 +1,15 @@ package repo import ( - "errors" + // "errors" "fmt" "github.com/foomo/contentserver/server/log" "github.com/foomo/contentserver/server/repo/content" "github.com/foomo/contentserver/server/requests" - "github.com/foomo/contentserver/server/responses" - "github.com/foomo/contentserver/server/utils" - golog "log" + //"github.com/foomo/contentserver/server/responses" + //"github.com/foomo/contentserver/server/utils" "strings" - "time" + //"time" ) type Dimension struct { @@ -29,55 +28,25 @@ type Repo struct { Directory map[string]*Dimension updateChannel chan *RepoDimension updateDoneChannel chan error + history *history } -func NewRepo(server string) *Repo { +func NewRepo(server string, varDir string) *Repo { log.Notice("creating new repo for " + server) repo := new(Repo) repo.Directory = make(map[string]*Dimension) repo.server = server - repo.updateChannel = make(chan *RepoDimension) - repo.updateDoneChannel = make(chan error) - go func() { - for { - select { - case newDimension := <-repo.updateChannel: - repo.updateDoneChannel <- repo.load(newDimension.Dimension, newDimension.Node) - } - } - }() + repo.history = newHistory(varDir) + go repo.updateRoutine() + restoreErr := repo.tryToRestoreCurrent() + if restoreErr == nil { + log.Record("could not restore previous repo content:" + restoreErr.Error()) + } else { + log.Record("restored previous repo content") + } return repo } -func (repo *Repo) Load(dimension string, node *content.RepoNode) error { - repo.updateChannel <- &RepoDimension{ - Dimension: dimension, - Node: node, - } - return <-repo.updateDoneChannel -} - -func (repo *Repo) load(dimension string, newNode *content.RepoNode) error { - newNode.WireParents() - newDirectory := make(map[string]*content.RepoNode) - newURIDirectory := make(map[string]*content.RepoNode) - - err := builDirectory(newNode, newDirectory, newURIDirectory) - if err != nil { - return err - } - err = wireAliases(newDirectory) - if err != nil { - return err - } - repo.Directory[dimension] = &Dimension{ - Node: newNode, - Directory: newDirectory, - URIDirectory: newURIDirectory, - } - return nil -} - func (repo *Repo) ResolveContent(dimensions []string, URI string) (resolved bool, resolvedURI string, resolvedDimension string, repoNode *content.RepoNode) { parts := strings.Split(URI, content.PATH_SEPARATOR) resolved = false @@ -214,74 +183,3 @@ func (repo *Repo) GetRepo() map[string]*content.RepoNode { func uriKeyForState(state string, uri string) string { return state + "-" + uri } - -func builDirectory(dirNode *content.RepoNode, directory map[string]*content.RepoNode, uRIDirectory map[string]*content.RepoNode) error { - log.Debug("repo.buildDirectory: " + dirNode.Id) - existingNode, ok := directory[dirNode.Id] - if ok { - return errors.New("duplicate node with id:" + existingNode.Id) - } - directory[dirNode.Id] = dirNode - //todo handle duplicate uris - if _, thereIsAnExistingUriNode := uRIDirectory[dirNode.URI]; thereIsAnExistingUriNode { - return errors.New("duplicate node with uri: " + dirNode.URI) - } - uRIDirectory[dirNode.URI] = dirNode - for _, childNode := range dirNode.Nodes { - err := builDirectory(childNode, directory, uRIDirectory) - if err != nil { - return err - } - } - return nil -} - -func wireAliases(directory map[string]*content.RepoNode) error { - for _, repoNode := range directory { - if len(repoNode.LinkId) > 0 { - if destinationNode, ok := directory[repoNode.LinkId]; ok { - repoNode.URI = destinationNode.URI - } else { - return errors.New("that link id points nowhere " + repoNode.LinkId + " from " + repoNode.Id) - } - } - } - return nil -} - -func (repo *Repo) Update() *responses.Update { - updateResponse := responses.NewUpdate() - newNodes := make(map[string]*content.RepoNode) //content.NewRepoNode() - startTimeRepo := time.Now() - ok, err := utils.GetRepo(repo.server, newNodes) - updateResponse.Stats.RepoRuntime = time.Now().Sub(startTimeRepo).Seconds() - startTimeOwn := time.Now() - updateResponse.Success = ok - if ok { - log.Debug("going to load dimensions from" + utils.ToJSON(newNodes)) - for dimension, newNode := range newNodes { - log.Debug("loading nodes for dimension " + dimension) - loadErr := repo.Load(dimension, newNode) - if loadErr != nil { - golog.Println(loadErr) - panic(loadErr) - } - log.Debug("loaded nodes for dimension " + dimension) - _, dimensionOk := repo.Directory[dimension] - if dimensionOk { - updateResponse.Stats.NumberOfNodes += len(repo.Directory[dimension].Directory) - updateResponse.Stats.NumberOfURIs += len(repo.Directory[dimension].URIDirectory) - } else { - log.Debug("where is dimension " + dimension) - golog.Println(repo.Directory) - } - } - } else { - log.Error(fmt.Sprintf("update error: %", err)) - updateResponse.ErrorMessage = fmt.Sprintf("%", err) - updateResponse.Stats.NumberOfNodes = -1 - updateResponse.Stats.NumberOfURIs = -1 - } - updateResponse.Stats.OwnRuntime = time.Now().Sub(startTimeOwn).Seconds() - return updateResponse -} diff --git a/server/socket.go b/server/socket.go index e197af6..6d99556 100644 --- a/server/socket.go +++ b/server/socket.go @@ -154,9 +154,9 @@ func handleConnection(conn net.Conn) { } } -func RunSocketServer(server string, address string) { +func RunSocketServer(server string, address string, varDir string) { log.Record("building repo with content from " + server) - contentRepo = repo.NewRepo(server) + contentRepo = repo.NewRepo(server, varDir) contentRepo.Update() ln, err := net.Listen("tcp", address) if err != nil { diff --git a/server/utils/utils.go b/server/utils/utils.go index 78a9ea2..5afaaff 100644 --- a/server/utils/utils.go +++ b/server/utils/utils.go @@ -4,7 +4,6 @@ import ( "encoding/json" "errors" "fmt" - "github.com/foomo/contentserver/server/repo/content" "io/ioutil" "net/http" ) @@ -47,56 +46,16 @@ func PopulateRequest(r *http.Request, obj interface{}) { json.Unmarshal(extractJsonFromRequest(r), obj) } -func GetRepo(URL string, obj map[string]*content.RepoNode) (ok bool, err error) { - // add proper error handling +func Get(URL string) (data []byte, err error) { response, err := http.Get(URL) if err != nil { - return false, err + return data, err } else { defer response.Body.Close() if response.StatusCode != http.StatusOK { - return false, errors.New(fmt.Sprintf("Bad HTTP Response: %v", response.Status)) + return data, errors.New(fmt.Sprintf("Bad HTTP Response: %v", response.Status)) } else { - contents, err := ioutil.ReadAll(response.Body) - if err != nil { - return false, err - } else { - fmt.Printf("json string %s", string(contents)) - jsonErr := json.Unmarshal(contents, &obj) - if jsonErr != nil { - panic(jsonErr) - return false, jsonErr - } else { - return true, nil - } - } - } - } -} - -func Get(URL string, obj interface{}) (ok bool, err error) { - // add proper error handling - response, err := http.Get(URL) - if err != nil { - return false, err - } else { - defer response.Body.Close() - if response.StatusCode != http.StatusOK { - return false, errors.New(fmt.Sprintf("Bad HTTP Response: %v", response.Status)) - } else { - contents, err := ioutil.ReadAll(response.Body) - if err != nil { - return false, err - } else { - fmt.Printf("json string %s", string(contents)) - jsonErr := json.Unmarshal(contents, &obj) - if jsonErr != nil { - panic(jsonErr) - return false, jsonErr - } else { - return true, nil - } - } + return ioutil.ReadAll(response.Body) } } }