contentserver/client/connectionpool.go
2019-05-21 10:07:13 +02:00

131 lines
3.3 KiB
Go

package client
import (
"net"
"time"
)
type connectionPool struct {
server string
// conn net.Conn
chanConnGet chan chan net.Conn
chanConnReturn chan connReturn
chanDrainPool chan int
}
func newConnectionPool(server string, connectionPoolSize int, waitTimeout time.Duration) *connectionPool {
connPool := &connectionPool{
server: server,
chanConnGet: make(chan chan net.Conn),
chanConnReturn: make(chan connReturn),
chanDrainPool: make(chan int),
}
go connPool.run(connectionPoolSize, waitTimeout)
return connPool
}
func (c *connectionPool) run(connectionPoolSize int, waitTimeout time.Duration) {
type poolEntry struct {
busy bool
err error
conn net.Conn
}
type waitPoolEntry struct {
entryTime time.Time
chanConn chan net.Conn
}
connectionPool := make(map[int]*poolEntry, connectionPoolSize)
waitPool := map[int]*waitPoolEntry{}
for i := 0; i < connectionPoolSize; i++ {
connectionPool[i] = &poolEntry{
conn: nil,
busy: false,
}
}
RunLoop:
for {
// fmt.Println("----------------------- run loop ------------------------")
select {
case <-c.chanDrainPool:
// fmt.Println("<-c.chanDrainPool")
for _, waitPoolEntry := range waitPool {
waitPoolEntry.chanConn <- nil
}
break RunLoop
case <-time.After(waitTimeout):
// fmt.Println("tick", len(connectionPool), len(waitPool))
// for i, poolEntry := range connectionPool {
// fmt.Println(i, poolEntry)
// }
// for i, waitPoolEntry := range waitPool {
// fmt.Println(i, waitPoolEntry)
// }
case chanReturnNextConn := <-c.chanConnGet:
// fmt.Println("chanReturnNextConn := <-c.chanConnGet:")
nextI := 0
for i := range waitPool {
if i >= nextI {
nextI = i + 1
}
}
waitPool[nextI] = &waitPoolEntry{
chanConn: chanReturnNextConn,
entryTime: time.Now(),
}
// fmt.Println("sbdy wants a new conn", nextI)
case connReturn := <-c.chanConnReturn:
// fmt.Println("connReturn := <-c.chanConnReturn:")
for _, poolEntry := range connectionPool {
if connReturn.conn == poolEntry.conn {
poolEntry.busy = false
if connReturn.err != nil {
poolEntry.err = connReturn.err
poolEntry.conn.Close()
poolEntry.conn = nil
}
}
}
}
// refill connection pool
for _, poolEntry := range connectionPool {
if poolEntry.conn == nil {
newConn, errDial := net.Dial("tcp", c.server)
poolEntry.err = errDial
poolEntry.conn = newConn
}
}
// redistribute available connections
for _, poolEntry := range connectionPool {
if len(waitPool) == 0 {
break
}
if poolEntry.err == nil && poolEntry.conn != nil && !poolEntry.busy {
for i, waitPoolEntry := range waitPool {
// fmt.Println("---------------------------> serving wait pool", i, waitPoolEntry)
poolEntry.busy = true
delete(waitPool, i)
waitPoolEntry.chanConn <- poolEntry.conn
break
}
}
}
// waitpool cleanup
waitPoolLoosers := []int{}
now := time.Now()
for i, waitPoolEntry := range waitPool {
if now.Sub(waitPoolEntry.entryTime) > waitTimeout {
waitPoolLoosers = append(waitPoolLoosers, i)
waitPoolEntry.chanConn <- nil
}
}
for _, i := range waitPoolLoosers {
delete(waitPool, i)
}
}
c.chanDrainPool = nil
c.chanConnReturn = nil
c.chanConnGet = nil
//fmt.Println("runloop is done", waitPool)
}