| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134 | package redisimport (	"fmt"	"time")// Not thread-safe.type PubSub struct {	*baseClient}func (c *Client) PubSub() *PubSub {	return &PubSub{		baseClient: &baseClient{			opt:      c.opt,			connPool: newSingleConnPool(c.connPool, false),		},	}}func (c *Client) Publish(channel, message string) *IntCmd {	req := NewIntCmd("PUBLISH", channel, message)	c.Process(req)	return req}type Message struct {	Channel string	Payload string}func (m *Message) String() string {	return fmt.Sprintf("Message<%s: %s>", m.Channel, m.Payload)}type PMessage struct {	Channel string	Pattern string	Payload string}func (m *PMessage) String() string {	return fmt.Sprintf("PMessage<%s: %s>", m.Channel, m.Payload)}type Subscription struct {	Kind    string	Channel string	Count   int}func (m *Subscription) String() string {	return fmt.Sprintf("%s: %s", m.Kind, m.Channel)}func (c *PubSub) Receive() (interface{}, error) {	return c.ReceiveTimeout(0)}func (c *PubSub) ReceiveTimeout(timeout time.Duration) (interface{}, error) {	cn, err := c.conn()	if err != nil {		return nil, err	}	cn.readTimeout = timeout	cmd := NewSliceCmd()	if err := cmd.parseReply(cn.rd); err != nil {		return nil, err	}	reply := cmd.Val()	msgName := reply[0].(string)	switch msgName {	case "subscribe", "unsubscribe", "psubscribe", "punsubscribe":		return &Subscription{			Kind:    msgName,			Channel: reply[1].(string),			Count:   int(reply[2].(int64)),		}, nil	case "message":		return &Message{			Channel: reply[1].(string),			Payload: reply[2].(string),		}, nil	case "pmessage":		return &PMessage{			Pattern: reply[1].(string),			Channel: reply[2].(string),			Payload: reply[3].(string),		}, nil	}	return nil, fmt.Errorf("redis: unsupported message name: %q", msgName)}func (c *PubSub) subscribe(cmd string, channels ...string) error {	cn, err := c.conn()	if err != nil {		return err	}	args := append([]string{cmd}, channels...)	req := NewSliceCmd(args...)	return c.writeCmd(cn, req)}func (c *PubSub) Subscribe(channels ...string) error {	return c.subscribe("SUBSCRIBE", channels...)}func (c *PubSub) PSubscribe(patterns ...string) error {	return c.subscribe("PSUBSCRIBE", patterns...)}func (c *PubSub) unsubscribe(cmd string, channels ...string) error {	cn, err := c.conn()	if err != nil {		return err	}	args := append([]string{cmd}, channels...)	req := NewSliceCmd(args...)	return c.writeCmd(cn, req)}func (c *PubSub) Unsubscribe(channels ...string) error {	return c.unsubscribe("UNSUBSCRIBE", channels...)}func (c *PubSub) PUnsubscribe(patterns ...string) error {	return c.unsubscribe("PUNSUBSCRIBE", patterns...)}
 |