Forráskód Böngészése

Cleaned up stations.js.

KrisVos130 9 éve
szülő
commit
ecb635214f

+ 1 - 1
backend/logic/actions/songs.js

@@ -75,7 +75,7 @@ module.exports = {
 		//TODO Check if video is in queue and Add the song to the appropriate stations
 	}),
 
-	like: hooks.loginRequired((session, songId, cb) => {
+	like: hooks.loginRequired((session, songId, cb, userId) => {
 		db.models.user.findOne({ _id: userId }, (err, user) => {
 			if (user.liked.indexOf(songId) !== -1) return cb({ status: 'failure', message: 'You have already liked this song.' });
 			let dislikes = 0;

+ 22 - 30
backend/logic/actions/stations.js

@@ -26,13 +26,13 @@ cache.sub('station.pause', stationId => {
 });
 
 cache.sub('station.resume', stationId => {
-	stations.initializeAndReturnStation(stationId, (err, station) => {
+	stations.getStation(stationId, (err, station) => {
 		io.io.to(`station.${stationId}`).emit("event:stations.resume", {timePaused: station.timePaused});
 	});
 });
 
 cache.sub('station.create', stationId => {
-	stations.initializeAndReturnStation(stationId, (err, station) => {
+	stations.getStation(stationId, (err, station) => {
 		//TODO Emit to homepage and admin station page
 		if (!err) {
 			io.io.to('home').emit("event:stations.created", station);
@@ -80,7 +80,7 @@ module.exports = {
 	 */
 	join: (session, stationId, cb) => {
 
-		stations.initializeAndReturnStation(stationId, (err, station) => {
+		stations.getStation(stationId, (err, station) => {
 
 			if (err && err !== true) {
 				return cb({ status: 'error', message: 'An error occurred while joining the station' });
@@ -125,7 +125,7 @@ module.exports = {
 
 		if (!session) return cb({ status: 'failure', message: 'You must be logged in to skip a song!' });
 
-		stations.initializeAndReturnStation(stationId, (err, station) => {
+		stations.getStation(stationId, (err, station) => {
 			
 			if (err && err !== true) {
 				return cb({ status: 'error', message: 'An error occurred while skipping the station' });
@@ -150,7 +150,7 @@ module.exports = {
 	},*/
 
 	forceSkip: hooks.adminRequired((session, stationId, cb) => {
-		stations.initializeAndReturnStation(stationId, (err, station) => {
+		stations.getStation(stationId, (err, station) => {
 
 			if (err && err !== true) {
 				return cb({ status: 'error', message: 'An error occurred while skipping the station' });
@@ -175,7 +175,7 @@ module.exports = {
 	 * @return {{ status: String, userCount: Integer }}
 	 */
 	leave: (session, stationId, cb) => {
-		stations.initializeAndReturnStation(stationId, (err, station) => {
+		stations.getStation(stationId, (err, station) => {
 
 			if (err && err !== true) {
 				return cb({ status: 'error', message: 'An error occurred while leaving the station' });
@@ -195,7 +195,7 @@ module.exports = {
 	},
 
 	lock: hooks.adminRequired((session, stationId, cb) => {
-		stations.initializeAndReturnStation(stationId, (err, station) => {
+		stations.getStation(stationId, (err, station) => {
 			if (err && err !== true) {
 				return cb({ status: 'error', message: 'An error occurred while locking the station' });
 			} else if (station) {
@@ -208,7 +208,7 @@ module.exports = {
 	}),
 
 	unlock: hooks.adminRequired((session, stationId, cb) => {
-		stations.initializeAndReturnStation(stationId, (err, station) => {
+		stations.getStation(stationId, (err, station) => {
 			if (err && err !== true) {
 				return cb({ status: 'error', message: 'An error occurred while unlocking the station' });
 			} else if (station) {
@@ -221,23 +221,20 @@ module.exports = {
 	}),
 
 	pause: hooks.adminRequired((session, stationId, cb) => {
-		stations.initializeAndReturnStation(stationId, (err, station) => {
+		stations.getStation(stationId, (err, station) => {
 			if (err && err !== true) {
 				return cb({ status: 'error', message: 'An error occurred while pausing the station' });
 			} else if (station) {
 				if (!station.paused) {
 					station.paused = true;
 					station.pausedAt = Date.now();
-					cache.hset('stations', stationId, station, (err) => {
-						if (!err) {
-							db.models.station.update({_id: stationId}, {$set: {paused: true}}, () => {
-								cache.pub('station.pause', stationId);
-								notifications.unschedule(`stations.nextSong?id=${stationId}`);
-								cb({ status: 'success' });
-							});
-						} else {
-							cb({ status: 'failure', message: 'An error occurred while pausing the station.' });
-						}
+					db.models.station.update({_id: stationId}, {$set: {paused: true, pausedAt: Date.now()}}, () => {
+						if (err) return cb({ status: 'failure', message: 'An error occurred while pausing the station.' });
+						stations.updateStation(stationId, () => {
+							cache.pub('station.pause', stationId);
+							notifications.unschedule(`stations.nextSong?id=${stationId}`);
+							cb({ status: 'success' });
+						});
 					});
 				} else {
 					cb({ status: 'failure', message: 'That station was already paused.' });
@@ -250,27 +247,22 @@ module.exports = {
 	}),
 
 	resume: hooks.adminRequired((session, stationId, cb) => {
-		stations.initializeAndReturnStation(stationId, (err, station) => {
+		stations.getStation(stationId, (err, station) => {
 			if (err && err !== true) {
 				return cb({ status: 'error', message: 'An error occurred while resuming the station' });
 			} else if (station) {
 				if (station.paused) {
 					station.paused = false;
 					station.timePaused += (Date.now() - station.pausedAt);
-					cache.hset('stations', stationId, station, (err) => {
-						if (!err) {
-							db.models.station.update({_id: stationId}, {$set: {paused: false, timePaused: station.timePaused}}, () => {
-								cache.pub('station.resume', stationId);
-								cb({ status: 'success' });
-							});
-						} else {
-							cb({ status: 'failure', message: 'An error occurred while resuming the station.' });
-						}
+					db.models.station.update({_id: stationId}, {$set: {paused: false}, $inc: {timePaused: Date.now() - station.pausedAt}}, () => {
+						stations.updateStation(stationId, (err, station) => {
+							cache.pub('station.resume', stationId);
+							cb({ status: 'success' });
+						});
 					});
 				} else {
 					cb({ status: 'failure', message: 'That station is not paused.' });
 				}
-				cb({ status: 'success' });
 			} else {
 				cb({ status: 'failure', message: `That station doesn't exist, it may have been deleted` });
 			}

+ 5 - 1
backend/logic/db/schemas/station.js

@@ -10,10 +10,14 @@ module.exports = {
 		artists: [{ type: String }],
 		duration: { type: Number, required: true },
 		skipDuration: { type: Number, required: true },
-		thumbnail: { type: String, required: true }
+		thumbnail: { type: String, required: true },
+		likes: { type: Number, required: true },
+		dislikes: { type: Number, required: true },
 	},
 	currentSongIndex: { type: Number, default: 0, required: true },
 	timePaused: { type: Number, default: 0, required: true },
+	pausedAt: { type: Number, default: 0, required: true },
+	startedAt: { type: Number, default: 0, required: true },
 	playlist: { type: Array, required: true },
 	genres: [{ type: String }],
 	privacy: { type: String, enum: ["public", "unlisted", "private"], default: "private" },

+ 212 - 168
backend/logic/stations.js

@@ -15,27 +15,200 @@ cache.sub('station.pause', (stationId) => {
 });
 
 cache.sub('station.resume', (stationId) => {
-	module.exports.initializeAndReturnStation(stationId, (err, station) => {})
+	module.exports.getStation(stationId, (err, station) => {})
 });
 
 module.exports = {
 
 	init: function(cb) {
 		let _this = this;
+		//TODO Add async waterfall
 		db.models.station.find({}, (err, stations) => {
 			if (!err) {
 				stations.forEach((station) => {
 					console.info("Initializing Station: " + station._id);
-					_this.initializeAndReturnStation(station._id, (err, station) => {
-						//TODO Emit to homepage and admin station list
-					});
+					_this.initializeStation(station);
 				});
 				cb();
 			}
 		});
 	},
 
-	calculateSongForStation: (station, cb) => {
+	initializeStation: function(_station) {
+		let _this = this;
+		_this.getStation(_station._id, (err, station) => {
+			if (!err) {
+				if (station) {
+					let notification = notifications.subscribe(`stations.nextSong?id=${station._id}`, () => {
+						console.log("NOTIFICATION!!!");
+						_this.getStation(_station._id, (err, station) => {
+							if (station) {
+								// notify all the sockets on this station to go to the next song
+								async.waterfall([
+
+									(next) => {
+										if (station.playlist.length > 0) {
+											function func() {
+												if (station.currentSongIndex < station.playlist.length - 1) {
+													station.currentSongIndex++;
+													songs.getSong(station.playlist[station.currentSongIndex], (err, song) => {
+														if (!err) {
+															let $set = {};
+
+															$set.currentSong = {
+																_id: song._id,
+																title: song.title,
+																artists: song.artists,
+																duration: song.duration,
+																likes: song.likes,
+																dislikes: song.dislikes,
+																skipDuration: song.skipDuration,
+																thumbnail: song.thumbnail
+															};
+															$set.startedAt = Date.now();
+															$set.timePaused = 0;
+															next(null, $set);
+														} else {
+															db.models.station.update({_id: station._id}, {$inc: {currentSongIndex: 1}}, (err) => {
+																_this.updateStation(station._id, () => {
+																	func();
+																});
+															});
+														}
+													});
+												} else {
+													db.models.station.update({_id: station._id}, {$set: {currentSongIndex: 0}}, (err) => {
+														_this.updateStation(station._id, (err, station) => {
+															console.log(12345678, err, station);
+															_this.calculateSongForStation(station, (err, newPlaylist) => {
+																console.log('New playlist: ', newPlaylist);
+																if (!err) {
+																	songs.getSong(newPlaylist[0], (err, song) => {
+																		let $set = {};
+																		if (song) {
+																			$set.currentSong = {
+																				_id: song._id,
+																				title: song.title,
+																				artists: song.artists,
+																				duration: song.duration,
+																				likes: song.likes,
+																				dislikes: song.dislikes,
+																				skipDuration: song.skipDuration,
+																				thumbnail: song.thumbnail
+																			};
+																			station.playlist = newPlaylist;
+																		} else {
+																			$set.currentSong = _this.defaultSong;
+																		}
+																		$set.startedAt = Date.now();
+																		$set.timePaused = 0;
+																		next(null, $set);
+																	});
+																} else {
+																	let $set = {};
+																	$set.currentSong = _this.defaultSong;
+																	$set.startedAt = Date.now();
+																	$set.timePaused = 0;
+																	next(null, $set);
+																}
+															})
+														});
+													});
+												}
+											}
+											func();
+										} else {
+											_this.calculateSongForStation(station, (err, playlist) => {
+												if (!err && playlist.length === 0) {
+													let $set = {};
+													$set.currentSongIndex = 0;
+													$set.currentSong = _this.defaultSong;
+													$set.startedAt = Date.now();
+													$set.timePaused = 0;
+													next(null, $set);
+												} else {
+													songs.getSong(playlist[0], (err, song) => {
+														let $set = {};
+														if (!err) {
+															$set.currentSong = {
+																_id: song._id,
+																title: song.title,
+																artists: song.artists,
+																duration: song.duration,
+																likes: song.likes,
+																dislikes: song.dislikes,
+																skipDuration: song.skipDuration,
+																thumbnail: song.thumbnail
+															};
+														} else {
+															$set.currentSong = _this.defaultSong;
+														}
+														$set.currentSongIndex = 0;
+														$set.startedAt = Date.now();
+														$set.timePaused = 0;
+														next(null, $set);
+													});
+												}
+											});
+										}
+									},
+
+									($set, next) => {
+										db.models.station.update({_id: station._id}, {$set}, (err) => {
+											_this.updateStation(station._id, (err, station) => {
+												console.log(err, station);
+												next(null, station);
+											});
+										});
+									},
+
+
+								], (err, station) => {
+									console.log(err, station);
+									io.io.to(`station.${station._id}`).emit("event:songs.next", {
+										currentSong: station.currentSong,
+										startedAt: station.startedAt,
+										paused: station.paused,
+										timePaused: 0
+									});
+									utils.socketsJoinSongRoom(io.io.to(`station.${station._id}`).sockets, `song.${station.currentSong._id}`);
+									// schedule a notification to be dispatched when the next song ends
+									console.log("NEXT SONG!!!");
+									if (!station.paused) {
+										notifications.schedule(`stations.nextSong?id=${station._id}`, station.currentSong.duration * 1000);
+									}
+								});
+							}
+							// the station doesn't exist anymore, unsubscribe from it
+							else {
+								notifications.remove(notification);
+							}
+						});
+					}, true);
+					if (!station.paused) {
+						/*if (!station.startedAt) {
+							station.startedAt = Date.now();
+							station.timePaused = 0;
+							cache.hset('stations', stationId, station);
+						}*/
+						let timeLeft = ((station.currentSong.duration * 1000) - (Date.now() - station.startedAt - station.timePaused));
+						if (isNaN(timeLeft)) timeLeft = -1;
+						if (station.currentSong.duration * 1000 < timeLeft || timeLeft < 0) {
+							console.log("Test");
+							notifications.schedule(`stations.nextSong?id=${station._id}`, 1);
+						} else {
+							notifications.schedule(`stations.nextSong?id=${station._id}`, timeLeft);
+						}
+					} else {
+						notifications.unschedule(`stations.nextSong?id${station._id}`);
+					}
+				}
+			}
+		});
+	},
+
+	calculateSongForStation: function(station, cb) {
+		let _this = this;
 		let songList = [];
 		async.waterfall([
 
@@ -64,8 +237,10 @@ module.exports = {
 				station.playlist.filter((songId) => {
 					if (songList.indexOf(songId) !== -1) playlist.push(songId);
 				});
-				db.models.station.update({_id: station._id}, {$set: {playlist: playlist}}, (err, result) => {
-					next(err, playlist);
+				db.models.station.update({_id: station._id}, {$set: {playlist: playlist}}, (err) => {
+					_this.updateStation(station._id, () => {
+						next(err, playlist);
+					});
 				});
 			}
 
@@ -74,186 +249,55 @@ module.exports = {
 		});
 	},
 
-	initializeAndReturnStation: function(stationId, cb) {
-		console.log("INITIALIZE", stationId);
-		let _this = this;
+	// Attempts to get the station from Redis. If it's not in Redis, get it from Mongo and add it to Redis.
+	getStation: function(stationId, cb) {
 		async.waterfall([
 
-			// first check the cache for the station
-			(next) => cache.hget('stations', stationId, next),
+			(next) => {
+				cache.hget('stations', stationId, next);
+			},
 
-			// if the cached version exist
 			(station, next) => {
 				if (station) return next(true, station);
+
 				db.models.station.findOne({ _id: stationId }, next);
 			},
 
-			// if the station exists in the DB, add it to the cache
 			(station, next) => {
-				if (!station) return cb('Station by that id does not exist');
-				station = cache.schemas.station(station);
-				cache.hset('stations', station._id, station, (err) => next(err, station));
-			}
+				if (station) {
+					station = cache.schemas.station(station);
+					cache.hset('stations', stationId, station);
+					next(true, station);
+				} else next('Station not found.');
+			},
 
 		], (err, station) => {
-			if (err && err !== true) return cb(err);
-
-			// get notified when the next song for this station should play, so that we can notify our sockets
-			let notification = notifications.subscribe(`stations.nextSong?id=${station._id}`, () => {
-				console.log("NOTIFICATION!!!");
-			//function skipSongTemp() {
-				// get the station from the cache
-				//TODO Recalculate songs if the last song of the station playlist is getting played
-				cache.hget('stations', station._id, (err, station) => {
-					if (station) {
-						// notify all the sockets on this station to go to the next song
-						async.waterfall([
+			if (err && err !== true) cb(err);
 
-							(next) => {
-								if (station.playlist.length > 0) {
-									function func() {
-										if (station.currentSongIndex < station.playlist.length - 1) {
-											station.currentSongIndex++;
-											songs.getSong(station.playlist[station.currentSongIndex], (err, song) => {
-												if (!err) {
-													station.currentSong = {
-														_id: song._id,
-														title: song.title,
-														artists: song.artists,
-														duration: song.duration,
-														likes: song.likes,
-														dislikes: song.dislikes,
-														skipDuration: song.skipDuration,
-														thumbnail: song.thumbnail
-													};
-													station.startedAt = Date.now();
-													station.timePaused = 0;
-													next(null, station);
-												} else {
-													station.currentSongIndex++;
-													func();
-												}
-											});
-										} else {
-											station.currentSongIndex = 0;
-											_this.calculateSongForStation(station, (err, newPlaylist) => {
-												console.log('New playlist: ', newPlaylist);
-												if (!err) {
-													songs.getSong(newPlaylist[0], (err, song) => {
-														if (song) {
-															station.currentSong = {
-																_id: song._id,
-																title: song.title,
-																artists: song.artists,
-																duration: song.duration,
-																likes: song.likes,
-																dislikes: song.dislikes,
-																skipDuration: song.skipDuration,
-																thumbnail: song.thumbnail
-															};
-															station.playlist = newPlaylist;
-														} else {
-															station.currentSong = _this.defaultSong;
-														}
-														station.startedAt = Date.now();
-														station.timePaused = 0;
-														next(null, station);
-													});
-												} else {
-													station.currentSong = _this.defaultSong;
-													station.startedAt = Date.now();
-													station.timePaused = 0;
-													next(null, station);
-												}
-											})
-										}
-									}
+			cb(null, station);
+		});
+	},
 
-									func();
-								} else {
-									_this.calculateSongForStation(station, (err, playlist) => {
-										if (!err && playlist.length === 0) {
-											station.currentSongIndex = 0;
-											station.currentSong = _this.defaultSong;
-											station.startedAt = Date.now();
-											station.timePaused = 0;
-											next(null, station);
-										} else {
-											songs.getSong(playlist[0], (err, song) => {
-												if (!err) {
-													station.currentSong = {
-														_id: song._id,
-														title: song.title,
-														artists: song.artists,
-														duration: song.duration,
-														likes: song.likes,
-														dislikes: song.dislikes,
-														skipDuration: song.skipDuration,
-														thumbnail: song.thumbnail
-													};
-												} else {
-													station.currentSong = _this.defaultSong;
-												}
-												station.currentSongIndex = 0;
-												station.startedAt = Date.now();
-												station.timePaused = 0;
-												station.playlist = playlist;
-												next(null, station);
-											});
-										}
-									});
-								}
-							},
+	updateStation: (stationId, cb) => {
+		async.waterfall([
 
-							(station, next) => {
-								cache.hset('stations', station._id, station, (err) => next(err, station));
-								//TODO Also save to DB
-							},
+			(next) => {
+				db.models.station.findOne({ _id: stationId }, next);
+			},
 
+			(station, next) => {
+				if (!station) return next('Station not found.');
 
-						], (err, station) => {
-							io.io.to(`station.${stationId}`).emit("event:songs.next", {
-								currentSong: station.currentSong,
-								startedAt: station.startedAt,
-								paused: station.paused,
-								timePaused: 0
-							});
-							utils.socketsJoinSongRoom(io.io.to(`station.${stationId}`).sockets, `song.${station.currentSong._id}`);
-							// schedule a notification to be dispatched when the next song ends
-							console.log("NEXT SONG!!!");
-							if (!station.paused) {
-								notifications.schedule(`stations.nextSong?id=${station._id}`, station.currentSong.duration * 1000);
-							}
-							//skipTimeout = setTimeout(skipSongTemp, station.currentSong.duration * 1000);
-						});
-					}
-					// the station doesn't exist anymore or is paused, unsubscribe from it
-					else {
-						notifications.remove(notification);
-					}
+				cache.hset('stations', stationId, station, (err) => {
+					if (err) return next(err);
+					next(null, station);
 				});
-			}, true);
-
-			console.log(station.paused);
-			if (!station.paused) {
-				if (!station.startedAt) {
-					station.startedAt = Date.now();
-					station.timePaused = 0;
-					cache.hset('stations', stationId, station);
-				}
-				let timeLeft = ((station.currentSong.duration * 1000) - (Date.now() - station.startedAt - station.timePaused));
-				console.log(timeLeft);
-				if (station.currentSong.duration * 1000 < timeLeft || timeLeft < 0) {
-					console.log("Test");
-					notifications.schedule(`stations.nextSong?id=${station._id}`, 1);
-				} else {
-					notifications.schedule(`stations.nextSong?id=${station._id}`, timeLeft);
-				}
-			} else {
-				notifications.unschedule(`stations.nextSong?id${station._id}`);
 			}
 
-			return cb(null, station);
+		], (err, station) => {
+			if (err && err !== true) cb(err);
+
+			cb(null, station);
 		});
 	},
 

+ 3 - 0
frontend/components/Station/Station.vue

@@ -225,6 +225,7 @@
 			resumeStation: function () {
 				let _this = this;
 				_this.socket.emit('stations.resume', _this.stationId, data => {
+					console.log(data);
 					if (data.status !== 'success') {
 						Toast.methods.addToast(`Error: ${data.message}`, 8000);
 					} else {
@@ -235,6 +236,7 @@
 			pauseStation: function () {
 				let _this = this;
 				_this.socket.emit('stations.pause', _this.stationId, data => {
+					console.log(data);
 					if (data.status !== 'success') {
 						Toast.methods.addToast(`Error: ${data.message}`, 8000);
 					} else {
@@ -325,6 +327,7 @@
 				_this.timePaused = data.timePaused;
 				_this.playVideo();
 				_this.socket.emit('songs.getOwnSongRatings', data.currentSong._id, (data) => {
+					console.log(data);
 					if (_this.currentSong._id === data.songId) {
 						_this.liked = data.liked;
 						_this.disliked = data.disliked;