var URL = require('url') , request = require('request') , UError = require('./error').UserError , mystreams = require('./streams') , Logger = require('./logger') , utils = require('./utils') // // Implements Storage interface // (same for storage.js, local-storage.js, up-storage.js) // function Storage(config, mainconfig) { if (!(this instanceof Storage)) return new Storage(config) this.config = config this.is_alive = false this.userAgent = mainconfig.user_agent this.ca = config.ca this.logger = Logger.logger.child({sub: 'out'}) this.url = URL.parse(this.config.url) if (this.url.hostname === 'registry.npmjs.org') { this.ca = this.ca || require('./npmsslkeys') // npm registry is too slow working with ssl :( /*if (this.config._autogenerated) { // encrypt all the things! this.url.protocol = 'https' this.config.url = URL.format(this.url) }*/ } this.config.url = this.config.url.replace(/\/$/, '') return this } Storage.prototype.request = function(options, cb) { var self = this , headers = options.headers || {} headers.accept = headers.accept || 'application/json' headers['user-agent'] = headers['user-agent'] || this.userAgent var method = options.method || 'GET' , uri = options.uri_full || (this.config.url + options.uri) self.logger.info({ method: method, headers: headers, uri: uri, }, "making request: '@{method} @{uri}'") if (utils.is_object(options.json)) { var json = JSON.stringify(options.json) headers['content-type'] = headers['content-type'] || 'application/json' } var req = request({ url: uri, method: method, headers: headers, body: json, ca: this.ca, }, function(err, res, body) { var error if (!err) { var res_length = body.length if (options.json && res.statusCode < 300) { try { body = JSON.parse(body) } catch(_err) { body = {} err = _err error = err.message } } if (!err && utils.is_object(body)) { if (body.error) { error = body.error } } } else { error = err.message } var msg = '@{!status}, req: \'@{request.method} @{request.url}\'' if (error) { msg += ', error: @{!error}' } else { msg += ', bytes: @{bytes.in}/@{bytes.out}' } self.logger.warn({ err: err, request: {method: method, url: uri}, level: 35, // http status: res != null ? res.statusCode : 'ERR', error: error, bytes: { in: json ? json.length : 0, out: res_length || 0, } }, msg) if (cb) cb.apply(self, arguments) }) req.on('response', function(res) { self.status_check(true) }) req.on('error', function() { self.status_check(false) }) return req } Storage.prototype.status_check = function(alive) { if (arguments.length === 0) { if (!this.is_alive && Math.abs(Date.now() - this.is_alive_time()) > 60*1000) { return false } else { return true } } else { this.is_alive = alive this.is_alive_time = Date.now() } } Storage.prototype.can_fetch_url = function(url) { url = URL.parse(url) return url.protocol === this.url.protocol && url.host === this.url.host && url.path.indexOf(this.url.path) === 0 } Storage.prototype.add_package = function(name, metadata, callback) { this.request({ uri: '/' + escape(name), method: 'PUT', json: metadata, }, function(err, res, body) { if (err) return callback(err) if (!(res.statusCode >= 200 && res.statusCode < 300)) { return callback(new Error('bad status code: ' + res.statusCode)) } callback(null, body) }) } Storage.prototype.add_version = function(name, version, metadata, tag, callback) { this.request({ uri: '/' + escape(name) + '/' + escape(version) + '/-tag/' + escape(tag), method: 'PUT', json: metadata, }, function(err, res, body) { if (err) return callback(err) if (!(res.statusCode >= 200 && res.statusCode < 300)) { return callback(new Error('bad status code: ' + res.statusCode)) } callback(null, body) }) } Storage.prototype.add_tarball = function(name, filename) { var stream = new mystreams.UploadTarballStream() , self = this var wstream = this.request({ uri: '/' + escape(name) + '/-/' + escape(filename) + '/whatever', method: 'PUT', headers: { 'content-type': 'application/octet-stream' }, }) wstream.on('response', function(res) { if (!(res.statusCode >= 200 && res.statusCode < 300)) { return stream.emit('error', new UError({ msg: 'bad uplink status code: ' + res.statusCode, status: 500, })) } stream.emit('success') }) wstream.on('error', function(err) { stream.emit('error', err) }) stream.abort = function() { process.nextTick(function() { if (wstream.req) { wstream.req.abort() } }) } stream.done = function() {} stream.pipe(wstream) return stream } Storage.prototype.get_package = function(name, etag, callback) { if (etag) { var headers = { 'if-none-match': etag } } this.request({ uri: '/' + escape(name), json: true, headers: headers, }, function(err, res, body) { if (err) return callback(err) if (res.statusCode === 404) { return callback(new UError({ msg: 'package doesn\'t exist on uplink', status: 404, })) } if (!(res.statusCode >= 200 && res.statusCode < 300)) { return callback(new Error('bad status code: ' + res.statusCode)) } callback(null, body, res.headers.etag) }) } Storage.prototype.get_tarball = function(name, filename) { return this.get_url(this.config.url + '/' + name + '/-/' + filename) } Storage.prototype.get_url = function(url) { var stream = new mystreams.ReadTarballStream() stream.abort = function() {} var rstream = this.request({ uri_full: url, encoding: null, }) rstream.on('response', function(res) { if (res.statusCode === 404) { return stream.emit('error', new UError({ msg: 'file doesn\'t exist on uplink', status: 404, })) } if (!(res.statusCode >= 200 && res.statusCode < 300)) { return stream.emit('error', new UError({ msg: 'bad uplink status code: ' + res.statusCode, status: 500, })) } rstream.pipe(stream) }) rstream.on('error', function(err) { stream.emit('error', err) }) return stream } module.exports = Storage