package server import ( "context" "encoding/json" "fmt" "net" "os" "reflect" "strings" "sync" "time" "repositories.action2quare.com/ayo/gocommon/logger" "repositories.action2quare.com/ayo/houston/shared" "repositories.action2quare.com/ayo/houston/shared/protos" ) type opdef struct { operation shared.Operation args any } type ProcessSnapshot struct { Name string `json:"name"` Args []string `json:"args"` Version string `json:"version"` State protos.ProcessState `json:"state"` Pid int32 `json:"pid"` } type hostWithChan struct { Hostname string PrivateIp string PublicIp string Procs []*protos.ProcessDescription `json:"procs"` Deploys map[string][]*protos.VersionAndArgs `json:"deploys"` opChan chan *opdef } func makeHostWithChan(desc *protos.OperationQueryRequest) *hostWithChan { newdeploys := make(map[string][]*protos.VersionAndArgs) for _, deploy := range desc.Deploys { newdeploys[deploy.Name] = deploy.Versions } return &hostWithChan{ PrivateIp: desc.PrivateIp, PublicIp: desc.PublicIp, Hostname: desc.GetHostname(), Procs: desc.Procs, Deploys: newdeploys, } } func (pc *hostWithChan) withOpChan(c chan *opdef) *hostWithChan { pc.opChan = c return pc } func (pc *hostWithChan) makeOpChan() *hostWithChan { pc.opChan = make(chan *opdef, 1) return pc } type hostPool struct { sync.Mutex hosts map[string]*hostWithChan exportChan chan string } type deployingProgress struct { *protos.DeployingProgress Timestamp int64 } type deployingBoard struct { sync.Mutex progs []deployingProgress } func (db *deployingBoard) clone() (out []deployingProgress) { db.Lock() defer db.Unlock() out = make([]deployingProgress, len(db.progs)) copy(out, db.progs) return } func (sp *hostPool) regist(desc *protos.OperationQueryRequest) (string, chan *opdef) { sp.Lock() defer sp.Unlock() host := sp.hosts[desc.Hostname] if host == nil { host = makeHostWithChan(desc).makeOpChan() } else { host = makeHostWithChan(desc).withOpChan(host.opChan) } logger.Println("houston agent registered :", desc.Hostname, desc.PrivateIp, desc.PublicIp) go func(prvip string, pubip string) { if len(prvip) > 0 { address := net.JoinHostPort(prvip, "9100") if conn, _ := net.DialTimeout("tcp", address, 3*time.Second); conn != nil { conn.Close() sp.exportChan <- "+" + prvip return } } if len(pubip) > 0 { address := net.JoinHostPort(pubip, "9100") if conn, _ := net.DialTimeout("tcp", address, 3*time.Second); conn != nil { conn.Close() sp.exportChan <- "+" + pubip return } } }(desc.PrivateIp, desc.PublicIp) sp.hosts[desc.Hostname] = host return desc.Hostname, host.opChan } func (sp *hostPool) refresh(desc *protos.OperationQueryRequest) { sp.Lock() defer sp.Unlock() host := sp.hosts[desc.Hostname] if host != nil { host = makeHostWithChan(desc).withOpChan(host.opChan) sp.hosts[desc.Hostname] = host } } func (sp *hostPool) unregist(key string) { sp.Lock() defer sp.Unlock() host := sp.hosts[key] if host != nil { sp.exportChan <- "-" + host.PublicIp sp.exportChan <- "-" + host.PrivateIp } delete(sp.hosts, key) } type hostSnapshot struct { Procs []ProcessSnapshot `json:"procs"` Deploys map[string][]*protos.VersionAndArgs `json:"deploys"` } func (sp *hostPool) allHosts() map[string]hostSnapshot { sp.Lock() defer sp.Unlock() out := make(map[string]hostSnapshot) for hn, v := range sp.hosts { procs := make([]ProcessSnapshot, 0, len(v.Procs)) for _, p := range v.Procs { procs = append(procs, ProcessSnapshot{ Name: p.Name, Args: p.Args, Version: p.Version, State: p.State, Pid: p.Pid, }) } out[hn] = hostSnapshot{ Procs: procs, Deploys: v.Deploys, } } return out } func (sp *hostPool) query(filter func(*hostWithChan) bool) []*hostWithChan { sp.Lock() defer sp.Unlock() var targets []*hostWithChan for _, v := range sp.hosts { if filter(v) { targets = append(targets, v) } } return targets } type operationServer struct { protos.UnimplementedOperationServer hp hostPool db deployingBoard } func marshal(argval reflect.Value, output map[string]string) map[string]string { if argval.Kind() == reflect.Pointer { argval = argval.Elem() } for i := 0; i < argval.Type().NumField(); i++ { if !argval.Type().Field(i).IsExported() { continue } if argval.Type().Field(i).Anonymous { marshal(argval.Field(i), output) } else if argval.Field(i).CanInt() { output[argval.Type().Field(i).Name] = fmt.Sprintf("%d", argval.Field(i).Int()) } else if argval.Field(i).Kind() == reflect.Array || argval.Field(i).Kind() == reflect.Slice { var conv []string for j := 0; j < argval.Field(i).Len(); j++ { conv = append(conv, argval.Field(i).Index(j).String()) } output[argval.Type().Field(i).Name] = strings.Join(conv, "\n") } else { output[argval.Type().Field(i).Name] = argval.Field(i).String() } } return output } func (os *operationServer) Query(svr protos.Operation_QueryServer) error { desc, err := svr.Recv() if err != nil { return err } hostname := desc.Hostname key, opChan := os.hp.regist(desc) defer func() { logger.Println("operationServer.Query : houston client unregistered ", hostname) os.hp.unregist(key) }() logger.Println("operationServer.Query : houston client registered ", hostname) Outer: for { select { case <-svr.Context().Done(): break Outer case opdef := <-opChan: svr.Send(&protos.OperationQueryResponse{ Operation: string(opdef.operation), Args: marshal(reflect.ValueOf(opdef.args), make(map[string]string)), }) } } return nil } func (os *operationServer) ReportDeployingProgress(ctx context.Context, dp *protos.DeployingProgress) (*protos.Empty, error) { os.db.Lock() defer os.db.Unlock() for i, p := range os.db.progs { if p.Hostname == dp.Hostname && p.Name == dp.Name && p.Version == dp.Version { os.db.progs[i].DeployingProgress = dp os.db.progs[i].Timestamp = time.Now().UTC().Unix() return &protos.Empty{}, nil } } os.db.progs = append(os.db.progs, deployingProgress{ DeployingProgress: dp, Timestamp: time.Now().UTC().Unix(), }) return &protos.Empty{}, nil } func (os *operationServer) Refresh(ctx context.Context, desc *protos.OperationQueryRequest) (*protos.Empty, error) { os.hp.refresh(desc) return &protos.Empty{}, nil } func (os *operationServer) Deploy(d DeployRequest) { var targets []*hostWithChan if len(d.hostnames) > 0 { // hostname에 배포 conv := make(map[string]bool) for _, hn := range d.hostnames { conv[hn] = true } targets = os.hp.query(func(p *hostWithChan) bool { _, ok := conv[p.Hostname] return ok }) } else { // d.process에 모두 배포 targets = os.hp.query(func(p *hostWithChan) bool { for _, p := range p.Procs { if p.Name == d.Name { return true } } return false }) } dps := make([]deployingProgress, len(targets)) now := time.Now().UTC().Unix() for i, t := range targets { dps[i] = deployingProgress{ DeployingProgress: &protos.DeployingProgress{ Hostname: t.Hostname, Name: d.Name, Version: d.Version, State: "prepare", }, Timestamp: now, } t.opChan <- &opdef{ operation: shared.Deploy, args: d, } } os.db.Lock() defer os.db.Unlock() os.db.progs = dps } func (os *operationServer) Withdraw(d WithdrawRequest) { // 프로세스가 안돌고 있는 호스트에서도 회수해야 할 수 있다. targets := os.hp.query(func(p *hostWithChan) bool { return true }) if len(d.hostnames) > 0 { // hostname만 정지 var final []*hostWithChan conv := make(map[string]bool) for _, hn := range d.hostnames { conv[hn] = true } for _, t := range targets { if _, ok := conv[t.Hostname]; ok { final = append(final, t) } } targets = final } for _, t := range targets { t.opChan <- &opdef{ operation: shared.Withdraw, args: d, } } } func (os *operationServer) StartProcess(d StartProcessRequest) { targets := os.hp.query(func(p *hostWithChan) bool { // 디플로이만 되어있어도 해당 _, ok := p.Deploys[d.Name] return ok }) if len(d.hostnames) > 0 { // hostname만 업로드 var final []*hostWithChan conv := make(map[string]bool) for _, hn := range d.hostnames { conv[hn] = true } for _, t := range targets { if _, ok := conv[t.Hostname]; ok { final = append(final, t) } } targets = final } for _, t := range targets { t.opChan <- &opdef{ operation: shared.Start, args: d, } } } func (os *operationServer) StopProcess(d StopProcessRequest) { // d.process 모두 정지 targets := os.hp.query(func(p *hostWithChan) bool { for _, p := range p.Procs { if p.Name == d.Name { return true } } return false }) if len(d.hostnames) > 0 { // hostname만 정지 var final []*hostWithChan conv := make(map[string]bool) for _, hn := range d.hostnames { conv[hn] = true } for _, t := range targets { if _, ok := conv[t.Hostname]; ok { final = append(final, t) } } targets = final } for _, t := range targets { t.opChan <- &opdef{ operation: shared.Stop, args: d, } } } func (os *operationServer) RestartProcess(d RestartProcessRequest) { targets := os.hp.query(func(p *hostWithChan) bool { for _, p := range p.Procs { if p.Name == d.Name { return true } } return false }) if len(d.hostnames) != 1 { return } // hostname만 재시작 for _, t := range targets { if t.Hostname == d.hostnames[0] { t.opChan <- &opdef{ operation: shared.Restart, args: d, } return } } } func (os *operationServer) Upload(d UploadRequest) { targets := os.hp.query(func(p *hostWithChan) bool { // 실행 중이 아니라 디플로이만 되어있어도 해당 _, ok := p.Deploys[d.Name] return ok }) if len(d.hostnames) > 0 { // hostname만 업로드 var final []*hostWithChan conv := make(map[string]bool) for _, hn := range d.hostnames { conv[hn] = true } for _, t := range targets { if _, ok := conv[t.Hostname]; ok { final = append(final, t) } } targets = final } for _, t := range targets { t.opChan <- &opdef{ operation: shared.Upload, args: d, } } } func (os *operationServer) Hosts() map[string]hostSnapshot { return os.hp.allHosts() } func (os *operationServer) DeplyingProgress() []deployingProgress { return os.db.clone() } func targetExportLoop(in chan string) { all := make(map[string]bool) for addr := range in { logger.Println("targetExportLoop :", addr) if addr[0] == '+' { all[addr[1:]] = true } else if addr[0] == '-' { delete(all, addr[1:]) } list := make([]string, 0, len(all)) for k := range all { list = append(list, k) } output := []map[string]any{{"targets": list}} if file, err := os.Create("prometheus_targets.json"); err == nil { enc := json.NewEncoder(file) enc.Encode(output) file.Close() } } } func newOperationServer() *operationServer { exportChan := make(chan string) go targetExportLoop(exportChan) return &operationServer{ hp: hostPool{ hosts: map[string]*hostWithChan{}, exportChan: exportChan, }, } }