mirror of
https://github.com/m13253/dns-over-https.git
synced 2026-03-30 18:35:38 +00:00
Add LVS weight round robin selector (#36)
* Add upstream selector, there are two selector now:
- random selector
- weight random selector
random selector will choose upstream at random; weight random selector will choose upstream at random with weight
Signed-off-by: Sherlock Holo <sherlockya@gmail.com>
* Rewrite config and config file example, prepare for weight round robbin selector
Signed-off-by: Sherlock Holo <sherlockya@gmail.com>
* Replace bad implement of weight random selector with weight round robbin selector, the algorithm is nginx weight round robbin like
Signed-off-by: Sherlock Holo <sherlockya@gmail.com>
* Use new config module
Signed-off-by: Sherlock Holo <sherlockya@gmail.com>
* Disable deprecated DualStack set
Signed-off-by: Sherlock Holo <sherlockya@gmail.com>
* Fix typo
Signed-off-by: Sherlock Holo <sherlockya@gmail.com>
* Optimize upstreamSelector judge
Signed-off-by: Sherlock Holo <sherlockya@gmail.com>
* Fix typo
Signed-off-by: Sherlock Holo <sherlockya@gmail.com>
* Add config timeout unit tips
Signed-off-by: Sherlock Holo <sherlockya@gmail.com>
* Set wrr http client timeout to replace http request timeout
Signed-off-by: Sherlock Holo <sherlockya@gmail.com>
* Add weight value range
Signed-off-by: Sherlock Holo <sherlockya@gmail.com>
* Add a line ending for .gitignore
Signed-off-by: Sherlock Holo <sherlockya@gmail.com>
* Optimize config file style
Signed-off-by: Sherlock Holo <sherlockya@gmail.com>
* Modify Weight type to int32
Signed-off-by: Sherlock Holo <sherlockya@gmail.com>
* Add upstreamError
Signed-off-by: Sherlock Holo <sherlockya@gmail.com>
* Rewrite Selector interface and wrr implement
Signed-off-by: Sherlock Holo <sherlockya@gmail.com>
* Use http module predefined constant to judge req.response.StatusCode
Signed-off-by: Sherlock Holo <sherlockya@gmail.com>
* Use Selector.ReportUpstreamError to report upstream error for evaluation loop in real time
Signed-off-by: Sherlock Holo <sherlockya@gmail.com>
* Make client selector field private
Signed-off-by: Sherlock Holo <sherlockya@gmail.com>
* Replace config file url to URL
Add miss space for 'weight= 50'
Signed-off-by: Sherlock Holo <sherlockya@gmail.com>
* Rewrite Selector.ReportUpstreamError to Selector.ReportUpstreamStatus, report upstream ok in real time
Signed-off-by: Sherlock Holo <sherlockya@gmail.com>
* Fix checkIETFResponse: if upstream OK, won't increase weight
Signed-off-by: Sherlock Holo <sherlockya@gmail.com>
* Fix typo
Signed-off-by: Sherlock Holo <sherlockya@gmail.com>
* Rewrite wrr evaluation, concurrent check upstream and reduce interval to 15s
Signed-off-by: Sherlock Holo <sherlockya@gmail.com>
* Add lvs wrr selector config
Signed-off-by: Sherlock Holo <sherlockya@gmail.com>
* Add DebugReporter interface, when client verbose is true and the selector implements it, will report all upstream weights every 15s
Signed-off-by: Sherlock Holo <sherlockya@gmail.com>
* Rename WeightRoundRobinSelector to NginxWRRSelector
Signed-off-by: Sherlock Holo <sherlockya@gmail.com>
* Add LVSSelector
Signed-off-by: Sherlock Holo <sherlockya@gmail.com>
* Remove useless log
Signed-off-by: Sherlock Holo <sherlockya@gmail.com>
This commit is contained in:
committed by
Star Brilliant
parent
a400f03960
commit
871604f577
@@ -153,7 +153,51 @@ func NewClient(conf *config.Config) (c *Client, err error) {
|
||||
}
|
||||
|
||||
switch c.conf.Upstream.UpstreamSelector {
|
||||
case config.NginxWRR:
|
||||
if c.conf.Other.Verbose {
|
||||
log.Println(config.NginxWRR, " mode start")
|
||||
}
|
||||
|
||||
s := selector.NewNginxWRRSelector(time.Duration(c.conf.Other.Timeout) * time.Second)
|
||||
for _, u := range c.conf.Upstream.UpstreamGoogle {
|
||||
if err := s.Add(u.URL, selector.Google, u.Weight); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
for _, u := range c.conf.Upstream.UpstreamIETF {
|
||||
if err := s.Add(u.URL, selector.IETF, u.Weight); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
c.selector = s
|
||||
|
||||
case config.LVSWRR:
|
||||
if c.conf.Other.Verbose {
|
||||
log.Println(config.LVSWRR, " mode start")
|
||||
}
|
||||
|
||||
s := selector.NewLVSWRRSelector(time.Duration(c.conf.Other.Timeout) * time.Second)
|
||||
for _, u := range c.conf.Upstream.UpstreamGoogle {
|
||||
if err := s.Add(u.URL, selector.Google, u.Weight); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
for _, u := range c.conf.Upstream.UpstreamIETF {
|
||||
if err := s.Add(u.URL, selector.IETF, u.Weight); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
c.selector = s
|
||||
|
||||
default:
|
||||
if c.conf.Other.Verbose {
|
||||
log.Println(config.Random, " mode start")
|
||||
}
|
||||
|
||||
// if selector is invalid or random, use random selector, or should we stop program and let user knows he is wrong?
|
||||
s := selector.NewRandomSelector()
|
||||
for _, u := range c.conf.Upstream.UpstreamGoogle {
|
||||
@@ -169,22 +213,12 @@ func NewClient(conf *config.Config) (c *Client, err error) {
|
||||
}
|
||||
|
||||
c.selector = s
|
||||
}
|
||||
|
||||
case config.WeightedRoundRobin:
|
||||
s := selector.NewWeightRoundRobinSelector(time.Duration(c.conf.Other.Timeout) * time.Second)
|
||||
for _, u := range c.conf.Upstream.UpstreamGoogle {
|
||||
if err := s.Add(u.URL, selector.Google, u.Weight); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if c.conf.Other.Verbose {
|
||||
if reporter, ok := c.selector.(selector.DebugReporter); ok {
|
||||
reporter.ReportWeights()
|
||||
}
|
||||
|
||||
for _, u := range c.conf.Upstream.UpstreamIETF {
|
||||
if err := s.Add(u.URL, selector.IETF, u.Weight); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
c.selector = s
|
||||
}
|
||||
|
||||
return c, nil
|
||||
@@ -246,6 +280,9 @@ func (c *Client) Start() error {
|
||||
}(srv)
|
||||
}
|
||||
|
||||
// start evaluation loop
|
||||
c.selector.StartEvaluate()
|
||||
|
||||
for i := 0; i < cap(results); i++ {
|
||||
err := <-results
|
||||
if err != nil {
|
||||
@@ -254,9 +291,6 @@ func (c *Client) Start() error {
|
||||
}
|
||||
close(results)
|
||||
|
||||
// start evaluation poll
|
||||
c.selector.StartEvaluate()
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
|
||||
@@ -30,8 +30,9 @@ import (
|
||||
)
|
||||
|
||||
const (
|
||||
Random = "random"
|
||||
WeightedRoundRobin = "weighted_round_robin"
|
||||
Random = "random"
|
||||
NginxWRR = "weighted_round_robin"
|
||||
LVSWRR = "lvs_weighted_round_robin"
|
||||
)
|
||||
|
||||
type upstreamDetail struct {
|
||||
|
||||
@@ -16,7 +16,7 @@ listen = [
|
||||
|
||||
[upstream]
|
||||
|
||||
# available selector: random or weighted_round_robin
|
||||
# available selector: random or weighted_round_robin or lvs_weighted_round_robin
|
||||
upstream_selector = "random"
|
||||
|
||||
# weight should in (0, 100], if upstream_selector is random, weight will be ignored
|
||||
|
||||
262
doh-client/selector/lvsWRRSelector.go
Normal file
262
doh-client/selector/lvsWRRSelector.go
Normal file
@@ -0,0 +1,262 @@
|
||||
package selector
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"log"
|
||||
"net/http"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
)
|
||||
|
||||
type LVSWRRSelector struct {
|
||||
upstreams []*Upstream // upstreamsInfo
|
||||
client http.Client // http client to check the upstream
|
||||
lastChoose int32
|
||||
currentWeight int32
|
||||
}
|
||||
|
||||
func NewLVSWRRSelector(timeout time.Duration) *LVSWRRSelector {
|
||||
return &LVSWRRSelector{
|
||||
client: http.Client{Timeout: timeout},
|
||||
lastChoose: -1,
|
||||
}
|
||||
}
|
||||
|
||||
func (ls *LVSWRRSelector) Add(url string, upstreamType UpstreamType, weight int32) (err error) {
|
||||
if weight < 1 {
|
||||
return errors.New("weight is 1")
|
||||
}
|
||||
|
||||
switch upstreamType {
|
||||
case Google:
|
||||
ls.upstreams = append(ls.upstreams, &Upstream{
|
||||
Type: Google,
|
||||
URL: url,
|
||||
RequestType: "application/dns-json",
|
||||
weight: weight,
|
||||
effectiveWeight: weight,
|
||||
})
|
||||
|
||||
case IETF:
|
||||
ls.upstreams = append(ls.upstreams, &Upstream{
|
||||
Type: IETF,
|
||||
URL: url,
|
||||
RequestType: "application/dns-message",
|
||||
weight: weight,
|
||||
effectiveWeight: weight,
|
||||
})
|
||||
|
||||
default:
|
||||
return errors.New("unknown upstream type")
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (ls *LVSWRRSelector) StartEvaluate() {
|
||||
go func() {
|
||||
for {
|
||||
wg := sync.WaitGroup{}
|
||||
|
||||
for i := range ls.upstreams {
|
||||
wg.Add(1)
|
||||
|
||||
go func(i int) {
|
||||
defer wg.Done()
|
||||
|
||||
upstreamURL := ls.upstreams[i].URL
|
||||
var acceptType string
|
||||
|
||||
switch ls.upstreams[i].Type {
|
||||
case Google:
|
||||
upstreamURL += "?name=www.example.com&type=A"
|
||||
acceptType = "application/dns-json"
|
||||
|
||||
case IETF:
|
||||
// www.example.com
|
||||
upstreamURL += "?dns=q80BAAABAAAAAAAAA3d3dwdleGFtcGxlA2NvbQAAAQAB"
|
||||
acceptType = "application/dns-message"
|
||||
}
|
||||
|
||||
req, err := http.NewRequest(http.MethodGet, upstreamURL, nil)
|
||||
if err != nil {
|
||||
/*log.Println("upstream:", upstreamURL, "type:", typeMap[upstream.Type], "check failed:", err)
|
||||
continue*/
|
||||
|
||||
// should I only log it? But if there is an error, I think when query the server will return error too
|
||||
panic("upstream: " + upstreamURL + " type: " + typeMap[ls.upstreams[i].Type] + " check failed: " + err.Error())
|
||||
}
|
||||
|
||||
req.Header.Set("accept", acceptType)
|
||||
|
||||
resp, err := ls.client.Do(req)
|
||||
if err != nil {
|
||||
// should I check error in detail?
|
||||
if atomic.AddInt32(&ls.upstreams[i].effectiveWeight, -5) < 1 {
|
||||
atomic.StoreInt32(&ls.upstreams[i].effectiveWeight, 1)
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
switch ls.upstreams[i].Type {
|
||||
case Google:
|
||||
ls.checkGoogleResponse(resp, ls.upstreams[i])
|
||||
|
||||
case IETF:
|
||||
ls.checkIETFResponse(resp, ls.upstreams[i])
|
||||
}
|
||||
}(i)
|
||||
}
|
||||
|
||||
wg.Wait()
|
||||
|
||||
time.Sleep(15 * time.Second)
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
func (ls *LVSWRRSelector) Get() *Upstream {
|
||||
if len(ls.upstreams) == 1 {
|
||||
return ls.upstreams[0]
|
||||
}
|
||||
|
||||
for {
|
||||
atomic.StoreInt32(&ls.lastChoose, (atomic.LoadInt32(&ls.lastChoose)+1)%int32(len(ls.upstreams)))
|
||||
|
||||
if atomic.LoadInt32(&ls.lastChoose) == 0 {
|
||||
atomic.AddInt32(&ls.currentWeight, -ls.gcdWeight())
|
||||
|
||||
if atomic.LoadInt32(&ls.currentWeight) <= 0 {
|
||||
atomic.AddInt32(&ls.currentWeight, ls.maxWeight())
|
||||
|
||||
if atomic.LoadInt32(&ls.currentWeight) == 0 {
|
||||
panic("current weight is 0")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if atomic.LoadInt32(&ls.upstreams[atomic.LoadInt32(&ls.lastChoose)].effectiveWeight) >= atomic.LoadInt32(&ls.currentWeight) {
|
||||
return ls.upstreams[atomic.LoadInt32(&ls.lastChoose)]
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (ls *LVSWRRSelector) gcdWeight() (res int32) {
|
||||
res = gcd(atomic.LoadInt32(&ls.upstreams[0].effectiveWeight), atomic.LoadInt32(&ls.upstreams[0].effectiveWeight))
|
||||
|
||||
for i := 1; i < len(ls.upstreams); i++ {
|
||||
res = gcd(res, atomic.LoadInt32(&ls.upstreams[i].effectiveWeight))
|
||||
}
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
func (ls *LVSWRRSelector) maxWeight() (res int32) {
|
||||
for _, upstream := range ls.upstreams {
|
||||
w := atomic.LoadInt32(&upstream.effectiveWeight)
|
||||
if w > res {
|
||||
res = w
|
||||
}
|
||||
}
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
func gcd(x, y int32) int32 {
|
||||
for {
|
||||
if x < y {
|
||||
x, y = y, x
|
||||
}
|
||||
|
||||
tmp := x % y
|
||||
if tmp == 0 {
|
||||
return y
|
||||
}
|
||||
|
||||
x = tmp
|
||||
}
|
||||
}
|
||||
|
||||
func (ls *LVSWRRSelector) ReportUpstreamStatus(upstream *Upstream, upstreamStatus upstreamStatus) {
|
||||
switch upstreamStatus {
|
||||
case Timeout:
|
||||
if atomic.AddInt32(&upstream.effectiveWeight, -5) < 1 {
|
||||
atomic.StoreInt32(&upstream.effectiveWeight, 1)
|
||||
}
|
||||
|
||||
case Error:
|
||||
if atomic.AddInt32(&upstream.effectiveWeight, -2) < 1 {
|
||||
atomic.StoreInt32(&upstream.effectiveWeight, 1)
|
||||
}
|
||||
|
||||
case OK:
|
||||
if atomic.AddInt32(&upstream.effectiveWeight, 1) > upstream.weight {
|
||||
atomic.StoreInt32(&upstream.effectiveWeight, upstream.weight)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (ls *LVSWRRSelector) checkGoogleResponse(resp *http.Response, upstream *Upstream) {
|
||||
defer resp.Body.Close()
|
||||
|
||||
if resp.StatusCode != http.StatusOK {
|
||||
// server error
|
||||
if atomic.AddInt32(&upstream.effectiveWeight, -3) < 1 {
|
||||
atomic.StoreInt32(&upstream.effectiveWeight, 1)
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
m := make(map[string]interface{})
|
||||
if err := json.NewDecoder(resp.Body).Decode(&m); err != nil {
|
||||
// should I check error in detail?
|
||||
if atomic.AddInt32(&upstream.effectiveWeight, -2) < 1 {
|
||||
atomic.StoreInt32(&upstream.effectiveWeight, 1)
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
if status, ok := m["Status"]; ok {
|
||||
if statusNum, ok := status.(float64); ok && statusNum == 0 {
|
||||
if atomic.AddInt32(&upstream.effectiveWeight, 5) > upstream.weight {
|
||||
atomic.StoreInt32(&upstream.effectiveWeight, upstream.weight)
|
||||
}
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
// should I check error in detail?
|
||||
if atomic.AddInt32(&upstream.effectiveWeight, -2) < 1 {
|
||||
atomic.StoreInt32(&upstream.effectiveWeight, 1)
|
||||
}
|
||||
}
|
||||
|
||||
func (ls *LVSWRRSelector) checkIETFResponse(resp *http.Response, upstream *Upstream) {
|
||||
defer resp.Body.Close()
|
||||
|
||||
if resp.StatusCode != http.StatusOK {
|
||||
// server error
|
||||
if atomic.AddInt32(&upstream.effectiveWeight, -3) < 1 {
|
||||
atomic.StoreInt32(&upstream.effectiveWeight, 1)
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
if atomic.AddInt32(&upstream.effectiveWeight, 5) > upstream.weight {
|
||||
atomic.StoreInt32(&upstream.effectiveWeight, upstream.weight)
|
||||
}
|
||||
}
|
||||
|
||||
func (ls *LVSWRRSelector) ReportWeights() {
|
||||
go func() {
|
||||
for {
|
||||
time.Sleep(15 * time.Second)
|
||||
|
||||
for _, u := range ls.upstreams {
|
||||
log.Printf("%s, effect weight: %d", u, atomic.LoadInt32(&u.effectiveWeight))
|
||||
}
|
||||
}
|
||||
}()
|
||||
}
|
||||
215
doh-client/selector/nginxWRRSelector.go
Normal file
215
doh-client/selector/nginxWRRSelector.go
Normal file
@@ -0,0 +1,215 @@
|
||||
package selector
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"log"
|
||||
"net/http"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
)
|
||||
|
||||
type NginxWRRSelector struct {
|
||||
upstreams []*Upstream // upstreamsInfo
|
||||
client http.Client // http client to check the upstream
|
||||
}
|
||||
|
||||
func NewNginxWRRSelector(timeout time.Duration) *NginxWRRSelector {
|
||||
return &NginxWRRSelector{
|
||||
client: http.Client{Timeout: timeout},
|
||||
}
|
||||
}
|
||||
|
||||
func (ws *NginxWRRSelector) Add(url string, upstreamType UpstreamType, weight int32) (err error) {
|
||||
switch upstreamType {
|
||||
case Google:
|
||||
ws.upstreams = append(ws.upstreams, &Upstream{
|
||||
Type: Google,
|
||||
URL: url,
|
||||
RequestType: "application/dns-json",
|
||||
weight: weight,
|
||||
effectiveWeight: weight,
|
||||
})
|
||||
|
||||
case IETF:
|
||||
ws.upstreams = append(ws.upstreams, &Upstream{
|
||||
Type: IETF,
|
||||
URL: url,
|
||||
RequestType: "application/dns-message",
|
||||
weight: weight,
|
||||
effectiveWeight: weight,
|
||||
})
|
||||
|
||||
default:
|
||||
return errors.New("unknown upstream type")
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (ws *NginxWRRSelector) StartEvaluate() {
|
||||
go func() {
|
||||
for {
|
||||
wg := sync.WaitGroup{}
|
||||
|
||||
for i := range ws.upstreams {
|
||||
wg.Add(1)
|
||||
|
||||
go func(i int) {
|
||||
defer wg.Done()
|
||||
|
||||
upstreamURL := ws.upstreams[i].URL
|
||||
var acceptType string
|
||||
|
||||
switch ws.upstreams[i].Type {
|
||||
case Google:
|
||||
upstreamURL += "?name=www.example.com&type=A"
|
||||
acceptType = "application/dns-json"
|
||||
|
||||
case IETF:
|
||||
// www.example.com
|
||||
upstreamURL += "?dns=q80BAAABAAAAAAAAA3d3dwdleGFtcGxlA2NvbQAAAQAB"
|
||||
acceptType = "application/dns-message"
|
||||
}
|
||||
|
||||
req, err := http.NewRequest(http.MethodGet, upstreamURL, nil)
|
||||
if err != nil {
|
||||
/*log.Println("upstream:", upstreamURL, "type:", typeMap[upstream.Type], "check failed:", err)
|
||||
continue*/
|
||||
|
||||
// should I only log it? But if there is an error, I think when query the server will return error too
|
||||
panic("upstream: " + upstreamURL + " type: " + typeMap[ws.upstreams[i].Type] + " check failed: " + err.Error())
|
||||
}
|
||||
|
||||
req.Header.Set("accept", acceptType)
|
||||
|
||||
resp, err := ws.client.Do(req)
|
||||
if err != nil {
|
||||
// should I check error in detail?
|
||||
if atomic.AddInt32(&ws.upstreams[i].effectiveWeight, -10) < 1 {
|
||||
atomic.StoreInt32(&ws.upstreams[i].effectiveWeight, 1)
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
switch ws.upstreams[i].Type {
|
||||
case Google:
|
||||
ws.checkGoogleResponse(resp, ws.upstreams[i])
|
||||
|
||||
case IETF:
|
||||
ws.checkIETFResponse(resp, ws.upstreams[i])
|
||||
}
|
||||
}(i)
|
||||
}
|
||||
|
||||
wg.Wait()
|
||||
|
||||
time.Sleep(15 * time.Second)
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
// nginx wrr like
|
||||
func (ws *NginxWRRSelector) Get() *Upstream {
|
||||
var (
|
||||
total int32
|
||||
bestUpstreamIndex = -1
|
||||
)
|
||||
|
||||
for i := range ws.upstreams {
|
||||
effectiveWeight := atomic.LoadInt32(&ws.upstreams[i].effectiveWeight)
|
||||
atomic.AddInt32(&ws.upstreams[i].currentWeight, effectiveWeight)
|
||||
total += effectiveWeight
|
||||
|
||||
if bestUpstreamIndex == -1 || atomic.LoadInt32(&ws.upstreams[i].currentWeight) > atomic.LoadInt32(&ws.upstreams[bestUpstreamIndex].currentWeight) {
|
||||
bestUpstreamIndex = i
|
||||
}
|
||||
}
|
||||
|
||||
atomic.AddInt32(&ws.upstreams[bestUpstreamIndex].currentWeight, -total)
|
||||
|
||||
return ws.upstreams[bestUpstreamIndex]
|
||||
}
|
||||
|
||||
func (ws *NginxWRRSelector) ReportUpstreamStatus(upstream *Upstream, upstreamStatus upstreamStatus) {
|
||||
switch upstreamStatus {
|
||||
case Timeout:
|
||||
if atomic.AddInt32(&upstream.effectiveWeight, -5) < 1 {
|
||||
atomic.StoreInt32(&upstream.effectiveWeight, 1)
|
||||
}
|
||||
|
||||
case Error:
|
||||
if atomic.AddInt32(&upstream.effectiveWeight, -3) < 1 {
|
||||
atomic.StoreInt32(&upstream.effectiveWeight, 1)
|
||||
}
|
||||
|
||||
case OK:
|
||||
if atomic.AddInt32(&upstream.effectiveWeight, 1) > upstream.weight {
|
||||
atomic.StoreInt32(&upstream.effectiveWeight, upstream.weight)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (ws *NginxWRRSelector) checkGoogleResponse(resp *http.Response, upstream *Upstream) {
|
||||
defer resp.Body.Close()
|
||||
|
||||
if resp.StatusCode != http.StatusOK {
|
||||
// server error
|
||||
if atomic.AddInt32(&upstream.effectiveWeight, -3) < 1 {
|
||||
atomic.StoreInt32(&upstream.effectiveWeight, 1)
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
m := make(map[string]interface{})
|
||||
if err := json.NewDecoder(resp.Body).Decode(&m); err != nil {
|
||||
// should I check error in detail?
|
||||
if atomic.AddInt32(&upstream.effectiveWeight, -2) < 1 {
|
||||
atomic.StoreInt32(&upstream.effectiveWeight, 1)
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
if status, ok := m["Status"]; ok {
|
||||
if statusNum, ok := status.(float64); ok && statusNum == 0 {
|
||||
if atomic.AddInt32(&upstream.effectiveWeight, 5) > upstream.weight {
|
||||
atomic.StoreInt32(&upstream.effectiveWeight, upstream.weight)
|
||||
}
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
// should I check error in detail?
|
||||
if atomic.AddInt32(&upstream.effectiveWeight, -2) < 1 {
|
||||
atomic.StoreInt32(&upstream.effectiveWeight, 1)
|
||||
}
|
||||
}
|
||||
|
||||
func (ws *NginxWRRSelector) checkIETFResponse(resp *http.Response, upstream *Upstream) {
|
||||
defer resp.Body.Close()
|
||||
|
||||
if resp.StatusCode != http.StatusOK {
|
||||
// server error
|
||||
if atomic.AddInt32(&upstream.effectiveWeight, -5) < 1 {
|
||||
atomic.StoreInt32(&upstream.effectiveWeight, 1)
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
if atomic.AddInt32(&upstream.effectiveWeight, 5) > upstream.weight {
|
||||
atomic.StoreInt32(&upstream.effectiveWeight, upstream.weight)
|
||||
}
|
||||
}
|
||||
|
||||
func (ws *NginxWRRSelector) ReportWeights() {
|
||||
go func() {
|
||||
for {
|
||||
time.Sleep(15 * time.Second)
|
||||
|
||||
for _, u := range ws.upstreams {
|
||||
log.Printf("%s, effect weight: %d", u, atomic.LoadInt32(&u.effectiveWeight))
|
||||
}
|
||||
}
|
||||
}()
|
||||
}
|
||||
@@ -10,3 +10,8 @@ type Selector interface {
|
||||
// ReportUpstreamStatus report upstream status
|
||||
ReportUpstreamStatus(upstream *Upstream, upstreamStatus upstreamStatus)
|
||||
}
|
||||
|
||||
type DebugReporter interface {
|
||||
// ReportWeights starts a goroutine to report all upstream weights, recommend interval is 15s
|
||||
ReportWeights()
|
||||
}
|
||||
|
||||
@@ -1,192 +0,0 @@
|
||||
package selector
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"net/http"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
)
|
||||
|
||||
type WeightRoundRobinSelector struct {
|
||||
upstreams []*Upstream // upstreamsInfo
|
||||
client http.Client // http client to check the upstream
|
||||
}
|
||||
|
||||
func NewWeightRoundRobinSelector(timeout time.Duration) *WeightRoundRobinSelector {
|
||||
return &WeightRoundRobinSelector{
|
||||
client: http.Client{Timeout: timeout},
|
||||
}
|
||||
}
|
||||
|
||||
func (ws *WeightRoundRobinSelector) Add(url string, upstreamType UpstreamType, weight int32) (err error) {
|
||||
switch upstreamType {
|
||||
case Google:
|
||||
ws.upstreams = append(ws.upstreams, &Upstream{
|
||||
Type: Google,
|
||||
URL: url,
|
||||
RequestType: "application/dns-json",
|
||||
weight: weight,
|
||||
effectiveWeight: weight,
|
||||
})
|
||||
|
||||
case IETF:
|
||||
ws.upstreams = append(ws.upstreams, &Upstream{
|
||||
Type: IETF,
|
||||
URL: url,
|
||||
RequestType: "application/dns-message",
|
||||
weight: weight,
|
||||
effectiveWeight: weight,
|
||||
})
|
||||
|
||||
default:
|
||||
return errors.New("unknown upstream type")
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// COW, avoid concurrent read write upstreams
|
||||
func (ws *WeightRoundRobinSelector) StartEvaluate() {
|
||||
go func() {
|
||||
for {
|
||||
for i := range ws.upstreams {
|
||||
upstreamURL := ws.upstreams[i].URL
|
||||
var acceptType string
|
||||
|
||||
switch ws.upstreams[i].Type {
|
||||
case Google:
|
||||
upstreamURL += "?name=www.example.com&type=A"
|
||||
acceptType = "application/dns-json"
|
||||
|
||||
case IETF:
|
||||
// www.example.com
|
||||
upstreamURL += "?dns=q80BAAABAAAAAAAAA3d3dwdleGFtcGxlA2NvbQAAAQAB"
|
||||
acceptType = "application/dns-message"
|
||||
}
|
||||
|
||||
req, err := http.NewRequest(http.MethodGet, upstreamURL, nil)
|
||||
if err != nil {
|
||||
/*log.Println("upstream:", upstreamURL, "type:", typeMap[upstream.Type], "check failed:", err)
|
||||
continue*/
|
||||
|
||||
// should I only log it? But if there is an error, I think when query the server will return error too
|
||||
panic("upstream: " + upstreamURL + " type: " + typeMap[ws.upstreams[i].Type] + " check failed: " + err.Error())
|
||||
}
|
||||
|
||||
req.Header.Set("accept", acceptType)
|
||||
|
||||
resp, err := ws.client.Do(req)
|
||||
if err != nil {
|
||||
// should I check error in detail?
|
||||
if atomic.AddInt32(&ws.upstreams[i].effectiveWeight, -10) < 0 {
|
||||
atomic.StoreInt32(&ws.upstreams[i].effectiveWeight, 0)
|
||||
}
|
||||
continue
|
||||
}
|
||||
|
||||
switch ws.upstreams[i].Type {
|
||||
case Google:
|
||||
checkGoogleResponse(resp, ws.upstreams[i])
|
||||
|
||||
case IETF:
|
||||
checkIETFResponse(resp, ws.upstreams[i])
|
||||
}
|
||||
}
|
||||
|
||||
time.Sleep(30 * time.Second)
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
// nginx wrr like
|
||||
func (ws *WeightRoundRobinSelector) Get() *Upstream {
|
||||
var (
|
||||
total int32
|
||||
bestUpstreamIndex = -1
|
||||
)
|
||||
|
||||
for i := range ws.upstreams {
|
||||
effectiveWeight := atomic.LoadInt32(&ws.upstreams[i].effectiveWeight)
|
||||
ws.upstreams[i].currentWeight += effectiveWeight
|
||||
total += effectiveWeight
|
||||
|
||||
if bestUpstreamIndex == -1 || ws.upstreams[i].currentWeight > ws.upstreams[bestUpstreamIndex].currentWeight {
|
||||
bestUpstreamIndex = i
|
||||
}
|
||||
}
|
||||
|
||||
ws.upstreams[bestUpstreamIndex].currentWeight -= total
|
||||
|
||||
return ws.upstreams[bestUpstreamIndex]
|
||||
}
|
||||
|
||||
func (ws *WeightRoundRobinSelector) ReportUpstreamStatus(upstream *Upstream, upstreamStatus upstreamStatus) {
|
||||
switch upstreamStatus {
|
||||
case Timeout:
|
||||
if atomic.AddInt32(&upstream.effectiveWeight, -10) < 0 {
|
||||
atomic.StoreInt32(&upstream.effectiveWeight, 0)
|
||||
}
|
||||
|
||||
case Error:
|
||||
if atomic.AddInt32(&upstream.effectiveWeight, -5) < 0 {
|
||||
atomic.StoreInt32(&upstream.effectiveWeight, 0)
|
||||
}
|
||||
|
||||
case OK:
|
||||
if atomic.AddInt32(&upstream.effectiveWeight, 2) > upstream.weight {
|
||||
atomic.StoreInt32(&upstream.effectiveWeight, upstream.weight)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func checkGoogleResponse(resp *http.Response, upstream *Upstream) {
|
||||
defer resp.Body.Close()
|
||||
|
||||
if resp.StatusCode != http.StatusOK {
|
||||
// server error
|
||||
if atomic.AddInt32(&upstream.effectiveWeight, -5) < 0 {
|
||||
atomic.StoreInt32(&upstream.effectiveWeight, 0)
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
m := make(map[string]interface{})
|
||||
if err := json.NewDecoder(resp.Body).Decode(&m); err != nil {
|
||||
// should I check error in detail?
|
||||
if atomic.AddInt32(&upstream.effectiveWeight, -1) < 0 {
|
||||
atomic.StoreInt32(&upstream.effectiveWeight, 0)
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
if status, ok := m["status"]; ok {
|
||||
if statusNum, ok := status.(int); ok && statusNum == 0 {
|
||||
if atomic.AddInt32(&upstream.effectiveWeight, 5) > upstream.weight {
|
||||
atomic.StoreInt32(&upstream.effectiveWeight, upstream.weight)
|
||||
}
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
// should I check error in detail?
|
||||
if atomic.AddInt32(&upstream.effectiveWeight, -1) < 0 {
|
||||
atomic.StoreInt32(&upstream.effectiveWeight, 0)
|
||||
}
|
||||
}
|
||||
|
||||
func checkIETFResponse(resp *http.Response, upstream *Upstream) {
|
||||
defer resp.Body.Close()
|
||||
|
||||
if resp.StatusCode != http.StatusOK {
|
||||
// server error
|
||||
if atomic.AddInt32(&upstream.effectiveWeight, -5) < 0 {
|
||||
atomic.StoreInt32(&upstream.effectiveWeight, 0)
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
if atomic.AddInt32(&upstream.effectiveWeight, 5) > upstream.weight {
|
||||
atomic.StoreInt32(&upstream.effectiveWeight, upstream.weight)
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user