| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754 | 
							- package mssql
 
- import (
 
- 	"encoding/binary"
 
- 	"errors"
 
- 	"io"
 
- 	"net"
 
- 	"strconv"
 
- 	"strings"
 
- 	"golang.org/x/net/context"
 
- )
 
- //go:generate stringer -type token
 
- type token byte
 
- // token ids
 
- const (
 
- 	tokenReturnStatus token = 121 // 0x79
 
- 	tokenColMetadata  token = 129 // 0x81
 
- 	tokenOrder        token = 169 // 0xA9
 
- 	tokenError        token = 170 // 0xAA
 
- 	tokenInfo         token = 171 // 0xAB
 
- 	tokenLoginAck     token = 173 // 0xad
 
- 	tokenRow          token = 209 // 0xd1
 
- 	tokenNbcRow       token = 210 // 0xd2
 
- 	tokenEnvChange    token = 227 // 0xE3
 
- 	tokenSSPI         token = 237 // 0xED
 
- 	tokenDone         token = 253 // 0xFD
 
- 	tokenDoneProc     token = 254
 
- 	tokenDoneInProc   token = 255
 
- )
 
- // done flags
 
- // https://msdn.microsoft.com/en-us/library/dd340421.aspx
 
- const (
 
- 	doneFinal    = 0
 
- 	doneMore     = 1
 
- 	doneError    = 2
 
- 	doneInxact   = 4
 
- 	doneCount    = 0x10
 
- 	doneAttn     = 0x20
 
- 	doneSrvError = 0x100
 
- )
 
- // ENVCHANGE types
 
- // http://msdn.microsoft.com/en-us/library/dd303449.aspx
 
- const (
 
- 	envTypDatabase           = 1
 
- 	envTypLanguage           = 2
 
- 	envTypCharset            = 3
 
- 	envTypPacketSize         = 4
 
- 	envSortId                = 5
 
- 	envSortFlags             = 6
 
- 	envSqlCollation          = 7
 
- 	envTypBeginTran          = 8
 
- 	envTypCommitTran         = 9
 
- 	envTypRollbackTran       = 10
 
- 	envEnlistDTC             = 11
 
- 	envDefectTran            = 12
 
- 	envDatabaseMirrorPartner = 13
 
- 	envPromoteTran           = 15
 
- 	envTranMgrAddr           = 16
 
- 	envTranEnded             = 17
 
- 	envResetConnAck          = 18
 
- 	envStartedInstanceName   = 19
 
- 	envRouting               = 20
 
- )
 
- // COLMETADATA flags
 
- // https://msdn.microsoft.com/en-us/library/dd357363.aspx
 
- const (
 
- 	colFlagNullable = 1
 
- 	// TODO implement more flags
 
- )
 
- // interface for all tokens
 
- type tokenStruct interface{}
 
- type orderStruct struct {
 
- 	ColIds []uint16
 
- }
 
- type doneStruct struct {
 
- 	Status   uint16
 
- 	CurCmd   uint16
 
- 	RowCount uint64
 
- 	errors   []Error
 
- }
 
- func (d doneStruct) isError() bool {
 
- 	return d.Status&doneError != 0 || len(d.errors) > 0
 
- }
 
- func (d doneStruct) getError() Error {
 
- 	if len(d.errors) > 0 {
 
- 		return d.errors[len(d.errors)-1]
 
- 	} else {
 
- 		return Error{Message: "Request failed but didn't provide reason"}
 
- 	}
 
- }
 
- type doneInProcStruct doneStruct
 
- var doneFlags2str = map[uint16]string{
 
- 	doneFinal:    "final",
 
- 	doneMore:     "more",
 
- 	doneError:    "error",
 
- 	doneInxact:   "inxact",
 
- 	doneCount:    "count",
 
- 	doneAttn:     "attn",
 
- 	doneSrvError: "srverror",
 
- }
 
- func doneFlags2Str(flags uint16) string {
 
- 	strs := make([]string, 0, len(doneFlags2str))
 
- 	for flag, tag := range doneFlags2str {
 
- 		if flags&flag != 0 {
 
- 			strs = append(strs, tag)
 
- 		}
 
- 	}
 
- 	return strings.Join(strs, "|")
 
- }
 
- // ENVCHANGE stream
 
- // http://msdn.microsoft.com/en-us/library/dd303449.aspx
 
- func processEnvChg(sess *tdsSession) {
 
- 	size := sess.buf.uint16()
 
- 	r := &io.LimitedReader{R: sess.buf, N: int64(size)}
 
- 	for {
 
- 		var err error
 
- 		var envtype uint8
 
- 		err = binary.Read(r, binary.LittleEndian, &envtype)
 
- 		if err == io.EOF {
 
- 			return
 
- 		}
 
- 		if err != nil {
 
- 			badStreamPanic(err)
 
- 		}
 
- 		switch envtype {
 
- 		case envTypDatabase:
 
- 			sess.database, err = readBVarChar(r)
 
- 			if err != nil {
 
- 				badStreamPanic(err)
 
- 			}
 
- 			_, err = readBVarChar(r)
 
- 			if err != nil {
 
- 				badStreamPanic(err)
 
- 			}
 
- 		case envTypLanguage:
 
- 			//currently ignored
 
- 			// old value
 
- 			_, err = readBVarChar(r)
 
- 			if err != nil {
 
- 				badStreamPanic(err)
 
- 			}
 
- 			// new value
 
- 			_, err = readBVarChar(r)
 
- 			if err != nil {
 
- 				badStreamPanic(err)
 
- 			}
 
- 		case envTypCharset:
 
- 			//currently ignored
 
- 			// old value
 
- 			_, err = readBVarChar(r)
 
- 			if err != nil {
 
- 				badStreamPanic(err)
 
- 			}
 
- 			// new value
 
- 			_, err = readBVarChar(r)
 
- 			if err != nil {
 
- 				badStreamPanic(err)
 
- 			}
 
- 		case envTypPacketSize:
 
- 			packetsize, err := readBVarChar(r)
 
- 			if err != nil {
 
- 				badStreamPanic(err)
 
- 			}
 
- 			_, err = readBVarChar(r)
 
- 			if err != nil {
 
- 				badStreamPanic(err)
 
- 			}
 
- 			packetsizei, err := strconv.Atoi(packetsize)
 
- 			if err != nil {
 
- 				badStreamPanicf("Invalid Packet size value returned from server (%s): %s", packetsize, err.Error())
 
- 			}
 
- 			sess.buf.ResizeBuffer(packetsizei)
 
- 		case envSortId:
 
- 			// currently ignored
 
- 			// old value, should be 0
 
- 			if _, err = readBVarChar(r); err != nil {
 
- 				badStreamPanic(err)
 
- 			}
 
- 			// new value
 
- 			if _, err = readBVarChar(r); err != nil {
 
- 				badStreamPanic(err)
 
- 			}
 
- 		case envSortFlags:
 
- 			// currently ignored
 
- 			// old value, should be 0
 
- 			if _, err = readBVarChar(r); err != nil {
 
- 				badStreamPanic(err)
 
- 			}
 
- 			// new value
 
- 			if _, err = readBVarChar(r); err != nil {
 
- 				badStreamPanic(err)
 
- 			}
 
- 		case envSqlCollation:
 
- 			// currently ignored
 
- 			// old value
 
- 			if _, err = readBVarChar(r); err != nil {
 
- 				badStreamPanic(err)
 
- 			}
 
- 			// new value
 
- 			if _, err = readBVarChar(r); err != nil {
 
- 				badStreamPanic(err)
 
- 			}
 
- 		case envTypBeginTran:
 
- 			tranid, err := readBVarByte(r)
 
- 			if len(tranid) != 8 {
 
- 				badStreamPanicf("invalid size of transaction identifier: %d", len(tranid))
 
- 			}
 
- 			sess.tranid = binary.LittleEndian.Uint64(tranid)
 
- 			if err != nil {
 
- 				badStreamPanic(err)
 
- 			}
 
- 			if sess.logFlags&logTransaction != 0 {
 
- 				sess.log.Printf("BEGIN TRANSACTION %x\n", sess.tranid)
 
- 			}
 
- 			_, err = readBVarByte(r)
 
- 			if err != nil {
 
- 				badStreamPanic(err)
 
- 			}
 
- 		case envTypCommitTran, envTypRollbackTran:
 
- 			_, err = readBVarByte(r)
 
- 			if err != nil {
 
- 				badStreamPanic(err)
 
- 			}
 
- 			_, err = readBVarByte(r)
 
- 			if err != nil {
 
- 				badStreamPanic(err)
 
- 			}
 
- 			if sess.logFlags&logTransaction != 0 {
 
- 				if envtype == envTypCommitTran {
 
- 					sess.log.Printf("COMMIT TRANSACTION %x\n", sess.tranid)
 
- 				} else {
 
- 					sess.log.Printf("ROLLBACK TRANSACTION %x\n", sess.tranid)
 
- 				}
 
- 			}
 
- 			sess.tranid = 0
 
- 		case envEnlistDTC:
 
- 			// currently ignored
 
- 			// old value
 
- 			if _, err = readBVarChar(r); err != nil {
 
- 				badStreamPanic(err)
 
- 			}
 
- 			// new value, should be 0
 
- 			if _, err = readBVarChar(r); err != nil {
 
- 				badStreamPanic(err)
 
- 			}
 
- 		case envDefectTran:
 
- 			// currently ignored
 
- 			// old value, should be 0
 
- 			if _, err = readBVarChar(r); err != nil {
 
- 				badStreamPanic(err)
 
- 			}
 
- 			// new value
 
- 			if _, err = readBVarChar(r); err != nil {
 
- 				badStreamPanic(err)
 
- 			}
 
- 		case envDatabaseMirrorPartner:
 
- 			sess.partner, err = readBVarChar(r)
 
- 			if err != nil {
 
- 				badStreamPanic(err)
 
- 			}
 
- 			_, err = readBVarChar(r)
 
- 			if err != nil {
 
- 				badStreamPanic(err)
 
- 			}
 
- 		case envPromoteTran:
 
- 			// currently ignored
 
- 			// old value, should be 0
 
- 			if _, err = readBVarChar(r); err != nil {
 
- 				badStreamPanic(err)
 
- 			}
 
- 			// dtc token
 
- 			// spec says it should be L_VARBYTE, so this code might be wrong
 
- 			if _, err = readBVarChar(r); err != nil {
 
- 				badStreamPanic(err)
 
- 			}
 
- 		case envTranMgrAddr:
 
- 			// currently ignored
 
- 			// old value, should be 0
 
- 			if _, err = readBVarChar(r); err != nil {
 
- 				badStreamPanic(err)
 
- 			}
 
- 			// XACT_MANAGER_ADDRESS = B_VARBYTE
 
- 			if _, err = readBVarChar(r); err != nil {
 
- 				badStreamPanic(err)
 
- 			}
 
- 		case envTranEnded:
 
- 			// currently ignored
 
- 			// old value, B_VARBYTE
 
- 			if _, err = readBVarChar(r); err != nil {
 
- 				badStreamPanic(err)
 
- 			}
 
- 			// should be 0
 
- 			if _, err = readBVarChar(r); err != nil {
 
- 				badStreamPanic(err)
 
- 			}
 
- 		case envResetConnAck:
 
- 			// currently ignored
 
- 			// old value, should be 0
 
- 			if _, err = readBVarChar(r); err != nil {
 
- 				badStreamPanic(err)
 
- 			}
 
- 			// should be 0
 
- 			if _, err = readBVarChar(r); err != nil {
 
- 				badStreamPanic(err)
 
- 			}
 
- 		case envStartedInstanceName:
 
- 			// currently ignored
 
- 			// old value, should be 0
 
- 			if _, err = readBVarChar(r); err != nil {
 
- 				badStreamPanic(err)
 
- 			}
 
- 			// instance name
 
- 			if _, err = readBVarChar(r); err != nil {
 
- 				badStreamPanic(err)
 
- 			}
 
- 		case envRouting:
 
- 			// RoutingData message is:
 
- 			// ValueLength                 USHORT
 
- 			// Protocol (TCP = 0)          BYTE
 
- 			// ProtocolProperty (new port) USHORT
 
- 			// AlternateServer             US_VARCHAR
 
- 			_, err := readUshort(r)
 
- 			if err != nil {
 
- 				badStreamPanic(err)
 
- 			}
 
- 			protocol, err := readByte(r)
 
- 			if err != nil || protocol != 0 {
 
- 				badStreamPanic(err)
 
- 			}
 
- 			newPort, err := readUshort(r)
 
- 			if err != nil {
 
- 				badStreamPanic(err)
 
- 			}
 
- 			newServer, err := readUsVarChar(r)
 
- 			if err != nil {
 
- 				badStreamPanic(err)
 
- 			}
 
- 			// consume the OLDVALUE = %x00 %x00
 
- 			_, err = readUshort(r)
 
- 			if err != nil {
 
- 				badStreamPanic(err)
 
- 			}
 
- 			sess.routedServer = newServer
 
- 			sess.routedPort = newPort
 
- 		default:
 
- 			// ignore rest of records because we don't know how to skip those
 
- 			sess.log.Printf("WARN: Unknown ENVCHANGE record detected with type id = %d\n", envtype)
 
- 			break
 
- 		}
 
- 	}
 
- }
 
- type returnStatus int32
 
- // http://msdn.microsoft.com/en-us/library/dd358180.aspx
 
- func parseReturnStatus(r *tdsBuffer) returnStatus {
 
- 	return returnStatus(r.int32())
 
- }
 
- func parseOrder(r *tdsBuffer) (res orderStruct) {
 
- 	len := int(r.uint16())
 
- 	res.ColIds = make([]uint16, len/2)
 
- 	for i := 0; i < len/2; i++ {
 
- 		res.ColIds[i] = r.uint16()
 
- 	}
 
- 	return res
 
- }
 
- // https://msdn.microsoft.com/en-us/library/dd340421.aspx
 
- func parseDone(r *tdsBuffer) (res doneStruct) {
 
- 	res.Status = r.uint16()
 
- 	res.CurCmd = r.uint16()
 
- 	res.RowCount = r.uint64()
 
- 	return res
 
- }
 
- // https://msdn.microsoft.com/en-us/library/dd340553.aspx
 
- func parseDoneInProc(r *tdsBuffer) (res doneInProcStruct) {
 
- 	res.Status = r.uint16()
 
- 	res.CurCmd = r.uint16()
 
- 	res.RowCount = r.uint64()
 
- 	return res
 
- }
 
- type sspiMsg []byte
 
- func parseSSPIMsg(r *tdsBuffer) sspiMsg {
 
- 	size := r.uint16()
 
- 	buf := make([]byte, size)
 
- 	r.ReadFull(buf)
 
- 	return sspiMsg(buf)
 
- }
 
- type loginAckStruct struct {
 
- 	Interface  uint8
 
- 	TDSVersion uint32
 
- 	ProgName   string
 
- 	ProgVer    uint32
 
- }
 
- func parseLoginAck(r *tdsBuffer) loginAckStruct {
 
- 	size := r.uint16()
 
- 	buf := make([]byte, size)
 
- 	r.ReadFull(buf)
 
- 	var res loginAckStruct
 
- 	res.Interface = buf[0]
 
- 	res.TDSVersion = binary.BigEndian.Uint32(buf[1:])
 
- 	prognamelen := buf[1+4]
 
- 	var err error
 
- 	if res.ProgName, err = ucs22str(buf[1+4+1 : 1+4+1+prognamelen*2]); err != nil {
 
- 		badStreamPanic(err)
 
- 	}
 
- 	res.ProgVer = binary.BigEndian.Uint32(buf[size-4:])
 
- 	return res
 
- }
 
- // http://msdn.microsoft.com/en-us/library/dd357363.aspx
 
- func parseColMetadata72(r *tdsBuffer) (columns []columnStruct) {
 
- 	count := r.uint16()
 
- 	if count == 0xffff {
 
- 		// no metadata is sent
 
- 		return nil
 
- 	}
 
- 	columns = make([]columnStruct, count)
 
- 	for i := range columns {
 
- 		column := &columns[i]
 
- 		column.UserType = r.uint32()
 
- 		column.Flags = r.uint16()
 
- 		// parsing TYPE_INFO structure
 
- 		column.ti = readTypeInfo(r)
 
- 		column.ColName = r.BVarChar()
 
- 	}
 
- 	return columns
 
- }
 
- // http://msdn.microsoft.com/en-us/library/dd357254.aspx
 
- func parseRow(r *tdsBuffer, columns []columnStruct, row []interface{}) {
 
- 	for i, column := range columns {
 
- 		row[i] = column.ti.Reader(&column.ti, r)
 
- 	}
 
- }
 
- // http://msdn.microsoft.com/en-us/library/dd304783.aspx
 
- func parseNbcRow(r *tdsBuffer, columns []columnStruct, row []interface{}) {
 
- 	bitlen := (len(columns) + 7) / 8
 
- 	pres := make([]byte, bitlen)
 
- 	r.ReadFull(pres)
 
- 	for i, col := range columns {
 
- 		if pres[i/8]&(1<<(uint(i)%8)) != 0 {
 
- 			row[i] = nil
 
- 			continue
 
- 		}
 
- 		row[i] = col.ti.Reader(&col.ti, r)
 
- 	}
 
- }
 
- // http://msdn.microsoft.com/en-us/library/dd304156.aspx
 
- func parseError72(r *tdsBuffer) (res Error) {
 
- 	length := r.uint16()
 
- 	_ = length // ignore length
 
- 	res.Number = r.int32()
 
- 	res.State = r.byte()
 
- 	res.Class = r.byte()
 
- 	res.Message = r.UsVarChar()
 
- 	res.ServerName = r.BVarChar()
 
- 	res.ProcName = r.BVarChar()
 
- 	res.LineNo = r.int32()
 
- 	return
 
- }
 
- // http://msdn.microsoft.com/en-us/library/dd304156.aspx
 
- func parseInfo(r *tdsBuffer) (res Error) {
 
- 	length := r.uint16()
 
- 	_ = length // ignore length
 
- 	res.Number = r.int32()
 
- 	res.State = r.byte()
 
- 	res.Class = r.byte()
 
- 	res.Message = r.UsVarChar()
 
- 	res.ServerName = r.BVarChar()
 
- 	res.ProcName = r.BVarChar()
 
- 	res.LineNo = r.int32()
 
- 	return
 
- }
 
- func processSingleResponse(sess *tdsSession, ch chan tokenStruct) {
 
- 	defer func() {
 
- 		if err := recover(); err != nil {
 
- 			if sess.logFlags&logErrors != 0 {
 
- 				sess.log.Printf("ERROR: Intercepted panic %v", err)
 
- 			}
 
- 			ch <- err
 
- 		}
 
- 		close(ch)
 
- 	}()
 
- 	packet_type, err := sess.buf.BeginRead()
 
- 	if err != nil {
 
- 		if sess.logFlags&logErrors != 0 {
 
- 			sess.log.Printf("ERROR: BeginRead failed %v", err)
 
- 		}
 
- 		ch <- err
 
- 		return
 
- 	}
 
- 	if packet_type != packReply {
 
- 		badStreamPanicf("invalid response packet type, expected REPLY, actual: %d", packet_type)
 
- 	}
 
- 	var columns []columnStruct
 
- 	errs := make([]Error, 0, 5)
 
- 	for {
 
- 		token := token(sess.buf.byte())
 
- 		if sess.logFlags&logDebug != 0 {
 
- 			sess.log.Printf("got token %v", token)
 
- 		}
 
- 		switch token {
 
- 		case tokenSSPI:
 
- 			ch <- parseSSPIMsg(sess.buf)
 
- 			return
 
- 		case tokenReturnStatus:
 
- 			returnStatus := parseReturnStatus(sess.buf)
 
- 			ch <- returnStatus
 
- 		case tokenLoginAck:
 
- 			loginAck := parseLoginAck(sess.buf)
 
- 			ch <- loginAck
 
- 		case tokenOrder:
 
- 			order := parseOrder(sess.buf)
 
- 			ch <- order
 
- 		case tokenDoneInProc:
 
- 			done := parseDoneInProc(sess.buf)
 
- 			if sess.logFlags&logRows != 0 && done.Status&doneCount != 0 {
 
- 				sess.log.Printf("(%d row(s) affected)\n", done.RowCount)
 
- 			}
 
- 			ch <- done
 
- 		case tokenDone, tokenDoneProc:
 
- 			done := parseDone(sess.buf)
 
- 			done.errors = errs
 
- 			if sess.logFlags&logDebug != 0 {
 
- 				sess.log.Printf("got DONE or DONEPROC status=%d", done.Status)
 
- 			}
 
- 			if done.Status&doneSrvError != 0 {
 
- 				ch <- errors.New("SQL Server had internal error")
 
- 				return
 
- 			}
 
- 			if sess.logFlags&logRows != 0 && done.Status&doneCount != 0 {
 
- 				sess.log.Printf("(%d row(s) affected)\n", done.RowCount)
 
- 			}
 
- 			ch <- done
 
- 			if done.Status&doneMore == 0 {
 
- 				return
 
- 			}
 
- 		case tokenColMetadata:
 
- 			columns = parseColMetadata72(sess.buf)
 
- 			ch <- columns
 
- 		case tokenRow:
 
- 			row := make([]interface{}, len(columns))
 
- 			parseRow(sess.buf, columns, row)
 
- 			ch <- row
 
- 		case tokenNbcRow:
 
- 			row := make([]interface{}, len(columns))
 
- 			parseNbcRow(sess.buf, columns, row)
 
- 			ch <- row
 
- 		case tokenEnvChange:
 
- 			processEnvChg(sess)
 
- 		case tokenError:
 
- 			err := parseError72(sess.buf)
 
- 			if sess.logFlags&logDebug != 0 {
 
- 				sess.log.Printf("got ERROR %d %s", err.Number, err.Message)
 
- 			}
 
- 			errs = append(errs, err)
 
- 			if sess.logFlags&logErrors != 0 {
 
- 				sess.log.Println(err.Message)
 
- 			}
 
- 		case tokenInfo:
 
- 			info := parseInfo(sess.buf)
 
- 			if sess.logFlags&logDebug != 0 {
 
- 				sess.log.Printf("got INFO %d %s", info.Number, info.Message)
 
- 			}
 
- 			if sess.logFlags&logMessages != 0 {
 
- 				sess.log.Println(info.Message)
 
- 			}
 
- 		default:
 
- 			badStreamPanicf("Unknown token type: %d", token)
 
- 		}
 
- 	}
 
- }
 
- type parseRespIter byte
 
- const (
 
- 	parseRespIterContinue parseRespIter = iota // Continue parsing current token.
 
- 	parseRespIterNext                          // Fetch the next token.
 
- 	parseRespIterDone                          // Done with parsing the response.
 
- )
 
- type parseRespState byte
 
- const (
 
- 	parseRespStateNormal  parseRespState = iota // Normal response state.
 
- 	parseRespStateCancel                        // Query is canceled, wait for server to confirm.
 
- 	parseRespStateClosing                       // Waiting for tokens to come through.
 
- )
 
- type parseResp struct {
 
- 	sess        *tdsSession
 
- 	ctxDone     <-chan struct{}
 
- 	state       parseRespState
 
- 	cancelError error
 
- }
 
- func (ts *parseResp) sendAttention(ch chan tokenStruct) parseRespIter {
 
- 	if err := sendAttention(ts.sess.buf); err != nil {
 
- 		ts.dlogf("failed to send attention signal %v", err)
 
- 		ch <- err
 
- 		return parseRespIterDone
 
- 	}
 
- 	ts.state = parseRespStateCancel
 
- 	return parseRespIterContinue
 
- }
 
- func (ts *parseResp) dlog(msg string) {
 
- 	if ts.sess.logFlags&logDebug != 0 {
 
- 		ts.sess.log.Println(msg)
 
- 	}
 
- }
 
- func (ts *parseResp) dlogf(f string, v ...interface{}) {
 
- 	if ts.sess.logFlags&logDebug != 0 {
 
- 		ts.sess.log.Printf(f, v...)
 
- 	}
 
- }
 
- func (ts *parseResp) iter(ctx context.Context, ch chan tokenStruct, tokChan chan tokenStruct) parseRespIter {
 
- 	switch ts.state {
 
- 	default:
 
- 		panic("unknown state")
 
- 	case parseRespStateNormal:
 
- 		select {
 
- 		case tok, ok := <-tokChan:
 
- 			if !ok {
 
- 				ts.dlog("response finished")
 
- 				return parseRespIterDone
 
- 			}
 
- 			if err, ok := tok.(net.Error); ok && err.Timeout() {
 
- 				ts.cancelError = err
 
- 				ts.dlog("got timeout error, sending attention signal to server")
 
- 				return ts.sendAttention(ch)
 
- 			}
 
- 			// Pass the token along.
 
- 			ch <- tok
 
- 			return parseRespIterContinue
 
- 		case <-ts.ctxDone:
 
- 			ts.ctxDone = nil
 
- 			ts.dlog("got cancel message, sending attention signal to server")
 
- 			return ts.sendAttention(ch)
 
- 		}
 
- 	case parseRespStateCancel: // Read all responses until a DONE or error is received.Auth
 
- 		select {
 
- 		case tok, ok := <-tokChan:
 
- 			if !ok {
 
- 				ts.dlog("response finished but waiting for attention ack")
 
- 				return parseRespIterNext
 
- 			}
 
- 			switch tok := tok.(type) {
 
- 			default:
 
- 				// Ignore all other tokens while waiting.
 
- 				// The TDS spec says other tokens may arrive after an attention
 
- 				// signal is sent. Ignore these tokens and continue looking for
 
- 				// a DONE with attention confirm mark.
 
- 			case doneStruct:
 
- 				if tok.Status&doneAttn != 0 {
 
- 					ts.dlog("got cancellation confirmation from server")
 
- 					if ts.cancelError != nil {
 
- 						ch <- ts.cancelError
 
- 						ts.cancelError = nil
 
- 					} else {
 
- 						ch <- ctx.Err()
 
- 					}
 
- 					return parseRespIterDone
 
- 				}
 
- 			// If an error happens during cancel, pass it along and just stop.
 
- 			// We are uncertain to receive more tokens.
 
- 			case error:
 
- 				ch <- tok
 
- 				ts.state = parseRespStateClosing
 
- 			}
 
- 			return parseRespIterContinue
 
- 		case <-ts.ctxDone:
 
- 			ts.ctxDone = nil
 
- 			ts.state = parseRespStateClosing
 
- 			return parseRespIterContinue
 
- 		}
 
- 	case parseRespStateClosing: // Wait for current token chan to close.
 
- 		if _, ok := <-tokChan; !ok {
 
- 			ts.dlog("response finished")
 
- 			return parseRespIterDone
 
- 		}
 
- 		return parseRespIterContinue
 
- 	}
 
- }
 
- func processResponse(ctx context.Context, sess *tdsSession, ch chan tokenStruct) {
 
- 	ts := &parseResp{
 
- 		sess:    sess,
 
- 		ctxDone: ctx.Done(),
 
- 	}
 
- 	defer func() {
 
- 		// Ensure any remaining error is piped through
 
- 		// or the query may look like it executed when it actually failed.
 
- 		if ts.cancelError != nil {
 
- 			ch <- ts.cancelError
 
- 			ts.cancelError = nil
 
- 		}
 
- 		close(ch)
 
- 	}()
 
- 	// Loop over multiple responses.
 
- 	for {
 
- 		ts.dlog("initiating resonse reading")
 
- 		tokChan := make(chan tokenStruct)
 
- 		go processSingleResponse(sess, tokChan)
 
- 		// Loop over multiple tokens in response.
 
- 	tokensLoop:
 
- 		for {
 
- 			switch ts.iter(ctx, ch, tokChan) {
 
- 			case parseRespIterContinue:
 
- 				// Nothing, continue to next token.
 
- 			case parseRespIterNext:
 
- 				break tokensLoop
 
- 			case parseRespIterDone:
 
- 				return
 
- 			}
 
- 		}
 
- 	}
 
- }
 
 
  |