From 96ee2a4627fd1bd6d2f4fe97f85c76ea38126baa Mon Sep 17 00:00:00 2001 From: mountain Date: Wed, 14 Jun 2023 01:50:40 +0900 Subject: [PATCH] =?UTF-8?q?=EC=9E=90=EB=8F=99=20=EC=9E=AC=EC=A0=91?= =?UTF-8?q?=EC=86=8D?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- client/client.go | 67 +++++++++++++++++++++++++-------------------- client/deploy.go | 2 +- client/operation.go | 12 +++----- 3 files changed, 43 insertions(+), 38 deletions(-) diff --git a/client/client.go b/client/client.go index 763bfcb..ffb474f 100644 --- a/client/client.go +++ b/client/client.go @@ -103,7 +103,6 @@ type procmeta struct { } type houstonClient struct { - client *grpc.ClientConn childProcs []*procmeta extraMetrics unsafe.Pointer // map[string]float32 deploys map[string][]*protos.VersionAndArgs @@ -111,9 +110,10 @@ type houstonClient struct { ctx context.Context operationChan chan *protos.OperationQueryResponse exitChan chan *exec.Cmd - httpAddr string + clientChan chan *grpc.ClientConn timestamp string wg sync.WaitGroup + config clientConfig } func unmarshal[T any](val *T, src map[string]string) { @@ -195,22 +195,6 @@ func NewClient() (HoustonClient, error) { return nil, errors.New("client.http_server_address is missing") } - var client *grpc.ClientConn - for { - logger.Println("grpc.DialContext :", clientConfig.GrpcAddress) - dialContext, cancelDial := context.WithTimeout(context.Background(), 15*time.Second) - client, err = grpc.DialContext(dialContext, clientConfig.GrpcAddress, grpc.WithBlock(), grpc.WithTransportCredentials(insecure.NewCredentials())) - if err == nil { - cancelDial() - break - } - - cancelDial() - logger.Println("grpc.DialContext failed :", err) - time.Sleep(5 * time.Second) - } - - logger.Println("grpc.DialContext succeeded") exefile, err := os.Executable() if err != nil { return nil, err @@ -234,10 +218,10 @@ func NewClient() (HoustonClient, error) { } hc := &houstonClient{ - client: client, + config: clientConfig, + clientChan: make(chan *grpc.ClientConn), extraMetrics: unsafe.Pointer(&map[string]float32{}), deploys: deploys, - httpAddr: clientConfig.HttpAddress, timestamp: exefi.ModTime().String(), } @@ -251,7 +235,7 @@ func NewClient() (HoustonClient, error) { defer hc.wg.Done() // 메인 operator - op := protos.NewOperationClient(hc.client) + var op protos.OperationClient myname, _ := os.Executable() myname = path.Base(filepath.ToSlash(myname)) if len(path.Ext(myname)) > 0 { @@ -266,6 +250,9 @@ func NewClient() (HoustonClient, error) { case <-ctx.Done(): return + case newClient := <-hc.clientChan: + op = protos.NewOperationClient(newClient) + case exited := <-exitChan: var newprocs []*procmeta for _, proc := range hc.childProcs { @@ -329,21 +316,21 @@ func NewClient() (HoustonClient, error) { case shared.Start: var sr shared.StartProcessRequest unmarshal(&sr, resp.Args) - if err := hc.startChildProcess(&sr); err != nil { + if err := hc.startChildProcess(&sr, op); err != nil { logger.Println(err) } case shared.Stop: var sr shared.StopProcessRequest unmarshal(&sr, resp.Args) - if err := hc.stopChildProcess(&sr); err != nil { + if err := hc.stopChildProcess(&sr, op); err != nil { logger.Println(err) } case shared.Restart: var rr shared.RestartProcessRequest unmarshal(&rr, resp.Args) - if err := hc.restartChildProcess(&rr); err != nil { + if err := hc.restartChildProcess(&rr, op); err != nil { logger.Println(err) } @@ -392,15 +379,37 @@ func (hc *houstonClient) Start() { hc.shutdownFunc() }() + var client *grpc.ClientConn + reconnCount := 0 for { select { case <-hc.ctx.Done(): return default: - err := hc.checkOperation() - if err != nil { - logger.Println("hc.checkUpdate failed :", err) + if client == nil { + if reconnCount == 0 { + logger.Println("grpc.DialContext :", hc.config.GrpcAddress) + } + + reconnCount++ + dialContext, cancelDial := context.WithTimeout(context.Background(), 15*time.Second) + client, _ = grpc.DialContext(dialContext, hc.config.GrpcAddress, grpc.WithBlock(), grpc.WithTransportCredentials(insecure.NewCredentials())) + cancelDial() + if client != nil { + reconnCount = 0 + logger.Println("grpc.DialContext succeeded") + hc.clientChan <- client + } + } + + if client != nil { + err := hc.checkOperation(client) + if err != nil { + logger.Println("hc.checkUpdate failed :", err) + + client = nil + } } } } @@ -410,7 +419,7 @@ func (hc *houstonClient) Shutdown() { hc.shutdownFunc() } -func (hc *houstonClient) checkOperation() error { +func (hc *houstonClient) checkOperation(client *grpc.ClientConn) error { defer func() { r := recover() if r != nil { @@ -418,7 +427,7 @@ func (hc *houstonClient) checkOperation() error { } }() - op := protos.NewOperationClient(hc.client) + op := protos.NewOperationClient(client) cl, err := op.Query(hc.ctx, grpc.WaitForReady(true)) if err != nil { return err diff --git a/client/deploy.go b/client/deploy.go index 485672e..676b9f6 100644 --- a/client/deploy.go +++ b/client/deploy.go @@ -194,7 +194,7 @@ func (hc *houstonClient) prepareDeploy(name string, version string) (destPath st func (hc *houstonClient) makeDownloadUrl(rel string) string { out := rel if !strings.HasPrefix(out, "http") { - tks := strings.SplitN(hc.httpAddr, "://", 2) + tks := strings.SplitN(hc.config.HttpAddress, "://", 2) out = fmt.Sprintf("%s://%s", tks[0], path.Join(tks[1], rel)) } return out diff --git a/client/operation.go b/client/operation.go index e04cb2e..51dc7b3 100644 --- a/client/operation.go +++ b/client/operation.go @@ -53,7 +53,7 @@ func (hc *houstonClient) uploadZipLogFile(zipFile string, name string, version s defer zf.Close() - req, err := http.NewRequest("POST", hc.httpAddr+"/upload", zf) + req, err := http.NewRequest("POST", hc.config.HttpAddress+"/upload", zf) if err != nil { logger.Println(err) } @@ -323,7 +323,7 @@ func (hc *houstonClient) launch(meta *procmeta) error { var errPrepareprocessLaunchFailed = errors.New("prepareProcessLaunch failed") -func (hc *houstonClient) startChildProcess(req *shared.StartProcessRequest) error { +func (hc *houstonClient) startChildProcess(req *shared.StartProcessRequest, op protos.OperationClient) error { logger.Println("startChildProcess :", *req) if req.Version == "latest" { // 최신 버전을 찾음 @@ -363,8 +363,6 @@ func (hc *houstonClient) startChildProcess(req *shared.StartProcessRequest) erro } hc.childProcs = append(hc.childProcs, meta) - - op := protos.NewOperationClient(hc.client) op.Refresh(context.Background(), hc.makeOperationQueryRequest()) return nil @@ -372,7 +370,7 @@ func (hc *houstonClient) startChildProcess(req *shared.StartProcessRequest) erro var errNoRunningProcess = errors.New("no running processed") -func (hc *houstonClient) stopChildProcess(req *shared.StopProcessRequest) error { +func (hc *houstonClient) stopChildProcess(req *shared.StopProcessRequest, op protos.OperationClient) error { if req.Version == "latest" { // 최신 버전을 찾음 latest, err := shared.FindLastestVersion(req.Name) @@ -422,7 +420,6 @@ func (hc *houstonClient) stopChildProcess(req *shared.StopProcessRequest) error } } - op := protos.NewOperationClient(hc.client) op.Refresh(context.Background(), hc.makeOperationQueryRequest()) for _, proc := range killing { @@ -439,7 +436,7 @@ func (hc *houstonClient) stopChildProcess(req *shared.StopProcessRequest) error return errNoRunningProcess } -func (hc *houstonClient) restartChildProcess(req *shared.RestartProcessRequest) error { +func (hc *houstonClient) restartChildProcess(req *shared.RestartProcessRequest, op protos.OperationClient) error { if req.Version == "latest" { // 최신 버전을 찾음 latest, err := shared.FindLastestVersion(req.Name) @@ -472,7 +469,6 @@ func (hc *houstonClient) restartChildProcess(req *shared.RestartProcessRequest) proc.state = protos.ProcessState_Stopping } - op := protos.NewOperationClient(hc.client) op.Refresh(context.Background(), hc.makeOperationQueryRequest()) for _, proc := range restarts {