diff --git a/opensearch/client.go b/opensearch/client.go index 129522b..49e0f6f 100644 --- a/opensearch/client.go +++ b/opensearch/client.go @@ -1,17 +1,17 @@ package opensearch import ( - "bytes" "context" "crypto/hmac" "crypto/sha256" "encoding/base64" "encoding/json" "fmt" + "io" + "net" "net/http" "slices" "strings" - "sync/atomic" "time" osg "github.com/opensearch-project/opensearch-go/v4" @@ -33,6 +33,7 @@ type Client struct { bulkHeader http.Header singleHeader http.Header sendingCount int32 + bulkChan chan *LogDocument } type LogDocument struct { @@ -62,67 +63,157 @@ func (c *Client) Send(ld *LogDocument) { return } - serialized, _ := json.Marshal(ld) - go func(serialized []byte) { - sending := atomic.AddInt32(&c.sendingCount, 1) - defer atomic.AddInt32(&c.sendingCount, -1) - - if sending > 100 { - logger.Println("sending log bottleneck :", sending) - logger.Println(string(serialized)) - return - } - - reader := bytes.NewBuffer(serialized) - req := osapi.IndexReq{ - Index: c.indexTemplatePattern + ld.Type, - Body: reader, - Header: c.singleHeader, - } - - resp, err := c.Do(context.Background(), req, nil) - if err != nil { - logger.Println("log send failed :", err) - return - } - resp.Body.Close() - }(serialized) + c.bulkChan <- ld } func (c *Client) SendBulk(ds map[string]*LogDocument) { if c == nil { return } - - var contents string - for _, d := range ds { - b, _ := json.Marshal(d) - contents += fmt.Sprintf(`{"create":{"_index":"%s%s"}}`+"\n"+`%s`+"\n", c.indexTemplatePattern, d.Type, string(b)) + c.bulkChan <- d + } +} + +type stringSliceReader struct { + src [][]byte + pointer int +} + +func (b *stringSliceReader) Read(p []byte) (n int, err error) { + for b.pointer < len(b.src) { + sbt := b.src[b.pointer] + if len(p) < len(sbt) { + return + } + copy(p, sbt) + p = p[len(sbt):] + b.pointer++ + n += len(sbt) + } + err = io.EOF + return +} + +func (c *Client) sendLoop(ctx context.Context) { + defer func() { + r := recover() + if r != nil { + logger.Error(r) + } 
+ }() + + failChan := make(chan [][]byte) + contentsSize := 0 + var contents [][]byte + sendTick := time.After(time.Minute) + + sendfunc := func() { + // Cap a single bulk request at 2MB. + // contents can grow large when failed logs are fed back in for retry. + sendingSize := 0 + cut := 0 + for ; cut < len(contents); cut++ { + thisSize := len(contents[cut]) + if cut > 0 && thisSize+sendingSize > 2*1024*1024 { + break + } + sendingSize += thisSize + } + sending := contents[:cut] + contents = contents[cut:] + contentsSize -= sendingSize + sendTick = time.After(time.Minute) + + go func(sending [][]byte) { + defer func() { + r := recover() + if r != nil { + logger.Println(r) + } + }() + + reader := &stringSliceReader{src: sending, pointer: 0} + req := osapi.BulkReq{ + Body: reader, + Header: c.bulkHeader, + } + + resp, err := c.Do(context.Background(), req, nil) + if err != nil { + if netoperr, ok := err.(*net.OpError); ok && netoperr.Op == "dial" { + // Cannot connect; dump to the log file and give up. + logger.Println("log send bulk failed :", err) + for _, e := range sending { + logger.Println(string(e)) + } + } else { + // retry + failChan <- sending + } + return + } + if resp.Body == nil { + return + } + defer resp.Body.Close() + + var respbody struct { + Errors bool `json:"errors"` + Items []struct { + Create struct { + Status int `json:"status"` + } `json:"create"` + } `json:"items"` + } + json.NewDecoder(resp.Body).Decode(&respbody) + if !respbody.Errors { + return + } + + var retry [][]byte + for i, item := range respbody.Items { + if item.Create.Status >= 400 { + // retry + retry = append(retry, sending[i]) + } + } + if len(retry) > 0 { + failChan <- retry + } + }(sending) } - go func(contents string) { - sending := atomic.AddInt32(&c.sendingCount, 1) - defer atomic.AddInt32(&c.sendingCount, -1) - - if sending > 100 { - logger.Println("sending log bottleneck :", sending) - logger.Println(contents) + for { + select { + case <-ctx.Done(): return - } - reader := bytes.NewBuffer([]byte(contents)) - req := osapi.BulkReq{ - Body: reader, - Header: c.bulkHeader, 
+ case ret := <-failChan: + // Ordering does not matter. + for _, r := range ret { + contentsSize += len(r) + } + contents = append(ret, contents...) + sendfunc() + + case <-sendTick: + if len(contents) > 0 { + sendfunc() + } else { + sendTick = time.After(time.Minute) + } + + case logDoc := <-c.bulkChan: + b, _ := json.Marshal(logDoc) + e := fmt.Appendf(nil, `{"create":{"_index":"%s%s"}}`+"\n"+`%s`+"\n", c.indexTemplatePattern, logDoc.Type, b) + contentsSize += len(e) + contents = append(contents, e) + if contentsSize > 1024*1024 { + sendfunc() + } } - resp, err := c.Do(context.Background(), req, nil) - if err != nil { - logger.Println("log send bulk failed :", err) - return - } - resp.Body.Close() - }(contents) + } } var jwtHeader string @@ -199,7 +290,7 @@ func (c *Client) VerifyJWT(token string) (subject string, role string) { return src.Sub, src.Roles } -func NewClient(cfg Config) (Client, error) { +func NewClient(ctx context.Context, cfg Config) (Client, error) { if len(cfg.Addresses) == 0 { return Client{}, nil } @@ -232,12 +323,23 @@ singleHeader := make(http.Header) singleHeader.Set("Authorization", authHeader) - return Client{ + out := Client{ Client: client, cfg: cfg, signingKey: signingKey, indexTemplatePattern: indexPrefix, bulkHeader: bulkHeader, singleHeader: singleHeader, - }, nil + bulkChan: make(chan *LogDocument, 1000), + } + + go func() { + for { + out.sendLoop(ctx) + if ctx.Err() != nil { + return + } + } + }() + return out, nil }