package metric

import (
	"context"
	"math"
	"sync/atomic"

	"github.com/prometheus/client_golang/prometheus"
	"repositories.action2quare.com/ayo/gocommon/logger"
)

// Marker bytes. They are not referenced in this file.
// NOTE(review): presumably they delimit inline metric payloads elsewhere in
// the package — confirm against their users.
const (
	METRIC_HEAD_INLINE = byte(14)
	METRIC_TAIL_INLINE = byte(15)
)

// MetricType selects how a metric is exposed to Prometheus; see
// convertValueType for the mapping.
type MetricType int

const (
	// MetricCounter is exported as a prometheus.CounterValue.
	MetricCounter = MetricType(1)
	// MetricGuage is exported as a prometheus.GaugeValue. The misspelling of
	// "Gauge" is kept because the identifier is exported.
	MetricGuage = MetricType(2)

	// metric_key_size is not referenced in this file.
	metric_key_size = 8
)

// convertValueType maps a MetricType to the matching prometheus.ValueType.
// Unknown types map to prometheus.UntypedValue.
func convertValueType(in MetricType) prometheus.ValueType {
	switch in {
	case MetricCounter:
		return prometheus.CounterValue

	case MetricGuage:
		return prometheus.GaugeValue
	}

	return prometheus.UntypedValue
}

// writeRequest is a pending value update for the metric registered under key.
type writeRequest struct {
	key string
	val float64
}

// prometheusMetricDesc couples a Prometheus descriptor with the storage for
// the metric's current value.
type prometheusMetricDesc struct {
	*prometheus.Desc
	valueType prometheus.ValueType
	// valptr holds the float64 value as raw IEEE-754 bits so that it can be
	// read and written atomically.
	valptr *uint64
	key    string
}

// prometheusExporter forwards metric registrations and value updates to a
// single background goroutine (loop) that owns the Prometheus collector.
type prometheusExporter struct {
	writerChan   chan *writeRequest
	registerChan chan *prometheusMetricDesc
	namespace    string
	cancel       context.CancelFunc
	// done is closed once the exporter has been shut down. When it is nil
	// (exporter built without NewPrometheusExport) sends simply block on the
	// channels as before.
	done <-chan struct{}
}

// RegisterMetric queues nm for registration with the default Prometheus
// registry. The descriptor name is built from the exporter namespace and
// nm.Name. After shutdown the request is dropped instead of blocking forever.
func (pe *prometheusExporter) RegisterMetric(nm *MetricDescription) {
	desc := &prometheusMetricDesc{
		Desc:      prometheus.NewDesc(prometheus.BuildFQName(pe.namespace, "", nm.Name), nm.Help, nil, nm.ConstLabels),
		valueType: convertValueType(nm.Type),
		valptr:    new(uint64),
		key:       nm.Key,
	}

	select {
	case pe.registerChan <- desc:
	case <-pe.done:
		// Nobody drains the channel any more; drop the request.
	}
}

// UpdateMetric queues a new value for the metric registered under key.
// Updates for unknown keys are ignored by the loop. After shutdown the update
// is dropped instead of blocking forever.
func (pe *prometheusExporter) UpdateMetric(key string, val float64) {
	req := &writeRequest{key: key, val: val}

	select {
	case pe.writerChan <- req:
	case <-pe.done:
		// Nobody drains the channel any more; drop the update.
	}
}

// Shutdown stops the background loop. It is safe to call more than once.
func (pe *prometheusExporter) Shutdown() {
	if pe.cancel != nil {
		pe.cancel()
	}
}

// prometheusCollector exposes a fixed set of metrics to Prometheus.
//
// The registry may call Collect concurrently, so the metrics map must never
// be modified after the collector has been handed to prometheus.Register.
// Use withMetric to derive a new collector instead.
type prometheusCollector struct {
	metrics map[string]*prometheusMetricDesc
}

// Describe sends the descriptor of every metric to ch.
func (pc *prometheusCollector) Describe(ch chan<- *prometheus.Desc) {
	for _, v := range pc.metrics {
		ch <- v.Desc
	}
}

// Collect sends the current value of every metric to ch. A metric whose
// constant metric cannot be built is skipped.
func (pc *prometheusCollector) Collect(ch chan<- prometheus.Metric) {
	for _, v := range pc.metrics {
		cm, err := prometheus.NewConstMetric(v.Desc, v.valueType, math.Float64frombits(atomic.LoadUint64(v.valptr)))
		if err == nil {
			ch <- cm
		}
	}
}

// withMetric returns a new collector containing every metric of pc plus nm
// (replacing any metric with the same key). pc itself is left untouched and
// may be nil.
func (pc *prometheusCollector) withMetric(nm *prometheusMetricDesc) *prometheusCollector {
	size := 1
	if pc != nil {
		size += len(pc.metrics)
	}

	next := make(map[string]*prometheusMetricDesc, size)
	if pc != nil {
		for k, v := range pc.metrics {
			next[k] = v
		}
	}
	next[nm.key] = nm

	return &prometheusCollector{metrics: next}
}

// register swaps current for a collector that additionally contains nm and
// returns the collector the loop should keep using.
//
// If the new collector is rejected, the previous one is registered again so
// that one bad descriptor neither removes the already exported metrics nor
// poisons later registrations.
func (pe *prometheusExporter) register(current *prometheusCollector, nm *prometheusMetricDesc) *prometheusCollector {
	candidate := current.withMetric(nm)

	// The old collector has to go first: both collectors share descriptors
	// and the registry rejects duplicates.
	if current != nil {
		prometheus.Unregister(current)
	}

	err := prometheus.Register(candidate)
	if err == nil {
		return candidate
	}
	logger.Error("prometheus register err :", *nm, err)

	if current != nil {
		if rerr := prometheus.Register(current); rerr != nil {
			// Keep current anyway so that its metrics are carried over by the
			// next successful registration.
			logger.Error("prometheus re-register err :", rerr)
		}
	}
	return current
}

// loop owns the collector: it applies value updates and registrations until
// ctx is cancelled, then unregisters the collector.
func (pe *prometheusExporter) loop(ctx context.Context) {
	defer func() {
		if r := recover(); r != nil {
			logger.Error(r)
		}
	}()
	// Whatever the reason for leaving the loop, mark the exporter as shut
	// down so producers do not block on channels nobody reads.
	defer pe.Shutdown()

	var collector *prometheusCollector
	defer func() {
		if collector != nil {
			prometheus.Unregister(collector)
		}
	}()

	for {
		select {
		case <-ctx.Done():
			return

		case req := <-pe.writerChan:
			if collector == nil {
				continue
			}
			if m := collector.metrics[req.key]; m != nil {
				atomic.StoreUint64(m.valptr, math.Float64bits(req.val))
			}

		case nm := <-pe.registerChan:
			collector = pe.register(collector, nm)
		}
	}
}

// NewPrometheusExport creates an Exporter that publishes metrics under
// namespace and starts its background loop. Call Shutdown to stop it.
func NewPrometheusExport(namespace string) Exporter {
	ctx, cancel := context.WithCancel(context.Background())
	exp := &prometheusExporter{
		registerChan: make(chan *prometheusMetricDesc, 10),
		writerChan:   make(chan *writeRequest, 100),
		namespace:    namespace,
		cancel:       cancel,
		done:         ctx.Done(),
	}

	go exp.loop(ctx)

	return exp
}