| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405 | package redisimport (	"container/list"	"errors"	"log"	"net"	"sync"	"time"	"gopkg.in/bufio.v1")var (	errClosed      = errors.New("redis: client is closed")	errRateLimited = errors.New("redis: you open connections too fast"))var (	zeroTime = time.Time{})type pool interface {	Get() (*conn, bool, error)	Put(*conn) error	Remove(*conn) error	Len() int	Size() int	Close() error	Filter(func(*conn) bool)}//------------------------------------------------------------------------------type conn struct {	netcn net.Conn	rd    *bufio.Reader	buf   []byte	inUse  bool	usedAt time.Time	readTimeout  time.Duration	writeTimeout time.Duration	elem *list.Element}func newConnFunc(dial func() (net.Conn, error)) func() (*conn, error) {	return func() (*conn, error) {		netcn, err := dial()		if err != nil {			return nil, err		}		cn := &conn{			netcn: netcn,			buf:   make([]byte, 0, 64),		}		cn.rd = bufio.NewReader(cn)		return cn, nil	}}func (cn *conn) Read(b []byte) (int, error) {	if cn.readTimeout != 0 {		cn.netcn.SetReadDeadline(time.Now().Add(cn.readTimeout))	} else {		cn.netcn.SetReadDeadline(zeroTime)	}	return cn.netcn.Read(b)}func (cn *conn) Write(b []byte) (int, error) {	if cn.writeTimeout != 0 {		cn.netcn.SetWriteDeadline(time.Now().Add(cn.writeTimeout))	} else {		cn.netcn.SetWriteDeadline(zeroTime)	}	return cn.netcn.Write(b)}func (cn *conn) RemoteAddr() net.Addr {	return cn.netcn.RemoteAddr()}func (cn *conn) Close() error {	return cn.netcn.Close()}//------------------------------------------------------------------------------type connPool struct {	dial func() (*conn, error)	rl   *rateLimiter	opt *options	cond  *sync.Cond	conns *list.List	idleNum int	closed  bool}func newConnPool(dial func() (*conn, error), opt *options) *connPool {	return &connPool{		dial: dial,		rl:   newRateLimiter(time.Second, 2*opt.PoolSize),		opt: opt,		cond:  sync.NewCond(&sync.Mutex{}),		conns: list.New(),	}}func (p *connPool) new() (*conn, error) {	if !p.rl.Check() {		return nil, errRateLimited	}	return p.dial()}func (p *connPool) Get() (*conn, bool, error) {	p.cond.L.Lock()	if p.closed {		p.cond.L.Unlock()		return nil, false, errClosed	}	if p.opt.IdleTimeout > 0 {		for el := p.conns.Front(); el != nil; el = el.Next() {			cn := el.Value.(*conn)			if cn.inUse {				break			}			if time.Since(cn.usedAt) > p.opt.IdleTimeout {				if err := p.remove(cn); err != nil {					log.Printf("remove failed: %s", err)				}			}		}	}	for p.conns.Len() >= p.opt.PoolSize && p.idleNum == 0 {		p.cond.Wait()	}	if p.idleNum > 0 {		elem := p.conns.Front()		cn := elem.Value.(*conn)		if cn.inUse {			panic("pool: precondition failed")		}		cn.inUse = true		p.conns.MoveToBack(elem)		p.idleNum--		p.cond.L.Unlock()		return cn, false, nil	}	if p.conns.Len() < p.opt.PoolSize {		cn, err := p.new()		if err != nil {			p.cond.L.Unlock()			return nil, false, err		}		cn.inUse = true		cn.elem = p.conns.PushBack(cn)		p.cond.L.Unlock()		return cn, true, nil	}	panic("not reached")}func (p *connPool) Put(cn *conn) error {	if cn.rd.Buffered() != 0 {		b, _ := cn.rd.ReadN(cn.rd.Buffered())		log.Printf("redis: connection has unread data: %q", b)		return p.Remove(cn)	}	if p.opt.IdleTimeout > 0 {		cn.usedAt = time.Now()	}	p.cond.L.Lock()	if p.closed {		p.cond.L.Unlock()		return errClosed	}	cn.inUse = false	p.conns.MoveToFront(cn.elem)	p.idleNum++	p.cond.Signal()	p.cond.L.Unlock()	return nil}func (p *connPool) Remove(cn *conn) error {	p.cond.L.Lock()	if p.closed {		// Noop, connection is already closed.		p.cond.L.Unlock()		return nil	}	err := p.remove(cn)	p.cond.Signal()	p.cond.L.Unlock()	return err}func (p *connPool) remove(cn *conn) error {	p.conns.Remove(cn.elem)	cn.elem = nil	if !cn.inUse {		p.idleNum--	}	return cn.Close()}// Len returns number of idle connections.func (p *connPool) Len() int {	defer p.cond.L.Unlock()	p.cond.L.Lock()	return p.idleNum}// Size returns number of connections in the pool.func (p *connPool) Size() int {	defer p.cond.L.Unlock()	p.cond.L.Lock()	return p.conns.Len()}func (p *connPool) Filter(f func(*conn) bool) {	p.cond.L.Lock()	for el, next := p.conns.Front(), p.conns.Front(); el != nil; el = next {		next = el.Next()		cn := el.Value.(*conn)		if !f(cn) {			p.remove(cn)		}	}	p.cond.L.Unlock()}func (p *connPool) Close() error {	defer p.cond.L.Unlock()	p.cond.L.Lock()	if p.closed {		return nil	}	p.closed = true	p.rl.Close()	var retErr error	for {		e := p.conns.Front()		if e == nil {			break		}		if err := p.remove(e.Value.(*conn)); err != nil {			log.Printf("cn.Close failed: %s", err)			retErr = err		}	}	return retErr}//------------------------------------------------------------------------------type singleConnPool struct {	pool pool	cnMtx sync.Mutex	cn    *conn	reusable bool	closed bool}func newSingleConnPool(pool pool, reusable bool) *singleConnPool {	return &singleConnPool{		pool:     pool,		reusable: reusable,	}}func (p *singleConnPool) SetConn(cn *conn) {	p.cnMtx.Lock()	p.cn = cn	p.cnMtx.Unlock()}func (p *singleConnPool) Get() (*conn, bool, error) {	defer p.cnMtx.Unlock()	p.cnMtx.Lock()	if p.closed {		return nil, false, errClosed	}	if p.cn != nil {		return p.cn, false, nil	}	cn, isNew, err := p.pool.Get()	if err != nil {		return nil, false, err	}	p.cn = cn	return p.cn, isNew, nil}func (p *singleConnPool) Put(cn *conn) error {	defer p.cnMtx.Unlock()	p.cnMtx.Lock()	if p.cn != cn {		panic("p.cn != cn")	}	if p.closed {		return errClosed	}	return nil}func (p *singleConnPool) put() error {	err := p.pool.Put(p.cn)	p.cn = nil	return err}func (p *singleConnPool) Remove(cn *conn) error {	defer p.cnMtx.Unlock()	p.cnMtx.Lock()	if p.cn == nil {		panic("p.cn == nil")	}	if p.cn != cn {		panic("p.cn != cn")	}	if p.closed {		return errClosed	}	return p.remove()}func (p *singleConnPool) remove() error {	err := p.pool.Remove(p.cn)	p.cn = nil	return err}func (p *singleConnPool) Len() int {	defer p.cnMtx.Unlock()	p.cnMtx.Lock()	if p.cn == nil {		return 0	}	return 1}func (p *singleConnPool) Size() int {	defer p.cnMtx.Unlock()	p.cnMtx.Lock()	if p.cn == nil {		return 0	}	return 1}func (p *singleConnPool) Filter(f func(*conn) bool) {	p.cnMtx.Lock()	if p.cn != nil {		if !f(p.cn) {			p.remove()		}	}	p.cnMtx.Unlock()}func (p *singleConnPool) Close() error {	defer p.cnMtx.Unlock()	p.cnMtx.Lock()	if p.closed {		return nil	}	p.closed = true	var err error	if p.cn != nil {		if p.reusable {			err = p.put()		} else {			err = p.remove()		}	}	return err}
 |