From 871604f577d73bee31a209b9666736d4fe230317 Mon Sep 17 00:00:00 2001 From: Sherlock Holo Date: Wed, 13 Mar 2019 14:52:54 +0800 Subject: [PATCH] Add LVS weight round robin selector (#36) * Add upstream selector, there are two selector now: - random selector - weight random selector random selector will choose upstream at random; weight random selector will choose upstream at random with weight Signed-off-by: Sherlock Holo * Rewrite config and config file example, prepare for weight round robbin selector Signed-off-by: Sherlock Holo * Replace bad implement of weight random selector with weight round robbin selector, the algorithm is nginx weight round robbin like Signed-off-by: Sherlock Holo * Use new config module Signed-off-by: Sherlock Holo * Disable deprecated DualStack set Signed-off-by: Sherlock Holo * Fix typo Signed-off-by: Sherlock Holo * Optimize upstreamSelector judge Signed-off-by: Sherlock Holo * Fix typo Signed-off-by: Sherlock Holo * Add config timeout unit tips Signed-off-by: Sherlock Holo * Set wrr http client timeout to replace http request timeout Signed-off-by: Sherlock Holo * Add weight value range Signed-off-by: Sherlock Holo * Add a line ending for .gitignore Signed-off-by: Sherlock Holo * Optimize config file style Signed-off-by: Sherlock Holo * Modify Weight type to int32 Signed-off-by: Sherlock Holo * Add upstreamError Signed-off-by: Sherlock Holo * Rewrite Selector interface and wrr implement Signed-off-by: Sherlock Holo * Use http module predefined constant to judge req.response.StatusCode Signed-off-by: Sherlock Holo * Use Selector.ReportUpstreamError to report upstream error for evaluation loop in real time Signed-off-by: Sherlock Holo * Make client selector field private Signed-off-by: Sherlock Holo * Replace config file url to URL Add miss space for 'weight= 50' Signed-off-by: Sherlock Holo * Rewrite Selector.ReportUpstreamError to Selector.ReportUpstreamStatus, report upstream ok in real time Signed-off-by: Sherlock Holo * Fix checkIETFResponse: if upstream OK, won't increase weight Signed-off-by: Sherlock Holo * Fix typo Signed-off-by: Sherlock Holo * Rewrite wrr evaluation, concurrent check upstream and reduce interval to 15s Signed-off-by: Sherlock Holo * Add lvs wrr selector config Signed-off-by: Sherlock Holo * Add DebugReporter interface, when client verbose is true and the selector implements it, will report all upstream weights every 15s Signed-off-by: Sherlock Holo * Rename WeightRoundRobinSelector to NginxWRRSelector Signed-off-by: Sherlock Holo * Add LVSSelector Signed-off-by: Sherlock Holo * Remove useless log Signed-off-by: Sherlock Holo --- doh-client/client.go | 68 +++-- doh-client/config/config.go | 5 +- doh-client/doh-client.conf | 2 +- doh-client/selector/lvsWRRSelector.go | 262 ++++++++++++++++++ doh-client/selector/nginxWRRSelector.go | 215 ++++++++++++++ doh-client/selector/selector.go | 5 + .../selector/weightRoundRobinSelector.go | 192 ------------- 7 files changed, 537 insertions(+), 212 deletions(-) create mode 100644 doh-client/selector/lvsWRRSelector.go create mode 100644 doh-client/selector/nginxWRRSelector.go delete mode 100644 doh-client/selector/weightRoundRobinSelector.go diff --git a/doh-client/client.go b/doh-client/client.go index 03c9b68..9c1e4a7 100644 --- a/doh-client/client.go +++ b/doh-client/client.go @@ -153,7 +153,51 @@ func NewClient(conf *config.Config) (c *Client, err error) { } switch c.conf.Upstream.UpstreamSelector { + case config.NginxWRR: + if c.conf.Other.Verbose { + log.Println(config.NginxWRR, " mode start") + } + + s := selector.NewNginxWRRSelector(time.Duration(c.conf.Other.Timeout) * time.Second) + for _, u := range c.conf.Upstream.UpstreamGoogle { + if err := s.Add(u.URL, selector.Google, u.Weight); err != nil { + return nil, err + } + } + + for _, u := range c.conf.Upstream.UpstreamIETF { + if err := s.Add(u.URL, selector.IETF, u.Weight); err != nil { + return nil, err + } + } + + c.selector = s + + case config.LVSWRR: + if c.conf.Other.Verbose { + log.Println(config.LVSWRR, " mode start") + } + + s := selector.NewLVSWRRSelector(time.Duration(c.conf.Other.Timeout) * time.Second) + for _, u := range c.conf.Upstream.UpstreamGoogle { + if err := s.Add(u.URL, selector.Google, u.Weight); err != nil { + return nil, err + } + } + + for _, u := range c.conf.Upstream.UpstreamIETF { + if err := s.Add(u.URL, selector.IETF, u.Weight); err != nil { + return nil, err + } + } + + c.selector = s + default: + if c.conf.Other.Verbose { + log.Println(config.Random, " mode start") + } + // if selector is invalid or random, use random selector, or should we stop program and let user knows he is wrong? s := selector.NewRandomSelector() for _, u := range c.conf.Upstream.UpstreamGoogle { @@ -169,22 +213,12 @@ func NewClient(conf *config.Config) (c *Client, err error) { } c.selector = s + } - case config.WeightedRoundRobin: - s := selector.NewWeightRoundRobinSelector(time.Duration(c.conf.Other.Timeout) * time.Second) - for _, u := range c.conf.Upstream.UpstreamGoogle { - if err := s.Add(u.URL, selector.Google, u.Weight); err != nil { - return nil, err - } + if c.conf.Other.Verbose { + if reporter, ok := c.selector.(selector.DebugReporter); ok { + reporter.ReportWeights() } - - for _, u := range c.conf.Upstream.UpstreamIETF { - if err := s.Add(u.URL, selector.IETF, u.Weight); err != nil { - return nil, err - } - } - - c.selector = s } return c, nil @@ -246,6 +280,9 @@ func (c *Client) Start() error { }(srv) } + // start evaluation loop + c.selector.StartEvaluate() + for i := 0; i < cap(results); i++ { err := <-results if err != nil { @@ -254,9 +291,6 @@ func (c *Client) Start() error { } close(results) - // start evaluation poll - c.selector.StartEvaluate() - return nil } diff --git a/doh-client/config/config.go b/doh-client/config/config.go index 3a9a215..70d0118 100644 --- a/doh-client/config/config.go +++ b/doh-client/config/config.go @@ -30,8 +30,9 @@ import ( ) const ( - Random = "random" - WeightedRoundRobin = "weighted_round_robin" + Random = "random" + NginxWRR = "weighted_round_robin" + LVSWRR = "lvs_weighted_round_robin" ) type upstreamDetail struct { diff --git a/doh-client/doh-client.conf b/doh-client/doh-client.conf index c1f13af..93e7466 100644 --- a/doh-client/doh-client.conf +++ b/doh-client/doh-client.conf @@ -16,7 +16,7 @@ listen = [ [upstream] -# available selector: random or weighted_round_robin +# available selector: random or weighted_round_robin or lvs_weighted_round_robin upstream_selector = "random" # weight should in (0, 100], if upstream_selector is random, weight will be ignored diff --git a/doh-client/selector/lvsWRRSelector.go b/doh-client/selector/lvsWRRSelector.go new file mode 100644 index 0000000..0abe5a0 --- /dev/null +++ b/doh-client/selector/lvsWRRSelector.go @@ -0,0 +1,262 @@ +package selector + +import ( + "encoding/json" + "errors" + "log" + "net/http" + "sync" + "sync/atomic" + "time" +) + +type LVSWRRSelector struct { + upstreams []*Upstream // upstreamsInfo + client http.Client // http client to check the upstream + lastChoose int32 + currentWeight int32 +} + +func NewLVSWRRSelector(timeout time.Duration) *LVSWRRSelector { + return &LVSWRRSelector{ + client: http.Client{Timeout: timeout}, + lastChoose: -1, + } +} + +func (ls *LVSWRRSelector) Add(url string, upstreamType UpstreamType, weight int32) (err error) { + if weight < 1 { + return errors.New("weight is 1") + } + + switch upstreamType { + case Google: + ls.upstreams = append(ls.upstreams, &Upstream{ + Type: Google, + URL: url, + RequestType: "application/dns-json", + weight: weight, + effectiveWeight: weight, + }) + + case IETF: + ls.upstreams = append(ls.upstreams, &Upstream{ + Type: IETF, + URL: url, + RequestType: "application/dns-message", + weight: weight, + effectiveWeight: weight, + }) + + default: + return errors.New("unknown upstream type") + } + + return nil +} + +func (ls *LVSWRRSelector) StartEvaluate() { + go func() { + for { + wg := sync.WaitGroup{} + + for i := range ls.upstreams { + wg.Add(1) + + go func(i int) { + defer wg.Done() + + upstreamURL := ls.upstreams[i].URL + var acceptType string + + switch ls.upstreams[i].Type { + case Google: + upstreamURL += "?name=www.example.com&type=A" + acceptType = "application/dns-json" + + case IETF: + // www.example.com + upstreamURL += "?dns=q80BAAABAAAAAAAAA3d3dwdleGFtcGxlA2NvbQAAAQAB" + acceptType = "application/dns-message" + } + + req, err := http.NewRequest(http.MethodGet, upstreamURL, nil) + if err != nil { + /*log.Println("upstream:", upstreamURL, "type:", typeMap[upstream.Type], "check failed:", err) + continue*/ + + // should I only log it? But if there is an error, I think when query the server will return error too + panic("upstream: " + upstreamURL + " type: " + typeMap[ls.upstreams[i].Type] + " check failed: " + err.Error()) + } + + req.Header.Set("accept", acceptType) + + resp, err := ls.client.Do(req) + if err != nil { + // should I check error in detail? + if atomic.AddInt32(&ls.upstreams[i].effectiveWeight, -5) < 1 { + atomic.StoreInt32(&ls.upstreams[i].effectiveWeight, 1) + } + return + } + + switch ls.upstreams[i].Type { + case Google: + ls.checkGoogleResponse(resp, ls.upstreams[i]) + + case IETF: + ls.checkIETFResponse(resp, ls.upstreams[i]) + } + }(i) + } + + wg.Wait() + + time.Sleep(15 * time.Second) + } + }() +} + +func (ls *LVSWRRSelector) Get() *Upstream { + if len(ls.upstreams) == 1 { + return ls.upstreams[0] + } + + for { + atomic.StoreInt32(&ls.lastChoose, (atomic.LoadInt32(&ls.lastChoose)+1)%int32(len(ls.upstreams))) + + if atomic.LoadInt32(&ls.lastChoose) == 0 { + atomic.AddInt32(&ls.currentWeight, -ls.gcdWeight()) + + if atomic.LoadInt32(&ls.currentWeight) <= 0 { + atomic.AddInt32(&ls.currentWeight, ls.maxWeight()) + + if atomic.LoadInt32(&ls.currentWeight) == 0 { + panic("current weight is 0") + } + } + } + + if atomic.LoadInt32(&ls.upstreams[atomic.LoadInt32(&ls.lastChoose)].effectiveWeight) >= atomic.LoadInt32(&ls.currentWeight) { + return ls.upstreams[atomic.LoadInt32(&ls.lastChoose)] + } + } +} + +func (ls *LVSWRRSelector) gcdWeight() (res int32) { + res = gcd(atomic.LoadInt32(&ls.upstreams[0].effectiveWeight), atomic.LoadInt32(&ls.upstreams[0].effectiveWeight)) + + for i := 1; i < len(ls.upstreams); i++ { + res = gcd(res, atomic.LoadInt32(&ls.upstreams[i].effectiveWeight)) + } + + return +} + +func (ls *LVSWRRSelector) maxWeight() (res int32) { + for _, upstream := range ls.upstreams { + w := atomic.LoadInt32(&upstream.effectiveWeight) + if w > res { + res = w + } + } + + return +} + +func gcd(x, y int32) int32 { + for { + if x < y { + x, y = y, x + } + + tmp := x % y + if tmp == 0 { + return y + } + + x = tmp + } +} + +func (ls *LVSWRRSelector) ReportUpstreamStatus(upstream *Upstream, upstreamStatus upstreamStatus) { + switch upstreamStatus { + case Timeout: + if atomic.AddInt32(&upstream.effectiveWeight, -5) < 1 { + atomic.StoreInt32(&upstream.effectiveWeight, 1) + } + + case Error: + if atomic.AddInt32(&upstream.effectiveWeight, -2) < 1 { + atomic.StoreInt32(&upstream.effectiveWeight, 1) + } + + case OK: + if atomic.AddInt32(&upstream.effectiveWeight, 1) > upstream.weight { + atomic.StoreInt32(&upstream.effectiveWeight, upstream.weight) + } + } +} + +func (ls *LVSWRRSelector) checkGoogleResponse(resp *http.Response, upstream *Upstream) { + defer resp.Body.Close() + + if resp.StatusCode != http.StatusOK { + // server error + if atomic.AddInt32(&upstream.effectiveWeight, -3) < 1 { + atomic.StoreInt32(&upstream.effectiveWeight, 1) + } + return + } + + m := make(map[string]interface{}) + if err := json.NewDecoder(resp.Body).Decode(&m); err != nil { + // should I check error in detail? + if atomic.AddInt32(&upstream.effectiveWeight, -2) < 1 { + atomic.StoreInt32(&upstream.effectiveWeight, 1) + } + return + } + + if status, ok := m["Status"]; ok { + if statusNum, ok := status.(float64); ok && statusNum == 0 { + if atomic.AddInt32(&upstream.effectiveWeight, 5) > upstream.weight { + atomic.StoreInt32(&upstream.effectiveWeight, upstream.weight) + } + return + } + } + + // should I check error in detail? + if atomic.AddInt32(&upstream.effectiveWeight, -2) < 1 { + atomic.StoreInt32(&upstream.effectiveWeight, 1) + } +} + +func (ls *LVSWRRSelector) checkIETFResponse(resp *http.Response, upstream *Upstream) { + defer resp.Body.Close() + + if resp.StatusCode != http.StatusOK { + // server error + if atomic.AddInt32(&upstream.effectiveWeight, -3) < 1 { + atomic.StoreInt32(&upstream.effectiveWeight, 1) + } + return + } + + if atomic.AddInt32(&upstream.effectiveWeight, 5) > upstream.weight { + atomic.StoreInt32(&upstream.effectiveWeight, upstream.weight) + } +} + +func (ls *LVSWRRSelector) ReportWeights() { + go func() { + for { + time.Sleep(15 * time.Second) + + for _, u := range ls.upstreams { + log.Printf("%s, effect weight: %d", u, atomic.LoadInt32(&u.effectiveWeight)) + } + } + }() +} diff --git a/doh-client/selector/nginxWRRSelector.go b/doh-client/selector/nginxWRRSelector.go new file mode 100644 index 0000000..a535a28 --- /dev/null +++ b/doh-client/selector/nginxWRRSelector.go @@ -0,0 +1,215 @@ +package selector + +import ( + "encoding/json" + "errors" + "log" + "net/http" + "sync" + "sync/atomic" + "time" +) + +type NginxWRRSelector struct { + upstreams []*Upstream // upstreamsInfo + client http.Client // http client to check the upstream +} + +func NewNginxWRRSelector(timeout time.Duration) *NginxWRRSelector { + return &NginxWRRSelector{ + client: http.Client{Timeout: timeout}, + } +} + +func (ws *NginxWRRSelector) Add(url string, upstreamType UpstreamType, weight int32) (err error) { + switch upstreamType { + case Google: + ws.upstreams = append(ws.upstreams, &Upstream{ + Type: Google, + URL: url, + RequestType: "application/dns-json", + weight: weight, + effectiveWeight: weight, + }) + + case IETF: + ws.upstreams = append(ws.upstreams, &Upstream{ + Type: IETF, + URL: url, + RequestType: "application/dns-message", + weight: weight, + effectiveWeight: weight, + }) + + default: + return errors.New("unknown upstream type") + } + + return nil +} + +func (ws *NginxWRRSelector) StartEvaluate() { + go func() { + for { + wg := sync.WaitGroup{} + + for i := range ws.upstreams { + wg.Add(1) + + go func(i int) { + defer wg.Done() + + upstreamURL := ws.upstreams[i].URL + var acceptType string + + switch ws.upstreams[i].Type { + case Google: + upstreamURL += "?name=www.example.com&type=A" + acceptType = "application/dns-json" + + case IETF: + // www.example.com + upstreamURL += "?dns=q80BAAABAAAAAAAAA3d3dwdleGFtcGxlA2NvbQAAAQAB" + acceptType = "application/dns-message" + } + + req, err := http.NewRequest(http.MethodGet, upstreamURL, nil) + if err != nil { + /*log.Println("upstream:", upstreamURL, "type:", typeMap[upstream.Type], "check failed:", err) + continue*/ + + // should I only log it? But if there is an error, I think when query the server will return error too + panic("upstream: " + upstreamURL + " type: " + typeMap[ws.upstreams[i].Type] + " check failed: " + err.Error()) + } + + req.Header.Set("accept", acceptType) + + resp, err := ws.client.Do(req) + if err != nil { + // should I check error in detail? + if atomic.AddInt32(&ws.upstreams[i].effectiveWeight, -10) < 1 { + atomic.StoreInt32(&ws.upstreams[i].effectiveWeight, 1) + } + return + } + + switch ws.upstreams[i].Type { + case Google: + ws.checkGoogleResponse(resp, ws.upstreams[i]) + + case IETF: + ws.checkIETFResponse(resp, ws.upstreams[i]) + } + }(i) + } + + wg.Wait() + + time.Sleep(15 * time.Second) + } + }() +} + +// nginx wrr like +func (ws *NginxWRRSelector) Get() *Upstream { + var ( + total int32 + bestUpstreamIndex = -1 + ) + + for i := range ws.upstreams { + effectiveWeight := atomic.LoadInt32(&ws.upstreams[i].effectiveWeight) + atomic.AddInt32(&ws.upstreams[i].currentWeight, effectiveWeight) + total += effectiveWeight + + if bestUpstreamIndex == -1 || atomic.LoadInt32(&ws.upstreams[i].currentWeight) > atomic.LoadInt32(&ws.upstreams[bestUpstreamIndex].currentWeight) { + bestUpstreamIndex = i + } + } + + atomic.AddInt32(&ws.upstreams[bestUpstreamIndex].currentWeight, -total) + + return ws.upstreams[bestUpstreamIndex] +} + +func (ws *NginxWRRSelector) ReportUpstreamStatus(upstream *Upstream, upstreamStatus upstreamStatus) { + switch upstreamStatus { + case Timeout: + if atomic.AddInt32(&upstream.effectiveWeight, -5) < 1 { + atomic.StoreInt32(&upstream.effectiveWeight, 1) + } + + case Error: + if atomic.AddInt32(&upstream.effectiveWeight, -3) < 1 { + atomic.StoreInt32(&upstream.effectiveWeight, 1) + } + + case OK: + if atomic.AddInt32(&upstream.effectiveWeight, 1) > upstream.weight { + atomic.StoreInt32(&upstream.effectiveWeight, upstream.weight) + } + } +} + +func (ws *NginxWRRSelector) checkGoogleResponse(resp *http.Response, upstream *Upstream) { + defer resp.Body.Close() + + if resp.StatusCode != http.StatusOK { + // server error + if atomic.AddInt32(&upstream.effectiveWeight, -3) < 1 { + atomic.StoreInt32(&upstream.effectiveWeight, 1) + } + return + } + + m := make(map[string]interface{}) + if err := json.NewDecoder(resp.Body).Decode(&m); err != nil { + // should I check error in detail? + if atomic.AddInt32(&upstream.effectiveWeight, -2) < 1 { + atomic.StoreInt32(&upstream.effectiveWeight, 1) + } + return + } + + if status, ok := m["Status"]; ok { + if statusNum, ok := status.(float64); ok && statusNum == 0 { + if atomic.AddInt32(&upstream.effectiveWeight, 5) > upstream.weight { + atomic.StoreInt32(&upstream.effectiveWeight, upstream.weight) + } + return + } + } + + // should I check error in detail? + if atomic.AddInt32(&upstream.effectiveWeight, -2) < 1 { + atomic.StoreInt32(&upstream.effectiveWeight, 1) + } +} + +func (ws *NginxWRRSelector) checkIETFResponse(resp *http.Response, upstream *Upstream) { + defer resp.Body.Close() + + if resp.StatusCode != http.StatusOK { + // server error + if atomic.AddInt32(&upstream.effectiveWeight, -5) < 1 { + atomic.StoreInt32(&upstream.effectiveWeight, 1) + } + return + } + + if atomic.AddInt32(&upstream.effectiveWeight, 5) > upstream.weight { + atomic.StoreInt32(&upstream.effectiveWeight, upstream.weight) + } +} + +func (ws *NginxWRRSelector) ReportWeights() { + go func() { + for { + time.Sleep(15 * time.Second) + + for _, u := range ws.upstreams { + log.Printf("%s, effect weight: %d", u, atomic.LoadInt32(&u.effectiveWeight)) + } + } + }() +} diff --git a/doh-client/selector/selector.go b/doh-client/selector/selector.go index d32185e..1bf1e22 100644 --- a/doh-client/selector/selector.go +++ b/doh-client/selector/selector.go @@ -10,3 +10,8 @@ type Selector interface { // ReportUpstreamStatus report upstream status ReportUpstreamStatus(upstream *Upstream, upstreamStatus upstreamStatus) } + +type DebugReporter interface { + // ReportWeights starts a goroutine to report all upstream weights, recommend interval is 15s + ReportWeights() +} diff --git a/doh-client/selector/weightRoundRobinSelector.go b/doh-client/selector/weightRoundRobinSelector.go deleted file mode 100644 index 947bb99..0000000 --- a/doh-client/selector/weightRoundRobinSelector.go +++ /dev/null @@ -1,192 +0,0 @@ -package selector - -import ( - "encoding/json" - "errors" - "net/http" - "sync/atomic" - "time" -) - -type WeightRoundRobinSelector struct { - upstreams []*Upstream // upstreamsInfo - client http.Client // http client to check the upstream -} - -func NewWeightRoundRobinSelector(timeout time.Duration) *WeightRoundRobinSelector { - return &WeightRoundRobinSelector{ - client: http.Client{Timeout: timeout}, - } -} - -func (ws *WeightRoundRobinSelector) Add(url string, upstreamType UpstreamType, weight int32) (err error) { - switch upstreamType { - case Google: - ws.upstreams = append(ws.upstreams, &Upstream{ - Type: Google, - URL: url, - RequestType: "application/dns-json", - weight: weight, - effectiveWeight: weight, - }) - - case IETF: - ws.upstreams = append(ws.upstreams, &Upstream{ - Type: IETF, - URL: url, - RequestType: "application/dns-message", - weight: weight, - effectiveWeight: weight, - }) - - default: - return errors.New("unknown upstream type") - } - - return nil -} - -// COW, avoid concurrent read write upstreams -func (ws *WeightRoundRobinSelector) StartEvaluate() { - go func() { - for { - for i := range ws.upstreams { - upstreamURL := ws.upstreams[i].URL - var acceptType string - - switch ws.upstreams[i].Type { - case Google: - upstreamURL += "?name=www.example.com&type=A" - acceptType = "application/dns-json" - - case IETF: - // www.example.com - upstreamURL += "?dns=q80BAAABAAAAAAAAA3d3dwdleGFtcGxlA2NvbQAAAQAB" - acceptType = "application/dns-message" - } - - req, err := http.NewRequest(http.MethodGet, upstreamURL, nil) - if err != nil { - /*log.Println("upstream:", upstreamURL, "type:", typeMap[upstream.Type], "check failed:", err) - continue*/ - - // should I only log it? But if there is an error, I think when query the server will return error too - panic("upstream: " + upstreamURL + " type: " + typeMap[ws.upstreams[i].Type] + " check failed: " + err.Error()) - } - - req.Header.Set("accept", acceptType) - - resp, err := ws.client.Do(req) - if err != nil { - // should I check error in detail? - if atomic.AddInt32(&ws.upstreams[i].effectiveWeight, -10) < 0 { - atomic.StoreInt32(&ws.upstreams[i].effectiveWeight, 0) - } - continue - } - - switch ws.upstreams[i].Type { - case Google: - checkGoogleResponse(resp, ws.upstreams[i]) - - case IETF: - checkIETFResponse(resp, ws.upstreams[i]) - } - } - - time.Sleep(30 * time.Second) - } - }() -} - -// nginx wrr like -func (ws *WeightRoundRobinSelector) Get() *Upstream { - var ( - total int32 - bestUpstreamIndex = -1 - ) - - for i := range ws.upstreams { - effectiveWeight := atomic.LoadInt32(&ws.upstreams[i].effectiveWeight) - ws.upstreams[i].currentWeight += effectiveWeight - total += effectiveWeight - - if bestUpstreamIndex == -1 || ws.upstreams[i].currentWeight > ws.upstreams[bestUpstreamIndex].currentWeight { - bestUpstreamIndex = i - } - } - - ws.upstreams[bestUpstreamIndex].currentWeight -= total - - return ws.upstreams[bestUpstreamIndex] -} - -func (ws *WeightRoundRobinSelector) ReportUpstreamStatus(upstream *Upstream, upstreamStatus upstreamStatus) { - switch upstreamStatus { - case Timeout: - if atomic.AddInt32(&upstream.effectiveWeight, -10) < 0 { - atomic.StoreInt32(&upstream.effectiveWeight, 0) - } - - case Error: - if atomic.AddInt32(&upstream.effectiveWeight, -5) < 0 { - atomic.StoreInt32(&upstream.effectiveWeight, 0) - } - - case OK: - if atomic.AddInt32(&upstream.effectiveWeight, 2) > upstream.weight { - atomic.StoreInt32(&upstream.effectiveWeight, upstream.weight) - } - } -} - -func checkGoogleResponse(resp *http.Response, upstream *Upstream) { - defer resp.Body.Close() - - if resp.StatusCode != http.StatusOK { - // server error - if atomic.AddInt32(&upstream.effectiveWeight, -5) < 0 { - atomic.StoreInt32(&upstream.effectiveWeight, 0) - } - return - } - - m := make(map[string]interface{}) - if err := json.NewDecoder(resp.Body).Decode(&m); err != nil { - // should I check error in detail? - if atomic.AddInt32(&upstream.effectiveWeight, -1) < 0 { - atomic.StoreInt32(&upstream.effectiveWeight, 0) - } - return - } - - if status, ok := m["status"]; ok { - if statusNum, ok := status.(int); ok && statusNum == 0 { - if atomic.AddInt32(&upstream.effectiveWeight, 5) > upstream.weight { - atomic.StoreInt32(&upstream.effectiveWeight, upstream.weight) - } - return - } - } - - // should I check error in detail? - if atomic.AddInt32(&upstream.effectiveWeight, -1) < 0 { - atomic.StoreInt32(&upstream.effectiveWeight, 0) - } -} - -func checkIETFResponse(resp *http.Response, upstream *Upstream) { - defer resp.Body.Close() - - if resp.StatusCode != http.StatusOK { - // server error - if atomic.AddInt32(&upstream.effectiveWeight, -5) < 0 { - atomic.StoreInt32(&upstream.effectiveWeight, 0) - } - return - } - - if atomic.AddInt32(&upstream.effectiveWeight, 5) > upstream.weight { - atomic.StoreInt32(&upstream.effectiveWeight, upstream.weight) - } -}