SyncplayController.cs 21 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524
  1. using System;
  2. using System.Collections.Generic;
  3. using System.Linq;
  4. using System.Threading;
  5. using System.Threading.Tasks;
  6. using MediaBrowser.Controller.Session;
  7. using MediaBrowser.Controller.Syncplay;
  8. using MediaBrowser.Model.Session;
  9. using MediaBrowser.Model.Syncplay;
  10. using Microsoft.Extensions.Logging;
  11. namespace Emby.Server.Implementations.Syncplay
  12. {
  13. /// <summary>
  14. /// Class SyncplayController.
  15. /// </summary>
  16. public class SyncplayController : ISyncplayController, IDisposable
  17. {
  18. /// <summary>
  19. /// Used to filter the sessions of a group.
  20. /// </summary>
  21. private enum BroadcastType
  22. {
  23. /// <summary>
  24. /// All sessions will receive the message.
  25. /// </summary>
  26. AllGroup = 0,
  27. /// <summary>
  28. /// Only the specified session will receive the message.
  29. /// </summary>
  30. CurrentSession = 1,
  31. /// <summary>
  32. /// All sessions, except the current one, will receive the message.
  33. /// </summary>
  34. AllExceptCurrentSession = 2,
  35. /// <summary>
  36. /// Only sessions that are not buffering will receive the message.
  37. /// </summary>
  38. AllReady = 3
  39. }
  40. /// <summary>
  41. /// The logger.
  42. /// </summary>
  43. private readonly ILogger _logger;
  44. /// <summary>
  45. /// The session manager.
  46. /// </summary>
  47. private readonly ISessionManager _sessionManager;
  48. /// <summary>
  49. /// The syncplay manager.
  50. /// </summary>
  51. private readonly ISyncplayManager _syncplayManager;
  52. /// <summary>
  53. /// The group to manage.
  54. /// </summary>
  55. private readonly GroupInfo _group = new GroupInfo();
  56. /// <inheritdoc />
  57. public Guid GetGroupId() => _group.GroupId;
  58. /// <inheritdoc />
  59. public Guid GetPlayingItemId() => _group.PlayingItem.Id;
  60. /// <inheritdoc />
  61. public bool IsGroupEmpty() => _group.IsEmpty();
  62. private bool _disposed = false;
  63. public SyncplayController(
  64. ILogger logger,
  65. ISessionManager sessionManager,
  66. ISyncplayManager syncplayManager)
  67. {
  68. _logger = logger;
  69. _sessionManager = sessionManager;
  70. _syncplayManager = syncplayManager;
  71. }
  72. /// <inheritdoc />
  73. public void Dispose()
  74. {
  75. Dispose(true);
  76. GC.SuppressFinalize(this);
  77. }
  78. /// <summary>
  79. /// Releases unmanaged and optionally managed resources.
  80. /// </summary>
  81. /// <param name="disposing"><c>true</c> to release both managed and unmanaged resources; <c>false</c> to release only unmanaged resources.</param>
  82. protected virtual void Dispose(bool disposing)
  83. {
  84. if (_disposed)
  85. {
  86. return;
  87. }
  88. _disposed = true;
  89. }
  90. // TODO: use this somewhere
  91. private void CheckDisposed()
  92. {
  93. if (_disposed)
  94. {
  95. throw new ObjectDisposedException(GetType().Name);
  96. }
  97. }
  98. /// <summary>
  99. /// Filters sessions of this group.
  100. /// </summary>
  101. /// <param name="from">The current session.</param>
  102. /// <param name="type">The filtering type.</param>
  103. /// <value>The array of sessions matching the filter.</value>
  104. private SessionInfo[] FilterSessions(SessionInfo from, BroadcastType type)
  105. {
  106. switch (type)
  107. {
  108. case BroadcastType.CurrentSession:
  109. return new SessionInfo[] { from };
  110. case BroadcastType.AllGroup:
  111. return _group.Participants.Values.Select(
  112. session => session.Session
  113. ).ToArray();
  114. case BroadcastType.AllExceptCurrentSession:
  115. return _group.Participants.Values.Select(
  116. session => session.Session
  117. ).Where(
  118. session => !session.Id.Equals(from.Id)
  119. ).ToArray();
  120. case BroadcastType.AllReady:
  121. return _group.Participants.Values.Where(
  122. session => !session.IsBuffering
  123. ).Select(
  124. session => session.Session
  125. ).ToArray();
  126. default:
  127. return new SessionInfo[] { };
  128. }
  129. }
  130. /// <summary>
  131. /// Sends a GroupUpdate message to the interested sessions.
  132. /// </summary>
  133. /// <param name="from">The current session.</param>
  134. /// <param name="type">The filtering type.</param>
  135. /// <param name="message">The message to send.</param>
  136. /// <value>The task.</value>
  137. private Task SendGroupUpdate<T>(SessionInfo from, BroadcastType type, GroupUpdate<T> message)
  138. {
  139. IEnumerable<Task> GetTasks()
  140. {
  141. SessionInfo[] sessions = FilterSessions(from, type);
  142. foreach (var session in sessions)
  143. {
  144. yield return _sessionManager.SendSyncplayGroupUpdate(session.Id.ToString(), message, CancellationToken.None);
  145. }
  146. }
  147. return Task.WhenAll(GetTasks());
  148. }
  149. /// <summary>
  150. /// Sends a playback command to the interested sessions.
  151. /// </summary>
  152. /// <param name="from">The current session.</param>
  153. /// <param name="type">The filtering type.</param>
  154. /// <param name="message">The message to send.</param>
  155. /// <value>The task.</value>
  156. private Task SendCommand(SessionInfo from, BroadcastType type, SendCommand message)
  157. {
  158. IEnumerable<Task> GetTasks()
  159. {
  160. SessionInfo[] sessions = FilterSessions(from, type);
  161. foreach (var session in sessions)
  162. {
  163. yield return _sessionManager.SendSyncplayCommand(session.Id.ToString(), message, CancellationToken.None);
  164. }
  165. }
  166. return Task.WhenAll(GetTasks());
  167. }
  168. /// <summary>
  169. /// Builds a new playback command with some default values.
  170. /// </summary>
  171. /// <param name="type">The command type.</param>
  172. /// <value>The SendCommand.</value>
  173. private SendCommand NewSyncplayCommand(SendCommandType type)
  174. {
  175. return new SendCommand()
  176. {
  177. GroupId = _group.GroupId.ToString(),
  178. Command = type,
  179. PositionTicks = _group.PositionTicks,
  180. When = _group.LastActivity.ToUniversalTime().ToString("o"),
  181. EmittedAt = DateTime.UtcNow.ToUniversalTime().ToString("o")
  182. };
  183. }
  184. /// <summary>
  185. /// Builds a new group update message.
  186. /// </summary>
  187. /// <param name="type">The update type.</param>
  188. /// <param name="data">The data to send.</param>
  189. /// <value>The GroupUpdate.</value>
  190. private GroupUpdate<T> NewSyncplayGroupUpdate<T>(GroupUpdateType type, T data)
  191. {
  192. return new GroupUpdate<T>()
  193. {
  194. GroupId = _group.GroupId.ToString(),
  195. Type = type,
  196. Data = data
  197. };
  198. }
  199. /// <inheritdoc />
  200. public void InitGroup(SessionInfo session)
  201. {
  202. _group.AddSession(session);
  203. _syncplayManager.AddSessionToGroup(session, this);
  204. _group.PlayingItem = session.FullNowPlayingItem;
  205. _group.IsPaused = true;
  206. _group.PositionTicks = session.PlayState.PositionTicks ??= 0;
  207. _group.LastActivity = DateTime.UtcNow;
  208. var updateSession = NewSyncplayGroupUpdate(GroupUpdateType.GroupJoined, DateTime.UtcNow.ToUniversalTime().ToString("o"));
  209. SendGroupUpdate(session, BroadcastType.CurrentSession, updateSession);
  210. var pauseCommand = NewSyncplayCommand(SendCommandType.Pause);
  211. SendCommand(session, BroadcastType.CurrentSession, pauseCommand);
  212. }
  213. /// <inheritdoc />
  214. public void SessionJoin(SessionInfo session, JoinGroupRequest request)
  215. {
  216. if (session.NowPlayingItem?.Id == _group.PlayingItem.Id && request.PlayingItemId == _group.PlayingItem.Id)
  217. {
  218. _group.AddSession(session);
  219. _syncplayManager.AddSessionToGroup(session, this);
  220. var updateSession = NewSyncplayGroupUpdate(GroupUpdateType.GroupJoined, DateTime.UtcNow.ToUniversalTime().ToString("o"));
  221. SendGroupUpdate(session, BroadcastType.CurrentSession, updateSession);
  222. var updateOthers = NewSyncplayGroupUpdate(GroupUpdateType.UserJoined, session.UserName);
  223. SendGroupUpdate(session, BroadcastType.AllExceptCurrentSession, updateOthers);
  224. // Client join and play, syncing will happen client side
  225. if (!_group.IsPaused)
  226. {
  227. var playCommand = NewSyncplayCommand(SendCommandType.Play);
  228. SendCommand(session, BroadcastType.CurrentSession, playCommand);
  229. }
  230. else
  231. {
  232. var pauseCommand = NewSyncplayCommand(SendCommandType.Pause);
  233. SendCommand(session, BroadcastType.CurrentSession, pauseCommand);
  234. }
  235. }
  236. else
  237. {
  238. var playRequest = new PlayRequest();
  239. playRequest.ItemIds = new Guid[] { _group.PlayingItem.Id };
  240. playRequest.StartPositionTicks = _group.PositionTicks;
  241. var update = NewSyncplayGroupUpdate(GroupUpdateType.PrepareSession, playRequest);
  242. SendGroupUpdate(session, BroadcastType.CurrentSession, update);
  243. }
  244. }
  245. /// <inheritdoc />
  246. public void SessionLeave(SessionInfo session)
  247. {
  248. _group.RemoveSession(session);
  249. _syncplayManager.RemoveSessionFromGroup(session, this);
  250. var updateSession = NewSyncplayGroupUpdate(GroupUpdateType.GroupLeft, _group.PositionTicks);
  251. SendGroupUpdate(session, BroadcastType.CurrentSession, updateSession);
  252. var updateOthers = NewSyncplayGroupUpdate(GroupUpdateType.UserLeft, session.UserName);
  253. SendGroupUpdate(session, BroadcastType.AllExceptCurrentSession, updateOthers);
  254. }
  255. /// <inheritdoc />
  256. public void HandleRequest(SessionInfo session, PlaybackRequest request)
  257. {
  258. // The server's job is to mantain a consistent state to which clients refer to,
  259. // as also to notify clients of state changes.
  260. // The actual syncing of media playback happens client side.
  261. // Clients are aware of the server's time and use it to sync.
  262. switch (request.Type)
  263. {
  264. case PlaybackRequestType.Play:
  265. HandlePlayRequest(session, request);
  266. break;
  267. case PlaybackRequestType.Pause:
  268. HandlePauseRequest(session, request);
  269. break;
  270. case PlaybackRequestType.Seek:
  271. HandleSeekRequest(session, request);
  272. break;
  273. case PlaybackRequestType.Buffering:
  274. HandleBufferingRequest(session, request);
  275. break;
  276. case PlaybackRequestType.BufferingDone:
  277. HandleBufferingDoneRequest(session, request);
  278. break;
  279. case PlaybackRequestType.UpdatePing:
  280. HandlePingUpdateRequest(session, request);
  281. break;
  282. }
  283. }
  284. /// <summary>
  285. /// Handles a play action requested by a session.
  286. /// </summary>
  287. /// <param name="session">The session.</param>
  288. /// <param name="request">The play action.</param>
  289. private void HandlePlayRequest(SessionInfo session, PlaybackRequest request)
  290. {
  291. if (_group.IsPaused)
  292. {
  293. // Pick a suitable time that accounts for latency
  294. var delay = _group.GetHighestPing() * 2;
  295. delay = delay < _group.DefaulPing ? _group.DefaulPing : delay;
  296. // Unpause group and set starting point in future
  297. // Clients will start playback at LastActivity (datetime) from PositionTicks (playback position)
  298. // The added delay does not guarantee, of course, that the command will be received in time
  299. // Playback synchronization will mainly happen client side
  300. _group.IsPaused = false;
  301. _group.LastActivity = DateTime.UtcNow.AddMilliseconds(
  302. delay
  303. );
  304. var command = NewSyncplayCommand(SendCommandType.Play);
  305. SendCommand(session, BroadcastType.AllGroup, command);
  306. }
  307. else
  308. {
  309. // Client got lost, sending current state
  310. var command = NewSyncplayCommand(SendCommandType.Play);
  311. SendCommand(session, BroadcastType.CurrentSession, command);
  312. }
  313. }
  314. /// <summary>
  315. /// Handles a pause action requested by a session.
  316. /// </summary>
  317. /// <param name="session">The session.</param>
  318. /// <param name="request">The pause action.</param>
  319. private void HandlePauseRequest(SessionInfo session, PlaybackRequest request)
  320. {
  321. if (!_group.IsPaused)
  322. {
  323. // Pause group and compute the media playback position
  324. _group.IsPaused = true;
  325. var currentTime = DateTime.UtcNow;
  326. var elapsedTime = currentTime - _group.LastActivity;
  327. _group.LastActivity = currentTime;
  328. // Seek only if playback actually started
  329. // (a pause request may be issued during the delay added to account for latency)
  330. _group.PositionTicks += elapsedTime.Ticks > 0 ? elapsedTime.Ticks : 0;
  331. var command = NewSyncplayCommand(SendCommandType.Pause);
  332. SendCommand(session, BroadcastType.AllGroup, command);
  333. }
  334. else
  335. {
  336. // Client got lost, sending current state
  337. var command = NewSyncplayCommand(SendCommandType.Pause);
  338. SendCommand(session, BroadcastType.CurrentSession, command);
  339. }
  340. }
  341. /// <summary>
  342. /// Handles a seek action requested by a session.
  343. /// </summary>
  344. /// <param name="session">The session.</param>
  345. /// <param name="request">The seek action.</param>
  346. private void HandleSeekRequest(SessionInfo session, PlaybackRequest request)
  347. {
  348. // Sanitize PositionTicks
  349. var ticks = request.PositionTicks ??= 0;
  350. ticks = ticks >= 0 ? ticks : 0;
  351. if (_group.PlayingItem.RunTimeTicks != null)
  352. {
  353. var runTimeTicks = _group.PlayingItem.RunTimeTicks ??= 0;
  354. ticks = ticks > runTimeTicks ? runTimeTicks : ticks;
  355. }
  356. // Pause and seek
  357. _group.IsPaused = true;
  358. _group.PositionTicks = ticks;
  359. _group.LastActivity = DateTime.UtcNow;
  360. var command = NewSyncplayCommand(SendCommandType.Seek);
  361. SendCommand(session, BroadcastType.AllGroup, command);
  362. }
  363. /// <summary>
  364. /// Handles a buffering action requested by a session.
  365. /// </summary>
  366. /// <param name="session">The session.</param>
  367. /// <param name="request">The buffering action.</param>
  368. private void HandleBufferingRequest(SessionInfo session, PlaybackRequest request)
  369. {
  370. if (!_group.IsPaused)
  371. {
  372. // Pause group and compute the media playback position
  373. _group.IsPaused = true;
  374. var currentTime = DateTime.UtcNow;
  375. var elapsedTime = currentTime - _group.LastActivity;
  376. _group.LastActivity = currentTime;
  377. _group.PositionTicks += elapsedTime.Ticks > 0 ? elapsedTime.Ticks : 0;
  378. _group.SetBuffering(session, true);
  379. // Send pause command to all non-buffering sessions
  380. var command = NewSyncplayCommand(SendCommandType.Pause);
  381. SendCommand(session, BroadcastType.AllReady, command);
  382. var updateOthers = NewSyncplayGroupUpdate(GroupUpdateType.GroupWait, session.UserName);
  383. SendGroupUpdate(session, BroadcastType.AllExceptCurrentSession, updateOthers);
  384. }
  385. else
  386. {
  387. // Client got lost, sending current state
  388. var command = NewSyncplayCommand(SendCommandType.Pause);
  389. SendCommand(session, BroadcastType.CurrentSession, command);
  390. }
  391. }
  392. /// <summary>
  393. /// Handles a buffering-done action requested by a session.
  394. /// </summary>
  395. /// <param name="session">The session.</param>
  396. /// <param name="request">The buffering-done action.</param>
  397. private void HandleBufferingDoneRequest(SessionInfo session, PlaybackRequest request)
  398. {
  399. if (_group.IsPaused)
  400. {
  401. _group.SetBuffering(session, false);
  402. var when = request.When ??= DateTime.UtcNow;
  403. var currentTime = DateTime.UtcNow;
  404. var elapsedTime = currentTime - when;
  405. var clientPosition = TimeSpan.FromTicks(request.PositionTicks ??= 0) + elapsedTime;
  406. var delay = _group.PositionTicks - clientPosition.Ticks;
  407. if (_group.IsBuffering())
  408. {
  409. // Others are buffering, tell this client to pause when ready
  410. var command = NewSyncplayCommand(SendCommandType.Pause);
  411. command.When = currentTime.AddMilliseconds(
  412. delay
  413. ).ToUniversalTime().ToString("o");
  414. SendCommand(session, BroadcastType.CurrentSession, command);
  415. }
  416. else
  417. {
  418. // Let other clients resume as soon as the buffering client catches up
  419. _group.IsPaused = false;
  420. if (delay > _group.GetHighestPing() * 2)
  421. {
  422. // Client that was buffering is recovering, notifying others to resume
  423. _group.LastActivity = currentTime.AddMilliseconds(
  424. delay
  425. );
  426. var command = NewSyncplayCommand(SendCommandType.Play);
  427. SendCommand(session, BroadcastType.AllExceptCurrentSession, command);
  428. }
  429. else
  430. {
  431. // Client, that was buffering, resumed playback but did not update others in time
  432. delay = _group.GetHighestPing() * 2;
  433. delay = delay < _group.DefaulPing ? _group.DefaulPing : delay;
  434. _group.LastActivity = currentTime.AddMilliseconds(
  435. delay
  436. );
  437. var command = NewSyncplayCommand(SendCommandType.Play);
  438. SendCommand(session, BroadcastType.AllGroup, command);
  439. }
  440. }
  441. }
  442. else
  443. {
  444. // Group was not waiting, make sure client has latest state
  445. var command = NewSyncplayCommand(SendCommandType.Play);
  446. SendCommand(session, BroadcastType.CurrentSession, command);
  447. }
  448. }
  449. /// <summary>
  450. /// Updates ping of a session.
  451. /// </summary>
  452. /// <param name="session">The session.</param>
  453. /// <param name="request">The update.</param>
  454. private void HandlePingUpdateRequest(SessionInfo session, PlaybackRequest request)
  455. {
  456. // Collected pings are used to account for network latency when unpausing playback
  457. _group.UpdatePing(session, request.Ping ??= _group.DefaulPing);
  458. }
  459. /// <inheritdoc />
  460. public GroupInfoView GetInfo()
  461. {
  462. return new GroupInfoView()
  463. {
  464. GroupId = GetGroupId().ToString(),
  465. PlayingItemName = _group.PlayingItem.Name,
  466. PlayingItemId = _group.PlayingItem.Id.ToString(),
  467. PositionTicks = _group.PositionTicks,
  468. Participants = _group.Participants.Values.Select(session => session.Session.UserName).ToArray()
  469. };
  470. }
  471. }
  472. }