From d77fa2108add477bac4c54c7c4e8e04c3b67a409 Mon Sep 17 00:00:00 2001 From: mountain Date: Tue, 1 Jul 2025 18:50:03 +0900 Subject: [PATCH] =?UTF-8?q?metric=20=EA=B4=80=EB=A0=A8=20=EC=BD=94?= =?UTF-8?q?=EB=93=9C=20=EC=88=98=EC=A0=95?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- metric/common.go | 13 ---- metric/prometheus.go | 140 ++++++++++++++----------------------------- 2 files changed, 44 insertions(+), 109 deletions(-) diff --git a/metric/common.go b/metric/common.go index 35f6792..c7c3cb1 100644 --- a/metric/common.go +++ b/metric/common.go @@ -29,7 +29,6 @@ type MetricDescription struct { type Exporter interface { RegisterMetric(*MetricDescription) UpdateMetric(string, float64) - Shutdown() } type MetricWriter interface { @@ -118,18 +117,6 @@ func NewMetric(mt MetricType, name string, help string, constLabels map[string]s return impl } -func ReadMetricValue(line []byte) (string, float64) { - if len(line) < 16 { - return "", 0 - } - - key := string(line[0:8]) - valbits := binary.LittleEndian.Uint64(line[8:]) - val := math.Float64frombits(valbits) - - return key, val -} - var metricEnabled = false func init() { diff --git a/metric/prometheus.go b/metric/prometheus.go index 89b3848..e03dc5b 100644 --- a/metric/prometheus.go +++ b/metric/prometheus.go @@ -1,7 +1,7 @@ package metric import ( - "context" + "maps" "math" "sync/atomic" @@ -34,11 +34,6 @@ func convertValueType(in MetricType) prometheus.ValueType { return prometheus.UntypedValue } -type writeRequest struct { - key string - val float64 -} - type prometheusMetricDesc struct { *prometheus.Desc valueType prometheus.ValueType @@ -46,116 +41,69 @@ type prometheusMetricDesc struct { key string } -type prometheusExporter struct { - writerChan chan *writeRequest - registerChan chan *prometheusMetricDesc - namespace string - cancel context.CancelFunc +type PrometheusCollector struct { + namespace string + metrics map[string]*prometheusMetricDesc + registry *prometheus.Registry } -func (pe *prometheusExporter) RegisterMetric(nm *MetricDescription) { - pe.registerChan <- &prometheusMetricDesc{ - Desc: prometheus.NewDesc(prometheus.BuildFQName(pe.namespace, "", nm.Name), nm.Help, nil, nm.ConstLabels), - valueType: convertValueType(nm.Type), - valptr: new(uint64), - key: nm.Key, - } -} - -func (pe *prometheusExporter) UpdateMetric(key string, val float64) { - pe.writerChan <- &writeRequest{key: key, val: val} -} - -func (pe *prometheusExporter) Shutdown() { - if pe.cancel != nil { - pe.cancel() - } -} - -type prometheusCollector struct { - metrics map[string]*prometheusMetricDesc -} - -func (pc *prometheusCollector) Describe(ch chan<- *prometheus.Desc) { +func (pc *PrometheusCollector) Describe(ch chan<- *prometheus.Desc) { for _, v := range pc.metrics { + logger.Println("collector describe :", v.Desc.String()) ch <- v.Desc } } -func (pc *prometheusCollector) Collect(ch chan<- prometheus.Metric) { +func (pc *PrometheusCollector) Collect(ch chan<- prometheus.Metric) { for _, v := range pc.metrics { - cm, err := prometheus.NewConstMetric(v.Desc, v.valueType, math.Float64frombits(atomic.LoadUint64(v.valptr))) + logger.Println("collector collect :", v.Desc.String()) + value := atomic.LoadUint64(v.valptr) + cm, err := prometheus.NewConstMetric(v.Desc, v.valueType, math.Float64frombits(value)) if err == nil { ch <- cm } } } -func (pe *prometheusExporter) loop(ctx context.Context) { - defer func() { - r := recover() - if r != nil { - logger.Error(r) - } - }() +func (pc *PrometheusCollector) RegisterMetric(md *MetricDescription) *PrometheusCollector { + nm := &prometheusMetricDesc{ + Desc: prometheus.NewDesc(prometheus.BuildFQName("ou", "", md.Name), md.Help, nil, md.ConstLabels), + valueType: convertValueType(md.Type), + valptr: new(uint64), + key: md.Key, + } - var collector *prometheusCollector - defer func() { - if collector != nil { - prometheus.Unregister(collector) - } - }() + next := NewPrometheusCollector(pc.namespace, pc.registry) + maps.Copy(next.metrics, pc.metrics) + next.metrics[nm.key] = nm - for { - select { - case <-ctx.Done(): - return + pc.registry.Unregister(pc) + pc.registry.Register(next) - case req := <-pe.writerChan: - if collector != nil { - if m := collector.metrics[req.key]; m != nil { - atomic.StoreUint64(m.valptr, math.Float64bits(req.val)) - } - } + return next +} - case nm := <-pe.registerChan: - var nextmetrics map[string]*prometheusMetricDesc - if collector != nil { - nextmetrics = collector.metrics - prometheus.Unregister(collector) - nextmetrics[nm.key] = nm - } else { - nextmetrics = map[string]*prometheusMetricDesc{ - nm.key: nm, - } - } - - nextcollector := &prometheusCollector{ - metrics: nextmetrics, - } - - if err := prometheus.Register(nextcollector); err != nil { - if _, ok := err.(prometheus.AlreadyRegisteredError); ok { - // 이미 등록된 metric. child process를 여럿 실행하면 발생됨 - } else { - logger.Error("prometheus register err :", *nm, err) - } - } else { - collector = nextcollector - } - } +func (pc *PrometheusCollector) UpdateMetric(key string, val float64) { + if m := pc.metrics[key]; m != nil { + atomic.StoreUint64(m.valptr, math.Float64bits(val)) } } -func NewPrometheusExport(namespace string) Exporter { - ctx, cancel := context.WithCancel(context.Background()) - exp := &prometheusExporter{ - registerChan: make(chan *prometheusMetricDesc, 10), - writerChan: make(chan *writeRequest, 100), - namespace: namespace, - cancel: cancel, - } +func (pc *PrometheusCollector) UnregisterMetric(key string) *PrometheusCollector { + next := NewPrometheusCollector(pc.namespace, pc.registry) + maps.Copy(next.metrics, pc.metrics) + delete(next.metrics, key) - go exp.loop(ctx) - return exp + pc.registry.Unregister(pc) + pc.registry.Register(next) + + return next +} + +func NewPrometheusCollector(namespace string, registry *prometheus.Registry) *PrometheusCollector { + return &PrometheusCollector{ + namespace: namespace, + metrics: make(map[string]*prometheusMetricDesc), + registry: registry, + } }