metric 서비스 복원

This commit is contained in:
2025-07-01 18:51:40 +09:00
parent 106aa68529
commit 19a4ff103f
7 changed files with 383 additions and 13 deletions

View File

@ -31,6 +31,9 @@ import (
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"
)
type runcommand struct {
@ -44,6 +47,7 @@ type clientConfig struct {
HttpAddress string `json:"http_server_address"`
StorageRoot string `json:"storage_path"`
MetricNamespace string `json:"metric_namespace"`
MetricPipeName string `json:"metric_pipe"`
ConstLabels map[string]string `json:"metric_const_labels"`
Autorun map[string]runcommand `json:"autorun"`
}
@ -70,6 +74,7 @@ func loadClientConfig() (clientConfig, error) {
type HoustonClient interface {
Shutdown()
Start()
MetricHandler() http.Handler
}
var seq = int32(1)
@ -120,6 +125,7 @@ type houstonClient struct {
version string
standalone bool
siblingProcIndex map[string]uint64
registry *prometheus.Registry
}
func unmarshal[T any](val *T, src map[string]string) {
@ -298,6 +304,7 @@ func NewClient(standalone bool) (HoustonClient, error) {
standalone: standalone,
uploadChan: make(chan uploadRequest, 100),
siblingProcIndex: make(map[string]uint64),
registry: prometheus.NewRegistry(),
}
ctx, cancel := context.WithCancel(context.Background())
@ -580,6 +587,16 @@ func (hc *houstonClient) Start() {
close(hc.uploadChan)
}()
if len(hc.config.MetricPipeName) == 0 {
hc.config.MetricPipeName = "houston_metric_pipe"
}
if len(hc.config.MetricNamespace) == 0 {
hc.config.MetricNamespace = "ou"
}
run_metric_pipe_reader(hc.config, hc.registry, hc.ctx)
go func() {
// upload 고루틴
url := hc.config.HttpAddress + "/upload"
@ -651,7 +668,7 @@ func (hc *houstonClient) Start() {
reconnCount++
var err error
dialContext, cancelDial := context.WithTimeout(context.Background(), 15*time.Second)
dialContext, cancelDial := context.WithTimeout(context.Background(), 5*time.Second)
client, err = grpc.DialContext(dialContext, hc.config.GrpcAddress, grpc.WithBlock(), grpc.WithTransportCredentials(insecure.NewCredentials()))
cancelDial()
@ -679,6 +696,12 @@ func (hc *houstonClient) Shutdown() {
hc.shutdownFunc()
}
func (hc *houstonClient) MetricHandler() http.Handler {
return promhttp.InstrumentMetricHandler(
hc.registry, promhttp.HandlerFor(hc.registry, promhttp.HandlerOpts{}),
)
}
func (hc *houstonClient) checkOperation(client *grpc.ClientConn) error {
defer func() {
r := recover()

View File

@ -0,0 +1,95 @@
package client
import (
"bufio"
"context"
"encoding/json"
"io"
"maps"
"strconv"
"strings"
"github.com/prometheus/client_golang/prometheus"
"repositories.action2quare.com/ayo/gocommon/logger"
"repositories.action2quare.com/ayo/gocommon/metric"
)
type pipeListener struct {
config clientConfig
registry *prometheus.Registry
}
func run_metric_pipe_reader(config clientConfig, registry *prometheus.Registry, ctx context.Context) {
r := &pipeListener{
config: config,
registry: registry,
}
go r.listen(ctx)
}
type pipeReader struct {
handle io.ReadCloser
constLabels map[string]string
collector *metric.PrometheusCollector
}
func (l *pipeListener) startReader(r io.ReadCloser) {
reader := pipeReader{
handle: r,
constLabels: l.config.ConstLabels,
collector: metric.NewPrometheusCollector(l.config.MetricNamespace, l.registry),
}
defer func() {
reader.close(l.registry)
}()
scanner := bufio.NewScanner(r)
for scanner.Scan() {
reader.parseLine(scanner.Text())
}
}
func (r *pipeReader) close(registry *prometheus.Registry) {
registry.Unregister(r.collector)
r.handle.Close()
}
func (r *pipeReader) parseLine(line string) {
defer func() {
r := recover()
if r != nil {
logger.Println(r)
}
}()
switch line[0] {
case '{':
var desc metric.MetricDescription
if err := json.Unmarshal([]byte(line), &desc); err != nil {
logger.Println("unmarshal metric failed :", err, line)
return
}
if desc.ConstLabels == nil {
desc.ConstLabels = make(map[string]string)
}
maps.Copy(desc.ConstLabels, r.constLabels)
r.collector = r.collector.RegisterMetric(&desc)
default:
kv := strings.Split(line, ":")
if len(kv) != 2 {
return
}
if len(kv[1]) == 0 {
r.collector = r.collector.UnregisterMetric(kv[0])
} else {
if val, err := strconv.ParseFloat(kv[1], 64); err == nil {
r.collector.UpdateMetric(kv[0], val)
}
}
}
}

View File

@ -0,0 +1,40 @@
package client
import (
"context"
"os"
"golang.org/x/sys/unix"
"repositories.action2quare.com/ayo/gocommon/logger"
)
func (r *pipeListener) listen(ctx context.Context) {
mode := 0666 // 읽기/쓰기 권한
pipeName := "/tmp/" + r.config.MetricPipeName
os.Remove(pipeName)
if err := unix.Mkfifo(pipeName, uint32(mode)); err != nil {
logger.Println("mkfifo failed :", pipeName, err)
return
}
defer os.Remove(pipeName)
go func() {
// file에 쓰기 핸들을 하나 열고 ctx가 Done일 때 닫음. 이래야 reader가 계속 열려있게 됨
f, err := os.OpenFile(pipeName, os.O_WRONLY, 0)
if err != nil {
logger.Println(err)
return
}
<-ctx.Done()
f.Close()
}()
file, err := os.Open(pipeName)
if err != nil {
logger.Println("FIFO open error:", err)
return
}
r.startReader(file)
}

View File

@ -0,0 +1,35 @@
package client
import (
"context"
"strings"
"github.com/natefinch/npipe"
"repositories.action2quare.com/ayo/gocommon/logger"
)
func (r *pipeListener) listen(ctx context.Context) {
pipename := r.config.MetricPipeName
if !strings.HasPrefix(pipename, `\\.\pipe\`) {
pipename = `\\.\pipe\` + pipename
}
listener, err := npipe.Listen(pipename)
if err != nil {
logger.Println("metric pipe npipe.Listen failed :", err)
return
}
go func() {
<-ctx.Done()
logger.Println("listener close")
listener.Close()
}()
for {
if conn, err := listener.Accept(); err == nil {
go r.startReader(conn)
} else {
logger.Println("metric pipe listener.Accept failed :", err)
}
}
}