mirror of
https://github.com/foomo/sesamy-go.git
synced 2025-10-16 12:35:43 +00:00
266 lines
6.4 KiB
Go
266 lines
6.4 KiB
Go
package loki
|
|
|
|
import (
|
|
"bufio"
|
|
"bytes"
|
|
"context"
|
|
"fmt"
|
|
"io"
|
|
"net/http"
|
|
"time"
|
|
|
|
"github.com/foomo/sesamy-go/pkg/encoding/mpv2"
|
|
"github.com/foomo/sesamy-go/pkg/utils"
|
|
"github.com/gogo/protobuf/proto"
|
|
"github.com/golang/snappy"
|
|
"github.com/grafana/dskit/backoff"
|
|
"github.com/grafana/loki/pkg/push"
|
|
"github.com/grafana/loki/v3/pkg/logproto"
|
|
"github.com/pkg/errors"
|
|
"github.com/prometheus/common/model"
|
|
"go.uber.org/zap"
|
|
)
|
|
|
|
const (
|
|
pushEndpoint = "/loki/api/v1/push"
|
|
defaultContentType = "application/x-protobuf"
|
|
defaultMaxReponseBufferLen = 1024
|
|
)
|
|
|
|
type (
|
|
Loki struct {
|
|
l *zap.Logger
|
|
endpoint string
|
|
|
|
// channel for incoming logs
|
|
entries chan logproto.Entry
|
|
|
|
// shutdown
|
|
cancel context.CancelFunc
|
|
|
|
// options
|
|
backoff *backoff.Config
|
|
batchSize int
|
|
bufferSize int
|
|
userAgent string
|
|
httpClient *http.Client
|
|
}
|
|
Option func(*Loki)
|
|
)
|
|
|
|
// ------------------------------------------------------------------------------------------------
|
|
// ~ Options
|
|
// ------------------------------------------------------------------------------------------------
|
|
|
|
func WithHTTPClient(v *http.Client) Option {
|
|
return func(l *Loki) {
|
|
l.httpClient = v
|
|
}
|
|
}
|
|
|
|
func WithBackoffConfig(v *backoff.Config) Option {
|
|
return func(l *Loki) {
|
|
l.backoff = v
|
|
}
|
|
}
|
|
|
|
func WithUserAgent(v string) Option {
|
|
return func(l *Loki) {
|
|
l.userAgent = v
|
|
}
|
|
}
|
|
|
|
func WithBatchSize(v int) Option {
|
|
return func(l *Loki) {
|
|
l.batchSize = v
|
|
}
|
|
}
|
|
|
|
func WithBufferSize(v int) Option {
|
|
return func(l *Loki) {
|
|
l.bufferSize = v
|
|
}
|
|
}
|
|
|
|
// ------------------------------------------------------------------------------------------------
|
|
// ~ Constructor
|
|
// ------------------------------------------------------------------------------------------------
|
|
|
|
// New creates an instance of `Push` which writes logs directly to given `lokiAddr`
|
|
func New(l *zap.Logger, addr string, opts ...Option) *Loki {
|
|
inst := &Loki{
|
|
l: l,
|
|
endpoint: addr + pushEndpoint,
|
|
httpClient: http.DefaultClient,
|
|
userAgent: "sesamy",
|
|
batchSize: 10,
|
|
bufferSize: 50,
|
|
backoff: &backoff.Config{
|
|
MinBackoff: 500 * time.Millisecond,
|
|
MaxBackoff: 5 * time.Minute,
|
|
MaxRetries: 10,
|
|
},
|
|
}
|
|
|
|
for _, opt := range opts {
|
|
if opt != nil {
|
|
opt(inst)
|
|
}
|
|
}
|
|
|
|
inst.entries = make(chan logproto.Entry, inst.bufferSize) // Use a buffered channel so we can retry failed pushes without blocking WriteEntry
|
|
|
|
return inst
|
|
}
|
|
|
|
// Start pulls lines out of the channel and sends them to Loki
|
|
func (l *Loki) Start(ctx context.Context) {
|
|
ctx, cancel := context.WithCancel(ctx)
|
|
l.cancel = cancel
|
|
utils.Batch(ctx, l.entries, l.batchSize, l.process)
|
|
}
|
|
|
|
func (l *Loki) Write(payload mpv2.Payload[any]) {
|
|
for _, event := range payload.Events {
|
|
// sanity check
|
|
if event.Name == "" {
|
|
l.l.Warn("received event without event name")
|
|
continue
|
|
}
|
|
|
|
line := Line{
|
|
Name: event.Name,
|
|
Params: event.Params,
|
|
UserID: payload.UserID,
|
|
Consent: payload.Consent,
|
|
UserData: payload.UserData,
|
|
ClientID: payload.ClientID,
|
|
UserProperties: payload.UserProperties,
|
|
DebugMode: payload.DebugMode,
|
|
SessionID: payload.SessionID,
|
|
EngagementTimeMSec: payload.EngagementTimeMSec,
|
|
}
|
|
|
|
lineBytes, err := line.Marshal()
|
|
if err != nil {
|
|
l.l.Warn("failed to marshal line", zap.Error(err))
|
|
continue
|
|
}
|
|
|
|
if len(l.entries) == l.bufferSize {
|
|
l.l.Warn("buffer size reached", zap.Int("size", l.bufferSize))
|
|
}
|
|
|
|
var timestamp time.Time
|
|
if payload.TimestampMicros > 0 {
|
|
timestamp = time.UnixMicro(payload.TimestampMicros)
|
|
} else {
|
|
timestamp = time.Now()
|
|
}
|
|
|
|
l.entries <- logproto.Entry{
|
|
Line: string(lineBytes),
|
|
Timestamp: timestamp,
|
|
StructuredMetadata: push.LabelsAdapter{
|
|
{
|
|
Name: "event_name",
|
|
Value: event.Name.String(),
|
|
},
|
|
},
|
|
}
|
|
}
|
|
}
|
|
|
|
// Stop will cancel any ongoing requests and stop the goroutine listening for requests
|
|
func (l *Loki) Stop() {
|
|
if l.cancel != nil {
|
|
l.cancel()
|
|
l.cancel = nil
|
|
}
|
|
}
|
|
|
|
// ------------------------------------------------------------------------------------------------
|
|
// ~ Private methods
|
|
// ------------------------------------------------------------------------------------------------
|
|
|
|
func (l *Loki) process(entries []logproto.Entry) {
|
|
l.l.Debug("processing entries batch", zap.Int("num", len(entries)))
|
|
|
|
labels := model.LabelSet{
|
|
"name": "events",
|
|
"stream": "sesamy",
|
|
}
|
|
request, err := proto.Marshal(&logproto.PushRequest{
|
|
Streams: []logproto.Stream{
|
|
{
|
|
Labels: labels.String(),
|
|
Entries: entries,
|
|
Hash: uint64(labels.Fingerprint()),
|
|
},
|
|
},
|
|
})
|
|
if err != nil {
|
|
l.l.Error("failed to marshal payload to json", zap.Error(err))
|
|
return
|
|
}
|
|
|
|
payload := snappy.Encode(nil, request)
|
|
|
|
// We will use a timeout within each attempt to send
|
|
back := backoff.New(context.Background(), *l.backoff)
|
|
|
|
// send log with retry
|
|
for {
|
|
var status int
|
|
status, err = l.send(context.Background(), payload)
|
|
if err == nil {
|
|
break
|
|
}
|
|
|
|
if status > 0 && status != 429 && status/100 != 5 {
|
|
l.l.Error("failed to send entries, server rejected push with a non-retryable status code", zap.Error(err), zap.Int("status", status))
|
|
break
|
|
}
|
|
|
|
if !back.Ongoing() {
|
|
l.l.Error("failed to send entries, retries exhausted, entries will be dropped", zap.Error(err), zap.Int("status", status))
|
|
break
|
|
}
|
|
l.l.Warn("failed to send entries, retrying", zap.Error(err), zap.Int("status", status))
|
|
back.Wait()
|
|
}
|
|
}
|
|
|
|
// send makes one attempt to send the payload to Loki
|
|
func (l *Loki) send(ctx context.Context, payload []byte) (int, error) {
|
|
// Set a timeout for the request
|
|
ctx, cancel := context.WithTimeout(ctx, l.httpClient.Timeout)
|
|
defer cancel()
|
|
req, err := http.NewRequestWithContext(ctx, http.MethodPost, l.endpoint, bytes.NewReader(payload))
|
|
if err != nil {
|
|
return -1, errors.Wrap(err, "failed to create request")
|
|
}
|
|
req.Header.Set("Content-Type", defaultContentType)
|
|
req.Header.Set("User-Agent", l.userAgent)
|
|
|
|
resp, err := l.httpClient.Do(req)
|
|
if err != nil {
|
|
return -1, errors.Wrap(err, "failed to send payload")
|
|
}
|
|
status := resp.StatusCode
|
|
if status/100 != 2 {
|
|
scanner := bufio.NewScanner(io.LimitReader(resp.Body, defaultMaxReponseBufferLen))
|
|
line := ""
|
|
if scanner.Scan() {
|
|
line = scanner.Text()
|
|
}
|
|
err = fmt.Errorf("server returned HTTP status %s (%d): %s", resp.Status, status, line)
|
|
}
|
|
|
|
if err := resp.Body.Close(); err != nil {
|
|
l.l.Error("failed to close response body", zap.Error(err))
|
|
}
|
|
|
|
return status, err
|
|
}
|