package client import ( "context" "encoding/json" "errors" "fmt" "io" "io/fs" "os" "os/exec" "os/signal" "path" "path/filepath" "reflect" "sort" "strconv" "sync" "sync/atomic" "syscall" "time" "unsafe" "repositories.action2quare.com/ayo/gocommon/logger" "repositories.action2quare.com/ayo/houston/shared" "repositories.action2quare.com/ayo/houston/shared/protos" "google.golang.org/grpc" "google.golang.org/grpc/credentials/insecure" ) type clientConfig struct { GrpcAddress string `json:"grpc_server_address"` HttpAddress string `json:"http_server_address"` } type totalConfig struct { Client clientConfig `json:"houston_client"` } type HoustonClient interface { SetReportMetrics(map[string]float32) Shutdown() Start() } type bufferStack struct { pool [5][]byte cursor int32 } func (bs *bufferStack) pop() []byte { pos := atomic.LoadInt32(&bs.cursor) for !atomic.CompareAndSwapInt32(&bs.cursor, pos, pos+1) { pos = atomic.LoadInt32(&bs.cursor) } defer func() { bs.pool[pos] = nil }() curbuf := bs.pool[pos] if curbuf == nil { curbuf = make([]byte, 1024) } return curbuf } func (bs *bufferStack) push(x []byte) { pos := atomic.AddInt32(&bs.cursor, -1) bs.pool[pos] = x } type procmeta struct { cmd *exec.Cmd name string version string state protos.ProcessState stdin io.WriteCloser logUploadChan chan *shared.UploadRequest buffers bufferStack } type houstonClient struct { client *grpc.ClientConn childProcs []*procmeta extraMetrics unsafe.Pointer // map[string]float32 deploys map[string][]*protos.VersionAndArgs shutdownFunc context.CancelFunc ctx context.Context operationChan chan *protos.OperationQueryResponse exitChan chan *exec.Cmd httpAddr string timestamp string wg sync.WaitGroup } func unmarshal[T any](val *T, src map[string]string) { argval := reflect.ValueOf(val) for i := 0; i < argval.Elem().Type().NumField(); i++ { if !argval.Elem().Type().Field(i).IsExported() { continue } arg := src[argval.Elem().Type().Field(i).Name] if argval.Elem().Field(i).CanInt() { num, _ := strconv.ParseInt(arg, 10, 0) argval.Elem().Field(i).SetInt(num) } else { argval.Elem().Field(i).SetString(arg) } } } func gatherDeployedPrograms(name string) []*protos.VersionAndArgs { var rawvers []*protos.VersionAndArgs if vers, err := os.ReadDir(name); err == nil { for _, ver := range vers { if ver.IsDir() { args := lastExecutionArgs(path.Join(name, ver.Name())) rawvers = append(rawvers, &protos.VersionAndArgs{ Version: ver.Name(), Args: args, }) } } } sort.Slice(rawvers, func(i, j int) bool { leftParsed := shared.ParseVersionString(rawvers[i].Version) rightParsed := shared.ParseVersionString(rawvers[j].Version) return shared.CompareVersionString(leftParsed, rightParsed) < 0 }) return rawvers } func (hc *houstonClient) makeOperationQueryRequest() *protos.OperationQueryRequest { hn, _ := os.Hostname() procs := make([]*protos.ProcessDescription, 0, len(hc.childProcs)) for _, child := range hc.childProcs { procs = append(procs, &protos.ProcessDescription{ Name: child.name, Args: child.cmd.Args, Version: child.version, State: child.state, Pid: int32(child.cmd.Process.Pid), }) } var deploys []*protos.DeployedVersions for name, prog := range hc.deploys { deploys = append(deploys, &protos.DeployedVersions{ Name: name, Versions: prog, }) } return &protos.OperationQueryRequest{ Hostname: hn, Procs: procs, Deploys: deploys, } } func NewClient() (HoustonClient, error) { bt, err := os.ReadFile("config.json") if errors.Is(err, fs.ErrNotExist) { return nil, err } var config totalConfig if err := json.Unmarshal(bt, &config); err != nil { return nil, err } if len(config.Client.GrpcAddress) == 0 { return nil, errors.New("client.grpc_server_address is missing") } if len(config.Client.HttpAddress) == 0 { return nil, errors.New("client.http_server_address is missing") } var client *grpc.ClientConn for { logger.Println("grpc.DialContext :", config.Client.GrpcAddress) dialContext, cancelDial := context.WithTimeout(context.Background(), 15*time.Second) client, err = grpc.DialContext(dialContext, config.Client.GrpcAddress, grpc.WithBlock(), grpc.WithTransportCredentials(insecure.NewCredentials())) if err == nil { cancelDial() break } cancelDial() logger.Println("grpc.DialContext failed :", err) time.Sleep(5 * time.Second) } logger.Println("grpc.DialContext succeeded") exefile, err := os.Executable() if err != nil { return nil, err } exefi, err := os.Stat(exefile) if err != nil { return nil, err } deploys := make(map[string][]*protos.VersionAndArgs) if dirs, err := os.ReadDir("./"); err == nil { for _, dir := range dirs { if dir.IsDir() { flagf := path.Join(dir.Name(), "@houston") if _, err := os.Stat(flagf); !os.IsNotExist(err) { deploys[dir.Name()] = gatherDeployedPrograms(dir.Name()) } } } } hc := &houstonClient{ client: client, extraMetrics: unsafe.Pointer(&map[string]float32{}), deploys: deploys, httpAddr: config.Client.HttpAddress, timestamp: exefi.ModTime().String(), } ctx, cancel := context.WithCancel(context.Background()) exitChan := make(chan *exec.Cmd, 10) operationChan := make(chan *protos.OperationQueryResponse, 10) hc.wg.Add(1) go func() { defer hc.wg.Done() // 메인 operator op := protos.NewOperationClient(hc.client) myname, _ := os.Executable() myname = path.Base(filepath.ToSlash(myname)) if len(path.Ext(myname)) > 0 { myname = myname[:len(myname)-len(path.Ext(myname))] } if myname == "__debug_bin" { myname = "houston" } for { select { case <-ctx.Done(): return case exited := <-exitChan: var newprocs []*procmeta for _, proc := range hc.childProcs { if proc.cmd == exited && proc.state != protos.ProcessState_Stopped { proc.state = protos.ProcessState_Stopped } else { newprocs = append(newprocs, proc) } } hc.childProcs = newprocs op.Refresh(ctx, hc.makeOperationQueryRequest()) case resp := <-operationChan: switch shared.Operation(resp.Operation) { case shared.Deploy: var dr shared.DeployRequest unmarshal(&dr, resp.Args) if dr.Name == myname { if srcdir, replacer, err := hc.prepareUpdateSelf(&dr); err == nil { args := []string{ fmt.Sprintf("%d", os.Getpid()), srcdir, filepath.ToSlash(os.Args[0]), } args = append(args, os.Args[1:]...) cmd := exec.Command(replacer, args...) if err := cmd.Start(); err != nil { logger.Println(err) } else { hc.shutdownFunc() } } else { logger.Println(err) } } else { if err := hc.deploy(&dr); err == nil { prog := gatherDeployedPrograms(dr.Name) hc.deploys[dr.Name] = prog op.Refresh(ctx, hc.makeOperationQueryRequest()) } else { logger.Println(err) } } case shared.Withdraw: var wr shared.WithdrawRequest unmarshal(&wr, resp.Args) err := hc.withdraw(&wr) if err == nil { prog := gatherDeployedPrograms(wr.Name) if len(prog) == 0 { delete(hc.deploys, wr.Name) } else { hc.deploys[wr.Name] = prog } op.Refresh(ctx, hc.makeOperationQueryRequest()) } else { logger.Println(err) } case shared.Start: var sr shared.StartProcessRequest unmarshal(&sr, resp.Args) if err := hc.startChildProcess(&sr); err != nil { logger.Println(err) } case shared.Stop: var sr shared.StopProcessRequest unmarshal(&sr, resp.Args) if err := hc.stopChildProcess(&sr); err != nil { logger.Println(err) } case shared.Restart: var rr shared.RestartProcessRequest unmarshal(&rr, resp.Args) if err := hc.restartChildProcess(&rr); err != nil { logger.Println(err) } case shared.Upload: var ur shared.UploadRequest unmarshal(&ur, resp.Args) if err := hc.uploadFiles(&ur); err != nil { logger.Println(err) } } } } }() hc.shutdownFunc = cancel hc.exitChan = exitChan hc.ctx = ctx hc.operationChan = operationChan return hc, nil } func (hc *houstonClient) Start() { // receive from stream defer func() { hc.wg.Wait() for _, proc := range hc.childProcs { if err := proc.cmd.Process.Signal(syscall.SIGTERM); err != nil { proc.cmd.Process.Signal(os.Kill) proc.state = protos.ProcessState_Stopping } } for _, proc := range hc.childProcs { proc.cmd.Wait() } }() interrupt := make(chan os.Signal, 1) signal.Notify(interrupt, os.Interrupt, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT) go func() { c := <-interrupt logger.Println("interrupt!!!!!!!! :", c.String()) hc.shutdownFunc() }() for { select { case <-hc.ctx.Done(): return default: err := hc.checkOperation() if err != nil { logger.Println("hc.checkUpdate failed :", err) } } } } func (hc *houstonClient) Shutdown() { hc.shutdownFunc() } func (hc *houstonClient) checkOperation() error { defer func() { r := recover() if r != nil { logger.Error(r) } }() op := protos.NewOperationClient(hc.client) cl, err := op.Query(hc.ctx, grpc.WaitForReady(true)) if err != nil { return err } err = cl.Send(hc.makeOperationQueryRequest()) if err != nil { cl.CloseSend() return err } for { update, err := cl.Recv() if err != nil { cl.CloseSend() return err } logger.Println(update) hc.operationChan <- update } } func (hc *houstonClient) SetReportMetrics(extra map[string]float32) { atomic.StorePointer(&hc.extraMetrics, unsafe.Pointer(&extra)) }