| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463 | import async from "async";import CoreClass from "../core";let ActivitiesModule;let DBModule;let CacheModule;let UtilsModule;let WSModule;let PlaylistsModule;class _ActivitiesModule extends CoreClass {	// eslint-disable-next-line require-jsdoc	constructor() {		super("activities");		ActivitiesModule = this;	}	/**	 * Initialises the activities module	 * @returns {Promise} - returns promise (reject, resolve)	 */	initialize() {		return new Promise(resolve => {			DBModule = this.moduleManager.modules.db;			CacheModule = this.moduleManager.modules.cache;			UtilsModule = this.moduleManager.modules.utils;			WSModule = this.moduleManager.modules.ws;			PlaylistsModule = this.moduleManager.modules.playlists;			resolve();		});	}	/**	 * Adds a new activity to the database	 * @param {object} payload - object that contains the payload	 * @param {string} payload.userId - the id of the user who's activity is to be added	 * @param {string} payload.type - the type of activity (enum specified in schema)	 * @param {object} payload.payload - the details of the activity e.g. an array of songs that were added	 * @param {string} payload.payload.message - the main message describing the activity e.g. 50 songs added to playlist 'playlist name'	 * @param {string} payload.payload.thumbnail - url to a thumbnail e.g. song album art to be used when display an activity	 * @param {string} payload.payload.mediaSource - (optional) if relevant, the media source of the song related to the activity	 * @param {string} payload.payload.reportId - (optional) if relevant, the id of the report related to the activity	 * @param {string} payload.payload.playlistId - (optional) if relevant, the id of the playlist related to the activity	 * @param {string} payload.payload.stationId - (optional) if relevant, the id of the station related to the activity	 * @returns {Promise} - returns promise (reject, resolve)	 */	ADD_ACTIVITY(payload) {		return new Promise((resolve, reject) => {			async.waterfall(				[					next => {						DBModule.runJob("GET_MODEL", { modelName: "activity" }, this)							.then(res => next(null, res))							.catch(next);					},					(ActivityModel, next) => {						const { userId, type } = payload;						const activity = new ActivityModel({							userId,							type,							payload: payload.payload						});						activity.save(next);					},					(activity, next) => {						WSModule.runJob("SOCKETS_FROM_USER", { userId: activity.userId }, this)							.then(sockets => {								sockets.forEach(socket =>									socket.dispatch("event:activity.created", { data: { activity } })								);								next(null, activity);							})							.catch(next);					},					(activity, next) => {						const mergeableActivities = ["playlist__remove_song", "playlist__add_song"];						const spammableActivities = [							"user__toggle_nightmode",							"user__toggle_autoskip_disliked_songs",							"user__toggle_activity_watch",							"song__like",							"song__unlike",							"song__dislike",							"song__undislike"						];						CacheModule.runJob("HGET", { table: "recentActivities", key: activity.userId })							.then(recentActivity => {								if (recentActivity) {									const timeDifference = mins =>										new Date() - new Date(recentActivity.createdAt) < mins * 60 * 1000;									// if both activities have the same type, if within last 15 mins and if activity is within the spammableActivities array									if (										recentActivity.type === activity.type &&										!!timeDifference(15) &&										spammableActivities.includes(activity.type)									)										return ActivitiesModule.runJob(											"CHECK_FOR_ACTIVITY_SPAM_TO_HIDE",											{ userId: activity.userId, type: activity.type },											this										)											.then(() => next(null, activity))											.catch(next);									// if activity is within the mergeableActivities array, if both activities are about removing/adding and if within last 5 mins									if (										mergeableActivities.includes(activity.type) &&										recentActivity.type === activity.type &&										!!timeDifference(5)									) {										return PlaylistsModule.runJob("GET_PLAYLIST", {											playlistId: activity.payload.playlistId										})											.then(playlist =>												ActivitiesModule.runJob(													"CHECK_FOR_ACTIVITY_SPAM_TO_MERGE",													{														userId: activity.userId,														type: activity.type,														playlist: {															playlistId: playlist._id,															displayName: playlist.displayName														}													},													this												)													.then(() => next(null, activity))													.catch(next)											)											.catch(next);									}									return next(null, activity);								}								return next(null, activity);							})							.catch(next);					},					// store most recent activity in cache to be quickly accessible					(activity, next) =>						CacheModule.runJob(							"HSET",							{								table: "recentActivities",								key: activity.userId,								value: { createdAt: activity.createdAt, type: activity.type }							},							this						)							.then(() => next(null))							.catch(next)				],				async (err, activity) => {					if (err) {						err = await UtilsModule.runJob("GET_ERROR", { error: err }, this);						return reject(new Error(err));					}					return resolve(activity);				}			);		});	}	/**	 * Merges activities about adding/removing songs from a playlist within a 5-minute period to prevent spam	 * @param {object} payload - object that contains the payload	 * @param {string} payload.userId - the id of the user to check for duplicates	 * @param {object} payload.playlist - object that contains info about the relevant playlist	 * @param {string} payload.playlist.playlistId - the id of the playlist	 * @param {string} payload.playlist.displayName - the display name of the playlist	 * @param {string} payload.type - the type of activity to check for duplicates	 * @returns {Promise} - returns promise (reject, resolve)	 */	async CHECK_FOR_ACTIVITY_SPAM_TO_MERGE(payload) {		const activityModel = await DBModule.runJob("GET_MODEL", { modelName: "activity" }, this);		return new Promise((resolve, reject) => {			async.waterfall(				[					// find all activities of this type from the last 5 minutes					next => {						activityModel							.find(								{									userId: payload.userId,									type: { $in: [payload.type, `${payload.type}s`] },									hidden: false,									createdAt: {										$gte: new Date(new Date() - 5 * 60 * 1000)									},									"payload.playlistId": payload.playlist.playlistId								},								["_id", "type", "payload.message"]							)							.sort({ createdAt: -1 })							.exec(next);					},					// hide these activities, emit to socket listeners and count number of songs in each					(activities, next) => {						let howManySongs = 0; // how many songs added/removed						activities.forEach(activity => {							activityModel.updateOne({ _id: activity._id }, { $set: { hidden: true } }).catch(next);							WSModule.runJob("SOCKETS_FROM_USER", { userId: payload.userId }, this)								.then(sockets =>									sockets.forEach(socket =>										socket.dispatch("event:activity.hidden", { data: { activityId: activity._id } })									)								)								.catch(next);							if (activity.type === payload.type) howManySongs += 1;							else if (activity.type === `${payload.type}s`)								howManySongs += parseInt(									activity.payload.message.replace(										/(?:Removed|Added)\s(?<songs>\d+)\ssongs.+/g,										"$<songs>"									)								);						});						return next(null, howManySongs);					},					// // delete in cache the most recent activity to avoid issues when adding a new activity					(howManySongs, next) => {						CacheModule.runJob("HDEL", { table: "recentActivities", key: payload.userId }, this)							.then(() => next(null, howManySongs))							.catch(next);					},					// add a new activity that merges the activities together					(howManySongs, next) => {						const activity = {							userId: payload.userId,							type: "",							payload: {								message: "",								playlistId: payload.playlist.playlistId							}						};						if (payload.type === "playlist__remove_song" || payload.type === "playlist__remove_songs") {							activity.payload.message = `Removed ${howManySongs} songs from playlist <playlistId>${payload.playlist.displayName}</playlistId>`;							activity.type = "playlist__remove_songs";						} else if (payload.type === "playlist__add_song" || payload.type === "playlist__add_songs") {							activity.payload.message = `Added ${howManySongs} songs to playlist <playlistId>${payload.playlist.displayName}</playlistId>`;							activity.type = "playlist__add_songs";						}						ActivitiesModule.runJob("ADD_ACTIVITY", activity, this)							.then(() => next())							.catch(next);					}				],				async err => {					if (err) {						err = await UtilsModule.runJob("GET_ERROR", { error: err }, this);						return reject(new Error(err));					}					return resolve();				}			);		});	}	/**	 * Removes any references to a station, playlist or song in activities	 * @param {object} payload - object that contains the payload	 * @param {string} payload.type - type of reference. enum: ["mediaSource", "stationId", "playlistId", "playlistId"]	 * @param {string} payload.stationId - (optional) the id of a station	 * @param {string} payload.reportId - (optional) the id of a report	 * @param {string} payload.playlistId - (optional) the id of a playlist	 * @param {string} payload.mediaSource - (optional) the id of a song	 * @returns {Promise} - returns promise (reject, resolve)	 */	async REMOVE_ACTIVITY_REFERENCES(payload) {		const activityModel = await DBModule.runJob("GET_MODEL", { modelName: "activity" }, this);		return new Promise((resolve, reject) => {			async.waterfall(				[					next => {						if (							(payload.type !== "mediaSource" &&								payload.type !== "stationId" &&								payload.type !== "reportId" &&								payload.type !== "playlistId") ||							!payload.type						)							return next("Please use a valid reference type.");						if (!payload[payload.type]) return next(`Please provide a ${payload.type} in the job payload.`);						return next();					},					// find all activities that include the reference					next => {						const query = {};						query[`payload.${payload.type}`] = payload[payload.type];						activityModel							.find(query, ["_id", "userId", "payload.message"])							.sort({ createdAt: -1 })							.exec(next);					},					(activities, next) => {						async.eachLimit(							activities,							1,							(activity, next) => {								// remove the reference tags								if (payload.mediaSource) {									activity.payload.message = activity.payload.message.replace(										/<mediaSource>(.*)<\/mediaSource>/g,										"$1"									);								}								if (payload.reportId) {									activity.payload.message = activity.payload.message.replace(										/<reportId>(.*)<\/reportId>/g,										"$1"									);								}								if (payload.playlistId) {									activity.payload.message = activity.payload.message.replace(										/<playlistId>(.*)<\/playlistId>/g,										`$1`									);								}								if (payload.stationId) {									activity.payload.message = activity.payload.message.replace(										/<stationId>(.*)<\/stationId>/g,										`$1`									);								}								activityModel									.updateOne(										{ _id: activity._id },										{ $set: { "payload.message": activity.payload.message } }									)									.then(() => {										WSModule.runJob("SOCKETS_FROM_USER", { userId: activity.userId })											.then(sockets =>												sockets.forEach(socket =>													socket.dispatch("event:activity.updated", {														data: {															activityId: activity._id,															message: activity.payload.message														}													})												)											)											.catch(next);										return next();									})									.catch(next);							},							err => next(err)						);					}				],				async err => {					if (err) {						err = await UtilsModule.runJob("GET_ERROR", { error: err }, this);						return reject(new Error(err));					}					return resolve();				}			);		});	}	/**	 * Hides any activities of the same type within a 15-minute period to prevent spam	 * @param {object} payload - object that contains the payload	 * @param {string} payload.userId - the id of the user to check for duplicates	 * @param {string} payload.type - the type of activity to check for duplicates	 * @returns {Promise} - returns promise (reject, resolve)	 */	async CHECK_FOR_ACTIVITY_SPAM_TO_HIDE(payload) {		const activityModel = await DBModule.runJob("GET_MODEL", { modelName: "activity" }, this);		return new Promise((resolve, reject) => {			async.waterfall(				[					// find all activities of this type from the last 15 minutes					next => {						activityModel							.find(								{									userId: payload.userId,									type: payload.type,									hidden: false,									createdAt: {										$gte: new Date(new Date() - 15 * 60 * 1000)									}								},								"_id"							)							.sort({ createdAt: -1 })							.skip(1)							.exec(next);					},					// hide these activities and emit to socket listeners					(activities, next) => {						activities.forEach(activity => {							activityModel.updateOne({ _id: activity._id }, { $set: { hidden: true } }).catch(next);							WSModule.runJob("SOCKETS_FROM_USER", { userId: payload.userId })								.then(sockets => {									sockets.forEach(socket =>										socket.dispatch("event:activity.hidden", {											data: { activityId: activity._id }										})									);								})								.catch(next);						});						return next();					}				],				async err => {					if (err) {						err = await UtilsModule.runJob("GET_ERROR", { error: err }, this);						return reject(new Error(err));					}					return resolve();				}			);		});	}}export default new _ActivitiesModule();
 |