Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 6 additions & 3 deletions internal/internal_worker_base.go
Original file line number Diff line number Diff line change
Expand Up @@ -218,12 +218,12 @@ type (

// baseWorker that wraps worker activities.
baseWorker struct {
options baseWorkerOptions
isWorkerStarted bool
options baseWorkerOptions
isWorkerStarted bool
// stopCh is created by newBaseWorker and closed by baseWorker.Stop().
// It is internal to baseWorker and stops its poller, dispatcher, autoscaler,
// and throttling/backoff loops.
stopCh chan struct{}
stopCh chan struct{}
stopWG sync.WaitGroup // The WaitGroup for stopping existing routines.
pollLimiter *rate.Limiter
taskLimiter *rate.Limiter
Expand Down Expand Up @@ -899,6 +899,9 @@ func (bw *baseWorker) Stop() {
})
}
bw.taskLimiterContextCancel()
if bw.slotSupplier != nil {
bw.slotSupplier.stopMetrics()
}

// Close context
if bw.options.backgroundContextCancel != nil {
Expand Down
37 changes: 37 additions & 0 deletions internal/internal_worker_base_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1194,3 +1194,40 @@ func (s *ScalableTaskPollerSuite) TestNewScalableTaskPollerAllTypes() {
})
}
}

func (s *ScalableTaskPollerSuite) TestTrackingSlotSupplierStopsSlotMetrics() {
metricsHandler := metrics.NewCapturingHandler()
supplier, err := NewFixedSizeSlotSupplier(2)
s.NoError(err)

trackingSupplier := newTrackingSlotSupplier(supplier, trackingSlotSupplierOptions{
logger: ilog.NewNopLogger(),
metricsHandler: metricsHandler,
})

permit := trackingSupplier.TryReserveSlot(&slotReservationData{})
s.NotNil(permit)

trackingSupplier.MarkSlotUsed(permit)
s.Equal(1.0, capturedWorkerSlotGaugeValue(s.T(), metricsHandler, metrics.WorkerTaskSlotsAvailable))
s.Equal(1.0, capturedWorkerSlotGaugeValue(s.T(), metricsHandler, metrics.WorkerTaskSlotsUsed))

trackingSupplier.stopMetrics()
s.Equal(0.0, capturedWorkerSlotGaugeValue(s.T(), metricsHandler, metrics.WorkerTaskSlotsAvailable))
s.Equal(0.0, capturedWorkerSlotGaugeValue(s.T(), metricsHandler, metrics.WorkerTaskSlotsUsed))

trackingSupplier.ReleaseSlot(permit, SlotReleaseReasonTaskProcessed)
s.Equal(0.0, capturedWorkerSlotGaugeValue(s.T(), metricsHandler, metrics.WorkerTaskSlotsAvailable))
s.Equal(0.0, capturedWorkerSlotGaugeValue(s.T(), metricsHandler, metrics.WorkerTaskSlotsUsed))
}

func capturedWorkerSlotGaugeValue(t *testing.T, handler *metrics.CapturingHandler, name string) float64 {
t.Helper()
for _, gauge := range handler.Gauges() {
if gauge.Name == name {
return gauge.Value()
}
}
t.Fatalf("gauge %s not found", name)
return 0
}
26 changes: 26 additions & 0 deletions internal/tuning.go
Original file line number Diff line number Diff line change
Expand Up @@ -394,6 +394,8 @@ type trackingSlotSupplier struct {

issuedSlotsAtomic atomic.Int32
slotsMutex sync.Mutex
metricsStopped bool
metricsPublished bool
// Values should eventually become slot info types
usedSlots map[*SlotPermit]struct{}
taskSlotsAvailableGauge metrics.Gauge
Expand Down Expand Up @@ -507,12 +509,36 @@ func (t *trackingSlotSupplier) ReleaseSlot(permit *SlotPermit, reason SlotReleas
}

func (t *trackingSlotSupplier) publishMetrics(usedSlots int) {
t.slotsMutex.Lock()
defer t.slotsMutex.Unlock()

if t.metricsStopped {
return
}
t.metricsPublished = true
if t.inner.MaxSlots() != 0 {
t.taskSlotsAvailableGauge.Update(float64(t.inner.MaxSlots() - usedSlots))
}
t.taskSlotsUsedGauge.Update(float64(usedSlots))
}

func (t *trackingSlotSupplier) stopMetrics() {
t.slotsMutex.Lock()
defer t.slotsMutex.Unlock()

if t.metricsStopped {
return
}
t.metricsStopped = true
if !t.metricsPublished {
return
}
if t.inner.MaxSlots() != 0 {
t.taskSlotsAvailableGauge.Update(0)
}
t.taskSlotsUsedGauge.Update(0)
}

func (t *trackingSlotSupplier) GetSlotSupplierKind() string {
return getSlotSupplierKind(t.inner)
}