diff --git a/core/watch.go b/core/watch.go index bf225a6..bd80953 100644 --- a/core/watch.go +++ b/core/watch.go @@ -98,7 +98,7 @@ func (mg *Maingate) watchWhitelistCollection(parentctx context.Context) { changed := stream.TryNext(ctx) if ctx.Err() != nil { - logger.Error("watchServiceCollection stream.TryNext failed. process should be restarted! :", ctx.Err().Error()) + logger.Error("watchWhitelistCollection stream.TryNext failed. process should be restarted! :", ctx.Err().Error()) break } @@ -124,12 +124,20 @@ func (mg *Maingate) watchWhitelistCollection(parentctx context.Context) { } } } else { - logger.Error("watchServiceCollection stream.Decode failed :", err) + logger.Error("watchWhitelistCollection stream.Decode failed :", err) } } else if stream.Err() != nil || stream.ID() == 0 { - logger.Error("watchServiceCollection stream error :", stream.Err()) - stream.Close(ctx) - stream = nil + select { + case <-ctx.Done(): + logger.Println("watchWhitelistCollection is done") + stream.Close(ctx) + return + + case <-time.After(time.Second): + logger.Error("watchWhitelistCollection stream error :", stream.Err()) + stream.Close(ctx) + stream = nil + } } else { time.Sleep(time.Second) } @@ -187,9 +195,17 @@ func (mg *Maingate) watchFileCollection(parentctx context.Context, serveMux *htt if !changed { if stream.Err() != nil || stream.ID() == 0 { - logger.Error("watchServiceCollection stream error :", stream.Err()) - stream.Close(ctx) - stream = nil + select { + case <-ctx.Done(): + logger.Println("watchServiceCollection is done") + stream.Close(ctx) + return + + case <-time.After(time.Second): + logger.Error("watchServiceCollection stream error :", stream.Err()) + stream.Close(ctx) + stream = nil + } } else { time.Sleep(time.Second) } @@ -346,9 +362,17 @@ func (mg *Maingate) watchServiceCollection(parentctx context.Context, serveMux * logger.Error("watchServiceCollection stream.Decode failed :", err) } } else if stream.Err() != nil || stream.ID() == 0 { - logger.Error("watchServiceCollection stream error :", stream.Err()) - stream.Close(ctx) - stream = nil + select { + case <-ctx.Done(): + logger.Println("watchServiceCollection is done") + stream.Close(ctx) + return + + case <-time.After(time.Second): + logger.Error("watchServiceCollection stream error :", stream.Err()) + stream.Close(ctx) + stream = nil + } } else { time.Sleep(time.Second) } @@ -422,9 +446,17 @@ func watchAuthCollection(parentctx context.Context, ac *common.AuthCollection, m logger.Error("watchAuthCollection stream.Decode failed :", err) } } else if stream.Err() != nil || stream.ID() == 0 { - logger.Error("watchAuthCollection stream error :", stream.Err()) - stream.Close(ctx) - stream = nil + select { + case <-ctx.Done(): + logger.Println("watchAuthCollection is done") + stream.Close(ctx) + return + + case <-time.After(time.Second): + logger.Error("watchAuthCollection stream error :", stream.Err()) + stream.Close(ctx) + stream = nil + } } else { time.Sleep(time.Second) }