taldir

Directory service to resolve wallet mailboxes by messenger addresses
Log | Files | Refs | Submodules | README | LICENSE

notify.go (25722B)


      1 package pq
      2 
      3 import (
      4 	"context"
      5 	"database/sql/driver"
      6 	"errors"
      7 	"fmt"
      8 	"net"
      9 	"sync"
     10 	"sync/atomic"
     11 	"time"
     12 
     13 	"github.com/lib/pq/internal/proto"
     14 )
     15 
     16 // Notification represents a single notification from the database.
     17 type Notification struct {
     18 	BePid   int    // Process ID (PID) of the notifying postgres backend.
     19 	Channel string // Name of the channel the notification was sent on.
     20 	Extra   string // Payload, or the empty string if unspecified.
     21 }
     22 
     23 func recvNotification(r *readBuf) *Notification {
     24 	bePid := r.int32()
     25 	channel := r.string()
     26 	extra := r.string()
     27 	return &Notification{bePid, channel, extra}
     28 }
     29 
     30 // SetNotificationHandler sets the given notification handler on the given
     31 // connection. A runtime panic occurs if c is not a pq connection. A nil handler
     32 // may be used to unset it.
     33 //
     34 // Note: Notification handlers are executed synchronously by pq meaning commands
     35 // won't continue to be processed until the handler returns.
     36 func SetNotificationHandler(c driver.Conn, handler func(*Notification)) {
     37 	c.(*conn).notificationHandler = handler
     38 }
     39 
     40 // NotificationHandlerConnector wraps a regular connector and sets a
     41 // notification handler on it.
     42 type NotificationHandlerConnector struct {
     43 	driver.Connector
     44 	notificationHandler func(*Notification)
     45 }
     46 
     47 // Connect calls the underlying connector's connect method and then sets the
     48 // notification handler.
     49 func (n *NotificationHandlerConnector) Connect(ctx context.Context) (driver.Conn, error) {
     50 	c, err := n.Connector.Connect(ctx)
     51 	if err == nil {
     52 		SetNotificationHandler(c, n.notificationHandler)
     53 	}
     54 	return c, err
     55 }
     56 
     57 // ConnectorNotificationHandler returns the currently set notification handler,
     58 // if any. If the given connector is not a result of
     59 // [ConnectorWithNotificationHandler], nil is returned.
     60 func ConnectorNotificationHandler(c driver.Connector) func(*Notification) {
     61 	if c, ok := c.(*NotificationHandlerConnector); ok {
     62 		return c.notificationHandler
     63 	}
     64 	return nil
     65 }
     66 
     67 // ConnectorWithNotificationHandler creates or sets the given handler for the
     68 // given connector. If the given connector is a result of calling this function
     69 // previously, it is simply set on the given connector and returned. Otherwise,
     70 // this returns a new connector wrapping the given one and setting the
     71 // notification handler. A nil notification handler may be used to unset it.
     72 //
     73 // The returned connector is intended to be used with database/sql.OpenDB.
     74 //
     75 // Note: Notification handlers are executed synchronously by pq meaning commands
     76 // won't continue to be processed until the handler returns.
     77 func ConnectorWithNotificationHandler(c driver.Connector, handler func(*Notification)) *NotificationHandlerConnector {
     78 	if c, ok := c.(*NotificationHandlerConnector); ok {
     79 		c.notificationHandler = handler
     80 		return c
     81 	}
     82 	return &NotificationHandlerConnector{Connector: c, notificationHandler: handler}
     83 }
     84 
     85 const (
     86 	connStateIdle int32 = iota
     87 	connStateExpectResponse
     88 	connStateExpectReadyForQuery
     89 )
     90 
     91 type message struct {
     92 	typ proto.ResponseCode
     93 	err error
     94 }
     95 
     96 var errListenerConnClosed = errors.New("pq: ListenerConn has been closed")
     97 
     98 // ListenerConn is a low-level interface for waiting for notifications. You
     99 // should use [Listener] instead.
    100 type ListenerConn struct {
    101 	connectionLock   sync.Mutex // guards cn and err
    102 	senderLock       sync.Mutex // the sending goroutine will be holding this lock
    103 	cn               *conn
    104 	err              error
    105 	connState        int32
    106 	notificationChan chan<- *Notification
    107 	replyChan        chan message
    108 }
    109 
    110 // NewListenerConn creates a new ListenerConn. Use NewListener instead.
    111 func NewListenerConn(name string, notificationChan chan<- *Notification) (*ListenerConn, error) {
    112 	return newDialListenerConn(defaultDialer{}, name, notificationChan)
    113 }
    114 
    115 func newDialListenerConn(d Dialer, name string, c chan<- *Notification) (*ListenerConn, error) {
    116 	cn, err := DialOpen(d, name)
    117 	if err != nil {
    118 		return nil, err
    119 	}
    120 
    121 	l := &ListenerConn{
    122 		cn:               cn.(*conn),
    123 		notificationChan: c,
    124 		connState:        connStateIdle,
    125 		replyChan:        make(chan message, 2),
    126 	}
    127 
    128 	go l.listenerConnMain()
    129 	return l, nil
    130 }
    131 
    132 // We can only allow one goroutine at a time to be running a query on the
    133 // connection for various reasons, so the goroutine sending on the connection
    134 // must be holding senderLock.
    135 //
    136 // Returns an error if an unrecoverable error has occurred and the ListenerConn
    137 // should be abandoned.
    138 func (l *ListenerConn) acquireSenderLock() error {
    139 	// we must acquire senderLock first to avoid deadlocks; see ExecSimpleQuery
    140 	l.senderLock.Lock()
    141 
    142 	l.connectionLock.Lock()
    143 	err := l.err
    144 	l.connectionLock.Unlock()
    145 	if err != nil {
    146 		l.senderLock.Unlock()
    147 		return err
    148 	}
    149 	return nil
    150 }
    151 
    152 func (l *ListenerConn) releaseSenderLock() {
    153 	l.senderLock.Unlock()
    154 }
    155 
    156 // setState advances the protocol state to newState. Returns false if moving
    157 // to that state from the current state is not allowed.
    158 func (l *ListenerConn) setState(newState int32) bool {
    159 	var expectedState int32
    160 
    161 	switch newState {
    162 	case connStateIdle:
    163 		expectedState = connStateExpectReadyForQuery
    164 	case connStateExpectResponse:
    165 		expectedState = connStateIdle
    166 	case connStateExpectReadyForQuery:
    167 		expectedState = connStateExpectResponse
    168 	default:
    169 		panic(fmt.Sprintf("unexpected listenerConnState %d", newState))
    170 	}
    171 
    172 	return atomic.CompareAndSwapInt32(&l.connState, expectedState, newState)
    173 }
    174 
    175 // Main logic is here: receive messages from the postgres backend, forward
    176 // notifications and query replies and keep the internal state in sync with the
    177 // protocol state. Returns when the connection has been lost, is about to go
    178 // away or should be discarded because we couldn't agree on the state with the
    179 // server backend.
    180 func (l *ListenerConn) listenerConnLoop() (err error) {
    181 	r := &readBuf{}
    182 	for {
    183 		t, err := l.cn.recvMessage(r)
    184 		if err != nil {
    185 			return err
    186 		}
    187 
    188 		switch t {
    189 		case proto.NotificationResponse:
    190 			// recvNotification copies all the data so we don't need to worry
    191 			// about the scratch buffer being overwritten.
    192 			l.notificationChan <- recvNotification(r)
    193 
    194 		case proto.RowDescription, proto.DataRow:
    195 			// only used by tests; ignore
    196 
    197 		case proto.ErrorResponse:
    198 			// We might receive an ErrorResponse even when not in a query; it
    199 			// is expected that the server will close the connection after
    200 			// that, but we should make sure that the error we display is the
    201 			// one from the stray ErrorResponse, not io.ErrUnexpectedEOF.
    202 			if !l.setState(connStateExpectReadyForQuery) {
    203 				return parseError(r, "")
    204 			}
    205 			l.replyChan <- message{t, parseError(r, "")}
    206 
    207 		case proto.CommandComplete, proto.EmptyQueryResponse:
    208 			if !l.setState(connStateExpectReadyForQuery) {
    209 				// protocol out of sync
    210 				return fmt.Errorf("unexpected CommandComplete")
    211 			}
    212 			// ExecSimpleQuery doesn't need to know about this message
    213 
    214 		case proto.ReadyForQuery:
    215 			if !l.setState(connStateIdle) {
    216 				// protocol out of sync
    217 				return fmt.Errorf("unexpected ReadyForQuery")
    218 			}
    219 			l.replyChan <- message{t, nil}
    220 
    221 		case proto.ParameterStatus:
    222 			// ignore
    223 		case proto.NoticeResponse:
    224 			if n := l.cn.noticeHandler; n != nil {
    225 				n(parseError(r, ""))
    226 			}
    227 		default:
    228 			return fmt.Errorf("unexpected message %q from server in listenerConnLoop", t)
    229 		}
    230 	}
    231 }
    232 
    233 // This is the main routine for the goroutine receiving on the database
    234 // connection. Most of the main logic is in listenerConnLoop.
    235 func (l *ListenerConn) listenerConnMain() {
    236 	err := l.listenerConnLoop()
    237 
    238 	// listenerConnLoop terminated; we're done, but we still have to clean up.
    239 	// Make sure nobody tries to start any new queries by making sure the err
    240 	// pointer is set. It is important that we do not overwrite its value; a
    241 	// connection could be closed by either this goroutine or one sending on the
    242 	// connection – whoever closes the connection is assumed to have the more
    243 	// meaningful error message (as the other one will probably get
    244 	// net.errClosed), so that goroutine sets the error we expose while the
    245 	// other error is discarded. If the connection is lost while two goroutines
    246 	// are operating on the socket, it probably doesn't matter which error we
    247 	// expose so we don't try to do anything more complex.
    248 	l.connectionLock.Lock()
    249 	if l.err == nil {
    250 		l.err = err
    251 	}
    252 	_ = l.cn.Close()
    253 	l.connectionLock.Unlock()
    254 
    255 	// There might be a query in-flight; make sure nobody's waiting for a
    256 	// response to it, since there's not going to be one.
    257 	close(l.replyChan)
    258 
    259 	// let the listener know we're done
    260 	close(l.notificationChan)
    261 
    262 	// this ListenerConn is done
    263 }
    264 
    265 // Listen sends a LISTEN query to the server. See ExecSimpleQuery.
    266 func (l *ListenerConn) Listen(channel string) (bool, error) {
    267 	return l.ExecSimpleQuery("LISTEN " + QuoteIdentifier(channel))
    268 }
    269 
    270 // Unlisten sends an UNLISTEN query to the server. See ExecSimpleQuery.
    271 func (l *ListenerConn) Unlisten(channel string) (bool, error) {
    272 	return l.ExecSimpleQuery("UNLISTEN " + QuoteIdentifier(channel))
    273 }
    274 
    275 // UnlistenAll sends an `UNLISTEN *` query to the server. See ExecSimpleQuery.
    276 func (l *ListenerConn) UnlistenAll() (bool, error) {
    277 	return l.ExecSimpleQuery("UNLISTEN *")
    278 }
    279 
    280 // Ping the remote server to make sure it's alive. Non-nil error means the
    281 // connection has failed and should be abandoned.
    282 func (l *ListenerConn) Ping() error {
    283 	sent, err := l.ExecSimpleQuery("")
    284 	if !sent {
    285 		return err
    286 	}
    287 	if err != nil { // shouldn't happen
    288 		panic(err)
    289 	}
    290 	return nil
    291 }
    292 
    293 // Attempt to send a query on the connection. Returns an error if sending the
    294 // query failed, and the caller should initiate closure of this connection. The
    295 // caller must be holding senderLock (see acquireSenderLock and
    296 // releaseSenderLock).
    297 func (l *ListenerConn) sendSimpleQuery(q string) (err error) {
    298 	// Must set connection state before sending the query
    299 	if !l.setState(connStateExpectResponse) {
    300 		return errors.New("pq: two queries running at the same time")
    301 	}
    302 
    303 	// Can't use l.cn.writeBuf here because it uses the scratch buffer which
    304 	// might get overwritten by listenerConnLoop.
    305 	b := &writeBuf{
    306 		buf: []byte("Q\x00\x00\x00\x00"),
    307 		pos: 1,
    308 	}
    309 	b.string(q)
    310 	return l.cn.send(b)
    311 }
    312 
    313 // ExecSimpleQuery executes a "simple query" (i.e. one with no bindable
    314 // parameters) on the connection. The possible return values are:
    315 //  1. "executed" is true; the query was executed to completion on the database
    316 //     server. If the query failed, err will be set to the error returned by the
    317 //     database, otherwise err will be nil.
    318 //  2. If "executed" is false, the query could not be executed on the remote
    319 //     server. err will be non-nil.
    320 //
    321 // After a call to ExecSimpleQuery has returned an executed=false value, the
    322 // connection has either been closed or will be closed shortly thereafter, and
    323 // all subsequently executed queries will return an error.
    324 func (l *ListenerConn) ExecSimpleQuery(q string) (executed bool, err error) {
    325 	if err = l.acquireSenderLock(); err != nil {
    326 		return false, err
    327 	}
    328 	defer l.releaseSenderLock()
    329 
    330 	err = l.sendSimpleQuery(q)
    331 	if err != nil {
    332 		// We can't know what state the protocol is in, so we need to abandon
    333 		// this connection.
    334 		l.connectionLock.Lock()
    335 		// Set the error pointer if it hasn't been set already; see
    336 		// listenerConnMain.
    337 		if l.err == nil {
    338 			l.err = err
    339 		}
    340 		l.connectionLock.Unlock()
    341 		_ = l.cn.c.Close()
    342 		return false, err
    343 	}
    344 
    345 	// now we just wait for a reply..
    346 	for {
    347 		m, ok := <-l.replyChan
    348 		if !ok {
    349 			// We lost the connection to server, don't bother waiting for a
    350 			// a response. err should have been set already.
    351 			l.connectionLock.Lock()
    352 			err := l.err
    353 			l.connectionLock.Unlock()
    354 			return false, err
    355 		}
    356 		switch m.typ {
    357 		case proto.ReadyForQuery:
    358 			// sanity check
    359 			if m.err != nil {
    360 				panic("m.err != nil")
    361 			}
    362 			// done; err might or might not be set
    363 			return true, err
    364 
    365 		case proto.ErrorResponse:
    366 			// sanity check
    367 			if m.err == nil {
    368 				panic("m.err == nil")
    369 			}
    370 			// server responded with an error; ReadyForQuery to follow
    371 			err = m.err
    372 
    373 		default:
    374 			return false, fmt.Errorf("unknown response for simple query: %q", m.typ)
    375 		}
    376 	}
    377 }
    378 
    379 // Close closes the connection.
    380 func (l *ListenerConn) Close() error {
    381 	l.connectionLock.Lock()
    382 	if l.err != nil {
    383 		l.connectionLock.Unlock()
    384 		return errListenerConnClosed
    385 	}
    386 	l.err = errListenerConnClosed
    387 	l.connectionLock.Unlock()
    388 	// We can't send anything on the connection without holding senderLock.
    389 	// Simply close the net.Conn to wake up everyone operating on it.
    390 	return l.cn.c.Close()
    391 }
    392 
    393 // Err returns the reason the connection was closed. It is not safe to call
    394 // this function until l.Notify has been closed.
    395 func (l *ListenerConn) Err() error {
    396 	return l.err
    397 }
    398 
    399 // ErrChannelAlreadyOpen is returned from Listen when a channel is already
    400 // open.
    401 var ErrChannelAlreadyOpen = errors.New("pq: channel is already open")
    402 
    403 // ErrChannelNotOpen is returned from Unlisten when a channel is not open.
    404 var ErrChannelNotOpen = errors.New("pq: channel is not open")
    405 
    406 // ListenerEventType is an enumeration of listener event types.
    407 type ListenerEventType int
    408 
    409 const (
    410 	// ListenerEventConnected is emitted only when the database connection has
    411 	// been initially initialized. The err argument of the callback will always
    412 	// be nil.
    413 	ListenerEventConnected ListenerEventType = iota
    414 
    415 	// ListenerEventDisconnected is emitted after a database connection has been
    416 	// lost, either because of an error or because Close has been called. The
    417 	// err argument will be set to the reason the database connection was lost.
    418 	ListenerEventDisconnected
    419 
    420 	// ListenerEventReconnected is emitted after a database connection has been
    421 	// re-established after connection loss. The err argument of the callback
    422 	// will always be nil. After this event has been emitted, a nil
    423 	// pq.Notification is sent on the Listener.Notify channel.
    424 	ListenerEventReconnected
    425 
    426 	// ListenerEventConnectionAttemptFailed is emitted after a connection to the
    427 	// database was attempted, but failed. The err argument will be set to an
    428 	// error describing why the connection attempt did not succeed.
    429 	ListenerEventConnectionAttemptFailed
    430 )
    431 
    432 // EventCallbackType is the event callback type. See also ListenerEventType
    433 // constants' documentation.
    434 type EventCallbackType func(event ListenerEventType, err error)
    435 
    436 func (l ListenerEventType) String() string {
    437 	return map[ListenerEventType]string{
    438 		ListenerEventConnected:               "connected",
    439 		ListenerEventDisconnected:            "disconnected",
    440 		ListenerEventReconnected:             "reconnected",
    441 		ListenerEventConnectionAttemptFailed: "connectionAttemptFailed",
    442 	}[l]
    443 }
    444 
    445 // Listener provides an interface for listening to notifications from a
    446 // PostgreSQL database. For general usage information, see section
    447 // "Notifications".
    448 //
    449 // Listener can safely be used from concurrently running goroutines.
    450 type Listener struct {
    451 	// Channel for receiving notifications from the database. In some cases a
    452 	// nil value will be sent. See section "Notifications" above.
    453 	Notify chan *Notification
    454 
    455 	dsn                  string
    456 	minReconnectInterval time.Duration
    457 	maxReconnectInterval time.Duration
    458 	dialer               Dialer
    459 	eventCallback        EventCallbackType
    460 
    461 	lock                 sync.Mutex
    462 	isClosed             bool
    463 	reconnectCond        *sync.Cond
    464 	cn                   *ListenerConn
    465 	connNotificationChan <-chan *Notification
    466 	channels             map[string]struct{}
    467 }
    468 
    469 // NewListener creates a new database connection dedicated to LISTEN / NOTIFY.
    470 //
    471 // name should be set to a connection string to be used to establish the
    472 // database connection (see section "Connection String Parameters" above).
    473 //
    474 // minReconnect controls the duration to wait before trying to re-establish the
    475 // database connection after connection loss. After each consecutive failure
    476 // this interval is doubled, until maxReconnect is reached. Successfully
    477 // completing the connection establishment procedure resets the interval back to
    478 // minReconnect.
    479 //
    480 // The last parameter cb can be set to a function which will be called by the
    481 // Listener when the state of the underlying database connection changes. This
    482 // callback will be called by the goroutine which dispatches the notifications
    483 // over the Notify channel, so you should try to avoid doing potentially
    484 // time-consuming operations from the callback.
    485 func NewListener(dsn string, minReconnect, maxReconnect time.Duration, cb EventCallbackType) *Listener {
    486 	return NewDialListener(defaultDialer{}, dsn, minReconnect, maxReconnect, cb)
    487 }
    488 
    489 // NewDialListener is like NewListener but it takes a Dialer.
    490 func NewDialListener(d Dialer, dsn string, minReconnect, maxReconnect time.Duration, cb EventCallbackType) *Listener {
    491 	l := &Listener{
    492 		dsn:                  dsn,
    493 		minReconnectInterval: minReconnect,
    494 		maxReconnectInterval: maxReconnect,
    495 		dialer:               d,
    496 		eventCallback:        cb,
    497 		channels:             make(map[string]struct{}),
    498 		Notify:               make(chan *Notification, 32),
    499 	}
    500 	l.reconnectCond = sync.NewCond(&l.lock)
    501 	go l.listenerMain()
    502 	return l
    503 }
    504 
    505 // NotificationChannel returns the notification channel for this listener. This
    506 // is the same channel as Notify, and will not be recreated during the life time
    507 // of the Listener.
    508 func (l *Listener) NotificationChannel() <-chan *Notification {
    509 	return l.Notify
    510 }
    511 
    512 // Listen starts listening for notifications on a channel. Calls to this
    513 // function will block until an acknowledgement has been received from the
    514 // server. Note that Listener automatically re-establishes the connection after
    515 // connection loss, so this function may block indefinitely if the connection
    516 // can not be re-established.
    517 //
    518 // Listen will only fail in three conditions:
    519 //  1. The channel is already open. The returned error will be
    520 //     [ErrChannelAlreadyOpen].
    521 //  2. The query was executed on the remote server, but PostgreSQL returned an
    522 //     error message in response to the query. The returned error will be a
    523 //     [pq.Error] containing the information the server supplied.
    524 //  3. Close is called on the Listener before the request could be completed.
    525 //
    526 // The channel name is case-sensitive.
    527 func (l *Listener) Listen(channel string) error {
    528 	l.lock.Lock()
    529 	defer l.lock.Unlock()
    530 	if l.isClosed {
    531 		return net.ErrClosed
    532 	}
    533 
    534 	// The server allows you to issue a LISTEN on a channel which is already
    535 	// open, but it seems useful to be able to detect this case to spot for
    536 	// mistakes in application logic. If the application genuinely does't care,
    537 	// it can check the exported error and ignore it.
    538 	_, exists := l.channels[channel]
    539 	if exists {
    540 		return ErrChannelAlreadyOpen
    541 	}
    542 
    543 	if l.cn != nil {
    544 		// If resp is true but error is set then the query was executed on the
    545 		// remote server but resulted in an error. This should be relatively
    546 		// rare, so it's fine if we just pass the error to our caller.
    547 		// If resp is false then we could not complete the query on the remote
    548 		// server and our underlying connection is about to go away, so we only
    549 		// add relname to l.channels, and wait for resync() to take care of the
    550 		// rest.
    551 		resp, err := l.cn.Listen(channel)
    552 		if resp && err != nil {
    553 			return err
    554 		}
    555 	}
    556 
    557 	l.channels[channel] = struct{}{}
    558 	for l.cn == nil {
    559 		l.reconnectCond.Wait()
    560 		// we let go of the mutex for a while
    561 		if l.isClosed {
    562 			return net.ErrClosed
    563 		}
    564 	}
    565 
    566 	return nil
    567 }
    568 
    569 // Unlisten removes a channel from the Listener's channel list. Returns
    570 // ErrChannelNotOpen if the Listener is not listening on the specified channel.
    571 // Returns immediately with no error if there is no connection. Note that you
    572 // might still get notifications for this channel even after Unlisten has
    573 // returned.
    574 //
    575 // The channel name is case-sensitive.
    576 func (l *Listener) Unlisten(channel string) error {
    577 	l.lock.Lock()
    578 	defer l.lock.Unlock()
    579 
    580 	if l.isClosed {
    581 		return net.ErrClosed
    582 	}
    583 
    584 	// Similarly to LISTEN, this is not an error in Postgres, but it seems
    585 	// useful to distinguish from the normal conditions.
    586 	_, exists := l.channels[channel]
    587 	if !exists {
    588 		return ErrChannelNotOpen
    589 	}
    590 
    591 	if l.cn != nil {
    592 		// Similarly to Listen (see comment there), the caller should only be
    593 		// bothered with an error if it came from the backend as a response to
    594 		// our query.
    595 		resp, err := l.cn.Unlisten(channel)
    596 		if resp && err != nil {
    597 			return err
    598 		}
    599 	}
    600 
    601 	// Don't bother waiting for resync if there's no connection.
    602 	delete(l.channels, channel)
    603 	return nil
    604 }
    605 
    606 // UnlistenAll removes all channels from the Listener's channel list. Returns
    607 // immediately with no error if there is no connection. Note that you might
    608 // still get notifications for any of the deleted channels even after
    609 // UnlistenAll has returned.
    610 func (l *Listener) UnlistenAll() error {
    611 	l.lock.Lock()
    612 	defer l.lock.Unlock()
    613 
    614 	if l.isClosed {
    615 		return net.ErrClosed
    616 	}
    617 
    618 	if l.cn != nil {
    619 		// Similarly to Listen (see comment in that function), the caller
    620 		// should only be bothered with an error if it came from the backend as
    621 		// a response to our query.
    622 		gotResponse, err := l.cn.UnlistenAll()
    623 		if gotResponse && err != nil {
    624 			return err
    625 		}
    626 	}
    627 
    628 	// Don't bother waiting for resync if there's no connection.
    629 	l.channels = make(map[string]struct{})
    630 	return nil
    631 }
    632 
    633 // Ping the remote server to make sure it's alive. Non-nil return value means
    634 // that there is no active connection.
    635 func (l *Listener) Ping() error {
    636 	l.lock.Lock()
    637 	defer l.lock.Unlock()
    638 
    639 	if l.isClosed {
    640 		return net.ErrClosed
    641 	}
    642 	if l.cn == nil {
    643 		return errors.New("no connection")
    644 	}
    645 
    646 	return l.cn.Ping()
    647 }
    648 
    649 // Clean up after losing the server connection. Returns l.cn.Err(), which should
    650 // have the reason the connection was lost.
    651 func (l *Listener) disconnectCleanup() error {
    652 	l.lock.Lock()
    653 	defer l.lock.Unlock()
    654 
    655 	// sanity check; can't look at Err() until the channel has been closed
    656 	select {
    657 	case _, ok := <-l.connNotificationChan:
    658 		if ok {
    659 			panic("connNotificationChan not closed")
    660 		}
    661 	default:
    662 		panic("connNotificationChan not closed")
    663 	}
    664 
    665 	err := l.cn.Err()
    666 	_ = l.cn.Close()
    667 	l.cn = nil
    668 	return err
    669 }
    670 
    671 // Synchronize the list of channels we want to be listening on with the server
    672 // after the connection has been established.
    673 func (l *Listener) resync(cn *ListenerConn, notificationChan <-chan *Notification) error {
    674 	doneChan := make(chan error)
    675 	go func(notificationChan <-chan *Notification) {
    676 		for channel := range l.channels {
    677 			// If we got a response, return that error to our caller as it's
    678 			// going to be more descriptive than cn.Err().
    679 			gotResponse, err := cn.Listen(channel)
    680 			if gotResponse && err != nil {
    681 				doneChan <- err
    682 				return
    683 			}
    684 
    685 			// If we couldn't reach the server, wait for notificationChan to
    686 			// close and then return the error message from the connection, as
    687 			// per ListenerConn's interface.
    688 			if err != nil {
    689 				for range notificationChan {
    690 				}
    691 				doneChan <- cn.Err()
    692 				return
    693 			}
    694 		}
    695 		doneChan <- nil
    696 	}(notificationChan)
    697 
    698 	// Ignore notifications while synchronization is going on to avoid
    699 	// deadlocks. We have to send a nil notification over Notify anyway as we
    700 	// can't possibly know which notifications (if any) were lost while the
    701 	// connection was down, so there's no reason to try and process these
    702 	// messages at all.
    703 	for {
    704 		select {
    705 		case _, ok := <-notificationChan:
    706 			if !ok {
    707 				notificationChan = nil
    708 			}
    709 
    710 		case err := <-doneChan:
    711 			return err
    712 		}
    713 	}
    714 }
    715 
    716 // caller should NOT be holding l.lock
    717 func (l *Listener) closed() bool {
    718 	l.lock.Lock()
    719 	defer l.lock.Unlock()
    720 
    721 	return l.isClosed
    722 }
    723 
    724 func (l *Listener) connect() error {
    725 	l.lock.Lock()
    726 	defer l.lock.Unlock()
    727 	if l.isClosed {
    728 		return net.ErrClosed
    729 	}
    730 
    731 	notificationChan := make(chan *Notification, 32)
    732 
    733 	var err error
    734 	l.cn, err = newDialListenerConn(l.dialer, l.dsn, notificationChan)
    735 	if err != nil {
    736 		return err
    737 	}
    738 
    739 	err = l.resync(l.cn, notificationChan)
    740 	if err != nil {
    741 		_ = l.cn.Close()
    742 		return err
    743 	}
    744 
    745 	l.connNotificationChan = notificationChan
    746 	l.reconnectCond.Broadcast()
    747 	return nil
    748 }
    749 
    750 // Close disconnects the Listener from the database and shuts it down.
    751 // Subsequent calls to its methods will return an error. Close returns an error
    752 // if the connection has already been closed.
    753 func (l *Listener) Close() error {
    754 	l.lock.Lock()
    755 	defer l.lock.Unlock()
    756 
    757 	if l.isClosed {
    758 		return net.ErrClosed
    759 	}
    760 
    761 	if l.cn != nil {
    762 		_ = l.cn.Close()
    763 	}
    764 	l.isClosed = true
    765 
    766 	// Unblock calls to Listen()
    767 	l.reconnectCond.Broadcast()
    768 
    769 	return nil
    770 }
    771 
    772 func (l *Listener) emitEvent(event ListenerEventType, err error) {
    773 	if l.eventCallback != nil {
    774 		l.eventCallback(event, err)
    775 	}
    776 }
    777 
    778 // Main logic here: maintain a connection to the server when possible, wait
    779 // for notifications and emit events.
    780 func (l *Listener) listenerConnLoop() {
    781 	var (
    782 		nextReconnect     time.Time
    783 		reconnectInterval = l.minReconnectInterval
    784 	)
    785 	for {
    786 		for {
    787 			err := l.connect()
    788 			if err == nil {
    789 				break
    790 			}
    791 			if l.closed() {
    792 				return
    793 			}
    794 
    795 			l.emitEvent(ListenerEventConnectionAttemptFailed, err)
    796 			time.Sleep(reconnectInterval)
    797 			reconnectInterval *= 2
    798 			if reconnectInterval > l.maxReconnectInterval {
    799 				reconnectInterval = l.maxReconnectInterval
    800 			}
    801 		}
    802 
    803 		if nextReconnect.IsZero() {
    804 			l.emitEvent(ListenerEventConnected, nil)
    805 		} else {
    806 			l.emitEvent(ListenerEventReconnected, nil)
    807 			l.Notify <- nil
    808 		}
    809 
    810 		reconnectInterval = l.minReconnectInterval
    811 		nextReconnect = time.Now().Add(reconnectInterval)
    812 
    813 		for {
    814 			notification, ok := <-l.connNotificationChan
    815 			if !ok { // lost connection, loop again
    816 				break
    817 			}
    818 			l.Notify <- notification
    819 		}
    820 
    821 		err := l.disconnectCleanup()
    822 		if l.closed() {
    823 			return
    824 		}
    825 		l.emitEvent(ListenerEventDisconnected, err)
    826 
    827 		time.Sleep(time.Until(nextReconnect))
    828 	}
    829 }
    830 
    831 func (l *Listener) listenerMain() {
    832 	l.listenerConnLoop()
    833 	close(l.Notify)
    834 }