'use strict'; const assert = require('assert'); const async = require('async'); const Crypto = require('crypto'); const fs = require('fs'); const Error = require('http-errors'); const Path = require('path'); const Stream = require('readable-stream'); const URL = require('url'); const fs_storage = require('./local-fs'); const Logger = require('./logger'); const Search = require('./search'); const MyStreams = require('./streams'); const Utils = require('./utils'); const info_file = 'package.json'; // returns the minimal package file function get_boilerplate(name) { return { // standard things 'name': name, 'versions': {}, 'dist-tags': {}, // our own object '_distfiles': {}, '_attachments': {}, '_uplinks': {}, }; } // // Implements Storage interface // (same for storage.js, local-storage.js, up-storage.js) // class Storage { constructor(config) { this.config = config; this.logger = Logger.logger.child({sub: 'fs'}); } /** * * @param {*} err * @param {*} file * @param {*} message */ _internal_error(err, file, message) { this.logger.error( {err: err, file: file} , message + ' @{file}: @{!err.message}' ); return Error[500](); } /** * * @param {*} name * @param {*} info * @param {*} callback */ add_package(name, info, callback) { let storage = this.storage(name); if (!storage) return callback( Error[404]('this package cannot be added') ); storage.create_json(info_file, get_boilerplate(name), function(err) { if (err && err.code === 'EEXISTS') { return callback( Error[409]('this package is already present') ); } let latest = info['dist-tags'].latest; if (latest && info.versions[latest]) { Search.add(info.versions[latest]); } callback(); }); } /** * * @param {*} name * @param {*} callback */ remove_package(name, callback) { this.logger.info( {name: name} , 'unpublishing @{name} (all)'); let storage = this.storage(name); if (!storage) return callback( Error[404]('no such package available') ); storage.read_json(info_file, (err, data) => { if (err) { if (err.code === 'ENOENT') { return callback( Error[404]('no such package available') ); } else { return callback(err); } } this._normalize_package(data); storage.unlink(info_file, function(err) { if (err) return callback(err); let files = Object.keys(data._attachments); function unlinkNext(cb) { if (files.length === 0) return cb(); let file = files.shift(); storage.unlink(file, function() { unlinkNext(cb); }); } unlinkNext(function() { // try to unlink the directory, but ignore errors because it can fail storage.rmdir('.', function(err) { callback(err); }); }); }); }); Search.remove(name); this.config.localList.remove(name); } /** * * @param {*} name * @param {*} callback */ _read_create_package(name, callback) { let storage = this.storage(name); if (!storage) { let data = get_boilerplate(name); this._normalize_package(data); return callback(null, data); } storage.read_json(info_file, (err, data) => { // TODO: race condition if (err) { if (err.code === 'ENOENT') { // if package doesn't exist, we create it here data = get_boilerplate(name); } else { return callback(this._internal_error(err, info_file, 'error reading')); } } this._normalize_package(data); callback(null, data); }); } /** * Synchronize remote package info with the local one * @param {*} name * @param {*} newdata * @param {*} callback */ update_versions(name, newdata, callback) { this._read_create_package(name, (err, data) => { if (err) return callback(err); let change = false; for (let ver in newdata.versions) { if (data.versions[ver] == null) { let verdata = newdata.versions[ver]; // we don't keep readmes for package versions, // only one readme per package delete verdata.readme; change = true; data.versions[ver] = verdata; if (verdata.dist && verdata.dist.tarball) { let filename = URL.parse(verdata.dist.tarball).pathname.replace(/^.*\//, ''); // we do NOT overwrite any existing records if (data._distfiles[filename] == null) { let hash = data._distfiles[filename] = { url: verdata.dist.tarball, sha: verdata.dist.shasum, }; // if (verdata[Symbol('_verdaccio_uplink')]) { if (verdata._verdaccio_uplink) { // if we got this information from a known registry, // use the same protocol for the tarball // // see https://github.com/rlidwka/sinopia/issues/166 let tarball_url = URL.parse(hash.url); let uplink_url = URL.parse(this.config.uplinks[verdata._verdaccio_uplink].url); if (uplink_url.host === tarball_url.host) { tarball_url.protocol = uplink_url.protocol; hash.registry = verdata._verdaccio_uplink; hash.url = URL.format(tarball_url); } } } } } } for (let tag in newdata['dist-tags']) { if (!data['dist-tags'][tag] || data['dist-tags'][tag] !== newdata['dist-tags'][tag]) { change = true; data['dist-tags'][tag] = newdata['dist-tags'][tag]; } } for (let up in newdata._uplinks) { let need_change = !Utils.is_object(data._uplinks[up]) || newdata._uplinks[up].etag !== data._uplinks[up].etag || newdata._uplinks[up].fetched !== data._uplinks[up].fetched; if (need_change) { change = true; data._uplinks[up] = newdata._uplinks[up]; } } if (newdata.readme !== data.readme) { data.readme = newdata.readme; change = true; } if (change) { this.logger.debug('updating package info'); this._write_package(name, data, function(err) { callback(err, data); }); } else { callback(null, data); } }); } /** * * @param {*} name * @param {*} version * @param {*} metadata * @param {*} tag * @param {*} callback */ add_version(name, version, metadata, tag, callback) { this.update_package(name, (data, cb) => { // keep only one readme per package data.readme = metadata.readme; delete metadata.readme; if (data.versions[version] != null) { return cb( Error[409]('this version already present') ); } // if uploaded tarball has a different shasum, it's very likely that we have some kind of error if (Utils.is_object(metadata.dist) && typeof(metadata.dist.tarball) === 'string') { let tarball = metadata.dist.tarball.replace(/.*\//, ''); if (Utils.is_object(data._attachments[tarball])) { if (data._attachments[tarball].shasum != null && metadata.dist.shasum != null) { if (data._attachments[tarball].shasum != metadata.dist.shasum) { return cb( Error[400]('shasum error, ' + data._attachments[tarball].shasum + ' != ' + metadata.dist.shasum) ); } } data._attachments[tarball].version = version; } } data.versions[version] = metadata; Utils.tag_version(data, version, tag); this.config.localList.add(name); cb(); }, callback); } /** * * @param {*} name * @param {*} tags * @param {*} callback */ merge_tags(name, tags, callback) { this.update_package(name, function updater(data, cb) { for (let t in tags) { if (tags[t] === null) { delete data['dist-tags'][t]; continue; } if (data.versions[tags[t]] == null) { return cb( Error[404]('this version doesn\'t exist') ); } Utils.tag_version(data, tags[t], t); } cb(); }, callback); } /** * * @param {*} name * @param {*} tags * @param {*} callback */ replace_tags(name, tags, callback) { this.update_package(name, function updater(data, cb) { data['dist-tags'] = {}; for (let t in tags) { if (tags[t] === null) { delete data['dist-tags'][t]; continue; } if (data.versions[tags[t]] == null) { return cb( Error[404]('this version doesn\'t exist') ); } Utils.tag_version(data, tags[t], t); } cb(); }, callback); } /** * Currently supports unpublishing only * @param {*} name * @param {*} metadata * @param {*} revision * @param {*} callback */ change_package(name, metadata, revision, callback) { if (!Utils.is_object(metadata.versions) || !Utils.is_object(metadata['dist-tags'])) { return callback( Error[422]('bad data') ); } this.update_package(name, (data, cb) => { for (let ver in data.versions) { if (metadata.versions[ver] == null) { this.logger.info( {name: name, version: ver} , 'unpublishing @{name}@@{version}'); delete data.versions[ver]; for (let file in data._attachments) { if (data._attachments[file].version === ver) { delete data._attachments[file].version; } } } } data['dist-tags'] = metadata['dist-tags']; cb(); }, function(err) { if (err) return callback(err); callback(); }); } /** * * @param {*} name * @param {*} filename * @param {*} revision * @param {*} callback */ remove_tarball(name, filename, revision, callback) { assert(Utils.validate_name(filename)); this.update_package(name, (data, cb) => { if (data._attachments[filename]) { delete data._attachments[filename]; cb(); } else { cb(Error[404]('no such file available')); } }, function(err) { if (err) return callback(err); let storage = this.storage(name); if (storage) storage.unlink(filename, callback); }); } /** * * @param {*} name * @param {*} filename */ add_tarball(name, filename) { assert(Utils.validate_name(filename)); let stream = MyStreams.UploadTarballStream(); let _transform = stream._transform; let length = 0; let shasum = Crypto.createHash('sha1'); stream.abort = stream.done = function() {}; stream._transform = function(data) { shasum.update(data); length += data.length; _transform.apply(stream, arguments); }; if (name === info_file || name === '__proto__') { process.nextTick(function() { stream.emit('error', Error[403]('can\'t use this filename')); }); return stream; } let storage = this.storage(name); if (!storage) { process.nextTick(function() { stream.emit('error', Error[404]('can\'t upload this package')); }); return stream; } let wstream = storage.write_stream(filename); wstream.on('error', (err) => { if (err.code === 'EEXISTS') { stream.emit('error', Error[409]('this tarball is already present')); } else if (err.code === 'ENOENT') { // check if package exists to throw an appropriate message this.get_package(name, function(_err, res) { if (_err) { stream.emit('error', _err); } else { stream.emit('error', err); } }); } else { stream.emit('error', err); } }); wstream.on('open', function() { // re-emitting open because it's handled in storage.js stream.emit('open'); }); wstream.on('success', () => { this.update_package(name, function updater(data, cb) { data._attachments[filename] = { shasum: shasum.digest('hex'), }; cb(); }, function(err) { if (err) { stream.emit('error', err); } else { stream.emit('success'); } }); }); stream.abort = function() { wstream.abort(); }; stream.done = function() { if (!length) { stream.emit('error', Error[422]('refusing to accept zero-length file')); wstream.abort(); } else { wstream.done(); } }; stream.pipe(wstream); return stream; } /** * * @param {*} name * @param {*} filename * @param {*} callback */ get_tarball(name, filename, callback) { assert(Utils.validate_name(filename)); let self = this; let stream = MyStreams.ReadTarballStream(); stream.abort = function() { if (rstream) rstream.abort(); }; let storage = self.storage(name); if (!storage) { process.nextTick(function() { stream.emit('error', Error[404]('no such file available')); }); return stream; } var rstream = storage.read_stream(filename); rstream.on('error', function(err) { if (err && err.code === 'ENOENT') { stream.emit('error', Error(404, 'no such file available')); } else { stream.emit('error', err); } }); rstream.on('content-length', function(v) { stream.emit('content-length', v); }); rstream.on('open', function() { // re-emitting open because it's handled in storage.js stream.emit('open'); rstream.pipe(stream); }); return stream; } /** * * @param {*} name * @param {*} options * @param {*} callback */ get_package(name, options, callback) { if (typeof(options) === 'function') { callback = options, options = {}; } let storage = this.storage(name); if (!storage) return callback( Error[404]('no such package available') ); storage.read_json(info_file, (err, result) => { if (err) { if (err.code === 'ENOENT') { return callback( Error[404]('no such package available') ); } else { return callback(this._internal_error(err, info_file, 'error reading')); } } this._normalize_package(result); callback(err, result); }); } /** * Walks through each package and calls `on_package` on them * @param {*} on_package * @param {*} on_end */ _each_package(on_package, on_end) { let storages = {}; storages[this.config.storage] = true; if (this.config.packages) { Object.keys(this.packages || {}).map( (pkg) => { if (this.config.packages[pkg].storage) { storages[this.config.packages[pkg].storage] = true; } }); } const base = Path.dirname(this.config.self_path); async.eachSeries(Object.keys(storages), function(storage, cb) { fs.readdir(Path.resolve(base, storage), function(err, files) { if (err) return cb(err); async.eachSeries(files, function(file, cb) { if (file.match(/^@/)) { // scoped fs.readdir(Path.resolve(base, storage, file), function(err, files) { if (err) return cb(err); async.eachSeries(files, function(file2, cb) { if (Utils.validate_name(file2)) { on_package({ name: `${file}/${file2}`, path: Path.resolve(base, storage, file, file2), }, cb); } else { cb(); } }, cb); }); } else if (Utils.validate_name(file)) { on_package({ name: file, path: Path.resolve(base, storage, file), }, cb); } else { cb(); } }, cb); }); }, on_end); } /** * This function allows to update the package thread-safely Algorithm: 1. lock package.json for writing 2. read package.json 3. updateFn(pkg, cb), and wait for cb 4. write package.json.tmp 5. move package.json.tmp package.json 6. callback(err?) * @param {*} name package name * @param {*} updateFn function(package, cb) - update function * @param {*} _callback callback that gets invoked after it's all updated */ update_package(name, updateFn, _callback) { let self = this; let storage = self.storage(name); if (!storage) return _callback( Error[404]('no such package available') ); storage.lock_and_read_json(info_file, function(err, json) { let locked = false; // callback that cleans up lock first function callback(err) { let _args = arguments; if (locked) { storage.unlock_file(info_file, function() { // ignore any error from the unlock _callback.apply(err, _args); }); } else { _callback.apply(null, _args); } } if (!err) { locked = true; } if (err) { if (err.code === 'EAGAIN') { return callback( Error[503]('resource temporarily unavailable') ); } else if (err.code === 'ENOENT') { return callback( Error[404]('no such package available') ); } else { return callback(err); } } self._normalize_package(json); updateFn(json, function(err) { if (err) return callback(err); self._write_package(name, json, callback); }); }); } /** * * @param {*} startkey * @param {*} options */ search(startkey, options) { const stream = new Stream.PassThrough({objectMode: true}); this._each_package((item, cb) => { fs.stat(item.path, (err, stats) => { if (err) { return cb(err); } if (stats.mtime > startkey) { this.get_package(item.name, options, function(err, data) { if (err) { return cb(err); } let versions = Utils.semver_sort(Object.keys(data.versions)); let latest = data['dist-tags'] && data['dist-tags'].latest ? data['dist-tags'].latest : versions.pop(); if (data.versions[latest]) { stream.push({ 'name': data.versions[latest].name, 'description': data.versions[latest].description, 'dist-tags': {latest: latest}, 'maintainers': data.versions[latest].maintainers || [data.versions[latest].author].filter(Boolean), 'author': data.versions[latest].author, 'repository': data.versions[latest].repository, 'readmeFilename': data.versions[latest].readmeFilename || '', 'homepage': data.versions[latest].homepage, 'keywords': data.versions[latest].keywords, 'bugs': data.versions[latest].bugs, 'license': data.versions[latest].license, 'time': { modified: item.time ? new Date(item.time).toISOString() : undefined, }, 'versions': {}, }); } cb(); }); } else { cb(); } }); }, function on_end(err) { if (err) return stream.emit('error', err); stream.end(); }); return stream; } /** * * @param {*} pkg */ _normalize_package(pkg) { ['versions', 'dist-tags', '_distfiles', '_attachments', '_uplinks'].forEach(function(key) { if (!Utils.is_object(pkg[key])) pkg[key] = {}; }); if (typeof(pkg._rev) !== 'string') { pkg._rev = '0-0000000000000000'; } // normalize dist-tags Utils.normalize_dist_tags(pkg); } /** * * @param {*} name * @param {*} json * @param {*} callback */ _write_package(name, json, callback) { // calculate revision a la couchdb if (typeof(json._rev) !== 'string') { json._rev = '0-0000000000000000'; } let rev = json._rev.split('-'); json._rev = ((+rev[0] || 0) + 1) + '-' + Crypto.pseudoRandomBytes(8).toString('hex'); let storage = this.storage(name); if (!storage) return callback(); storage.write_json(info_file, json, callback); } /** * * @param {*} pkg */ storage(pkg) { let path = this.config.get_package_spec(pkg).storage; if (path == null) { path = this.config.storage; } if (path == null || path === false) { this.logger.debug( {name: pkg} , 'this package has no storage defined: @{name}' ); return null; } return Path_Wrapper( Path.join( Path.resolve(Path.dirname(this.config.self_path || ''), path), pkg ) ); } } var Path_Wrapper = (function() { // a wrapper adding paths to fs_storage methods function Wrapper(path) { let self = Object.create(Wrapper.prototype); self.path = path; return self; } for (let i in fs_storage) { if (fs_storage.hasOwnProperty(i)) { Wrapper.prototype[i] = wrapper(i); } } function wrapper(method) { return function(/* ...*/) { let args = Array.prototype.slice.apply(arguments); args[0] = Path.join(this.path, args[0] || ''); return fs_storage[method].apply(null, args); }; } return Wrapper; })(); module.exports = Storage;