From 387d4f3ea87df5f6d3302ca9e7bd335b61b7aae2 Mon Sep 17 00:00:00 2001 From: mountain Date: Thu, 26 Sep 2024 12:01:53 +0900 Subject: [PATCH] =?UTF-8?q?=EB=A1=9C=EA=B7=B8=ED=8C=8C=EC=9D=BC=20?= =?UTF-8?q?=EC=A0=84=EC=86=A1=20=EC=98=A4=EB=A5=98=20=EC=88=98=EC=A0=95?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .gitignore | 3 +- client/operation.go | 279 ++++++++++++++++++++++++----------------- server/http_handler.go | 25 +++- 3 files changed, 186 insertions(+), 121 deletions(-) diff --git a/.gitignore b/.gitignore index bc3a327..3f08894 100644 --- a/.gitignore +++ b/.gitignore @@ -3,4 +3,5 @@ houston houston.zip config.json -.vscode/ \ No newline at end of file +.vscode/ +/data diff --git a/client/operation.go b/client/operation.go index 97abd23..489cbb7 100644 --- a/client/operation.go +++ b/client/operation.go @@ -15,7 +15,6 @@ import ( "path" "path/filepath" "regexp" - "sort" "strconv" "strings" "syscall" @@ -79,30 +78,34 @@ func (hc *houstonClient) uploadZipLogFile(zipFile string, name string, version s return nil } -func zipLogFiles(storageRoot string, req *shared.UploadRequest) (string, []string, error) { - root := path.Join(storageRoot, req.Name, req.Version) - matches, err := filepath.Glob(path.Join(root, req.Filter)) +func findMatchFiles(storageRoot, name, version, filter string) (string, []string) { + root := path.Join(storageRoot, name, version) + matches, err := filepath.Glob(path.Join(root, filter)) if err != nil { - return "", nil, err + return "", nil } if len(matches) == 0 { - return "", nil, nil + return "", nil } - for i, file := range matches { + root = path.Join(root, path.Dir(filter)) + out := make([]string, 0, len(matches)) + for _, file := range matches { file = filepath.ToSlash(file) - matches[i] = file + if file == root { + continue + } + + out = append(out, file) } + return root, out +} - root = path.Join(root, path.Dir(req.Filter)) - hostname, _ := os.Hostname() - zipFileName := path.Join(os.TempDir(), hostname+"_"+path.Base(filepath.ToSlash(matches[0]))) + ".zip" - os.Remove(zipFileName) - f, err := os.OpenFile(zipFileName, os.O_RDWR|os.O_CREATE|os.O_EXCL, 0600) - +func zipCompressFiles(root string, matches []string) (string, error) { + f, err := os.CreateTemp(os.TempDir(), "*.zip") if err != nil { - return "", nil, err + return "", err } defer f.Close() @@ -111,10 +114,6 @@ func zipLogFiles(storageRoot string, req *shared.UploadRequest) (string, []strin oldestFile := "" for i, file := range matches { - if file == root { - continue - } - if fi, err := os.Lstat(file); err == nil { if (fi.Mode() & os.ModeSymlink) == os.ModeSymlink { matches[i] = "" @@ -129,21 +128,21 @@ func zipLogFiles(storageRoot string, req *shared.UploadRequest) (string, []strin relative := file[len(root)+1:] fw, err := w.Create(relative) if err != nil { - return "", nil, err + return "", err } src, err := os.Open(file) if err != nil { - return "", nil, err + return "", err } defer src.Close() if _, err = io.Copy(fw, src); err != nil { - return "", nil, err + return "", err } } - return f.Name(), matches, nil + return f.Name(), nil } func prepareProcessLaunch(storageRoot string, req *shared.StartProcessRequest) (*procmeta, error) { @@ -283,17 +282,27 @@ func (hc *houstonClient) launch(meta *procmeta) error { if err != nil { return err } + stderr, err := meta.cmd.StderrPipe() + if err != nil { + return err + } - err = os.MkdirAll(path.Join(meta.verpath, "logs"), 0775) + logfolder := path.Join(meta.verpath, "logs") + err = os.MkdirAll(logfolder, 0775) if err != nil { return err } stdReader := func(jobName string, r io.ReadCloser, index int, logfilePath string) { defer func() { - reco := recover() - if reco != nil { - logger.Println(reco) + logger.Println("stdReader is terminated :", meta.name) + if meta.isState(protos.ProcessState_Running) { + hc.operationChan <- &protos.OperationQueryResponse{ + Operation: string(shared.Exception), + Args: map[string]string{ + "id": fmt.Sprintf("%d", meta.id), + }, + } } }() @@ -307,61 +316,35 @@ func (hc *houstonClient) launch(meta *procmeta) error { hc.siblingProcIndex[key] = runningFlags }() - defer r.Close() - - reader := bufio.NewReader(r) - var logWriter func([]byte) - if len(logfilePath) > 0 { - targetFile, err := os.OpenFile(logfilePath, os.O_CREATE|os.O_APPEND|os.O_WRONLY, 0666) - if err == nil && targetFile != nil { - defer func() { - if targetFile != nil { - targetFile.Close() - } - }() - - logWriter = func(buff []byte) { - for written := 0; written < len(buff); { - os.Stdout.Write(buff) - n, err := targetFile.Write(buff) - if err != nil { - logger.Println("write log file failed :", logfilePath, err) - break - } else { - written += n - } - } - } - } else { - logger.Println("failed to create log file :", logfilePath, err) - logWriter = func(buff []byte) { - os.Stdout.Write(buff) - } - } - } else { - logWriter = func(buff []byte) { - os.Stdout.Write(buff) - } - } - - readingMetric := false - var metricBuffer []byte defer func() { - logger.Println("stdReader is terminated :", meta.name) - if meta.isState(protos.ProcessState_Running) { - hc.operationChan <- &protos.OperationQueryResponse{ - Operation: string(shared.Exception), - Args: map[string]string{ - "id": fmt.Sprintf("%d", meta.id), - }, - } + reco := recover() + if reco != nil { + logger.Println(reco) } + + r.Close() }() metricExporter := metric.NewPrometheusExport(hc.config.MetricNamespace) defer metricExporter.Shutdown() + total := 0 + hn, _ := os.Hostname() + var targetFile *os.File + ext := path.Ext(logfilePath) + head := logfilePath[:len(logfilePath)-len(ext)] + reader := bufio.NewReader(r) + readingMetric := false + ext = "." + hn + ext + + var metricBuffer []byte + for { + if targetFile == nil { + currentFile := head + time.Now().UTC().Format(".2006-01-02.150405") + ext + targetFile, _ = os.OpenFile(currentFile, os.O_CREATE|os.O_APPEND|os.O_WRONLY, 0666) + } + buff, err := reader.ReadBytes('\n') if err != nil { logger.Println("ReadBytes at stdReader return err :", err, meta.name) @@ -405,11 +388,79 @@ func (hc *houstonClient) launch(meta *procmeta) error { metricBuffer = metricBuffer[:0] } + } else if targetFile != nil && len(buff) > 0 { + for written := 0; written < len(buff); { + n, err := targetFile.Write(buff[written:]) + if err != nil { + logger.Println("write log file failed :", logfilePath, err) + break + } else { + written += n + } + } + total += len(buff) - continue + if total > 1024*1024 { + total = 0 + targetFile.Close() + targetFile = nil + + hc.uploadProcFiles(meta, "logs/*"+ext, true) + } + } + } + } + + errReader := func(r io.ReadCloser, logfilePath string) { + defer func() { + reco := recover() + if reco != nil { + logger.Println(reco) + } + }() + defer r.Close() + + total := 0 + hn, _ := os.Hostname() + + var targetFile *os.File + ext := path.Ext(logfilePath) + head := logfilePath[:len(logfilePath)-len(ext)] + reader := bufio.NewReader(r) + ext = "." + hn + ext + + for { + if targetFile == nil { + currentFile := head + time.Now().UTC().Format(".2006-01-02.150405") + ext + targetFile, _ = os.OpenFile(currentFile, os.O_CREATE|os.O_APPEND|os.O_WRONLY, 0666) } - logWriter(buff) + buff, errRead := reader.ReadBytes('\n') + if errRead != nil { + logger.Println("ReadBytes at stdReader return err :", err, meta.name) + break + } + + if targetFile != nil && len(buff) > 0 { + for written := 0; written < len(buff); { + n, err := targetFile.Write(buff[written:]) + if err != nil { + logger.Println("write log file failed :", logfilePath, err) + break + } else { + written += n + } + } + total += len(buff) + + if total > 1024*1024 { + total = 0 + targetFile.Close() + targetFile = nil + + hc.uploadProcFiles(meta, "logs/*"+ext, true) + } + } } } @@ -462,12 +513,14 @@ func (hc *houstonClient) launch(meta *procmeta) error { }) if len(evalfile) > 0 { - logfolder := path.Join(path.Dir(meta.cmd.Args[0]), "logs") - os.MkdirAll(logfolder, 0666) evalfile = path.Join(logfolder, evalfile) + } else { + evalfile = path.Join(logfolder, path.Base(meta.cmd.Args[0])) } - go stdReader(meta.name, stdout, index, evalfile) + go stdReader(meta.name, stdout, index, evalfile+".log") + go errReader(stderr, evalfile+".err") + logger.Println("startChildProcess :", meta.cmd.Args) err = meta.cmd.Start() @@ -579,54 +632,52 @@ func (hc *houstonClient) restartChildProcess(req *shared.RestartProcessRequest, return nil } +func (hc *houstonClient) uploadProcFiles(child *procmeta, filter string, deleteAfterUpload bool) { + logger.Println("uploadFiles found :", child.version, child.name) + root, matches := findMatchFiles(hc.config.StorageRoot, child.name, child.version, filter) + + go func(deleteAfterUpload bool, root string, matches []string) { + zipFile, err := zipCompressFiles(root, matches) + if err == nil && len(zipFile) > 0 { + if err = hc.uploadZipLogFile(zipFile, child.name, child.version); err == nil { + if deleteAfterUpload { + for _, fn := range matches { + if len(fn) > 0 { + os.Remove(fn) + } + } + } + } else { + logger.Println("uploadZipLogFile failed :", err) + } + os.Remove(zipFile) + } else if err != nil { + logger.Println("zipLogFiles failed :", err) + } + }(deleteAfterUpload, root, matches) +} + func (hc *houstonClient) uploadFiles(req *shared.UploadRequest) error { logger.Println("uploadFiles req :", *req) for _, child := range hc.childProcs { if child.version == req.Version && child.name == req.Name { - logger.Println("uploadFiles found :", child.version, child.name) - - go func() { - zipFile, srcFiles, err := zipLogFiles(hc.config.StorageRoot, req) - if err == nil && len(zipFile) > 0 && len(srcFiles) > 0 { - if err = hc.uploadZipLogFile(zipFile, child.name, child.version); err == nil { - // 마지막거 빼고 삭제 - if req.DeleteAfterUploaded == "true" { - for i := 0; i < len(srcFiles)-1; i++ { - os.Remove(srcFiles[i]) - } - } else { - sort.StringSlice(srcFiles).Sort() - - for i := 0; i < len(srcFiles)-1; i++ { - if len(srcFiles[i]) > 0 { - os.Remove(srcFiles[i]) - } - } - } - } else { - logger.Println("uploadZipLogFile failed :", err) - } - } else if err != nil { - logger.Println("zipLogFiles failed :", err) - } - }() - + hc.uploadProcFiles(child, req.Filter, false) return nil } } // 실행 중이 아닌 폴더에서도 대상을 찾는다 // 전체 파일을 대상으로 - zipFile, srcFiles, err := zipLogFiles(hc.config.StorageRoot, req) - if err == nil && len(zipFile) > 0 && len(srcFiles) > 0 { + root, matches := findMatchFiles(hc.config.StorageRoot, req.Name, req.Version, req.Filter) + zipFile, err := zipCompressFiles(root, matches) + if err == nil && len(zipFile) > 0 && len(zipFile) > 0 { if err = hc.uploadZipLogFile(zipFile, req.Name, req.Version); err == nil { - // 마지막거 빼고 삭제 - sort.StringSlice(srcFiles).Sort() - for i := 0; i < len(srcFiles)-1; i++ { - if len(srcFiles[i]) > 0 { - os.Remove(srcFiles[i]) + for _, fn := range matches { + if len(fn) > 0 { + os.Remove(fn) } } + os.Remove(zipFile) } else { logger.Println("uploadZipLogFile failed :", err) } diff --git a/server/http_handler.go b/server/http_handler.go index c34a795..19cc4b6 100644 --- a/server/http_handler.go +++ b/server/http_handler.go @@ -1,6 +1,7 @@ package server import ( + "archive/zip" "crypto/md5" "encoding/hex" "fmt" @@ -128,8 +129,7 @@ func (h *houstonHandler) RegisterHandlers(serveMux gocommon.ServerMuxInterface, defer func() { s := recover() if s != nil { - logger.Println(s) - debug.PrintStack() + logger.Error(s) } io.Copy(io.Discard, r.Body) r.Body.Close() @@ -140,12 +140,25 @@ func (h *houstonHandler) RegisterHandlers(serveMux gocommon.ServerMuxInterface, filename := r.Header.Get("Houston-Service-Filename") dir := path.Join(h.downloadPath, name, version) if err := os.MkdirAll(dir, 0775); err == nil { - file, _ := os.Create(path.Join(dir, filename)) - if file != nil { - defer file.Close() - if _, err = io.Copy(file, r.Body); err != nil { + zipfile, _ := os.Create(path.Join(dir, filename)) + logger.Println("file uploaded :", zipfile) + if zipfile != nil { + if _, err = io.Copy(zipfile, r.Body); err != nil { w.WriteHeader(http.StatusInternalServerError) + } else { + if strings.HasSuffix(filename, ".zip") { + stat, _ := zipfile.Stat() + zipreader, _ := zip.NewReader(zipfile, stat.Size()) + for _, f := range zipreader.File { + file, _ := os.Create(path.Join(dir, f.Name)) + comp, _ := f.Open() + io.Copy(file, comp) + file.Close() + } + defer os.Remove(path.Join(dir, filename)) + } } + zipfile.Close() } else { w.WriteHeader(http.StatusInternalServerError) }