Refactored addPackage on Storage

This commit is contained in:
Juan Picado @jotadeveloper 2017-06-03 23:41:03 +02:00
parent 391e98de9f
commit 09ca08baaf
No known key found for this signature in database
GPG Key ID: 18AC54485952D158
2 changed files with 90 additions and 86 deletions

View File

@ -412,7 +412,7 @@ module.exports = function(config, auth, storage) {
after_change(err, 'package changed');
});
} else {
storage.add_package(name, metadata, function(err) {
storage.addPackage(name, metadata, function(err) {
after_change(err, 'created new package');
});
}

View File

@ -61,85 +61,89 @@ class Storage {
* @param {*} metadata
* @param {*} callback
*/
add_package(name, metadata, callback) {
let self = this;
addPackage(name, metadata, callback) {
const self = this;
/**
* Check whether a package it is already a local package
* @param {*} cb the callback method
* @return {Promise}
*/
const checkPackageLocal = () => {
return new Promise((resolve, reject) => {
this.localStorage.getPackage(name, {}, (err, results) => {
if (!_.isNil(err) && err.status !== 404) {
return reject(err);
}
if (results) {
return reject(Error[409]('this package is already present'));
}
return resolve();
});
});
};
/**
* Check whether a package exist in any of the uplinks.
* @param {*} cb the callback method
* @return {Promise}
*/
const check_package_remote = () => {
return new Promise((resolve, reject) => {
self._sync_package_with_uplinks(name, null, {}, (err, results, err_results) => {
// something weird
if (err && err.status !== 404) {
return reject(err);
}
// checking package
if (results) {
return reject(Error[409]('this package is already present'));
}
for (let i = 0; i < err_results.length; i++) {
// checking error
// if uplink fails with a status other than 404, we report failure
if (err_results[i][0] != null) {
if (err_results[i][0].status !== 404) {
return reject(Error[503]('one of the uplinks is down, refuse to publish'));
}
}
}
return resolve();
});
});
};
/**
* Add a package to the local database
* @param {*} cb callback method
* @return {Promise}
*/
const publish_package = () => {
return new Promise((resolve, reject) => {
self.localStorage.addPackage(name, metadata, (err, latest) => {
if (!_.isNull(err)) {
return reject(err);
} else if (!_.isUndefined(latest)) {
Search.add(latest);
}
return resolve();
});
});
};
// NOTE:
// - when we checking package for existance, we ask ALL uplinks
// - when we publishing package, we only publish it to some of them
// so all requests are necessary
check_package_local(function(err) {
if (err) return callback(err);
check_package_remote(function(err) {
if (err) return callback(err);
publish_package(function(err) {
if (err) return callback(err);
callback();
});
});
});
/**
* Check whether a package it is already a local package
* @param {*} cb the callback method
*/
function check_package_local(cb) {
self.localStorage.getPackage(name, {}, function(err, results) {
if (err && err.status !== 404) {
return cb(err);
}
if (results) {
return cb( Error[409]('this package is already present') );
}
cb();
});
}
/**
* Check whether a package exist in any of the uplinks.
* @param {*} cb the callback method
*/
function check_package_remote(cb) {
self._sync_package_with_uplinks(name, null, {}, function(err, results, err_results) {
// something weird
if (err && err.status !== 404) {
return cb(err);
}
// checking package
if (results) {
return cb( Error[409]('this package is already present') );
}
for (let i=0; i<err_results.length; i++) {
// checking error
// if uplink fails with a status other than 404, we report failure
if (err_results[i][0] != null) {
if (err_results[i][0].status !== 404) {
return cb( Error[503]('one of the uplinks is down, refuse to publish') );
}
}
}
return cb();
});
}
/**
* Add a package to the local database
* @param {*} cb callback method
*/
function publish_package(cb) {
self.localStorage.addPackage(name, metadata, (err, latest) => {
if (!_.isNull(err)) {
return cb(err);
} else if (!_.isUndefined(latest)) {
Search.add(latest);
}
return cb();
});
}
checkPackageLocal()
.then(() => {
return check_package_remote().then(() => {
return publish_package().then(() => {
callback();
}, (err) => callback(err));
}, (err) => callback(err));
}, (err) => callback(err));
}
/**
@ -241,8 +245,8 @@ class Storage {
* @return {Stream}
*/
get_tarball(name, filename) {
let stream = new MyStreams.ReadTarball();
stream.abort = function() {};
let readStream = new MyStreams.ReadTarball();
readStream.abort = function() {};
let self = this;
@ -254,7 +258,7 @@ class Storage {
let is_open = false;
rstream.on('error', function(err) {
if (is_open || err.status !== 404) {
return stream.emit('error', err);
return readStream.emit('error', err);
}
// local reported 404
@ -269,10 +273,10 @@ class Storage {
// we know nothing about this file, trying to get information elsewhere
self._sync_package_with_uplinks(name, info, {}, function(err, info) {
if (err) {
return stream.emit('error', err);
return readStream.emit('error', err);
}
if (!info._distfiles || info._distfiles[filename] == null) {
return stream.emit('error', err404);
return readStream.emit('error', err404);
}
serve_file(info._distfiles[filename]);
});
@ -280,13 +284,13 @@ class Storage {
});
});
rstream.on('content-length', function(v) {
stream.emit('content-length', v);
readStream.emit('content-length', v);
});
rstream.on('open', function() {
is_open = true;
rstream.pipe(stream);
rstream.pipe(readStream);
});
return stream;
return readStream;
/**
* Fetch and cache local/remote packages.
@ -319,7 +323,7 @@ class Storage {
savestream.abort();
}
savestream = null;
stream.emit('error', err);
readStream.emit('error', err);
});
rstream2.on('end', function() {
if (savestream) {
@ -328,12 +332,12 @@ class Storage {
});
rstream2.on('content-length', function(v) {
stream.emit('content-length', v);
readStream.emit('content-length', v);
if (savestream) {
savestream.emit('content-length', v);
}
});
rstream2.pipe(stream);
rstream2.pipe(readStream);
if (savestream) {
rstream2.pipe(savestream);
}