mirror of
https://github.com/foomo/sesamy-go.git
synced 2025-10-16 12:35:43 +00:00
feat: switch log level
This commit is contained in:
parent
738ddd94b5
commit
3c143f5d60
@ -182,7 +182,7 @@ func (l *Loki) Stop() {
|
||||
// ------------------------------------------------------------------------------------------------
|
||||
|
||||
func (l *Loki) process(entries []logproto.Entry) {
|
||||
l.l.Info("processing entries batch", zap.Int("num", len(entries)))
|
||||
l.l.Debug("processing entries batch", zap.Int("num", len(entries)))
|
||||
|
||||
labels := model.LabelSet{
|
||||
"name": "events",
|
||||
|
||||
@ -166,13 +166,13 @@ func (s *Subscriber) handle(l *zap.Logger, r *http.Request, payload *gtag.Payloa
|
||||
// wait for ACK
|
||||
select {
|
||||
case <-msg.Acked():
|
||||
l.Info("message acked")
|
||||
l.Debug("message acked")
|
||||
return nil
|
||||
case <-msg.Nacked():
|
||||
l.Info("message nacked")
|
||||
l.Debug("message nacked")
|
||||
return ErrMessageNacked
|
||||
case <-r.Context().Done():
|
||||
l.Info("message cancled")
|
||||
l.Debug("message cancled")
|
||||
return ErrContextCanceled
|
||||
}
|
||||
}
|
||||
|
||||
@ -139,13 +139,13 @@ func (s *Subscriber) handle(l *zap.Logger, r *http.Request, payload *mpv2.Payloa
|
||||
// wait for ACK
|
||||
select {
|
||||
case <-msg.Acked():
|
||||
l.Info("message acked")
|
||||
l.Debug("message acked")
|
||||
return nil
|
||||
case <-msg.Nacked():
|
||||
l.Info("message nacked")
|
||||
l.Debug("message nacked")
|
||||
return ErrMessageNacked
|
||||
case <-r.Context().Done():
|
||||
l.Info("message cancled")
|
||||
l.Debug("message cancled")
|
||||
return ErrContextCanceled
|
||||
}
|
||||
}
|
||||
|
||||
@ -2,14 +2,12 @@ package utils
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
)
|
||||
|
||||
// Batch reads from a channel and calls fn with a slice of batchSize.
|
||||
func Batch[T any](ctx context.Context, ch <-chan T, batchSize int, fn func([]T)) {
|
||||
if batchSize <= 1 { // sanity check,
|
||||
for v := range ch {
|
||||
fmt.Println("<< 1")
|
||||
fn([]T{v})
|
||||
}
|
||||
return
|
||||
@ -21,20 +19,17 @@ func Batch[T any](ctx context.Context, ch <-chan T, batchSize int, fn func([]T))
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
if len(batch) > 0 {
|
||||
fmt.Println("<< 2")
|
||||
fn(batch)
|
||||
}
|
||||
return
|
||||
case v, ok := <-ch:
|
||||
if !ok { // closed
|
||||
fmt.Println("<< 3")
|
||||
fn(batch)
|
||||
return
|
||||
}
|
||||
|
||||
batch = append(batch, v)
|
||||
if len(batch) == batchSize { // full
|
||||
fmt.Println("<< 4")
|
||||
fn(batch)
|
||||
batch = make([]T, 0, batchSize) // reset
|
||||
}
|
||||
|
||||
Loading…
Reference in New Issue
Block a user