package opensearch import ( "context" "crypto/hmac" "crypto/sha256" "encoding/base64" "encoding/json" "fmt" "io" "net" "net/http" "slices" "strings" "time" osg "github.com/opensearch-project/opensearch-go/v4" osapi "github.com/opensearch-project/opensearch-go/v4/opensearchapi" "repositories.action2quare.com/ayo/gocommon/logger" ) type Config struct { osg.Config `json:",inline"` IndexPrefix string `json:"IndexPrefix"` SigningKey string `json:"SigningKey"` } type Client struct { *osg.Client cfg Config signingKey []byte indexTemplatePattern string bulkHeader http.Header singleHeader http.Header sendingCount int32 bulkChan chan *LogDocument } type LogDocument struct { Type string `json:"type"` Body any `json:"body"` Timestamp string `json:"@timestamp"` Country string `json:"country"` Ip string `json:"ip"` Uid string `json:"uid"` Auth struct { Type string `json:"type"` Id string `json:"id"` } `json:"auth"` } func NewLogDocument(logType string, body any) *LogDocument { return &LogDocument{ Type: strings.ToLower(logType), Timestamp: time.Now().UTC().Format("2006-01-02T15:04:05Z"), Body: body, } } func (c *Client) Send(ld *LogDocument) { if c.Client == nil { return } c.bulkChan <- ld } func (c *Client) SendBulk(ds map[string]*LogDocument) { if c == nil { return } for _, d := range ds { c.bulkChan <- d } } type stringSliceReader struct { src [][]byte pointer int } func (b *stringSliceReader) Read(p []byte) (n int, err error) { for b.pointer < len(b.src) { sbt := b.src[b.pointer] if len(p) < len(sbt) { return } copy(p, sbt) p = p[len(sbt):] b.pointer++ n += len(sbt) err = nil } err = io.EOF return } func (c *Client) sendLoop(ctx context.Context) { defer func() { r := recover() if r != nil { logger.Error(r) } }() failChan := make(chan [][]byte) contentsSize := 0 var contents [][]byte sendTick := time.After(time.Minute) sendfunc := func() { // 2mb가 넘지 않게 조절. // 실패한 로그가 다시 되돌아 오면 contents가 커질 수 있다. sendingSize := 0 cut := 0 for ; cut < len(contents); cut++ { thisSize := len(contents[cut]) if thisSize+sendingSize > 2*1024*1024 { break } sendingSize += thisSize } sending := contents[:cut] contents = contents[cut:] contentsSize -= sendingSize sendTick = time.After(time.Minute) go func(sending [][]byte) { defer func() { r := recover() if r != nil { logger.Println(r) } }() reader := &stringSliceReader{src: sending, pointer: 0} req := osapi.BulkReq{ Body: reader, Header: c.bulkHeader, } resp, err := c.Do(context.Background(), req, nil) if err != nil { if netoperr, ok := err.(*net.OpError); ok && netoperr.Op == "dial" { // 접속 안됨. 파일로 남기고 끝 logger.Println("log send bulk failed. no retry :", err) for _, e := range sending { logger.Println(string(e)) } } else { // 재시도 logger.Println("log send bulk failed. retry :", err) failChan <- sending } return } if resp.Body == nil { return } defer resp.Body.Close() var respbody struct { Errors bool `json:"errors"` Items []struct { Create struct { Status int `json:"status"` } `json:"create"` } `json:"items"` } json.NewDecoder(resp.Body).Decode(&respbody) if !respbody.Errors { return } var retry [][]byte for i, item := range respbody.Items { if item.Create.Status < 400 { // 재시도 retry = append(retry, sending[i]) } } if len(retry) > 0 { failChan <- retry } }(sending) } for { select { case <-ctx.Done(): return case ret := <-failChan: // 순서는 중요하지 않음. for _, r := range ret { contentsSize += len(r) } contents = append(ret, contents...) sendfunc() case <-sendTick: if len(contents) > 0 { sendfunc() } else { sendTick = time.After(time.Minute) } case logDoc := <-c.bulkChan: b, _ := json.Marshal(logDoc) e := fmt.Appendf(nil, `{"create":{"_index":"%s%s"}}`+"\n"+`%s`+"\n", c.indexTemplatePattern, logDoc.Type, string(b)) contentsSize += len(e) contents = append(contents, e) if contentsSize > 1024*1024 { sendfunc() } } } } var jwtHeader string var encoding = base64.RawURLEncoding func init() { src := []byte(`{"alg": "HS256","typ": "JWT"}`) dst := make([]byte, len(src)*2) encoding.Encode(dst, src) enclen := encoding.EncodedLen(len(src)) jwtHeader = string(dst[:enclen]) } func (c *Client) MakeJWT(subject string, role string, ttl time.Duration) string { if len(c.signingKey) == 0 { return "" } now := time.Now().Add(ttl).Unix() src := []byte(fmt.Sprintf(`{"exp":%d,"sub":"%s","roles":"%s"}`, now, subject, role)) payload := make([]byte, encoding.EncodedLen(len(src))) encoding.Encode(payload, src) encoded := jwtHeader + "." + string(payload) mac := hmac.New(sha256.New, c.signingKey) mac.Write([]byte(encoded)) signature := mac.Sum(nil) sigenc := make([]byte, encoding.EncodedLen(len(signature))) encoding.Encode(sigenc, signature) return encoded + "." + string(sigenc) } func (c *Client) VerifyJWT(token string) (subject string, role string) { dot := strings.LastIndex(token, ".") if dot < 0 { return } encoded := token[:dot] sigenc := token[dot+1:] signature := make([]byte, encoding.DecodedLen(len(sigenc))) encoding.Decode(signature, []byte(sigenc)) mac := hmac.New(sha256.New, c.signingKey) mac.Write([]byte(encoded)) calsig := mac.Sum(nil) if slices.Compare(calsig, signature) != 0 { return } _, payload, ok := strings.Cut(encoded, ".") if !ok { return } srcjson, err := encoding.DecodeString(payload) if err != nil { return } var src struct { Exp int64 `json:"exp"` Sub string `json:"sub"` Roles string `json:"roles"` } if json.Unmarshal([]byte(srcjson), &src) != nil { return } if src.Exp < time.Now().Unix() { return } return src.Sub, src.Roles } func NewClient(ctx context.Context, cfg Config) (Client, error) { if len(cfg.Addresses) == 0 { return Client{}, nil } client, err := osg.NewClient(cfg.Config) if err != nil { return Client{}, err } var signingKey []byte if len(cfg.SigningKey) > 0 { dst := make([]byte, len(cfg.SigningKey)*2) dstlen, _ := base64.StdEncoding.Decode(dst, []byte(cfg.SigningKey)) signingKey = dst[:dstlen] } indexPrefix := cfg.IndexPrefix if !strings.HasSuffix(indexPrefix, "-") && len(indexPrefix) > 0 { indexPrefix += "-" } if !strings.HasSuffix(indexPrefix, "ds-logs-") { indexPrefix = "ds-logs-" + indexPrefix } authHeader := fmt.Sprintf("Basic %s", base64.RawURLEncoding.EncodeToString([]byte(fmt.Sprintf("%s:%s", cfg.Username, cfg.Password)))) bulkHeader := make(http.Header) bulkHeader.Set("Authorization", authHeader) singleHeader := make(http.Header) singleHeader.Set("Authorization", authHeader) out := Client{ Client: client, cfg: cfg, signingKey: signingKey, indexTemplatePattern: indexPrefix, bulkHeader: bulkHeader, singleHeader: singleHeader, bulkChan: make(chan *LogDocument, 1000), } go func() { for { out.sendLoop(ctx) if ctx.Err() != nil { return } } }() return out, nil }