notify.go (25722B)
1 package pq 2 3 import ( 4 "context" 5 "database/sql/driver" 6 "errors" 7 "fmt" 8 "net" 9 "sync" 10 "sync/atomic" 11 "time" 12 13 "github.com/lib/pq/internal/proto" 14 ) 15 16 // Notification represents a single notification from the database. 17 type Notification struct { 18 BePid int // Process ID (PID) of the notifying postgres backend. 19 Channel string // Name of the channel the notification was sent on. 20 Extra string // Payload, or the empty string if unspecified. 21 } 22 23 func recvNotification(r *readBuf) *Notification { 24 bePid := r.int32() 25 channel := r.string() 26 extra := r.string() 27 return &Notification{bePid, channel, extra} 28 } 29 30 // SetNotificationHandler sets the given notification handler on the given 31 // connection. A runtime panic occurs if c is not a pq connection. A nil handler 32 // may be used to unset it. 33 // 34 // Note: Notification handlers are executed synchronously by pq meaning commands 35 // won't continue to be processed until the handler returns. 36 func SetNotificationHandler(c driver.Conn, handler func(*Notification)) { 37 c.(*conn).notificationHandler = handler 38 } 39 40 // NotificationHandlerConnector wraps a regular connector and sets a 41 // notification handler on it. 42 type NotificationHandlerConnector struct { 43 driver.Connector 44 notificationHandler func(*Notification) 45 } 46 47 // Connect calls the underlying connector's connect method and then sets the 48 // notification handler. 49 func (n *NotificationHandlerConnector) Connect(ctx context.Context) (driver.Conn, error) { 50 c, err := n.Connector.Connect(ctx) 51 if err == nil { 52 SetNotificationHandler(c, n.notificationHandler) 53 } 54 return c, err 55 } 56 57 // ConnectorNotificationHandler returns the currently set notification handler, 58 // if any. If the given connector is not a result of 59 // [ConnectorWithNotificationHandler], nil is returned. 60 func ConnectorNotificationHandler(c driver.Connector) func(*Notification) { 61 if c, ok := c.(*NotificationHandlerConnector); ok { 62 return c.notificationHandler 63 } 64 return nil 65 } 66 67 // ConnectorWithNotificationHandler creates or sets the given handler for the 68 // given connector. If the given connector is a result of calling this function 69 // previously, it is simply set on the given connector and returned. Otherwise, 70 // this returns a new connector wrapping the given one and setting the 71 // notification handler. A nil notification handler may be used to unset it. 72 // 73 // The returned connector is intended to be used with database/sql.OpenDB. 74 // 75 // Note: Notification handlers are executed synchronously by pq meaning commands 76 // won't continue to be processed until the handler returns. 77 func ConnectorWithNotificationHandler(c driver.Connector, handler func(*Notification)) *NotificationHandlerConnector { 78 if c, ok := c.(*NotificationHandlerConnector); ok { 79 c.notificationHandler = handler 80 return c 81 } 82 return &NotificationHandlerConnector{Connector: c, notificationHandler: handler} 83 } 84 85 const ( 86 connStateIdle int32 = iota 87 connStateExpectResponse 88 connStateExpectReadyForQuery 89 ) 90 91 type message struct { 92 typ proto.ResponseCode 93 err error 94 } 95 96 var errListenerConnClosed = errors.New("pq: ListenerConn has been closed") 97 98 // ListenerConn is a low-level interface for waiting for notifications. You 99 // should use [Listener] instead. 100 type ListenerConn struct { 101 connectionLock sync.Mutex // guards cn and err 102 senderLock sync.Mutex // the sending goroutine will be holding this lock 103 cn *conn 104 err error 105 connState int32 106 notificationChan chan<- *Notification 107 replyChan chan message 108 } 109 110 // NewListenerConn creates a new ListenerConn. Use NewListener instead. 111 func NewListenerConn(name string, notificationChan chan<- *Notification) (*ListenerConn, error) { 112 return newDialListenerConn(defaultDialer{}, name, notificationChan) 113 } 114 115 func newDialListenerConn(d Dialer, name string, c chan<- *Notification) (*ListenerConn, error) { 116 cn, err := DialOpen(d, name) 117 if err != nil { 118 return nil, err 119 } 120 121 l := &ListenerConn{ 122 cn: cn.(*conn), 123 notificationChan: c, 124 connState: connStateIdle, 125 replyChan: make(chan message, 2), 126 } 127 128 go l.listenerConnMain() 129 return l, nil 130 } 131 132 // We can only allow one goroutine at a time to be running a query on the 133 // connection for various reasons, so the goroutine sending on the connection 134 // must be holding senderLock. 135 // 136 // Returns an error if an unrecoverable error has occurred and the ListenerConn 137 // should be abandoned. 138 func (l *ListenerConn) acquireSenderLock() error { 139 // we must acquire senderLock first to avoid deadlocks; see ExecSimpleQuery 140 l.senderLock.Lock() 141 142 l.connectionLock.Lock() 143 err := l.err 144 l.connectionLock.Unlock() 145 if err != nil { 146 l.senderLock.Unlock() 147 return err 148 } 149 return nil 150 } 151 152 func (l *ListenerConn) releaseSenderLock() { 153 l.senderLock.Unlock() 154 } 155 156 // setState advances the protocol state to newState. Returns false if moving 157 // to that state from the current state is not allowed. 158 func (l *ListenerConn) setState(newState int32) bool { 159 var expectedState int32 160 161 switch newState { 162 case connStateIdle: 163 expectedState = connStateExpectReadyForQuery 164 case connStateExpectResponse: 165 expectedState = connStateIdle 166 case connStateExpectReadyForQuery: 167 expectedState = connStateExpectResponse 168 default: 169 panic(fmt.Sprintf("unexpected listenerConnState %d", newState)) 170 } 171 172 return atomic.CompareAndSwapInt32(&l.connState, expectedState, newState) 173 } 174 175 // Main logic is here: receive messages from the postgres backend, forward 176 // notifications and query replies and keep the internal state in sync with the 177 // protocol state. Returns when the connection has been lost, is about to go 178 // away or should be discarded because we couldn't agree on the state with the 179 // server backend. 180 func (l *ListenerConn) listenerConnLoop() (err error) { 181 r := &readBuf{} 182 for { 183 t, err := l.cn.recvMessage(r) 184 if err != nil { 185 return err 186 } 187 188 switch t { 189 case proto.NotificationResponse: 190 // recvNotification copies all the data so we don't need to worry 191 // about the scratch buffer being overwritten. 192 l.notificationChan <- recvNotification(r) 193 194 case proto.RowDescription, proto.DataRow: 195 // only used by tests; ignore 196 197 case proto.ErrorResponse: 198 // We might receive an ErrorResponse even when not in a query; it 199 // is expected that the server will close the connection after 200 // that, but we should make sure that the error we display is the 201 // one from the stray ErrorResponse, not io.ErrUnexpectedEOF. 202 if !l.setState(connStateExpectReadyForQuery) { 203 return parseError(r, "") 204 } 205 l.replyChan <- message{t, parseError(r, "")} 206 207 case proto.CommandComplete, proto.EmptyQueryResponse: 208 if !l.setState(connStateExpectReadyForQuery) { 209 // protocol out of sync 210 return fmt.Errorf("unexpected CommandComplete") 211 } 212 // ExecSimpleQuery doesn't need to know about this message 213 214 case proto.ReadyForQuery: 215 if !l.setState(connStateIdle) { 216 // protocol out of sync 217 return fmt.Errorf("unexpected ReadyForQuery") 218 } 219 l.replyChan <- message{t, nil} 220 221 case proto.ParameterStatus: 222 // ignore 223 case proto.NoticeResponse: 224 if n := l.cn.noticeHandler; n != nil { 225 n(parseError(r, "")) 226 } 227 default: 228 return fmt.Errorf("unexpected message %q from server in listenerConnLoop", t) 229 } 230 } 231 } 232 233 // This is the main routine for the goroutine receiving on the database 234 // connection. Most of the main logic is in listenerConnLoop. 235 func (l *ListenerConn) listenerConnMain() { 236 err := l.listenerConnLoop() 237 238 // listenerConnLoop terminated; we're done, but we still have to clean up. 239 // Make sure nobody tries to start any new queries by making sure the err 240 // pointer is set. It is important that we do not overwrite its value; a 241 // connection could be closed by either this goroutine or one sending on the 242 // connection – whoever closes the connection is assumed to have the more 243 // meaningful error message (as the other one will probably get 244 // net.errClosed), so that goroutine sets the error we expose while the 245 // other error is discarded. If the connection is lost while two goroutines 246 // are operating on the socket, it probably doesn't matter which error we 247 // expose so we don't try to do anything more complex. 248 l.connectionLock.Lock() 249 if l.err == nil { 250 l.err = err 251 } 252 _ = l.cn.Close() 253 l.connectionLock.Unlock() 254 255 // There might be a query in-flight; make sure nobody's waiting for a 256 // response to it, since there's not going to be one. 257 close(l.replyChan) 258 259 // let the listener know we're done 260 close(l.notificationChan) 261 262 // this ListenerConn is done 263 } 264 265 // Listen sends a LISTEN query to the server. See ExecSimpleQuery. 266 func (l *ListenerConn) Listen(channel string) (bool, error) { 267 return l.ExecSimpleQuery("LISTEN " + QuoteIdentifier(channel)) 268 } 269 270 // Unlisten sends an UNLISTEN query to the server. See ExecSimpleQuery. 271 func (l *ListenerConn) Unlisten(channel string) (bool, error) { 272 return l.ExecSimpleQuery("UNLISTEN " + QuoteIdentifier(channel)) 273 } 274 275 // UnlistenAll sends an `UNLISTEN *` query to the server. See ExecSimpleQuery. 276 func (l *ListenerConn) UnlistenAll() (bool, error) { 277 return l.ExecSimpleQuery("UNLISTEN *") 278 } 279 280 // Ping the remote server to make sure it's alive. Non-nil error means the 281 // connection has failed and should be abandoned. 282 func (l *ListenerConn) Ping() error { 283 sent, err := l.ExecSimpleQuery("") 284 if !sent { 285 return err 286 } 287 if err != nil { // shouldn't happen 288 panic(err) 289 } 290 return nil 291 } 292 293 // Attempt to send a query on the connection. Returns an error if sending the 294 // query failed, and the caller should initiate closure of this connection. The 295 // caller must be holding senderLock (see acquireSenderLock and 296 // releaseSenderLock). 297 func (l *ListenerConn) sendSimpleQuery(q string) (err error) { 298 // Must set connection state before sending the query 299 if !l.setState(connStateExpectResponse) { 300 return errors.New("pq: two queries running at the same time") 301 } 302 303 // Can't use l.cn.writeBuf here because it uses the scratch buffer which 304 // might get overwritten by listenerConnLoop. 305 b := &writeBuf{ 306 buf: []byte("Q\x00\x00\x00\x00"), 307 pos: 1, 308 } 309 b.string(q) 310 return l.cn.send(b) 311 } 312 313 // ExecSimpleQuery executes a "simple query" (i.e. one with no bindable 314 // parameters) on the connection. The possible return values are: 315 // 1. "executed" is true; the query was executed to completion on the database 316 // server. If the query failed, err will be set to the error returned by the 317 // database, otherwise err will be nil. 318 // 2. If "executed" is false, the query could not be executed on the remote 319 // server. err will be non-nil. 320 // 321 // After a call to ExecSimpleQuery has returned an executed=false value, the 322 // connection has either been closed or will be closed shortly thereafter, and 323 // all subsequently executed queries will return an error. 324 func (l *ListenerConn) ExecSimpleQuery(q string) (executed bool, err error) { 325 if err = l.acquireSenderLock(); err != nil { 326 return false, err 327 } 328 defer l.releaseSenderLock() 329 330 err = l.sendSimpleQuery(q) 331 if err != nil { 332 // We can't know what state the protocol is in, so we need to abandon 333 // this connection. 334 l.connectionLock.Lock() 335 // Set the error pointer if it hasn't been set already; see 336 // listenerConnMain. 337 if l.err == nil { 338 l.err = err 339 } 340 l.connectionLock.Unlock() 341 _ = l.cn.c.Close() 342 return false, err 343 } 344 345 // now we just wait for a reply.. 346 for { 347 m, ok := <-l.replyChan 348 if !ok { 349 // We lost the connection to server, don't bother waiting for a 350 // a response. err should have been set already. 351 l.connectionLock.Lock() 352 err := l.err 353 l.connectionLock.Unlock() 354 return false, err 355 } 356 switch m.typ { 357 case proto.ReadyForQuery: 358 // sanity check 359 if m.err != nil { 360 panic("m.err != nil") 361 } 362 // done; err might or might not be set 363 return true, err 364 365 case proto.ErrorResponse: 366 // sanity check 367 if m.err == nil { 368 panic("m.err == nil") 369 } 370 // server responded with an error; ReadyForQuery to follow 371 err = m.err 372 373 default: 374 return false, fmt.Errorf("unknown response for simple query: %q", m.typ) 375 } 376 } 377 } 378 379 // Close closes the connection. 380 func (l *ListenerConn) Close() error { 381 l.connectionLock.Lock() 382 if l.err != nil { 383 l.connectionLock.Unlock() 384 return errListenerConnClosed 385 } 386 l.err = errListenerConnClosed 387 l.connectionLock.Unlock() 388 // We can't send anything on the connection without holding senderLock. 389 // Simply close the net.Conn to wake up everyone operating on it. 390 return l.cn.c.Close() 391 } 392 393 // Err returns the reason the connection was closed. It is not safe to call 394 // this function until l.Notify has been closed. 395 func (l *ListenerConn) Err() error { 396 return l.err 397 } 398 399 // ErrChannelAlreadyOpen is returned from Listen when a channel is already 400 // open. 401 var ErrChannelAlreadyOpen = errors.New("pq: channel is already open") 402 403 // ErrChannelNotOpen is returned from Unlisten when a channel is not open. 404 var ErrChannelNotOpen = errors.New("pq: channel is not open") 405 406 // ListenerEventType is an enumeration of listener event types. 407 type ListenerEventType int 408 409 const ( 410 // ListenerEventConnected is emitted only when the database connection has 411 // been initially initialized. The err argument of the callback will always 412 // be nil. 413 ListenerEventConnected ListenerEventType = iota 414 415 // ListenerEventDisconnected is emitted after a database connection has been 416 // lost, either because of an error or because Close has been called. The 417 // err argument will be set to the reason the database connection was lost. 418 ListenerEventDisconnected 419 420 // ListenerEventReconnected is emitted after a database connection has been 421 // re-established after connection loss. The err argument of the callback 422 // will always be nil. After this event has been emitted, a nil 423 // pq.Notification is sent on the Listener.Notify channel. 424 ListenerEventReconnected 425 426 // ListenerEventConnectionAttemptFailed is emitted after a connection to the 427 // database was attempted, but failed. The err argument will be set to an 428 // error describing why the connection attempt did not succeed. 429 ListenerEventConnectionAttemptFailed 430 ) 431 432 // EventCallbackType is the event callback type. See also ListenerEventType 433 // constants' documentation. 434 type EventCallbackType func(event ListenerEventType, err error) 435 436 func (l ListenerEventType) String() string { 437 return map[ListenerEventType]string{ 438 ListenerEventConnected: "connected", 439 ListenerEventDisconnected: "disconnected", 440 ListenerEventReconnected: "reconnected", 441 ListenerEventConnectionAttemptFailed: "connectionAttemptFailed", 442 }[l] 443 } 444 445 // Listener provides an interface for listening to notifications from a 446 // PostgreSQL database. For general usage information, see section 447 // "Notifications". 448 // 449 // Listener can safely be used from concurrently running goroutines. 450 type Listener struct { 451 // Channel for receiving notifications from the database. In some cases a 452 // nil value will be sent. See section "Notifications" above. 453 Notify chan *Notification 454 455 dsn string 456 minReconnectInterval time.Duration 457 maxReconnectInterval time.Duration 458 dialer Dialer 459 eventCallback EventCallbackType 460 461 lock sync.Mutex 462 isClosed bool 463 reconnectCond *sync.Cond 464 cn *ListenerConn 465 connNotificationChan <-chan *Notification 466 channels map[string]struct{} 467 } 468 469 // NewListener creates a new database connection dedicated to LISTEN / NOTIFY. 470 // 471 // name should be set to a connection string to be used to establish the 472 // database connection (see section "Connection String Parameters" above). 473 // 474 // minReconnect controls the duration to wait before trying to re-establish the 475 // database connection after connection loss. After each consecutive failure 476 // this interval is doubled, until maxReconnect is reached. Successfully 477 // completing the connection establishment procedure resets the interval back to 478 // minReconnect. 479 // 480 // The last parameter cb can be set to a function which will be called by the 481 // Listener when the state of the underlying database connection changes. This 482 // callback will be called by the goroutine which dispatches the notifications 483 // over the Notify channel, so you should try to avoid doing potentially 484 // time-consuming operations from the callback. 485 func NewListener(dsn string, minReconnect, maxReconnect time.Duration, cb EventCallbackType) *Listener { 486 return NewDialListener(defaultDialer{}, dsn, minReconnect, maxReconnect, cb) 487 } 488 489 // NewDialListener is like NewListener but it takes a Dialer. 490 func NewDialListener(d Dialer, dsn string, minReconnect, maxReconnect time.Duration, cb EventCallbackType) *Listener { 491 l := &Listener{ 492 dsn: dsn, 493 minReconnectInterval: minReconnect, 494 maxReconnectInterval: maxReconnect, 495 dialer: d, 496 eventCallback: cb, 497 channels: make(map[string]struct{}), 498 Notify: make(chan *Notification, 32), 499 } 500 l.reconnectCond = sync.NewCond(&l.lock) 501 go l.listenerMain() 502 return l 503 } 504 505 // NotificationChannel returns the notification channel for this listener. This 506 // is the same channel as Notify, and will not be recreated during the life time 507 // of the Listener. 508 func (l *Listener) NotificationChannel() <-chan *Notification { 509 return l.Notify 510 } 511 512 // Listen starts listening for notifications on a channel. Calls to this 513 // function will block until an acknowledgement has been received from the 514 // server. Note that Listener automatically re-establishes the connection after 515 // connection loss, so this function may block indefinitely if the connection 516 // can not be re-established. 517 // 518 // Listen will only fail in three conditions: 519 // 1. The channel is already open. The returned error will be 520 // [ErrChannelAlreadyOpen]. 521 // 2. The query was executed on the remote server, but PostgreSQL returned an 522 // error message in response to the query. The returned error will be a 523 // [pq.Error] containing the information the server supplied. 524 // 3. Close is called on the Listener before the request could be completed. 525 // 526 // The channel name is case-sensitive. 527 func (l *Listener) Listen(channel string) error { 528 l.lock.Lock() 529 defer l.lock.Unlock() 530 if l.isClosed { 531 return net.ErrClosed 532 } 533 534 // The server allows you to issue a LISTEN on a channel which is already 535 // open, but it seems useful to be able to detect this case to spot for 536 // mistakes in application logic. If the application genuinely does't care, 537 // it can check the exported error and ignore it. 538 _, exists := l.channels[channel] 539 if exists { 540 return ErrChannelAlreadyOpen 541 } 542 543 if l.cn != nil { 544 // If resp is true but error is set then the query was executed on the 545 // remote server but resulted in an error. This should be relatively 546 // rare, so it's fine if we just pass the error to our caller. 547 // If resp is false then we could not complete the query on the remote 548 // server and our underlying connection is about to go away, so we only 549 // add relname to l.channels, and wait for resync() to take care of the 550 // rest. 551 resp, err := l.cn.Listen(channel) 552 if resp && err != nil { 553 return err 554 } 555 } 556 557 l.channels[channel] = struct{}{} 558 for l.cn == nil { 559 l.reconnectCond.Wait() 560 // we let go of the mutex for a while 561 if l.isClosed { 562 return net.ErrClosed 563 } 564 } 565 566 return nil 567 } 568 569 // Unlisten removes a channel from the Listener's channel list. Returns 570 // ErrChannelNotOpen if the Listener is not listening on the specified channel. 571 // Returns immediately with no error if there is no connection. Note that you 572 // might still get notifications for this channel even after Unlisten has 573 // returned. 574 // 575 // The channel name is case-sensitive. 576 func (l *Listener) Unlisten(channel string) error { 577 l.lock.Lock() 578 defer l.lock.Unlock() 579 580 if l.isClosed { 581 return net.ErrClosed 582 } 583 584 // Similarly to LISTEN, this is not an error in Postgres, but it seems 585 // useful to distinguish from the normal conditions. 586 _, exists := l.channels[channel] 587 if !exists { 588 return ErrChannelNotOpen 589 } 590 591 if l.cn != nil { 592 // Similarly to Listen (see comment there), the caller should only be 593 // bothered with an error if it came from the backend as a response to 594 // our query. 595 resp, err := l.cn.Unlisten(channel) 596 if resp && err != nil { 597 return err 598 } 599 } 600 601 // Don't bother waiting for resync if there's no connection. 602 delete(l.channels, channel) 603 return nil 604 } 605 606 // UnlistenAll removes all channels from the Listener's channel list. Returns 607 // immediately with no error if there is no connection. Note that you might 608 // still get notifications for any of the deleted channels even after 609 // UnlistenAll has returned. 610 func (l *Listener) UnlistenAll() error { 611 l.lock.Lock() 612 defer l.lock.Unlock() 613 614 if l.isClosed { 615 return net.ErrClosed 616 } 617 618 if l.cn != nil { 619 // Similarly to Listen (see comment in that function), the caller 620 // should only be bothered with an error if it came from the backend as 621 // a response to our query. 622 gotResponse, err := l.cn.UnlistenAll() 623 if gotResponse && err != nil { 624 return err 625 } 626 } 627 628 // Don't bother waiting for resync if there's no connection. 629 l.channels = make(map[string]struct{}) 630 return nil 631 } 632 633 // Ping the remote server to make sure it's alive. Non-nil return value means 634 // that there is no active connection. 635 func (l *Listener) Ping() error { 636 l.lock.Lock() 637 defer l.lock.Unlock() 638 639 if l.isClosed { 640 return net.ErrClosed 641 } 642 if l.cn == nil { 643 return errors.New("no connection") 644 } 645 646 return l.cn.Ping() 647 } 648 649 // Clean up after losing the server connection. Returns l.cn.Err(), which should 650 // have the reason the connection was lost. 651 func (l *Listener) disconnectCleanup() error { 652 l.lock.Lock() 653 defer l.lock.Unlock() 654 655 // sanity check; can't look at Err() until the channel has been closed 656 select { 657 case _, ok := <-l.connNotificationChan: 658 if ok { 659 panic("connNotificationChan not closed") 660 } 661 default: 662 panic("connNotificationChan not closed") 663 } 664 665 err := l.cn.Err() 666 _ = l.cn.Close() 667 l.cn = nil 668 return err 669 } 670 671 // Synchronize the list of channels we want to be listening on with the server 672 // after the connection has been established. 673 func (l *Listener) resync(cn *ListenerConn, notificationChan <-chan *Notification) error { 674 doneChan := make(chan error) 675 go func(notificationChan <-chan *Notification) { 676 for channel := range l.channels { 677 // If we got a response, return that error to our caller as it's 678 // going to be more descriptive than cn.Err(). 679 gotResponse, err := cn.Listen(channel) 680 if gotResponse && err != nil { 681 doneChan <- err 682 return 683 } 684 685 // If we couldn't reach the server, wait for notificationChan to 686 // close and then return the error message from the connection, as 687 // per ListenerConn's interface. 688 if err != nil { 689 for range notificationChan { 690 } 691 doneChan <- cn.Err() 692 return 693 } 694 } 695 doneChan <- nil 696 }(notificationChan) 697 698 // Ignore notifications while synchronization is going on to avoid 699 // deadlocks. We have to send a nil notification over Notify anyway as we 700 // can't possibly know which notifications (if any) were lost while the 701 // connection was down, so there's no reason to try and process these 702 // messages at all. 703 for { 704 select { 705 case _, ok := <-notificationChan: 706 if !ok { 707 notificationChan = nil 708 } 709 710 case err := <-doneChan: 711 return err 712 } 713 } 714 } 715 716 // caller should NOT be holding l.lock 717 func (l *Listener) closed() bool { 718 l.lock.Lock() 719 defer l.lock.Unlock() 720 721 return l.isClosed 722 } 723 724 func (l *Listener) connect() error { 725 l.lock.Lock() 726 defer l.lock.Unlock() 727 if l.isClosed { 728 return net.ErrClosed 729 } 730 731 notificationChan := make(chan *Notification, 32) 732 733 var err error 734 l.cn, err = newDialListenerConn(l.dialer, l.dsn, notificationChan) 735 if err != nil { 736 return err 737 } 738 739 err = l.resync(l.cn, notificationChan) 740 if err != nil { 741 _ = l.cn.Close() 742 return err 743 } 744 745 l.connNotificationChan = notificationChan 746 l.reconnectCond.Broadcast() 747 return nil 748 } 749 750 // Close disconnects the Listener from the database and shuts it down. 751 // Subsequent calls to its methods will return an error. Close returns an error 752 // if the connection has already been closed. 753 func (l *Listener) Close() error { 754 l.lock.Lock() 755 defer l.lock.Unlock() 756 757 if l.isClosed { 758 return net.ErrClosed 759 } 760 761 if l.cn != nil { 762 _ = l.cn.Close() 763 } 764 l.isClosed = true 765 766 // Unblock calls to Listen() 767 l.reconnectCond.Broadcast() 768 769 return nil 770 } 771 772 func (l *Listener) emitEvent(event ListenerEventType, err error) { 773 if l.eventCallback != nil { 774 l.eventCallback(event, err) 775 } 776 } 777 778 // Main logic here: maintain a connection to the server when possible, wait 779 // for notifications and emit events. 780 func (l *Listener) listenerConnLoop() { 781 var ( 782 nextReconnect time.Time 783 reconnectInterval = l.minReconnectInterval 784 ) 785 for { 786 for { 787 err := l.connect() 788 if err == nil { 789 break 790 } 791 if l.closed() { 792 return 793 } 794 795 l.emitEvent(ListenerEventConnectionAttemptFailed, err) 796 time.Sleep(reconnectInterval) 797 reconnectInterval *= 2 798 if reconnectInterval > l.maxReconnectInterval { 799 reconnectInterval = l.maxReconnectInterval 800 } 801 } 802 803 if nextReconnect.IsZero() { 804 l.emitEvent(ListenerEventConnected, nil) 805 } else { 806 l.emitEvent(ListenerEventReconnected, nil) 807 l.Notify <- nil 808 } 809 810 reconnectInterval = l.minReconnectInterval 811 nextReconnect = time.Now().Add(reconnectInterval) 812 813 for { 814 notification, ok := <-l.connNotificationChan 815 if !ok { // lost connection, loop again 816 break 817 } 818 l.Notify <- notification 819 } 820 821 err := l.disconnectCleanup() 822 if l.closed() { 823 return 824 } 825 l.emitEvent(ListenerEventDisconnected, err) 826 827 time.Sleep(time.Until(nextReconnect)) 828 } 829 } 830 831 func (l *Listener) listenerMain() { 832 l.listenerConnLoop() 833 close(l.Notify) 834 }