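// NOTE(review): the text below appears to be web-viewer residue captured together with
// this file; it is not part of the module and is kept only as a comment.
//   You can not select more than 25 topics
//   Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
//   479 lines, 13 KiB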
'use strict'

const EventEmitter = require('events').EventEmitter

// Shared do-nothing function, used where a callback slot has to be filled
// but there is nothing to do.
const NOOP = function () {}

/**
 * Removes the first element of `list` that satisfies `predicate`.
 * The array is modified in place.
 *
 * @param {Array} list - array to search and mutate
 * @param {Function} predicate - called like an `Array.prototype.findIndex` callback
 * @returns {*} the removed element, or `undefined` when no element matched
 */
const removeWhere = (list, predicate) => {
  const i = list.findIndex(predicate)

  if (i === -1) {
    return undefined
  }

  const removed = list.splice(i, 1)
  return removed[0]
}

/**
 * Record describing a client that is currently sitting idle in the pool.
 */
class IdleItem {
  /**
   * @param {object} client - the idle client
   * @param {Function} idleListener - the 'error' listener attached to the client while idle
   * @param {*} timeoutId - handle of the idle-timeout timer, or `undefined` when none was scheduled
   */
  constructor(client, idleListener, timeoutId) {
    this.client = client
    this.idleListener = idleListener
    this.timeoutId = timeoutId
  }
}

/**
 * Record describing a caller that is waiting for a client.
 */
class PendingItem {
  /**
   * @param {Function} callback - invoked as `(err, client, release)` once the request is served
   */
  constructor(callback) {
    this.callback = callback
    // set to true when the waiter gave up (connection timeout); declared up front so
    // the flag is always present instead of being added to the object later
    this.timedOut = false
  }
}

/**
 * Always throws: signals that a client's release function was called a second time.
 *
 * @throws {Error} unconditionally
 */
function throwOnDoubleRelease() {
  throw new Error('Release called on client which has already been released to the pool.')
}

/**
 * Builds the callback/promise pair used by the pool's public methods.
 *
 * When `callback` is supplied it is handed back untouched and no promise is created.
 * Otherwise a promise is created with the given constructor together with a node-style
 * callback that settles it.
 *
 * @param {Function} Promise - promise constructor to use
 * @param {Function} [callback] - caller-supplied node-style callback
 * @returns {{callback: Function, result: (Promise|undefined)}}
 */
function promisify(Promise, callback) {
  if (callback) {
    return { callback: callback, result: undefined }
  }

  let rej
  let res

  const cb = function (err, client) {
    if (err) {
      rej(err)
    } else {
      res(client)
    }
  }

  const result = new Promise(function (resolve, reject) {
    res = resolve
    rej = reject
  }).catch((err) => {
    // replace the stack trace that leads to `TCP.onStreamRead` with one that leads back to the
    // application that created the query
    try {
      Error.captureStackTrace(err)
    } catch {
      // captureStackTrace throws for non-object (or non-extensible) rejection reasons;
      // the stack rewrite is cosmetic, so the original reason must still be propagated
    }
    throw err
  })

  return { callback: cb, result: result }
}

/**
 * Creates the 'error' listener that is attached to a client while it is idle in the pool.
 *
 * When invoked, the listener tags the error with the client, swaps itself for a
 * log-only 'error' handler, removes the client from the pool and re-emits the error
 * on the pool.
 *
 * @param {object} pool - object providing `log`, `_remove` and `emit`
 * @param {object} client - event emitter the listener is created for
 * @returns {Function} the listener, taking the emitted error as its only argument
 */
function makeIdleListener(pool, client) {
  return function idleListener(err) {
    err.client = client

    client.removeListener('error', idleListener)
    // keep an 'error' handler attached so further errors from this client are only logged
    client.on('error', () => {
      pool.log('additional client error after disconnection due to error', err)
    })
    pool._remove(client)
    // TODO - document that once the pool emits an error
    // the client has already been closed & purged and is unusable
    pool.emit('error', err, client)
  }
}

/**
 * A pool of clients with a bounded size.
 *
 * Callers obtain a client with `connect()` (or run a one-off statement with `query()`),
 * and hand it back through the `release` function given to them. Released clients are
 * kept idle for reuse until they error, time out, expire or the pool is ended.
 *
 * Events emitted by the pool: 'connect', 'acquire', 'release', 'remove' and 'error'.
 */
class Pool extends EventEmitter {
  /**
   * @param {object} [options] - pool settings; the object is shallow-copied. Keys read
   *   here: max, poolSize, min, maxUses, allowExitOnIdle, maxLifetimeSeconds, log,
   *   Client, Promise, idleTimeoutMillis, password, ssl. The same options object is
   *   also handed to every client constructed by the pool.
   * @param {Function} [Client] - client constructor used when `options.Client` is not
   *   set; when neither is given, `require('pg').Client` is used
   */
  constructor(options, Client) {
    super()
    this.options = Object.assign({}, options)

    if (options != null && 'password' in options) {
      // "hiding" the password so it doesn't show up in stack traces
      // or if the client is console.logged
      Object.defineProperty(this.options, 'password', {
        configurable: true,
        enumerable: false,
        writable: true,
        value: options.password,
      })
    }
    if (options != null && options.ssl && options.ssl.key) {
      // "hiding" the ssl->key so it doesn't show up in stack traces
      // or if the client is console.logged
      // (the copy above is shallow, so this is the same ssl object the caller passed in)
      Object.defineProperty(this.options.ssl, 'key', {
        enumerable: false,
      })
    }

    this.options.max = this.options.max || this.options.poolSize || 10
    this.options.min = this.options.min || 0
    this.options.maxUses = this.options.maxUses || Infinity
    this.options.allowExitOnIdle = this.options.allowExitOnIdle || false
    this.options.maxLifetimeSeconds = this.options.maxLifetimeSeconds || 0
    this.log = this.options.log || function () {}
    this.Client = this.options.Client || Client || require('pg').Client
    this.Promise = this.options.Promise || global.Promise

    // only an omitted value gets the default; an explicit 0 disables the idle timeout
    if (typeof this.options.idleTimeoutMillis === 'undefined') {
      this.options.idleTimeoutMillis = 10000
    }

    this._clients = [] // every client owned by the pool (connecting, checked out or idle)
    this._idle = [] // IdleItem records for clients waiting to be reused
    this._expired = new WeakSet() // clients whose maximum lifetime has elapsed
    this._pendingQueue = [] // PendingItem records for callers waiting for a client
    this._endCallback = undefined
    this.ending = false
    this.ended = false
  }

  /** @returns {boolean} true when the pool already owns `options.max` clients or more */
  _isFull() {
    return this._clients.length >= this.options.max
  }

  /** @returns {boolean} true when the pool owns more than `options.min` clients */
  _isAboveMin() {
    return this._clients.length > this.options.min
  }

  /**
   * Advances the pool's state: finishes shutdown when the pool is ending, otherwise
   * serves the next waiting request from an idle client or by creating a new client.
   */
  _pulseQueue() {
    this.log('pulse queue')
    if (this.ended) {
      this.log('pulse queue ended')
      return
    }
    if (this.ending) {
      this.log('pulse queue on ending')
      if (this._idle.length) {
        // iterate over a copy: _remove() takes entries out of this._idle while we loop
        this._idle.slice().forEach((item) => {
          this._remove(item.client)
        })
      }
      if (!this._clients.length) {
        this.ended = true
        this._endCallback()
      }
      return
    }

    // if we don't have any waiting, do nothing
    if (!this._pendingQueue.length) {
      this.log('no queued requests')
      return
    }
    // if we don't have any idle clients and we have no more room do nothing
    if (!this._idle.length && this._isFull()) {
      return
    }
    const pendingItem = this._pendingQueue.shift()
    if (this._idle.length) {
      const idleItem = this._idle.pop()
      clearTimeout(idleItem.timeoutId)
      const client = idleItem.client
      client.ref && client.ref()
      const idleListener = idleItem.idleListener

      return this._acquireClient(client, pendingItem, idleListener, false)
    }
    if (!this._isFull()) {
      return this.newClient(pendingItem)
    }
    throw new Error('unexpected condition')
  }

  /**
   * Drops a client from the pool's bookkeeping and ends it.
   *
   * @param {object} client - client to remove
   * @param {Function} [callback] - called after the client's `end` callback fired and
   *   the 'remove' event was emitted
   */
  _remove(client, callback) {
    const removed = removeWhere(this._idle, (item) => item.client === client)

    if (removed !== undefined) {
      clearTimeout(removed.timeoutId)
    }

    this._clients = this._clients.filter((c) => c !== client)
    client.end(() => {
      this.emit('remove', client)

      if (typeof callback === 'function') {
        callback()
      }
    })
  }

  /**
   * Checks out a client.
   *
   * @param {Function} [cb] - node-style callback `(err, client, release)`; when omitted
   *   a promise is returned instead
   * @returns {Promise|undefined} promise for the client when no callback was given
   */
  connect(cb) {
    if (this.ending) {
      const err = new Error('Cannot use a pool after calling end on the pool')
      return cb ? cb(err) : this.Promise.reject(err)
    }

    const response = promisify(this.Promise, cb)
    const result = response.result

    // if we don't have to connect a new client, don't do so
    if (this._isFull() || this._idle.length) {
      // if we have idle clients schedule a pulse immediately
      if (this._idle.length) {
        process.nextTick(() => this._pulseQueue())
      }

      if (!this.options.connectionTimeoutMillis) {
        this._pendingQueue.push(new PendingItem(response.callback))
        return result
      }

      const queueCallback = (err, res, done) => {
        clearTimeout(tid)
        response.callback(err, res, done)
      }

      const pendingItem = new PendingItem(queueCallback)

      // set connection timeout on checking out an existing client
      const tid = setTimeout(() => {
        // remove the callback from pending waiters because
        // we're going to call it with a timeout error
        removeWhere(this._pendingQueue, (i) => i.callback === queueCallback)
        pendingItem.timedOut = true
        response.callback(new Error('timeout exceeded when trying to connect'))
      }, this.options.connectionTimeoutMillis)

      if (tid.unref) {
        tid.unref()
      }

      this._pendingQueue.push(pendingItem)
      return result
    }

    this.newClient(new PendingItem(response.callback))

    return result
  }

  /**
   * Creates and connects a new client on behalf of a pending request.
   *
   * @param {PendingItem} pendingItem - the request that will receive the client or the error
   */
  newClient(pendingItem) {
    const client = new this.Client(this.options)
    this._clients.push(client)
    const idleListener = makeIdleListener(this, client)

    this.log('checking client timeout')

    // connection timeout logic
    let tid
    let timeoutHit = false
    if (this.options.connectionTimeoutMillis) {
      tid = setTimeout(() => {
        this.log('ending client due to timeout')
        timeoutHit = true
        // force kill the node driver, and let libpq do its teardown
        client.connection ? client.connection.stream.destroy() : client.end()
      }, this.options.connectionTimeoutMillis)
    }

    this.log('connecting new client')
    client.connect((err) => {
      if (tid) {
        clearTimeout(tid)
      }
      client.on('error', idleListener)
      if (err) {
        this.log('client failed to connect', err)
        // remove the dead client from our list of clients
        this._clients = this._clients.filter((c) => c !== client)
        if (timeoutHit) {
          err = new Error('Connection terminated due to connection timeout', { cause: err })
        }

        // this client won't be released, so move on immediately
        this._pulseQueue()

        if (!pendingItem.timedOut) {
          pendingItem.callback(err, undefined, NOOP)
        }
      } else {
        this.log('new client connected')

        if (this.options.maxLifetimeSeconds !== 0) {
          const maxLifetimeTimeout = setTimeout(() => {
            this.log('ending client due to expired lifetime')
            this._expired.add(client)
            const idleIndex = this._idle.findIndex((idleItem) => idleItem.client === client)
            if (idleIndex !== -1) {
              // the client is idle right now: check it out and release it straight away
              // so that _release() sees the expired mark and removes it
              this._acquireClient(
                client,
                new PendingItem((err, client, clientRelease) => clientRelease()),
                idleListener,
                false
              )
            }
          }, this.options.maxLifetimeSeconds * 1000)

          maxLifetimeTimeout.unref()
          client.once('end', () => clearTimeout(maxLifetimeTimeout))
        }

        return this._acquireClient(client, pendingItem, idleListener, true)
      }
    })
  }

  /**
   * Hands a client to a pending request.
   *
   * @param {object} client - client being checked out
   * @param {PendingItem} pendingItem - request to serve; when it already timed out the
   *   client is released again instead of being delivered
   * @param {Function} idleListener - the client's idle 'error' listener, detached while checked out
   * @param {boolean} isNew - true for a freshly connected client ('connect' is emitted and
   *   `options.verify`, when set, is run first)
   */
  _acquireClient(client, pendingItem, idleListener, isNew) {
    if (isNew) {
      this.emit('connect', client)
    }

    this.emit('acquire', client)

    client.release = this._releaseOnce(client, idleListener)

    client.removeListener('error', idleListener)

    if (!pendingItem.timedOut) {
      if (isNew && this.options.verify) {
        this.options.verify(client, (err) => {
          if (err) {
            client.release(err)
            return pendingItem.callback(err, undefined, NOOP)
          }

          pendingItem.callback(undefined, client, client.release)
        })
      } else {
        pendingItem.callback(undefined, client, client.release)
      }
    } else {
      if (isNew && this.options.verify) {
        this.options.verify(client, client.release)
      } else {
        client.release()
      }
    }
  }

  /**
   * Returns a function that wraps _release and throws if called more than once.
   *
   * @param {object} client - client the release function belongs to
   * @param {Function} idleListener - the client's idle 'error' listener
   * @returns {Function} `release(err)`; passing an error removes the client from the pool
   */
  _releaseOnce(client, idleListener) {
    let released = false

    return (err) => {
      if (released) {
        throwOnDoubleRelease()
      }

      released = true
      this._release(client, idleListener, err)
    }
  }

  /**
   * Releases a client back to the pool; include an error to remove it from the pool.
   *
   * @param {object} client - client being released
   * @param {Function} idleListener - 'error' listener to re-attach while the client is idle
   * @param {*} [err] - when truthy the client is removed instead of being kept idle
   */
  _release(client, idleListener, err) {
    client.on('error', idleListener)

    client._poolUseCount = (client._poolUseCount || 0) + 1

    this.emit('release', err, client)

    // TODO(bmc): expose a proper, public interface _queryable and _ending
    if (err || this.ending || !client._queryable || client._ending || client._poolUseCount >= this.options.maxUses) {
      if (client._poolUseCount >= this.options.maxUses) {
        this.log('remove expended client')
      }

      return this._remove(client, this._pulseQueue.bind(this))
    }

    const isExpired = this._expired.has(client)
    if (isExpired) {
      this.log('remove expired client')
      this._expired.delete(client)
      return this._remove(client, this._pulseQueue.bind(this))
    }

    // idle timeout
    let tid
    if (this.options.idleTimeoutMillis && this._isAboveMin()) {
      tid = setTimeout(() => {
        this.log('remove idle client')
        this._remove(client, this._pulseQueue.bind(this))
      }, this.options.idleTimeoutMillis)

      if (this.options.allowExitOnIdle) {
        // allow Node to exit if this is all that's left
        tid.unref()
      }
    }

    // guarded like the matching client.ref() call in _pulseQueue: not every client
    // implementation has to provide ref/unref
    if (this.options.allowExitOnIdle && client.unref) {
      client.unref()
    }

    this._idle.push(new IdleItem(client, idleListener, tid))
    this._pulseQueue()
  }

  /**
   * Runs a single query on a pooled client and releases the client afterwards.
   *
   * @param {*} text - first argument forwarded to `client.query`
   * @param {*} [values] - second argument forwarded to `client.query`; may be the callback
   * @param {Function} [cb] - node-style callback `(err, res)`; when omitted a promise is returned
   * @returns {Promise|undefined} promise for the query result when no callback was given
   */
  query(text, values, cb) {
    // guard clause against passing a function as the first parameter
    if (typeof text === 'function') {
      const response = promisify(this.Promise, text)
      setImmediate(function () {
        return response.callback(new Error('Passing a function as the first parameter to pool.query is not supported'))
      })
      return response.result
    }

    // allow plain text query without values
    if (typeof values === 'function') {
      cb = values
      values = undefined
    }
    const response = promisify(this.Promise, cb)
    cb = response.callback

    this.connect((err, client) => {
      if (err) {
        return cb(err)
      }

      let clientReleased = false
      const onError = (err) => {
        if (clientReleased) {
          return
        }
        clientReleased = true
        client.release(err)
        cb(err)
      }

      client.once('error', onError)
      this.log('dispatching query')
      try {
        client.query(text, values, (err, res) => {
          this.log('query dispatched')
          client.removeListener('error', onError)
          if (clientReleased) {
            return
          }
          clientReleased = true
          client.release(err)
          if (err) {
            return cb(err)
          }
          return cb(undefined, res)
        })
      } catch (err) {
        // client.query threw synchronously: detach the error listener and record the
        // release here, otherwise a later 'error' event would release the client twice
        client.removeListener('error', onError)
        if (clientReleased) {
          // the client was already released and cb already invoked; surface the
          // exception rather than releasing a second time
          throw err
        }
        clientReleased = true
        client.release(err)
        return cb(err)
      }
    })
    return response.result
  }

  /**
   * Shuts the pool down: idle clients are removed at once, and the pool counts as ended
   * when no clients remain.
   *
   * @param {Function} [cb] - called once the pool has ended (or with an error when `end`
   *   was already called); when omitted a promise is returned
   * @returns {Promise|undefined} promise settling when the pool has ended, if no callback was given
   */
  end(cb) {
    this.log('ending')
    if (this.ending) {
      const err = new Error('Called end on pool more than once')
      return cb ? cb(err) : this.Promise.reject(err)
    }
    this.ending = true
    const promised = promisify(this.Promise, cb)
    this._endCallback = promised.callback
    this._pulseQueue()
    return promised.result
  }

  /** Number of requests waiting for a client. */
  get waitingCount() {
    return this._pendingQueue.length
  }

  /** Number of clients currently idle in the pool. */
  get idleCount() {
    return this._idle.length
  }

  /** Number of pooled clients that are marked as expired. */
  get expiredCount() {
    return this._clients.reduce((acc, client) => acc + (this._expired.has(client) ? 1 : 0), 0)
  }

  /** Number of clients currently owned by the pool. */
  get totalCount() {
    return this._clients.length
  }
}

module.exports = Pool