| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754 | package mssqlimport (	"encoding/binary"	"errors"	"io"	"net"	"strconv"	"strings"	"golang.org/x/net/context")//go:generate stringer -type tokentype token byte// token idsconst (	tokenReturnStatus token = 121 // 0x79	tokenColMetadata  token = 129 // 0x81	tokenOrder        token = 169 // 0xA9	tokenError        token = 170 // 0xAA	tokenInfo         token = 171 // 0xAB	tokenLoginAck     token = 173 // 0xad	tokenRow          token = 209 // 0xd1	tokenNbcRow       token = 210 // 0xd2	tokenEnvChange    token = 227 // 0xE3	tokenSSPI         token = 237 // 0xED	tokenDone         token = 253 // 0xFD	tokenDoneProc     token = 254	tokenDoneInProc   token = 255)// done flags// https://msdn.microsoft.com/en-us/library/dd340421.aspxconst (	doneFinal    = 0	doneMore     = 1	doneError    = 2	doneInxact   = 4	doneCount    = 0x10	doneAttn     = 0x20	doneSrvError = 0x100)// ENVCHANGE types// http://msdn.microsoft.com/en-us/library/dd303449.aspxconst (	envTypDatabase           = 1	envTypLanguage           = 2	envTypCharset            = 3	envTypPacketSize         = 4	envSortId                = 5	envSortFlags             = 6	envSqlCollation          = 7	envTypBeginTran          = 8	envTypCommitTran         = 9	envTypRollbackTran       = 10	envEnlistDTC             = 11	envDefectTran            = 12	envDatabaseMirrorPartner = 13	envPromoteTran           = 15	envTranMgrAddr           = 16	envTranEnded             = 17	envResetConnAck          = 18	envStartedInstanceName   = 19	envRouting               = 20)// COLMETADATA flags// https://msdn.microsoft.com/en-us/library/dd357363.aspxconst (	colFlagNullable = 1	// TODO implement more flags)// interface for all tokenstype tokenStruct interface{}type orderStruct struct {	ColIds []uint16}type doneStruct struct {	Status   uint16	CurCmd   uint16	RowCount uint64	errors   []Error}func (d doneStruct) isError() bool {	return d.Status&doneError != 0 || len(d.errors) > 0}func (d doneStruct) getError() Error {	if len(d.errors) > 0 {		return d.errors[len(d.errors)-1]	} else {		return Error{Message: "Request failed but didn't provide reason"}	}}type doneInProcStruct doneStructvar doneFlags2str = map[uint16]string{	doneFinal:    "final",	doneMore:     "more",	doneError:    "error",	doneInxact:   "inxact",	doneCount:    "count",	doneAttn:     "attn",	doneSrvError: "srverror",}func doneFlags2Str(flags uint16) string {	strs := make([]string, 0, len(doneFlags2str))	for flag, tag := range doneFlags2str {		if flags&flag != 0 {			strs = append(strs, tag)		}	}	return strings.Join(strs, "|")}// ENVCHANGE stream// http://msdn.microsoft.com/en-us/library/dd303449.aspxfunc processEnvChg(sess *tdsSession) {	size := sess.buf.uint16()	r := &io.LimitedReader{R: sess.buf, N: int64(size)}	for {		var err error		var envtype uint8		err = binary.Read(r, binary.LittleEndian, &envtype)		if err == io.EOF {			return		}		if err != nil {			badStreamPanic(err)		}		switch envtype {		case envTypDatabase:			sess.database, err = readBVarChar(r)			if err != nil {				badStreamPanic(err)			}			_, err = readBVarChar(r)			if err != nil {				badStreamPanic(err)			}		case envTypLanguage:			//currently ignored			// old value			_, err = readBVarChar(r)			if err != nil {				badStreamPanic(err)			}			// new value			_, err = readBVarChar(r)			if err != nil {				badStreamPanic(err)			}		case envTypCharset:			//currently ignored			// old value			_, err = readBVarChar(r)			if err != nil {				badStreamPanic(err)			}			// new value			_, err = readBVarChar(r)			if err != nil {				badStreamPanic(err)			}		case envTypPacketSize:			packetsize, err := readBVarChar(r)			if err != nil {				badStreamPanic(err)			}			_, err = readBVarChar(r)			if err != nil {				badStreamPanic(err)			}			packetsizei, err := strconv.Atoi(packetsize)			if err != nil {				badStreamPanicf("Invalid Packet size value returned from server (%s): %s", packetsize, err.Error())			}			sess.buf.ResizeBuffer(packetsizei)		case envSortId:			// currently ignored			// old value, should be 0			if _, err = readBVarChar(r); err != nil {				badStreamPanic(err)			}			// new value			if _, err = readBVarChar(r); err != nil {				badStreamPanic(err)			}		case envSortFlags:			// currently ignored			// old value, should be 0			if _, err = readBVarChar(r); err != nil {				badStreamPanic(err)			}			// new value			if _, err = readBVarChar(r); err != nil {				badStreamPanic(err)			}		case envSqlCollation:			// currently ignored			// old value			if _, err = readBVarChar(r); err != nil {				badStreamPanic(err)			}			// new value			if _, err = readBVarChar(r); err != nil {				badStreamPanic(err)			}		case envTypBeginTran:			tranid, err := readBVarByte(r)			if len(tranid) != 8 {				badStreamPanicf("invalid size of transaction identifier: %d", len(tranid))			}			sess.tranid = binary.LittleEndian.Uint64(tranid)			if err != nil {				badStreamPanic(err)			}			if sess.logFlags&logTransaction != 0 {				sess.log.Printf("BEGIN TRANSACTION %x\n", sess.tranid)			}			_, err = readBVarByte(r)			if err != nil {				badStreamPanic(err)			}		case envTypCommitTran, envTypRollbackTran:			_, err = readBVarByte(r)			if err != nil {				badStreamPanic(err)			}			_, err = readBVarByte(r)			if err != nil {				badStreamPanic(err)			}			if sess.logFlags&logTransaction != 0 {				if envtype == envTypCommitTran {					sess.log.Printf("COMMIT TRANSACTION %x\n", sess.tranid)				} else {					sess.log.Printf("ROLLBACK TRANSACTION %x\n", sess.tranid)				}			}			sess.tranid = 0		case envEnlistDTC:			// currently ignored			// old value			if _, err = readBVarChar(r); err != nil {				badStreamPanic(err)			}			// new value, should be 0			if _, err = readBVarChar(r); err != nil {				badStreamPanic(err)			}		case envDefectTran:			// currently ignored			// old value, should be 0			if _, err = readBVarChar(r); err != nil {				badStreamPanic(err)			}			// new value			if _, err = readBVarChar(r); err != nil {				badStreamPanic(err)			}		case envDatabaseMirrorPartner:			sess.partner, err = readBVarChar(r)			if err != nil {				badStreamPanic(err)			}			_, err = readBVarChar(r)			if err != nil {				badStreamPanic(err)			}		case envPromoteTran:			// currently ignored			// old value, should be 0			if _, err = readBVarChar(r); err != nil {				badStreamPanic(err)			}			// dtc token			// spec says it should be L_VARBYTE, so this code might be wrong			if _, err = readBVarChar(r); err != nil {				badStreamPanic(err)			}		case envTranMgrAddr:			// currently ignored			// old value, should be 0			if _, err = readBVarChar(r); err != nil {				badStreamPanic(err)			}			// XACT_MANAGER_ADDRESS = B_VARBYTE			if _, err = readBVarChar(r); err != nil {				badStreamPanic(err)			}		case envTranEnded:			// currently ignored			// old value, B_VARBYTE			if _, err = readBVarChar(r); err != nil {				badStreamPanic(err)			}			// should be 0			if _, err = readBVarChar(r); err != nil {				badStreamPanic(err)			}		case envResetConnAck:			// currently ignored			// old value, should be 0			if _, err = readBVarChar(r); err != nil {				badStreamPanic(err)			}			// should be 0			if _, err = readBVarChar(r); err != nil {				badStreamPanic(err)			}		case envStartedInstanceName:			// currently ignored			// old value, should be 0			if _, err = readBVarChar(r); err != nil {				badStreamPanic(err)			}			// instance name			if _, err = readBVarChar(r); err != nil {				badStreamPanic(err)			}		case envRouting:			// RoutingData message is:			// ValueLength                 USHORT			// Protocol (TCP = 0)          BYTE			// ProtocolProperty (new port) USHORT			// AlternateServer             US_VARCHAR			_, err := readUshort(r)			if err != nil {				badStreamPanic(err)			}			protocol, err := readByte(r)			if err != nil || protocol != 0 {				badStreamPanic(err)			}			newPort, err := readUshort(r)			if err != nil {				badStreamPanic(err)			}			newServer, err := readUsVarChar(r)			if err != nil {				badStreamPanic(err)			}			// consume the OLDVALUE = %x00 %x00			_, err = readUshort(r)			if err != nil {				badStreamPanic(err)			}			sess.routedServer = newServer			sess.routedPort = newPort		default:			// ignore rest of records because we don't know how to skip those			sess.log.Printf("WARN: Unknown ENVCHANGE record detected with type id = %d\n", envtype)			break		}	}}type returnStatus int32// http://msdn.microsoft.com/en-us/library/dd358180.aspxfunc parseReturnStatus(r *tdsBuffer) returnStatus {	return returnStatus(r.int32())}func parseOrder(r *tdsBuffer) (res orderStruct) {	len := int(r.uint16())	res.ColIds = make([]uint16, len/2)	for i := 0; i < len/2; i++ {		res.ColIds[i] = r.uint16()	}	return res}// https://msdn.microsoft.com/en-us/library/dd340421.aspxfunc parseDone(r *tdsBuffer) (res doneStruct) {	res.Status = r.uint16()	res.CurCmd = r.uint16()	res.RowCount = r.uint64()	return res}// https://msdn.microsoft.com/en-us/library/dd340553.aspxfunc parseDoneInProc(r *tdsBuffer) (res doneInProcStruct) {	res.Status = r.uint16()	res.CurCmd = r.uint16()	res.RowCount = r.uint64()	return res}type sspiMsg []bytefunc parseSSPIMsg(r *tdsBuffer) sspiMsg {	size := r.uint16()	buf := make([]byte, size)	r.ReadFull(buf)	return sspiMsg(buf)}type loginAckStruct struct {	Interface  uint8	TDSVersion uint32	ProgName   string	ProgVer    uint32}func parseLoginAck(r *tdsBuffer) loginAckStruct {	size := r.uint16()	buf := make([]byte, size)	r.ReadFull(buf)	var res loginAckStruct	res.Interface = buf[0]	res.TDSVersion = binary.BigEndian.Uint32(buf[1:])	prognamelen := buf[1+4]	var err error	if res.ProgName, err = ucs22str(buf[1+4+1 : 1+4+1+prognamelen*2]); err != nil {		badStreamPanic(err)	}	res.ProgVer = binary.BigEndian.Uint32(buf[size-4:])	return res}// http://msdn.microsoft.com/en-us/library/dd357363.aspxfunc parseColMetadata72(r *tdsBuffer) (columns []columnStruct) {	count := r.uint16()	if count == 0xffff {		// no metadata is sent		return nil	}	columns = make([]columnStruct, count)	for i := range columns {		column := &columns[i]		column.UserType = r.uint32()		column.Flags = r.uint16()		// parsing TYPE_INFO structure		column.ti = readTypeInfo(r)		column.ColName = r.BVarChar()	}	return columns}// http://msdn.microsoft.com/en-us/library/dd357254.aspxfunc parseRow(r *tdsBuffer, columns []columnStruct, row []interface{}) {	for i, column := range columns {		row[i] = column.ti.Reader(&column.ti, r)	}}// http://msdn.microsoft.com/en-us/library/dd304783.aspxfunc parseNbcRow(r *tdsBuffer, columns []columnStruct, row []interface{}) {	bitlen := (len(columns) + 7) / 8	pres := make([]byte, bitlen)	r.ReadFull(pres)	for i, col := range columns {		if pres[i/8]&(1<<(uint(i)%8)) != 0 {			row[i] = nil			continue		}		row[i] = col.ti.Reader(&col.ti, r)	}}// http://msdn.microsoft.com/en-us/library/dd304156.aspxfunc parseError72(r *tdsBuffer) (res Error) {	length := r.uint16()	_ = length // ignore length	res.Number = r.int32()	res.State = r.byte()	res.Class = r.byte()	res.Message = r.UsVarChar()	res.ServerName = r.BVarChar()	res.ProcName = r.BVarChar()	res.LineNo = r.int32()	return}// http://msdn.microsoft.com/en-us/library/dd304156.aspxfunc parseInfo(r *tdsBuffer) (res Error) {	length := r.uint16()	_ = length // ignore length	res.Number = r.int32()	res.State = r.byte()	res.Class = r.byte()	res.Message = r.UsVarChar()	res.ServerName = r.BVarChar()	res.ProcName = r.BVarChar()	res.LineNo = r.int32()	return}func processSingleResponse(sess *tdsSession, ch chan tokenStruct) {	defer func() {		if err := recover(); err != nil {			if sess.logFlags&logErrors != 0 {				sess.log.Printf("ERROR: Intercepted panic %v", err)			}			ch <- err		}		close(ch)	}()	packet_type, err := sess.buf.BeginRead()	if err != nil {		if sess.logFlags&logErrors != 0 {			sess.log.Printf("ERROR: BeginRead failed %v", err)		}		ch <- err		return	}	if packet_type != packReply {		badStreamPanicf("invalid response packet type, expected REPLY, actual: %d", packet_type)	}	var columns []columnStruct	errs := make([]Error, 0, 5)	for {		token := token(sess.buf.byte())		if sess.logFlags&logDebug != 0 {			sess.log.Printf("got token %v", token)		}		switch token {		case tokenSSPI:			ch <- parseSSPIMsg(sess.buf)			return		case tokenReturnStatus:			returnStatus := parseReturnStatus(sess.buf)			ch <- returnStatus		case tokenLoginAck:			loginAck := parseLoginAck(sess.buf)			ch <- loginAck		case tokenOrder:			order := parseOrder(sess.buf)			ch <- order		case tokenDoneInProc:			done := parseDoneInProc(sess.buf)			if sess.logFlags&logRows != 0 && done.Status&doneCount != 0 {				sess.log.Printf("(%d row(s) affected)\n", done.RowCount)			}			ch <- done		case tokenDone, tokenDoneProc:			done := parseDone(sess.buf)			done.errors = errs			if sess.logFlags&logDebug != 0 {				sess.log.Printf("got DONE or DONEPROC status=%d", done.Status)			}			if done.Status&doneSrvError != 0 {				ch <- errors.New("SQL Server had internal error")				return			}			if sess.logFlags&logRows != 0 && done.Status&doneCount != 0 {				sess.log.Printf("(%d row(s) affected)\n", done.RowCount)			}			ch <- done			if done.Status&doneMore == 0 {				return			}		case tokenColMetadata:			columns = parseColMetadata72(sess.buf)			ch <- columns		case tokenRow:			row := make([]interface{}, len(columns))			parseRow(sess.buf, columns, row)			ch <- row		case tokenNbcRow:			row := make([]interface{}, len(columns))			parseNbcRow(sess.buf, columns, row)			ch <- row		case tokenEnvChange:			processEnvChg(sess)		case tokenError:			err := parseError72(sess.buf)			if sess.logFlags&logDebug != 0 {				sess.log.Printf("got ERROR %d %s", err.Number, err.Message)			}			errs = append(errs, err)			if sess.logFlags&logErrors != 0 {				sess.log.Println(err.Message)			}		case tokenInfo:			info := parseInfo(sess.buf)			if sess.logFlags&logDebug != 0 {				sess.log.Printf("got INFO %d %s", info.Number, info.Message)			}			if sess.logFlags&logMessages != 0 {				sess.log.Println(info.Message)			}		default:			badStreamPanicf("Unknown token type: %d", token)		}	}}type parseRespIter byteconst (	parseRespIterContinue parseRespIter = iota // Continue parsing current token.	parseRespIterNext                          // Fetch the next token.	parseRespIterDone                          // Done with parsing the response.)type parseRespState byteconst (	parseRespStateNormal  parseRespState = iota // Normal response state.	parseRespStateCancel                        // Query is canceled, wait for server to confirm.	parseRespStateClosing                       // Waiting for tokens to come through.)type parseResp struct {	sess        *tdsSession	ctxDone     <-chan struct{}	state       parseRespState	cancelError error}func (ts *parseResp) sendAttention(ch chan tokenStruct) parseRespIter {	if err := sendAttention(ts.sess.buf); err != nil {		ts.dlogf("failed to send attention signal %v", err)		ch <- err		return parseRespIterDone	}	ts.state = parseRespStateCancel	return parseRespIterContinue}func (ts *parseResp) dlog(msg string) {	if ts.sess.logFlags&logDebug != 0 {		ts.sess.log.Println(msg)	}}func (ts *parseResp) dlogf(f string, v ...interface{}) {	if ts.sess.logFlags&logDebug != 0 {		ts.sess.log.Printf(f, v...)	}}func (ts *parseResp) iter(ctx context.Context, ch chan tokenStruct, tokChan chan tokenStruct) parseRespIter {	switch ts.state {	default:		panic("unknown state")	case parseRespStateNormal:		select {		case tok, ok := <-tokChan:			if !ok {				ts.dlog("response finished")				return parseRespIterDone			}			if err, ok := tok.(net.Error); ok && err.Timeout() {				ts.cancelError = err				ts.dlog("got timeout error, sending attention signal to server")				return ts.sendAttention(ch)			}			// Pass the token along.			ch <- tok			return parseRespIterContinue		case <-ts.ctxDone:			ts.ctxDone = nil			ts.dlog("got cancel message, sending attention signal to server")			return ts.sendAttention(ch)		}	case parseRespStateCancel: // Read all responses until a DONE or error is received.Auth		select {		case tok, ok := <-tokChan:			if !ok {				ts.dlog("response finished but waiting for attention ack")				return parseRespIterNext			}			switch tok := tok.(type) {			default:				// Ignore all other tokens while waiting.				// The TDS spec says other tokens may arrive after an attention				// signal is sent. Ignore these tokens and continue looking for				// a DONE with attention confirm mark.			case doneStruct:				if tok.Status&doneAttn != 0 {					ts.dlog("got cancellation confirmation from server")					if ts.cancelError != nil {						ch <- ts.cancelError						ts.cancelError = nil					} else {						ch <- ctx.Err()					}					return parseRespIterDone				}			// If an error happens during cancel, pass it along and just stop.			// We are uncertain to receive more tokens.			case error:				ch <- tok				ts.state = parseRespStateClosing			}			return parseRespIterContinue		case <-ts.ctxDone:			ts.ctxDone = nil			ts.state = parseRespStateClosing			return parseRespIterContinue		}	case parseRespStateClosing: // Wait for current token chan to close.		if _, ok := <-tokChan; !ok {			ts.dlog("response finished")			return parseRespIterDone		}		return parseRespIterContinue	}}func processResponse(ctx context.Context, sess *tdsSession, ch chan tokenStruct) {	ts := &parseResp{		sess:    sess,		ctxDone: ctx.Done(),	}	defer func() {		// Ensure any remaining error is piped through		// or the query may look like it executed when it actually failed.		if ts.cancelError != nil {			ch <- ts.cancelError			ts.cancelError = nil		}		close(ch)	}()	// Loop over multiple responses.	for {		ts.dlog("initiating resonse reading")		tokChan := make(chan tokenStruct)		go processSingleResponse(sess, tokChan)		// Loop over multiple tokens in response.	tokensLoop:		for {			switch ts.iter(ctx, ch, tokChan) {			case parseRespIterContinue:				// Nothing, continue to next token.			case parseRespIterNext:				break tokensLoop			case parseRespIterDone:				return			}		}	}}
 |