Compare commits

...

10 Commits

| Author | SHA1 | Message | Date |
| --- | --- | --- | --- |
| Kristoffer Dalby | 1b5bf82ff7 | correct metrics (Signed-off-by: Kristoffer Dalby <kristoffer@tailscale.com>) | 2024-05-04 10:10:58 +02:00 |
| Kristoffer Dalby | 666eb918a9 | count mapresps ended (Signed-off-by: Kristoffer Dalby <kristoffer@tailscale.com>) | 2024-05-04 10:05:49 +02:00 |
| Kristoffer Dalby | c5f42b019f | count mapresps ended (Signed-off-by: Kristoffer Dalby <kristoffer@tailscale.com>) | 2024-05-04 10:05:26 +02:00 |
| Kristoffer Dalby | 9152076dee | add timeout to sendall (Signed-off-by: Kristoffer Dalby <kristoffer@tailscale.com>) | 2024-05-04 09:59:20 +02:00 |
| Kristoffer Dalby | cb42bf3084 | add metric and count to String method (Signed-off-by: Kristoffer Dalby <kristoffer@tailscale.com>) | 2024-05-04 09:42:56 +02:00 |
| Kristoffer Dalby | a11ac59919 | add gauge metrics for current pending updates (Signed-off-by: Kristoffer Dalby <kristoffer@tailscale.com>) | 2024-05-04 09:40:24 +02:00 |
| Kristoffer Dalby | 48a543446e | add gauge metrics for waiting for notifier batcher lock (Signed-off-by: Kristoffer Dalby <kristoffer@tailscale.com>) | 2024-05-04 09:34:31 +02:00 |
| Kristoffer Dalby | 4021368772 | add gauge metrics for waiting for notifier lock (Signed-off-by: Kristoffer Dalby <kristoffer@tailscale.com>) | 2024-05-04 09:30:57 +02:00 |
| Kristoffer Dalby | a965fd0c9a | try to close on existing connection, also reject (Signed-off-by: Kristoffer Dalby <kristoffer@tailscale.com>) | 2024-05-04 09:20:14 +02:00 |
| Kristoffer Dalby | 622aa82da2 | ensure expire routines are cleaned up (#1924) (Signed-off-by: Kristoffer Dalby <kristoffer@tailscale.com>) | 2024-05-02 15:57:53 +00:00 |
6 changed files with 160 additions and 97 deletions

View File

@@ -70,7 +70,7 @@ var (
 const (
     AuthPrefix         = "Bearer "
-    updateInterval     = 5000
+    updateInterval     = 5 * time.Second
     privateKeyFileMode = 0o600
     headscaleDirPerm   = 0o700
@@ -219,64 +219,75 @@ func (h *Headscale) redirect(w http.ResponseWriter, req *http.Request) {
 // deleteExpireEphemeralNodes deletes ephemeral node records that have not been
 // seen for longer than h.cfg.EphemeralNodeInactivityTimeout.
-func (h *Headscale) deleteExpireEphemeralNodes(milliSeconds int64) {
-    ticker := time.NewTicker(time.Duration(milliSeconds) * time.Millisecond)
+func (h *Headscale) deleteExpireEphemeralNodes(ctx context.Context, every time.Duration) {
+    ticker := time.NewTicker(every)
 
-    for range ticker.C {
-        var removed []types.NodeID
-        var changed []types.NodeID
-        if err := h.db.Write(func(tx *gorm.DB) error {
-            removed, changed = db.DeleteExpiredEphemeralNodes(tx, h.cfg.EphemeralNodeInactivityTimeout)
+    for {
+        select {
+        case <-ctx.Done():
+            ticker.Stop()
+            return
+        case <-ticker.C:
+            var removed []types.NodeID
+            var changed []types.NodeID
+            if err := h.db.Write(func(tx *gorm.DB) error {
+                removed, changed = db.DeleteExpiredEphemeralNodes(tx, h.cfg.EphemeralNodeInactivityTimeout)
 
-            return nil
-        }); err != nil {
-            log.Error().Err(err).Msg("database error while expiring ephemeral nodes")
-            continue
-        }
+                return nil
+            }); err != nil {
+                log.Error().Err(err).Msg("database error while expiring ephemeral nodes")
+                continue
+            }
 
-        if removed != nil {
-            ctx := types.NotifyCtx(context.Background(), "expire-ephemeral", "na")
-            h.nodeNotifier.NotifyAll(ctx, types.StateUpdate{
-                Type:    types.StatePeerRemoved,
-                Removed: removed,
-            })
-        }
+            if removed != nil {
+                ctx := types.NotifyCtx(context.Background(), "expire-ephemeral", "na")
+                h.nodeNotifier.NotifyAll(ctx, types.StateUpdate{
+                    Type:    types.StatePeerRemoved,
+                    Removed: removed,
+                })
+            }
 
-        if changed != nil {
-            ctx := types.NotifyCtx(context.Background(), "expire-ephemeral", "na")
-            h.nodeNotifier.NotifyAll(ctx, types.StateUpdate{
-                Type:        types.StatePeerChanged,
-                ChangeNodes: changed,
-            })
+            if changed != nil {
+                ctx := types.NotifyCtx(context.Background(), "expire-ephemeral", "na")
+                h.nodeNotifier.NotifyAll(ctx, types.StateUpdate{
+                    Type:        types.StatePeerChanged,
+                    ChangeNodes: changed,
+                })
+            }
         }
     }
 }
 
-// expireExpiredMachines expires nodes that have an explicit expiry set
+// expireExpiredNodes expires nodes that have an explicit expiry set
 // after that expiry time has passed.
-func (h *Headscale) expireExpiredMachines(intervalMs int64) {
-    interval := time.Duration(intervalMs) * time.Millisecond
-    ticker := time.NewTicker(interval)
+func (h *Headscale) expireExpiredNodes(ctx context.Context, every time.Duration) {
+    ticker := time.NewTicker(every)
 
     lastCheck := time.Unix(0, 0)
     var update types.StateUpdate
     var changed bool
 
-    for range ticker.C {
-        if err := h.db.Write(func(tx *gorm.DB) error {
-            lastCheck, update, changed = db.ExpireExpiredNodes(tx, lastCheck)
+    for {
+        select {
+        case <-ctx.Done():
+            ticker.Stop()
+            return
+        case <-ticker.C:
+            if err := h.db.Write(func(tx *gorm.DB) error {
+                lastCheck, update, changed = db.ExpireExpiredNodes(tx, lastCheck)
 
-            return nil
-        }); err != nil {
-            log.Error().Err(err).Msg("database error while expiring nodes")
-            continue
-        }
+                return nil
+            }); err != nil {
+                log.Error().Err(err).Msg("database error while expiring nodes")
+                continue
+            }
 
-        if changed {
-            log.Trace().Interface("nodes", update.ChangePatches).Msgf("expiring nodes")
-
-            ctx := types.NotifyCtx(context.Background(), "expire-expired", "na")
-            h.nodeNotifier.NotifyAll(ctx, update)
+            if changed {
+                log.Trace().Interface("nodes", update.ChangePatches).Msgf("expiring nodes")
+
+                ctx := types.NotifyCtx(context.Background(), "expire-expired", "na")
+                h.nodeNotifier.NotifyAll(ctx, update)
+            }
         }
     }
 }
@@ -538,10 +549,13 @@ func (h *Headscale) Serve() error {
         return errEmptyInitialDERPMap
     }
 
-    // TODO(kradalby): These should have cancel channels and be cleaned
-    // up on shutdown.
-    go h.deleteExpireEphemeralNodes(updateInterval)
-    go h.expireExpiredMachines(updateInterval)
+    expireEphemeralCtx, expireEphemeralCancel := context.WithCancel(context.Background())
+    defer expireEphemeralCancel()
+    go h.deleteExpireEphemeralNodes(expireEphemeralCtx, updateInterval)
+
+    expireNodeCtx, expireNodeCancel := context.WithCancel(context.Background())
+    defer expireNodeCancel()
+    go h.expireExpiredNodes(expireNodeCtx, updateInterval)
 
     if zl.GlobalLevel() == zl.TraceLevel {
         zerolog.RespLog = true

@@ -805,6 +819,9 @@ func (h *Headscale) Serve() error {
                 Str("signal", sig.String()).
                 Msg("Received signal to stop, shutting down gracefully")
 
+                expireNodeCancel()
+                expireEphemeralCancel()
+
                 trace("closing map sessions")
 
                 wg := sync.WaitGroup{}
                 for _, mapSess := range h.mapSessions {

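For readers skimming the diff: the change above swaps a bare `for range ticker.C` loop for a `select` that also watches a cancellable context, so the expiry goroutines can be stopped from `Serve()`'s shutdown path instead of leaking. A minimal standalone sketch of that pattern (the name `runEvery` is illustrative, not from this changeset):

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// runEvery is a sketch of the loop shape used above: a ticker wrapped in a
// select so the goroutine exits promptly when ctx is cancelled, instead of
// living for the rest of the process lifetime.
func runEvery(ctx context.Context, every time.Duration, work func()) {
	ticker := time.NewTicker(every)
	defer ticker.Stop() // release the ticker's resources once the loop exits

	for {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
			work()
		}
	}
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())

	go runEvery(ctx, 100*time.Millisecond, func() { fmt.Println("tick") })

	time.Sleep(350 * time.Millisecond)
	cancel() // stops the goroutine, mirroring the shutdown path in the diff
	time.Sleep(50 * time.Millisecond)
}
```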
View File

@@ -47,6 +47,11 @@ var (
         Name:      "mapresponse_rejected_new_sessions_total",
         Help:      "total count of new mapsessions rejected",
     }, []string{"reason"})
+    mapResponseEnded = promauto.NewCounterVec(prometheus.CounterOpts{
+        Namespace: prometheusNamespace,
+        Name:      "mapresponse_ended_total",
+        Help:      "total count of new mapsessions ended",
+    }, []string{"reason"})
     httpDuration = promauto.NewHistogramVec(prometheus.HistogramOpts{
         Namespace: prometheusNamespace,
         Name:      "http_duration_seconds",

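The new `mapResponseEnded` counter follows the same shape as the existing rejected-sessions metric: a `CounterVec` keyed by a `reason` label, so each exit path can be graphed separately. A self-contained sketch of that registration pattern, using an illustrative `example` namespace and reasons rather than headscale's:

```go
package main

import (
	"net/http"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promauto"
	"github.com/prometheus/client_golang/prometheus/promhttp"
)

// sessionsEnded mirrors the shape of mapresponse_ended_total: a counter
// vector with one time series per "reason" label value.
var sessionsEnded = promauto.NewCounterVec(prometheus.CounterOpts{
	Namespace: "example",
	Name:      "mapresponse_ended_total",
	Help:      "total count of mapsessions ended",
}, []string{"reason"})

func main() {
	// Each exit path increments its own label value, so the reasons can be
	// compared in PromQL, e.g. sum by (reason) (rate(...[5m])).
	sessionsEnded.WithLabelValues("cancelled").Inc()
	sessionsEnded.WithLabelValues("done").Inc()

	http.Handle("/metrics", promhttp.Handler())
	http.ListenAndServe(":9090", nil)
}
```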
View File

@@ -241,7 +241,7 @@ func (ns *noiseServer) NoisePollNetMapHandler(
     sess.tracef("aquiring lock to check stream")
     ns.headscale.mapSessionMu.Lock()
-    if _, ok := ns.headscale.mapSessions[node.ID]; ok {
+    if oldSession, ok := ns.headscale.mapSessions[node.ID]; ok {
         // NOTE/TODO(kradalby): From how I understand the protocol, when
         // a client connects with stream=true, and already has a streaming
         // connection open, the correct way is to close the current channel

@@ -266,7 +266,12 @@ func (ns *noiseServer) NoisePollNetMapHandler(
         defer ns.headscale.mapSessionMu.Unlock()
 
-        sess.infof("node has an open stream(%p), rejecting new stream", sess)
+        go func() {
+            oldSession.infof("mapSession (%p) is open, trying to close stream and replace with %p", oldSession, sess)
+            oldSession.close()
+        }()
+
+        sess.infof("mapSession (%p) has an open stream, rejecting new stream", sess)
         mapResponseRejected.WithLabelValues("exists").Inc()
 
         return
     }

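The handler change above keeps the rejection behaviour but now also asks the old session to shut down, in its own goroutine so the registry lock is not held while the old stream winds down. A simplified, hypothetical model of that close-old-then-reject flow (types and names are ours, not headscale's):

```go
package main

import (
	"fmt"
	"sync"
)

type session struct{ id int }

// close stands in for the potentially slow mapSession.close() in the diff.
func (s *session) close() { fmt.Printf("closing session %d\n", s.id) }

type registry struct {
	mu       sync.Mutex
	sessions map[int]*session
}

// offer rejects the new session if one is already registered for the node,
// but kicks off an asynchronous close of the old one so a client retry can
// succeed once the old stream has been torn down.
func (r *registry) offer(nodeID int, s *session) bool {
	r.mu.Lock()
	defer r.mu.Unlock()

	if old, ok := r.sessions[nodeID]; ok {
		go old.close() // close asynchronously, as the diff does
		return false   // reject; the client is expected to reconnect
	}

	r.sessions[nodeID] = s
	return true
}

func main() {
	r := &registry{sessions: map[int]*session{1: {id: 100}}}
	fmt.Println(r.offer(1, &session{id: 101})) // false: old session being closed
}
```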
View File

@@ -8,6 +8,11 @@ import (
 const prometheusNamespace = "headscale"
 
 var (
+    notifierWaitersForLock = promauto.NewGaugeVec(prometheus.GaugeOpts{
+        Namespace: prometheusNamespace,
+        Name:      "notifier_waiters_for_lock",
+        Help:      "gauge of waiters for the notifier lock",
+    }, []string{"type", "action"})
     notifierWaitForLock = promauto.NewHistogramVec(prometheus.HistogramOpts{
         Namespace: prometheusNamespace,
         Name:      "notifier_wait_for_lock_seconds",

@@ -29,4 +34,19 @@ var (
         Name: "notifier_open_channels_total",
         Help: "total count open channels in notifier",
     })
+    notifierBatcherWaitersForLock = promauto.NewGaugeVec(prometheus.GaugeOpts{
+        Namespace: prometheusNamespace,
+        Name:      "notifier_batcher_waiters_for_lock",
+        Help:      "gauge of waiters for the notifier batcher lock",
+    }, []string{"type", "action"})
+    notifierBatcherChanges = promauto.NewGaugeVec(prometheus.GaugeOpts{
+        Namespace: prometheusNamespace,
+        Name:      "notifier_batcher_changes_pending",
+        Help:      "gauge of full changes pending in the notifier batcher",
+    }, []string{})
+    notifierBatcherPatches = promauto.NewGaugeVec(prometheus.GaugeOpts{
+        Namespace: prometheusNamespace,
+        Name:      "notifier_batcher_patches_pending",
+        Help:      "gauge of patches pending in the notifier batcher",
+    }, []string{})
 )

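These gauges are used (in the notifier changes below) by bracketing lock acquisitions: `Inc()` before blocking on the mutex and `Dec()` once it is held, so the instantaneous gauge value approximates how many goroutines are queued on the lock. A sketch of that usage with simplified types and an illustrative `example` namespace:

```go
package main

import (
	"sync"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promauto"
)

// waitersForLock mirrors the shape of notifier_waiters_for_lock: a gauge
// vector labelled by lock type and the action waiting on it.
var waitersForLock = promauto.NewGaugeVec(prometheus.GaugeOpts{
	Namespace: "example",
	Name:      "notifier_waiters_for_lock",
	Help:      "gauge of waiters for the notifier lock",
}, []string{"type", "action"})

type notifier struct {
	mu    sync.Mutex
	nodes map[uint64]chan struct{}
}

func (n *notifier) addNode(id uint64, c chan struct{}) {
	// Inc before blocking, Dec once the lock is held: while a goroutine is
	// stuck here, the gauge reads as one queued waiter.
	waitersForLock.WithLabelValues("lock", "add").Inc()
	n.mu.Lock()
	defer n.mu.Unlock()
	waitersForLock.WithLabelValues("lock", "add").Dec()

	n.nodes[id] = c
}

func main() {
	n := &notifier{nodes: make(map[uint64]chan struct{})}
	n.addNode(1, make(chan struct{}))
}
```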
View File

@@ -40,15 +40,11 @@ func (n *Notifier) Close() {
 }
 
 func (n *Notifier) AddNode(nodeID types.NodeID, c chan<- types.StateUpdate) {
-    log.Trace().Caller().Uint64("node.id", nodeID.Uint64()).Msg("acquiring lock to add node")
-    defer log.Trace().
-        Caller().
-        Uint64("node.id", nodeID.Uint64()).
-        Msg("releasing lock to add node")
-
     start := time.Now()
+    notifierWaitersForLock.WithLabelValues("lock", "add").Inc()
     n.l.Lock()
     defer n.l.Unlock()
+    notifierWaitersForLock.WithLabelValues("lock", "add").Dec()
     notifierWaitForLock.WithLabelValues("add").Observe(time.Since(start).Seconds())
 
     n.nodes[nodeID] = c

@@ -62,15 +58,11 @@ func (n *Notifier) AddNode(nodeID types.NodeID, c chan<- types.StateUpdate) {
 }
 
 func (n *Notifier) RemoveNode(nodeID types.NodeID) {
-    log.Trace().Caller().Uint64("node.id", nodeID.Uint64()).Msg("acquiring lock to remove node")
-    defer log.Trace().
-        Caller().
-        Uint64("node.id", nodeID.Uint64()).
-        Msg("releasing lock to remove node")
-
     start := time.Now()
+    notifierWaitersForLock.WithLabelValues("lock", "remove").Inc()
     n.l.Lock()
     defer n.l.Unlock()
+    notifierWaitersForLock.WithLabelValues("lock", "remove").Dec()
     notifierWaitForLock.WithLabelValues("remove").Observe(time.Since(start).Seconds())
 
     if len(n.nodes) == 0 {

@@ -90,8 +82,10 @@ func (n *Notifier) RemoveNode(nodeID types.NodeID) {
 // IsConnected reports if a node is connected to headscale and has a
 // poll session open.
 func (n *Notifier) IsConnected(nodeID types.NodeID) bool {
+    notifierWaitersForLock.WithLabelValues("rlock", "conncheck").Inc()
     n.l.RLock()
     defer n.l.RUnlock()
+    notifierWaitersForLock.WithLabelValues("rlock", "conncheck").Dec()
 
     if val, ok := n.connected.Load(nodeID); ok {
         return val

@@ -130,15 +124,11 @@
     update types.StateUpdate,
     nodeID types.NodeID,
 ) {
-    log.Trace().Caller().Str("type", update.Type.String()).Msg("acquiring lock to notify")
-    defer log.Trace().
-        Caller().
-        Str("type", update.Type.String()).
-        Msg("releasing lock, finished notifying")
-
     start := time.Now()
+    notifierWaitersForLock.WithLabelValues("rlock", "notify").Inc()
     n.l.RLock()
     defer n.l.RUnlock()
+    notifierWaitersForLock.WithLabelValues("rlock", "notify").Dec()
     notifierWaitForLock.WithLabelValues("notify").Observe(time.Since(start).Seconds())
 
     if c, ok := n.nodes[nodeID]; ok {
@@ -166,29 +156,45 @@ func (n *Notifier) NotifyByNodeID(
 func (n *Notifier) sendAll(update types.StateUpdate) {
     start := time.Now()
+    notifierWaitersForLock.WithLabelValues("rlock", "send-all").Inc()
     n.l.RLock()
     defer n.l.RUnlock()
+    notifierWaitersForLock.WithLabelValues("rlock", "send-all").Dec()
     notifierWaitForLock.WithLabelValues("send-all").Observe(time.Since(start).Seconds())
 
-    for _, c := range n.nodes {
-        c <- update
-        notifierUpdateSent.WithLabelValues("ok", update.Type.String(), "send-all").Inc()
+    for id, c := range n.nodes {
+        ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
+        defer cancel()
+
+        select {
+        case <-ctx.Done():
+            log.Error().
+                Err(ctx.Err()).
+                Uint64("node.id", id.Uint64()).
+                Msgf("update not sent, context cancelled")
+            notifierUpdateSent.WithLabelValues("cancelled", update.Type.String(), "send-all").Inc()
+
+            return
+        case c <- update:
+            notifierUpdateSent.WithLabelValues("ok", update.Type.String(), "send-all").Inc()
+        }
     }
 }
 
 func (n *Notifier) String() string {
+    notifierWaitersForLock.WithLabelValues("rlock", "string").Inc()
     n.l.RLock()
     defer n.l.RUnlock()
+    notifierWaitersForLock.WithLabelValues("rlock", "string").Dec()
 
     var b strings.Builder
-    b.WriteString("chans:\n")
+    fmt.Fprintf(&b, "chans (%d):\n", len(n.nodes))
 
     for k, v := range n.nodes {
         fmt.Fprintf(&b, "\t%d: %p\n", k, v)
     }
 
     b.WriteString("\n")
-    b.WriteString("connected:\n")
+    fmt.Fprintf(&b, "connected (%d):\n", len(n.nodes))
 
     n.connected.Range(func(k types.NodeID, v bool) bool {
         fmt.Fprintf(&b, "\t%d: %t\n", k, v)
@@ -230,13 +236,16 @@ func (b *batcher) close() {
 // addOrPassthrough adds the update to the batcher, if it is not a
 // type that is currently batched, it will be sent immediately.
 func (b *batcher) addOrPassthrough(update types.StateUpdate) {
+    notifierBatcherWaitersForLock.WithLabelValues("lock", "add").Inc()
     b.mu.Lock()
     defer b.mu.Unlock()
+    notifierBatcherWaitersForLock.WithLabelValues("lock", "add").Dec()
 
     switch update.Type {
     case types.StatePeerChanged:
         b.changedNodeIDs.Add(update.ChangeNodes...)
         b.nodesChanged = true
+        notifierBatcherChanges.WithLabelValues().Set(float64(b.changedNodeIDs.Len()))
 
     case types.StatePeerChangedPatch:
         for _, newPatch := range update.ChangePatches {

@@ -248,6 +257,7 @@ func (b *batcher) addOrPassthrough(update types.StateUpdate) {
             }
         }
         b.patchesChanged = true
+        notifierBatcherPatches.WithLabelValues().Set(float64(len(b.patches)))
 
     default:
         b.n.sendAll(update)

@@ -257,8 +267,10 @@
 // flush sends all the accumulated patches to all
 // nodes in the notifier.
 func (b *batcher) flush() {
+    notifierBatcherWaitersForLock.WithLabelValues("lock", "flush").Inc()
     b.mu.Lock()
     defer b.mu.Unlock()
+    notifierBatcherWaitersForLock.WithLabelValues("lock", "flush").Dec()
 
     if b.nodesChanged || b.patchesChanged {
         var patches []*tailcfg.PeerChange

@@ -296,8 +308,10 @@ func (b *batcher) flush() {
         }
 
         b.changedNodeIDs = set.Slice[types.NodeID]{}
+        notifierBatcherChanges.WithLabelValues().Set(0)
         b.nodesChanged = false
         b.patches = make(map[types.NodeID]tailcfg.PeerChange, len(b.patches))
+        notifierBatcherPatches.WithLabelValues().Set(0)
         b.patchesChanged = false
     }
 }

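The `sendAll` change above replaces an unbounded `c <- update` with a `select` that gives up after a timeout, so one stalled receiver can no longer wedge the whole broadcast. A generic sketch of that bounded-send idea, here with a plain per-attempt timer rather than the diff's `context.WithTimeout` (the helper name `trySend` is ours):

```go
package main

import (
	"fmt"
	"time"
)

// trySend attempts a channel send but gives up after the timeout. A plain
// `c <- v` blocks forever if the receiver has stalled; selecting against a
// timer bounds the wait and lets the caller report the failure instead.
func trySend[T any](c chan<- T, v T, timeout time.Duration) bool {
	t := time.NewTimer(timeout)
	defer t.Stop() // stop the timer per attempt, rather than deferring in a loop

	select {
	case c <- v:
		return true
	case <-t.C:
		return false
	}
}

func main() {
	fast := make(chan string, 1) // has buffer room: send succeeds immediately
	stalled := make(chan string) // nobody receives: send times out

	fmt.Println(trySend(fast, "update", time.Second))              // true
	fmt.Println(trySend(stalled, "update", 100*time.Millisecond)) // false
}
```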
View File

@@ -41,11 +41,11 @@ type mapSession struct
     capVer tailcfg.CapabilityVersion
     mapper *mapper.Mapper
 
-    serving   bool
-    servingMu sync.Mutex
+    cancelChMu sync.Mutex
 
-    ch       chan types.StateUpdate
-    cancelCh chan struct{}
+    ch           chan types.StateUpdate
+    cancelCh     chan struct{}
+    cancelChOpen bool
 
     keepAliveTicker *time.Ticker

@@ -86,11 +86,9 @@ func (h *Headscale) newMapSession(
         capVer: req.Version,
         mapper: h.mapper,
 
-        // serving indicates if a client is being served.
-        serving: false,
-
-        ch:       updateChan,
-        cancelCh: make(chan struct{}),
+        ch:           updateChan,
+        cancelCh:     make(chan struct{}),
+        cancelChOpen: true,
 
         keepAliveTicker: time.NewTicker(keepAliveInterval + (time.Duration(rand.IntN(9000)) * time.Millisecond)),
@@ -103,15 +101,20 @@
 }
 
 func (m *mapSession) close() {
-    m.servingMu.Lock()
-    defer m.servingMu.Unlock()
-
-    if !m.serving {
+    m.cancelChMu.Lock()
+    defer m.cancelChMu.Unlock()
+
+    if !m.cancelChOpen {
         return
     }
 
-    m.tracef("mapSession (%p) sending message on cancel chan")
-    m.cancelCh <- struct{}{}
-    m.tracef("mapSession (%p) sent message on cancel chan")
+    m.tracef("mapSession (%p) sending message on cancel chan", m)
+    select {
+    case m.cancelCh <- struct{}{}:
+        m.tracef("mapSession (%p) sent message on cancel chan", m)
+    case <-time.After(30 * time.Second):
+        m.tracef("mapSession (%p) timed out sending close message", m)
+    }
 }
 
 func (m *mapSession) isStreaming() bool {
@@ -145,15 +148,13 @@ func (m *mapSession) serve() {
     defer m.h.nodeNotifier.RemoveNode(m.node.ID)
 
     defer func() {
-        m.servingMu.Lock()
-        defer m.servingMu.Unlock()
+        m.cancelChMu.Lock()
+        defer m.cancelChMu.Unlock()
 
-        m.serving = false
+        m.cancelChOpen = false
         close(m.cancelCh)
     }()
 
-    m.serving = true
-
     m.h.nodeNotifier.AddNode(m.node.ID, m.ch)
     m.h.updateNodeOnlineStatus(true, m.node)
@@ -231,12 +232,15 @@
         select {
         case <-m.cancelCh:
             m.tracef("poll cancelled received")
-            return
-        case <-ctx.Done():
-            m.tracef("poll context done")
+            mapResponseEnded.WithLabelValues("cancelled").Inc()
 
             return
 
-        // Consume all updates sent to node
+        case <-ctx.Done():
+            m.tracef("poll context done")
+            mapResponseEnded.WithLabelValues("done").Inc()
+
+            return
+
+        // Consume updates sent to node
         case update := <-m.ch:
             m.tracef("received stream update: %s %s", update.Type.String(), update.Message)
             mapResponseUpdateReceived.WithLabelValues(update.Type.String()).Inc()
@@ -303,8 +307,6 @@
             return
         }
 
-        // log.Trace().Str("node", m.node.Hostname).TimeDiff("timeSpent", time.Now(), startMapResp).Str("mkey", m.node.MachineKey.String()).Int("type", int(update.Type)).Msg("finished making map response")
-
         // Only send update if there is change
         if data != nil {
             startWrite := time.Now()
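
Taken together, these poll.go changes replace the `serving` flag with a `cancelChOpen` flag guarded by `cancelChMu`, and bound the close-side send with a timeout so `close()` cannot block forever against a serve loop that has already exited. A compact, hypothetical model of that handshake (names and the shortened timeout are ours):

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// session models the cancel-channel handshake above: the mutex and the open
// flag ensure close() never sends on a channel the serve loop has already
// closed, and the timeout stops close() from blocking forever if the loop
// is no longer receiving.
type session struct {
	mu       sync.Mutex
	open     bool
	cancelCh chan struct{}
}

func (s *session) close() {
	s.mu.Lock()
	defer s.mu.Unlock()

	if !s.open {
		return // serve loop already exited; nothing to signal
	}

	select {
	case s.cancelCh <- struct{}{}:
		fmt.Println("cancel delivered")
	case <-time.After(time.Second): // 30s in the diff; shortened here
		fmt.Println("timed out delivering cancel")
	}
}

func (s *session) serve() {
	defer func() {
		// Mark the channel closed under the same lock close() takes, so a
		// late close() sees open == false and returns instead of sending.
		s.mu.Lock()
		defer s.mu.Unlock()
		s.open = false
		close(s.cancelCh)
	}()

	<-s.cancelCh // wait for a cancel signal
	fmt.Println("serve loop exiting")
}

func main() {
	s := &session{open: true, cancelCh: make(chan struct{})}
	go s.serve()
	time.Sleep(50 * time.Millisecond)
	s.close()
	time.Sleep(50 * time.Millisecond)
}
```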