전송 실패시 fmt 출력
This commit is contained in:
@ -32,7 +32,6 @@ type Client struct {
|
||||
indexTemplatePattern string
|
||||
bulkHeader http.Header
|
||||
singleHeader http.Header
|
||||
sendingCount int32
|
||||
bulkChan chan *LogDocument
|
||||
}
|
||||
|
||||
@ -76,28 +75,40 @@ func (c *Client) SendBulk(ds map[string]*LogDocument) {
|
||||
}
|
||||
|
||||
type stringSliceReader struct {
|
||||
src [][]byte
|
||||
src [][]byte
|
||||
sent [][]byte
|
||||
}
|
||||
|
||||
func (b *stringSliceReader) Read(p []byte) (n int, err error) {
|
||||
n = 0
|
||||
err = nil
|
||||
|
||||
for len(b.src) > 0 {
|
||||
sbt := b.src[0]
|
||||
if len(p) < len(sbt) {
|
||||
b.src[0] = sbt[len(p):]
|
||||
sbt = sbt[:len(p)]
|
||||
copy(p, sbt)
|
||||
return n + len(sbt), nil
|
||||
copied := copy(p, sbt)
|
||||
n += copied
|
||||
b.sent = append(b.sent, sbt[:copied])
|
||||
|
||||
if copied < len(sbt) {
|
||||
b.src[0] = sbt[copied:]
|
||||
return
|
||||
}
|
||||
|
||||
copy(p, sbt)
|
||||
n += len(sbt)
|
||||
p = p[len(sbt):]
|
||||
p = p[copied:]
|
||||
b.src = b.src[1:]
|
||||
}
|
||||
|
||||
err = io.EOF
|
||||
return
|
||||
}
|
||||
|
||||
func (b *stringSliceReader) printSent() {
|
||||
for _, r := range b.sent {
|
||||
fmt.Print(string(r))
|
||||
}
|
||||
fmt.Print("\n")
|
||||
}
|
||||
|
||||
func (c *Client) sendLoop(ctx context.Context) {
|
||||
defer func() {
|
||||
r := recover()
|
||||
@ -145,11 +156,9 @@ func (c *Client) sendLoop(ctx context.Context) {
|
||||
resp, err := c.Do(context.Background(), req, nil)
|
||||
if err != nil {
|
||||
if netoperr, ok := err.(*net.OpError); ok && netoperr.Op == "dial" {
|
||||
// 접속 안됨. 파일로 남기고 끝
|
||||
// 접속 안됨. 재시도 안함
|
||||
logger.Println("log send bulk failed. no retry :", err)
|
||||
for _, e := range sending {
|
||||
logger.Println(string(e))
|
||||
}
|
||||
reader.printSent()
|
||||
} else {
|
||||
// 재시도
|
||||
logger.Println("log send bulk failed. retry :", err)
|
||||
|
||||
Reference in New Issue
Block a user