Webhook targets refactor and bug fixes (#19275)

- old version was unable to retain messages during a config reload
- old version could not switch from a memory queue to a disk queue during a reload
- new version can batch multiple disk queue entries into a single file to reduce I/O load
- error logging has been improved; the previous version would miss certain errors
- logic for spawning/despawning additional workers now triggers when the queue reaches half capacity, instead of when it becomes full
- old version would JSON-marshal twice and unmarshal once for every log item; now we marshal once, then GetRaw the stored bytes and send them without re-marshaling (see the sketch below)
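
A minimal sketch of the new replay path (hypothetical standalone names; the real implementation is QueueStore.GetRaw further down in this diff):

// Sketch: replaying a stored entry without an unmarshal/re-marshal
// round trip. The bytes written at enqueue time are sent verbatim.
package main

import (
	"fmt"
	"os"
	"path/filepath"
)

func getRaw(dir, key, ext string) ([]byte, error) {
	raw, err := os.ReadFile(filepath.Join(dir, key+ext))
	if err != nil {
		return nil, err
	}
	if len(raw) == 0 {
		// Treat empty files as missing so callers can clean them up.
		return nil, os.ErrNotExist
	}
	return raw, nil
}

func main() {
	raw, err := getRaw(os.TempDir(), "entry-key", ".log")
	if err != nil {
		fmt.Println("nothing to replay:", err)
		return
	}
	// Old path: Get (unmarshal) -> json.Marshal -> send, i.e. two
	// marshals per item. New path: GetRaw -> send, one marshal total.
	fmt.Printf("would POST %d raw bytes\n", len(raw))
}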
Sveinn 2024-03-25 16:44:20 +00:00 committed by GitHub
parent 15b930be1f
commit 1fc4203c19
5 changed files with 440 additions and 237 deletions

View File

@@ -620,13 +620,13 @@ func applyDynamicConfigForSubSys(ctx context.Context, objAPI ObjectLayer, s conf
userAgent := getUserAgent(getMinioMode())
for n, l := range loggerCfg.HTTP {
if l.Enabled {
l.LogOnce = logger.LogOnceConsoleIf
l.LogOnceIf = logger.LogOnceConsoleIf
l.UserAgent = userAgent
l.Transport = NewHTTPTransportWithClientCerts(l.ClientCert, l.ClientKey)
}
loggerCfg.HTTP[n] = l
}
if errs := logger.UpdateSystemTargets(ctx, loggerCfg); len(errs) > 0 {
if errs := logger.UpdateHTTPWebhooks(ctx, loggerCfg.HTTP); len(errs) > 0 {
logger.LogIf(ctx, fmt.Errorf("Unable to update logger webhook config: %v", errs))
}
case config.AuditWebhookSubSys:
@@ -637,14 +637,14 @@ func applyDynamicConfigForSubSys(ctx context.Context, objAPI ObjectLayer, s conf
userAgent := getUserAgent(getMinioMode())
for n, l := range loggerCfg.AuditWebhook {
if l.Enabled {
l.LogOnce = logger.LogOnceConsoleIf
l.LogOnceIf = logger.LogOnceConsoleIf
l.UserAgent = userAgent
l.Transport = NewHTTPTransportWithClientCerts(l.ClientCert, l.ClientKey)
}
loggerCfg.AuditWebhook[n] = l
}
if errs := logger.UpdateAuditWebhookTargets(ctx, loggerCfg); len(errs) > 0 {
if errs := logger.UpdateAuditWebhooks(ctx, loggerCfg.AuditWebhook); len(errs) > 0 {
logger.LogIf(ctx, fmt.Errorf("Unable to update audit webhook targets: %v", errs))
}
case config.AuditKafkaSubSys:

View File

@@ -20,11 +20,8 @@ package http
import (
"bytes"
"context"
"encoding/json"
"errors"
"fmt"
"math"
"math/rand"
"net/http"
"net/url"
"os"
@@ -63,6 +60,11 @@ const (
statusClosed
)
var (
logChBuffers = make(map[string]chan interface{})
logChLock = sync.Mutex{}
)
// Config http logger target
type Config struct {
Enabled bool `json:"enabled"`
@@ -79,7 +81,7 @@ type Config struct {
Transport http.RoundTripper `json:"-"`
// Custom logger
LogOnce func(ctx context.Context, err error, id string, errKind ...interface{}) `json:"-"`
LogOnceIf func(ctx context.Context, err error, id string, errKind ...interface{}) `json:"-"`
}
// Target implements logger.Target and sends the json
@@ -93,9 +95,10 @@ type Target struct {
status int32
// Worker control
workers int64
workerStartMu sync.Mutex
lastStarted time.Time
workers int64
maxWorkers int64
// workerStartMu sync.Mutex
lastStarted time.Time
wg sync.WaitGroup
@@ -105,23 +108,29 @@ type Target struct {
logCh chan interface{}
logChMu sync.RWMutex
// If this webhook is being re-configured we will
// assign the new webhook target to this field.
// The Send() method will then re-direct entries
// to the new target when the current one
// has been set to status "statusClosed".
// Once the global target slice has been migrated
// the current target will stop receiving entries.
migrateTarget *Target
// Number of events per HTTP send to the webhook target.
// This is useful only if your endpoint can read multiple
// events from a single stream, for example the Splunk HTTP
// Event Collector. If you are unsure, set this to '1'.
batchSize int
// If the first init fails, this starts a goroutine that
// will attempt to establish the connection.
revive sync.Once
batchSize int
payloadType string
// store to persist and replay the logs to the target
// to avoid missing events when the target is down.
store store.Store[interface{}]
storeCtxCancel context.CancelFunc
initQueueStoreOnce once.Init
initQueueOnce once.Init
config Config
client *http.Client
@@ -132,6 +141,11 @@ func (h *Target) Name() string {
return "minio-http-" + h.config.Name
}
// Type - returns type of the target
func (h *Target) Type() types.TargetType {
return types.TargetHTTP
}
// Endpoint returns the backend endpoint
func (h *Target) Endpoint() string {
return h.config.Endpoint.String()
@@ -146,19 +160,6 @@ func (h *Target) IsOnline(ctx context.Context) bool {
return atomic.LoadInt32(&h.status) == statusOnline
}
// ping returns true if the target is reachable.
func (h *Target) ping(ctx context.Context) bool {
if err := h.send(ctx, []byte(`{}`), "application/json", webhookCallTimeout); err != nil {
return !xnet.IsNetworkOrHostDown(err, false) && !xnet.IsConnRefusedErr(err)
}
// We are online.
h.workerStartMu.Lock()
h.lastStarted = time.Now()
h.workerStartMu.Unlock()
go h.startHTTPLogger(ctx)
return true
}
// Stats returns the target statistics.
func (h *Target) Stats() types.TargetStats {
h.logChMu.RLock()
@@ -173,56 +174,34 @@ func (h *Target) Stats() types.TargetStats {
return stats
}
// AssignMigrateTarget assigns a target
// which will eventually replace the current target.
func (h *Target) AssignMigrateTarget(migrateTgt *Target) {
h.migrateTarget = migrateTgt
}
// Init validate and initialize the http target
func (h *Target) Init(ctx context.Context) (err error) {
if h.config.QueueDir != "" {
return h.initQueueStoreOnce.DoWithContext(ctx, h.initQueueStore)
return h.initQueueOnce.DoWithContext(ctx, h.initDiskStore)
}
return h.init(ctx)
return h.initQueueOnce.DoWithContext(ctx, h.initMemoryStore)
}
func (h *Target) initQueueStore(ctx context.Context) (err error) {
var queueStore store.Store[interface{}]
queueDir := filepath.Join(h.config.QueueDir, h.Name())
queueStore = store.NewQueueStore[interface{}](queueDir, uint64(h.config.QueueSize), httpLoggerExtension)
if err = queueStore.Open(); err != nil {
return fmt.Errorf("unable to initialize the queue store of %s webhook: %w", h.Name(), err)
}
func (h *Target) initDiskStore(ctx context.Context) (err error) {
ctx, cancel := context.WithCancel(ctx)
h.store = queueStore
h.storeCtxCancel = cancel
store.StreamItems(h.store, h, ctx.Done(), h.config.LogOnce)
return
h.lastStarted = time.Now()
go h.startQueueProcessor(ctx, true)
store.StreamItems(h.store, h, ctx.Done(), h.config.LogOnceIf)
return nil
}
func (h *Target) init(ctx context.Context) (err error) {
switch atomic.LoadInt32(&h.status) {
case statusOnline:
return nil
case statusClosed:
return errors.New("target is closed")
}
if !h.ping(ctx) {
// Start a goroutine that will continue to check if we can reach
h.revive.Do(func() {
go func() {
// Avoid stamping herd, add jitter.
t := time.NewTicker(time.Second + time.Duration(rand.Int63n(int64(5*time.Second))))
defer t.Stop()
for range t.C {
if atomic.LoadInt32(&h.status) != statusOffline {
return
}
if h.ping(ctx) {
return
}
}
}()
})
return err
}
func (h *Target) initMemoryStore(ctx context.Context) (err error) {
ctx, cancel := context.WithCancel(ctx)
h.storeCtxCancel = cancel
h.lastStarted = time.Now()
go h.startQueueProcessor(ctx, true)
return nil
}
@@ -275,90 +254,244 @@ func (h *Target) send(ctx context.Context, payload []byte, payloadType string, t
}
}
func (h *Target) logEntry(ctx context.Context, payload []byte, payloadType string) {
const maxTries = 3
tries := 0
for tries < maxTries {
if atomic.LoadInt32(&h.status) == statusClosed {
// Don't retry when closing...
return
}
// sleep = (tries+2) ^ 2 milliseconds.
sleep := time.Duration(math.Pow(float64(tries+2), 2)) * time.Millisecond
if sleep > time.Second {
sleep = time.Second
}
time.Sleep(sleep)
tries++
err := h.send(ctx, payload, payloadType, webhookCallTimeout)
if err == nil {
return
}
h.config.LogOnce(ctx, err, h.Endpoint())
func (h *Target) startQueueProcessor(ctx context.Context, mainWorker bool) {
h.logChMu.RLock()
if h.logCh == nil {
h.logChMu.RUnlock()
return
}
if tries == maxTries {
// Even with multiple retries, count failed messages as only one.
atomic.AddInt64(&h.failedMessages, 1)
}
}
h.logChMu.RUnlock()
func (h *Target) startHTTPLogger(ctx context.Context) {
atomic.AddInt64(&h.workers, 1)
defer atomic.AddInt64(&h.workers, -1)
h.logChMu.RLock()
logCh := h.logCh
if logCh != nil {
// We are not allowed to add when logCh is nil
h.wg.Add(1)
defer h.wg.Done()
}
h.logChMu.RUnlock()
if logCh == nil {
return
}
h.wg.Add(1)
defer h.wg.Done()
entries := make([]interface{}, 0)
name := h.Name()
defer func() {
// re-load the global buffer pointer
// in case it was modified by a new target.
logChLock.Lock()
currentGlobalBuffer, ok := logChBuffers[name]
logChLock.Unlock()
if !ok {
return
}
for _, v := range entries {
select {
case currentGlobalBuffer <- v:
default:
}
}
if mainWorker {
drain:
for {
select {
case v, ok := <-h.logCh:
if !ok {
break drain
}
currentGlobalBuffer <- v
default:
break drain
}
}
}
}()
var entry interface{}
var ok bool
var err error
lastBatchProcess := time.Now()
buf := bytebufferpool.Get()
enc := jsoniter.ConfigCompatibleWithStandardLibrary.NewEncoder(buf)
defer bytebufferpool.Put(buf)
json := jsoniter.ConfigCompatibleWithStandardLibrary
enc := json.NewEncoder(buf)
batchSize := h.batchSize
if batchSize <= 0 {
batchSize = 1
isDirQueue := false
if h.config.QueueDir != "" {
isDirQueue = true
}
payloadType := "application/json"
if batchSize > 1 {
payloadType = ""
}
// globalBuffer is always created or adjusted
// before this method is launched.
logChLock.Lock()
globalBuffer := logChBuffers[name]
logChLock.Unlock()
var nevents int
// Send messages until channel is closed.
for entry := range logCh {
atomic.AddInt64(&h.totalMessages, 1)
nevents++
if err := enc.Encode(&entry); err != nil {
atomic.AddInt64(&h.failedMessages, 1)
nevents--
continue
newTicker := time.NewTicker(time.Second)
isTick := false
for {
isTick = false
select {
case _ = <-newTicker.C:
isTick = true
case entry, _ = <-globalBuffer:
case entry, ok = <-h.logCh:
if !ok {
return
}
case <-ctx.Done():
return
}
if (nevents == batchSize || len(logCh) == 0) && buf.Len() > 0 {
h.logEntry(ctx, buf.Bytes(), payloadType)
if !isTick {
atomic.AddInt64(&h.totalMessages, 1)
if !isDirQueue {
if err := enc.Encode(&entry); err != nil {
h.config.LogOnceIf(
ctx,
fmt.Errorf("unable to encode webhook log entry, err '%w' entry: %v\n", err, entry),
h.Name(),
)
atomic.AddInt64(&h.failedMessages, 1)
continue
}
}
entries = append(entries, entry)
}
if len(entries) != h.batchSize {
if len(h.logCh) > 0 || len(globalBuffer) > 0 || len(entries) == 0 {
continue
}
if h.batchSize > 1 {
// If we are doing batching, we should wait
// at least one second before sending.
// Even if there is nothing in the queue.
if time.Since(lastBatchProcess).Seconds() < 1 {
continue
}
}
}
lastBatchProcess = time.Now()
retry:
// If the channel reaches above half capacity
// we spawn more workers. The workers spawned
// from this main worker routine will exit
// once the channel drops below half capacity
// and when it's been at least 30 seconds since
// we launched a new worker.
if mainWorker && len(h.logCh) > cap(h.logCh)/2 {
nWorkers := atomic.LoadInt64(&h.workers)
if nWorkers < h.maxWorkers {
if time.Since(h.lastStarted).Milliseconds() > 10 {
h.lastStarted = time.Now()
go h.startQueueProcessor(ctx, false)
}
}
}
if !isDirQueue {
err = h.send(ctx, buf.Bytes(), h.payloadType, webhookCallTimeout)
} else {
err = h.store.PutMultiple(entries)
}
if err != nil {
h.config.LogOnceIf(
context.Background(),
fmt.Errorf("unable to send webhook log entry to '%s' err '%w'", name, err),
name,
)
if errors.Is(err, context.Canceled) {
return
}
time.Sleep(3 * time.Second)
goto retry
}
entries = make([]interface{}, 0)
if !isDirQueue {
buf.Reset()
nevents = 0
}
if !mainWorker && len(h.logCh) < cap(h.logCh)/2 {
if time.Since(h.lastStarted).Seconds() > 30 {
return
}
}
}
}
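
The half-capacity worker scaling above, isolated as a hedged sketch (hypothetical names; the real logic is inline in startQueueProcessor):

// Sketch of the scaling heuristic: spawn helper workers when the
// queue passes half capacity; helpers exit once it drops back below
// half capacity and the last start is more than 30 seconds old.
package main

import (
	"sync/atomic"
	"time"
)

type scaler struct {
	ch          chan interface{}
	workers     int64
	maxWorkers  int64
	lastStarted time.Time
}

func (s *scaler) maybeSpawn(run func()) {
	if len(s.ch) <= cap(s.ch)/2 {
		return // below half capacity, nothing to do
	}
	if atomic.LoadInt64(&s.workers) >= s.maxWorkers {
		return // already at the worker cap
	}
	// Rate-limit starts so a burst doesn't spawn a worker per entry.
	if time.Since(s.lastStarted).Milliseconds() > 10 {
		s.lastStarted = time.Now()
		go run()
	}
}

func (s *scaler) helperShouldExit() bool {
	return len(s.ch) < cap(s.ch)/2 && time.Since(s.lastStarted).Seconds() > 30
}

func main() {}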
// CreateOrAdjustGlobalBuffer will create or adjust the global log entry buffers
// which are used to migrate log entries between old and new targets.
func CreateOrAdjustGlobalBuffer(currentTgt *Target, newTgt *Target) {
logChLock.Lock()
defer logChLock.Unlock()
requiredCap := currentTgt.config.QueueSize + (currentTgt.config.BatchSize * int(currentTgt.maxWorkers))
currentCap := 0
name := newTgt.Name()
currentBuff, ok := logChBuffers[name]
if !ok {
logChBuffers[name] = make(chan interface{}, requiredCap)
currentCap = requiredCap
} else {
currentCap = cap(currentBuff)
requiredCap += len(currentBuff)
}
if requiredCap > currentCap {
logChBuffers[name] = make(chan interface{}, requiredCap)
if len(currentBuff) > 0 {
drain:
for {
select {
case v, ok := <-currentBuff:
if !ok {
break drain
}
logChBuffers[newTgt.Name()] <- v
default:
break drain
}
}
}
}
}
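
A worked example of the capacity rule above, with hypothetical numbers:

// The migration buffer must hold the whole queue plus one in-flight
// batch per worker (numbers here are illustrative only).
queueSize, batchSize, maxWorkers := 10000, 100, 16
requiredCap := queueSize + batchSize*maxWorkers // 10000 + 1600 = 11600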
// New initializes a new logger target which
// sends logs over HTTP to the specified endpoint
func New(config Config) *Target {
func New(config Config) (*Target, error) {
maxWorkers := maxWorkers
if config.BatchSize > 100 {
maxWorkers = maxWorkersWithBatchEvents
} else if config.BatchSize <= 0 {
config.BatchSize = 1
}
h := &Target{
logCh: make(chan interface{}, config.QueueSize),
config: config,
status: statusOffline,
batchSize: config.BatchSize,
logCh: make(chan interface{}, config.QueueSize),
config: config,
status: statusOffline,
batchSize: config.BatchSize,
maxWorkers: int64(maxWorkers),
}
if config.BatchSize > 1 {
h.payloadType = ""
} else {
h.payloadType = "application/json"
}
// If proxy available, set the same
@@ -369,32 +502,41 @@ func New(config Config) *Target {
ctransport.Proxy = http.ProxyURL(proxyURL)
h.config.Transport = ctransport
}
h.client = &http.Client{Transport: h.config.Transport}
return h
if h.config.QueueDir != "" {
queueStore := store.NewQueueStore[interface{}](
filepath.Join(h.config.QueueDir, h.Name()),
uint64(h.config.QueueSize),
httpLoggerExtension,
)
if err := queueStore.Open(); err != nil {
return h, fmt.Errorf("unable to initialize the queue store of %s webhook: %w", h.Name(), err)
}
h.store = queueStore
}
return h, nil
}
// SendFromStore - reads the log from store and sends it to webhook.
func (h *Target) SendFromStore(key store.Key) (err error) {
var eventData interface{}
eventData, err = h.store.Get(key.Name)
var eventData []byte
eventData, err = h.store.GetRaw(key.Name)
if err != nil {
if os.IsNotExist(err) {
return nil
}
return err
}
atomic.AddInt64(&h.totalMessages, 1)
logJSON, err := json.Marshal(&eventData)
if err != nil {
if err := h.send(context.Background(), eventData, h.payloadType, webhookCallTimeout); err != nil {
atomic.AddInt64(&h.failedMessages, 1)
return
}
if err := h.send(context.Background(), logJSON, "application/json", webhookCallTimeout); err != nil {
atomic.AddInt64(&h.failedMessages, 1)
if xnet.IsNetworkOrHostDown(err, true) {
return store.ErrNotConnected
}
return err
}
// Delete the event from store.
@@ -406,12 +548,12 @@ func (h *Target) SendFromStore(key store.Key) (err error) {
// If Cancel has been called the message is ignored.
func (h *Target) Send(ctx context.Context, entry interface{}) error {
if atomic.LoadInt32(&h.status) == statusClosed {
if h.migrateTarget != nil {
return h.migrateTarget.Send(ctx, entry)
}
return nil
}
if h.store != nil {
// save the entry to the queue store; it will be replayed to the target.
return h.store.Put(entry)
}
h.logChMu.RLock()
defer h.logChMu.RUnlock()
if h.logCh == nil {
@@ -419,11 +561,6 @@ func (h *Target) Send(ctx context.Context, entry interface{}) error {
return nil
}
mworkers := maxWorkers
if h.batchSize > 100 {
mworkers = maxWorkersWithBatchEvents
}
retry:
select {
case h.logCh <- entry:
@@ -435,16 +572,7 @@ retry:
}
return nil
default:
nWorkers := atomic.LoadInt64(&h.workers)
if nWorkers < int64(mworkers) {
// Only have one try to start at the same time.
h.workerStartMu.Lock()
if time.Since(h.lastStarted) > time.Second {
h.lastStarted = time.Now()
go h.startHTTPLogger(ctx)
}
h.workerStartMu.Unlock()
if h.workers < h.maxWorkers {
goto retry
}
atomic.AddInt64(&h.totalMessages, 1)
@@ -460,12 +588,10 @@ retry:
// All messages sent to the target after this function has been called will be dropped.
func (h *Target) Cancel() {
atomic.StoreInt32(&h.status, statusClosed)
h.storeCtxCancel()
// If queuestore is configured, cancel its context to
// stop the replay go-routine.
if h.store != nil {
h.storeCtxCancel()
}
// Wait for messages to be sent...
h.wg.Wait()
// Set logch to nil and close it.
// This will block all Send operations,
@@ -475,12 +601,4 @@ func (h *Target) Cancel() {
xioutil.SafeClose(h.logCh)
h.logCh = nil
h.logChMu.Unlock()
// Wait for messages to be sent...
h.wg.Wait()
}
// Type - returns type of the target
func (h *Target) Type() types.TargetType {
return types.TargetHTTP
}
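
Putting the pieces together, a hedged lifecycle sketch for the refactored target (usable only inside the repo; the Config fields shown are the ones referenced in this diff):

// Hypothetical caller: New can now fail, because the disk queue
// store is opened at construction time instead of inside Init.
tgt, err := http.New(http.Config{
	Enabled:   true,
	Name:      "primary",
	QueueSize: 10000,
	BatchSize: 1,
	QueueDir:  "/var/minio/webhook-queue", // empty => memory queue
})
if err != nil {
	// the on-disk queue store could not be opened
}
if err := tgt.Init(ctx); err != nil {
	// target starts offline; entries still land in the queue/store
}
_ = tgt.Send(ctx, entry) // after Cancel(), forwards to the assigned migrate target
tgt.Cancel()             // drains workers, then closes the channel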

View File

@@ -44,13 +44,18 @@ type Target interface {
}
var (
swapAuditMuRW sync.RWMutex
swapSystemMuRW sync.RWMutex
// systemTargets is the set of enabled loggers.
// Must be immutable at all times.
// Can be swapped to another while holding swapMu
systemTargets = []Target{}
systemTargets = []Target{}
swapSystemMuRW sync.RWMutex
// auditTargets is the list of enabled audit loggers
// Must be immutable at all times.
// Can be swapped to another while holding swapMu
auditTargets = []Target{}
swapAuditMuRW sync.RWMutex
// This is always set to represent the /dev/console target
consoleTgt Target
@@ -103,13 +108,6 @@ func CurrentStats() map[string]types.TargetStats {
return res
}
// auditTargets is the list of enabled audit loggers
// Must be immutable at all times.
// Can be swapped to another while holding swapMu
var (
auditTargets = []Target{}
)
// AddSystemTarget adds a new logger target to the
// list of enabled loggers
func AddSystemTarget(ctx context.Context, t Target) error {
@@ -132,23 +130,6 @@ func AddSystemTarget(ctx context.Context, t Target) error {
return nil
}
func initSystemTargets(ctx context.Context, cfgMap map[string]http.Config) ([]Target, []error) {
tgts := []Target{}
errs := []error{}
for _, l := range cfgMap {
if l.Enabled {
t := http.New(l)
tgts = append(tgts, t)
e := t.Init(ctx)
if e != nil {
errs = append(errs, e)
}
}
}
return tgts, errs
}
func initKafkaTargets(ctx context.Context, cfgMap map[string]kafka.Config) ([]Target, []error) {
tgts := []Target{}
errs := []error{}
@@ -183,36 +164,67 @@ func splitTargets(targets []Target, t types.TargetType) (group1 []Target, group2
func cancelTargets(targets []Target) {
for _, target := range targets {
target.Cancel()
go target.Cancel()
}
}
// UpdateSystemTargets swaps targets with newly loaded ones from the cfg
func UpdateSystemTargets(ctx context.Context, cfg Config) []error {
newTgts, errs := initSystemTargets(ctx, cfg.HTTP)
swapSystemMuRW.Lock()
consoleTargets, otherTargets := splitTargets(systemTargets, types.TargetConsole)
newTgts = append(newTgts, consoleTargets...)
systemTargets = newTgts
swapSystemMuRW.Unlock()
cancelTargets(otherTargets) // cancel running targets
return errs
// UpdateHTTPWebhooks swaps system webhook targets with newly loaded ones from the cfg
func UpdateHTTPWebhooks(ctx context.Context, cfgs map[string]http.Config) (errs []error) {
return updateHTTPTargets(ctx, cfgs, &systemTargets)
}
// UpdateAuditWebhookTargets swaps audit webhook targets with newly loaded ones from the cfg
func UpdateAuditWebhookTargets(ctx context.Context, cfg Config) []error {
newWebhookTgts, errs := initSystemTargets(ctx, cfg.AuditWebhook)
// UpdateAuditWebhooks swaps audit webhook targets with newly loaded ones from the cfg
func UpdateAuditWebhooks(ctx context.Context, cfgs map[string]http.Config) (errs []error) {
return updateHTTPTargets(ctx, cfgs, &auditTargets)
}
func updateHTTPTargets(ctx context.Context, cfgs map[string]http.Config, targetList *[]Target) (errs []error) {
tgts := make([]*http.Target, 0)
newWebhooks := make([]Target, 0)
for _, cfg := range cfgs {
if cfg.Enabled {
t, err := http.New(cfg)
if err != nil {
errs = append(errs, err)
}
tgts = append(tgts, t)
newWebhooks = append(newWebhooks, t)
}
}
oldTargets := make([]Target, len(*targetList))
copy(oldTargets, *targetList)
for i := range oldTargets {
currentTgt, ok := oldTargets[i].(*http.Target)
if !ok {
continue
}
var newTgt *http.Target
for ii := range tgts {
if currentTgt.Name() == tgts[ii].Name() {
newTgt = tgts[ii]
currentTgt.AssignMigrateTarget(newTgt)
http.CreateOrAdjustGlobalBuffer(currentTgt, newTgt)
break
}
}
}
for _, t := range tgts {
err := t.Init(ctx)
if err != nil {
errs = append(errs, err)
}
}
swapAuditMuRW.Lock()
// Retain kafka targets
oldWebhookTgts, otherTgts := splitTargets(auditTargets, types.TargetHTTP)
newWebhookTgts = append(newWebhookTgts, otherTgts...)
auditTargets = newWebhookTgts
*targetList = newWebhooks
swapAuditMuRW.Unlock()
cancelTargets(oldWebhookTgts) // cancel running targets
cancelTargets(oldTargets)
return errs
}
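
For orientation, the call sites that drive this swap on every config reload, as shown in the first file's diff:

// From applyDynamicConfigForSubSys (see the first file above):
if errs := logger.UpdateHTTPWebhooks(ctx, loggerCfg.HTTP); len(errs) > 0 {
	logger.LogIf(ctx, fmt.Errorf("Unable to update logger webhook config: %v", errs))
}
if errs := logger.UpdateAuditWebhooks(ctx, loggerCfg.AuditWebhook); len(errs) > 0 {
	logger.LogIf(ctx, fmt.Errorf("Unable to update audit webhook targets: %v", errs))
}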

View File

@@ -28,6 +28,8 @@ import (
"time"
"github.com/google/uuid"
jsoniter "github.com/json-iterator/go"
"github.com/valyala/bytebufferpool"
)
const (
@@ -84,6 +86,7 @@ func (store *QueueStore[_]) Open() error {
if uint64(len(files)) > store.entryLimit {
files = files[:store.entryLimit]
}
for _, file := range files {
if file.IsDir() {
continue
@@ -97,6 +100,54 @@ func (store *QueueStore[_]) Open() error {
return nil
}
// Delete - Remove the store directory from disk
func (store *QueueStore[_]) Delete() error {
return os.Remove(store.directory)
}
// PutMultiple - puts multiple items in the store as one batch.
func (store *QueueStore[I]) PutMultiple(item []I) error {
store.Lock()
defer store.Unlock()
if uint64(len(store.entries)) >= store.entryLimit {
return errLimitExceeded
}
// Generate a new UUID for the key.
key, err := uuid.NewRandom()
if err != nil {
return err
}
return store.multiWrite(key.String(), item)
}
// multiWrite - writes a batch of items to a single file in the directory.
func (store *QueueStore[I]) multiWrite(key string, item []I) error {
buf := bytebufferpool.Get()
defer bytebufferpool.Put(buf)
enc := jsoniter.ConfigCompatibleWithStandardLibrary.NewEncoder(buf)
for i := range item {
err := enc.Encode(item[i])
if err != nil {
return err
}
}
b := buf.Bytes()
path := filepath.Join(store.directory, key+store.fileExt)
err := os.WriteFile(path, b, os.FileMode(0o770))
buf.Reset()
if err != nil {
return err
}
// Increment the item count.
store.entries[key] = time.Now().UnixNano()
return nil
}
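
A usage sketch as a hypothetical in-package example function (NewQueueStore's signature is taken from the target code earlier in this diff):

// Hypothetical in-package example: two entries, one file on disk.
func ExampleQueueStore_PutMultiple() {
	qs := NewQueueStore[string]("/tmp/queue", 100, ".log")
	if err := qs.Open(); err != nil {
		return
	}
	// Both entries are JSON-encoded back-to-back into one file, so a
	// later replay costs a single GetRaw read and one webhook send.
	_ = qs.PutMultiple([]string{`{"msg":"a"}`, `{"msg":"b"}`})
}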
// write - writes an item to the directory.
func (store *QueueStore[I]) write(key string, item I) error {
// Marshals the item.
@@ -131,6 +182,30 @@ func (store *QueueStore[I]) Put(item I) error {
return store.write(key.String(), item)
}
// GetRaw - gets an item's raw bytes from the store.
func (store *QueueStore[I]) GetRaw(key string) (raw []byte, err error) {
store.RLock()
defer func(store *QueueStore[I]) {
store.RUnlock()
if err != nil {
// Upon error we remove the entry.
store.Del(key)
}
}(store)
raw, err = os.ReadFile(filepath.Join(store.directory, key+store.fileExt))
if err != nil {
return
}
if len(raw) == 0 {
return raw, os.ErrNotExist
}
return
}
// Get - gets an item from the store.
func (store *QueueStore[I]) Get(key string) (item I, err error) {
store.RLock()

View File

@@ -25,7 +25,6 @@ import (
"time"
xioutil "github.com/minio/minio/internal/ioutil"
xnet "github.com/minio/pkg/v2/net"
)
const (
@@ -46,12 +45,15 @@ type Target interface {
// Store - Used to persist items.
type Store[I any] interface {
Put(item I) error
PutMultiple(item []I) error
Get(key string) (I, error)
GetRaw(key string) ([]byte, error)
Len() int
List() ([]string, error)
Del(key string) error
DelList(key []string) error
Open() error
Delete() error
Extension() string
}
@@ -110,15 +112,14 @@ func sendItems(target Target, keyCh <-chan Key, doneCh <-chan struct{}, logger l
break
}
if err != ErrNotConnected && !xnet.IsConnResetErr(err) {
logger(context.Background(),
fmt.Errorf("target.SendFromStore() failed with '%w'", err),
target.Name())
}
// Retrying after 3secs back-off
logger(
context.Background(),
fmt.Errorf("unable to send webhook log entry to '%s' err '%w'", target.Name(), err),
target.Name(),
)
select {
// Retrying after 3secs back-off
case <-retryTicker.C:
case <-doneCh:
return false
@@ -131,7 +132,6 @@ func sendItems(target Target, keyCh <-chan Key, doneCh <-chan struct{}, logger l
select {
case key, ok := <-keyCh:
if !ok {
// closed channel.
return
}
@@ -147,9 +147,7 @@ func sendItems(target Target, keyCh <-chan Key, doneCh <-chan struct{}, logger l
// StreamItems reads the keys from the store and replays the corresponding item to the target.
func StreamItems[I any](store Store[I], target Target, doneCh <-chan struct{}, logger logger) {
go func() {
// Replays the items from the store.
keyCh := replayItems(store, doneCh, logger, target.Name())
// Send items from the store.
sendItems(target, keyCh, doneCh, logger)
}()
}