diff --git a/client/client.go b/client/client.go index 4bdd2cf..038ba70 100644 --- a/client/client.go +++ b/client/client.go @@ -94,20 +94,21 @@ func (pm *procmeta) setState(s protos.ProcessState) { } type houstonClient struct { - childProcs []*procmeta - extraMetrics unsafe.Pointer // map[string]float32 - deploys map[string][]*protos.VersionAndArgs - shutdownFunc context.CancelFunc - ctx context.Context - operationChan chan *protos.OperationQueryResponse - exitChan chan *exec.Cmd - clientChan chan *grpc.ClientConn - timestamp string - wg sync.WaitGroup - config clientConfig - version string - standalone bool - metricExporter metric.Exporter + childProcs []*procmeta + extraMetrics unsafe.Pointer // map[string]float32 + deploys map[string][]*protos.VersionAndArgs + shutdownFunc context.CancelFunc + ctx context.Context + operationChan chan *protos.OperationQueryResponse + exitChan chan *exec.Cmd + clientChan chan *grpc.ClientConn + timestamp string + wg sync.WaitGroup + config clientConfig + version string + standalone bool + metricExporter metric.Exporter + siblingProcIndex map[string]uint64 } func unmarshal[T any](val *T, src map[string]string) { @@ -260,14 +261,15 @@ func NewClient(standalone bool) (HoustonClient, error) { } hc := &houstonClient{ - config: clientConfig, - clientChan: make(chan *grpc.ClientConn), - extraMetrics: unsafe.Pointer(&map[string]float32{}), - deploys: deploys, - timestamp: exefi.ModTime().String(), - version: string(ver), - standalone: standalone, - metricExporter: metric.NewPrometheusExport(clientConfig.MetricNamespace), + config: clientConfig, + clientChan: make(chan *grpc.ClientConn), + extraMetrics: unsafe.Pointer(&map[string]float32{}), + deploys: deploys, + timestamp: exefi.ModTime().String(), + version: string(ver), + standalone: standalone, + metricExporter: metric.NewPrometheusExport(clientConfig.MetricNamespace), + siblingProcIndex: make(map[string]uint64), } ctx, cancel := context.WithCancel(context.Background()) diff --git a/client/client_linux.go b/client/client_linux.go new file mode 100644 index 0000000..4865db2 --- /dev/null +++ b/client/client_linux.go @@ -0,0 +1,26 @@ +//go:build linux + +package client + +import ( + "golang.org/x/sys/unix" + + "repositories.action2quare.com/ayo/gocommon/logger" +) + +func set_affinity(pid int, cpu int) { + var cpuset unix.CPUSet + err := unix.SchedGetaffinity(pid, &cpuset) + if err != nil { + logger.Println("SchedGetaffinity failed :", err) + } + + count := cpuset.Count() + cpuset.Zero() + cpuset.Set(cpu % count) + + err = unix.SchedSetaffinity(pid, &cpuset) + if err != nil { + logger.Println("SchedSetaffinity failed :", err) + } +} diff --git a/client/client_misc.go b/client/client_misc.go new file mode 100644 index 0000000..501156b --- /dev/null +++ b/client/client_misc.go @@ -0,0 +1,41 @@ +//go:build !linux + +package client + +func set_affinity(pid int, cpu int) { + +} + +// package main + +// import ( +// "fmt" +// "syscall" +// "time" +// "unsafe" +// ) + +// func main() { +// var mask uintptr + +// // Get the current CPU affinity of the process +// if _, _, err := syscall.RawSyscall(syscall.SYS_SCHED_GETAFFINITY, 0, uintptr(unsafe.Sizeof(mask)), uintptr(unsafe.Pointer(&mask))); err != 0 { +// fmt.Println("Failed to get CPU affinity:", err) +// return +// } +// fmt.Println("Current CPU affinity:", mask) + +// // Set the new CPU affinity +// mask = 3 +// if _, _, err := syscall.RawSyscall(syscall.SYS_SCHED_SETAFFINITY, 0, uintptr(unsafe.Sizeof(mask)), uintptr(unsafe.Pointer(&mask))); err != 0 { +// fmt.Println("Failed to set CPU affinity:", err) +// return +// } +// fmt.Println("New CPU affinity:", mask) + +// // some code +// for { +// println("Hello, World!") +// time.Sleep(1 * time.Second) +// } +// } diff --git a/client/operation.go b/client/operation.go index d85eef3..92ddf58 100644 --- a/client/operation.go +++ b/client/operation.go @@ -8,6 +8,7 @@ import ( "errors" "fmt" "io" + "math" "net/http" "os" "os/exec" @@ -179,7 +180,7 @@ func prepareProcessLaunch(storageRoot string, req *shared.StartProcessRequest) * return nil } -func makeLogFilePrefix(meta *procmeta) string { +func makeLogFilePrefix(meta *procmeta, index int) string { now := time.Now().UTC() ext := path.Ext(meta.args[0]) nameonly := path.Base(meta.args[0]) @@ -187,7 +188,11 @@ func makeLogFilePrefix(meta *procmeta) string { nameonly = nameonly[:len(nameonly)-len(ext)] } ts := now.Format("2006-01-02T15-04-05") - return path.Join(meta.cmd.Dir, "logs", fmt.Sprintf("%s_%s", nameonly, ts)) + if index == 0 { + return path.Join(meta.cmd.Dir, "logs", fmt.Sprintf("%s_%s", nameonly, ts)) + } + + return path.Join(meta.cmd.Dir, "logs", fmt.Sprintf("%s_%d_%s", nameonly, index, ts)) } func (hc *houstonClient) launch(meta *procmeta) error { @@ -201,7 +206,7 @@ func (hc *houstonClient) launch(meta *procmeta) error { return err } - stdReader := func(childProcName string, r io.ReadCloser) { + stdReader := func(childProcName string, r io.ReadCloser, index int) { defer func() { reco := recover() if reco != nil { @@ -209,13 +214,23 @@ func (hc *houstonClient) launch(meta *procmeta) error { } }() + defer func() { + overflow := index / 64 + offset := index % 64 + key := fmt.Sprintf("%s-%d", meta.args[0], overflow) + runningFlags := hc.siblingProcIndex[key] + mask := uint64(1 << offset) + runningFlags = runningFlags ^ mask + hc.siblingProcIndex[key] = runningFlags + }() + defer r.Close() reader := bufio.NewReader(r) thisFileSize := 0 logFileIndex := 0 - logFileNamePrefix := makeLogFilePrefix(meta) + logFileNamePrefix := makeLogFilePrefix(meta, index) logFileName := fmt.Sprintf("%s_%d.log", logFileNamePrefix, logFileIndex) targetFile, err := os.Create(logFileName) if err != nil { @@ -224,10 +239,15 @@ func (hc *houstonClient) launch(meta *procmeta) error { } exef, _ := os.Executable() - linkePath := path.Join(path.Dir(exef), path.Dir(logFileName), meta.name+".log") + var linkPath string + if index == 0 { + linkPath = path.Join(path.Dir(exef), path.Dir(logFileName), meta.name+".log") + } else { + linkPath = path.Join(path.Dir(exef), path.Dir(logFileName), fmt.Sprintf("%s_%d.log", meta.name, index)) + } - os.Remove(linkePath) - os.Symlink(path.Base(targetFile.Name()), linkePath) + os.Remove(linkPath) + os.Symlink(path.Base(targetFile.Name()), linkPath) defer func() { if targetFile != nil { @@ -307,19 +327,41 @@ func (hc *houstonClient) launch(meta *procmeta) error { } else { targetFile.Close() targetFile = nextTargetFile - os.Remove(linkePath) - os.Symlink(path.Base(targetFile.Name()), linkePath) + os.Remove(linkPath) + os.Symlink(path.Base(targetFile.Name()), linkPath) thisFileSize = 0 } } } } - go stdReader(meta.name, stdout) + index := 0 + for overflow := 0; ; overflow++ { + key := fmt.Sprintf("%s-%d", meta.args[0], overflow) + runningFlags := hc.siblingProcIndex[key] + if runningFlags == math.MaxUint64 { + index += 64 + } else { + for si := 0; si < 64; si++ { + mask := uint64(1 << si) + if runningFlags&mask == 0 { + index += si + runningFlags |= mask + break + } + } + hc.siblingProcIndex[key] = runningFlags + break + } + } + + go stdReader(meta.name, stdout, index) logger.Println("startChildProcess :", meta.cmd.Args) + meta.cmd.Env = append(meta.cmd.Env, fmt.Sprintf("HOUSTON_SIBLIING_INDEX=%d", index)) err = meta.cmd.Start() if err == nil { + set_affinity(meta.cmd.Process.Pid, index) meta.setState(protos.ProcessState_Running) } diff --git a/go.mod b/go.mod index 5b723a6..5ab5dc1 100644 --- a/go.mod +++ b/go.mod @@ -52,7 +52,7 @@ require ( golang.org/x/exp v0.0.0-20230522175609-2e198f4a06a1 // indirect golang.org/x/net v0.11.0 // indirect golang.org/x/sync v0.3.0 // indirect - golang.org/x/sys v0.11.0 // indirect + golang.org/x/sys v0.15.0 // indirect google.golang.org/genproto/googleapis/rpc v0.0.0-20230530153820-e85fd2cbaebc // indirect howett.net/plist v1.0.0 // indirect ) diff --git a/go.sum b/go.sum index c4bbfdd..2adedbc 100644 --- a/go.sum +++ b/go.sum @@ -43,8 +43,11 @@ github.com/illumos/go-kstat v0.0.0-20210513183136-173c9b0a9973/go.mod h1:PoK3ejP github.com/jessevdk/go-flags v1.4.0/go.mod h1:4FA24M0QyGHXBuZZK/XkWh8h0e1EYbRYJSGM75WSRxI= github.com/josharian/native v1.1.0 h1:uuaP0hAbW7Y4l0ZRQ6C9zfb7Mg1mbFKry/xzDAfmtLA= github.com/josharian/native v1.1.0/go.mod h1:7X/raswPFr05uY3HiLlYeyQntB6OO7E/d2Cu7qoaN2w= +github.com/jpillora/backoff v1.0.0/go.mod h1:J/6gKK9jxlEcS3zixgDgUAsiuZ7yrSoa/FX5e0EB2j4= github.com/jsimonetti/rtnetlink v1.3.2 h1:dcn0uWkfxycEEyNy0IGfx3GrhQ38LH7odjxAghimsVI= github.com/jsimonetti/rtnetlink v1.3.2/go.mod h1:BBu4jZCpTjP6Gk0/wfrO8qcqymnN3g0hoFqObRmUo6U= +github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo= +github.com/julienschmidt/httprouter v1.3.0/go.mod h1:JR6WtHb+2LUe8TCKY3cZOxFyyO8IZAc4RVcycCCAKdM= github.com/lufia/iostat v1.2.1 h1:tnCdZBIglgxD47RyD55kfWQcJMGzO+1QBziSQfesf2k= github.com/lufia/iostat v1.2.1/go.mod h1:rEPNA0xXgjHQjuI5Cy05sLlS2oRcSlWHRLrvh/AQ+Pg= github.com/mattn/go-xmlrpc v0.0.3 h1:Y6WEMLEsqs3RviBrAa1/7qmbGB7DVD3brZIbqMbQdGY= @@ -61,8 +64,12 @@ github.com/mdlayher/socket v0.4.1 h1:eM9y2/jlbs1M615oshPQOHZzj6R6wMT7bX5NPiQvn2U github.com/mdlayher/socket v0.4.1/go.mod h1:cAqeGjoufqdxWkD7DkpyS+wcefOtmu5OQ8KuoJGIReA= github.com/mdlayher/wifi v0.0.0-20220330172155-a44c70b6d3c8 h1:/HCRFfpoICSWHvNrJ356VO4opd9dg/LaU7m8Tzdf39c= github.com/mdlayher/wifi v0.0.0-20220330172155-a44c70b6d3c8/go.mod h1:IqdtNfemiXr50M8tnxLWSFdZKZ9vcI1Mgt0oTrCIS7A= +github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= +github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk= +github.com/mwitkow/go-conntrack v0.0.0-20190716064945-2f068394615f/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U= github.com/opencontainers/selinux v1.11.0 h1:+5Zbo97w3Lbmb3PeqQtpmTkMwsW5nRI3YaLpt7tQ7oU= github.com/opencontainers/selinux v1.11.0/go.mod h1:E5dMC3VPuVvVHDYmi78qvhJp8+M586T4DlDRYpFkyec= +github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/prometheus-community/go-runit v0.1.0 h1:uTWEj/Fn2RoLdfg/etSqwzgYNOYPrARx1BHUN052tGA= @@ -103,6 +110,8 @@ golang.org/x/sys v0.0.0-20211031064116-611d5d643895/go.mod h1:oPkhp1MJrh7nUepCBc golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.11.0 h1:eG7RXZHdqOJ1i+0lgLgCpSXAp6M3LYlAo6osgSi0xOM= golang.org/x/sys v0.11.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.15.0 h1:h48lPFYpsTvQJZF4EKyI4aLHaev3CxivZmv7yZig9pc= +golang.org/x/sys v0.15.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/text v0.10.0 h1:UpjohKhiEgNc0CSauXmwYftY1+LlaC75SJwh0SgCX58= golang.org/x/text v0.10.0/go.mod h1:TvPlkZtksWOMsz7fbANvkp4WM8x/WCo/om8BMLbz+aE= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= @@ -117,6 +126,7 @@ google.golang.org/protobuf v1.31.0/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqw gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/yaml.v1 v1.0.0-20140924161607-9f9df34309c0/go.mod h1:WDnlLJ4WF5VGsH/HVa3CI79GS0ol3YnhVnKP89i0kNg= gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= +gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= howett.net/plist v1.0.0 h1:7CrbWYbPPO/PyNy38b2EB/+gYbjCe2DXBxgtOOZbSQM= howett.net/plist v1.0.0/go.mod h1:lqaXoTrLY4hg8tnEzNru53gicrbv7rrk+2xJA/7hw9g=