Skip to content

Commit

Permalink
Fix some race conditions in tests that led to periodic failures.
Browse files Browse the repository at this point in the history
Also make some improvements to generally better handle test failures.
  • Loading branch information
GUI committed Feb 1, 2015
1 parent 264bc79 commit f3bc4c4
Show file tree
Hide file tree
Showing 5 changed files with 101 additions and 61 deletions.
14 changes: 11 additions & 3 deletions lib/distributed_rate_limits_sync/worker.js
Original file line number Diff line number Diff line change
Expand Up @@ -173,24 +173,32 @@ _.extend(Worker.prototype, {

var queue = async.queue(this.processSyncQueue.bind(this, rateLimit), 10);
queue.drain = function() {
console.info('queue drain');
asyncCallback(null);
};

var hasMongoResults = false;
stream.on('data', function(mongoResult) {
console.info('queue data');
hasMongoResults = true;
queue.push(mongoResult);
});

var mongoError = false;
stream.on('error', function(error) {
console.info('queue error');
mongoError = true;
logger.error({ err: error }, 'Distributed rate limits sync MongoDB result error');
asyncCallback(error);
});

stream.on('close', function() {
// Call the callback if no mongo results were present (so queue.drain
// will never be called).
if(!hasMongoResults) {
console.info('queue close');
// Call the callback if no mongo results were present (since queue.drain
// will never be called). Similarly, don't call the callback if a mongo
// error was encountered, since we've already called the callback in the
// error handler.
if(!hasMongoResults && !mongoError) {
asyncCallback(null);
}
});
Expand Down
128 changes: 75 additions & 53 deletions test/server/distributed_rate_limits_sync.js
Original file line number Diff line number Diff line change
Expand Up @@ -151,41 +151,47 @@ describe('distributed rate limit sync', function() {

it('sets new rate limits to the distributed value', function(done) {
redisClient.get('apiKey:3000000:NEW:' + this.bucketDate.toISOString(), function(error, limit) {
should.not.exist(error);
limit.should.eql('143');
done();
});
});

it('increases existing rate limits to match the distributed value', function(done) {
redisClient.get('apiKey:3000000:EXISTING:' + this.bucketDate.toISOString(), function(error, limit) {
should.not.exist(error);
limit.should.eql('99');
done();
});
});

it('ignores rate limits when the distributed value is lower', function(done) {
redisClient.get('apiKey:3000000:LOWER:' + this.bucketDate.toISOString(), function(error, limit) {
should.not.exist(error);
limit.should.eql('80');
done();
});
});

it('ignores non-distributed rate limits', function(done) {
redisClient.get('apiKey:720000:LOCAL:' + this.bucketDate.toISOString(), function(error, limit) {
should.not.exist(error);
limit.should.eql('47');
done();
});
});

it('syncs api-specific rate limits', function(done) {
redisClient.get('apiKey:2700000:API:' + this.bucketDate.toISOString(), function(error, limit) {
should.not.exist(error);
limit.should.eql('133');
done();
});
});

it('syncs api-specific sub settings rate limits', function(done) {
redisClient.get('apiKey:2880000:API_SUB_SETTINGS:' + this.bucketDate.toISOString(), function(error, limit) {
should.not.exist(error);
limit.should.eql('38');
done();
});
Expand All @@ -194,13 +200,15 @@ describe('distributed rate limit sync', function() {
it('initially performs a sync for the entire duration on start', function(done) {
var bucketDate = new Date(this.bucketDate - 49 * 60 * 1000); // 49min ago
redisClient.get('apiKey:3000000:OLD:' + bucketDate.toISOString(), function(error, limit) {
should.not.exist(error);
limit.should.eql('97');
done();
});
});

it('does not sync distributed rate limits outside the duration on start', function(done) {
redisClient.keys('apiKey:3000000:TOO_OLD:*', function(error, keys) {
should.not.exist(error);
keys.length.should.eql(0);
done();
});
Expand All @@ -220,23 +228,27 @@ describe('distributed rate limit sync', function() {
count: 76,
});

distributed.save();
setTimeout(function() {
redisClient.get('apiKey:3000000:AFTER:' + bucketDate.toISOString(), function(error, limit) {
limit.should.eql('76');
distributed.save(function(error) {
should.not.exist(error);
setTimeout(function() {
redisClient.get('apiKey:3000000:AFTER:' + bucketDate.toISOString(), function(error, limit) {
should.not.exist(error);
limit.should.eql('76');

distributed.count = 99;
distributed.updated_at = new Date();
distributed.save();

setTimeout(function() {
redisClient.get('apiKey:3000000:AFTER:' + bucketDate.toISOString(), function(error, limit) {
limit.should.eql('99');
done();
});
}, this.sync.syncEvery + 50);
}.bind(this));
}.bind(this), this.sync.syncEvery + 50);
distributed.count = 99;
distributed.updated_at = new Date();
distributed.save(function() {
setTimeout(function() {
redisClient.get('apiKey:3000000:AFTER:' + bucketDate.toISOString(), function(error, limit) {
should.not.exist(error);
limit.should.eql('99');
done();
});
}, this.sync.syncEvery + 100);
}.bind(this));
}.bind(this));
}.bind(this), this.sync.syncEvery + 100);
}.bind(this));
});

it('polling continues when no data is present on a polling cycle', function(done) {
Expand All @@ -255,28 +267,33 @@ describe('distributed rate limit sync', function() {
count: 76,
});

distributed.save();
setTimeout(function() {
redisClient.get('apiKey:3000000:POLL:' + bucketDate.toISOString(), function(error, limit) {
limit.should.eql('76');

// Wait long enough to to hit a polling cycle outside the 2 second
// buffer.
var wait = this.sync.syncBuffer + this.sync.syncEvery + 50;
setTimeout(function() {
distributed.count = 99;
distributed.updated_at = new Date();
distributed.save();
distributed.save(function(error) {
should.not.exist(error);
setTimeout(function() {
redisClient.get('apiKey:3000000:POLL:' + bucketDate.toISOString(), function(error, limit) {
should.not.exist(error);
limit.should.eql('76');

// Wait long enough to to hit a polling cycle outside the 2 second
// buffer.
var wait = this.sync.syncBuffer + this.sync.syncEvery + 100;
setTimeout(function() {
redisClient.get('apiKey:3000000:POLL:' + bucketDate.toISOString(), function(error, limit) {
limit.should.eql('99');
done();
});
}, this.sync.syncEvery + 50);
}.bind(this), wait);
}.bind(this));
}.bind(this), this.sync.syncEvery + 50);
distributed.count = 99;
distributed.updated_at = new Date();
distributed.save(function(error) {
should.not.exist(error);
setTimeout(function() {
redisClient.get('apiKey:3000000:POLL:' + bucketDate.toISOString(), function(error, limit) {
should.not.exist(error);
limit.should.eql('99');
done();
});
}, this.sync.syncEvery + 100);
}.bind(this));
}.bind(this), wait);
}.bind(this));
}.bind(this), this.sync.syncEvery + 100);
}.bind(this));
});


Expand All @@ -296,22 +313,27 @@ describe('distributed rate limit sync', function() {
count: 13,
});

distributed.save();
setTimeout(function() {
redisClient.get('apiKey:3000000:BUFFER:' + bucketDate.toISOString(), function(error, limit) {
limit.should.eql('13');

distributed.count = 88;
distributed.updated_at = new Date() - 2600;
distributed.save();

setTimeout(function() {
redisClient.get('apiKey:3000000:BUFFER:' + bucketDate.toISOString(), function(error, limit) {
limit.should.eql('13');
done();
});
}, this.sync.syncEvery + 50);
}.bind(this));
}.bind(this), this.sync.syncEvery + 50);
distributed.save(function(error) {
setTimeout(function() {
should.not.exist(error);
redisClient.get('apiKey:3000000:BUFFER:' + bucketDate.toISOString(), function(error, limit) {
should.not.exist(error);
limit.should.eql('13');

distributed.count = 88;
distributed.updated_at = new Date() - 2600;
distributed.save(function(error) {
should.not.exist(error);
setTimeout(function() {
redisClient.get('apiKey:3000000:BUFFER:' + bucketDate.toISOString(), function(error, limit) {
should.not.exist(error);
limit.should.eql('13');
done();
});
}, this.sync.syncEvery + 100);
}.bind(this));
}.bind(this));
}.bind(this), this.sync.syncEvery + 100);
}.bind(this));
});
});
2 changes: 1 addition & 1 deletion test/support/blanket.js
Original file line number Diff line number Diff line change
@@ -1 +1 @@
require('blanket')();
//require('blanket')();
9 changes: 7 additions & 2 deletions test/support/distributed_rate_limits_sync_shared_examples.js
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ var _ = require('lodash'),
_.merge(global.shared, {
runDistributedRateLimitsSync: function(configOverrides) {
beforeEach(function startConfigLoader(done) {
this.timeout(5000);
var overridesPath = path.resolve(__dirname, '../config/overrides.yml');
fs.writeFileSync(overridesPath, yaml.dump(configOverrides || {}));

Expand All @@ -34,11 +35,15 @@ _.merge(global.shared, {
});

afterEach(function stopConfigLoader(done) {
this.loader.close(done);
if(this.loader) {
this.loader.close(done);
}
});

afterEach(function stopSync(done) {
this.sync.close(done);
if(this.sync) {
this.sync.close(done);
}
});
},
});
9 changes: 7 additions & 2 deletions test/support/server_shared_examples.js
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ _.merge(global.shared, {

runServer: function(configOverrides) {
beforeEach(function startConfigLoader(done) {
this.timeout(5000);
var overridesPath = path.resolve(__dirname, '../config/overrides.yml');
fs.writeFileSync(overridesPath, yaml.dump(configOverrides || {}));

Expand Down Expand Up @@ -61,11 +62,15 @@ _.merge(global.shared, {
});

afterEach(function stopConfigLoader(done) {
this.loader.close(done);
if(this.loader) {
this.loader.close(done);
}
});

afterEach(function stopGatekeeper(done) {
this.gatekeeper.close(done);
if(this.gatekeeper) {
this.gatekeeper.close(done);
}
});
},

Expand Down

0 comments on commit f3bc4c4

Please sign in to comment.