mirror of
https://github.com/foomo/shop.git
synced 2026-05-31 08:00:05 +00:00
300 lines
7.2 KiB
Go
300 lines
7.2 KiB
Go
package queue
|
|
|
|
// Package process handles the processing of Datas as they change their status
|
|
|
|
import (
|
|
"errors"
|
|
"fmt"
|
|
"log"
|
|
"sync"
|
|
"time"
|
|
)
|
|
|
|
//------------------------------------------------------------------
|
|
// ~ PUBLIC TYPES
|
|
//------------------------------------------------------------------
|
|
|
|
type Queue struct {
|
|
Verbose bool
|
|
running bool
|
|
processors []Processor
|
|
waitGroup *sync.WaitGroup
|
|
chanFinished chan int
|
|
chanFinishedRegistered bool // true, if GetChanFinished has been called
|
|
}
|
|
|
|
//------------------------------------------------------------------
|
|
// ~ CONSTRUCTORS
|
|
//------------------------------------------------------------------
|
|
|
|
func NewQueue() *Queue {
|
|
return &Queue{
|
|
chanFinished: make(chan int),
|
|
}
|
|
}
|
|
|
|
//------------------------------------------------------------------
|
|
// ~ PUBLIC METHODS
|
|
//------------------------------------------------------------------
|
|
|
|
func (q *Queue) getChanFinished() chan int {
|
|
q.chanFinishedRegistered = true
|
|
return q.chanFinished
|
|
}
|
|
|
|
// AddProcessor Add a processor to the queue. Do not add multiple processors for the same task!
|
|
func (q *Queue) AddProcessor(processor Processor) {
|
|
q.processors = append(q.processors, processor)
|
|
if q.Verbose {
|
|
log.Println("Added Processor to queue. New length: ", len(q.processors))
|
|
}
|
|
for _, p := range q.processors {
|
|
if q.Verbose {
|
|
fmt.Println("\t", p.GetId())
|
|
}
|
|
}
|
|
}
|
|
|
|
func (q *Queue) GetProcessors() []Processor {
|
|
return q.processors
|
|
}
|
|
|
|
func (q *Queue) IsRunning() bool {
|
|
return q.running
|
|
}
|
|
|
|
// Start continuously tries to run all available processors until all assigned jobs have been cpmpleted or
|
|
// Stop() has been called.
|
|
// As long as there is only one processor per kind (Status, Payment, ...), there should be no race conditions
|
|
func (q *Queue) Start() error {
|
|
if q.Verbose {
|
|
log.Println("Queue: Schedule Start")
|
|
}
|
|
if q.IsRunning() {
|
|
return errors.New("Did not start Queue. It's already running!")
|
|
}
|
|
q.running = true
|
|
q.waitGroup = &sync.WaitGroup{}
|
|
for _, proc := range q.processors {
|
|
q.waitGroup.Add(1)
|
|
go schedule(proc, q.waitGroup)
|
|
//fmt.Println("New go routine 8")
|
|
}
|
|
q.waitGroup.Wait()
|
|
q.running = false
|
|
// for _, proc := range q.processors {
|
|
// proc.ResetStop()
|
|
// }
|
|
if q.Verbose {
|
|
fmt.Println("")
|
|
fmt.Println("*****------------------------------------****")
|
|
}
|
|
for _, proc := range q.processors {
|
|
proc.Report()
|
|
}
|
|
for _, proc := range q.processors {
|
|
proc.Reset()
|
|
}
|
|
if q.Verbose {
|
|
fmt.Println("*****------------------------------------****")
|
|
}
|
|
if q.chanFinishedRegistered { // if true, we suppose that some is listening to the channel. Because otherwise we would block here forever!
|
|
q.chanFinished <- 1
|
|
q.chanFinishedRegistered = false
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (q *Queue) Stop() {
|
|
if q.Verbose {
|
|
log.Println("Queue: Schedule Stop")
|
|
}
|
|
chanQueueFinished := q.getChanFinished()
|
|
for _, proc := range q.processors {
|
|
proc.Stop()
|
|
}
|
|
<-chanQueueFinished
|
|
|
|
}
|
|
|
|
//------------------------------------------------------------------
|
|
// ~ PRIVATE METHODS
|
|
//------------------------------------------------------------------
|
|
|
|
func schedule(proc Processor, waitGroup *sync.WaitGroup) {
|
|
chanStart := make(chan int)
|
|
chanStop := make(chan int) // this is called by ScheduleStop()
|
|
|
|
go func() {
|
|
//fmt.Println("New go routine 7")
|
|
for {
|
|
select {
|
|
case <-proc.GetChanExit():
|
|
//fmt.Println("exiting")
|
|
if proc.GetCountProcessed() < proc.GetJobsAssigned() && !proc.GetStop() {
|
|
time.Sleep(5 * time.Second) // wait a moment and try again
|
|
chanStart <- 1
|
|
} else {
|
|
chanStop <- 1
|
|
}
|
|
}
|
|
}
|
|
}()
|
|
|
|
go func() {
|
|
// fmt.Println("New go routine 6")
|
|
for {
|
|
select {
|
|
case <-chanStart:
|
|
|
|
//log.Println("Started Processor:", proc.GetId())
|
|
|
|
runProcessor(proc)
|
|
}
|
|
}
|
|
}()
|
|
|
|
go func() {
|
|
// fmt.Println("New go routine 5")
|
|
for {
|
|
select {
|
|
// Initially start processor or stop processing completely
|
|
case run := <-proc.GetChanRun():
|
|
if run {
|
|
chanStart <- 1
|
|
} else {
|
|
chanStop <- 1
|
|
}
|
|
}
|
|
}
|
|
}()
|
|
proc.GetChanRun() <- true
|
|
<-chanStop
|
|
waitGroup.Done()
|
|
// log.Println("Exiting schedule:", proc.GetId())
|
|
|
|
}
|
|
|
|
func runProcessor(processor Processor) error {
|
|
processor.SetStartTimeProcessing(time.Now().UnixNano())
|
|
chanDone := make(chan int)
|
|
chanDone2 := make(chan int)
|
|
chanReady := make(chan interface{})
|
|
chanCheckRunning := make(chan int)
|
|
chanGoCheckRunning := make(chan int)
|
|
|
|
collection := processor.GetPersistor().GetGlobalSessionCollection()
|
|
|
|
iter, err := processor.Find(processor.GetQuery(), collection)
|
|
if err != nil {
|
|
log.Println(err)
|
|
processor.GetChanExit() <- 1 // this will be received in schedule() and stop the processor
|
|
return err
|
|
}
|
|
|
|
// Process availably data
|
|
go func() {
|
|
//fmt.Println("New go routine 1")
|
|
for {
|
|
select {
|
|
case data := <-chanReady:
|
|
// fmt.Println("--- DATA")
|
|
f := func(data interface{}) {
|
|
err := processor.Process(data)
|
|
if err != nil {
|
|
log.Println(err)
|
|
}
|
|
processor.IncCountProcessed()
|
|
processor.DecRunningJobs()
|
|
processor.GetWaitGroup().Done()
|
|
// fmt.Println("Return 1")
|
|
return
|
|
}
|
|
|
|
go f(data)
|
|
// fmt.Println("New go routine 2")
|
|
chanCheckRunning <- 1
|
|
case <-chanDone2:
|
|
// fmt.Println("Return 1")
|
|
return
|
|
|
|
}
|
|
}
|
|
//fmt.Println("Exiting 1")
|
|
}()
|
|
|
|
// Wait for chanDone to end current processing
|
|
go func() {
|
|
//fmt.Println("New go routine 3")
|
|
for {
|
|
select {
|
|
case <-chanDone:
|
|
// Wait for jobs to finsish and exit
|
|
//log.Println("** chanDone", processor.GetId())
|
|
processor.GetWaitGroup().Wait()
|
|
processor.SetWaitGroupFinished(true)
|
|
processor.GetChanExit() <- 1
|
|
processor.SetEndTimeProcessing(time.Now().UnixNano())
|
|
chanDone2 <- 1
|
|
// fmt.Println("Return 3")
|
|
return
|
|
|
|
case <-chanGoCheckRunning:
|
|
//log.Println("** goCheckRunning", processor.GetId())
|
|
chanCheckRunning <- 1
|
|
}
|
|
}
|
|
|
|
// fmt.Println("Exiting 3")
|
|
}()
|
|
go func() {
|
|
// fmt.Println("New go routine 4")
|
|
run := true
|
|
//Loop:
|
|
for run {
|
|
select {
|
|
case <-chanCheckRunning:
|
|
if processor.GetStop() {
|
|
chanDone <- 1
|
|
run = false
|
|
return //break Loop
|
|
}
|
|
//log.Println("** chanCheckRunning", processor.GetId())
|
|
if processor.GetJobsStarted() >= processor.GetJobsAssigned() { // We are done
|
|
// log.Println("** goChanDone :: JobsAssignedProcessed", processor.GetId())
|
|
chanDone <- 1
|
|
return // break Loop
|
|
} else if processor.GetRunningJobs() >= processor.GetMaxConcurrency() { // Wait for better times
|
|
|
|
// log.Println("** wait", processor.GetId())
|
|
chanGoCheckRunning <- 1
|
|
} else {
|
|
if processor.GetJobsStarted() < processor.GetJobsAssigned() {
|
|
data, err := iter()
|
|
if err != nil {
|
|
log.Println("Error: Could not get data", err)
|
|
chanDone <- 1
|
|
return //break
|
|
}
|
|
if data != nil {
|
|
processor.GetWaitGroup().Add(1)
|
|
processor.IncJobsStarted()
|
|
processor.IncRunningJobs() // We count here also, because we cannot access current number of jobs through waitgroup
|
|
chanReady <- data
|
|
} else {
|
|
// log.Println("data is nil", err)
|
|
chanDone <- 1
|
|
// fmt.Println("--- chanDone return 4")
|
|
return // break Loop
|
|
}
|
|
}
|
|
}
|
|
|
|
}
|
|
}
|
|
//fmt.Println("Exiting 4")
|
|
}()
|
|
chanCheckRunning <- 1 // Start loop
|
|
return nil
|
|
}
|