var assert = require('assert')
var async = require('async')
var Error = require('http-errors')
var Stream = require('stream')
var Local = require('./local-storage')
var Logger = require('./logger')
var MyStreams = require('./streams')
var Proxy = require('./up-storage')
var Utils = require('./utils')

module.exports = Storage

//
// Implements Storage interface
// (same for storage.js, local-storage.js, up-storage.js)
//
function Storage(config) {
  var self = Object.create(Storage.prototype)
  self.config = config

  // we support a number of uplinks, but only one local storage
  // Proxy and Local classes should have similar API interfaces
  self.uplinks = {}
  for (var p in config.uplinks) {
    self.uplinks[p] = Proxy(config.uplinks[p], config)
    self.uplinks[p].upname = p
  }
  self.local = Local(config)
  self.logger = Logger.logger.child()
  return self
}

//
// Add a {name} package to the system
//
// Function checks if a package with the same name is available from uplinks.
// If it isn't, we create the package locally.
//
// Used storages: local (write) && uplinks
//
Storage.prototype.add_package = function(name, metadata, callback) {
  var self = this

  // NOTE:
  // - when we check a package for existence, we ask ALL uplinks
  // - when we publish a package, we only publish it to some of them
  // so all requests are necessary

  check_package_local(function(err) {
    if (err) return callback(err)

    check_package_remote(function(err) {
      if (err) return callback(err)

      publish_package(function(err) {
        if (err) return callback(err)
        callback()
      })
    })
  })

  function check_package_local(cb) {
    self.local.get_package(name, {}, function(err, results) {
      if (err && err.status !== 404) return cb(err)

      if (results) return cb( Error[409]('this package is already present') )

      cb()
    })
  }

  function check_package_remote(cb) {
    self._sync_package_with_uplinks(name, null, {}, function(err, results, err_results) {
      // something weird
      if (err && err.status !== 404) return cb(err)

      // checking package
      if (results) return cb( Error[409]('this package is already present') )

      // if any uplink failed with something other than 404,
      // we can't be sure the package doesn't exist there
      for (var i=0; i<err_results.length; i++) {
        if (err_results[i] && err_results[i][0] && err_results[i][0].status !== 404) {
          return cb( Error[503]('one of the uplinks is down, refuse to publish') )
        }
      }

      return cb()
    })
  }

  function publish_package(cb) {
    self.local.add_package(name, metadata, cb)
  }
}

//
// Retrieve metadata for a {name} package
//
// Function merges the locally stored metadata with metadata fetched from
// every uplink that is allowed to proxy {name}.
//
// Used storages: local && uplink (proxy_access)
//
Storage.prototype.get_package = function(name, options, callback) {
  if (typeof(options) === 'function') callback = options, options = {}

  var self = this

  self.local.get_package(name, options, function(err, data) {
    if (err && (!err.status || err.status >= 500)) {
      // report internal errors right away
      return callback(err)
    }

    self._sync_package_with_uplinks(name, data, options, function(err, result, uplink_errors) {
      if (err) return callback(err)

      var whitelist = [ '_rev', 'name', 'versions', 'dist-tags', 'readme' ]
      for (var i in result) {
        if (whitelist.indexOf(i) === -1) delete result[i]
      }

      Utils.normalize_dist_tags(result)

      // npm can throw if this field doesn't exist
      result._attachments = {}

      callback(null, result, uplink_errors)
    })
  })
}
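//
// Usage sketch (illustrative only, not part of this module). Assuming a
// parsed config object and Express-style `req`/`res`/`next` in an API route;
// `metadata` stands in for the package document sent by the npm client:
//
//   var Storage = require('./storage')
//   var storage = Storage(config)
//
//   // create a package locally, refusing if any uplink already has it
//   storage.add_package('my-pkg', metadata, function(err) {
//     if (err) return next(err)
//     res.status(201).send({ ok: 'created new package' })
//   })
//
//   // fetch metadata merged from local storage and all allowed uplinks
//   storage.get_package('my-pkg', function(err, info, uplink_errors) {
//     if (err) return next(err)
//     res.send(info)
//   })
//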
//
// Retrieve remote and local packages more recent than {startkey}
//
// Function streams all packages from all uplinks first, and then
// local packages.
//
// Note that local packages could override registry ones just because
// they appear in JSON last. That's a trade-off we make to avoid
// memory issues.
//
// Used storages: local && uplink (proxy_access)
//
Storage.prototype.search = function(startkey, options) {
  var self = this

  var stream = new Stream.PassThrough({ objectMode: true })

  async.eachSeries(Object.keys(self.uplinks), function(up_name, cb) {
    // shortcut: if `local=1` is supplied, don't call uplinks
    if (options.req.query.local !== undefined) return cb()

    var lstream = self.uplinks[up_name].search(startkey, options)
    lstream.pipe(stream, { end: false })
    lstream.on('error', function (err) {
      self.logger.error({ err: err }, 'uplink error: @{err.message}')
      cb(), cb = function () {}
    })
    lstream.on('end', function () {
      cb(), cb = function () {}
    })

    stream.abort = function () {
      if (lstream.abort) lstream.abort()
      cb(), cb = function () {}
    }
  }, function () {
    // uplinks are done; finish with the local index
    var lstream = self.local.search(startkey, options)
    stream.abort = function () { lstream.abort() }
    lstream.pipe(stream, { end: true })
    lstream.on('error', function (err) {
      self.logger.error({ err: err }, 'search error: @{err.message}')
      stream.end()
    })
  })

  return stream
}
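//
// Usage sketch (illustrative only, assuming an Express-style request handler
// and a `storage` instance as above). search() returns an object-mode stream,
// so a `/-/all`-style endpoint can forward results incrementally and abort
// the uplink requests if the client goes away:
//
//   var stream = storage.search(req.query.startkey || 0, { req: req })
//   stream.on('data', function (pkg) { res.write(JSON.stringify(pkg) + '\n') })
//   stream.on('end', function () { res.end() })
//   req.on('close', function () { if (stream.abort) stream.abort() })
//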
//
// Retrieve only private local packages (the latest version of each)
//
// Used storages: local
//
Storage.prototype.get_local = function(callback) {
  var self = this
  var locals = this.config.localList.get()
  var packages = []

  var getPackage = function(i) {
    self.local.get_package(locals[i], function(err, info) {
      if (!err) {
        var latest = info['dist-tags'].latest
        if (latest && info.versions[latest]) {
          packages.push(info.versions[latest])
        } else {
          self.logger.warn( { package: locals[i] }
                          , 'package @{package} does not have a "latest" tag?' )
        }
      }

      if (i >= locals.length - 1) {
        callback(null, packages)
      } else {
        getPackage(i + 1)
      }
    })
  }

  if (locals.length) {
    getPackage(0)
  } else {
    callback(null, [])
  }
}

// function fetches package information from uplinks and synchronizes it with local data
// if the package is available locally, it MUST be provided in pkginfo
// returns callback(err, result, uplink_errors)
Storage.prototype._sync_package_with_uplinks = function(name, pkginfo, options, callback) {
  var self = this

  if (!pkginfo) {
    var exists = false

    pkginfo = {
      name        : name,
      versions    : {},
      'dist-tags' : {},
      _uplinks    : {},
    }
  } else {
    var exists = true
  }

  // only ask the uplinks that are allowed to proxy this package
  var uplinks = []
  for (var i in self.uplinks) {
    if (self.config.can_proxy_to(name, i)) {
      uplinks.push(self.uplinks[i])
    }
  }

  async.map(uplinks, function(up, cb) {
    var _options = Object.assign({}, options)
    if (Utils.is_object(pkginfo._uplinks[up.upname])) {
      var fetched = pkginfo._uplinks[up.upname].fetched
      // skip the request entirely if the cached copy is still fresh
      if (fetched && fetched > (Date.now() - up.maxage)) {
        return cb()
      }

      _options.etag = pkginfo._uplinks[up.upname].etag
    }

    up.get_package(name, _options, function(err, up_res, etag) {
      if (err && err.status === 304)
        pkginfo._uplinks[up.upname].fetched = Date.now()

      if (err || !up_res) return cb(null, [ err || Error('no data') ])

      try {
        Utils.validate_metadata(up_res, name)
      } catch(err) {
        self.logger.error({
          sub: 'out',
          err: err,
        }, 'package.json validating error @{!err.message}\n@{err.stack}')
        return cb(null, [ err ])
      }

      pkginfo._uplinks[up.upname] = {
        etag    : etag,
        fetched : Date.now(),
      }

      for (var i in up_res.versions) {
        // this won't be serialized to json,
        // kinda like an ES6 Symbol
        Object.defineProperty(up_res.versions[i], '_sinopia_uplink', {
          value        : up.upname,
          enumerable   : false,
          configurable : false,
          writable     : true,
        })
      }

      try {
        Storage._merge_versions(pkginfo, up_res, self.config)
      } catch(err) {
        self.logger.error({
          sub: 'out',
          err: err,
        }, 'package.json parsing error @{!err.message}\n@{err.stack}')
        return cb(null, [ err ])
      }

      // if we got to this point, assume that the correct package exists
      // on the uplink
      exists = true
      cb()
    })
  }, function(err, uplink_errors) {
    assert(!err && Array.isArray(uplink_errors))

    if (!exists) {
      return callback( Error[404]('no such package available')
                     , null
                     , uplink_errors )
    }

    self.local.update_versions(name, pkginfo, function(err, pkginfo) {
      if (err) return callback(err)
      return callback(null, pkginfo, uplink_errors)
    })
  })
}

// function takes local package info and info from an uplink and tries to merge them
// exported for unit tests only
Storage._merge_versions = function(local, up, config) {
  // copy new versions to a cache
  // NOTE: if a certain version was updated, we can't refresh it reliably
  for (var i in up.versions) {
    if (local.versions[i] == null) {
      local.versions[i] = up.versions[i]
    }
  }

  // refresh dist-tags
  for (var i in up['dist-tags']) {
    if (local['dist-tags'][i] !== up['dist-tags'][i]) {
      local['dist-tags'][i] = up['dist-tags'][i]

      if (i === 'latest') {
        // if the remote has a fresher package, we should borrow its readme
        local.readme = up.readme
      }
    }
  }
}
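//
// Behaviour sketch for _merge_versions (hypothetical values, in the spirit of
// the unit tests it is exported for): uplink versions are only added when the
// local cache doesn't already have them, while dist-tags (and the readme,
// when `latest` moves) follow the uplink:
//
//   var local = { versions: { '1.0.0': { local: true } },
//                 'dist-tags': { latest: '1.0.0' }, readme: 'old readme' }
//   var up    = { versions: { '1.0.0': { remote: true }, '1.1.0': {} },
//                 'dist-tags': { latest: '1.1.0' }, readme: 'new readme' }
//
//   Storage._merge_versions(local, up, config)
//
//   // local.versions['1.0.0'] keeps the local copy,
//   // local.versions['1.1.0'] is copied from the uplink,
//   // local['dist-tags'].latest === '1.1.0', local.readme === 'new readme'
//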