1
0
mirror of https://github.com/verdaccio/verdaccio.git synced 2024-11-08 23:25:51 +01:00

Merge pull request #253 from rlidwka/search

complete search rewrite
This commit is contained in:
Alex Kocharin 2015-05-30 18:17:49 +03:00
commit d370e5a6a9
9 changed files with 283 additions and 175 deletions

@ -100,6 +100,11 @@ function Config(config) {
return flatten(result)
}
// add a default rule for all packages to make writing plugins easier
if (self.packages['**'] == null) {
self.packages['**'] = {}
}
for (var i in self.packages) {
assert(
typeof(self.packages[i]) === 'object' &&

@ -1,4 +1,3 @@
var async = require('async')
var Cookies = require('cookies')
var express = require('express')
var expressJson5 = require('express-json5')
@ -84,33 +83,57 @@ module.exports = function(config, auth, storage) {
// searching packages
app.get('/-/all/:anything?', function(req, res, next) {
storage.search(req.param.startkey || 0, {req: req}, function(err, result) {
if (err) return next(err)
async.eachSeries(Object.keys(result), function(pkg, cb) {
auth.allow_access(pkg, req.remote_user, function(err, allowed) {
if (err) {
if (err.status && String(err.status).match(/^4\d\d$/)) {
// auth plugin returns 4xx user error,
// that's equivalent of !allowed basically
allowed = false
} else {
return cb(err)
}
}
var received_end = false
var response_finished = false
var processing_pkgs = 0
if (!allowed) delete result[pkg]
cb()
})
}, function(err) {
if (err) return next(err)
next(result)
res.status(200)
res.write('{"_updated":' + Date.now());
var stream = storage.search(req.param.startkey || 0, { req: req })
stream.on('data', function each(pkg) {
processing_pkgs++
auth.allow_access(pkg.name, req.remote_user, function(err, allowed) {
processing_pkgs--
if (err) {
if (err.status && String(err.status).match(/^4\d\d$/)) {
// auth plugin returns 4xx user error,
// that's equivalent of !allowed basically
allowed = false
} else {
stream.abort(err)
}
}
if (allowed) {
res.write(',\n' + JSON.stringify(pkg.name) + ':' + JSON.stringify(pkg))
}
check_finish()
})
})
})
//app.get('/*', function(req, res) {
// proxy.request(req, res)
//})
stream.on('error', function (_err) {
res.socket.destroy()
})
stream.on('end', function () {
received_end = true
check_finish()
})
// Ends the JSON response exactly once, and only after the search stream has
// signalled 'end' and no `allow_access` callbacks are still outstanding.
function check_finish() {
// the search stream has not finished yet, more packages may arrive
if (!received_end) return
// some access checks were started but have not called back yet
if (processing_pkgs) return
// guard against closing the response twice
if (response_finished) return
response_finished = true
// closes the JSON object opened by the initial '{"_updated":...' write
res.end('}\n')
}
})
// placeholder, because npm requires the client to be authenticated to publish;
// we do not do any real authentication yet

@ -58,18 +58,6 @@ module.exports = function(config_hash) {
next()
})
/* app.get('/-/all', function(req, res) {
var https = require('https')
var JSONStream = require('JSONStream')
var request = require('request')({
url: 'https://registry.npmjs.org/-/all',
})
.pipe(JSONStream.parse('*'))
.on('data', function(d) {
console.log(d)
})
})*/
// hook for tests only
if (config._debug) {
app.get('/-/_debug', function(req, res, next) {

@ -1,8 +1,10 @@
var assert = require('assert')
var async = require('async')
var Crypto = require('crypto')
var fs = require('fs')
var Error = require('http-errors')
var Path = require('path')
var Stream = require('readable-stream')
var URL = require('url')
var fs_storage = require('./local-fs')
var Logger = require('./logger')
@ -482,35 +484,55 @@ Storage.prototype.get_package = function(name, options, callback) {
})
}
Storage.prototype.get_recent_packages = function(startkey, callback) {
// walks through each package and calls `on_package` on them
Storage.prototype._each_package = function (on_package, on_end) {
var self = this
var i = 0
var list = []
var storages = {}
var storage = self.storage('')
if (!storage) return callback(null, [])
storages[self.config.storage] = true
fs.readdir(storage.path, function(err, files) {
if (err) return callback(null, [])
var filesL = files.length
files.forEach(function(file) {
fs.stat(storage.path, function(err, stats) {
if (err) return callback(err)
if (stats.mtime > startkey && Utils.validate_name(file)) {
list.push({
time: stats.mtime,
name: file
})
}
if (++i !== filesL) {
return false
}
return callback(null, list)
})
if (self.config.packages) {
Object.keys(self.packages || {}).map(function (pkg) {
if (self.config.packages[pkg].storage) {
storages[self.config.packages[pkg].storage] = true
}
})
})
}
var base = Path.dirname(self.config.self_path);
async.eachSeries(Object.keys(storages), function (storage, cb) {
fs.readdir(Path.resolve(base, storage), function (err, files) {
if (err) return cb(err)
async.eachSeries(files, function (file, cb) {
if (file.match(/^@/)) {
// scoped
fs.readdir(Path.resolve(base, storage, file), function (err, files) {
if (err) return cb(err)
async.eachSeries(files, function (file2, cb) {
if (Utils.validate_name(file2)) {
on_package({
name: file + '/' + file2,
path: Path.resolve(base, storage, file, file2),
}, cb)
} else {
cb()
}
}, cb)
})
} else if (Utils.validate_name(file)) {
on_package({
name: file,
path: Path.resolve(base, storage, file)
}, cb)
} else {
cb()
}
}, cb)
})
}, on_end)
}
//
@ -565,6 +587,55 @@ Storage.prototype.update_package = function(name, updateFn, _callback) {
})
}
//
// Stream search metadata for local packages modified after {startkey}
//
// Walks every local package via `_each_package`, stats its directory and,
// when the directory mtime is greater than {startkey}, loads the package
// and pushes a summary of its latest version into an object-mode stream.
//
// startkey - packages whose mtime is not greater than this value are skipped
// options  - passed through unchanged to `get_package`
//
// Returns the stream. It emits 'error' when the walk, `fs.stat` or
// `get_package` reports an error, and ends when the walk is complete.
// `stream.abort()` makes the remaining packages be skipped, so that the
// stream ends without producing further results.
//
Storage.prototype.search = function(startkey, options) {
  var self = this
  var stream = new Stream.PassThrough({ objectMode: true })
  var aborted = false

  // consumers of search streams cancel them by calling `abort()`;
  // without this method such a call would throw on a local stream
  stream.abort = function() {
    aborted = true
  }

  self._each_package(function on_package(item, cb) {
    if (aborted) return cb()

    fs.stat(item.path, function(err, stats) {
      if (err) return cb(err)
      if (aborted || !(stats.mtime > startkey)) return cb()

      self.get_package(item.name, options, function(err, data) {
        if (err) return cb(err)
        if (aborted) return cb()

        var versions = Utils.semver_sort(Object.keys(data.versions))
        var latest = versions[versions.length - 1]
        var pkg = data.versions[latest]

        if (pkg) {
          // items produced by `_each_package` carry only `name` and `path`,
          // so fall back to the mtime we have just read from disk
          var modified = new Date(item.time || stats.mtime).toISOString()

          stream.push({
            name           : pkg.name,
            description    : pkg.description,
            'dist-tags'    : { latest: latest },
            maintainers    : pkg.maintainers ||
                             [ pkg._npmUser ].filter(Boolean),
            author         : pkg.author,
            repository     : pkg.repository,
            readmeFilename : pkg.readmeFilename || '',
            homepage       : pkg.homepage,
            keywords       : pkg.keywords,
            bugs           : pkg.bugs,
            license        : pkg.license,
            time           : { modified: modified },
            versions       : {},
          })
        }

        cb()
      })
    })
  }, function on_end(err) {
    if (err) return stream.emit('error', err)
    stream.end()
  })

  return stream
}
Storage.prototype._normalize_package = function(pkg) {
;['versions', 'dist-tags', '_distfiles', '_attachments', '_uplinks'].forEach(function(key) {
if (!Utils.is_object(pkg[key])) pkg[key] = {}

@ -1,6 +1,7 @@
var assert = require('assert')
var async = require('async')
var Error = require('http-errors')
var Stream = require('stream')
var Local = require('./local-storage')
var Logger = require('./logger')
var MyStreams = require('./streams')
@ -328,86 +329,49 @@ Storage.prototype.get_package = function(name, options, callback) {
//
// Retrieve remote and local packages more recent than {startkey}
//
// Function invokes uplink.request for npm and local.get_recent_packages for
// local ones then sum up the result in a json object
// Function streams all packages from all uplinks first, and then
// local packages.
//
// Note that local packages could override registry ones just because
// they appear in JSON last. That's a trade-off we make to avoid
// memory issues.
//
// Used storages: local && uplink (proxy_access)
//
Storage.prototype.search = function(startkey, options, callback) {
Storage.prototype.search = function(startkey, options) {
var self = this
var uplinks = []
var i = 0
var uplinks
for (var p in self.uplinks) {
uplinks.push(p)
}
var stream = new Stream.PassThrough({ objectMode: true })
function merge_with_local_packages(err, res, body) {
if (err) return callback(err)
var j = 0
async.eachSeries(Object.keys(self.uplinks), function(up_name, cb) {
// shortcut: if `local=1` is supplied, don't call uplinks
if (options.req.query.local !== undefined) return cb()
self.local.get_recent_packages(startkey, function(err, list) {
if (err) return callback(err)
var listL = list.length
if (!listL) return callback(null, body)
list.forEach(function(item) {
self.local.get_package(item.name, options, function(err, data) {
if (err) return callback(err)
var versions = Utils.semver_sort(Object.keys(data.versions))
var latest = versions[versions.length - 1]
if (data.versions[latest]) {
body[item.name] = {
name : data.versions[latest].name,
description : data.versions[latest].description,
'dist-tags' : { latest: latest },
maintainers : data.versions[latest].maintainers || [data.versions[latest]._npmUser].filter(Boolean),
readmeFilename: data.versions[latest].readmeFilename || '',
time : {
modified: new Date(item.time).toISOString()
},
versions : {},
repository : data.versions[latest].repository,
keywords : data.versions[latest].keywords
}
body[item.name].versions[latest] = 'latest'
}
if (++j !== listL) {
return false
}
return callback(null, body)
})
})
var lstream = self.uplinks[up_name].search(startkey, options)
lstream.pipe(stream, { end: false })
lstream.on('error', function (err) {
self.logger.error({ err: err }, 'uplink error: @{err.message}')
cb(), cb = function () {}
})
lstream.on('end', function () {
cb(), cb = function () {}
})
}
function remote_search() {
var uplink = self.uplinks[uplinks[i]]
if (options.req.query.local !== undefined || !uplink) {
return merge_with_local_packages(null, null, {})
stream.abort = function () {
if (lstream.abort) lstream.abort()
cb(), cb = function () {}
}
self.uplinks[uplinks[i]].request({
uri : options.req.url,
timeout : self.uplinks[p].timeout,
json : true,
req : options.req,
}, function(err, res, body) {
if (err || Math.floor(res.statusCode / 100) > 3) {
i++
return remote_search()
}
return merge_with_local_packages(err, res, body)
}, function () {
var lstream = self.local.search(startkey, options)
stream.abort = function () { lstream.abort() }
lstream.pipe(stream, { end: true })
lstream.on('error', function (err) {
self.logger.error({ err: err }, 'search error: @{err.message}')
stream.end()
})
}
})
remote_search()
return stream
}
Storage.prototype.get_local = function(callback) {

@ -1,8 +1,8 @@
var JSONStream = require('JSONStream')
var Error = require('http-errors')
var request = require('request')
var Stream = require('stream')
var Stream = require('readable-stream')
var URL = require('url')
var zlib = require('zlib')
var parse_interval = require('./config').parse_interval
var Logger = require('./logger')
var MyStreams = require('./streams')
@ -127,34 +127,13 @@ Storage.prototype.request = function(options, cb) {
headers['Content-Type'] = headers['Content-Type'] || 'application/json'
}
var req = request({
url : uri,
method : method,
headers : headers,
body : json,
ca : this.ca,
proxy : this.proxy,
encoding : null,
timeout : this.timeout,
}, function(err, res, body) {
var request_callback = cb ? (function (err, res, body) {
var error
var res_length = err ? 0 : body.length
do_gunzip(function() {
do_decode()
do_log()
if (cb) cb(err, res, body)
})
function do_gunzip(cb) {
if (err) return cb()
if (res.headers['content-encoding'] !== 'gzip') return cb()
zlib.gunzip(body, function(er, buf) {
if (er) err = er
body = buf
return cb()
})
}
do_decode()
do_log()
cb(err, res, body)
function do_decode() {
if (err) {
@ -196,7 +175,19 @@ Storage.prototype.request = function(options, cb) {
}
}, message)
}
})
}) : undefined
var req = request({
url : uri,
method : method,
headers : headers,
body : json,
ca : this.ca,
proxy : this.proxy,
encoding : null,
gzip : true,
timeout : this.timeout,
}, request_callback)
var status_called = false
req.on('response', function(res) {
@ -204,6 +195,17 @@ Storage.prototype.request = function(options, cb) {
status_called = true
self.status_check(true)
}
if (!request_callback) {
;(function do_log() {
var message = '@{!status}, req: \'@{request.method} @{request.url}\' (streaming)'
self.logger.warn({
request : { method: method, url: uri },
level : 35, // http
status : res != null ? res.statusCode : 'ERR',
}, message)
})()
}
})
req.on('error', function(_err) {
if (!req._sinopia_aborted && !status_called) {
@ -319,6 +321,44 @@ Storage.prototype.get_url = function(url) {
return stream
}
//
// Stream search results from this uplink
//
// Re-issues the incoming search request (`options.req.url`) against the
// uplink through `self.request` and re-emits every object found by
// `JSONStream.parse('*')` in the response as a 'data' event on the
// returned object-mode stream.
//
// A non-2xx response status or a request error is re-emitted as 'error';
// 'end' is emitted when the response ends or when `abort()` is called.
//
Storage.prototype.search = function(startkey, options) {
  var uplink = this
  var output = new Stream.PassThrough({ objectMode: true })
  var upstream = uplink.request({
    uri: options.req.url,
    req: options.req,
  })

  function finish() {
    output.emit('end')
  }

  function forward_error(err) {
    output.emit('error', err)
  }

  upstream.on('response', function on_response(res) {
    var succeeded = /^2\d\d$/.test(String(res.statusCode))
    if (!succeeded) {
      return output.emit('error', Error('bad status code ' + res.statusCode + ' from uplink'))
    }

    res.pipe(JSONStream.parse('*')).on('data', function on_entry(pkg) {
      // top-level values that are not objects are skipped
      if (!Utils.is_object(pkg)) return
      output.emit('data', pkg)
    })

    res.on('end', finish)
  })

  upstream.on('error', forward_error)

  // cancel the uplink request and tell consumers there is nothing more
  output.abort = function () {
    upstream.abort()
    finish()
  }

  return output
}
Storage.prototype._add_proxy_headers = function(req, headers) {
if (req) {
headers['X-Forwarded-For'] = (

45
npm-shrinkwrap.json generated

@ -2,6 +2,17 @@
"name": "sinopia",
"version": "1.3.1",
"dependencies": {
"JSONStream": {
"version": "1.0.3",
"dependencies": {
"jsonparse": {
"version": "1.0.0"
},
"through": {
"version": "2.3.7"
}
}
},
"async": {
"version": "0.9.2"
},
@ -410,6 +421,23 @@
}
}
},
"readable-stream": {
"version": "1.1.13",
"dependencies": {
"core-util-is": {
"version": "1.0.1"
},
"isarray": {
"version": "0.0.1"
},
"string_decoder": {
"version": "0.10.31"
},
"inherits": {
"version": "2.0.1"
}
}
},
"render-readme": {
"version": "1.3.0",
"dependencies": {
@ -469,23 +497,6 @@
"domelementtype": {
"version": "1.3.0"
},
"readable-stream": {
"version": "1.1.13",
"dependencies": {
"core-util-is": {
"version": "1.0.1"
},
"isarray": {
"version": "0.0.1"
},
"string_decoder": {
"version": "0.10.31"
},
"inherits": {
"version": "2.0.1"
}
}
},
"entities": {
"version": "1.0.0"
}

@ -39,11 +39,15 @@ dependencies:
lunr: '>=0.5.2 <1.0.0-0'
render-readme: '>=0.2.1'
jju: '1.x'
JSONStream: '1.x'
mkdirp: '>=0.3.5 <1.0.0-0'
sinopia-htpasswd: '>= 0.4.3'
http-errors: '>=1.2.0'
# node 0.10 compatibility, should go away soon
readable-stream: '~1.1.0'
optionalDependencies:
# those are native modules that could fail to compile
# and are unavailable on Windows

@ -89,11 +89,13 @@ module.exports = function() {
assert.deepEqual(obj['testpkg-newnpmreg'],
{ name: 'testpkg-newnpmreg',
description: '',
author: '',
license: 'ISC',
'dist-tags': { latest: '0.0.0' },
maintainers: [ { name: 'alex', email: 'alex@kocharin.ru' } ],
readmeFilename: '',
time: { modified: '2014-10-02T07:07:51.000Z' },
versions: { '0.0.0': 'latest' },
versions: {},
repository: { type: 'git', url: '' } })
}