client server 실행인자 추가

This commit is contained in:
2023-05-25 10:59:04 +09:00
parent 53a385d018
commit 383f846934
5 changed files with 173 additions and 34 deletions

View File

@ -5,11 +5,13 @@ import (
"io"
"os"
"os/exec"
"os/signal"
"path"
"reflect"
"sort"
"strconv"
"sync/atomic"
"syscall"
"unsafe"
"repositories.action2quare.com/ayo/gocommon/logger"
@ -28,6 +30,7 @@ import (
type HoustonClient interface {
SetReportMetrics(map[string]float32)
Shutdown()
Start()
}
type bufferStack struct {
@ -68,14 +71,16 @@ type procmeta struct {
}
type houstonClient struct {
client *grpc.ClientConn
childProcs []*procmeta
extraMetrics unsafe.Pointer // map[string]float32
deploys map[string][]*protos.VersionAndArgs
shutdownFunc context.CancelFunc
exitChan chan *exec.Cmd
httpAddr string
timestamp string
client *grpc.ClientConn
childProcs []*procmeta
extraMetrics unsafe.Pointer // map[string]float32
deploys map[string][]*protos.VersionAndArgs
shutdownFunc context.CancelFunc
ctx context.Context
operationChan chan *protos.OperationQueryResponse
exitChan chan *exec.Cmd
httpAddr string
timestamp string
}
func bToMb(b uint64) uint32 {
@ -208,7 +213,7 @@ func NewClient(grpcAddr string, httpAddr string) (HoustonClient, error) {
metrics.Free = bToMb(mem.ActualFree)
metrics.Metrics = *(*map[string]float32)(atomic.LoadPointer(&hc.extraMetrics))
sc.Report(context.Background(), metrics, grpc.WaitForReady(true))
sc.Report(ctx, metrics, grpc.WaitForReady(true))
mem.Get()
}
}
@ -295,33 +300,57 @@ func NewClient(grpcAddr string, httpAddr string) (HoustonClient, error) {
}
}()
go func() {
// receive from stream
for {
select {
case <-ctx.Done():
return
hc.shutdownFunc = cancel
hc.exitChan = exitChan
hc.ctx = ctx
hc.operationChan = operationChan
default:
err := hc.checkOperation(operationChan)
if err != nil {
logger.Println("hc.checkUpdate failed :", err)
}
return hc, nil
}
func (hc *houstonClient) Start() {
// receive from stream
defer func() {
for _, proc := range hc.childProcs {
if err := proc.cmd.Process.Signal(syscall.SIGTERM); err != nil {
proc.cmd.Process.Signal(os.Kill)
proc.state = protos.ProcessState_Stopping
}
}
for _, proc := range hc.childProcs {
proc.cmd.Wait()
}
}()
hc.shutdownFunc = cancel
hc.exitChan = exitChan
interrupt := make(chan os.Signal, 1)
signal.Notify(interrupt, os.Interrupt, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT)
return hc, nil
go func() {
c := <-interrupt
logger.Println("interrupt!!!!!!!! :", c.String())
hc.shutdownFunc()
}()
for {
select {
case <-hc.ctx.Done():
return
default:
err := hc.checkOperation()
if err != nil {
logger.Println("hc.checkUpdate failed :", err)
}
}
}
}
func (hc *houstonClient) Shutdown() {
hc.shutdownFunc()
}
func (hc *houstonClient) checkOperation(opChan chan<- *protos.OperationQueryResponse) error {
func (hc *houstonClient) checkOperation() error {
defer func() {
r := recover()
if r != nil {
@ -330,13 +359,12 @@ func (hc *houstonClient) checkOperation(opChan chan<- *protos.OperationQueryResp
}()
op := protos.NewOperationClient(hc.client)
cl, err := op.Query(context.Background(), grpc.WaitForReady(true))
cl, err := op.Query(hc.ctx, grpc.WaitForReady(true))
if err != nil {
return err
}
err = cl.Send(hc.makeOperationQueryRequest())
if err != nil {
cl.CloseSend()
return err
@ -348,7 +376,8 @@ func (hc *houstonClient) checkOperation(opChan chan<- *protos.OperationQueryResp
cl.CloseSend()
return err
}
opChan <- update
logger.Println(update)
hc.operationChan <- update
}
}