WIP: begin to add locks to the store API

This commit is contained in:
Aaron Madsen 2018-11-25 01:29:16 -07:00
parent e43c169257
commit 077c259272
1 changed files with 269 additions and 256 deletions

View File

@ -95,22 +95,29 @@ function prepareZone(zone, options) {
} }
/* /*
init() should return an object with: { `init()` should return a `lock(forOps)` function, where `forOps` describes the portions
save: function -> undefined - changes to in memory representation should be persisted of the database that we need to obtain a lock for (so we can write to them). If `forOps`
is underfined, we only need to read the currently valid data.
`lock(forOps)` should return an object with: {
save: function -> undefined - changes to in memory representation should be persisted.
This could be considered the equivalent of committing a transaction to the database. This could be considered the equivalent of committing a transaction to the database.
primaryNameservers: { This will release any write lock obtained. `save()` will return an error if no write
list: function -> list nameservers lock was obtained OR writes are made to locations other than were locked.,
discard: function -> undefined - changes to in memory representation should be discarded.
This could be considered the equivalent of cancelling a transaction to the database.
This will release any write lock obtained.,
peers: {
list: function -> list FQDNs that we expec to be in sync with this server
}, },
zones: { zones: {
list: function -> list zones, list: function -> list zones,
create: write:
update:
delete: delete:
}, },
records: { records: {
list: function -> list records, list: function -> list records,
create: write:
update:
delete: delete:
} }
} }
@ -121,7 +128,7 @@ All lists will be a deep copy of the data actually stored.
module.exports = function init (opts) { module.exports = function init (opts) {
// opts = { filepath }; // opts = { filepath };
var db = require(opts.filepath); var fsDb = require(opts.filepath);
var mtime = require('fs').statSync(opts.filepath).mtime.valueOf(); var mtime = require('fs').statSync(opts.filepath).mtime.valueOf();
// //
@ -131,15 +138,15 @@ module.exports = function init (opts) {
// Convert the primary nameservers from an array of strings to objects with names and IDs. // Convert the primary nameservers from an array of strings to objects with names and IDs.
// also switch to the 'peers' name, since we are really interested in the other FQDNs that // also switch to the 'peers' name, since we are really interested in the other FQDNs that
// use the same data store and are kept in sync. // use the same data store and are kept in sync.
var peerList = (!db.peers || Array.isArray(db.peers))? db.peers : Object.keys(db.peers).map(function (p) { var peerList = (!fsDb.peers || Array.isArray(fsDb.peers))? fsDb.peers : Object.keys(fsDb.peers).map(function (p) {
return db.peers[p]; return fsDb.peers[p];
}); });
db.peers = [].concat(db.primaryNameservers, peerList).filter(function (p) { fsDb.peers = [].concat(fsDb.primaryNameservers, peerList).filter(function (p) {
// filer out empty strings, undefined, etc. // filter out empty strings, undefined, etc.
return !!p; return !!p;
}).map(function (ns) { }).map(function (ns) {
var peer = ('string' === typeof ns)? ns : { name: ns }; var peer = ('string' === typeof ns)? ns : { name: ns };
if (!ns.id) { if (!peer.id) {
peer.id = crypto.randomBytes(16).toString('hex'); peer.id = crypto.randomBytes(16).toString('hex');
} }
return peer; return peer;
@ -147,16 +154,16 @@ module.exports = function init (opts) {
peers[p.name] = p; peers[p.name] = p;
return peers; return peers;
}, {}); }, {});
delete db.primaryNameservers; delete fsDb.primaryNameservers;
// Convert domains to zones and ensure that they have proper IDs and timestamps // Convert domains to zones and ensure that they have proper IDs and timestamps
// Organize zones as a set of zone names // Organize zones as a set of zone names
var zoneList = (!db.zones || Array.isArray(db.zones))? db.zones : Object.keys(db.zones).map(function (z) { var zoneList = (!fsDb.zones || Array.isArray(fsDb.zones))? fsDb.zones : Object.keys(fsDb.zones).map(function (z) {
return db.zones[z]; return fsDb.zones[z];
}); });
db.zones = [].concat(db.domains, zoneList).filter(function (z) { fsDb.zones = [].concat(fsDb.domains, zoneList).filter(function (z) {
// filer out empty strings, undefined, etc. // filter out empty strings, undefined, etc.
return !!z; return !!z;
}).map(function (zone) { }).map(function (zone) {
return prepareZone(zone, { timestamp: mtime }); return prepareZone(zone, { timestamp: mtime });
@ -164,7 +171,7 @@ module.exports = function init (opts) {
zones[z.name] = z; zones[z.name] = z;
return zones; return zones;
}, {}); }, {});
delete db.domains; delete fsDb.domains;
// NOTE: Records belong to zones, but they previously referred to them only by a // NOTE: Records belong to zones, but they previously referred to them only by a
// zone property. This may pose problems where the whole list of records is not easily // zone property. This may pose problems where the whole list of records is not easily
@ -173,27 +180,57 @@ module.exports = function init (opts) {
// behave more traditionally, even though some stores (like a SQL database // behave more traditionally, even though some stores (like a SQL database
// table) might actually store the zone as a property of a record as we currently do. // table) might actually store the zone as a property of a record as we currently do.
// (This fits with the somewhat unexpected and confusing logic of wildcard records.) // (This fits with the somewhat unexpected and confusing logic of wildcard records.)
(db.records || []).forEach(function (record) { (fsDb.records || []).forEach(function (record) {
// make sure the record has an ID // make sure the record has an ID
if (!record.id) { if (!record.id) {
record.id = crypto.randomBytes(16).toString('hex'); record.id = crypto.randomBytes(16).toString('hex');
} }
// put it in it's zone - synthesize one if needed // Put it in it's zone - synthesize one if needed
db.zones[record.zone] = db.zones[record.zone] || prepareZone({ name: record.zone }); fsDb.zones[record.zone] = fsDb.zones[record.zone] || prepareZone({ name: record.zone });
var zone = db.zones[record.zone]; var zone = fsDb.zones[record.zone];
zone.records[record.name] = zone.records[record.name] || []; // Keep in mind that each name may have multiple records (whether or not they are
// of different types, classes, etc.), but each record must have a unique ID.
zone.records[record.name] = zone.records[record.name] || {};
var recordsForName = zone.records[record.name]; var recordsForName = zone.records[record.name];
recordsForName.push(record); recordsForName[record.id] = record;
}); });
delete db.records; delete fsDb.records;
// Write the migrated data // Write the migrated data
require('fs').writeFileSync(opts.filepath, JSON.stringify(db, null, 2)); require('fs').writeFileSync(opts.filepath, JSON.stringify(fsDb, null, 2));
// //
// End Migration // End Migration
// //
return function lock(forOps) {
/*
forOps : {
write: {
zone: string - required - a zone name,
names: [string] - optional - a list of record names that may be modified. May be 0 length,
records: [string] - optional - a list of record IDs that may be modified. May be 0 length (default)
}
}
1. You can't get a lock for a whole zone without first releasing any locks for names and records
within the zone. A whole zone lock will block
2. You can't get a lock for a name within a zone without first releasing any locks for records
within that name and zone.
3. Locks for a specific record do not block new locks with the same zone, name, but a different
record ID.
4. Creating a new zone, name, or record requires obtaining a lock for it's key (name or ID), even
though it does not exist yet. This prevents race conditions where 2 requests (or processes) attempt
to create the same resource at the same time.
Note: The UI probably needs to know if it is trying to write based on an outdated copy of data. Such
writes should be detected and fail loudly.
locks probably involve lockfiles on the filesystem (with watches) so that writes and locks can be
communicated easily across processes.
*/
var db = mergeObjects(fsDb);
var save = function save (cb) { var save = function save (cb) {
if (save._saving) { if (save._saving) {
console.log('make pending'); console.log('make pending');
@ -256,6 +293,124 @@ module.exports = function init (opts) {
return found; return found;
} }
// NOTE: `opts` exists so we can add options - like properties to read - easily in the future
// without modifying the function signature
function listZones(predicate, opts, cb) {
var found = jsonDeepClone(matchZone(predicate))
return setImmediate(cb, null, found);
}
function writeZone(zone, cb) {
matchZone({ name: zone.name }, function (err, matched) {
if (err) {
return setImmediate(cb, err);
}
var found = matched[0];
var isUpdate = !!found;
var combined = mergeObjects((found || {}), zone);
db.zones[zone.name] = prepareZone(combined, { isUpdate: isUpdate });
return setImmediate(function () {
cb(null, jsonDeepClone(db.zones[zone.name]));
});
});
}
function deleteZone(zone, cb) {
matchZone({ name: zone.name }, function (err, matched) {
if (err) {
return setImmediate(cb, err);
}
var found = matched[0];
if (!found) {
return setImmediate(cb, new Error('Zone not found'));
}
delete db.zones[zone.name];
return setImmediate(function () {
cb();
});
});
}
function listRecords(rPredicate, cb) {
var recordNames = [].concat(rPredicate.name);
var check = matchPredicate(rPredicate);
var found = matchZone({ name: rPredicate.zone }).reduce(function (records, zone) {
// get the records from the zone that match the record predicate
var zFound = recordNames.filter(function (name) {
return !!zone.records[name];
}).map(function (name) {
return Object.keys(zone.records[name]).map(function (id) {
return zone.records[name][id];
}).filter(check);
});
return records.concat(zFound);
}, []);
return setImmediate(cb, null, jsonDeepClone(found));
}
function modifyRecords (record, options, cb) {
var opts = options || {};
var isDelete = !!opts.isDelete;
if (!record.zone) {
return setImmediate(cb, new Error('No zone specified for record'));
}
if (!record.name) {
return setImmediate(cb, new Error('No name specified for record'));
}
if (isDelete && !record.id) {
return setImmediate(cb, new Error('No id specified to delete record'));
}
var zone = matchZone({ name: record.zone })[0];
if (!zone) {
return setImmediate(cb, new Error('Unble to find zone ' + record.zone + ' for record'));
}
var isUpdate = (record.id && !isDelete);
if (!isUpdate) {
record.id = crypto.randomBytes(16).toString('hex');
}
var recordsForName = zone.records[record.name] = zone.records[record.name] || {};
var found = recordsForName[record.id];
if ((isUpdate || isDelete) && !found) {
return setImmediate(cb, new Error('Unable to find record with ID: ' + record.id));
}
if (!isDelete) {
recordsForName[record.id] = (mergeObjects((found || {}), record));
}
var zoneUpdate = {
name: record.name,
records: {}
};
zoneUpdate.records[record.name] = keep;
return writeZone(zoneUpdate, function (err) {
if (err) {
return cb(err);
}
return cb(
null,
isDelete ? null : jsonDeepClone(recordsForName[record.id])
);
});
}
function writeRecord(record, cb) {
modifyRecords(record, null, cb);
}
function deleteRecord(record, cb) {
modifyRecords(record, { isDelete: true }, cb);
}
var dbApi = { var dbApi = {
save: function () { save: function () {
// hide _pending and _saving from callers // hide _pending and _saving from callers
@ -278,159 +433,17 @@ module.exports = function init (opts) {
return setImmediate(cb, null, jsonDeepClone(db.peers)); return setImmediate(cb, null, jsonDeepClone(db.peers));
}, },
zones: { zones: {
/* list: listZones,
I'm fairly certan that zone names must be unique and therefore are legitimately write: writeZone,
IDs within the zones namespace. This is similarly true of record names within a zone. delete: deleteZone
I'm not certain that having a distinct ID adds value and it may add confusion / complexity.
*/
// NOTE: `opts` exists so we can add options - like properties to read - easily in the future
// without modifying the function signature
list: function listZones(predicate, opts, cb) {
// TODO: consider whether we should just return the zone names
var found = jsonDeepClone(matchZone(predicate)).map(function (z) {
// This is fairly inefficient!! Consider alternative storage
// that does not require deleting the records like this.
delete z.records;
return z;
});
return setImmediate(cb, null, found);
},
// // NOTE: I'm not sure we need a distinct 'find()' operation in the API
// // unless we are going to limit the output of the
// // 'list()' operation in some incompatible way.
// // NOTE: `opts` exists so we can add options - like properties to read - easily in the future
// // without modifying the function signature
// find: function getZone(predicate, opts, cb) {
// if (!predicate.name || predicate.id) {
// return setImmediate(cb, new Error('Finding a zone requires a `name` or `id`'));
// }
// // TODO: implement a limit / short circuit and possibly offset
// // to allow for paging of zone data.
// var found = matchZone(predicate);
// if (!found[0]) {
// // TODO: make error message more specific?
// return setImmediate(cb, new Error('Zone not found'));
// }
// var z = jsonDeepClone(found[0]);
// delete z.records;
// return setImmediate(cb, null, z);
// },
create: function createZone(zone, cb) {
// We'll need a lock mechanism of some sort that works
// for simultaneous requests and multiple processes.
matchZone({ name: zone.name }, function (err, matched) {
if (err) {
return setImmediate(cb, err);
}
var found = matched[0];
if (found) {
return setImmediate(cb, new Error('Zone ' + zone.name + ' already exists'));
}
db.zones[zone.name] = prepareZone(zone);
return setImmediate(function () {
cb(null, jsonDeepClone(db.zones[zone.name]));
// release lock
});
});
},
update: function updateZone(zone, cb) {
// We'll need a lock mechanism of some sort that works
// for simultaneous requests and multiple processes.
matchZone({ name: zone.name }, function (err, matched) {
if (err) {
return setImmediate(cb, err);
}
var found = matched[0];
if (!found) {
return setImmediate(cb, new Error('Zone not found'));
}
// make sure we are not writing records through this interface
delete zone.records;
var combined = mergeObjects(found, zone);
db.zones[zone.name] = prepareZone(combined, { isUpdate: true });
return setImmediate(function () {
cb(null, jsonDeepClone(db.zones[zone.name]));
// release lock
});
});
},
delete: function(zone, cb) {
// We'll need a lock mechanism of some sort that works
// for simultaneous requests and multiple processes.
matchZone({ name: zone.name }, function (err, matched) {
if (err) {
return setImmediate(cb, err);
}
var found = matched[0];
if (!found) {
return setImmediate(cb, new Error('Zone not found'));
}
delete db.zones[zone.name];
return setImmediate(function () {
cb();
// release lock
});
});
}
}, },
records: { records: {
list: function listRecords(rPredicate, cb) { list: listRecords,
var recordNames = [].concat(rPredicate.name); write: writeRecord,
var check = matchPredicate(rPredicate); delete: deleteRecord
var found = matchZone({ name: rPredicate.zone }).reduce(function (records, zone) {
// get the records from the zone that match the record predicate
var zFound = recordNames.filter(function (name) {
return !!zone.records[name];
}).map(function (name) {
return zone.records[name].filter(check);
});
return records.concat(zFound);
}, []);
return setImmediate(cb, null, jsonDeepClone(found));
},
// find: function getRecord(rPredicate, cb) {
// var recordNames = [].concat(rPredicate.name);
// var check = matchPredicate(rPredicate);
// // TODO: swap the `filter()` and `map()` for a functional style "loop"
// // recursive function that lets us return early if we have a limit, etc.
// var found = matchZone({ name: rPredicate.zone }).reduce(function (records, zone) {
// // get the records from the zone that match the record predicate
// var zFound = recordNames.filter(function (name) {
// return !!zone.records[name];
// }).map(function (name) {
// return zone.records[name].filter(check);
// });
// return records.concat(zFound);
// }, []);
// return setImmediate(cb, null, jsonDeepClone(found[0]));
// },
create: function(record, cb) {
var zone = matchZone({ name: record.zone })[0];
if (!zone) {
return setImmediate(cb, new Error('Unble to find zone ' + record.zone + ' to create record'));
}
var records = zone.records[record.name] = zone.records[record.name] || [];
var check = matchPredicate(record);
if (records.filter(check)[0]) {
return setImmediate(cb, new Error('Exact record already exists in zone ' + record.zone ));
}
return setImmediate(cb, null, jsonDeepClone(found));
},
update: function(record, cb) {},
delete: function(record, cb) {}
} }
}; };
return dbApi; return dbApi;
};
}; };