package client

import (
	"archive/zip"
	"bufio"
	"context"
	"encoding/json"
	"errors"
	"fmt"
	"io"
	"math"
	"net/http"
	"os"
	"os/exec"
	"path"
	"path/filepath"
	"regexp"
	"strconv"
	"strings"
	"syscall"
	"time"

	"github.com/Knetic/govaluate"
	"repositories.action2quare.com/ayo/gocommon/logger"
	"repositories.action2quare.com/ayo/gocommon/metric"
	"repositories.action2quare.com/ayo/houston/shared"
	"repositories.action2quare.com/ayo/houston/shared/protos"
)

// procLogRotateSize is the number of bytes written to one log file before it
// is closed and handed to the uploader.
const procLogRotateSize = 1024 * 1024

// argExpressionRe matches "$(( expression ))" placeholders inside an argument.
// It is compiled once instead of on every evaluateArgs call.
var argExpressionRe = regexp.MustCompile(`\$\(\((.*?)\)\)`)

// errUploadZipLogFailed is returned when the upload endpoint answers with a
// status other than 200 OK.
var errUploadZipLogFailed = errors.New("not ok")

// lastExecutionArgs returns the argument list stored as JSON in the "@args"
// file of verpath. When that file does not exist, the "@args" file of the
// parent directory is used instead. It returns nil when neither file can be
// opened or when the content cannot be decoded.
func lastExecutionArgs(verpath string) []string {
	argf, err := os.Open(path.Join(verpath, "@args"))
	if os.IsNotExist(err) {
		argf, err = os.Open(path.Clean(path.Join(verpath, "..", "@args")))
	}
	if err != nil {
		return nil
	}
	defer argf.Close()

	var out []string
	if err := json.NewDecoder(argf).Decode(&out); err != nil {
		// A partially decoded argument list must not be used to start a process.
		return nil
	}
	return out
}

// uploadZipLogFile POSTs zipFile to "<HttpAddress>/upload", describing the
// payload with the Houston-Service-* headers. It returns the underlying error
// when the file cannot be opened or the request fails, and
// errUploadZipLogFailed when the server does not answer 200 OK.
func (hc *houstonClient) uploadZipLogFile(zipFile string, name string, version string) error {
	zf, err := os.Open(zipFile)
	if err != nil {
		return err
	}
	defer zf.Close()

	req, err := http.NewRequest("POST", hc.config.HttpAddress+"/upload", zf)
	if err != nil {
		// req is nil here; continuing would dereference it.
		return err
	}
	req.Header.Set("Houston-Service-Name", name)
	req.Header.Set("Houston-Service-Version", version)
	req.Header.Set("Houston-Service-Filename", path.Base(filepath.ToSlash(zipFile)))
	req.Header.Set("Content-Type", "application/zip")

	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		return err
	}
	defer resp.Body.Close()

	if resp.StatusCode != http.StatusOK {
		return errUploadZipLogFailed
	}
	return nil
}

// findMatchFiles globs filter below "<storageRoot>/<name>/<version>".
//
// It returns the directory that contains the matches (the version folder
// joined with the directory part of filter) and the matched paths converted to
// slash form. The containing directory itself is left out of the result. Both
// results are empty when the pattern is malformed or nothing matches.
func findMatchFiles(storageRoot, name, version, filter string) (string, []string) {
	root := path.Join(storageRoot, name, version)
	matches, err := filepath.Glob(path.Join(root, filter))
	if err != nil || len(matches) == 0 {
		return "", nil
	}

	root = path.Join(root, path.Dir(filter))
	out := make([]string, 0, len(matches))
	for _, file := range matches {
		file = filepath.ToSlash(file)
		if file == root {
			continue
		}
		out = append(out, file)
	}
	return root, out
}

// addFileToZip copies the content of file into w. The entry name is the part
// of file that follows root and the separator after it.
func addFileToZip(w *zip.Writer, root, file string) error {
	src, err := os.Open(file)
	if err != nil {
		return err
	}
	// Closed per file, so a large match list does not exhaust descriptors.
	defer src.Close()

	// Slice by length (not by prefix) so a differing separator style in root
	// does not matter; fall back to the base name if file is unexpectedly short.
	relative := path.Base(file)
	if len(file) > len(root)+1 {
		relative = file[len(root)+1:]
	}

	fw, err := w.Create(relative)
	if err != nil {
		return err
	}
	_, err = io.Copy(fw, src)
	return err
}

// zipCompressFiles packs matches into a new zip file in the temp directory and
// returns its path.
//
// Entries that are not regular files (symlinks, directories, ...) are skipped
// and their slot in matches is set to "" so callers do not delete them later.
// When nothing is added, no archive is kept and ("", nil) is returned. On any
// error the temporary file is removed.
func zipCompressFiles(root string, matches []string) (string, error) {
	f, err := os.CreateTemp(os.TempDir(), "*.zip")
	if err != nil {
		return "", err
	}
	w := zip.NewWriter(f)

	discard := func() {
		w.Close()
		f.Close()
		os.Remove(f.Name())
	}

	added := 0
	for i, file := range matches {
		// An Lstat failure is not fatal here; opening the file below reports it.
		if fi, err := os.Lstat(file); err == nil && !fi.Mode().IsRegular() {
			matches[i] = ""
			continue
		}
		if err := addFileToZip(w, root, file); err != nil {
			discard()
			return "", err
		}
		added++
	}

	if added == 0 {
		discard()
		return "", nil
	}

	// The central directory is written by Close; an error here means the
	// archive is unusable.
	if err := w.Close(); err != nil {
		f.Close()
		os.Remove(f.Name())
		return "", err
	}
	if err := f.Close(); err != nil {
		os.Remove(f.Name())
		return "", err
	}
	return f.Name(), nil
}

// prepareProcessLaunch builds the procmeta (including the exec.Cmd) for the
// program described by req, without starting it.
//
// When req.Version is "latest" it is replaced by the name of the most recently
// modified sub-directory of "<storageRoot>/<req.Name>". The executable is
// req.Args[0] resolved inside the version folder; the remaining arguments are
// passed through os.ExpandEnv. An error is returned when req.Args is empty or
// the version folder is missing or not a directory.
func prepareProcessLaunch(storageRoot string, req *shared.StartProcessRequest) (*procmeta, error) {
	if len(req.Args) == 0 {
		return nil, errors.New("args is empty")
	}

	if req.Version == "latest" {
		entries, err := os.ReadDir(path.Join(storageRoot, req.Name))
		if err != nil {
			return nil, err
		}

		var latestTimestamp time.Time
		var latestVersion string
		for _, entry := range entries {
			if !entry.IsDir() {
				continue
			}
			fi, err := entry.Info()
			if err != nil {
				return nil, err
			}
			if modified := fi.ModTime(); latestTimestamp.Before(modified) {
				latestTimestamp = modified
				latestVersion = fi.Name()
			}
		}

		if len(latestVersion) > 0 {
			req.Version = latestVersion
		}
	}

	verpath := path.Join(storageRoot, req.Name, req.Version)
	fi, err := os.Stat(verpath)
	if err != nil {
		return nil, err
	}
	if !fi.IsDir() {
		return nil, errors.New("not found")
	}

	exefile := "./" + path.Clean(strings.TrimPrefix(req.Args[0], "/"))
	// Best effort: a chmod failure surfaces later when the command is started.
	os.Chmod(path.Join(verpath, exefile), 0777)

	// On failure exef is empty and the executable path becomes relative.
	exef, _ := os.Executable()

	expanded := make([]string, len(req.Args))
	for i, arg := range req.Args {
		expanded[i] = os.ExpandEnv(arg)
	}

	exename := path.Join(path.Dir(strings.ReplaceAll(exef, "\\", "/")), verpath, exefile)

	logger.Println("exefile :", exefile)
	logger.Println("verpath :", verpath)
	logger.Println("exef :", exef)
	logger.Println("path.Dir :", path.Dir(exef))
	logger.Println("exename :", exename)

	cmd := exec.Command(os.ExpandEnv(exename), expanded[1:]...)
	cmd.Dir = verpath

	stdin, err := cmd.StdinPipe()
	if err != nil {
		return nil, err
	}

	seq++
	return &procmeta{
		id:      seq,
		cmd:     cmd,
		name:    req.Name,
		args:    req.Args,
		version: req.Version,
		recover: req.AutoRestart,
		verpath: verpath,
		state:   int32(protos.ProcessState_Stopped),
		stdin:   stdin,
		logfile: req.OutputLogFile,
	}, nil
}

// evaluateArgs replaces every "$(( expression ))" placeholder in args with the
// value of the expression evaluated against params.
//
// args is modified in place and also returned. When an expression cannot be
// parsed or evaluated, nil and the error are returned; arguments processed
// before the failure stay modified.
func evaluateArgs(args []string, params map[string]any) ([]string, error) {
	for i, input := range args {
		for _, match := range argExpressionRe.FindAllStringSubmatch(input, -1) {
			expr, err := govaluate.NewEvaluableExpression(strings.TrimSpace(match[1]))
			if err != nil {
				return nil, err
			}

			result, err := expr.Evaluate(params)
			if err != nil {
				return nil, err
			}

			// Substitute the original placeholder with the evaluated result.
			input = strings.ReplaceAll(input, match[0], fmt.Sprintf("%v", result))
		}
		args[i] = input
	}
	return args, nil
}

// parseEnv converts "KEY=VALUE" entries into a parameter map. A value that
// parses as a base-10 integer is stored as int64, otherwise one that parses as
// a float is stored as float64, otherwise the raw string is kept. Entries
// without "=" are skipped.
func parseEnv(input []string) map[string]any {
	output := make(map[string]any, len(input))
	for _, envkv := range input {
		key, value, found := strings.Cut(envkv, "=")
		if !found {
			continue
		}

		if parsed, err := strconv.ParseInt(value, 10, 0); err == nil {
			output[key] = parsed
			continue
		}
		// 64-bit parsing keeps the written value exact ("3.14" stays 3.14).
		if parsed, err := strconv.ParseFloat(value, 64); err == nil {
			output[key] = parsed
			continue
		}
		output[key] = value
	}
	return output
}

// procLogFile appends a child's output to a file named
// "<head>.<UTC timestamp>.<hostname><ext>" and closes that file once more than
// procLogRotateSize bytes were handed to it, so the next write opens a new one.
type procLogFile struct {
	logfilePath string   // path as requested by the caller, used in log messages
	head        string   // logfilePath without its extension
	ext         string   // "." + hostname + original extension
	file        *os.File // currently open file, nil when none is open
	total       int      // bytes handed to write since the file was opened
}

// newProcLogFile prepares the naming scheme for logfilePath without opening a file.
func newProcLogFile(logfilePath string) *procLogFile {
	// A hostname lookup failure only leaves the host part of the name empty.
	hn, _ := os.Hostname()
	ext := path.Ext(logfilePath)
	return &procLogFile{
		logfilePath: logfilePath,
		head:        logfilePath[:len(logfilePath)-len(ext)],
		ext:         "." + hn + ext,
	}
}

// ensureOpen opens a new timestamped file when none is open. A failure is
// silent; the next call tries again.
func (l *procLogFile) ensureOpen() {
	if l.file != nil {
		return
	}
	currentFile := l.head + time.Now().UTC().Format(".2006-01-02.150405") + l.ext
	f, err := os.OpenFile(currentFile, os.O_CREATE|os.O_APPEND|os.O_WRONLY, 0666)
	if err != nil {
		return
	}
	l.file = f
}

// write appends buff to the open file. It reports true when the size limit was
// exceeded and the file has been closed, i.e. when the caller should upload.
func (l *procLogFile) write(buff []byte) bool {
	if l.file == nil || len(buff) == 0 {
		return false
	}

	for written := 0; written < len(buff); {
		n, err := l.file.Write(buff[written:])
		if err != nil {
			logger.Println("write log file failed :", l.logfilePath, err)
			break
		}
		written += n
	}

	l.total += len(buff)
	if l.total <= procLogRotateSize {
		return false
	}

	l.total = 0
	l.file.Close()
	l.file = nil
	return true
}

// close releases the currently open file, if any.
func (l *procLogFile) close() {
	if l.file != nil {
		l.file.Close()
		l.file = nil
	}
}

// launch starts meta.cmd and attaches goroutines that consume its stdout and
// stderr.
//
// Before starting, it claims the lowest free sibling index for meta.args[0],
// exports it together with a timestamp through the child's environment,
// evaluates "$(( ))" expressions in the command arguments and derives the log
// file name. Stdout lines framed by metric.METRIC_HEAD_INLINE /
// metric.METRIC_TAIL_INLINE are forwarded to a Prometheus exporter; all other
// output goes to rotating files under "<verpath>/logs". On success the state
// becomes ProcessState_Running.
//
// NOTE(review): hc.siblingProcIndex is accessed here and from the stdout
// reader goroutine without synchronization visible in this file — confirm the
// callers serialize this.
func (hc *houstonClient) launch(meta *procmeta) error {
	stdout, err := meta.cmd.StdoutPipe()
	if err != nil {
		return err
	}

	stderr, err := meta.cmd.StderrPipe()
	if err != nil {
		return err
	}

	logfolder := path.Join(meta.verpath, "logs")
	if err = os.MkdirAll(logfolder, 0775); err != nil {
		return err
	}

	// releaseSiblingIndex clears the bit claimed for index. AND-NOT is used so
	// that releasing twice can never set the bit again.
	releaseSiblingIndex := func(index int) {
		key := fmt.Sprintf("%s-%d", meta.args[0], index/64)
		hc.siblingProcIndex[key] &^= uint64(1) << (index % 64)
	}

	stdReader := func(jobName string, r io.ReadCloser, index int, logfilePath string) {
		// Deferred calls run in reverse order: close resources, recover,
		// release the index, then report the termination.
		defer func() {
			logger.Println("stdReader is terminated :", meta.name)
			if meta.isState(protos.ProcessState_Running) {
				hc.operationChan <- &protos.OperationQueryResponse{
					Operation: string(shared.Exception),
					Args: map[string]string{
						"id": fmt.Sprintf("%d", meta.id),
					},
				}
			}
		}()
		defer releaseSiblingIndex(index)
		defer func() {
			if reco := recover(); reco != nil {
				logger.Println(reco)
			}
			r.Close()
		}()

		metricExporter := metric.NewPrometheusExport(hc.config.MetricNamespace)
		defer metricExporter.Shutdown()

		logFile := newProcLogFile(logfilePath)
		defer logFile.close()

		writeLog := func(buff []byte) {
			if logFile.write(buff) {
				hc.uploadProcFiles(meta, "logs/*"+logFile.ext, true)
			}
		}

		// handleMetric consumes one complete metric payload (head and tail
		// markers already removed).
		handleMetric := func(payload []byte) {
			if len(payload) == 0 {
				return
			}
			if payload[0] != '{' {
				key, val := metric.ReadMetricValue(payload)
				metricExporter.UpdateMetric(key, val)
				return
			}

			var desc metric.MetricDescription
			if err := json.Unmarshal(payload, &desc); err != nil {
				logger.Println("unmarshal metric failed :", err, string(payload))
				return
			}
			if desc.ConstLabels == nil {
				desc.ConstLabels = make(map[string]string)
			}
			for k, v := range hc.config.ConstLabels {
				desc.ConstLabels[k] = v
			}
			desc.ConstLabels["job"] = jobName
			metricExporter.RegisterMetric(&desc)
		}

		reader := bufio.NewReader(r)
		readingMetric := false
		var metricBuffer []byte

		for {
			logFile.ensureOpen()

			buff, err := reader.ReadBytes('\n')
			if err != nil {
				// Keep a trailing line that was not terminated by a newline.
				if len(buff) > 0 && !readingMetric && buff[0] != metric.METRIC_HEAD_INLINE {
					writeLog(buff)
				}
				logger.Println("ReadBytes at stdReader return err :", err, meta.name)
				break
			}

			switch {
			case readingMetric:
				metricBuffer = append(metricBuffer, buff...)
			case buff[0] == metric.METRIC_HEAD_INLINE:
				readingMetric = true
				metricBuffer = append(metricBuffer, buff[1:]...)
			}

			if !readingMetric {
				writeLog(buff)
				continue
			}

			// The tail marker sits right before the newline; until it shows
			// up the metric continues on the next line.
			if len(metricBuffer) < 2 || metricBuffer[len(metricBuffer)-2] != metric.METRIC_TAIL_INLINE {
				continue
			}

			readingMetric = false
			handleMetric(metricBuffer[:len(metricBuffer)-2])
			// Always reset, also after a malformed payload, so it cannot leak
			// into the next metric.
			metricBuffer = metricBuffer[:0]
		}
	}

	errReader := func(r io.ReadCloser, logfilePath string) {
		defer func() {
			if reco := recover(); reco != nil {
				logger.Println(reco)
			}
		}()
		defer r.Close()

		logFile := newProcLogFile(logfilePath)
		defer logFile.close()

		writeLog := func(buff []byte) {
			if logFile.write(buff) {
				hc.uploadProcFiles(meta, "logs/*"+logFile.ext, true)
			}
		}

		reader := bufio.NewReader(r)
		for {
			logFile.ensureOpen()

			buff, errRead := reader.ReadBytes('\n')
			if errRead != nil {
				// Keep a trailing line that was not terminated by a newline.
				writeLog(buff)
				logger.Println("ReadBytes at errReader return err :", errRead, meta.name)
				break
			}
			writeLog(buff)
		}
	}

	// Claim the lowest free sibling index. Each map entry holds 64 flags; a
	// full entry moves the search on to the next one.
	index := 0
	for overflow := 0; ; overflow++ {
		key := fmt.Sprintf("%s-%d", meta.args[0], overflow)
		runningFlags := hc.siblingProcIndex[key]
		if runningFlags == math.MaxUint64 {
			index += 64
			continue
		}

		for si := 0; si < 64; si++ {
			mask := uint64(1) << si
			if runningFlags&mask == 0 {
				index += si
				runningFlags |= mask
				break
			}
		}
		hc.siblingProcIndex[key] = runningFlags
		break
	}

	// Environment variables provided by houston itself.
	customEnv := map[string]string{
		"HOUSTON_SIBLIING_INDEX": fmt.Sprintf("%d", index),
		"HOUSTON_PROC_TIMESTAMP": time.Now().UTC().Format("2006-01-02T15-04-05"),
	}

	// Add them to the environment of the child process.
	meta.cmd.Env = os.Environ()
	for k, v := range customEnv {
		meta.cmd.Env = append(meta.cmd.Env, fmt.Sprintf("%s=%s", k, v))
	}

	// Evaluate the expressions embedded in the arguments.
	meta.cmd.Args, err = evaluateArgs(meta.cmd.Args, parseEnv(meta.cmd.Env))
	if err != nil {
		logger.Println("evaluateArgs failed :", err)
		// No reader goroutine exists yet that would give the index back.
		releaseSiblingIndex(index)
		return err
	}

	// Expand environment variables in the log file name; the process
	// environment wins over the houston-provided values.
	evalfile := os.Expand(meta.logfile, func(n string) string {
		if v := os.Getenv(n); len(v) > 0 {
			return v
		}
		return customEnv[n]
	})

	if len(evalfile) > 0 {
		evalfile = path.Join(logfolder, evalfile)
	} else {
		evalfile = path.Join(logfolder, path.Base(meta.cmd.Args[0]))
	}

	go stdReader(meta.name, stdout, index, evalfile+".log")
	go errReader(stderr, evalfile+".err")

	logger.Println("startChildProcess :", meta.cmd.Args)

	err = meta.cmd.Start()
	if err == nil {
		logger.Println("process index, pid =", index, meta.cmd.Process.Pid)
		set_affinity(meta.cmd.Process.Pid, index)
		meta.setState(protos.ProcessState_Running)
	}
	return err
}

// writeArgsFile stores args as JSON in "<dir>/@args". Failures are logged and
// otherwise ignored because the file is only a convenience for later runs.
func writeArgsFile(dir string, args any) {
	argfile, err := os.Create(path.Join(dir, "@args"))
	if err != nil {
		logger.Println("create @args failed :", err)
		return
	}
	defer argfile.Close()

	if err := json.NewEncoder(argfile).Encode(args); err != nil {
		logger.Println("write @args failed :", err)
	}
}

// startChildProcess prepares and launches the process described by req. After
// a successful launch the arguments are recorded on the matching deploy entry
// and in the "@args" files of both the program folder and the version folder,
// and the process is added to hc.childProcs.
func (hc *houstonClient) startChildProcess(req *shared.StartProcessRequest) error {
	meta, err := prepareProcessLaunch(hc.config.StorageRoot, req)
	if err != nil {
		return err
	}

	if err := hc.launch(meta); err != nil {
		return err
	}

	// The launch succeeded: remember the args, for this version and for the
	// parent folder.
	// NOTE(review): the assignment below only sticks if the elements of
	// hc.deploys[...] are pointers — confirm against the declaration.
	vers := hc.deploys[req.Name]
	for _, ver := range vers {
		if ver.Version == req.Version {
			ver.Args = meta.args
		}
	}

	writeArgsFile(path.Join(hc.config.StorageRoot, req.Name), req.Args)
	writeArgsFile(path.Join(hc.config.StorageRoot, req.Name, req.Version), req.Args)

	hc.childProcs = append(hc.childProcs, meta)
	return nil
}

// stopChildProcess signals the running child processes selected by req and
// then refreshes the operation state through op.
//
// A non-zero req.Pid selects exactly that process. Otherwise every process
// named req.Name is selected, restricted to req.Version when one is given.
// Each selected process receives SIGTERM (os.Kill when that cannot be sent);
// once it has exited an Exception operation carrying its id is queued on
// hc.operationChan.
func (hc *houstonClient) stopChildProcess(req *shared.StopProcessRequest, op protos.OperationClient) error {
	killer := func(proc *procmeta) {
		proc.setState(protos.ProcessState_Stopping)
		if err := proc.cmd.Process.Signal(syscall.SIGTERM); err != nil {
			proc.cmd.Process.Signal(os.Kill)
		}

		go func() {
			proc.cmd.Wait()
			hc.operationChan <- &protos.OperationQueryResponse{
				Operation: string(shared.Exception),
				Args: map[string]string{
					"id": fmt.Sprintf("%d", proc.id),
				},
			}
		}()
	}

	for _, proc := range hc.childProcs {
		if !proc.isState(protos.ProcessState_Running) {
			continue
		}

		switch {
		case req.Pid != 0:
			// Stop only the process with this pid.
			if req.Pid == int32(proc.cmd.Process.Pid) {
				killer(proc)
			}
		case proc.name != req.Name:
			// Different program; leave it alone.
		case len(req.Version) == 0:
			// Stop every version of the program.
			killer(proc)
		case req.Version == proc.version:
			// Stop only the requested version of the program.
			killer(proc)
		}
	}

	op.Refresh(context.Background(), hc.makeOperationQueryRequest())

	return nil
}

// restartChildProcess marks the child with pid req.Pid for restart. When
// req.Config is set, that file is downloaded into the process' version folder
// first and a download failure aborts the restart. The command is then sent to
// hc.exitChan. Nothing happens when no child has the requested pid.
func (hc *houstonClient) restartChildProcess(req *shared.RestartProcessRequest, op protos.OperationClient) error {
	for _, proc := range hc.childProcs {
		// Entries without a started process have no pid to compare.
		if proc.cmd == nil || proc.cmd.Process == nil {
			continue
		}
		if proc.cmd.Process.Pid != int(req.Pid) {
			continue
		}

		if len(req.Config) > 0 {
			// Try to download the config file before restarting.
			root := proc.verpath
			if _, err := download(root, hc.makeDownloadUrl(req.Config), "", nil); err != nil {
				return err
			}
		}

		proc.setState(protos.ProcessState_Restart)
		op.Refresh(context.Background(), hc.makeOperationQueryRequest())
		hc.exitChan <- proc.cmd
		break
	}
	return nil
}

// uploadProcFiles zips the files of child's version folder that match filter
// and uploads the archive in a background goroutine. With deleteAfterUpload
// the archived source files are removed after a successful upload. The
// temporary archive is always removed.
func (hc *houstonClient) uploadProcFiles(child *procmeta, filter string, deleteAfterUpload bool) {
	logger.Println("uploadFiles found :", child.version, child.name)
	root, matches := findMatchFiles(hc.config.StorageRoot, child.name, child.version, filter)

	go func(deleteAfterUpload bool, root string, matches []string) {
		zipFile, err := zipCompressFiles(root, matches)
		if err != nil {
			logger.Println("zipLogFiles failed :", err)
			return
		}
		if len(zipFile) == 0 {
			return
		}
		defer os.Remove(zipFile)

		if err = hc.uploadZipLogFile(zipFile, child.name, child.version); err != nil {
			logger.Println("uploadZipLogFile failed :", err)
			return
		}

		if deleteAfterUpload {
			// Skipped entries were blanked by zipCompressFiles.
			for _, fn := range matches {
				if len(fn) > 0 {
					os.Remove(fn)
				}
			}
		}
	}(deleteAfterUpload, root, matches)
}

// uploadFiles uploads the files matching req.Filter for the given program
// version. When a child process of that name and version is known, the upload
// runs in the background and the files are kept. Otherwise the files are
// zipped and uploaded synchronously and removed after a successful upload.
// Failures are logged; the returned error is always nil.
func (hc *houstonClient) uploadFiles(req *shared.UploadRequest) error {
	logger.Println("uploadFiles req :", *req)
	for _, child := range hc.childProcs {
		if child.version == req.Version && child.name == req.Name {
			hc.uploadProcFiles(child, req.Filter, false)
			return nil
		}
	}

	// Also look in folders that do not belong to a running process;
	// every matching file is a candidate.
	root, matches := findMatchFiles(hc.config.StorageRoot, req.Name, req.Version, req.Filter)
	zipFile, err := zipCompressFiles(root, matches)
	if err != nil {
		logger.Println("zipLogFiles failed :", err)
		return nil
	}
	if len(zipFile) == 0 {
		return nil
	}
	// Remove the temporary archive whether or not the upload succeeds.
	defer os.Remove(zipFile)

	if err = hc.uploadZipLogFile(zipFile, req.Name, req.Version); err != nil {
		logger.Println("uploadZipLogFile failed :", err)
		return nil
	}

	for _, fn := range matches {
		if len(fn) > 0 {
			os.Remove(fn)
		}
	}
	return nil
}