package opensearch import ( "bytes" "context" "crypto/hmac" "crypto/sha256" "encoding/base64" "encoding/json" "fmt" "io" "net/http" "strings" "time" osg "github.com/opensearch-project/opensearch-go/v4" osapi "github.com/opensearch-project/opensearch-go/v4/opensearchapi" "repositories.action2quare.com/ayo/gocommon/logger" ) type Config struct { osg.Config `json:",inline"` IndexPrefix string `json:"IndexPrefix"` SigningKey string `json:"SigningKey"` } type Client struct { *osg.Client cfg Config signingKey []byte indexTemplatePattern string bulkHeader http.Header singleHeader http.Header } type LogDocument struct { Type string `json:"type"` Body any `json:"body"` Timestamp string `json:"@timestamp"` Country string `json:"country"` Ip string `json:"ip"` Uid string `json:"uid"` Auth struct { Type string `json:"type"` Id string `json:"id"` } `json:"auth"` } func NewLogDocument(logType string, body any) *LogDocument { return &LogDocument{ Type: strings.ToLower(logType), Timestamp: time.Now().UTC().Format("2006-01-02T15:04:05Z"), Body: body, } } func (c *Client) Send(ld *LogDocument) error { if c.Client == nil { return nil } serialized, _ := json.Marshal(ld) reader := bytes.NewBuffer(serialized) req := osapi.IndexReq{ Index: c.indexTemplatePattern + ld.Type, Body: reader, Header: c.singleHeader, } logger.Println("LogSend", req) resp, err := c.Do(context.Background(), req, nil) logger.Println(resp) if err != nil { return err } defer resp.Body.Close() r, err2 := io.ReadAll(resp.Body) if err2 != nil { logger.Println("LogSend resp read error :", err2) } else { logger.Println("LogSend resp :", string(r)) } return nil } func (c *Client) SendBulk(ds map[string]*LogDocument) error { var contents string for _, d := range ds { b, _ := json.Marshal(d) contents += fmt.Sprintf(`{"create":{"_index":"%s%s"}}`+"\n"+`%s`+"\n", c.indexTemplatePattern, d.Type, string(b)) } reader := bytes.NewBuffer([]byte(contents)) req := osapi.BulkReq{ Body: reader, Header: c.bulkHeader, } resp, err := c.Do(context.Background(), req, nil) if err != nil { return err } logger.Println(resp) defer resp.Body.Close() return nil } var jwtHeader string var encoding = base64.RawURLEncoding func init() { src := []byte(`{"alg": "HS256","typ": "JWT"}`) dst := make([]byte, len(src)*2) encoding.Encode(dst, src) enclen := encoding.EncodedLen(len(src)) jwtHeader = string(dst[:enclen]) } func (c *Client) MakeJWT(subject string, ttl time.Duration) string { if len(c.signingKey) == 0 { return "" } now := time.Now().UTC().Add(ttl).Add(time.Hour).Unix() src := []byte(fmt.Sprintf(`{"exp":%d,"sub":"%s","roles":"ds_client_full_access"}`, now, subject)) payload := make([]byte, encoding.EncodedLen(len(src))) encoding.Encode(payload, src) encoded := jwtHeader + "." + string(payload) mac := hmac.New(sha256.New, c.signingKey) mac.Write([]byte(encoded)) signature := mac.Sum(nil) sigenc := make([]byte, encoding.EncodedLen(len(signature))) encoding.Encode(sigenc, signature) return encoded + "." + string(sigenc) } func NewClient(cfg Config) (Client, error) { if len(cfg.Addresses) == 0 { return Client{}, nil } client, err := osg.NewClient(cfg.Config) if err != nil { return Client{}, err } var signingKey []byte if len(cfg.SigningKey) > 0 { dst := make([]byte, len(cfg.SigningKey)*2) dstlen, _ := base64.StdEncoding.Decode(dst, []byte(cfg.SigningKey)) signingKey = dst[:dstlen] } indexPrefix := cfg.IndexPrefix if !strings.HasSuffix(indexPrefix, "-") && len(indexPrefix) > 0 { indexPrefix += "-" } if !strings.HasSuffix(indexPrefix, "ds-logs-") { indexPrefix = "ds-logs-" + indexPrefix } authHeader := fmt.Sprintf("Basic %s", base64.RawURLEncoding.EncodeToString([]byte(fmt.Sprintf("%s:%s", cfg.Username, cfg.Password)))) bulkHeader := make(http.Header) bulkHeader.Set("Authorization", authHeader) singleHeader := make(http.Header) singleHeader.Set("Authorization", authHeader) return Client{ Client: client, cfg: cfg, signingKey: signingKey, indexTemplatePattern: indexPrefix, bulkHeader: bulkHeader, singleHeader: singleHeader, }, nil }