Compare commits
10 Commits
bafb8b561c
...
f11cd5333a
Author | SHA1 | Date |
---|---|---|
Kristoffer Dalby | f11cd5333a | |
Kristoffer Dalby | 1b5bf82ff7 | |
Kristoffer Dalby | 666eb918a9 | |
Kristoffer Dalby | c5f42b019f | |
Kristoffer Dalby | 9152076dee | |
Kristoffer Dalby | cb42bf3084 | |
Kristoffer Dalby | a11ac59919 | |
Kristoffer Dalby | 48a543446e | |
Kristoffer Dalby | 4021368772 | |
Kristoffer Dalby | a965fd0c9a |
|
@ -47,6 +47,11 @@ var (
|
|||
Name: "mapresponse_rejected_new_sessions_total",
|
||||
Help: "total count of new mapsessions rejected",
|
||||
}, []string{"reason"})
|
||||
mapResponseEnded = promauto.NewCounterVec(prometheus.CounterOpts{
|
||||
Namespace: prometheusNamespace,
|
||||
Name: "mapresponse_ended_total",
|
||||
Help: "total count of new mapsessions ended",
|
||||
}, []string{"reason"})
|
||||
httpDuration = promauto.NewHistogramVec(prometheus.HistogramOpts{
|
||||
Namespace: prometheusNamespace,
|
||||
Name: "http_duration_seconds",
|
||||
|
|
|
@ -241,7 +241,7 @@ func (ns *noiseServer) NoisePollNetMapHandler(
|
|||
sess.tracef("aquiring lock to check stream")
|
||||
|
||||
ns.headscale.mapSessionMu.Lock()
|
||||
if _, ok := ns.headscale.mapSessions[node.ID]; ok {
|
||||
if oldSession, ok := ns.headscale.mapSessions[node.ID]; ok {
|
||||
// NOTE/TODO(kradalby): From how I understand the protocol, when
|
||||
// a client connects with stream=true, and already has a streaming
|
||||
// connection open, the correct way is to close the current channel
|
||||
|
@ -266,7 +266,12 @@ func (ns *noiseServer) NoisePollNetMapHandler(
|
|||
|
||||
defer ns.headscale.mapSessionMu.Unlock()
|
||||
|
||||
sess.infof("node has an open stream(%p), rejecting new stream", sess)
|
||||
go func() {
|
||||
oldSession.infof("mapSession (%p) is open, trying to close stream and replace with %p", oldSession, sess)
|
||||
oldSession.close()
|
||||
}()
|
||||
|
||||
sess.infof("mapSession (%p) has an open stream, rejecting new stream", sess)
|
||||
mapResponseRejected.WithLabelValues("exists").Inc()
|
||||
return
|
||||
}
|
||||
|
|
|
@ -8,6 +8,11 @@ import (
|
|||
const prometheusNamespace = "headscale"
|
||||
|
||||
var (
|
||||
notifierWaitersForLock = promauto.NewGaugeVec(prometheus.GaugeOpts{
|
||||
Namespace: prometheusNamespace,
|
||||
Name: "notifier_waiters_for_lock",
|
||||
Help: "gauge of waiters for the notifier lock",
|
||||
}, []string{"type", "action"})
|
||||
notifierWaitForLock = promauto.NewHistogramVec(prometheus.HistogramOpts{
|
||||
Namespace: prometheusNamespace,
|
||||
Name: "notifier_wait_for_lock_seconds",
|
||||
|
@ -29,4 +34,19 @@ var (
|
|||
Name: "notifier_open_channels_total",
|
||||
Help: "total count open channels in notifier",
|
||||
})
|
||||
notifierBatcherWaitersForLock = promauto.NewGaugeVec(prometheus.GaugeOpts{
|
||||
Namespace: prometheusNamespace,
|
||||
Name: "notifier_batcher_waiters_for_lock",
|
||||
Help: "gauge of waiters for the notifier batcher lock",
|
||||
}, []string{"type", "action"})
|
||||
notifierBatcherChanges = promauto.NewGaugeVec(prometheus.GaugeOpts{
|
||||
Namespace: prometheusNamespace,
|
||||
Name: "notifier_batcher_changes_pending",
|
||||
Help: "gauge of full changes pending in the notifier batcher",
|
||||
}, []string{})
|
||||
notifierBatcherPatches = promauto.NewGaugeVec(prometheus.GaugeOpts{
|
||||
Namespace: prometheusNamespace,
|
||||
Name: "notifier_batcher_patches_pending",
|
||||
Help: "gauge of patches pending in the notifier batcher",
|
||||
}, []string{})
|
||||
)
|
||||
|
|
|
@ -40,15 +40,11 @@ func (n *Notifier) Close() {
|
|||
}
|
||||
|
||||
func (n *Notifier) AddNode(nodeID types.NodeID, c chan<- types.StateUpdate) {
|
||||
log.Trace().Caller().Uint64("node.id", nodeID.Uint64()).Msg("acquiring lock to add node")
|
||||
defer log.Trace().
|
||||
Caller().
|
||||
Uint64("node.id", nodeID.Uint64()).
|
||||
Msg("releasing lock to add node")
|
||||
|
||||
start := time.Now()
|
||||
notifierWaitersForLock.WithLabelValues("lock", "add").Inc()
|
||||
n.l.Lock()
|
||||
defer n.l.Unlock()
|
||||
notifierWaitersForLock.WithLabelValues("lock", "add").Dec()
|
||||
notifierWaitForLock.WithLabelValues("add").Observe(time.Since(start).Seconds())
|
||||
|
||||
n.nodes[nodeID] = c
|
||||
|
@ -62,15 +58,11 @@ func (n *Notifier) AddNode(nodeID types.NodeID, c chan<- types.StateUpdate) {
|
|||
}
|
||||
|
||||
func (n *Notifier) RemoveNode(nodeID types.NodeID) {
|
||||
log.Trace().Caller().Uint64("node.id", nodeID.Uint64()).Msg("acquiring lock to remove node")
|
||||
defer log.Trace().
|
||||
Caller().
|
||||
Uint64("node.id", nodeID.Uint64()).
|
||||
Msg("releasing lock to remove node")
|
||||
|
||||
start := time.Now()
|
||||
notifierWaitersForLock.WithLabelValues("lock", "remove").Inc()
|
||||
n.l.Lock()
|
||||
defer n.l.Unlock()
|
||||
notifierWaitersForLock.WithLabelValues("lock", "remove").Dec()
|
||||
notifierWaitForLock.WithLabelValues("remove").Observe(time.Since(start).Seconds())
|
||||
|
||||
if len(n.nodes) == 0 {
|
||||
|
@ -90,8 +82,10 @@ func (n *Notifier) RemoveNode(nodeID types.NodeID) {
|
|||
// IsConnected reports if a node is connected to headscale and has a
|
||||
// poll session open.
|
||||
func (n *Notifier) IsConnected(nodeID types.NodeID) bool {
|
||||
notifierWaitersForLock.WithLabelValues("rlock", "conncheck").Inc()
|
||||
n.l.RLock()
|
||||
defer n.l.RUnlock()
|
||||
notifierWaitersForLock.WithLabelValues("rlock", "conncheck").Dec()
|
||||
|
||||
if val, ok := n.connected.Load(nodeID); ok {
|
||||
return val
|
||||
|
@ -130,15 +124,11 @@ func (n *Notifier) NotifyByNodeID(
|
|||
update types.StateUpdate,
|
||||
nodeID types.NodeID,
|
||||
) {
|
||||
log.Trace().Caller().Str("type", update.Type.String()).Msg("acquiring lock to notify")
|
||||
defer log.Trace().
|
||||
Caller().
|
||||
Str("type", update.Type.String()).
|
||||
Msg("releasing lock, finished notifying")
|
||||
|
||||
start := time.Now()
|
||||
notifierWaitersForLock.WithLabelValues("rlock", "notify").Inc()
|
||||
n.l.RLock()
|
||||
defer n.l.RUnlock()
|
||||
notifierWaitersForLock.WithLabelValues("rlock", "notify").Dec()
|
||||
notifierWaitForLock.WithLabelValues("notify").Observe(time.Since(start).Seconds())
|
||||
|
||||
if c, ok := n.nodes[nodeID]; ok {
|
||||
|
@ -166,29 +156,45 @@ func (n *Notifier) NotifyByNodeID(
|
|||
|
||||
func (n *Notifier) sendAll(update types.StateUpdate) {
|
||||
start := time.Now()
|
||||
notifierWaitersForLock.WithLabelValues("rlock", "send-all").Inc()
|
||||
n.l.RLock()
|
||||
defer n.l.RUnlock()
|
||||
notifierWaitersForLock.WithLabelValues("rlock", "send-all").Dec()
|
||||
notifierWaitForLock.WithLabelValues("send-all").Observe(time.Since(start).Seconds())
|
||||
|
||||
for _, c := range n.nodes {
|
||||
c <- update
|
||||
notifierUpdateSent.WithLabelValues("ok", update.Type.String(), "send-all").Inc()
|
||||
for id, c := range n.nodes {
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
|
||||
defer cancel()
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
log.Error().
|
||||
Err(ctx.Err()).
|
||||
Uint64("node.id", id.Uint64()).
|
||||
Msgf("update not sent, context cancelled")
|
||||
notifierUpdateSent.WithLabelValues("cancelled", update.Type.String(), "send-all").Inc()
|
||||
|
||||
return
|
||||
case c <- update:
|
||||
notifierUpdateSent.WithLabelValues("ok", update.Type.String(), "send-all").Inc()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (n *Notifier) String() string {
|
||||
notifierWaitersForLock.WithLabelValues("rlock", "string").Inc()
|
||||
n.l.RLock()
|
||||
defer n.l.RUnlock()
|
||||
notifierWaitersForLock.WithLabelValues("rlock", "string").Dec()
|
||||
|
||||
var b strings.Builder
|
||||
b.WriteString("chans:\n")
|
||||
fmt.Fprintf(&b, "chans (%d):\n", len(n.nodes))
|
||||
|
||||
for k, v := range n.nodes {
|
||||
fmt.Fprintf(&b, "\t%d: %p\n", k, v)
|
||||
}
|
||||
|
||||
b.WriteString("\n")
|
||||
b.WriteString("connected:\n")
|
||||
fmt.Fprintf(&b, "connected (%d):\n", len(n.nodes))
|
||||
|
||||
n.connected.Range(func(k types.NodeID, v bool) bool {
|
||||
fmt.Fprintf(&b, "\t%d: %t\n", k, v)
|
||||
|
@ -230,13 +236,16 @@ func (b *batcher) close() {
|
|||
// addOrPassthrough adds the update to the batcher, if it is not a
|
||||
// type that is currently batched, it will be sent immediately.
|
||||
func (b *batcher) addOrPassthrough(update types.StateUpdate) {
|
||||
notifierBatcherWaitersForLock.WithLabelValues("lock", "add").Inc()
|
||||
b.mu.Lock()
|
||||
defer b.mu.Unlock()
|
||||
notifierBatcherWaitersForLock.WithLabelValues("lock", "add").Dec()
|
||||
|
||||
switch update.Type {
|
||||
case types.StatePeerChanged:
|
||||
b.changedNodeIDs.Add(update.ChangeNodes...)
|
||||
b.nodesChanged = true
|
||||
notifierBatcherChanges.WithLabelValues().Set(float64(b.changedNodeIDs.Len()))
|
||||
|
||||
case types.StatePeerChangedPatch:
|
||||
for _, newPatch := range update.ChangePatches {
|
||||
|
@ -248,6 +257,7 @@ func (b *batcher) addOrPassthrough(update types.StateUpdate) {
|
|||
}
|
||||
}
|
||||
b.patchesChanged = true
|
||||
notifierBatcherPatches.WithLabelValues().Set(float64(len(b.patches)))
|
||||
|
||||
default:
|
||||
b.n.sendAll(update)
|
||||
|
@ -257,8 +267,10 @@ func (b *batcher) addOrPassthrough(update types.StateUpdate) {
|
|||
// flush sends all the accumulated patches to all
|
||||
// nodes in the notifier.
|
||||
func (b *batcher) flush() {
|
||||
notifierBatcherWaitersForLock.WithLabelValues("lock", "flush").Inc()
|
||||
b.mu.Lock()
|
||||
defer b.mu.Unlock()
|
||||
notifierBatcherWaitersForLock.WithLabelValues("lock", "flush").Dec()
|
||||
|
||||
if b.nodesChanged || b.patchesChanged {
|
||||
var patches []*tailcfg.PeerChange
|
||||
|
@ -296,8 +308,10 @@ func (b *batcher) flush() {
|
|||
}
|
||||
|
||||
b.changedNodeIDs = set.Slice[types.NodeID]{}
|
||||
notifierBatcherChanges.WithLabelValues().Set(0)
|
||||
b.nodesChanged = false
|
||||
b.patches = make(map[types.NodeID]tailcfg.PeerChange, len(b.patches))
|
||||
notifierBatcherPatches.WithLabelValues().Set(0)
|
||||
b.patchesChanged = false
|
||||
}
|
||||
}
|
||||
|
|
|
@ -41,11 +41,11 @@ type mapSession struct {
|
|||
capVer tailcfg.CapabilityVersion
|
||||
mapper *mapper.Mapper
|
||||
|
||||
serving bool
|
||||
servingMu sync.Mutex
|
||||
cancelChMu sync.Mutex
|
||||
|
||||
ch chan types.StateUpdate
|
||||
cancelCh chan struct{}
|
||||
ch chan types.StateUpdate
|
||||
cancelCh chan struct{}
|
||||
cancelChOpen bool
|
||||
|
||||
keepAliveTicker *time.Ticker
|
||||
|
||||
|
@ -86,11 +86,9 @@ func (h *Headscale) newMapSession(
|
|||
capVer: req.Version,
|
||||
mapper: h.mapper,
|
||||
|
||||
// serving indicates if a client is being served.
|
||||
serving: false,
|
||||
|
||||
ch: updateChan,
|
||||
cancelCh: make(chan struct{}),
|
||||
ch: updateChan,
|
||||
cancelCh: make(chan struct{}),
|
||||
cancelChOpen: true,
|
||||
|
||||
keepAliveTicker: time.NewTicker(keepAliveInterval + (time.Duration(rand.IntN(9000)) * time.Millisecond)),
|
||||
|
||||
|
@ -103,15 +101,20 @@ func (h *Headscale) newMapSession(
|
|||
}
|
||||
|
||||
func (m *mapSession) close() {
|
||||
m.servingMu.Lock()
|
||||
defer m.servingMu.Unlock()
|
||||
if !m.serving {
|
||||
m.cancelChMu.Lock()
|
||||
defer m.cancelChMu.Unlock()
|
||||
|
||||
if !m.cancelChOpen {
|
||||
return
|
||||
}
|
||||
|
||||
m.tracef("mapSession (%p) sending message on cancel chan")
|
||||
m.cancelCh <- struct{}{}
|
||||
m.tracef("mapSession (%p) sent message on cancel chan")
|
||||
m.tracef("mapSession (%p) sending message on cancel chan", m)
|
||||
select {
|
||||
case m.cancelCh <- struct{}{}:
|
||||
m.tracef("mapSession (%p) sent message on cancel chan", m)
|
||||
case <-time.After(30 * time.Second):
|
||||
m.tracef("mapSession (%p) timed out sending close message", m)
|
||||
}
|
||||
}
|
||||
|
||||
func (m *mapSession) isStreaming() bool {
|
||||
|
@ -145,15 +148,13 @@ func (m *mapSession) serve() {
|
|||
defer m.h.nodeNotifier.RemoveNode(m.node.ID)
|
||||
|
||||
defer func() {
|
||||
m.servingMu.Lock()
|
||||
defer m.servingMu.Unlock()
|
||||
m.cancelChMu.Lock()
|
||||
defer m.cancelChMu.Unlock()
|
||||
|
||||
m.serving = false
|
||||
m.cancelChOpen = false
|
||||
close(m.cancelCh)
|
||||
}()
|
||||
|
||||
m.serving = true
|
||||
|
||||
m.h.nodeNotifier.AddNode(m.node.ID, m.ch)
|
||||
m.h.updateNodeOnlineStatus(true, m.node)
|
||||
|
||||
|
@ -231,12 +232,15 @@ func (m *mapSession) serve() {
|
|||
select {
|
||||
case <-m.cancelCh:
|
||||
m.tracef("poll cancelled received")
|
||||
return
|
||||
case <-ctx.Done():
|
||||
m.tracef("poll context done")
|
||||
mapResponseEnded.WithLabelValues("cancelled").Inc()
|
||||
return
|
||||
|
||||
// Consume all updates sent to node
|
||||
case <-ctx.Done():
|
||||
m.tracef("poll context done")
|
||||
mapResponseEnded.WithLabelValues("done").Inc()
|
||||
return
|
||||
|
||||
// Consume updates sent to node
|
||||
case update := <-m.ch:
|
||||
m.tracef("received stream update: %s %s", update.Type.String(), update.Message)
|
||||
mapResponseUpdateReceived.WithLabelValues(update.Type.String()).Inc()
|
||||
|
@ -303,8 +307,6 @@ func (m *mapSession) serve() {
|
|||
return
|
||||
}
|
||||
|
||||
// log.Trace().Str("node", m.node.Hostname).TimeDiff("timeSpent", time.Now(), startMapResp).Str("mkey", m.node.MachineKey.String()).Int("type", int(update.Type)).Msg("finished making map response")
|
||||
|
||||
// Only send update if there is change
|
||||
if data != nil {
|
||||
startWrite := time.Now()
|
||||
|
|
Loading…
Reference in New Issue