diff --git a/poller/metric.go b/poller/metric.go index 16343aa..08f0fe7 100644 --- a/poller/metric.go +++ b/poller/metric.go @@ -4,15 +4,15 @@ import "sync/atomic" // Metric interface type Metric interface { - IncBusyWorker() uint64 - DecBusyWorker() uint64 - BusyWorkers() uint64 + IncBusyWorker() int64 + DecBusyWorker() int64 + BusyWorkers() int64 } var _ Metric = (*metric)(nil) type metric struct { - busyWorkers uint64 + busyWorkers int64 } // NewMetric for default metric structure @@ -20,14 +20,14 @@ func NewMetric() Metric { return &metric{} } -func (m *metric) IncBusyWorker() uint64 { - return atomic.AddUint64(&m.busyWorkers, 1) +func (m *metric) IncBusyWorker() int64 { + return atomic.AddInt64(&m.busyWorkers, 1) } -func (m *metric) DecBusyWorker() uint64 { - return atomic.AddUint64(&m.busyWorkers, ^uint64(0)) +func (m *metric) DecBusyWorker() int64 { + return atomic.AddInt64(&m.busyWorkers, -1) } -func (m *metric) BusyWorkers() uint64 { - return atomic.LoadUint64(&m.busyWorkers) +func (m *metric) BusyWorkers() int64 { + return atomic.LoadInt64(&m.busyWorkers) } diff --git a/poller/poller.go b/poller/poller.go index 9010236..3d9035c 100644 --- a/poller/poller.go +++ b/poller/poller.go @@ -83,7 +83,10 @@ func (p *Poller) Poll(ctx context.Context) error { break } + p.metric.IncBusyWorker() p.routineGroup.Run(func() { + defer p.schedule() + defer p.metric.DecBusyWorker() if err := p.dispatchTask(ctx, task); err != nil { l.Errorf("execute task: %v", err.Error()) } @@ -131,12 +134,10 @@ func (p *Poller) pollTask(ctx context.Context) (*runnerv1.Task, error) { func (p *Poller) dispatchTask(ctx context.Context, task *runnerv1.Task) error { l := log.WithField("func", "dispatchTask") defer func() { - p.metric.DecBusyWorker() e := recover() if e != nil { l.Errorf("panic error: %v", e) } - p.schedule() }() runCtx, cancel := context.WithTimeout(ctx, time.Hour)