'use strict'; const assert = require('assert'); const async = require('async'); const Error = require('http-errors'); const Stream = require('stream'); const Local = require('./local-storage'); const Logger = require('./logger'); const MyStreams = require('./streams'); const Proxy = require('./up-storage'); const Utils = require('./utils'); // // Implements Storage interface // (same for storage.js, local-storage.js, up-storage.js) // class Storage { /** * * @param {*} config */ constructor(config) { this.config = config; // we support a number of uplinks, but only one local storage // Proxy and Local classes should have similar API interfaces this.uplinks = {}; for (let p in config.uplinks) { // instance for each up-link definition this.uplinks[p] = new Proxy(config.uplinks[p], config); this.uplinks[p].upname = p; } // an instance for local storage this.local = new Local(config); this.logger = Logger.logger.child(); } /** * Add a {name} package to a system Function checks if package with the same name is available from uplinks. If it isn't, we create package locally Used storages: local (write) && uplinks * @param {*} name * @param {*} metadata * @param {*} callback */ add_package(name, metadata, callback) { let self = this; // NOTE: // - when we checking package for existance, we ask ALL uplinks // - when we publishing package, we only publish it to some of them // so all requests are necessary check_package_local(function(err) { if (err) return callback(err); check_package_remote(function(err) { if (err) return callback(err); publish_package(function(err) { if (err) return callback(err); callback(); }); }); }); function check_package_local(cb) { self.local.get_package(name, {}, function(err, results) { if (err && err.status !== 404) return cb(err); if (results) return cb( Error[409]('this package is already present') ); cb(); }); } function check_package_remote(cb) { self._sync_package_with_uplinks(name, null, {}, function(err, results, err_results) { // something weird if (err && err.status !== 404) return cb(err); // checking package if (results) return cb( Error[409]('this package is already present') ); for (let i=0; i { if (err && (!err.status || err.status >= 500)) { // report internal errors right away return callback(err); } this._sync_package_with_uplinks(name, data, options, function(err, result, uplink_errors) { if (err) return callback(err); const whitelist = ['_rev', 'name', 'versions', 'dist-tags', 'readme']; for (let i in result) { if (whitelist.indexOf(i) === -1) delete result[i]; } Utils.normalize_dist_tags(result); // npm can throw if this field doesn't exist result._attachments = {}; callback(null, result, uplink_errors); }); }); } /** Retrieve remote and local packages more recent than {startkey} Function streams all packages from all uplinks first, and then local packages. Note that local packages could override registry ones just because they appear in JSON last. That's a trade-off we make to avoid memory issues. Used storages: local && uplink (proxy_access) * @param {*} startkey * @param {*} options */ search(startkey, options) { let self = this; let stream = new Stream.PassThrough({objectMode: true}); async.eachSeries(Object.keys(this.uplinks), function(up_name, cb) { // shortcut: if `local=1` is supplied, don't call uplinks if (options.req.query.local !== undefined) return cb(); let lstream = self.uplinks[up_name].search(startkey, options); lstream.pipe(stream, {end: false}); lstream.on('error', function(err) { self.logger.error({err: err}, 'uplink error: @{err.message}'); cb(), cb = function() {}; }); lstream.on('end', function() { cb(), cb = function() {}; }); stream.abort = function() { if (lstream.abort) lstream.abort(); cb(), cb = function() {}; }; }, function() { let lstream = self.local.search(startkey, options); stream.abort = function() { lstream.abort(); }; lstream.pipe(stream, {end: true}); lstream.on('error', function(err) { self.logger.error({err: err}, 'search error: @{err.message}'); stream.end(); }); }); return stream; } /** * * @param {*} callback */ get_local(callback) { let self = this; let locals = this.config.localList.get(); let packages = []; var getPackage = function(i) { self.local.get_package(locals[i], function(err, info) { if (!err) { let latest = info['dist-tags'].latest; if (latest && info.versions[latest]) { packages.push(info.versions[latest]); } else { self.logger.warn( {package: locals[i]} , 'package @{package} does not have a "latest" tag?' ); } } if (i >= locals.length - 1) { callback(null, packages); } else { getPackage(i + 1); } }); }; if (locals.length) { getPackage(0); } else { callback(null, []); } } /** * Function fetches package information from uplinks and synchronizes it with local data if package is available locally, it MUST be provided in pkginfo returns callback(err, result, uplink_errors) * @param {*} name * @param {*} pkginfo * @param {*} options * @param {*} callback */ _sync_package_with_uplinks(name, pkginfo, options, callback) { let self = this; let exists = false; if (!pkginfo) { exists = false; pkginfo = { 'name': name, 'versions': {}, 'dist-tags': {}, '_uplinks': {}, }; } else { exists = true; } let uplinks = []; for (let i in self.uplinks) { if (self.config.can_proxy_to(name, i)) { uplinks.push(self.uplinks[i]); } } async.map(uplinks, function(up, cb) { let _options = Object.assign({}, options); if (Utils.is_object(pkginfo._uplinks[up.upname])) { let fetched = pkginfo._uplinks[up.upname].fetched; if (fetched && fetched > (Date.now() - up.maxage)) { return cb(); } _options.etag = pkginfo._uplinks[up.upname].etag; } up.get_package(name, _options, function(err, up_res, etag) { if (err && err.status === 304) pkginfo._uplinks[up.upname].fetched = Date.now(); if (err || !up_res) return cb(null, [err || Error('no data')]); try { Utils.validate_metadata(up_res, name); } catch(err) { self.logger.error({ sub: 'out', err: err, }, 'package.json validating error @{!err.message}\n@{err.stack}'); return cb(null, [err]); } pkginfo._uplinks[up.upname] = { etag: etag, fetched: Date.now(), }; for (let i in up_res.versions) { // this won't be serialized to json, // kinda like an ES6 Symbol // FIXME: perhaps Symbol('_verdaccio_uplink') here? Object.defineProperty(up_res.versions[i], '_verdaccio_uplink', { value: up.upname, enumerable: false, configurable: false, writable: true, }); } try { Storage._merge_versions(pkginfo, up_res, self.config); } catch(err) { self.logger.error({ sub: 'out', err: err, }, 'package.json parsing error @{!err.message}\n@{err.stack}'); return cb(null, [err]); } // if we got to this point, assume that the correct package exists // on the uplink exists = true; cb(); }); }, function(err, uplink_errors) { assert(!err && Array.isArray(uplink_errors)); if (!exists) { return callback( Error[404]('no such package available') , null , uplink_errors ); } self.local.update_versions(name, pkginfo, function(err, pkginfo) { if (err) return callback(err); return callback(null, pkginfo, uplink_errors); }); }); } /** * Function gets a local info and an info from uplinks and tries to merge it exported for unit tests only. * @param {*} local * @param {*} up * @param {*} config */ static _merge_versions(local, up, config) { // copy new versions to a cache // NOTE: if a certain version was updated, we can't refresh it reliably for (var i in up.versions) { if (local.versions[i] == null) { local.versions[i] = up.versions[i]; } } // refresh dist-tags for (var i in up['dist-tags']) { if (local['dist-tags'][i] !== up['dist-tags'][i]) { local['dist-tags'][i] = up['dist-tags'][i]; if (i === 'latest') { // if remote has more fresh package, we should borrow its readme local.readme = up.readme; } } } } } module.exports = Storage;