| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291 | package redisimport (	"errors"	"log"	"net"	"strings"	"sync"	"time")//------------------------------------------------------------------------------type FailoverOptions struct {	MasterName    string	SentinelAddrs []string	Password string	DB       int64	PoolSize int	DialTimeout  time.Duration	ReadTimeout  time.Duration	WriteTimeout time.Duration	IdleTimeout  time.Duration}func (opt *FailoverOptions) getPoolSize() int {	if opt.PoolSize == 0 {		return 10	}	return opt.PoolSize}func (opt *FailoverOptions) getDialTimeout() time.Duration {	if opt.DialTimeout == 0 {		return 5 * time.Second	}	return opt.DialTimeout}func (opt *FailoverOptions) options() *options {	return &options{		DB:       opt.DB,		Password: opt.Password,		DialTimeout:  opt.getDialTimeout(),		ReadTimeout:  opt.ReadTimeout,		WriteTimeout: opt.WriteTimeout,		PoolSize:    opt.getPoolSize(),		IdleTimeout: opt.IdleTimeout,	}}func NewFailoverClient(failoverOpt *FailoverOptions) *Client {	opt := failoverOpt.options()	failover := &sentinelFailover{		masterName:    failoverOpt.MasterName,		sentinelAddrs: failoverOpt.SentinelAddrs,		opt: opt,	}	return &Client{		baseClient: &baseClient{			opt:      opt,			connPool: failover.Pool(),		},	}}//------------------------------------------------------------------------------type sentinelClient struct {	*baseClient}func newSentinel(clOpt *Options) *sentinelClient {	opt := clOpt.options()	opt.Password = ""	opt.DB = 0	dialer := func() (net.Conn, error) {		return net.DialTimeout("tcp", clOpt.Addr, opt.DialTimeout)	}	return &sentinelClient{		baseClient: &baseClient{			opt:      opt,			connPool: newConnPool(newConnFunc(dialer), opt),		},	}}func (c *sentinelClient) PubSub() *PubSub {	return &PubSub{		baseClient: &baseClient{			opt:      c.opt,			connPool: newSingleConnPool(c.connPool, false),		},	}}func (c *sentinelClient) GetMasterAddrByName(name string) *StringSliceCmd {	cmd := NewStringSliceCmd("SENTINEL", "get-master-addr-by-name", name)	c.Process(cmd)	return cmd}func (c *sentinelClient) Sentinels(name string) *SliceCmd {	cmd := NewSliceCmd("SENTINEL", "sentinels", name)	c.Process(cmd)	return cmd}type sentinelFailover struct {	masterName    string	sentinelAddrs []string	opt *options	pool     pool	poolOnce sync.Once	lock      sync.RWMutex	_sentinel *sentinelClient}func (d *sentinelFailover) dial() (net.Conn, error) {	addr, err := d.MasterAddr()	if err != nil {		return nil, err	}	return net.DialTimeout("tcp", addr, d.opt.DialTimeout)}func (d *sentinelFailover) Pool() pool {	d.poolOnce.Do(func() {		d.pool = newConnPool(newConnFunc(d.dial), d.opt)	})	return d.pool}func (d *sentinelFailover) MasterAddr() (string, error) {	defer d.lock.Unlock()	d.lock.Lock()	// Try last working sentinel.	if d._sentinel != nil {		addr, err := d._sentinel.GetMasterAddrByName(d.masterName).Result()		if err != nil {			log.Printf("redis-sentinel: GetMasterAddrByName %q failed: %s", d.masterName, err)			d.resetSentinel()		} else {			addr := net.JoinHostPort(addr[0], addr[1])			log.Printf("redis-sentinel: %q addr is %s", d.masterName, addr)			return addr, nil		}	}	for i, sentinelAddr := range d.sentinelAddrs {		sentinel := newSentinel(&Options{			Addr: sentinelAddr,			DB:       d.opt.DB,			Password: d.opt.Password,			DialTimeout:  d.opt.DialTimeout,			ReadTimeout:  d.opt.ReadTimeout,			WriteTimeout: d.opt.WriteTimeout,			PoolSize:    d.opt.PoolSize,			IdleTimeout: d.opt.IdleTimeout,		})		masterAddr, err := sentinel.GetMasterAddrByName(d.masterName).Result()		if err != nil {			log.Printf("redis-sentinel: GetMasterAddrByName %q failed: %s", d.masterName, err)			sentinel.Close()			continue		}		// Push working sentinel to the top.		d.sentinelAddrs[0], d.sentinelAddrs[i] = d.sentinelAddrs[i], d.sentinelAddrs[0]		d.setSentinel(sentinel)		addr := net.JoinHostPort(masterAddr[0], masterAddr[1])		log.Printf("redis-sentinel: %q addr is %s", d.masterName, addr)		return addr, nil	}	return "", errors.New("redis: all sentinels are unreachable")}func (d *sentinelFailover) setSentinel(sentinel *sentinelClient) {	d.discoverSentinels(sentinel)	d._sentinel = sentinel	go d.listen()}func (d *sentinelFailover) discoverSentinels(sentinel *sentinelClient) {	sentinels, err := sentinel.Sentinels(d.masterName).Result()	if err != nil {		log.Printf("redis-sentinel: Sentinels %q failed: %s", d.masterName, err)		return	}	for _, sentinel := range sentinels {		vals := sentinel.([]interface{})		for i := 0; i < len(vals); i += 2 {			key := vals[i].(string)			if key == "name" {				sentinelAddr := vals[i+1].(string)				if !contains(d.sentinelAddrs, sentinelAddr) {					log.Printf(						"redis-sentinel: discovered new %q sentinel: %s",						d.masterName, sentinelAddr,					)					d.sentinelAddrs = append(d.sentinelAddrs, sentinelAddr)				}			}		}	}}func (d *sentinelFailover) listen() {	var pubsub *PubSub	for {		if pubsub == nil {			pubsub = d._sentinel.PubSub()			if err := pubsub.Subscribe("+switch-master"); err != nil {				log.Printf("redis-sentinel: Subscribe failed: %s", err)				d.lock.Lock()				d.resetSentinel()				d.lock.Unlock()				return			}		}		msgIface, err := pubsub.Receive()		if err != nil {			log.Printf("redis-sentinel: Receive failed: %s", err)			pubsub.Close()			return		}		switch msg := msgIface.(type) {		case *Message:			switch msg.Channel {			case "+switch-master":				parts := strings.Split(msg.Payload, " ")				if parts[0] != d.masterName {					log.Printf("redis-sentinel: ignore new %s addr", parts[0])					continue				}				addr := net.JoinHostPort(parts[3], parts[4])				log.Printf(					"redis-sentinel: new %q addr is %s",					d.masterName, addr,				)				d.pool.Filter(func(cn *conn) bool {					if cn.RemoteAddr().String() != addr {						log.Printf(							"redis-sentinel: closing connection to old master %s",							cn.RemoteAddr(),						)						return false					}					return true				})			default:				log.Printf("redis-sentinel: unsupported message: %s", msg)			}		case *Subscription:			// Ignore.		default:			log.Printf("redis-sentinel: unsupported message: %s", msgIface)		}	}}func (d *sentinelFailover) resetSentinel() {	d._sentinel.Close()	d._sentinel = nil}func contains(slice []string, str string) bool {	for _, s := range slice {		if s == str {			return true		}	}	return false}
 |