'use strict';
// See https://github.com/facebook/jest/issues/2549
// eslint-disable-next-line node/prefer-global/url
const {URL} = require('url');
const EventEmitter = require('events');
const tls = require('tls');
const http2 = require('http2');
const QuickLRU = require('quick-lru');
const delayAsyncDestroy = require('./utils/delay-async-destroy.js');

// Symbol keys used by the Agent to attach its own bookkeeping to objects it does not own.
// - `kCurrentStreamCount`: number of streams currently open on a session.
// - `kRequest`: the session's original `request` method (the Agent replaces `session.request` with a shim).
// - `kOriginSet`: cached Origin Set of a session (an array of origin strings).
// - `kGracefullyClosing`: `true` when the session must not accept new streams and is closed once it becomes empty.
// - `kLength`: number of entries stored in a `queue[normalizedOptions]` bucket.
const kCurrentStreamCount = Symbol('currentStreamCount');
const kRequest = Symbol('request');
const kOriginSet = Symbol('cachedOriginSet');
const kGracefullyClosing = Symbol('gracefullyClosing');
const kLength = Symbol('length');

// Names of the options that `normalizeOptions()` serializes into the key
// under which sessions and queue entries are grouped.
const nameKeys = [
	// Not an Agent option actually
	'createConnection',

	// `http2.connect()` options
	'maxDeflateDynamicTableSize',
	'maxSettings',
	'maxSessionMemory',
	'maxHeaderListPairs',
	'maxOutstandingPings',
	'maxReservedRemoteStreams',
	'maxSendHeaderBlockLength',
	'paddingStrategy',
	'peerMaxConcurrentStreams',
	'settings',

	// `tls.connect()` source options
	'family',
	'localAddress',
	'rejectUnauthorized',

	// `tls.connect()` secure context options
	'pskCallback',
	'minDHSize',

	// `tls.connect()` destination options
	// - `servername` is automatically validated, skip it
	// - `host` and `port` just describe the destination server,
	'path',
	'socket',

	// `tls.createSecureContext()` options
	'ca',
	'cert',
	'sigalgs',
	'ciphers',
	'clientCertEngine',
	'crl',
	'dhparam',
	'ecdhCurve',
	'honorCipherOrder',
	'key',
	'privateKeyEngine',
	'privateKeyIdentifier',
	'maxVersion',
	'minVersion',
	'pfx',
	'secureOptions',
	'secureProtocol',
	'sessionIdContext',
	'ticketKeys'
];

// Binary search for an insertion index.
// Returns the first index for which `compare(array[index], value)` is falsy,
// or `array.length` if there is none. The result is only meaningful when the
// elements for which `compare(...)` is truthy form a prefix of `array`.
const getSortedIndex = (array, value, compare) => {
	let low = 0;
	let high = array.length;

	while (low < high) {
		// Unsigned shift by one: integer midpoint of `low` and `high`.
		const mid = (low + high) >>> 1;

		if (compare(array[mid], value)) {
			low = mid + 1;
		} else {
			high = mid;
		}
	}

	return low;
};

// Returns true when session `a` allows strictly more concurrent streams than session `b`.
// Used together with `getSortedIndex(...)`, a new session is inserted after every session
// with a strictly greater `maxConcurrentStreams`, i.e. in descending order at insertion time.
const compareSessions = (a, b) => a.remoteSettings.maxConcurrentStreams > b.remoteSettings.maxConcurrentStreams;

// See https://tools.ietf.org/html/rfc8336
// Gracefully closes every session in `where` whose Origin Set is covered by the Origin Set of `session`.
const closeCoveredSessions = (where, session) => {
	// Clients SHOULD NOT emit new requests on any connection whose Origin
	// Set is a proper subset of another connection's Origin Set, and they
	// SHOULD close it once all outstanding requests are satisfied.
	for (let index = 0; index < where.length; index++) {
		const coveredSession = where[index];

		if (
			// Unfortunately `.every()` returns true for an empty array
			coveredSession[kOriginSet].length > 0

			// The set is a proper subset when its length is less than the other set.
			&& coveredSession[kOriginSet].length < session[kOriginSet].length

			// And the other set includes all elements of the subset.
			&& coveredSession[kOriginSet].every(origin => session[kOriginSet].includes(origin))

			// Makes sure that the session can handle all requests from the covered session.
			&& (coveredSession[kCurrentStreamCount] + session[kCurrentStreamCount]) <= session.remoteSettings.maxConcurrentStreams
		) {
			// This allows pending requests to finish and prevents making new requests.
			gracefullyClose(coveredSession);
		}
	}
};

// This is basically inverted `closeCoveredSessions(...)`.
// Gracefully closes `coveredSession` if any session in `where` covers it (same four conditions as above).
// Returns true if `coveredSession` has been marked for closing, false otherwise.
const closeSessionIfCovered = (where, coveredSession) => {
	for (let index = 0; index < where.length; index++) {
		const session = where[index];

		if (
			coveredSession[kOriginSet].length > 0
			&& coveredSession[kOriginSet].length < session[kOriginSet].length
			&& coveredSession[kOriginSet].every(origin => session[kOriginSet].includes(origin))
			&& (coveredSession[kCurrentStreamCount] + session[kCurrentStreamCount]) <= session.remoteSettings.maxConcurrentStreams
		) {
			gracefullyClose(coveredSession);

			// One covering session is enough.
			return true;
		}
	}

	return false;
};

// Marks the session as gracefully closing. It is closed right away when it has no open streams;
// otherwise the stream `close` handler installed by the `session.request` shim closes it
// once the stream count drops to zero.
const gracefullyClose = session => {
	session[kGracefullyClosing] = true;

	if (session[kCurrentStreamCount] === 0) {
		session.close();
	}
};

// Pools HTTP/2 sessions. Sessions are grouped by normalized options and reused for
// every origin present in their Origin Set. Emits `session` when a new session has
// received its first `remoteSettings` event and has been added to the pool.
class Agent extends EventEmitter {
	// Options:
	// - `timeout`: passed to `session.setTimeout(...)`; the session is destroyed when the timeout callback fires.
	// - `maxSessions`: when the session count reaches this value, `_processQueue()` stops creating sessions.
	// - `maxEmptySessions`: empty sessions above this number get closed.
	// - `maxCachedTlsSessions`: `maxSize` of the TLS session LRU cache.
	constructor({timeout = 0, maxSessions = Number.POSITIVE_INFINITY, maxEmptySessions = 10, maxCachedTlsSessions = 100} = {}) {
		super();

		// SESSIONS[NORMALIZED_OPTIONS] = [];
		this.sessions = {};

		// The queue for creating new sessions. It looks like this:
		// QUEUE[NORMALIZED_OPTIONS][NORMALIZED_ORIGIN] = ENTRY_FUNCTION
		//
		// It's faster when there are many origins. If there's only one, then QUEUE[`${options}:${origin}`] is faster.
		// I guess object creation / deletion is causing the slowdown.
		//
		// The entry function has `listeners`, `completed` and `destroyed` properties.
		// `listeners` is an array of objects containing `resolve` and `reject` functions.
		// `completed` is a boolean. It's set to true right before ENTRY_FUNCTION is executed.
		// `destroyed` is a boolean. If it's set to true, the session will be destroyed if it hasn't connected yet.
		//
		// Every QUEUE[NORMALIZED_OPTIONS] object additionally stores its entry count under `kLength`.
		this.queue = {};

		// Each session will use this timeout value.
		this.timeout = timeout;

		// Max sessions in total
		this.maxSessions = maxSessions;

		// Max empty sessions in total
		this.maxEmptySessions = maxEmptySessions;

		// Number of pooled sessions that currently have no open streams.
		this._emptySessionCount = 0;

		// Number of sessions in total, including the ones that are still connecting.
		this._sessionCount = 0;

		// We don't support push streams by default.
		this.settings = {
			enablePush: false,
			initialWindowSize: 1024 * 1024 * 32 // 32MB, see https://github.com/nodejs/node/issues/38426
		};

		// Reusing TLS sessions increases performance.
		this.tlsSessionCache = new QuickLRU({maxSize: maxCachedTlsSessions});
	}

	// Always `'https:'`.
	get protocol() {
		return 'https:';
	}

	// Serializes the options listed in `nameKeys` into a single string.
	// Every key contributes a `:` separator followed by its value (nothing when the value is `undefined`),
	// so the position of a value identifies its key. `options` may be `undefined`.
	//
	// NOTE(review): values are coerced via string concatenation, so distinct plain objects
	// (e.g. two different `settings` objects) produce the same fragment - confirm this is intended.
	normalizeOptions(options) {
		let normalized = '';

		for (let index = 0; index < nameKeys.length; index++) {
			const key = nameKeys[index];

			normalized += ':';

			if (options && options[key] !== undefined) {
				normalized += options[key];
			}
		}

		return normalized;
	}

	// Runs every queued entry function that hasn't been run yet.
	// When the session limit has been reached, no entry is run; instead the Agent
	// tries to close empty sessions to make room.
	//
	// NOTE(review): the limit is checked once, before the loop. Each entry increments
	// `_sessionCount` when it starts, but the loop does not re-check the limit between entries.
	_processQueue() {
		if (this._sessionCount >= this.maxSessions) {
			this.closeEmptySessions(this.maxSessions - this._sessionCount + 1);
			return;
		}

		// eslint-disable-next-line guard-for-in
		for (const normalizedOptions in this.queue) {
			// eslint-disable-next-line guard-for-in
			for (const normalizedOrigin in this.queue[normalizedOptions]) {
				const item = this.queue[normalizedOptions][normalizedOrigin];

				// The entry function can be run only once.
				if (!item.completed) {
					item.completed = true;

					// `item` is async; the returned promise is not awaited here.
					// The entry handles its own errors by rejecting its listeners.
					item();
				}
			}
		}
	}

	// Decides whether a candidate session (with `thisStreamCount` open streams) should replace
	// the best session found so far (with `thatStreamCount` open streams) in `getSession(...)`.
	// This implementation prefers the session with more open streams.
	// Presumably a hook that subclasses may override - verify against callers.
	_isBetterSession(thisStreamCount, thatStreamCount) {
		return thisStreamCount > thatStreamCount;
	}

	// Hands `session` to as many `listeners` as the session has free stream slots for.
	// Served listeners are removed from the array. The remaining ones are passed back to
	// `getSession(...)` (which takes its own copy) and the array is emptied afterwards,
	// so `listeners` is always empty when this method returns.
	_accept(session, listeners, normalizedOrigin, options) {
		let index = 0;

		while (index < listeners.length && session[kCurrentStreamCount] < session.remoteSettings.maxConcurrentStreams) {
			// We assume `resolve(...)` calls `request(...)` *directly*,
			// otherwise the session will get overloaded.
			listeners[index].resolve(session);

			index++;
		}

		listeners.splice(0, index);

		if (listeners.length > 0) {
			this.getSession(normalizedOrigin, options, listeners);
			listeners.length = 0;
		}
	}

	// Finds or creates a session that can serve `origin` with the given `options`.
	//
	// - `origin`: a string or an `URL` instance.
	// - `options`: optional connection options; `options.servername`, if set, must equal the origin hostname.
	// - `listeners`: optional array of `{resolve, reject}` objects. When a non-empty array is given,
	//   the returned promise resolves immediately with `undefined` and the session (or an error)
	//   is delivered through those listeners instead. Otherwise the returned promise itself
	//   resolves with the session or rejects with the error.
	getSession(origin, options, listeners) {
		return new Promise((resolve, reject) => {
			if (Array.isArray(listeners) && listeners.length > 0) {
				// Take a copy: the caller (`_accept`) empties its array right after this call.
				listeners = [...listeners];

				// Resolve the current promise ASAP, we're just moving the listeners.
				// They will be executed at a different time.
				resolve();
			} else {
				listeners = [{resolve, reject}];
			}

			try {
				// Parse origin
				if (typeof origin === 'string') {
					origin = new URL(origin);
				} else if (!(origin instanceof URL)) {
					throw new TypeError('The `origin` argument needs to be a string or an URL object');
				}

				if (options) {
					// Validate servername
					const {servername} = options;
					const {hostname} = origin;
					if (servername && hostname !== servername) {
						throw new Error(`Origin ${hostname} differs from servername ${servername}`);
					}
				}
			} catch (error) {
				// Invalid arguments are reported through the listeners, never thrown synchronously.
				for (let index = 0; index < listeners.length; index++) {
					listeners[index].reject(error);
				}

				return;
			}

			const normalizedOptions = this.normalizeOptions(options);
			const normalizedOrigin = origin.origin;

			// Step 1: try to reuse an existing session.
			if (normalizedOptions in this.sessions) {
				const sessions = this.sessions[normalizedOptions];

				let maxConcurrentStreams = -1;
				let currentStreamsCount = -1;
				let optimalSession;

				// We could just do this.sessions[normalizedOptions].find(...) but that isn't optimal.
				// Additionally, we are looking for session which has biggest current pending streams count.
				//
				// |------------| |------------| |------------| |------------|
				// | Session: A | | Session: B | | Session: C | | Session: D |
				// | Pending: 5 |-| Pending: 8 |-| Pending: 9 |-| Pending: 4 |
				// | Max:    10 | | Max:    10 | | Max:     9 | | Max:     5 |
				// |------------| |------------| |------------| |------------|
				//                     ^
				//                     |
				//     pick this one  --
				//
				for (let index = 0; index < sessions.length; index++) {
					const session = sessions[index];

					const sessionMaxConcurrentStreams = session.remoteSettings.maxConcurrentStreams;

					// Only sessions sharing the `maxConcurrentStreams` of the first usable session are considered.
					if (sessionMaxConcurrentStreams < maxConcurrentStreams) {
						break;
					}

					if (!session[kOriginSet].includes(normalizedOrigin)) {
						continue;
					}

					const sessionCurrentStreamsCount = session[kCurrentStreamCount];

					if (
						sessionCurrentStreamsCount >= sessionMaxConcurrentStreams
						|| session[kGracefullyClosing]
						// Unfortunately the `close` event isn't called immediately,
						// so `session.destroyed` is `true`, but `session.closed` is `false`.
						|| session.destroyed
					) {
						continue;
					}

					// We only need set this once.
					if (!optimalSession) {
						maxConcurrentStreams = sessionMaxConcurrentStreams;
					}

					// Either get the session which has biggest current stream count or the lowest.
					if (this._isBetterSession(sessionCurrentStreamsCount, currentStreamsCount)) {
						optimalSession = session;
						currentStreamsCount = sessionCurrentStreamsCount;
					}
				}

				if (optimalSession) {
					this._accept(optimalSession, listeners, normalizedOrigin, options);
					return;
				}
			}

			// Step 2: join a pending entry for the same options and origin, if there is one.
			if (normalizedOptions in this.queue) {
				if (normalizedOrigin in this.queue[normalizedOptions]) {
					// There's already an item in the queue, just attach ourselves to it.
					this.queue[normalizedOptions][normalizedOrigin].listeners.push(...listeners);
					return;
				}
			} else {
				this.queue[normalizedOptions] = {
					[kLength]: 0
				};
			}

			// Step 3: queue a new entry that creates the session.

			// The entry must be removed from the queue IMMEDIATELY when:
			// 1. the session connects successfully,
			// 2. an error occurs.
			const removeFromQueue = () => {
				// Our entry can be replaced. We cannot remove the new one.
				if (normalizedOptions in this.queue && this.queue[normalizedOptions][normalizedOrigin] === entry) {
					delete this.queue[normalizedOptions][normalizedOrigin];

					// Drop the whole bucket together with its last entry.
					if (--this.queue[normalizedOptions][kLength] === 0) {
						delete this.queue[normalizedOptions];
					}
				}
			};

			// The main logic is here
			const entry = async () => {
				this._sessionCount++;

				// Key of the TLS session cache.
				const name = `${normalizedOrigin}:${normalizedOptions}`;

				// Becomes true once the first `remoteSettings` event has been processed
				// and the session has been added to `this.sessions`.
				let receivedSettings = false;
				let socket;

				try {
					// Shallow copy, so the caller's `options` object is not modified by the assignments below.
					const computedOptions = {...options};

					if (computedOptions.settings === undefined) {
						computedOptions.settings = this.settings;
					}

					if (computedOptions.session === undefined) {
						computedOptions.session = this.tlsSessionCache.get(name);
					}

					const createConnection = computedOptions.createConnection || this.createConnection;

					// A hacky workaround to enable async `createConnection`
					socket = await createConnection.call(this, origin, computedOptions);
					computedOptions.createConnection = () => socket;

					const session = http2.connect(origin, computedOptions);
					session[kCurrentStreamCount] = 0;
					session[kGracefullyClosing] = false;

					// Node.js return https://false:443 instead of https://1.1.1.1:443
					// Reads `session.originSet`; when the socket's `servername` is `false`, it is
					// temporarily replaced with the remote address while reading and restored afterwards.
					const getOriginSet = () => {
						const {socket} = session;

						let originSet;
						if (socket.servername === false) {
							socket.servername = socket.remoteAddress;
							originSet = session.originSet;
							socket.servername = false;
						} else {
							originSet = session.originSet;
						}

						return originSet;
					};

					// True when the session can open at least one more stream.
					const isFree = () => session[kCurrentStreamCount] < session.remoteSettings.maxConcurrentStreams;

					// Cache the TLS session so later connections made with the same key can reuse it.
					session.socket.once('session', tlsSession => {
						this.tlsSessionCache.set(name, tlsSession);
					});

					session.once('error', error => {
						// Listeners are empty when the session successfully connected.
						for (let index = 0; index < listeners.length; index++) {
							listeners[index].reject(error);
						}

						// The connection got broken, purge the cache.
						this.tlsSessionCache.delete(name);
					});

					session.setTimeout(this.timeout, () => {
						// Terminates all streams owned by this session.
						session.destroy();
					});

					session.once('close', () => {
						this._sessionCount--;

						if (receivedSettings) {
							// Assumes session `close` is emitted after request `close`
							this._emptySessionCount--;

							// This cannot be moved to the stream logic,
							// because there may be a session that hadn't made a single request.
							const where = this.sessions[normalizedOptions];

							if (where.length === 1) {
								delete this.sessions[normalizedOptions];
							} else {
								where.splice(where.indexOf(session), 1);
							}
						} else {
							// Broken connection
							removeFromQueue();

							const error = new Error('Session closed without receiving a SETTINGS frame');
							error.code = 'HTTP2WRAPPER_NOSETTINGS';

							for (let index = 0; index < listeners.length; index++) {
								listeners[index].reject(error);
							}
						}

						// There may be another session awaiting.
						this._processQueue();
					});

					// Iterates over the queue and processes listeners.
					// Listeners waiting (under the same normalized options) for any origin in this
					// session's Origin Set are served by this session for as long as it has free slots.
					const processListeners = () => {
						const queue = this.queue[normalizedOptions];
						if (!queue) {
							return;
						}

						const originSet = session[kOriginSet];

						for (let index = 0; index < originSet.length; index++) {
							const origin = originSet[index];

							if (origin in queue) {
								const {listeners, completed} = queue[origin];

								// Shadows the outer loop index within this block.
								let index = 0;

								// Prevents session overloading.
								while (index < listeners.length && isFree()) {
									// We assume `resolve(...)` calls `request(...)` *directly*,
									// otherwise the session will get overloaded.
									listeners[index].resolve(session);

									index++;
								}

								queue[origin].listeners.splice(0, index);

								// An entry that has no listeners left and hasn't been started yet is dropped.
								// Started (`completed`) entries are left in place; they remove themselves.
								if (queue[origin].listeners.length === 0 && !completed) {
									delete queue[origin];

									if (--queue[kLength] === 0) {
										delete this.queue[normalizedOptions];
										break;
									}
								}

								// We're no longer free, no point in continuing.
								if (!isFree()) {
									break;
								}
							}
						}
					};

					// The Origin Set cannot shrink. No need to check if it suddenly became covered by another one.
					session.on('origin', () => {
						// Refresh the cached Origin Set and re-evaluate the graceful-closing state from scratch.
						session[kOriginSet] = getOriginSet() || [];
						session[kGracefullyClosing] = false;
						closeSessionIfCovered(this.sessions[normalizedOptions], session);

						if (session[kGracefullyClosing] || !isFree()) {
							return;
						}

						processListeners();

						if (!isFree()) {
							return;
						}

						// Close covered sessions (if possible).
						closeCoveredSessions(this.sessions[normalizedOptions], session);
					});

					// The first `remoteSettings` event marks the session as connected.
					session.once('remoteSettings', () => {
						// The Agent could have been destroyed already.
						if (entry.destroyed) {
							const error = new Error('Agent has been destroyed');

							for (let index = 0; index < listeners.length; index++) {
								listeners[index].reject(error);
							}

							session.destroy();
							return;
						}

						// See https://github.com/nodejs/node/issues/38426
						if (session.setLocalWindowSize) {
							session.setLocalWindowSize(1024 * 1024 * 4); // 4 MB
						}

						session[kOriginSet] = getOriginSet() || [];

						if (session.socket.encrypted) {
							// The first element of the Origin Set has to be the origin we asked for.
							const mainOrigin = session[kOriginSet][0];
							if (mainOrigin !== normalizedOrigin) {
								const error = new Error(`Requested origin ${normalizedOrigin} does not match server ${mainOrigin}`);

								for (let index = 0; index < listeners.length; index++) {
									listeners[index].reject(error);
								}

								session.destroy();
								return;
							}
						}

						removeFromQueue();

						{
							// Insert the session, keeping the array ordered by `compareSessions`.
							const where = this.sessions;

							if (normalizedOptions in where) {
								const sessions = where[normalizedOptions];
								sessions.splice(getSortedIndex(sessions, session, compareSessions), 0, session);
							} else {
								where[normalizedOptions] = [session];
							}
						}

						receivedSettings = true;
						this._emptySessionCount++;

						this.emit('session', session);

						// Serves the waiting listeners; `listeners` is empty afterwards,
						// so the `error` / `close` handlers above have nothing left to reject.
						this._accept(session, listeners, normalizedOrigin, options);

						if (session[kCurrentStreamCount] === 0 && this._emptySessionCount > this.maxEmptySessions) {
							this.closeEmptySessions(this._emptySessionCount - this.maxEmptySessions);
						}

						// `session.remoteSettings.maxConcurrentStreams` might get increased
						session.on('remoteSettings', () => {
							if (!isFree()) {
								return;
							}

							processListeners();

							if (!isFree()) {
								return;
							}

							// In case the Origin Set changes
							closeCoveredSessions(this.sessions[normalizedOptions], session);
						});
					});

					// Shim `session.request()` in order to catch all streams
					session[kRequest] = session.request;
					session.request = (headers, streamOptions) => {
						if (session[kGracefullyClosing]) {
							throw new Error('The session is gracefully closing. No new streams are allowed.');
						}

						const stream = session[kRequest](headers, streamOptions);

						// The process won't exit until the session is closed or all requests are gone.
						session.ref();

						// The session stops being empty with its first stream.
						if (session[kCurrentStreamCount]++ === 0) {
							this._emptySessionCount--;
						}

						stream.once('close', () => {
							// The session becomes empty again with its last stream gone.
							if (--session[kCurrentStreamCount] === 0) {
								this._emptySessionCount++;
								session.unref();

								if (this._emptySessionCount > this.maxEmptySessions || session[kGracefullyClosing]) {
									session.close();
									return;
								}
							}

							if (session.destroyed || session.closed) {
								return;
							}

							// A slot has been freed: either retire this session in favor of a covering one,
							// or use the slot for covered sessions and waiting listeners.
							if (isFree() && !closeSessionIfCovered(this.sessions[normalizedOptions], session)) {
								closeCoveredSessions(this.sessions[normalizedOptions], session);
								processListeners();

								if (session[kCurrentStreamCount] === 0) {
									this._processQueue();
								}
							}
						});

						return stream;
					};
				} catch (error) {
					// Creating the connection or setting up the session failed synchronously.
					removeFromQueue();
					this._sessionCount--;

					for (let index = 0; index < listeners.length; index++) {
						listeners[index].reject(error);
					}
				}
			};

			entry.listeners = listeners;
			entry.completed = false;
			entry.destroyed = false;

			this.queue[normalizedOptions][normalizedOrigin] = entry;
			this.queue[normalizedOptions][kLength]++;
			this._processQueue();
		});
	}

	// Obtains a session through `getSession(origin, options)` and opens a stream on it
	// with `session.request(headers, streamOptions)`.
	// Returns a promise that resolves with the stream. It rejects when no session
	// could be obtained or when `session.request(...)` throws.
	request(origin, options, headers, streamOptions) {
		return new Promise((resolve, reject) => {
			this.getSession(origin, options, [{
				reject,
				resolve: session => {
					try {
						const stream = session.request(headers, streamOptions);

						// Do not throw before `request(...)` has been awaited
						delayAsyncDestroy(stream);

						resolve(stream);
					} catch (error) {
						reject(error);
					}
				}
			}]);
		});
	}

	// Default connection factory, used when `options.createConnection` is not provided.
	// Async, so it returns a promise that resolves with the socket created by `Agent.connect(...)`.
	async createConnection(origin, options) {
		return Agent.connect(origin, options);
	}

	// Creates a TLS socket to `origin` that offers only the `h2` ALPN protocol.
	// Mutates `options`: sets `ALPNProtocols` and, when it is undefined, `servername` (to the origin hostname).
	// The port falls back to 443 when `origin.port` is empty.
	static connect(origin, options) {
		options.ALPNProtocols = ['h2'];

		const port = origin.port || 443;
		const host = origin.hostname;

		if (typeof options.servername === 'undefined') {
			options.servername = host;
		}

		const socket = tls.connect(port, host, options);

		// NOTE(review): presumably needed because a TLS socket created on top of an existing
		// `options.socket` has no peer name of its own - confirm why `_peername` has to be set.
		if (options.socket) {
			socket._peername = {
				family: undefined,
				address: undefined,
				port
			};
		}

		return socket;
	}

	// Calls `close()` on up to `maxCount` pooled sessions that have no open streams.
	// Returns the number of sessions `close()` has been called on.
	// At least one empty session is closed if there is any, even for `maxCount <= 0`,
	// because the limit is checked only after closing.
	closeEmptySessions(maxCount = Number.POSITIVE_INFINITY) {
		let closedCount = 0;

		const {sessions} = this;

		// eslint-disable-next-line guard-for-in
		for (const key in sessions) {
			const thisSessions = sessions[key];

			for (let index = 0; index < thisSessions.length; index++) {
				const session = thisSessions[index];

				if (session[kCurrentStreamCount] === 0) {
					closedCount++;
					session.close();

					if (closedCount >= maxCount) {
						return closedCount;
					}
				}
			}
		}

		return closedCount;
	}

	// Destroys every pooled session (passing `reason` to `session.destroy(...)`),
	// flags all queued entries as destroyed, replaces the queue with an empty one
	// and clears the TLS session cache.
	destroy(reason) {
		const {sessions, queue} = this;

		// eslint-disable-next-line guard-for-in
		for (const key in sessions) {
			const thisSessions = sessions[key];

			for (let index = 0; index < thisSessions.length; index++) {
				thisSessions[index].destroy(reason);
			}
		}

		// eslint-disable-next-line guard-for-in
		for (const normalizedOptions in queue) {
			const entries = queue[normalizedOptions];

			// Entries that are still connecting check this flag on their first `remoteSettings` event.
			// eslint-disable-next-line guard-for-in
			for (const normalizedOrigin in entries) {
				entries[normalizedOrigin].destroyed = true;
			}
		}

		// New requests should NOT attach to destroyed sessions
		this.queue = {};
		this.tlsSessionCache.clear();
	}

	// Number of pooled sessions with no open streams.
	get emptySessionCount() {
		return this._emptySessionCount;
	}

	// Total session count minus the empty session count.
	get pendingSessionCount() {
		return this._sessionCount - this._emptySessionCount;
	}

	// Total session count, including sessions that are still connecting.
	get sessionCount() {
		return this._sessionCount;
	}
}

// Exposed so that code outside of this module can read the Agent's per-session state.
Agent.kCurrentStreamCount = kCurrentStreamCount;
Agent.kGracefullyClosing = kGracefullyClosing;

module.exports = {
	Agent,
	globalAgent: new Agent()
};