Compare commits

..

10 Commits

12 changed files with 342 additions and 556 deletions

View File

@ -7,6 +7,7 @@ import (
"fmt" "fmt"
"io" "io"
"io/fs" "io/fs"
"net/http"
"os" "os"
"os/exec" "os/exec"
"os/signal" "os/signal"
@ -22,6 +23,7 @@ import (
"time" "time"
"unsafe" "unsafe"
"github.com/djherbis/times"
"repositories.action2quare.com/ayo/gocommon" "repositories.action2quare.com/ayo/gocommon"
"repositories.action2quare.com/ayo/gocommon/flagx" "repositories.action2quare.com/ayo/gocommon/flagx"
"repositories.action2quare.com/ayo/gocommon/logger" "repositories.action2quare.com/ayo/gocommon/logger"
@ -30,6 +32,7 @@ import (
"google.golang.org/grpc" "google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure" "google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/status"
) )
type runcommand struct { type runcommand struct {
@ -116,6 +119,12 @@ func (pm *procmeta) setState(s protos.ProcessState) {
atomic.StoreInt32(&pm.state, int32(s)) atomic.StoreInt32(&pm.state, int32(s))
} }
type uploadRequest struct {
logFile string
name string
version string
}
type houstonClient struct { type houstonClient struct {
childProcs []*procmeta childProcs []*procmeta
extraMetrics unsafe.Pointer // map[string]float32 extraMetrics unsafe.Pointer // map[string]float32
@ -125,6 +134,7 @@ type houstonClient struct {
operationChan chan *protos.OperationQueryResponse operationChan chan *protos.OperationQueryResponse
exitChan chan *exec.Cmd exitChan chan *exec.Cmd
clientChan chan *grpc.ClientConn clientChan chan *grpc.ClientConn
uploadChan chan uploadRequest
timestamp string timestamp string
wg sync.WaitGroup wg sync.WaitGroup
config clientConfig config clientConfig
@ -317,6 +327,7 @@ func NewClient(standalone bool) (HoustonClient, error) {
timestamp: exefi.ModTime().String(), timestamp: exefi.ModTime().String(),
version: string(ver), version: string(ver),
standalone: standalone, standalone: standalone,
uploadChan: make(chan uploadRequest, 100),
siblingProcIndex: make(map[string]uint64), siblingProcIndex: make(map[string]uint64),
} }
@ -325,6 +336,7 @@ func NewClient(standalone bool) (HoustonClient, error) {
exitChan := make(chan *exec.Cmd, 10) exitChan := make(chan *exec.Cmd, 10)
operationChan := make(chan *protos.OperationQueryResponse, 10) operationChan := make(chan *protos.OperationQueryResponse, 10)
hc.wg.Add(1) hc.wg.Add(1)
ignoreRecover := int32(0)
// autorun 처리 // autorun 처리
go func() { go func() {
@ -395,24 +407,6 @@ func NewClient(standalone bool) (HoustonClient, error) {
unmarshal(&dr, resp.Args) unmarshal(&dr, resp.Args)
logger.Println("args :", dr) logger.Println("args :", dr)
if dr.Name == myname {
if srcdir, replacer, err := hc.prepareUpdateSelf(&dr); err == nil {
args := []string{
fmt.Sprintf("%d", os.Getpid()),
srcdir,
filepath.ToSlash(os.Args[0]),
}
args = append(args, os.Args[1:]...)
cmd := exec.Command(replacer, args...)
if err := cmd.Start(); err != nil {
logger.Println(err)
} else {
hc.shutdownFunc()
}
} else {
logger.Println(err)
}
} else {
hn, _ := os.Hostname() hn, _ := os.Hostname()
if err := hc.deploy(&dr, func(dp *protos.DeployingProgress) { if err := hc.deploy(&dr, func(dp *protos.DeployingProgress) {
@ -421,6 +415,12 @@ func NewClient(standalone bool) (HoustonClient, error) {
dp.Version = dr.Version dp.Version = dr.Version
op.ReportDeployingProgress(ctx, dp) op.ReportDeployingProgress(ctx, dp)
}); err == nil { }); err == nil {
if dr.Name == "houston" {
// houston_update_dir 다운로드가 완료되었으므로 종료
// 종료되고나면 스크립트가 알아서 재 실행
hc.Shutdown()
return
}
prog := gatherDeployedPrograms(hc.config.StorageRoot, dr.Name) prog := gatherDeployedPrograms(hc.config.StorageRoot, dr.Name)
hc.deploys[dr.Name] = prog hc.deploys[dr.Name] = prog
op.Refresh(ctx, hc.makeOperationQueryRequest()) op.Refresh(ctx, hc.makeOperationQueryRequest())
@ -445,7 +445,6 @@ func NewClient(standalone bool) (HoustonClient, error) {
Total: 0, Total: 0,
}) })
} }
}
case shared.Withdraw: case shared.Withdraw:
var wr shared.WithdrawRequest var wr shared.WithdrawRequest
@ -494,15 +493,6 @@ func NewClient(standalone bool) (HoustonClient, error) {
logger.Println(err) logger.Println(err)
} }
case shared.Upload:
var ur shared.UploadRequest
unmarshal(&ur, resp.Args)
logger.Println("args :", ur)
if err := hc.uploadFiles(&ur); err != nil {
logger.Println(err)
}
case shared.Exception: case shared.Exception:
idstr := resp.Args["id"] idstr := resp.Args["id"]
id64, _ := strconv.ParseInt(idstr, 10, 0) id64, _ := strconv.ParseInt(idstr, 10, 0)
@ -521,7 +511,7 @@ func NewClient(standalone bool) (HoustonClient, error) {
found.cmd.Wait() found.cmd.Wait()
found.cmd.Process.Release() found.cmd.Process.Release()
if found.recover { if found.recover && atomic.LoadInt32(&ignoreRecover) == 0 {
time.Sleep(time.Second) time.Sleep(time.Second)
sr := shared.StartProcessRequest{ sr := shared.StartProcessRequest{
Name: found.name, Name: found.name,
@ -550,6 +540,8 @@ func NewClient(standalone bool) (HoustonClient, error) {
hc.shutdownFunc = func() { hc.shutdownFunc = func() {
// child process 강제 종료 // child process 강제 종료
atomic.StoreInt32(&ignoreRecover, 1)
for _, procmeta := range hc.childProcs { for _, procmeta := range hc.childProcs {
if procmeta.cmd != nil && procmeta.cmd.Process != nil { if procmeta.cmd != nil && procmeta.cmd.Process != nil {
procmeta.cmd.Process.Signal(os.Kill) procmeta.cmd.Process.Signal(os.Kill)
@ -567,6 +559,59 @@ func NewClient(standalone bool) (HoustonClient, error) {
return hc, nil return hc, nil
} }
func uploadSafe(url, filePath, name, version string) error {
defer func() {
r := recover()
if r != nil {
logger.Error(r)
}
}()
t, err := times.Stat(filePath)
if err != nil {
return err
}
file, err := os.Open(filePath)
if err != nil {
return err
}
if file == nil {
return errors.New("upload file is missing :" + filePath)
}
defer file.Close()
// hc.config.HttpAddress+"/upload",
httpreq, err := http.NewRequest("POST", url, file)
if err != nil {
return err
}
hn, _ := os.Hostname()
// createTime := file.
httpreq.Header.Set("Houston-Service-Name", name)
httpreq.Header.Set("Houston-Service-Version", version)
httpreq.Header.Set("Houston-Service-Filename", t.BirthTime().UTC().Format(time.DateOnly)+"."+hn+path.Ext(filePath))
httpreq.Header.Set("Content-Type", "application/zip")
resp, err := http.DefaultClient.Do(httpreq)
if err != nil {
return err
}
resp.Body.Close()
if resp.StatusCode != http.StatusOK {
return fmt.Errorf("upload file failed. response code : %s, %d", filePath, resp.StatusCode)
}
if err := os.Remove(filePath); err != nil {
return err
}
return nil
}
func (hc *houstonClient) Start() { func (hc *houstonClient) Start() {
// receive from stream // receive from stream
defer func() { defer func() {
@ -583,6 +628,20 @@ func (hc *houstonClient) Start() {
proc.cmd.Wait() proc.cmd.Wait()
proc.cmd.Process.Release() proc.cmd.Process.Release()
} }
close(hc.uploadChan)
}()
go func() {
// upload 고루틴
url := hc.config.HttpAddress + "/upload"
for req := range hc.uploadChan {
logger.Println("uploadSafe :", req)
err := uploadSafe(url, req.logFile, req.name, req.version)
if err != nil {
logger.Println("uploadSafe return err :", err)
}
}
}() }()
interrupt := make(chan os.Signal, 1) interrupt := make(chan os.Signal, 1)
@ -662,7 +721,9 @@ func (hc *houstonClient) Start() {
if client != nil { if client != nil {
err := hc.checkOperation(client) err := hc.checkOperation(client)
if err != nil { if err != nil {
if status.Convert(err).Message() != status.Convert(context.Canceled).Message() {
logger.Println("grpc.DialContext hc.checkOperation failed :", err) logger.Println("grpc.DialContext hc.checkOperation failed :", err)
}
client = nil client = nil
} }
} }

35
client/client_test.go Normal file
View File

@ -0,0 +1,35 @@
package client
import (
"fmt"
"sync"
"testing"
"time"
)
func Test_houstonClient_Start(t *testing.T) {
tc := make(chan int, 1000)
var wg sync.WaitGroup
wg.Add(1)
go func() {
// receive
defer wg.Done()
for v := range tc {
fmt.Println(v)
time.Sleep(100 * time.Millisecond)
}
}()
go func() {
// send
for i := 0; i < 100; i++ {
tc <- i
}
close(tc)
fmt.Println("channel close called")
}()
wg.Wait()
}

View File

@ -25,6 +25,10 @@ import (
"golang.org/x/text/transform" "golang.org/x/text/transform"
) )
const (
houston_update_dir = "./houston.update"
)
func pof2(x int64, min int64) (out int64) { func pof2(x int64, min int64) (out int64) {
out = 1 out = 1
org := x org := x
@ -217,13 +221,7 @@ func (hc *houstonClient) prepareDeploy(name string, version string) (destPath st
}() }()
verpath := path.Join(hc.config.StorageRoot, name, version) verpath := path.Join(hc.config.StorageRoot, name, version)
if _, err := os.Stat(verpath); os.IsNotExist(err) { if _, err := os.Stat(verpath); !os.IsNotExist(err) {
// 없네? 만들면 된다.
err = os.MkdirAll(verpath, 0775)
if err != nil {
return "", err
}
} else {
// 있네? 재배포 가능한가? // 있네? 재배포 가능한가?
for _, child := range hc.childProcs { for _, child := range hc.childProcs {
if child.version == version && child.name == name { if child.version == version && child.name == name {
@ -282,50 +280,23 @@ func copyfile(src, dst string) error {
return nil return nil
} }
func (hc *houstonClient) prepareUpdateSelf(req *shared.DeployRequest) (srcdir string, replacer string, err error) { func (hc *houstonClient) deploy(req *shared.DeployRequest, cb func(*protos.DeployingProgress)) (err error) {
// 내가 스스로 업데이트
// 다운로드 받고 압축 푼 다음에 교체용 프로세스 시작
tempdir, err := os.MkdirTemp(os.TempDir(), "*")
if err != nil {
return "", "", err
}
fname, err := download(tempdir, hc.makeDownloadUrl(req.Url), req.AccessToken, nil)
if err != nil {
return "", "", err
}
switch path.Ext(fname) {
case ".zip":
err = unzip(fname)
case ".tar":
err = untar(fname)
}
if err != nil {
return "", "", err
}
// houston version 파일
err = os.WriteFile(path.Join(path.Dir(fname), "@version"), []byte(req.Version), 0644)
if err != nil {
return "", "", err
}
selfname, _ := os.Executable()
srcreplacer := path.Join(path.Dir(fname), "replacer") + path.Ext(selfname)
replacer = "./" + filepath.ToSlash("replacer"+path.Ext(selfname))
err = copyfile(srcreplacer, replacer)
if err == nil {
err = os.Chmod(replacer, 0775)
}
// replacer먼저 가져옴
return filepath.ToSlash(tempdir), replacer, err
}
func (hc *houstonClient) deploy(req *shared.DeployRequest, cb func(*protos.DeployingProgress)) error {
logger.Println("start deploying") logger.Println("start deploying")
root, err := hc.prepareDeploy(req.Name, req.Version)
var root string
if req.Name == "houston" {
// houston은 버전없이 houston_update_dir폴더로 다운로드
root = houston_update_dir
// 이미 houston_update_dir가 있을 수도 있으므로 폴더채로 삭제
os.RemoveAll(houston_update_dir)
} else {
root, err = hc.prepareDeploy(req.Name, req.Version)
if err != nil {
return err
}
}
err = os.MkdirAll(root, 0775)
if err != nil { if err != nil {
return err return err
} }
@ -334,7 +305,8 @@ func (hc *houstonClient) deploy(req *shared.DeployRequest, cb func(*protos.Deplo
h := md5.New() h := md5.New()
h.Write([]byte(strings.Trim(req.Url, "/"))) h.Write([]byte(strings.Trim(req.Url, "/")))
at := hex.EncodeToString(h.Sum(nil)) at := hex.EncodeToString(h.Sum(nil))
fname, err := download(root, hc.makeDownloadUrl(req.Url), at, func(written int64, total int64) { var fname string
fname, err = download(root, hc.makeDownloadUrl(req.Url), at, func(written int64, total int64) {
prog := protos.DeployingProgress{ prog := protos.DeployingProgress{
State: "download", State: "download",
Progress: written, Progress: written,
@ -360,7 +332,7 @@ func (hc *houstonClient) deploy(req *shared.DeployRequest, cb func(*protos.Deplo
err = untar(fname) err = untar(fname)
} }
if err == nil && len(req.Config) > 0 { if err == nil && len(req.Config) > 0 && req.Name != "houston" {
// config.json도 다운로드 // config.json도 다운로드
h := md5.New() h := md5.New()
h.Write([]byte(strings.Trim(req.Config, "/"))) h.Write([]byte(strings.Trim(req.Config, "/")))

View File

@ -8,13 +8,12 @@ import (
"errors" "errors"
"fmt" "fmt"
"io" "io"
"math"
"net/http"
"os" "os"
"os/exec" "os/exec"
"path" "path"
"path/filepath" "path/filepath"
"regexp" "regexp"
"slices"
"strconv" "strconv"
"strings" "strings"
"syscall" "syscall"
@ -43,39 +42,12 @@ func lastExecutionArgs(verpath string) []string {
return out return out
} }
var errUploadZipLogFailed = errors.New("not ok") func (hc *houstonClient) uploadToAppendLog(logFile string, name string, version string) {
hc.uploadChan <- uploadRequest{
func (hc *houstonClient) uploadZipLogFile(zipFile string, name string, version string) error { logFile: logFile,
zf, err := os.Open(zipFile) name: name,
if err != nil { version: version,
return err
} }
if zf == nil {
return errUploadZipLogFailed
}
defer zf.Close()
req, err := http.NewRequest("POST", hc.config.HttpAddress+"/upload", zf)
if err != nil {
logger.Println(err)
}
req.Header.Set("Houston-Service-Name", name)
req.Header.Set("Houston-Service-Version", version)
req.Header.Set("Houston-Service-Filename", path.Base(filepath.ToSlash(zipFile)))
req.Header.Set("Content-Type", "application/zip")
resp, err := http.DefaultClient.Do(req)
if err != nil {
return err
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
return errUploadZipLogFailed
}
return nil
} }
func findMatchFiles(storageRoot, name, version, filter string) (string, []string) { func findMatchFiles(storageRoot, name, version, filter string) (string, []string) {
@ -99,6 +71,7 @@ func findMatchFiles(storageRoot, name, version, filter string) (string, []string
out = append(out, file) out = append(out, file)
} }
slices.Sort(out)
return root, out return root, out
} }
@ -295,64 +268,119 @@ func (hc *houstonClient) launch(meta *procmeta) error {
return err return err
} }
stdReader := func(jobName string, r io.ReadCloser, index int, logfilePath string) { logUploader := func(localctx context.Context, logfilePath string, logChan chan []byte) {
defer func() { var logFile *os.File
logger.Println("stdReader is terminated :", meta.name) var logFilePath string
if meta.isState(protos.ProcessState_Running) {
hc.operationChan <- &protos.OperationQueryResponse{ ext := path.Ext(logfilePath)
Operation: string(shared.Exception), head := logfilePath[:len(logfilePath)-len(ext)]
Args: map[string]string{ if len(head) > 0 && !strings.HasSuffix(head, "/") {
"id": fmt.Sprintf("%d", meta.id), head += "."
},
} }
writeLog := func(log []byte) {
if logFile == nil {
logFilePath = head + time.Now().UTC().Format("2006-01-02.150405") + ext
logFile, _ = os.OpenFile(logFilePath, os.O_CREATE|os.O_APPEND|os.O_WRONLY, 0666)
}
for written := 0; written < len(log); {
n, err := logFile.Write(log[written:])
if err != nil {
logger.Println("write log file failed :", logfilePath, err)
break
} else {
written += n
}
}
}
defer func() {
if logFile != nil {
logFile.Close()
logFile = nil
hc.uploadToAppendLog(logFilePath, meta.name, meta.version)
} }
}() }()
defer func() { defer func() {
overflow := index / 64 for {
offset := index % 64 select {
key := fmt.Sprintf("%s-%d", meta.args[0], overflow) case log := <-logChan:
runningFlags := hc.siblingProcIndex[key] writeLog(log)
mask := uint64(1 << offset)
runningFlags = runningFlags ^ mask default:
hc.siblingProcIndex[key] = runningFlags // logChan에 있는 모든 로그 소비
return
}
}
}() }()
for {
heartbeat := time.After(time.Minute)
select {
case <-localctx.Done():
return
case <-heartbeat:
heartbeat = time.After(time.Minute)
// 지금까지의 로그를 저장해서 업로드
if logFile != nil {
logFile.Close()
logFile = nil
hc.uploadToAppendLog(logFilePath, meta.name, meta.version)
}
case log := <-logChan:
writeLog(log)
}
}
}
stdReader := func(r io.ReadCloser, logfilePath string, verify func(buff []byte) bool) {
defer func() { defer func() {
reco := recover() reco := recover()
if reco != nil { if reco != nil {
logger.Println(reco) logger.Println(reco)
} }
r.Close() r.Close()
}() }()
metricExporter := metric.NewPrometheusExport(hc.config.MetricNamespace) localctx, cancel := context.WithCancel(context.Background())
defer metricExporter.Shutdown() defer cancel()
logChan := make(chan []byte, 1)
go logUploader(localctx, logfilePath, logChan)
total := 0
hn, _ := os.Hostname()
var targetFile *os.File
ext := path.Ext(logfilePath)
head := logfilePath[:len(logfilePath)-len(ext)]
reader := bufio.NewReader(r) reader := bufio.NewReader(r)
readingMetric := false
ext = "." + hn + ext
var metricBuffer []byte
for { for {
if targetFile == nil {
currentFile := head + time.Now().UTC().Format(".2006-01-02.150405") + ext
targetFile, _ = os.OpenFile(currentFile, os.O_CREATE|os.O_APPEND|os.O_WRONLY, 0666)
}
buff, err := reader.ReadBytes('\n') buff, err := reader.ReadBytes('\n')
if err != nil { if err != nil {
logger.Println("ReadBytes at stdReader return err :", err, meta.name) logger.Println("ReadBytes at stdReader return err :", err, meta.name)
break break
} }
if verify(buff) {
if len(buff) > 0 {
logChan <- buff
}
}
}
}
var evalfile string
if len(meta.logfile) > 0 {
evalfile = path.Join(logfolder, meta.logfile)
} else {
evalfile = logfolder + "/"
}
go func() {
metricExporter := metric.NewPrometheusExport(hc.config.MetricNamespace)
defer metricExporter.Shutdown()
var metricBuffer []byte
readingMetric := false
stdReader(stdout, evalfile+".log", func(buff []byte) bool {
if readingMetric { if readingMetric {
metricBuffer = append(metricBuffer, buff...) metricBuffer = append(metricBuffer, buff...)
} else if buff[0] == metric.METRIC_HEAD_INLINE { } else if buff[0] == metric.METRIC_HEAD_INLINE {
@ -369,7 +397,7 @@ func (hc *houstonClient) launch(meta *procmeta) error {
var desc metric.MetricDescription var desc metric.MetricDescription
if err := json.Unmarshal(metricBuffer, &desc); err != nil { if err := json.Unmarshal(metricBuffer, &desc); err != nil {
logger.Println("unmarshal metric failed :", err, string(metricBuffer)) logger.Println("unmarshal metric failed :", err, string(metricBuffer))
continue return false
} }
if desc.ConstLabels == nil { if desc.ConstLabels == nil {
@ -380,8 +408,7 @@ func (hc *houstonClient) launch(meta *procmeta) error {
desc.ConstLabels[k] = v desc.ConstLabels[k] = v
} }
desc.ConstLabels["job"] = jobName desc.ConstLabels["job"] = meta.name
metricExporter.RegisterMetric(&desc) metricExporter.RegisterMetric(&desc)
} else { } else {
key, val := metric.ReadMetricValue(metricBuffer) key, val := metric.ReadMetricValue(metricBuffer)
@ -390,145 +417,30 @@ func (hc *houstonClient) launch(meta *procmeta) error {
metricBuffer = metricBuffer[:0] metricBuffer = metricBuffer[:0]
} }
} else if targetFile != nil && len(buff) > 0 {
for written := 0; written < len(buff); {
n, err := targetFile.Write(buff[written:])
if err != nil {
logger.Println("write log file failed :", logfilePath, err)
break
} else {
written += n
}
}
total += len(buff)
if total > 1024*1024 { return false
total = 0 }
targetFile.Close() return true
targetFile = nil })
logger.Println("stdReader is terminated :", meta.name)
hc.uploadProcFiles(meta, "logs/*"+ext, true) if meta.isState(protos.ProcessState_Running) {
// state는 running인데 종료됐으면 exception처리
hc.operationChan <- &protos.OperationQueryResponse{
Operation: string(shared.Exception),
Args: map[string]string{
"id": fmt.Sprintf("%d", meta.id),
},
} }
} }
}
}
errReader := func(r io.ReadCloser, logfilePath string) {
defer func() {
reco := recover()
if reco != nil {
logger.Println(reco)
}
}() }()
defer r.Close()
total := 0 go stdReader(stderr, evalfile+".err", func([]byte) bool { return true })
hn, _ := os.Hostname()
var targetFile *os.File
ext := path.Ext(logfilePath)
head := logfilePath[:len(logfilePath)-len(ext)]
reader := bufio.NewReader(r)
ext = "." + hn + ext
for {
if targetFile == nil {
currentFile := head + time.Now().UTC().Format(".2006-01-02.150405") + ext
targetFile, _ = os.OpenFile(currentFile, os.O_CREATE|os.O_APPEND|os.O_WRONLY, 0666)
}
buff, errRead := reader.ReadBytes('\n')
if errRead != nil {
logger.Println("ReadBytes at stdReader return err :", err, meta.name)
break
}
if targetFile != nil && len(buff) > 0 {
for written := 0; written < len(buff); {
n, err := targetFile.Write(buff[written:])
if err != nil {
logger.Println("write log file failed :", logfilePath, err)
break
} else {
written += n
}
}
total += len(buff)
if total > 1024*1024 {
total = 0
targetFile.Close()
targetFile = nil
hc.uploadProcFiles(meta, "logs/*"+ext, true)
}
}
}
}
index := 0
for overflow := 0; ; overflow++ {
key := fmt.Sprintf("%s-%d", meta.args[0], overflow)
runningFlags := hc.siblingProcIndex[key]
if runningFlags == math.MaxUint64 {
index += 64
} else {
for si := 0; si < 64; si++ {
mask := uint64(1 << si)
if runningFlags&mask == 0 {
index += si
runningFlags |= mask
break
}
}
hc.siblingProcIndex[key] = runningFlags
break
}
}
// 자체 환경 변수
customEnv := map[string]string{
"HOUSTON_SIBLIING_INDEX": fmt.Sprintf("%d", index),
"HOUSTON_PROC_TIMESTAMP": time.Now().UTC().Format("2006-01-02T15-04-05"),
}
// 프로세스 환경 변수에 반영
meta.cmd.Env = os.Environ()
for k, v := range customEnv {
meta.cmd.Env = append(meta.cmd.Env, fmt.Sprintf("%s=%s", k, v))
}
// argument 표현식 계산
meta.cmd.Args, err = evaluateArgs(meta.cmd.Args, parseEnv(meta.cmd.Env))
if err != nil {
logger.Println("evaluateArgs failed :", err)
return err
}
// 로그파일에 환경변수 적용
evalfile := os.Expand(meta.logfile, func(n string) string {
v := os.Getenv(n)
if len(v) == 0 {
return customEnv[n]
}
return v
})
if len(evalfile) > 0 {
evalfile = path.Join(logfolder, evalfile)
} else {
evalfile = path.Join(logfolder, path.Base(meta.cmd.Args[0]))
}
go stdReader(meta.name, stdout, index, evalfile+".log")
go errReader(stderr, evalfile+".err")
logger.Println("startChildProcess :", meta.cmd.Args) logger.Println("startChildProcess :", meta.cmd.Args)
err = meta.cmd.Start() err = meta.cmd.Start()
if err == nil { if err == nil {
logger.Println("process index, pid =", index, meta.cmd.Process.Pid)
set_affinity(meta.cmd.Process.Pid, index)
meta.setState(protos.ProcessState_Running) meta.setState(protos.ProcessState_Running)
} }
@ -633,59 +545,3 @@ func (hc *houstonClient) restartChildProcess(req *shared.RestartProcessRequest,
return nil return nil
} }
func (hc *houstonClient) uploadProcFiles(child *procmeta, filter string, deleteAfterUpload bool) {
logger.Println("uploadFiles found :", child.version, child.name)
root, matches := findMatchFiles(hc.config.StorageRoot, child.name, child.version, filter)
go func(deleteAfterUpload bool, root string, matches []string) {
zipFile, err := zipCompressFiles(root, matches)
if err == nil && len(zipFile) > 0 {
if err = hc.uploadZipLogFile(zipFile, child.name, child.version); err == nil {
if deleteAfterUpload {
for _, fn := range matches {
if len(fn) > 0 {
os.Remove(fn)
}
}
}
} else {
logger.Println("uploadZipLogFile failed :", err)
}
os.Remove(zipFile)
} else if err != nil {
logger.Println("zipLogFiles failed :", err)
}
}(deleteAfterUpload, root, matches)
}
func (hc *houstonClient) uploadFiles(req *shared.UploadRequest) error {
logger.Println("uploadFiles req :", *req)
for _, child := range hc.childProcs {
if child.version == req.Version && child.name == req.Name {
hc.uploadProcFiles(child, req.Filter, false)
return nil
}
}
// 실행 중이 아닌 폴더에서도 대상을 찾는다
// 전체 파일을 대상으로
root, matches := findMatchFiles(hc.config.StorageRoot, req.Name, req.Version, req.Filter)
zipFile, err := zipCompressFiles(root, matches)
if err == nil && len(zipFile) > 0 && len(zipFile) > 0 {
if err = hc.uploadZipLogFile(zipFile, req.Name, req.Version); err == nil {
for _, fn := range matches {
if len(fn) > 0 {
os.Remove(fn)
}
}
os.Remove(zipFile)
} else {
logger.Println("uploadZipLogFile failed :", err)
}
} else if err != nil {
logger.Println("zipLogFiles failed :", err)
}
return nil
}

1
go.mod
View File

@ -4,6 +4,7 @@ go 1.19
require ( require (
github.com/Knetic/govaluate v3.0.0+incompatible github.com/Knetic/govaluate v3.0.0+incompatible
github.com/djherbis/times v1.6.0
github.com/go-kit/log v0.2.1 github.com/go-kit/log v0.2.1
github.com/prometheus/client_golang v1.17.0 github.com/prometheus/client_golang v1.17.0
github.com/prometheus/common v0.44.0 github.com/prometheus/common v0.44.0

3
go.sum
View File

@ -22,6 +22,8 @@ github.com/dennwc/ioctl v1.0.0 h1:DsWAAjIxRqNcLn9x6mwfuf2pet3iB7aK90K4tF16rLg=
github.com/dennwc/ioctl v1.0.0/go.mod h1:ellh2YB5ldny99SBU/VX7Nq0xiZbHphf1DrtHxxjMk0= github.com/dennwc/ioctl v1.0.0/go.mod h1:ellh2YB5ldny99SBU/VX7Nq0xiZbHphf1DrtHxxjMk0=
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/rVNCu3HqELle0jiPLLBs70cWOduZpkS1E78= github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/rVNCu3HqELle0jiPLLBs70cWOduZpkS1E78=
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cuUVRXasLTGF7a8hSLbxyZXjz+1KgoB3wDUb6vlszIc= github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cuUVRXasLTGF7a8hSLbxyZXjz+1KgoB3wDUb6vlszIc=
github.com/djherbis/times v1.6.0 h1:w2ctJ92J8fBvWPxugmXIv7Nz7Q3iDMKNx9v5ocVH20c=
github.com/djherbis/times v1.6.0/go.mod h1:gOHeRAz2h+VJNZ5Gmc/o7iD9k4wW7NMVqieYCY99oc0=
github.com/ema/qdisc v0.0.0-20230120214811-5b708f463de3 h1:Jrl8sD8wO34+EE1dV2vhOXrqFAZa/FILDnZRaV28+cw= github.com/ema/qdisc v0.0.0-20230120214811-5b708f463de3 h1:Jrl8sD8wO34+EE1dV2vhOXrqFAZa/FILDnZRaV28+cw=
github.com/ema/qdisc v0.0.0-20230120214811-5b708f463de3/go.mod h1:FhIc0fLYi7f+lK5maMsesDqwYojIOh3VfRs8EVd5YJQ= github.com/ema/qdisc v0.0.0-20230120214811-5b708f463de3/go.mod h1:FhIc0fLYi7f+lK5maMsesDqwYojIOh3VfRs8EVd5YJQ=
github.com/fsnotify/fsnotify v1.4.9 h1:hsms1Qyu0jgnwNXIxa+/V/PDsU6CfLf6CNO8H7IWoS4= github.com/fsnotify/fsnotify v1.4.9 h1:hsms1Qyu0jgnwNXIxa+/V/PDsU6CfLf6CNO8H7IWoS4=
@ -146,6 +148,7 @@ golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7w
golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20211031064116-611d5d643895/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20211031064116-611d5d643895/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220615213510-4f61da869c0c/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.16.0 h1:xWw16ngr6ZMtmxDyKyIgsE93KNKz5HKmMa3b8ALHidU= golang.org/x/sys v0.16.0 h1:xWw16ngr6ZMtmxDyKyIgsE93KNKz5HKmMa3b8ALHidU=
golang.org/x/sys v0.16.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/sys v0.16.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=

View File

@ -1,4 +1,37 @@
#!/bin/sh #!/bin/bash
nohup /home/opdev/houston -client -logfile > /dev/null & HOUSTON_PATH="./houston"
UPDATE_DIR="houston.update"
run_houston() {
if [ -f "$HOUSTON_PATH" ]; then
"$HOUSTON_PATH" "$@"
return $?
else
echo "houston 실행 파일이 없습니다."
return 1
fi
}
while true; do
echo "현재 버전의 houston을 실행합니다."
run_houston "$@"
# houston.update 폴더가 존재하는지 확인
if [ -d "$UPDATE_DIR" ]; then
echo "새로운 업데이트 폴더 발견. 업데이트를 진행합니다."
# houston.update 폴더 내의 모든 파일을 현재 폴더로 복사
cp -R "$UPDATE_DIR"/* .
# 실행 권한 부여 (필요한 경우)
chmod +x "$HOUSTON_PATH"
# 업데이트 폴더 삭제
rm -rf "$UPDATE_DIR"
echo "업데이트 완료 및 업데이트 폴더 삭제. houston을 다시 시작합니다."
else
echo "업데이트 폴더가 없습니다. 스크립트 실행을 종료합니다."
break
fi
done
echo "houston 실행 및 업데이트 스크립트 종료"
exit $?

View File

@ -27,10 +27,7 @@ func main() {
go func() { go func() {
logger.Println("listen /metrics") logger.Println("listen /metrics")
err := server.ListenAndServe() server.ListenAndServe()
if err != nil {
logger.Error(err)
}
}() }()
hc.Start() hc.Start()

View File

@ -10,19 +10,7 @@ go mod tidy
go build -ldflags="-s -w" -tags=client . go build -ldflags="-s -w" -tags=client .
cp houston .\replacer\houston
cp config.json .\replacer\config.json
cd replacer
go build -ldflags="-s -w" .
Compress-Archive -Path replacer -DestinationPath houston.zip -Force
Compress-Archive -Path config.json -Update -DestinationPath houston.zip Compress-Archive -Path config.json -Update -DestinationPath houston.zip
Compress-Archive -Path houston -Update -DestinationPath houston.zip Compress-Archive -Path houston -Update -DestinationPath houston.zip
del houston
del config.json
del replacer
mv houston.zip ..\houston.zip
cd ..

View File

@ -1,113 +0,0 @@
package main
import (
"encoding/json"
"errors"
"io"
"log"
"os"
"os/exec"
"path"
"time"
)
func copy(src, dst string, stdlog *log.Logger) error {
fi, err := os.Stat(src)
if err != nil {
return err
}
if fi.IsDir() {
entries, _ := os.ReadDir(src)
for _, ent := range entries {
if err := copy(path.Join(src, ent.Name()), path.Join(dst, ent.Name()), stdlog); err != nil {
return err
}
}
return nil
}
inmode := fi.Mode()
in, err := os.Open(src)
if err != nil {
return err
}
defer in.Close()
out, err := os.Create(dst)
if err != nil {
return err
}
defer out.Close()
copied, err := io.Copy(out, in)
if err != nil {
return err
}
if copied < fi.Size() {
return errors.New("copy not completed")
}
if err := out.Sync(); err != nil {
return err
}
if err := out.Chmod(inmode); err != nil {
return err
}
stdlog.Println("file copied :", src, dst)
return nil
}
func main() {
logfile, _ := os.OpenFile("replacer.log", os.O_CREATE|os.O_WRONLY|os.O_TRUNC, 0666)
defer logfile.Close()
stdlog := log.New(logfile, "", log.LstdFlags)
args := os.Args
// args[1] : 나를 시작한 pid. pid가 종료될 때 까지 기다림
// args[2] : target 폴더
// args[3:] : 다시 시작할 때 넘겨줄 arguments(프로세스 이름 포함)
stdlog.Println(args)
for {
stdlog.Println("wait for terminating of", args[3])
cmd := exec.Command("ps", "-p", args[1])
if err := cmd.Run(); err != nil {
break
}
time.Sleep(time.Second)
}
stdlog.Println("target is terminated")
// replacer 제거. 내가 돌고 있으므로 복사는 안된다.
// 내가 실행되기 전에 이미 복사가 되서 나는 최신 버전임
os.Remove(path.Join(args[2], os.Args[0]))
if err := copy(args[2], "", stdlog); err != nil {
stdlog.Fatal(err)
}
nextArgs := args[4:]
if bt, _ := os.ReadFile("@args"); len(bt) > 0 {
var tempArgs []string
if json.Unmarshal(bt, &tempArgs) == nil {
nextArgs = tempArgs
}
}
os.Remove("@args")
err := os.RemoveAll(args[2])
if err != nil {
stdlog.Println("os.RemoveAll failed :", args[2], err)
}
err = os.Chmod(args[3], 0775)
if err != nil {
stdlog.Println("os.Chmod failed :", err)
}
stdlog.Println("exec.Command :", args)
cmd := exec.Command(args[3], nextArgs...)
cmd.Start()
}

View File

@ -276,12 +276,16 @@ func (h *houstonHandler) Deploy(w http.ResponseWriter, r *http.Request) {
return return
} }
configPath, err := h.findLastestConfigFile(name) var configPath string
if name != "houston" {
// houston은 config를 포함하여 배포
configPath, err = h.findLastestConfigFile(name)
if err != nil { if err != nil {
logger.Println(err) logger.Println(err)
w.WriteHeader(http.StatusBadRequest) w.WriteHeader(http.StatusBadRequest)
return return
} }
}
h.Operation().Deploy(MakeDeployRequest( h.Operation().Deploy(MakeDeployRequest(
shared.DeployRequest{ shared.DeployRequest{

View File

@ -1,7 +1,6 @@
package server package server
import ( import (
"archive/zip"
"crypto/md5" "crypto/md5"
"encoding/hex" "encoding/hex"
"fmt" "fmt"
@ -15,7 +14,6 @@ import (
"strings" "strings"
"repositories.action2quare.com/ayo/gocommon" "repositories.action2quare.com/ayo/gocommon"
"repositories.action2quare.com/ayo/gocommon/flagx"
"repositories.action2quare.com/ayo/gocommon/logger" "repositories.action2quare.com/ayo/gocommon/logger"
) )
@ -141,25 +139,12 @@ func (h *houstonHandler) RegisterHandlers(serveMux gocommon.ServerMuxInterface,
dir := path.Join(h.downloadPath, name, version) dir := path.Join(h.downloadPath, name, version)
if err := os.MkdirAll(dir, 0775); err == nil { if err := os.MkdirAll(dir, 0775); err == nil {
filepath := path.Join(dir, filename) filepath := path.Join(dir, filename)
localfile, _ := os.Create(filepath) // filepath가 이미 있으면 append
logger.Println("file uploaded :", localfile) localfile, _ := os.OpenFile(filepath, os.O_CREATE|os.O_APPEND|os.O_WRONLY, 0666)
if localfile != nil { if localfile != nil {
defer localfile.Close()
if _, err = io.Copy(localfile, r.Body); err != nil { if _, err = io.Copy(localfile, r.Body); err != nil {
w.WriteHeader(http.StatusInternalServerError) w.WriteHeader(http.StatusInternalServerError)
} else {
localfile.Close()
localfile, _ = os.Open(filepath)
if strings.HasSuffix(filename, ".zip") {
stat, _ := localfile.Stat()
zipreader, _ := zip.NewReader(localfile, stat.Size())
for _, f := range zipreader.File {
file, _ := os.Create(path.Join(dir, f.Name))
comp, _ := f.Open()
io.Copy(file, comp)
file.Close()
}
os.Remove(filepath)
}
} }
} else { } else {
w.WriteHeader(http.StatusInternalServerError) w.WriteHeader(http.StatusInternalServerError)
@ -172,9 +157,6 @@ func (h *houstonHandler) RegisterHandlers(serveMux gocommon.ServerMuxInterface,
return nil return nil
} }
var noauth = flagx.Bool("noauth", false, "")
var authtype = flagx.String("auth", "on", "on|off|both")
func (h *houstonHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { func (h *houstonHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
defer func() { defer func() {
s := recover() s := recover()
@ -189,39 +171,6 @@ func (h *houstonHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
r.Body.Close() r.Body.Close()
}() }()
// TODO : 구글 인증까지 붙인 후에 주석 제거
// var userinfo map[string]any
// if !*noauth && (*authtype == "on" || *authtype == "both") {
// authheader := r.Header.Get("Authorization")
// if len(authheader) == 0 {
// logger.Println("Authorization header is not valid :", authheader)
// w.WriteHeader(http.StatusBadRequest)
// return
// }
// req, _ := http.NewRequest("GET", "https://graph.microsoft.com/oidc/userinfo", nil)
// req.Header.Add("Authorization", authheader)
// client := &http.Client{}
// resp, err := client.Do(req)
// if err != nil {
// logger.Println("graph microsoft api call failed :", err)
// w.WriteHeader(http.StatusBadRequest)
// return
// }
// defer resp.Body.Close()
// raw, _ := io.ReadAll(resp.Body)
// if err = json.Unmarshal(raw, &userinfo); err != nil {
// return
// }
// if _, expired := userinfo["error"]; expired {
// w.WriteHeader(http.StatusUnauthorized)
// return
// }
// }
var operation string var operation string
if r.Method == "POST" { if r.Method == "POST" {
operation = r.FormValue("operation") operation = r.FormValue("operation")