IPFS storage for images and other non-text items, for use with Etica; runs on the Etica network and currencies.
https://collect.etica-stats.org
You cannot select more than 25 topics.
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
222 lines
5.1 KiB
222 lines
5.1 KiB
'use strict'

const EventEmitter = require('events').EventEmitter

const { parse, serialize } = require('pg-protocol')
const { getStream, getSecureStream } = require('./stream')

// The flush, sync and end messages take no arguments, so they are serialized
// once at module load and the same values are reused for every write.
const flushBuffer = serialize.flush()
const syncBuffer = serialize.sync()
const endBuffer = serialize.end()

// TODO(bmc) support binary mode at some point
/**
 * Event-emitting wrapper around a socket-like stream that writes serialized
 * protocol messages and re-emits parsed incoming messages.
 *
 * Events emitted by this class:
 *  - 'connect'      when the underlying stream emits 'connect'
 *  - 'sslconnect'   after the stream has been replaced by a secure stream
 *  - 'error'        for stream errors and failed SSL negotiation
 *  - 'end'          when the underlying stream emits 'close'
 *  - 'message'      for every parsed message, but only once a 'message'
 *                   listener has been registered
 *  - <msg.name>     for every parsed message ('errorMessage' when the
 *                   message name is 'error')
 */
class Connection extends EventEmitter {
  /**
   * @param {object} [config]
   * @param {object|Function} [config.stream] - stream to use, or a factory
   *   called with `config` that returns one; defaults to `getStream(config.ssl)`
   * @param {boolean|object} [config.ssl] - `true`, or an options object that
   *   is merged into the options passed to `getSecureStream`
   * @param {boolean} [config.keepAlive] - enable keep-alive once connected
   * @param {number} [config.keepAliveInitialDelayMillis] - forwarded as the
   *   second argument of `stream.setKeepAlive`
   */
  constructor(config) {
    super()
    config = config || {}

    this.stream = config.stream || getStream(config.ssl)
    if (typeof this.stream === 'function') {
      this.stream = this.stream(config)
    }

    this._keepAlive = config.keepAlive
    this._keepAliveInitialDelayMillis = config.keepAliveInitialDelayMillis
    this.lastBuffer = false
    this.parsedStatements = {}
    this.ssl = config.ssl || false
    this._ending = false
    this._emitMessage = false

    // The generic 'message' event is only emitted once somebody has
    // subscribed to it; the flag is never reset afterwards.
    this.on('newListener', (eventName) => {
      if (eventName === 'message') {
        this._emitMessage = true
      }
    })
  }

  /**
   * Starts connecting the underlying stream and wires up its events.
   *
   * Without SSL the message parser is attached to the stream immediately.
   * With SSL the first 'data' chunk is treated as the server's answer to an
   * SSL request: 'S' upgrades to a secure stream and emits 'sslconnect', any
   * other answer ends the stream and emits 'error'.
   *
   * @param {number} port
   * @param {string} host
   */
  connect(port, host) {
    this._connecting = true
    this.stream.setNoDelay(true)
    this.stream.connect(port, host)

    this.stream.once('connect', () => {
      if (this._keepAlive) {
        this.stream.setKeepAlive(true, this._keepAliveInitialDelayMillis)
      }
      this.emit('connect')
    })

    const reportStreamError = (error) => {
      // errors about disconnections should be ignored during disconnect
      if (this._ending && (error.code === 'ECONNRESET' || error.code === 'EPIPE')) {
        return
      }
      this.emit('error', error)
    }
    this.stream.on('error', reportStreamError)

    this.stream.on('close', () => {
      this.emit('end')
    })

    if (!this.ssl) {
      return this.attachListeners(this.stream)
    }

    this.stream.once('data', (buffer) => {
      const responseCode = buffer.toString('utf8')
      switch (responseCode) {
        case 'S': // Server supports SSL connections, continue with a secure connection
          break
        case 'N': // Server does not support SSL connections
          this.stream.end()
          return this.emit('error', new Error('The server does not support SSL connections'))
        default:
          // Any other response byte, including 'E' (ErrorResponse) indicating a server error
          this.stream.end()
          return this.emit('error', new Error('There was an error establishing an SSL connection'))
      }

      const options = {
        socket: this.stream,
      }

      if (this.ssl !== true) {
        Object.assign(options, this.ssl)

        // Object.assign copies only own enumerable properties, so `key` is
        // copied explicitly (presumably because callers may define it as
        // non-enumerable - confirm against the code that builds `ssl`).
        if ('key' in this.ssl) {
          options.key = this.ssl.key
        }
      }

      const net = require('net')
      // Only set servername when the host is not an IP address.
      if (net.isIP && net.isIP(host) === 0) {
        options.servername = host
      }

      try {
        this.stream = getSecureStream(options)
      } catch (err) {
        return this.emit('error', err)
      }
      this.attachListeners(this.stream)
      this.stream.on('error', reportStreamError)

      this.emit('sslconnect')
    })
  }

  /**
   * Attaches the protocol parser to `stream` and re-emits every parsed
   * message under its own name ('errorMessage' for messages named 'error',
   * so they do not collide with the EventEmitter 'error' event).
   *
   * @param {object} stream
   */
  attachListeners(stream) {
    parse(stream, (msg) => {
      const eventName = msg.name === 'error' ? 'errorMessage' : msg.name
      if (this._emitMessage) {
        this.emit('message', msg)
      }
      this.emit(eventName, msg)
    })
  }

  /** Writes an SSL request message directly to the stream (no writable check). */
  requestSsl() {
    this.stream.write(serialize.requestSsl())
  }

  /**
   * Writes a startup message directly to the stream (no writable check).
   * @param {object} config - passed through to `serialize.startup`
   */
  startup(config) {
    this.stream.write(serialize.startup(config))
  }

  /**
   * Sends a cancel message.
   * @param {number} processID
   * @param {number} secretKey
   */
  cancel(processID, secretKey) {
    this._send(serialize.cancel(processID, secretKey))
  }

  /**
   * Sends a password message.
   * @param {string} password
   */
  password(password) {
    this._send(serialize.password(password))
  }

  /**
   * Sends the initial SASL response message.
   * @param {string} mechanism
   * @param {string} initialResponse
   */
  sendSASLInitialResponseMessage(mechanism, initialResponse) {
    this._send(serialize.sendSASLInitialResponseMessage(mechanism, initialResponse))
  }

  /**
   * Sends the SCRAM client-final message.
   * @param {string} additionalData
   */
  sendSCRAMClientFinalMessage(additionalData) {
    this._send(serialize.sendSCRAMClientFinalMessage(additionalData))
  }

  /**
   * Writes `buffer` to the stream unless the stream is no longer writable.
   *
   * @param {Buffer} buffer
   * @returns {boolean} `false` when nothing was written because the stream is
   *   not writable, otherwise the return value of `stream.write`
   */
  _send(buffer) {
    if (!this.stream.writable) {
      return false
    }
    return this.stream.write(buffer)
  }

  /**
   * Sends a simple query message.
   * @param {string} text
   */
  query(text) {
    this._send(serialize.query(text))
  }

  /**
   * Sends a parse message.
   * @param {object} query - passed through to `serialize.parse`
   */
  parse(query) {
    this._send(serialize.parse(query))
  }

  /**
   * Sends a bind message.
   * @param {object} config - passed through to `serialize.bind`
   */
  bind(config) {
    this._send(serialize.bind(config))
  }

  /**
   * Sends an execute message.
   * @param {object} config - passed through to `serialize.execute`
   */
  execute(config) {
    this._send(serialize.execute(config))
  }

  /** Sends the pre-serialized flush message if the stream is writable. */
  flush() {
    // _send already performs the writable check; its result is deliberately
    // not returned so flush() keeps returning undefined.
    this._send(flushBuffer)
  }

  /** Sends the pre-serialized sync message if the stream is writable. */
  sync() {
    // NOTE(review): this marks the connection as ending, so from the first
    // sync onwards ECONNRESET/EPIPE stream errors are no longer re-emitted
    // by connect()'s error handler. Kept as-is; confirm this is intentional.
    this._ending = true
    this._send(syncBuffer)
  }

  /** Calls `ref()` on the underlying stream. */
  ref() {
    this.stream.ref()
  }

  /** Calls `unref()` on the underlying stream. */
  unref() {
    this.stream.unref()
  }

  /**
   * Ends the connection. If connect() was called and the stream is still
   * writable, the end message is written first and the stream is ended from
   * the write callback; otherwise the stream is ended right away.
   *
   * @returns {boolean|undefined} the result of `stream.write` when the end
   *   message was written, otherwise undefined
   */
  end() {
    // 0x58 = 'X'
    this._ending = true
    if (!this._connecting || !this.stream.writable) {
      this.stream.end()
      return
    }
    return this.stream.write(endBuffer, () => {
      this.stream.end()
    })
  }

  /**
   * Sends a close message.
   * @param {object} msg - passed through to `serialize.close`
   */
  close(msg) {
    this._send(serialize.close(msg))
  }

  /**
   * Sends a describe message.
   * @param {object} msg - passed through to `serialize.describe`
   */
  describe(msg) {
    this._send(serialize.describe(msg))
  }

  /**
   * Sends one chunk of copy data.
   * @param {Buffer} chunk
   */
  sendCopyFromChunk(chunk) {
    this._send(serialize.copyData(chunk))
  }

  /** Sends the copy-done message. */
  endCopyFrom() {
    this._send(serialize.copyDone())
  }

  /**
   * Sends a copy-fail message.
   * @param {string} msg
   */
  sendCopyFail(msg) {
    this._send(serialize.copyFail(msg))
  }
}

// The Connection class is this module's only export.
module.exports = Connection