| 12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091 | package redis// Not thread-safe.type Pipeline struct {	*Client	closed bool}func (c *Client) Pipeline() *Pipeline {	return &Pipeline{		Client: &Client{			baseClient: &baseClient{				opt:      c.opt,				connPool: c.connPool,				cmds: make([]Cmder, 0),			},		},	}}func (c *Client) Pipelined(f func(*Pipeline) error) ([]Cmder, error) {	pc := c.Pipeline()	if err := f(pc); err != nil {		return nil, err	}	cmds, err := pc.Exec()	pc.Close()	return cmds, err}func (c *Pipeline) Close() error {	c.closed = true	return nil}func (c *Pipeline) Discard() error {	if c.closed {		return errClosed	}	c.cmds = c.cmds[:0]	return nil}// Exec always returns list of commands and error of the first failed// command if any.func (c *Pipeline) Exec() ([]Cmder, error) {	if c.closed {		return nil, errClosed	}	cmds := c.cmds	c.cmds = make([]Cmder, 0)	if len(cmds) == 0 {		return []Cmder{}, nil	}	cn, err := c.conn()	if err != nil {		setCmdsErr(cmds, err)		return cmds, err	}	if err := c.execCmds(cn, cmds); err != nil {		c.freeConn(cn, err)		return cmds, err	}	c.putConn(cn)	return cmds, nil}func (c *Pipeline) execCmds(cn *conn, cmds []Cmder) error {	if err := c.writeCmd(cn, cmds...); err != nil {		setCmdsErr(cmds, err)		return err	}	var firstCmdErr error	for _, cmd := range cmds {		if err := cmd.parseReply(cn.rd); err != nil {			if firstCmdErr == nil {				firstCmdErr = err			}		}	}	return firstCmdErr}
 |