diff --git a/lib/distributed_rate_limits_sync/worker.js b/lib/distributed_rate_limits_sync/worker.js index e6ffa20c..c7d783e3 100644 --- a/lib/distributed_rate_limits_sync/worker.js +++ b/lib/distributed_rate_limits_sync/worker.js @@ -173,24 +173,32 @@ _.extend(Worker.prototype, { var queue = async.queue(this.processSyncQueue.bind(this, rateLimit), 10); queue.drain = function() { + console.info('queue drain'); asyncCallback(null); }; var hasMongoResults = false; stream.on('data', function(mongoResult) { + console.info('queue data'); hasMongoResults = true; queue.push(mongoResult); }); + var mongoError = false; stream.on('error', function(error) { + console.info('queue error'); + mongoError = true; logger.error({ err: error }, 'Distributed rate limits sync MongoDB result error'); asyncCallback(error); }); stream.on('close', function() { - // Call the callback if no mongo results were present (so queue.drain - // will never be called). - if(!hasMongoResults) { + console.info('queue close'); + // Call the callback if no mongo results were present (since queue.drain + // will never be called). Similarly, don't call the callback if a mongo + // error was encountered, since we've already called the callback in the + // error handler. + if(!hasMongoResults && !mongoError) { asyncCallback(null); } }); diff --git a/test/server/distributed_rate_limits_sync.js b/test/server/distributed_rate_limits_sync.js index c38062bc..241f59e8 100644 --- a/test/server/distributed_rate_limits_sync.js +++ b/test/server/distributed_rate_limits_sync.js @@ -151,6 +151,7 @@ describe('distributed rate limit sync', function() { it('sets new rate limits to the distributed value', function(done) { redisClient.get('apiKey:3000000:NEW:' + this.bucketDate.toISOString(), function(error, limit) { + should.not.exist(error); limit.should.eql('143'); done(); }); @@ -158,6 +159,7 @@ describe('distributed rate limit sync', function() { it('increases existing rate limits to match the distributed value', function(done) { redisClient.get('apiKey:3000000:EXISTING:' + this.bucketDate.toISOString(), function(error, limit) { + should.not.exist(error); limit.should.eql('99'); done(); }); @@ -165,6 +167,7 @@ describe('distributed rate limit sync', function() { it('ignores rate limits when the distributed value is lower', function(done) { redisClient.get('apiKey:3000000:LOWER:' + this.bucketDate.toISOString(), function(error, limit) { + should.not.exist(error); limit.should.eql('80'); done(); }); @@ -172,6 +175,7 @@ describe('distributed rate limit sync', function() { it('ignores non-distributed rate limits', function(done) { redisClient.get('apiKey:720000:LOCAL:' + this.bucketDate.toISOString(), function(error, limit) { + should.not.exist(error); limit.should.eql('47'); done(); }); @@ -179,6 +183,7 @@ describe('distributed rate limit sync', function() { it('syncs api-specific rate limits', function(done) { redisClient.get('apiKey:2700000:API:' + this.bucketDate.toISOString(), function(error, limit) { + should.not.exist(error); limit.should.eql('133'); done(); }); @@ -186,6 +191,7 @@ describe('distributed rate limit sync', function() { it('syncs api-specific sub settings rate limits', function(done) { redisClient.get('apiKey:2880000:API_SUB_SETTINGS:' + this.bucketDate.toISOString(), function(error, limit) { + should.not.exist(error); limit.should.eql('38'); done(); }); @@ -194,6 +200,7 @@ describe('distributed rate limit sync', function() { it('initially performs a sync for the entire duration on start', function(done) { var bucketDate = new Date(this.bucketDate - 49 * 60 * 1000); // 49min ago redisClient.get('apiKey:3000000:OLD:' + bucketDate.toISOString(), function(error, limit) { + should.not.exist(error); limit.should.eql('97'); done(); }); @@ -201,6 +208,7 @@ describe('distributed rate limit sync', function() { it('does not sync distributed rate limits outside the duration on start', function(done) { redisClient.keys('apiKey:3000000:TOO_OLD:*', function(error, keys) { + should.not.exist(error); keys.length.should.eql(0); done(); }); @@ -220,23 +228,27 @@ describe('distributed rate limit sync', function() { count: 76, }); - distributed.save(); - setTimeout(function() { - redisClient.get('apiKey:3000000:AFTER:' + bucketDate.toISOString(), function(error, limit) { - limit.should.eql('76'); + distributed.save(function(error) { + should.not.exist(error); + setTimeout(function() { + redisClient.get('apiKey:3000000:AFTER:' + bucketDate.toISOString(), function(error, limit) { + should.not.exist(error); + limit.should.eql('76'); - distributed.count = 99; - distributed.updated_at = new Date(); - distributed.save(); - - setTimeout(function() { - redisClient.get('apiKey:3000000:AFTER:' + bucketDate.toISOString(), function(error, limit) { - limit.should.eql('99'); - done(); - }); - }, this.sync.syncEvery + 50); - }.bind(this)); - }.bind(this), this.sync.syncEvery + 50); + distributed.count = 99; + distributed.updated_at = new Date(); + distributed.save(function() { + setTimeout(function() { + redisClient.get('apiKey:3000000:AFTER:' + bucketDate.toISOString(), function(error, limit) { + should.not.exist(error); + limit.should.eql('99'); + done(); + }); + }, this.sync.syncEvery + 100); + }.bind(this)); + }.bind(this)); + }.bind(this), this.sync.syncEvery + 100); + }.bind(this)); }); it('polling continues when no data is present on a polling cycle', function(done) { @@ -255,28 +267,33 @@ describe('distributed rate limit sync', function() { count: 76, }); - distributed.save(); - setTimeout(function() { - redisClient.get('apiKey:3000000:POLL:' + bucketDate.toISOString(), function(error, limit) { - limit.should.eql('76'); - - // Wait long enough to to hit a polling cycle outside the 2 second - // buffer. - var wait = this.sync.syncBuffer + this.sync.syncEvery + 50; - setTimeout(function() { - distributed.count = 99; - distributed.updated_at = new Date(); - distributed.save(); + distributed.save(function(error) { + should.not.exist(error); + setTimeout(function() { + redisClient.get('apiKey:3000000:POLL:' + bucketDate.toISOString(), function(error, limit) { + should.not.exist(error); + limit.should.eql('76'); + // Wait long enough to to hit a polling cycle outside the 2 second + // buffer. + var wait = this.sync.syncBuffer + this.sync.syncEvery + 100; setTimeout(function() { - redisClient.get('apiKey:3000000:POLL:' + bucketDate.toISOString(), function(error, limit) { - limit.should.eql('99'); - done(); - }); - }, this.sync.syncEvery + 50); - }.bind(this), wait); - }.bind(this)); - }.bind(this), this.sync.syncEvery + 50); + distributed.count = 99; + distributed.updated_at = new Date(); + distributed.save(function(error) { + should.not.exist(error); + setTimeout(function() { + redisClient.get('apiKey:3000000:POLL:' + bucketDate.toISOString(), function(error, limit) { + should.not.exist(error); + limit.should.eql('99'); + done(); + }); + }, this.sync.syncEvery + 100); + }.bind(this)); + }.bind(this), wait); + }.bind(this)); + }.bind(this), this.sync.syncEvery + 100); + }.bind(this)); }); @@ -296,22 +313,27 @@ describe('distributed rate limit sync', function() { count: 13, }); - distributed.save(); - setTimeout(function() { - redisClient.get('apiKey:3000000:BUFFER:' + bucketDate.toISOString(), function(error, limit) { - limit.should.eql('13'); - - distributed.count = 88; - distributed.updated_at = new Date() - 2600; - distributed.save(); - - setTimeout(function() { - redisClient.get('apiKey:3000000:BUFFER:' + bucketDate.toISOString(), function(error, limit) { - limit.should.eql('13'); - done(); - }); - }, this.sync.syncEvery + 50); - }.bind(this)); - }.bind(this), this.sync.syncEvery + 50); + distributed.save(function(error) { + setTimeout(function() { + should.not.exist(error); + redisClient.get('apiKey:3000000:BUFFER:' + bucketDate.toISOString(), function(error, limit) { + should.not.exist(error); + limit.should.eql('13'); + + distributed.count = 88; + distributed.updated_at = new Date() - 2600; + distributed.save(function(error) { + should.not.exist(error); + setTimeout(function() { + redisClient.get('apiKey:3000000:BUFFER:' + bucketDate.toISOString(), function(error, limit) { + should.not.exist(error); + limit.should.eql('13'); + done(); + }); + }, this.sync.syncEvery + 100); + }.bind(this)); + }.bind(this)); + }.bind(this), this.sync.syncEvery + 100); + }.bind(this)); }); }); diff --git a/test/support/blanket.js b/test/support/blanket.js index 1f923224..5386efec 100644 --- a/test/support/blanket.js +++ b/test/support/blanket.js @@ -1 +1 @@ -require('blanket')(); +//require('blanket')(); diff --git a/test/support/distributed_rate_limits_sync_shared_examples.js b/test/support/distributed_rate_limits_sync_shared_examples.js index b28c309b..51ed9ec6 100644 --- a/test/support/distributed_rate_limits_sync_shared_examples.js +++ b/test/support/distributed_rate_limits_sync_shared_examples.js @@ -12,6 +12,7 @@ var _ = require('lodash'), _.merge(global.shared, { runDistributedRateLimitsSync: function(configOverrides) { beforeEach(function startConfigLoader(done) { + this.timeout(5000); var overridesPath = path.resolve(__dirname, '../config/overrides.yml'); fs.writeFileSync(overridesPath, yaml.dump(configOverrides || {})); @@ -34,11 +35,15 @@ _.merge(global.shared, { }); afterEach(function stopConfigLoader(done) { - this.loader.close(done); + if(this.loader) { + this.loader.close(done); + } }); afterEach(function stopSync(done) { - this.sync.close(done); + if(this.sync) { + this.sync.close(done); + } }); }, }); diff --git a/test/support/server_shared_examples.js b/test/support/server_shared_examples.js index 347193d3..97304ccd 100644 --- a/test/support/server_shared_examples.js +++ b/test/support/server_shared_examples.js @@ -26,6 +26,7 @@ _.merge(global.shared, { runServer: function(configOverrides) { beforeEach(function startConfigLoader(done) { + this.timeout(5000); var overridesPath = path.resolve(__dirname, '../config/overrides.yml'); fs.writeFileSync(overridesPath, yaml.dump(configOverrides || {})); @@ -61,11 +62,15 @@ _.merge(global.shared, { }); afterEach(function stopConfigLoader(done) { - this.loader.close(done); + if(this.loader) { + this.loader.close(done); + } }); afterEach(function stopGatekeeper(done) { - this.gatekeeper.close(done); + if(this.gatekeeper) { + this.gatekeeper.close(done); + } }); },