prom: Fix leak when a call fails with the prometheus endpoint (#2456)
This commit is contained in:
@@ -25,6 +25,7 @@ import (
|
||||
"net/url"
|
||||
"regexp"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/go-openapi/runtime/middleware"
|
||||
@@ -977,20 +978,28 @@ func getUsageWidgetsForDeployment(ctx context.Context, prometheusURL string, mAd
|
||||
|
||||
func unmarshalPrometheus(ctx context.Context, endpoint string, data interface{}) bool {
|
||||
httpClnt := GetConsoleHTTPClient(endpoint)
|
||||
resp, err := httpClnt.Get(endpoint)
|
||||
|
||||
req, err := http.NewRequestWithContext(ctx, http.MethodGet, endpoint, nil)
|
||||
if err != nil {
|
||||
ErrorWithContext(ctx, fmt.Errorf("Unable to fetch labels from prometheus (%s)", resp.Status))
|
||||
ErrorWithContext(ctx, fmt.Errorf("Unable to create the request to fetch labels from prometheus: %w", err))
|
||||
return true
|
||||
}
|
||||
|
||||
resp, err := httpClnt.Do(req)
|
||||
if err != nil {
|
||||
ErrorWithContext(ctx, fmt.Errorf("Unable to fetch labels from prometheus: %w", err))
|
||||
return true
|
||||
}
|
||||
|
||||
defer resp.Body.Close()
|
||||
|
||||
if resp.StatusCode != 200 {
|
||||
ErrorWithContext(ctx, fmt.Errorf("Unexpected errors from prometheus (%s)", resp.Status))
|
||||
if resp.StatusCode != http.StatusOK {
|
||||
ErrorWithContext(ctx, fmt.Errorf("Unexpected status code from prometheus (%s)", resp.Status))
|
||||
return true
|
||||
}
|
||||
|
||||
if err = json.NewDecoder(resp.Body).Decode(data); err != nil {
|
||||
ErrorWithContext(ctx, fmt.Errorf("Unexpected errors from prometheus (%s)", resp.Status))
|
||||
ErrorWithContext(ctx, fmt.Errorf("Unexpected error from prometheus: %w", err))
|
||||
return true
|
||||
}
|
||||
|
||||
@@ -1081,10 +1090,17 @@ LabelsWaitLoop:
|
||||
continue
|
||||
}
|
||||
|
||||
targetResults := make(chan *models.ResultTarget)
|
||||
var (
|
||||
wg sync.WaitGroup
|
||||
targetResults = make([]*models.ResultTarget, len(m.Targets))
|
||||
)
|
||||
|
||||
// for each target we will launch another goroutine to fetch the values
|
||||
for _, target := range m.Targets {
|
||||
go func(target Target, inStep *int32, inStart *int64, inEnd *int64) {
|
||||
for idx, target := range m.Targets {
|
||||
wg.Add(1)
|
||||
go func(idx int, target Target, inStep *int32, inStart *int64, inEnd *int64) {
|
||||
defer wg.Done()
|
||||
|
||||
apiType := "query_range"
|
||||
now := time.Now()
|
||||
|
||||
@@ -1145,10 +1161,12 @@ LabelsWaitLoop:
|
||||
})
|
||||
}
|
||||
|
||||
targetResults <- &targetResult
|
||||
}(target, step, start, end)
|
||||
targetResults[idx] = &targetResult
|
||||
}(idx, target, step, start, end)
|
||||
}
|
||||
|
||||
wg.Wait()
|
||||
|
||||
wdgtResult := models.WidgetDetails{
|
||||
ID: m.ID,
|
||||
Title: m.Title,
|
||||
@@ -1161,15 +1179,10 @@ LabelsWaitLoop:
|
||||
},
|
||||
}
|
||||
}
|
||||
// count how many targets we have received
|
||||
targetsReceived := 0
|
||||
|
||||
for res := range targetResults {
|
||||
wdgtResult.Targets = append(wdgtResult.Targets, res)
|
||||
targetsReceived++
|
||||
// upon receiving the total number of targets needed, we can close the channel to not lock the goroutine
|
||||
if targetsReceived >= len(m.Targets) {
|
||||
close(targetResults)
|
||||
for _, res := range targetResults {
|
||||
if res != nil {
|
||||
wdgtResult.Targets = append(wdgtResult.Targets, res)
|
||||
}
|
||||
}
|
||||
return &wdgtResult, nil
|
||||
|
||||
Reference in New Issue
Block a user