package redis
 
- // Not thread-safe.
 
- type Pipeline struct {
 
- 	*Client
 
- 	closed bool
 
- }
 
- func (c *Client) Pipeline() *Pipeline {
 
- 	return &Pipeline{
 
- 		Client: &Client{
 
- 			baseClient: &baseClient{
 
- 				opt:      c.opt,
 
- 				connPool: c.connPool,
 
- 				cmds: make([]Cmder, 0),
 
- 			},
 
- 		},
 
- 	}
 
- }
 
- func (c *Client) Pipelined(f func(*Pipeline) error) ([]Cmder, error) {
 
- 	pc := c.Pipeline()
 
- 	if err := f(pc); err != nil {
 
- 		return nil, err
 
- 	}
 
- 	cmds, err := pc.Exec()
 
- 	pc.Close()
 
- 	return cmds, err
 
- }
 
- func (c *Pipeline) Close() error {
 
- 	c.closed = true
 
- 	return nil
 
- }
 
- func (c *Pipeline) Discard() error {
 
- 	if c.closed {
 
- 		return errClosed
 
- 	}
 
- 	c.cmds = c.cmds[:0]
 
- 	return nil
 
- }
 
- // Exec always returns list of commands and error of the first failed
 
- // command if any.
 
- func (c *Pipeline) Exec() ([]Cmder, error) {
 
- 	if c.closed {
 
- 		return nil, errClosed
 
- 	}
 
- 	cmds := c.cmds
 
- 	c.cmds = make([]Cmder, 0)
 
- 	if len(cmds) == 0 {
 
- 		return []Cmder{}, nil
 
- 	}
 
- 	cn, err := c.conn()
 
- 	if err != nil {
 
- 		setCmdsErr(cmds, err)
 
- 		return cmds, err
 
- 	}
 
- 	if err := c.execCmds(cn, cmds); err != nil {
 
- 		c.freeConn(cn, err)
 
- 		return cmds, err
 
- 	}
 
- 	c.putConn(cn)
 
- 	return cmds, nil
 
- }
 
- func (c *Pipeline) execCmds(cn *conn, cmds []Cmder) error {
 
- 	if err := c.writeCmd(cn, cmds...); err != nil {
 
- 		setCmdsErr(cmds, err)
 
- 		return err
 
- 	}
 
- 	var firstCmdErr error
 
- 	for _, cmd := range cmds {
 
- 		if err := cmd.parseReply(cn.rd); err != nil {
 
- 			if firstCmdErr == nil {
 
- 				firstCmdErr = err
 
- 			}
 
- 		}
 
- 	}
 
- 	return firstCmdErr
 
- }
 
 
  |