瀏覽代碼

Integrated Redis, began commenting backend

Redis has been Integrated as a cache server, a pub-sub server, and a delayed / scheduled notification server. Also started adding JSDoc based documentation to everything.
Cameron Kline 8 年之前
父節點
當前提交
a4253476ad

+ 2 - 1
backend/Dockerfile

@@ -10,7 +10,8 @@ RUN apk add gcc
 RUN apk add g++
 RUN apk add make
 
-RUN npm install nodemon -g
+RUN npm install -g nodemon
+RUN npm install -g node-inspector
 
 RUN mkdir -p /opt
 WORKDIR /opt

+ 33 - 44
backend/index.js

@@ -10,60 +10,49 @@ const io = require('./logic/io');
 const cache = require('./logic/cache');
 const scheduler = require('./logic/scheduler');
 
-// setup our cache with the tables we need
-cache.addTable('sessions');
-cache.addTable('stations');
-
 async.waterfall([
 
-	// connect to our database
-	(next) => db.init('mongodb://mongo:27017/musare', next),
-
-	// load all the stations from the database into the cache (we won't actually do this in the future)
+	// setup our Redis cache
 	(next) => {
-
-		// this represents data directly from the DB (for now, lets just add some dummy stations)
-		let stations = [
-			{
-				id: '7dbf25fd-b10d-6863-2f48-637f6014b162',
-				name: 'edm',
-				genres: ['edm'],
-				displayName: 'EDM',
-				description: 'EDM Music',
-				playlist: [
-					'gCYcHz2k5x0'
-				]
-			},
-			{
-				id: '79cedff1-5341-7f0e-6542-50491c4797b4',
-				genres: ['chill'],
-				displayName: 'Chill',
-				description: 'Chill Music',
-				playlist: [
-					'gCYcHz2k5x0'
-				]
-			}
-		];
-
-		stations.forEach((station) => {
-			// add the station to the cache, adding the temporary data
-			cache.addRow('stations', Object.assign(station, {
-				skips: 0,
-				userCount: 0,
-				currentSongIndex: 0,
-				paused: false
-			}));
+		cache.init('redis://redis:6379/0', () => {
+			// load some test stations into the cache
+			async.waterfall([
+				(next) => cache.hset('stations', '7dbf25fd-b10d-6863-2f48-637f6014b162', cache.schemas.station({
+					name: 'edm',
+					genres: ['edm'],
+					displayName: 'EDM',
+					description: 'EDM Music',
+					playlist: [
+						'gCYcHz2k5x0'
+					]
+				}), next),
+				(next) => cache.hset('stations', '79cedff1-5341-7f0e-6542-50491c4797b4', cache.schemas.station({
+					name: 'chill',
+					genres: ['chill'],
+					displayName: 'Chill',
+					description: 'Chill Music',
+					playlist: [
+						'gCYcHz2k5x0'
+					]
+				}), next),
+			], next);
 		});
-
-		next();
 	},
 
+	// setup our MongoDB database
+	(next) => db.init('mongodb://mongo:27017/musare', next),
+
 	// setup the express server (not used right now, but may be used for OAuth stuff later, or for an API)
 	(next) => app.init(next),
 
 	// setup the socket.io server (all client / server communication is done over this)
 	(next) => io.init(next)
 ], (err) => {
-	if (err) return console.error(err);
-	console.log('Backend has been started');
+	if (err && err !== true) {
+		console.error('An error occurred while initializing the backend server');
+		console.error(err);
+	}
+	else {
+		console.log('Backend server has been successfully started');
+	}
 });

+ 8 - 0
backend/logic/actions/apis.js

@@ -4,6 +4,14 @@ const request = require('request');
 
 module.exports = {
 
+	/**
+	 * Fetches a list of songs from Youtubes API
+	 *
+	 * @param session
+	 * @param query - the query we'll pass to youtubes api
+	 * @param cb
+	 * @return {{ status: String, data: Object }}
+	 */
 	searchYoutube: (session, query, cb) => {
 
 		const params = [

+ 129 - 88
backend/logic/actions/stations.js

@@ -3,50 +3,120 @@
 const async = require('async');
 const request = require('request');
 
+const io = require('../io');
 const db = require('../db');
 const cache = require('../cache');
+const notifications = require('../notifications');
 const utils = require('../utils');
 
+/**
+ * Loads a station into the cache, and sets up all the related logic
+ *
+ * @param {String} stationId - the id of the station
+ * @param {Function} cb - gets called when this function completes
+ */
+function initializeAndReturnStation (stationId, cb) {
+
+	async.waterfall([
+
+		// first check the cache for the station
+		(next) => cache.hget('stations', stationId, next),
+
+		// if the cached version exist
+		(station, next) => {
+			if (station) return next(true, station);
+			db.models.station.find({ id: stationId }, next);
+		},
+
+		// if the station exists in the DB, add it to the cache
+		(station, next) => {
+			if (!station) return cb('Station by that id does not exist');
+			station = cache.schemas.station(station);
+			cache.hset('stations', station.id, station, (err) => next(err, station));
+		}
+
+	], (err, station) => {
+
+		if (err && err !== true) return cb(err);
+
+		// get notified when the next song for this station should play, so that we can notify our sockets
+		let notification = notifications.subscribe(`stations.nextSong?id=${station.id}`, () => {
+			// get the station from the cache
+			cache.hget('stations', station.id, (err, station) => {
+				if (station) {
+					// notify all the sockets on this station to go to the next song
+					async.waterfall(io.sockets.clients().map((socket) => (next) => {
+						// fetch the sockets session
+						cache.hget('sessions', socket.sessionId, (err, session) => {
+							if (session.stationId == station.id) {
+								socket.emit('notification:stations.nextSong');
+							}
+							next();
+						});
+					}), (err) => {
+						// schedule a notification to be dispatched when the next song ends
+						notifications.schedule(`stations.nextSong?id=${station.id}`, 5000);
+					});
+				}
+				// the station doesn't exist anymore, unsubscribe from it
+				else {
+					notifications.remove(notification);
+				}
+			});
+		}, true);
+
+		cb(null, station);
+	});
+}
+
 module.exports = {
 
+	/**
+	 * Get a list of all the stations
+	 *
+	 * @param session
+	 * @param cb
+	 * @return {{ status: String, stations: Array }}
+	 */
 	index: (session, cb) => {
-		cb(cache.getAllRows('stations'));
-	},
+		// TODO: the logic should be a bit more personalized to the users preferred genres
+		// and it should probably just a different cache table then 'stations'
+		cache.hgetall('stations', (err, stations) => {
 
-	join: (session, stationId, cb) => {
-
-		async.waterfall([
-
-			// first check the cache for the station
-			(next) => next(null, cache.findRow('stations', 'id', stationId)),
-
-			// if the cached version exist use it, otherwise check the DB for it
-			(station, next) => {
-				if (station) return next(true, station);
-				db.models.station.find({ id: stationId }, next);
-			},
-
-			// add the station from the DB to the cache, adding the temporary data
-			(station, next) => {
-				cache.addRow('stations', Object.assign(station, {
-					skips: 0,
-					userCount: 0,
-					currentSongIndex: 0,
-					paused: false
-				}));
-				next(null, station);
+			if (err && err !== true) {
+				return cb({
+					status: 'error',
+					message: 'An error occurred while obtaining the stations'
+				});
 			}
 
-		], (err, station) => {
+			cb({ status: 'success', stations });
+		});
+	},
+
+	/**
+	 * Joins the station by its id
+	 *
+	 * @param session
+	 * @param stationId - the station join
+	 * @param cb
+	 * @return {{ status: String, userCount: Integer }}
+	 */
+	join: (session, stationId, cb) => {
+		initializeAndReturnStation(stationId, (err, station) => {
 
 			if (err && err !== true) {
 				return cb({ status: 'error', message: 'An error occurred while joining the station' });
 			}
 
 			if (station) {
+
 				if (session) session.stationId = stationId;
-				station.userCount++;
-				cb({ status: 'success', userCount: station.userCount });
+
+				cache.client.hincrby('station.userCounts', stationId, 1, (err, userCount) => {
+					if (err) return cb({ status: 'error', message: 'An error occurred while joining the station' });
+					cb({ status: 'success', userCount });
+				});
 			}
 			else {
 				cb({ status: 'failure', message: `That station doesn't exist` });
@@ -54,44 +124,34 @@ module.exports = {
 		});
 	},
 
+	/**
+	 * Skips the users current station
+	 *
+	 * @param session
+	 * @param cb
+	 * @return {{ status: String, skipCount: Integer }}
+	 */
 	skip: (session, cb) => {
 
 		if (!session) return cb({ status: 'failure', message: 'You must be logged in to skip a song!' });
 
-		async.waterfall([
-
-			// first check the cache for the station
-			(next) => next(null, cache.findRow('stations', 'id', session.stationId)),
-
-			// if the cached version exist use it, otherwise check the DB for it
-			(station, next) => {
-				if (station) return next(true, station);
-				db.models.station.find({ id: session.stationId }, next);
-			},
-
-			// add the station from the DB to the cache, adding the temporary data
-			(station, next) => {
-				cache.addRow('stations', Object.assign(station, {
-					skips: 0,
-					userCount: 0,
-					currentSongIndex: 0,
-					paused: false
-				}));
-				next(null, station);
-			}
-
-		], (err, station) => {
+		initializeAndReturnStation(stationId, (err, station) => {
 
 			if (err && err !== true) {
 				return cb({ status: 'error', message: 'An error occurred while skipping the station' });
 			}
 
 			if (station) {
-				station.skips++;
-				if (station.skips > Math.floor(station.userCount / 2)) {
-					// TODO: skip to the next song if the skips is higher then half the total users
-				}
-				cb({ status: 'success', skips: station.skips });
+				cache.client.hincrby('station.skipCounts', session.stationId, 1, (err, skipCount) => {
+
+					session.skippedSong = true;
+
+					if (err) return cb({ status: 'error', message: 'An error occurred while skipping the station' });
+
+					cache.hget('station.userCounts', session.stationId, (err, userCount) => {
+						cb({ status: 'success', skipCount });
+					});
+				});
 			}
 			else {
 				cb({ status: 'failure', message: `That station doesn't exist` });
@@ -99,49 +159,30 @@ module.exports = {
 		});
 	},
 
-	// leaves the users current station
-	// returns the count of users that are still in that station
+	/**
+	 * Leaves the users current station
+	 *
+	 * @param session
+	 * @param cb
+	 * @return {{ status: String, userCount: Integer }}
+	 */
 	leave: (session, cb) => {
-
-		// if they're not logged in, we don't need to do anything below
-		if (!session) return cb({ status: 'success' });
-
-		async.waterfall([
-
-			// first check the cache for the station
-			(next) => next(null, cache.findRow('stations', 'id', session.stationId)),
-
-			// if the cached version exist use it, otherwise check the DB for it
-			(station, next) => {
-				if (station) return next(true, station);
-				db.models.station.find({ id: session.stationId }, next);
-			},
-
-			// add the station from the DB to the cache, adding the temporary data
-			(station, next) => {
-				cache.addRow('stations', Object.assign(station, {
-					skips: 0,
-					userCount: 0,
-					currentSongIndex: 0,
-					paused: false
-				}));
-				next(null, station);
-			}
-
-		], (err, station) => {
+		initializeAndReturnStation(stationId, (err, station) => {
 
 			if (err && err !== true) {
 				return cb({ status: 'error', message: 'An error occurred while leaving the station' });
 			}
 
-			session.stationId = null;
+			if (session) session.stationId = null;
 
 			if (station) {
-				station.userCount--;
-				cb({ status: 'success', userCount: station.userCount });
+				cache.client.hincrby('station.userCounts', stationId, -1, (err, userCount) => {
+					if (err) return cb({ status: 'error', message: 'An error occurred while leaving the station' });
+					cb({ status: 'success', userCount });
+				});
 			}
 			else {
-				cb({ status: 'failure', message: `That station doesn't exist` });
+				cb({ status: 'failure', message: `That station doesn't exist, it may have been deleted` });
 			}
 		});
 	},

+ 1 - 1
backend/logic/actions/users.js

@@ -32,7 +32,7 @@ module.exports = {
 					if (match) {
 
 						// store the session in the cache
-						cache.addRow('sessions', Object.assign(user, { sessionId: utils.guid() }));
+						cache.hset('sessions', utils.guid(), cache.schemas.session());
 
 						next(null, { status: 'failure', message: 'Login successful', user });
 					}

+ 0 - 19
backend/logic/cache.js

@@ -1,19 +0,0 @@
-'use strict';
-
-// ! ! ! TEMPORARY ! ! !
-// this is a temporary placeholder for holding cached data until we get Redis implemented
-
-const tables = {};
-
-const lib = {
-	addTable: (tableName) => tables[tableName] = [],
-	addRow: (tableName, data) => tables[tableName].push(data),
-	delTable: (tableName) => delete tables[tableName],
-	delRow: (tableName, index) => tables[tableName].splice(index),
-	getRow: (tableName, index) => tables[tableName][index],
-	getAllRows: (tableName) => tables[tableName],
-	findRow: (tableName, key, value) => tables[tableName].find((row) => row[key] == value),
-	findRowIndex: (tableName, key, value) => tables[tableName].indexOf(lib.findRow(tableName, key, value))
-};
-
-module.exports = lib;

+ 135 - 0
backend/logic/cache/index.js

@@ -0,0 +1,135 @@
+'use strict';
+
+// Lightweight / convenience wrapper around redis module for our needs
+
+const pubs = {}, subs = {};
+
+const lib = {
+
+	client: null,
+	url: '',
+	schemas: {
+		session: require('./schemas/session'),
+		station: require('./schemas/station')
+	},
+
+	/**
+	 * Initializes the cache module
+	 *
+	 * @param {String} url - the url of the redis server
+	 * @param {Function} cb - gets called once we're done initializing
+	 */
+	init: (url, cb) => {
+
+		lib.url = url;
+
+		lib.client = redis.createClient({ url: lib.url });
+		lib.client.on('error', (err) => console.error);
+
+		cb();
+	},
+
+	/**
+	 * Gracefully closes all the Redis client connections
+	 */
+	quit: () => {
+		lib.client.quit();
+		Object.keys(pubs).forEach((channel) => pubs[channel].quit());
+		Object.keys(subs).forEach((channel) => subs[channel].client.quit());
+	},
+
+	/**
+	 * Sets a single value in a table
+	 *
+	 * @param {String} table - name of the table we want to set a key of (table === redis hash)
+	 * @param {String} key -  name of the key to set
+	 * @param {*} value - the value we want to set
+	 * @param {Function} cb - gets called when the value has been set in Redis
+	 * @param {Boolean} [stringifyJson=true] - stringify 'value' if it's an Object or Array
+	 */
+	hset: (table, key, value, cb, stringifyJson = true) => {
+
+		// automatically stringify objects and arrays into JSON
+		if (stringifyJson && ['object', 'array'].includes(typeof value)) value = JSON.stringify(value);
+
+		lib.client.hset(table, key, value, (err) => {
+			if (err) return cb(err);
+			cb(null);
+		});
+	},
+
+	/**
+	 * Gets a single value from a table
+	 *
+	 * @param {String} table - name of the table to get the value from (table === redis hash)
+	 * @param {String} key - name of the key to fetch
+	 * @param {Function} cb - gets called when the value is returned from Redis
+	 * @param {Boolean} [parseJson=true] - attempt to parse returned data as JSON
+	 */
+	hget: (table, key, cb, parseJson = true) => {
+		lib.client.hget(table, key, (err, value) => {
+			if (err) return typeof cb === 'function' ? cb(err) : null;
+			if (parseJson) try { value = JSON.parse(value); } catch (e) {}
+			if (typeof cb === 'function') cb(null, value);
+		});
+	},
+
+	/**
+	 * Returns all the keys for a table
+	 *
+	 * @param {String} table - name of the table to get the values from (table === redis hash)
+	 * @param {Function} cb - gets called when the values are returned from Redis
+	 * @param {Boolean} [parseJson=true] - attempts to parse all values as JSON by default
+	 */
+	hgetall: (table, cb, parseJson = true) => {
+		lib.client.hgetall(table, (err, obj) => {
+			if (err) return typeof cb === 'function' ? cb(err) : null;
+			if (parseJson) Object.keys(obj).forEach((key) => { try { obj[key] = JSON.parse(obj[key]); } catch (e) {} });
+			cb(null, obj);
+		});
+	},
+
+	/**
+	 * Publish a message to a channel, caches the redis client connection
+	 *
+	 * @param {String} channel - the name of the channel we want to publish a message to
+	 * @param {*} value - the value we want to send
+	 * @param {Boolean} [stringifyJson=true] - stringify 'value' if it's an Object or Array
+	 */
+	pub: (channel, value, stringifyJson = true) => {
+
+		if (pubs[channel] === undefined) {
+			pubs[channel] = redis.createClient({ url: lib.url });
+			pubs[channel].on('error', (err) => console.error);
+		}
+
+		if (stringifyJson && ['object', 'array'].includes(typeof value)) value = JSON.stringify(value);
+
+		pubs[channel].publish(channel, value);
+	},
+
+	/**
+	 * Subscribe to a channel, caches the redis client connection
+	 *
+	 * @param {String} channel - name of the channel to subscribe to
+	 * @param {Function} cb - gets called when a message is received
+	 * @param {Boolean} [parseJson=true] - parse the message as JSON
+	 */
+	sub: (channel, cb, parseJson = true) => {
+
+		if (subs[channel] === undefined) {
+			subs[channel] = { client: redis.createClient({ url: lib.url }), cbs: [] };
+			subs[channel].client.on('error', (err) => console.error);
+			subs[channel].client.on('message', (channel, message) => {
+				if (parseJson) try { message = JSON.parse(message); } catch (e) {}
+				subs[channel].cbs.forEach((cb) => cb(message));
+			});
+			subs[channel].subscribe(channel);
+		}
+
+		subs[channel].cbs.push(cb);
+	}
+
+};
+
+module.exports = lib;

+ 14 - 0
backend/logic/cache/schemas/session.js

@@ -0,0 +1,14 @@
+'use strict';
+
+/**
+ * Schema for a session stored in redis,
+ * gets created when a user logs in
+ *
+ * @returns {{stationId: null, created: number}}
+ */
+module.exports = () => {
+	return {
+		stationId: null,
+		created: Date.now()
+	};
+};

+ 14 - 0
backend/logic/cache/schemas/station.js

@@ -0,0 +1,14 @@
+'use strict';
+
+/**
+ * Schema for a station stored / cached in redis,
+ * gets created when a station is in use
+ * and therefore is put into the redis cache
+ *
+ * @param station
+ * @returns {Object}
+ */
+module.exports = (station) => {
+	// this is just the data from the DB for now, which we're just caching in memory for fast access
+	return station;
+};

+ 23 - 13
backend/logic/io.js

@@ -45,20 +45,30 @@ module.exports = {
 
 						let args = Array.prototype.slice.call(arguments, 0, -1);
 						let cb = arguments[arguments.length - 1];
-						let session = cache.findRow('sessions', 'id', socket.sessionId);
-
-						// if the action set 'session' to null, that means they want to delete it
-						if (session === null) delete socket.sessionId;
-
-						// call the action, passing it the session, and the arguments socket.io passed us
-						actions[namespace][action].apply(null, [session].concat(args).concat([
-							(result) => {
-								// store the session id, which we use later when the user disconnects
-								if (name == 'users.login' && result.user) socket.sessionId = result.user.sessionId;
-								// respond to the socket with our message
-								cb(result);
+
+						// load the session from the cache
+						cache.hget('sessions', socket.sessionId, (err, session) => {
+
+							if (err && err !== true) {
+								return cb({
+									status: 'error',
+									message: 'An error occurred while obtaining your session'
+								});
 							}
-						]));
+
+							// make sure the sockets sessionId isn't set if there is no session
+							if (socket.sessionId && session === null) delete socket.sessionId;
+
+							// call the action, passing it the session, and the arguments socket.io passed us
+							actions[namespace][action].apply(null, [session].concat(args).concat([
+								(result) => {
+									// store the session id
+									if (name == 'users.login' && result.user) socket.sessionId = result.user.sessionId;
+									// respond to the socket with our message
+									cb(result);
+								}
+							]));
+						});
 					})
 				})
 			});

+ 70 - 0
backend/logic/notifications.js

@@ -0,0 +1,70 @@
+'use strict';
+
+const crypto = require('crypto');
+const redis = require('redis');
+
+let client = null;
+
+const subscriptions = [];
+
+const lib = {
+
+	/**
+	 * Initializes the notifications module
+	 *
+	 * @param {String} url - the url of the redis server
+	 * @param {Function} cb - gets called once we're done initializing
+	 */
+	init: (url, cb) => {
+		client = redis.createClient({ url: url });
+		client.on('error', (err) => console.error);
+		client.on('message', (pattern, channel, expiredKey) => {
+			subscriptions.forEach((sub) => {
+				if (sub.name !== expiredKey) return;
+				sub.cb();
+			});
+		});
+		client.psubscribe('__keyevent@0__:expired');
+	}
+
+	/**
+	 * Schedules a notification to be dispatched in a specific amount of milliseconds,
+	 * notifications are unique by name, and the first one is always kept, as in
+	 * attempting to schedule a notification that already exists won't do anything
+	 *
+	 * @param {String} name - the name of the notification we want to schedule
+	 * @param {Integer} time - how long in milliseconds until the notification should be fired
+	 * @param {Function} cb - gets called when the notification has been scheduled
+	 */
+	schedule: (name, time, cb) => {
+		client.set(crypto.createHash('md5').update(`_notification:${name}_`).digest('hex'), '', 'PX', 'NX', time, cb);
+	},
+
+	/**
+	 * Subscribes a callback function to be called when a notification gets called
+	 *
+	 * @param {String} name - the name of the notification we want to subscribe to
+	 * @param {Function} cb - gets called when the subscribed notification gets called
+	 * @param {Boolean} unique - only subscribe if another subscription with the same name doesn't already exist
+	 * @return {Object} - the subscription object
+	 */
+	subscribe: (name, cb, unique = false) => {
+		if (unique && subscriptions.find((subscription) => subscription.name == name)) return;
+		let subscription = { name: crypto.createHash('md5').update(`_notification:${name}_`).digest('hex'), cb };
+		subscriptions.push(subscription);
+		return subscription;
+	},
+
+	/**
+	 * Remove a notification subscription
+	 *
+	 * @param {Object} subscription - the subscription object returned by {@link subscribe}
+	 */
+	remove: (subscription) => {
+		let index = subscriptions.indexOf(subscription);
+		if (index) subscriptions.splice(index, 1);
+	}
+
+};
+
+module.exports = lib;

+ 0 - 33
backend/logic/scheduler.js

@@ -1,33 +0,0 @@
-'use strict';
-
-// Central place to register / store timeouts
-
-const timeouts = {}, intervals = {};
-
-module.exports = {
-
-	once: (name, time, cb) => {
-		timeouts[name] = setTimeout(() => {
-			delete timeouts[name];
-			cb();
-		}, time);
-	},
-
-	repeat: (name, time, cb) => {
-		intervals[name] = setInterval(() => {
-			delete intervals[name];
-			cb();
-		}, time);
-	},
-
-	cancel: (name) => {
-		if (timeouts[name]) {
-			clearTimeout(timeouts[name]);
-			delete timeouts[name];
-		}
-		if (!intervals[name]) {
-			clearInterval(intervals[name]);
-			delete intervals[name];
-		}
-	}
-};

+ 1 - 1
backend/package.json

@@ -6,7 +6,7 @@
   "author": "Musare Team",
   "repository": "https://github.com/Musare/MusareNode",
   "scripts": {
-    "development": "nodemon -L /opt/app",
+    "development": "node-inspector & nodemon --debug -L /opt/app",
     "production": "node /opt/app"
   },
   "dependencies": {

+ 2 - 0
docker-compose.yml

@@ -4,6 +4,7 @@ services:
     build: ./backend
     ports:
     - "8081:80"
+    - "8082:8080"
     volumes:
     - ./backend:/opt/app
     links:
@@ -25,3 +26,4 @@ services:
     - "3000:3000"
   redis:
     image: redis
+    command: "--notify-keyspace-events Ex"