Files
shop/queue/queue.go
2017-11-08 14:00:19 +01:00

300 lines
7.2 KiB
Go

package queue
// Package process handles the processing of Datas as they change their status
import (
"errors"
"fmt"
"log"
"sync"
"time"
)
//------------------------------------------------------------------
// ~ PUBLIC TYPES
//------------------------------------------------------------------
type Queue struct {
Verbose bool
running bool
processors []Processor
waitGroup *sync.WaitGroup
chanFinished chan int
chanFinishedRegistered bool // true, if GetChanFinished has been called
}
//------------------------------------------------------------------
// ~ CONSTRUCTORS
//------------------------------------------------------------------
func NewQueue() *Queue {
return &Queue{
chanFinished: make(chan int),
}
}
//------------------------------------------------------------------
// ~ PUBLIC METHODS
//------------------------------------------------------------------
func (q *Queue) getChanFinished() chan int {
q.chanFinishedRegistered = true
return q.chanFinished
}
// AddProcessor Add a processor to the queue. Do not add multiple processors for the same task!
func (q *Queue) AddProcessor(processor Processor) {
q.processors = append(q.processors, processor)
if q.Verbose {
log.Println("Added Processor to queue. New length: ", len(q.processors))
}
for _, p := range q.processors {
if q.Verbose {
fmt.Println("\t", p.GetId())
}
}
}
func (q *Queue) GetProcessors() []Processor {
return q.processors
}
func (q *Queue) IsRunning() bool {
return q.running
}
// Start continuously tries to run all available processors until all assigned jobs have been cpmpleted or
// Stop() has been called.
// As long as there is only one processor per kind (Status, Payment, ...), there should be no race conditions
func (q *Queue) Start() error {
if q.Verbose {
log.Println("Queue: Schedule Start")
}
if q.IsRunning() {
return errors.New("Did not start Queue. It's already running!")
}
q.running = true
q.waitGroup = &sync.WaitGroup{}
for _, proc := range q.processors {
q.waitGroup.Add(1)
go schedule(proc, q.waitGroup)
//fmt.Println("New go routine 8")
}
q.waitGroup.Wait()
q.running = false
// for _, proc := range q.processors {
// proc.ResetStop()
// }
if q.Verbose {
fmt.Println("")
fmt.Println("*****------------------------------------****")
}
for _, proc := range q.processors {
proc.Report()
}
for _, proc := range q.processors {
proc.Reset()
}
if q.Verbose {
fmt.Println("*****------------------------------------****")
}
if q.chanFinishedRegistered { // if true, we suppose that some is listening to the channel. Because otherwise we would block here forever!
q.chanFinished <- 1
q.chanFinishedRegistered = false
}
return nil
}
func (q *Queue) Stop() {
if q.Verbose {
log.Println("Queue: Schedule Stop")
}
chanQueueFinished := q.getChanFinished()
for _, proc := range q.processors {
proc.Stop()
}
<-chanQueueFinished
}
//------------------------------------------------------------------
// ~ PRIVATE METHODS
//------------------------------------------------------------------
func schedule(proc Processor, waitGroup *sync.WaitGroup) {
chanStart := make(chan int)
chanStop := make(chan int) // this is called by ScheduleStop()
go func() {
//fmt.Println("New go routine 7")
for {
select {
case <-proc.GetChanExit():
//fmt.Println("exiting")
if proc.GetCountProcessed() < proc.GetJobsAssigned() && !proc.GetStop() {
time.Sleep(5 * time.Second) // wait a moment and try again
chanStart <- 1
} else {
chanStop <- 1
}
}
}
}()
go func() {
// fmt.Println("New go routine 6")
for {
select {
case <-chanStart:
//log.Println("Started Processor:", proc.GetId())
runProcessor(proc)
}
}
}()
go func() {
// fmt.Println("New go routine 5")
for {
select {
// Initially start processor or stop processing completely
case run := <-proc.GetChanRun():
if run {
chanStart <- 1
} else {
chanStop <- 1
}
}
}
}()
proc.GetChanRun() <- true
<-chanStop
waitGroup.Done()
// log.Println("Exiting schedule:", proc.GetId())
}
func runProcessor(processor Processor) error {
processor.SetStartTimeProcessing(time.Now().UnixNano())
chanDone := make(chan int)
chanDone2 := make(chan int)
chanReady := make(chan interface{})
chanCheckRunning := make(chan int)
chanGoCheckRunning := make(chan int)
collection := processor.GetPersistor().GetGlobalSessionCollection()
iter, err := processor.Find(processor.GetQuery(), collection)
if err != nil {
log.Println(err)
processor.GetChanExit() <- 1 // this will be received in schedule() and stop the processor
return err
}
// Process availably data
go func() {
//fmt.Println("New go routine 1")
for {
select {
case data := <-chanReady:
// fmt.Println("--- DATA")
f := func(data interface{}) {
err := processor.Process(data)
if err != nil {
log.Println(err)
}
processor.IncCountProcessed()
processor.DecRunningJobs()
processor.GetWaitGroup().Done()
// fmt.Println("Return 1")
return
}
go f(data)
// fmt.Println("New go routine 2")
chanCheckRunning <- 1
case <-chanDone2:
// fmt.Println("Return 1")
return
}
}
//fmt.Println("Exiting 1")
}()
// Wait for chanDone to end current processing
go func() {
//fmt.Println("New go routine 3")
for {
select {
case <-chanDone:
// Wait for jobs to finsish and exit
//log.Println("** chanDone", processor.GetId())
processor.GetWaitGroup().Wait()
processor.SetWaitGroupFinished(true)
processor.GetChanExit() <- 1
processor.SetEndTimeProcessing(time.Now().UnixNano())
chanDone2 <- 1
// fmt.Println("Return 3")
return
case <-chanGoCheckRunning:
//log.Println("** goCheckRunning", processor.GetId())
chanCheckRunning <- 1
}
}
// fmt.Println("Exiting 3")
}()
go func() {
// fmt.Println("New go routine 4")
run := true
//Loop:
for run {
select {
case <-chanCheckRunning:
if processor.GetStop() {
chanDone <- 1
run = false
return //break Loop
}
//log.Println("** chanCheckRunning", processor.GetId())
if processor.GetJobsStarted() >= processor.GetJobsAssigned() { // We are done
// log.Println("** goChanDone :: JobsAssignedProcessed", processor.GetId())
chanDone <- 1
return // break Loop
} else if processor.GetRunningJobs() >= processor.GetMaxConcurrency() { // Wait for better times
// log.Println("** wait", processor.GetId())
chanGoCheckRunning <- 1
} else {
if processor.GetJobsStarted() < processor.GetJobsAssigned() {
data, err := iter()
if err != nil {
log.Println("Error: Could not get data", err)
chanDone <- 1
return //break
}
if data != nil {
processor.GetWaitGroup().Add(1)
processor.IncJobsStarted()
processor.IncRunningJobs() // We count here also, because we cannot access current number of jobs through waitgroup
chanReady <- data
} else {
// log.Println("data is nil", err)
chanDone <- 1
// fmt.Println("--- chanDone return 4")
return // break Loop
}
}
}
}
}
//fmt.Println("Exiting 4")
}()
chanCheckRunning <- 1 // Start loop
return nil
}