From 4087326f45a55bd95fec83c65877cbfd3bca8c80 Mon Sep 17 00:00:00 2001 From: caffix Date: Mon, 20 Nov 2017 16:45:59 -0500 Subject: [PATCH 1/3] fixed race condition reported in issue #881 --- p2p/trust/trustmetric.go | 86 +++++++++++++++++------------------ p2p/trust/trustmetric_test.go | 6 +++ 2 files changed, 49 insertions(+), 43 deletions(-) diff --git a/p2p/trust/trustmetric.go b/p2p/trust/trustmetric.go index eaed78e3a..4216a0b5b 100644 --- a/p2p/trust/trustmetric.go +++ b/p2p/trust/trustmetric.go @@ -144,6 +144,7 @@ func (tms *TrustMetricStore) loadFromDB() bool { for key, p := range peers { tm := NewMetricWithConfig(tms.config) + tm.mtx.Lock() // Restore the number of time intervals we have previously tracked if p.NumIntervals > tm.maxIntervals { p.NumIntervals = tm.maxIntervals @@ -168,6 +169,8 @@ func (tms *TrustMetricStore) loadFromDB() bool { } // Calculate the history value based on the loaded history data tm.historyValue = tm.calcHistoryValue() + tm.mtx.Unlock() + // Load the peer trust metric into the store tms.peerMetrics[key] = tm } @@ -181,11 +184,13 @@ func (tms *TrustMetricStore) saveToDB() { peers := make(map[string]peerHistoryJSON, 0) for key, tm := range tms.peerMetrics { + tm.mtx.Lock() // Add an entry for the peer identified by key peers[key] = peerHistoryJSON{ NumIntervals: tm.numIntervals, History: tm.history, } + tm.mtx.Unlock() } // Write all the data back to the DB @@ -236,6 +241,9 @@ const ( // TrustMetric - keeps track of peer reliability // See tendermint/docs/architecture/adr-006-trust-metric.md for details type TrustMetric struct { + // Mutex that protects the metric from concurrent access + mtx sync.Mutex + // Determines the percentage given to current behavior proportionalWeight float64 @@ -277,24 +285,6 @@ type TrustMetric struct { // Sending true on this channel stops tracking, while false pauses tracking stop chan bool - - // For sending information about new good/bad events to be recorded - update chan *updateBadGood - - // The channel to request a newly calculated trust value - trustValue chan *reqTrustValue -} - -// For the TrustMetric update channel -type updateBadGood struct { - IsBad bool - Add int -} - -// For the TrustMetric trustValue channel -type reqTrustValue struct { - // The requested trust value is sent back on this channel - Resp chan float64 } // Pause tells the metric to pause recording data over time intervals. @@ -310,20 +300,44 @@ func (tm *TrustMetric) Stop() { // BadEvents indicates that an undesirable event(s) took place func (tm *TrustMetric) BadEvents(num int) { - tm.update <- &updateBadGood{IsBad: true, Add: num} + tm.mtx.Lock() + defer tm.mtx.Unlock() + + // Check if this is the first experience with + // what we are tracking since being paused + if tm.paused { + tm.good = 0 + tm.bad = 0 + // New events cause us to unpause the metric + tm.paused = false + } + + tm.bad += float64(num) } // GoodEvents indicates that a desirable event(s) took place func (tm *TrustMetric) GoodEvents(num int) { - tm.update <- &updateBadGood{IsBad: false, Add: num} + tm.mtx.Lock() + defer tm.mtx.Unlock() + + // Check if this is the first experience with + // what we are tracking since being paused + if tm.paused { + tm.good = 0 + tm.bad = 0 + // New events cause us to unpause the metric + tm.paused = false + } + + tm.good += float64(num) } // TrustValue gets the dependable trust value; always between 0 and 1 func (tm *TrustMetric) TrustValue() float64 { - resp := make(chan float64, 1) + tm.mtx.Lock() + defer tm.mtx.Unlock() - tm.trustValue <- &reqTrustValue{Resp: resp} - return <-resp + return tm.calcTrustValue() } // TrustScore gets a score based on the trust value always between 0 and 100 @@ -381,9 +395,7 @@ func NewMetricWithConfig(tmc TrustMetricConfig) *TrustMetric { tm.historyMaxSize = intervalToHistoryOffset(tm.maxIntervals) + 1 // This metric has a perfect history so far tm.historyValue = 1.0 - // Setup the channels - tm.update = make(chan *updateBadGood, defaultUpdateChanCapacity) - tm.trustValue = make(chan *reqTrustValue, defaultRequestChanCapacity) + // Setup the stop channel tm.stop = make(chan bool, 1) go tm.processRequests() @@ -516,24 +528,8 @@ func (tm *TrustMetric) processRequests() { loop: for { select { - case bg := <-tm.update: - // Check if this is the first experience with - // what we are tracking since being paused - if tm.paused { - tm.good = 0 - tm.bad = 0 - // New events cause us to unpause the metric - tm.paused = false - } - - if bg.IsBad { - tm.bad += float64(bg.Add) - } else { - tm.good += float64(bg.Add) - } - case rtv := <-tm.trustValue: - rtv.Resp <- tm.calcTrustValue() case <-t.C: + tm.mtx.Lock() if !tm.paused { // Add the current trust value to the history data newHist := tm.calcTrustValue() @@ -563,13 +559,17 @@ loop: tm.good = 0 tm.bad = 0 } + tm.mtx.Unlock() case stop := <-tm.stop: + tm.mtx.Lock() if stop { // Stop all further tracking for this metric + tm.mtx.Unlock() break loop } // Pause the metric for now tm.paused = true + tm.mtx.Unlock() } } } diff --git a/p2p/trust/trustmetric_test.go b/p2p/trust/trustmetric_test.go index 56441c721..af4a945db 100644 --- a/p2p/trust/trustmetric_test.go +++ b/p2p/trust/trustmetric_test.go @@ -210,7 +210,10 @@ func TestTrustMetricStopPause(t *testing.T) { // Give the pause some time to take place time.Sleep(10 * time.Millisecond) + tm.mtx.Lock() first := tm.numIntervals + tm.mtx.Unlock() + // Allow more time to pass and check the intervals are unchanged time.Sleep(50 * time.Millisecond) assert.Equal(t, first, tm.numIntervals) @@ -223,7 +226,10 @@ func TestTrustMetricStopPause(t *testing.T) { // Give the stop some time to take place time.Sleep(10 * time.Millisecond) + tm.mtx.Lock() second := tm.numIntervals + tm.mtx.Unlock() + // Allow more time to pass and check the intervals are unchanged time.Sleep(50 * time.Millisecond) assert.Equal(t, second, tm.numIntervals) From 9c8100043edda4f5f582e370e85e627a6cfc9995 Mon Sep 17 00:00:00 2001 From: caffix Date: Mon, 20 Nov 2017 19:15:11 -0500 Subject: [PATCH 2/3] made changes to address suggestions from the PR comments --- p2p/trust/trustmetric.go | 92 ++++++++++++++++++++-------------------- 1 file changed, 47 insertions(+), 45 deletions(-) diff --git a/p2p/trust/trustmetric.go b/p2p/trust/trustmetric.go index 4216a0b5b..e68903ff5 100644 --- a/p2p/trust/trustmetric.go +++ b/p2p/trust/trustmetric.go @@ -124,6 +124,37 @@ type peerHistoryJSON struct { History []float64 `json:"history"` } +// Loads the history data for a single peer and takes care of trust metric locking +func reinstantiateMetric(tm *TrustMetric, ph peerHistoryJSON) { + tm.mtx.Lock() + defer tm.mtx.Unlock() + + // Restore the number of time intervals we have previously tracked + if ph.NumIntervals > tm.maxIntervals { + ph.NumIntervals = tm.maxIntervals + } + tm.numIntervals = ph.NumIntervals + // Restore the history and its current size + if len(ph.History) > tm.historyMaxSize { + // Keep the history no larger than historyMaxSize + last := len(ph.History) - tm.historyMaxSize + ph.History = ph.History[last:] + } + tm.history = ph.History + tm.historySize = len(tm.history) + // Create the history weight values and weight sum + for i := 1; i <= tm.numIntervals; i++ { + x := math.Pow(defaultHistoryDataWeight, float64(i)) // Optimistic weight + tm.historyWeights = append(tm.historyWeights, x) + } + + for _, v := range tm.historyWeights { + tm.historyWeightSum += v + } + // Calculate the history value based on the loaded history data + tm.historyValue = tm.calcHistoryValue() +} + // Loads the history data for all peers from the store DB // cmn.Panics if file is corrupt func (tms *TrustMetricStore) loadFromDB() bool { @@ -144,33 +175,7 @@ func (tms *TrustMetricStore) loadFromDB() bool { for key, p := range peers { tm := NewMetricWithConfig(tms.config) - tm.mtx.Lock() - // Restore the number of time intervals we have previously tracked - if p.NumIntervals > tm.maxIntervals { - p.NumIntervals = tm.maxIntervals - } - tm.numIntervals = p.NumIntervals - // Restore the history and its current size - if len(p.History) > tm.historyMaxSize { - // Keep the history no larger than historyMaxSize - last := len(p.History) - tm.historyMaxSize - p.History = p.History[last:] - } - tm.history = p.History - tm.historySize = len(tm.history) - // Create the history weight values and weight sum - for i := 1; i <= tm.numIntervals; i++ { - x := math.Pow(defaultHistoryDataWeight, float64(i)) // Optimistic weight - tm.historyWeights = append(tm.historyWeights, x) - } - - for _, v := range tm.historyWeights { - tm.historyWeightSum += v - } - // Calculate the history value based on the loaded history data - tm.historyValue = tm.calcHistoryValue() - tm.mtx.Unlock() - + reinstantiateMetric(tm, p) // Load the peer trust metric into the store tms.peerMetrics[key] = tm } @@ -303,15 +308,7 @@ func (tm *TrustMetric) BadEvents(num int) { tm.mtx.Lock() defer tm.mtx.Unlock() - // Check if this is the first experience with - // what we are tracking since being paused - if tm.paused { - tm.good = 0 - tm.bad = 0 - // New events cause us to unpause the metric - tm.paused = false - } - + tm.unpause() tm.bad += float64(num) } @@ -320,15 +317,7 @@ func (tm *TrustMetric) GoodEvents(num int) { tm.mtx.Lock() defer tm.mtx.Unlock() - // Check if this is the first experience with - // what we are tracking since being paused - if tm.paused { - tm.good = 0 - tm.bad = 0 - // New events cause us to unpause the metric - tm.paused = false - } - + tm.unpause() tm.good += float64(num) } @@ -429,6 +418,19 @@ func customConfig(tmc TrustMetricConfig) TrustMetricConfig { return config } +// Wakes the trust metric up if it is currently paused +// This method needs to be called with the mutex locked +func (tm *TrustMetric) unpause() { + // Check if this is the first experience with + // what we are tracking since being paused + if tm.paused { + tm.good = 0 + tm.bad = 0 + // New events cause us to unpause the metric + tm.paused = false + } +} + // Calculates the derivative component func (tm *TrustMetric) derivativeValue() float64 { return tm.proportionalValue() - tm.historyValue From 887cb6d0cd6e39175fddb0f616d524585bbd85ad Mon Sep 17 00:00:00 2001 From: caffix Date: Wed, 22 Nov 2017 23:42:38 -0500 Subject: [PATCH 3/3] added public methods to handle locking within the trust metric --- p2p/trust/trustmetric.go | 242 ++++++++++-------- ...ustmetric_test.go_ => trustmetric_test.go} | 10 +- 2 files changed, 142 insertions(+), 110 deletions(-) rename p2p/trust/{trustmetric_test.go_ => trustmetric_test.go} (98%) diff --git a/p2p/trust/trustmetric.go b/p2p/trust/trustmetric.go index e68903ff5..cbc2db7d5 100644 --- a/p2p/trust/trustmetric.go +++ b/p2p/trust/trustmetric.go @@ -107,6 +107,15 @@ func (tms *TrustMetricStore) PeerDisconnected(key string) { } } +// Saves the history data for all peers to the store DB. +// This public method acquires the trust metric store lock +func (tms *TrustMetricStore) SaveToDB() { + tms.mtx.Lock() + defer tms.mtx.Unlock() + + tms.saveToDB() +} + /* Private methods */ // size returns the number of entries in the store without acquiring the mutex @@ -115,46 +124,10 @@ func (tms *TrustMetricStore) size() int { } /* Loading & Saving */ -/* Both of these methods assume the mutex has been acquired, since they write to the map */ +/* Both loadFromDB and savetoDB assume the mutex has been acquired */ var trustMetricKey = []byte("trustMetricStore") -type peerHistoryJSON struct { - NumIntervals int `json:"intervals"` - History []float64 `json:"history"` -} - -// Loads the history data for a single peer and takes care of trust metric locking -func reinstantiateMetric(tm *TrustMetric, ph peerHistoryJSON) { - tm.mtx.Lock() - defer tm.mtx.Unlock() - - // Restore the number of time intervals we have previously tracked - if ph.NumIntervals > tm.maxIntervals { - ph.NumIntervals = tm.maxIntervals - } - tm.numIntervals = ph.NumIntervals - // Restore the history and its current size - if len(ph.History) > tm.historyMaxSize { - // Keep the history no larger than historyMaxSize - last := len(ph.History) - tm.historyMaxSize - ph.History = ph.History[last:] - } - tm.history = ph.History - tm.historySize = len(tm.history) - // Create the history weight values and weight sum - for i := 1; i <= tm.numIntervals; i++ { - x := math.Pow(defaultHistoryDataWeight, float64(i)) // Optimistic weight - tm.historyWeights = append(tm.historyWeights, x) - } - - for _, v := range tm.historyWeights { - tm.historyWeightSum += v - } - // Calculate the history value based on the loaded history data - tm.historyValue = tm.calcHistoryValue() -} - // Loads the history data for all peers from the store DB // cmn.Panics if file is corrupt func (tms *TrustMetricStore) loadFromDB() bool { @@ -164,18 +137,18 @@ func (tms *TrustMetricStore) loadFromDB() bool { return false } - peers := make(map[string]peerHistoryJSON, 0) + peers := make(map[string]MetricHistoryJSON, 0) err := json.Unmarshal(bytes, &peers) if err != nil { cmn.PanicCrisis(cmn.Fmt("Could not unmarshal Trust Metric Store DB data: %v", err)) } // If history data exists in the file, - // load it into trust metrics and recalc + // load it into trust metric for key, p := range peers { tm := NewMetricWithConfig(tms.config) - reinstantiateMetric(tm, p) + tm.Init(p) // Load the peer trust metric into the store tms.peerMetrics[key] = tm } @@ -186,16 +159,11 @@ func (tms *TrustMetricStore) loadFromDB() bool { func (tms *TrustMetricStore) saveToDB() { tms.Logger.Debug("Saving TrustHistory to DB", "size", tms.size()) - peers := make(map[string]peerHistoryJSON, 0) + peers := make(map[string]MetricHistoryJSON, 0) for key, tm := range tms.peerMetrics { - tm.mtx.Lock() // Add an entry for the peer identified by key - peers[key] = peerHistoryJSON{ - NumIntervals: tm.numIntervals, - History: tm.history, - } - tm.mtx.Unlock() + peers[key] = tm.HistoryJSON() } // Write all the data back to the DB @@ -215,9 +183,7 @@ loop: for { select { case <-t.C: - tms.mtx.Lock() - tms.saveToDB() - tms.mtx.Unlock() + tms.SaveToDB() case <-tms.Quit: break loop } @@ -227,12 +193,6 @@ loop: //--------------------------------------------------------------------------------------- const ( - // The number of event updates that can be sent on a single metric before blocking - defaultUpdateChanCapacity = 10 - - // The number of trust value requests that can be made simultaneously before blocking - defaultRequestChanCapacity = 10 - // The weight applied to the derivative when current behavior is >= previous behavior defaultDerivativeGamma1 = 0 @@ -288,19 +248,70 @@ type TrustMetric struct { // While true, history data is not modified paused bool - // Sending true on this channel stops tracking, while false pauses tracking - stop chan bool + // Signal channel for stopping the trust metric go-routine + stop chan struct{} +} + +// MetricHistoryJSON - history data necessary to save the trust metric +type MetricHistoryJSON struct { + NumIntervals int `json:"intervals"` + History []float64 `json:"history"` +} + +// Returns a snapshot of the trust metric history data +func (tm *TrustMetric) HistoryJSON() MetricHistoryJSON { + tm.mtx.Lock() + defer tm.mtx.Unlock() + + return MetricHistoryJSON{ + NumIntervals: tm.numIntervals, + History: tm.history, + } +} + +// Instantiates a trust metric by loading the history data for a single peer. +// This is called only once and only right after creation, which is why the +// lock is not held while accessing the trust metric struct members +func (tm *TrustMetric) Init(hist MetricHistoryJSON) { + // Restore the number of time intervals we have previously tracked + if hist.NumIntervals > tm.maxIntervals { + hist.NumIntervals = tm.maxIntervals + } + tm.numIntervals = hist.NumIntervals + // Restore the history and its current size + if len(hist.History) > tm.historyMaxSize { + // Keep the history no larger than historyMaxSize + last := len(hist.History) - tm.historyMaxSize + hist.History = hist.History[last:] + } + tm.history = hist.History + tm.historySize = len(tm.history) + // Create the history weight values and weight sum + for i := 1; i <= tm.numIntervals; i++ { + x := math.Pow(defaultHistoryDataWeight, float64(i)) // Optimistic weight + tm.historyWeights = append(tm.historyWeights, x) + } + + for _, v := range tm.historyWeights { + tm.historyWeightSum += v + } + // Calculate the history value based on the loaded history data + tm.historyValue = tm.calcHistoryValue() } // Pause tells the metric to pause recording data over time intervals. // All method calls that indicate events will unpause the metric func (tm *TrustMetric) Pause() { - tm.stop <- false + tm.mtx.Lock() + defer tm.mtx.Unlock() + + // Pause the metric for now + tm.paused = true } // Stop tells the metric to stop recording data over time intervals func (tm *TrustMetric) Stop() { - tm.stop <- true + tm.stop <- struct{}{} } // BadEvents indicates that an undesirable event(s) took place @@ -336,6 +347,70 @@ func (tm *TrustMetric) TrustScore() int { return int(math.Floor(score)) } +// NextTimeInterval saves current time interval data and prepares for the following interval +func (tm *TrustMetric) NextTimeInterval() { + tm.mtx.Lock() + defer tm.mtx.Unlock() + + if tm.paused { + // Do not prepare for the next time interval while paused + return + } + + // Add the current trust value to the history data + newHist := tm.calcTrustValue() + tm.history = append(tm.history, newHist) + + // Update history and interval counters + if tm.historySize < tm.historyMaxSize { + tm.historySize++ + } else { + // Keep the history no larger than historyMaxSize + last := len(tm.history) - tm.historyMaxSize + tm.history = tm.history[last:] + } + + if tm.numIntervals < tm.maxIntervals { + tm.numIntervals++ + // Add the optimistic weight for the new time interval + wk := math.Pow(defaultHistoryDataWeight, float64(tm.numIntervals)) + tm.historyWeights = append(tm.historyWeights, wk) + tm.historyWeightSum += wk + } + + // Update the history data using Faded Memories + tm.updateFadedMemory() + // Calculate the history value for the upcoming time interval + tm.historyValue = tm.calcHistoryValue() + tm.good = 0 + tm.bad = 0 +} + +// Copy returns a new trust metric with members containing the same values +func (tm *TrustMetric) Copy() *TrustMetric { + if tm == nil { + return nil + } + + return &TrustMetric{ + proportionalWeight: tm.proportionalWeight, + integralWeight: tm.integralWeight, + numIntervals: tm.numIntervals, + maxIntervals: tm.maxIntervals, + intervalLen: tm.intervalLen, + history: tm.history, + historyWeights: tm.historyWeights, + historyWeightSum: tm.historyWeightSum, + historySize: tm.historySize, + historyMaxSize: tm.historyMaxSize, + historyValue: tm.historyValue, + good: tm.good, + bad: tm.bad, + paused: tm.paused, + stop: make(chan struct{}), + } +} + // TrustMetricConfig - Configures the weight functions and time intervals for the metric type TrustMetricConfig struct { // Determines the percentage given to current behavior @@ -385,7 +460,7 @@ func NewMetricWithConfig(tmc TrustMetricConfig) *TrustMetric { // This metric has a perfect history so far tm.historyValue = 1.0 // Setup the stop channel - tm.stop = make(chan bool, 1) + tm.stop = make(chan struct{}) go tm.processRequests() return tm @@ -531,47 +606,10 @@ loop: for { select { case <-t.C: - tm.mtx.Lock() - if !tm.paused { - // Add the current trust value to the history data - newHist := tm.calcTrustValue() - tm.history = append(tm.history, newHist) - - // Update history and interval counters - if tm.historySize < tm.historyMaxSize { - tm.historySize++ - } else { - // Keep the history no larger than historyMaxSize - last := len(tm.history) - tm.historyMaxSize - tm.history = tm.history[last:] - } - - if tm.numIntervals < tm.maxIntervals { - tm.numIntervals++ - // Add the optimistic weight for the new time interval - wk := math.Pow(defaultHistoryDataWeight, float64(tm.numIntervals)) - tm.historyWeights = append(tm.historyWeights, wk) - tm.historyWeightSum += wk - } - - // Update the history data using Faded Memories - tm.updateFadedMemory() - // Calculate the history value for the upcoming time interval - tm.historyValue = tm.calcHistoryValue() - tm.good = 0 - tm.bad = 0 - } - tm.mtx.Unlock() - case stop := <-tm.stop: - tm.mtx.Lock() - if stop { - // Stop all further tracking for this metric - tm.mtx.Unlock() - break loop - } - // Pause the metric for now - tm.paused = true - tm.mtx.Unlock() + tm.NextTimeInterval() + case <-tm.stop: + // Stop all further tracking for this metric + break loop } } } diff --git a/p2p/trust/trustmetric_test.go_ b/p2p/trust/trustmetric_test.go similarity index 98% rename from p2p/trust/trustmetric_test.go_ rename to p2p/trust/trustmetric_test.go index af4a945db..6c6137538 100644 --- a/p2p/trust/trustmetric_test.go_ +++ b/p2p/trust/trustmetric_test.go @@ -210,10 +210,7 @@ func TestTrustMetricStopPause(t *testing.T) { // Give the pause some time to take place time.Sleep(10 * time.Millisecond) - tm.mtx.Lock() - first := tm.numIntervals - tm.mtx.Unlock() - + first := tm.Copy().numIntervals // Allow more time to pass and check the intervals are unchanged time.Sleep(50 * time.Millisecond) assert.Equal(t, first, tm.numIntervals) @@ -226,10 +223,7 @@ func TestTrustMetricStopPause(t *testing.T) { // Give the stop some time to take place time.Sleep(10 * time.Millisecond) - tm.mtx.Lock() - second := tm.numIntervals - tm.mtx.Unlock() - + second := tm.Copy().numIntervals // Allow more time to pass and check the intervals are unchanged time.Sleep(50 * time.Millisecond) assert.Equal(t, second, tm.numIntervals)