clustvirt/lib/stats/stats.go

147 lines
3.9 KiB
Go
Raw Normal View History

package stats
import (
"context"
"errors"
"fmt"
"io"
"time"
"golang.org/x/exp/constraints"
)
type Number interface {
constraints.Integer | constraints.Float
}
// StatFetcher represents a data collector. It is given a channel to send a Number
// to, being the metric that is being collected.
type StatFetcher[T Number] func(chan<- Stat[T])
// StatFetcherList is a map of collector types, where the key is the label to use
// for the statistic
type StatFetcherList[T Number] map[string]StatFetcher[T]
// StatProvider is something that has the Stats method and provides stats
type StatProvider[T Number] interface {
Stats() StatFetcherList[T]
}
// StatCollector is something that has the Collect method, recieving a StatProvider to collect from
type StatCollector[T Number] interface {
Collect(StatProvider[T])
}
// Stat represents a single statistic
type Stat[T Number] struct {
Name string
Value T
}
// StatsCollector controls running stats
type StatsCollector[T Number] struct {
ctx context.Context
ctxCancel context.CancelFunc
collectors StatFetcherList[T]
interval time.Duration
rchan chan Stat[T]
running bool
}
// AddCollector adds a collector to the existing list of collectors. If `name` is already defined,
// will return an error but still replace the collector. If the StatsCollector is running, it is restarted.
func (sc *StatsCollector[T]) AddCollector(name string, collector StatFetcher[T], restart bool) error {
var err error
if _, ok := sc.collectors[name]; !ok {
err = fmt.Errorf("replacing collector %s", name)
}
sc.collectors[name] = collector
if sc.IsRunning() {
sc.Restart()
}
return err
}
// MergeCollectors is a utility function to merge a Collectors map with another one, allowing for
// better localization of code and avoiding repeating the same function all over the place when
// an object with Collector functions wants to add them
func (sc *StatsCollector[T]) MergeCollectors(collectors StatFetcherList[T]) error {
var errList []error
for k, c := range collectors {
errList = append(errList, sc.AddCollector(k, c, false))
}
sc.Restart()
return errors.Join(errList...)
}
// Collect satisfys the StatCollector interface. It is given a StatProvider, and those stats are then
// merged with the existing providers, running on the same interval
func (sc *StatsCollector[T]) Collect(sp StatProvider[T]) {
sc.MergeCollectors(sp.Stats())
}
// IsRunning is a utility function that returns true if the StatsCollector is running
func (sc *StatsCollector[T]) IsRunning() bool { return sc.running }
// RunningStats returns a channel of type Stat, and receives a context and a list
// of collectors that get run every 'd' duration
func (sc *StatsCollector[T]) Start() {
sc.ctx, sc.ctxCancel = context.WithCancel(context.Background())
sc.rchan = make(chan Stat[T])
ticker := time.NewTicker(sc.interval)
// start the goroutine that does the actual work
go func() {
defer func() { sc.running = false }()
select {
case <-sc.ctx.Done():
return
case <-ticker.C:
for _, f := range sc.collectors {
f(sc.rchan)
}
}
}()
sc.running = true
}
// Stop stops the collection of stats
func (sc *StatsCollector[T]) Stop() {
defer close(sc.rchan)
sc.ctxCancel()
<-sc.ctx.Done()
}
// Restart stops the StatsCollector,
func (sc *StatsCollector[T]) Restart() {
if sc.IsRunning() {
sc.Stop()
}
if !sc.IsRunning() {
sc.Start()
}
}
// StatList is a simple map[string]Stat tht allows for easy lookup of stats by name
// Getting all stats can still be done using a range loop
type StatList[T Number] map[string]Stat[T]
func (sl StatList[T]) Get(name string) Stat[T] {
if v, ok := sl[name]; ok {
return v
}
return Stat[T]{
Name: "UNDEFINED",
Value: 0,
}
}
func (sl StatList[T]) Set(name string, s Stat[T]) {
sl[name] = s
}
// PrometheusMetrics writes the stats in a format prometheus understands to an io.Writer
func (sl StatList[T]) PrometheusMetrics(w io.Writer) {
}