From 7945a32ac1b72208903d852a463fae4b5c9ba79c Mon Sep 17 00:00:00 2001 From: Matthew Stobbs Date: Wed, 20 Mar 2024 11:40:32 -0600 Subject: [PATCH] creating better system for stats - goal is to have near realtime stats being recorded - allows for different objects with stats (host, VM, etc.) to create it's own stat provideres, and register them with a collector that gathers the stats on an interval --- lib/host/stats.go | 10 ++++ lib/stats/stats.go | 146 +++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 156 insertions(+) create mode 100644 lib/host/stats.go create mode 100644 lib/stats/stats.go diff --git a/lib/host/stats.go b/lib/host/stats.go new file mode 100644 index 0000000..f9189f5 --- /dev/null +++ b/lib/host/stats.go @@ -0,0 +1,10 @@ +package host + +import "git.staur.ca/stobbsm/clustvirt/lib/stats" + +// CollectMemUsage sends the current used memory of the host +// over the given channel +func (h *Host) CollectMemUsage(u chan<- stats.StatValue) error { + + return nil +} diff --git a/lib/stats/stats.go b/lib/stats/stats.go new file mode 100644 index 0000000..507202e --- /dev/null +++ b/lib/stats/stats.go @@ -0,0 +1,146 @@ +package stats + +import ( + "context" + "errors" + "fmt" + "io" + "time" + + "golang.org/x/exp/constraints" +) + +type Number interface { + constraints.Integer | constraints.Float +} + +// StatFetcher represents a data collector. It is given a channel to send a Number +// to, being the metric that is being collected. +type StatFetcher[T Number] func(chan<- Stat[T]) + +// StatFetcherList is a map of collector types, where the key is the label to use +// for the statistic +type StatFetcherList[T Number] map[string]StatFetcher[T] + +// StatProvider is something that has the Stats method and provides stats +type StatProvider[T Number] interface { + Stats() StatFetcherList[T] +} + +// StatCollector is something that has the Collect method, recieving a StatProvider to collect from +type StatCollector[T Number] interface { + Collect(StatProvider[T]) +} + +// Stat represents a single statistic +type Stat[T Number] struct { + Name string + Value T +} + +// StatsCollector controls running stats +type StatsCollector[T Number] struct { + ctx context.Context + ctxCancel context.CancelFunc + collectors StatFetcherList[T] + interval time.Duration + rchan chan Stat[T] + running bool +} + +// AddCollector adds a collector to the existing list of collectors. If `name` is already defined, +// will return an error but still replace the collector. If the StatsCollector is running, it is restarted. +func (sc *StatsCollector[T]) AddCollector(name string, collector StatFetcher[T], restart bool) error { + var err error + if _, ok := sc.collectors[name]; !ok { + err = fmt.Errorf("replacing collector %s", name) + } + sc.collectors[name] = collector + if sc.IsRunning() { + sc.Restart() + } + return err +} + +// MergeCollectors is a utility function to merge a Collectors map with another one, allowing for +// better localization of code and avoiding repeating the same function all over the place when +// an object with Collector functions wants to add them +func (sc *StatsCollector[T]) MergeCollectors(collectors StatFetcherList[T]) error { + var errList []error + for k, c := range collectors { + errList = append(errList, sc.AddCollector(k, c, false)) + } + sc.Restart() + return errors.Join(errList...) +} + +// Collect satisfys the StatCollector interface. It is given a StatProvider, and those stats are then +// merged with the existing providers, running on the same interval +func (sc *StatsCollector[T]) Collect(sp StatProvider[T]) { + sc.MergeCollectors(sp.Stats()) +} + +// IsRunning is a utility function that returns true if the StatsCollector is running +func (sc *StatsCollector[T]) IsRunning() bool { return sc.running } + +// RunningStats returns a channel of type Stat, and receives a context and a list +// of collectors that get run every 'd' duration +func (sc *StatsCollector[T]) Start() { + sc.ctx, sc.ctxCancel = context.WithCancel(context.Background()) + sc.rchan = make(chan Stat[T]) + ticker := time.NewTicker(sc.interval) + + // start the goroutine that does the actual work + go func() { + defer func() { sc.running = false }() + select { + case <-sc.ctx.Done(): + return + case <-ticker.C: + for _, f := range sc.collectors { + f(sc.rchan) + } + } + }() + sc.running = true +} + +// Stop stops the collection of stats +func (sc *StatsCollector[T]) Stop() { + defer close(sc.rchan) + sc.ctxCancel() + <-sc.ctx.Done() +} + +// Restart stops the StatsCollector, +func (sc *StatsCollector[T]) Restart() { + if sc.IsRunning() { + sc.Stop() + } + if !sc.IsRunning() { + sc.Start() + } +} + +// StatList is a simple map[string]Stat tht allows for easy lookup of stats by name +// Getting all stats can still be done using a range loop +type StatList[T Number] map[string]Stat[T] + +func (sl StatList[T]) Get(name string) Stat[T] { + if v, ok := sl[name]; ok { + return v + } + return Stat[T]{ + Name: "UNDEFINED", + Value: 0, + } +} + +func (sl StatList[T]) Set(name string, s Stat[T]) { + sl[name] = s +} + +// PrometheusMetrics writes the stats in a format prometheus understands to an io.Writer +func (sl StatList[T]) PrometheusMetrics(w io.Writer) { + +}