rows.go (5606B)
1 package pq 2 3 import ( 4 "database/sql/driver" 5 "fmt" 6 "io" 7 "math" 8 "reflect" 9 "time" 10 11 "github.com/lib/pq/internal/proto" 12 "github.com/lib/pq/oid" 13 ) 14 15 type noRows struct{} 16 17 var emptyRows noRows 18 19 var _ driver.Result = noRows{} 20 21 func (noRows) LastInsertId() (int64, error) { return 0, errNoLastInsertID } 22 func (noRows) RowsAffected() (int64, error) { return 0, errNoRowsAffected } 23 24 type ( 25 rowsHeader struct { 26 colNames []string 27 colTyps []fieldDesc 28 colFmts []format 29 } 30 rows struct { 31 cn *conn 32 finish func() 33 rowsHeader 34 done bool 35 rb readBuf 36 result driver.Result 37 tag string 38 39 next *rowsHeader 40 } 41 ) 42 43 func (rs *rows) Close() error { 44 if finish := rs.finish; finish != nil { 45 defer finish() 46 } 47 // no need to look at cn.bad as Next() will 48 for { 49 err := rs.Next(nil) 50 switch err { 51 case nil: 52 case io.EOF: 53 // rs.Next can return io.EOF on both ReadyForQuery and 54 // RowDescription (used with HasNextResultSet). We need to fetch 55 // messages until we hit a ReadyForQuery, which is done by waiting 56 // for done to be set. 57 if rs.done { 58 return nil 59 } 60 default: 61 return err 62 } 63 } 64 } 65 66 func (rs *rows) Columns() []string { 67 return rs.colNames 68 } 69 70 func (rs *rows) Result() driver.Result { 71 if rs.result == nil { 72 return emptyRows 73 } 74 return rs.result 75 } 76 77 func (rs *rows) Tag() string { 78 return rs.tag 79 } 80 81 func (rs *rows) Next(dest []driver.Value) (resErr error) { 82 if rs.done { 83 return io.EOF 84 } 85 if err := rs.cn.err.getForNext(); err != nil { 86 return err 87 } 88 89 for { 90 t, err := rs.cn.recv1Buf(&rs.rb) 91 if err != nil { 92 return rs.cn.handleError(err) 93 } 94 switch t { 95 case proto.ErrorResponse: 96 resErr = parseError(&rs.rb, "") 97 case proto.CommandComplete, proto.EmptyQueryResponse: 98 if t == proto.CommandComplete { 99 rs.result, rs.tag, err = rs.cn.parseComplete(rs.rb.string()) 100 if err != nil { 101 return rs.cn.handleError(err) 102 } 103 } 104 continue 105 case proto.ReadyForQuery: 106 rs.cn.processReadyForQuery(&rs.rb) 107 rs.done = true 108 if resErr != nil { 109 return rs.cn.handleError(resErr) 110 } 111 return io.EOF 112 case proto.DataRow: 113 n := rs.rb.int16() 114 if resErr != nil { 115 rs.cn.err.set(driver.ErrBadConn) 116 return fmt.Errorf("pq: unexpected DataRow after error %s", resErr) 117 } 118 if n < len(dest) { 119 dest = dest[:n] 120 } 121 for i := range dest { 122 l := rs.rb.int32() 123 if l == -1 { 124 dest[i] = nil 125 continue 126 } 127 dest[i], err = decode(&rs.cn.parameterStatus, rs.rb.next(l), rs.colTyps[i].OID, rs.colFmts[i]) 128 if err != nil { 129 return rs.cn.handleError(err) 130 } 131 } 132 return rs.cn.handleError(resErr) 133 case proto.RowDescription: 134 next := parsePortalRowDescribe(&rs.rb) 135 rs.next = &next 136 return io.EOF 137 default: 138 return fmt.Errorf("pq: unexpected message after execute: %q", t) 139 } 140 } 141 } 142 143 func (rs *rows) HasNextResultSet() bool { 144 hasNext := rs.next != nil && !rs.done 145 return hasNext 146 } 147 148 func (rs *rows) NextResultSet() error { 149 if rs.next == nil { 150 return io.EOF 151 } 152 rs.rowsHeader = *rs.next 153 rs.next = nil 154 return nil 155 } 156 157 // ColumnTypeScanType returns the value type that can be used to scan types into. 158 func (rs *rows) ColumnTypeScanType(index int) reflect.Type { 159 return rs.colTyps[index].Type() 160 } 161 162 // ColumnTypeDatabaseTypeName return the database system type name. 163 func (rs *rows) ColumnTypeDatabaseTypeName(index int) string { 164 return rs.colTyps[index].Name() 165 } 166 167 // ColumnTypeLength returns the length of the column type if the column is a 168 // variable length type. If the column is not a variable length type ok 169 // should return false. 170 func (rs *rows) ColumnTypeLength(index int) (length int64, ok bool) { 171 return rs.colTyps[index].Length() 172 } 173 174 // ColumnTypePrecisionScale should return the precision and scale for decimal 175 // types. If not applicable, ok should be false. 176 func (rs *rows) ColumnTypePrecisionScale(index int) (precision, scale int64, ok bool) { 177 return rs.colTyps[index].PrecisionScale() 178 } 179 180 const headerSize = 4 181 182 type fieldDesc struct { 183 // The object ID of the data type. 184 OID oid.Oid 185 // The data type size (see pg_type.typlen). 186 // Note that negative values denote variable-width types. 187 Len int 188 // The type modifier (see pg_attribute.atttypmod). 189 // The meaning of the modifier is type-specific. 190 Mod int 191 } 192 193 func (fd fieldDesc) Type() reflect.Type { 194 switch fd.OID { 195 case oid.T_int8: 196 return reflect.TypeOf(int64(0)) 197 case oid.T_int4: 198 return reflect.TypeOf(int32(0)) 199 case oid.T_int2: 200 return reflect.TypeOf(int16(0)) 201 case oid.T_float8: 202 return reflect.TypeOf(float64(0)) 203 case oid.T_float4: 204 return reflect.TypeOf(float32(0)) 205 case oid.T_varchar, oid.T_text, oid.T_varbit, oid.T_bit: 206 return reflect.TypeOf("") 207 case oid.T_bool: 208 return reflect.TypeOf(false) 209 case oid.T_date, oid.T_time, oid.T_timetz, oid.T_timestamp, oid.T_timestamptz: 210 return reflect.TypeOf(time.Time{}) 211 case oid.T_bytea: 212 return reflect.TypeOf([]byte(nil)) 213 default: 214 return reflect.TypeOf(new(any)).Elem() 215 } 216 } 217 218 func (fd fieldDesc) Name() string { 219 return oid.TypeName[fd.OID] 220 } 221 222 func (fd fieldDesc) Length() (length int64, ok bool) { 223 switch fd.OID { 224 case oid.T_text, oid.T_bytea: 225 return math.MaxInt64, true 226 case oid.T_varchar, oid.T_bpchar: 227 return int64(fd.Mod - headerSize), true 228 case oid.T_varbit, oid.T_bit: 229 return int64(fd.Mod), true 230 default: 231 return 0, false 232 } 233 } 234 235 func (fd fieldDesc) PrecisionScale() (precision, scale int64, ok bool) { 236 switch fd.OID { 237 case oid.T_numeric, oid.T__numeric: 238 mod := fd.Mod - headerSize 239 precision = int64((mod >> 16) & 0xffff) 240 scale = int64(mod & 0xffff) 241 return precision, scale, true 242 default: 243 return 0, 0, false 244 } 245 }