diff --git a/metric/common.go b/metric/common.go new file mode 100644 index 0000000..35f6792 --- /dev/null +++ b/metric/common.go @@ -0,0 +1,161 @@ +package metric + +import ( + "crypto/md5" + "encoding/binary" + "encoding/hex" + "encoding/json" + "fmt" + "math" + "os" + "path" + "sort" + "strings" + "sync/atomic" + + "repositories.action2quare.com/ayo/gocommon/logger" +) + +const metric_value_line_size = 19 + +type MetricDescription struct { + Key string + Type MetricType + Name string `json:",omitempty"` + Help string `json:",omitempty"` + ConstLabels map[string]string `json:",omitempty"` +} + +type Exporter interface { + RegisterMetric(*MetricDescription) + UpdateMetric(string, float64) + Shutdown() +} + +type MetricWriter interface { + Add(int64) + Set(int64) +} + +type metric_empty struct{} + +func (mw *metric_empty) Set(int64) {} +func (mw *metric_empty) Add(int64) {} + +var MetricWriterNil = MetricWriter(&metric_empty{}) + +type metric_int64 struct { + valptr *int64 + buff [metric_value_line_size]byte +} + +func (mw *metric_int64) printOut() { + binary.LittleEndian.PutUint64(mw.buff[9:], math.Float64bits(float64(atomic.LoadInt64(mw.valptr)))) + os.Stdout.Write(mw.buff[:]) +} + +func (mw *metric_int64) Set(newval int64) { + atomic.StoreInt64(mw.valptr, newval) + mw.printOut() +} + +func (mw *metric_int64) Add(inc int64) { + atomic.AddInt64(mw.valptr, inc) + mw.printOut() +} + +func NewMetric(mt MetricType, name string, help string, constLabels map[string]string) (writer MetricWriter) { + if !metricEnabled { + return MetricWriterNil + } + + var disorder []struct { + k string + v string + } + for k, v := range constLabels { + disorder = append(disorder, struct { + k string + v string + }{k: strings.ToLower(k), v: strings.ToLower(v)}) + } + + sort.Slice(disorder, func(i, j int) bool { + return disorder[i].k < disorder[j].k + }) + + hash := md5.New() + hash.Write([]byte(strings.ToLower(name))) + for _, d := range disorder { + hash.Write([]byte(d.k)) + hash.Write([]byte(d.v)) + } + + key := hex.EncodeToString(hash.Sum(nil))[:metric_key_size] + temp, _ := json.Marshal(MetricDescription{ + Key: key, + Type: mt, + Name: name, + Help: help, + ConstLabels: constLabels, + }) + + impl := &metric_int64{ + valptr: new(int64), + } + + impl.buff[0] = METRIC_HEAD_INLINE + impl.buff[17] = METRIC_TAIL_INLINE + impl.buff[18] = '\n' + copy(impl.buff[1:], []byte(key)) + + output := append([]byte{METRIC_HEAD_INLINE}, temp...) + output = append(output, METRIC_TAIL_INLINE, '\n') + os.Stdout.Write(output) + + // writer + + return impl +} + +func ReadMetricValue(line []byte) (string, float64) { + if len(line) < 16 { + return "", 0 + } + + key := string(line[0:8]) + valbits := binary.LittleEndian.Uint64(line[8:]) + val := math.Float64frombits(valbits) + + return key, val +} + +var metricEnabled = false + +func init() { + if path.Base(os.Args[0]) == "houston" { + logger.Println("metrics are going to be generated for myself(houston)") + metricEnabled = true + return + } + + ppid := os.Getppid() + if parent, _ := os.FindProcess(ppid); parent != nil { + filename := fmt.Sprintf(`/proc/%d/stat`, os.Getppid()) + if fn, err := os.ReadFile(filename); err == nil { + stats := strings.SplitN(string(fn), " ", 3) + parentname := strings.Trim(stats[1], "()") + + if path.Base(parentname) == "houston" { + logger.Println("metrics are going to be generated for houston") + metricEnabled = true + } else { + logger.Println("metrics are NOT going to be generated. parent is not houston :", filename, string(fn)) + } + } else { + logger.Println("metrics are NOT going to be generated. ppid proc is missing :", filename) + } + } else { + logger.Println("metrics are NOT going to be generated. parent process is missing. ppid :", ppid) + } +} diff --git a/metric/prometheus.go b/metric/prometheus.go new file mode 100644 index 0000000..89b3848 --- /dev/null +++ b/metric/prometheus.go @@ -0,0 +1,161 @@ +package metric + +import ( + "context" + "math" + "sync/atomic" + + "github.com/prometheus/client_golang/prometheus" + "repositories.action2quare.com/ayo/gocommon/logger" +) + +const ( + METRIC_HEAD_INLINE = byte(14) + METRIC_TAIL_INLINE = byte(15) +) + +type MetricType int + +const ( + MetricCounter = MetricType(1) + MetricGuage = MetricType(2) + metric_key_size = 8 +) + +func convertValueType(in MetricType) prometheus.ValueType { + switch in { + case MetricCounter: + return prometheus.CounterValue + + case MetricGuage: + return prometheus.GaugeValue + } + + return prometheus.UntypedValue +} + +type writeRequest struct { + key string + val float64 +} + +type prometheusMetricDesc struct { + *prometheus.Desc + valueType prometheus.ValueType + valptr *uint64 + key string +} + +type prometheusExporter struct { + writerChan chan *writeRequest + registerChan chan *prometheusMetricDesc + namespace string + cancel context.CancelFunc +} + +func (pe *prometheusExporter) RegisterMetric(nm *MetricDescription) { + pe.registerChan <- &prometheusMetricDesc{ + Desc: prometheus.NewDesc(prometheus.BuildFQName(pe.namespace, "", nm.Name), nm.Help, nil, nm.ConstLabels), + valueType: convertValueType(nm.Type), + valptr: new(uint64), + key: nm.Key, + } +} + +func (pe *prometheusExporter) UpdateMetric(key string, val float64) { + pe.writerChan <- &writeRequest{key: key, val: val} +} + +func (pe *prometheusExporter) Shutdown() { + if pe.cancel != nil { + pe.cancel() + } +} + +type prometheusCollector struct { + metrics map[string]*prometheusMetricDesc +} + +func (pc *prometheusCollector) Describe(ch chan<- *prometheus.Desc) { + for _, v := range pc.metrics { + ch <- v.Desc + } +} + +func (pc *prometheusCollector) Collect(ch chan<- prometheus.Metric) { + for _, v := range pc.metrics { + cm, err := prometheus.NewConstMetric(v.Desc, v.valueType, math.Float64frombits(atomic.LoadUint64(v.valptr))) + if err == nil { + ch <- cm + } + } +} + +func (pe *prometheusExporter) loop(ctx context.Context) { + defer func() { + r := recover() + if r != nil { + logger.Error(r) + } + }() + + var collector *prometheusCollector + defer func() { + if collector != nil { + prometheus.Unregister(collector) + } + }() + + for { + select { + case <-ctx.Done(): + return + + case req := <-pe.writerChan: + if collector != nil { + if m := collector.metrics[req.key]; m != nil { + atomic.StoreUint64(m.valptr, math.Float64bits(req.val)) + } + } + + case nm := <-pe.registerChan: + var nextmetrics map[string]*prometheusMetricDesc + if collector != nil { + nextmetrics = collector.metrics + prometheus.Unregister(collector) + nextmetrics[nm.key] = nm + } else { + nextmetrics = map[string]*prometheusMetricDesc{ + nm.key: nm, + } + } + + nextcollector := &prometheusCollector{ + metrics: nextmetrics, + } + + if err := prometheus.Register(nextcollector); err != nil { + if _, ok := err.(prometheus.AlreadyRegisteredError); ok { + // 이미 등록된 metric. child process를 여럿 실행하면 발생됨 + } else { + logger.Error("prometheus register err :", *nm, err) + } + } else { + collector = nextcollector + } + } + } +} + +func NewPrometheusExport(namespace string) Exporter { + ctx, cancel := context.WithCancel(context.Background()) + exp := &prometheusExporter{ + registerChan: make(chan *prometheusMetricDesc, 10), + writerChan: make(chan *writeRequest, 100), + namespace: namespace, + cancel: cancel, + } + + go exp.loop(ctx) + return exp +}