added concurrency

This commit is contained in:
Jan Halfar 2014-10-01 17:25:00 +02:00
parent 52a5cbb418
commit f72739d8e9

View File

@ -1,6 +1,7 @@
package repo package repo
import ( import (
"errors"
"fmt" "fmt"
"github.com/foomo/ContentServer/server/log" "github.com/foomo/ContentServer/server/log"
"github.com/foomo/ContentServer/server/repo/content" "github.com/foomo/ContentServer/server/repo/content"
@ -18,15 +19,50 @@ type Repo struct {
Directory map[string]*content.RepoNode Directory map[string]*content.RepoNode
URIDirectory map[string]*content.RepoNode URIDirectory map[string]*content.RepoNode
Node *content.RepoNode Node *content.RepoNode
updateChannel chan *content.RepoNode
updateDoneChannel chan error
} }
func NewRepo(server string) *Repo { func NewRepo(server string) *Repo {
log.Notice("creating new repo for " + server) log.Notice("creating new repo for " + server)
repo := new(Repo) repo := new(Repo)
repo.server = server repo.server = server
repo.updateChannel = make(chan *content.RepoNode)
repo.updateDoneChannel = make(chan error)
go func() {
select {
case newNode := <-repo.updateChannel:
repo.updateDoneChannel <- repo.load(newNode)
}
}()
return repo return repo
} }
func (repo *Repo) Load(newNode *content.RepoNode) error {
repo.updateChannel <- newNode
return <-repo.updateDoneChannel
}
func (repo *Repo) load(newNode *content.RepoNode) error {
newNode.WireParents()
newDirectory := make(map[string]*content.RepoNode)
newURIDirectory := make(map[string]*content.RepoNode)
err := builDirectory(newNode, newDirectory, newURIDirectory)
if err != nil {
return err
}
err = wireAliases(newDirectory)
if err != nil {
return err
}
repo.Node = newNode
repo.Directory = newDirectory
repo.URIDirectory = newURIDirectory
return nil
}
func (repo *Repo) ResolveContent(state string, URI string) (resolved bool, resolvedURI string, region string, language string, repoNode *content.RepoNode) { func (repo *Repo) ResolveContent(state string, URI string) (resolved bool, resolvedURI string, region string, language string, repoNode *content.RepoNode) {
parts := strings.Split(URI, content.PATH_SEPARATOR) parts := strings.Split(URI, content.PATH_SEPARATOR)
log.Debug("repo.ResolveContent: " + URI) log.Debug("repo.ResolveContent: " + URI)
@ -178,29 +214,49 @@ func uriKeyForState(state string, uri string) string {
return state + "-" + uri return state + "-" + uri
} }
func builDirectory(dirNode *content.RepoNode, directory map[string]*content.RepoNode, uRIDirectory map[string]*content.RepoNode) { func builDirectory(dirNode *content.RepoNode, directory map[string]*content.RepoNode, uRIDirectory map[string]*content.RepoNode) error {
log.Debug("repo.buildDirectory: " + dirNode.Id) log.Debug("repo.buildDirectory: " + dirNode.Id)
existingNode, ok := directory[dirNode.Id]
if ok {
return errors.New("duplicate node with id:" + existingNode.Id)
}
directory[dirNode.Id] = dirNode directory[dirNode.Id] = dirNode
//todo handle duplicate uris //todo handle duplicate uris
for _, languageURIs := range dirNode.URIs { for _, languageURIs := range dirNode.URIs {
for _, uri := range languageURIs { for _, uri := range languageURIs {
log.Debug(" uri: " + uri + " => Id: " + dirNode.Id) log.Debug(" uri: " + uri + " => Id: " + dirNode.Id)
if len(dirNode.States) == 0 { if len(dirNode.States) == 0 {
uRIDirectory[uriKeyForState("", uri)] = dirNode key := uriKeyForState("", uri)
existingNode, ok = uRIDirectory[key]
if ok {
return errors.New("duplicate node with uri: " + uri)
}
uRIDirectory[key] = dirNode
} else { } else {
for _, state := range dirNode.States { for _, state := range dirNode.States {
uRIDirectory[uriKeyForState(state, uri)] = dirNode key := uriKeyForState(state, uri)
existingNode, ok = uRIDirectory[key]
if ok {
return errors.New("dupicate node with uri: " + uri + " for state " + state)
}
uRIDirectory[key] = dirNode
} }
} }
} }
} }
for _, childNode := range dirNode.Nodes { for _, childNode := range dirNode.Nodes {
builDirectory(childNode, directory, uRIDirectory) err := builDirectory(childNode, directory, uRIDirectory)
if err != nil {
return err
} }
}
return nil
} }
func wireAliases(directory map[string]*content.RepoNode) { func wireAliases(directory map[string]*content.RepoNode) error {
// validation ?!
for _, repoNode := range directory { for _, repoNode := range directory {
if len(repoNode.LinkIds) > 0 { if len(repoNode.LinkIds) > 0 {
for region, languages := range repoNode.LinkIds { for region, languages := range repoNode.LinkIds {
@ -212,30 +268,12 @@ func wireAliases(directory map[string]*content.RepoNode) {
} }
} }
} }
} return nil
func (repo *Repo) Load(newNode *content.RepoNode) {
newNode.WireParents()
newDirectory := make(map[string]*content.RepoNode)
newURIDirectory := make(map[string]*content.RepoNode)
builDirectory(newNode, newDirectory, newURIDirectory)
wireAliases(newDirectory)
// some more validation anyone?
// invalid destination ids
repo.Node = newNode
repo.Directory = newDirectory
repo.URIDirectory = newURIDirectory
} }
func (repo *Repo) Update() *responses.Update { func (repo *Repo) Update() *responses.Update {
updateResponse := responses.NewUpdate() updateResponse := responses.NewUpdate()
newNode := content.NewRepoNode() newNode := content.NewRepoNode()
startTimeRepo := time.Now() startTimeRepo := time.Now()
ok, err := utils.Get(repo.server, newNode) ok, err := utils.Get(repo.server, newNode)
updateResponse.Stats.RepoRuntime = time.Now().Sub(startTimeRepo).Seconds() updateResponse.Stats.RepoRuntime = time.Now().Sub(startTimeRepo).Seconds()