SyncPlayController.cs 22 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516
  1. using System;
  2. using System.Collections.Generic;
  3. using System.Globalization;
  4. using System.Linq;
  5. using System.Threading;
  6. using System.Threading.Tasks;
  7. using MediaBrowser.Controller.Session;
  8. using MediaBrowser.Controller.SyncPlay;
  9. using MediaBrowser.Model.Session;
  10. using MediaBrowser.Model.SyncPlay;
  11. namespace Emby.Server.Implementations.SyncPlay
  12. {
  13. /// <summary>
  14. /// Class SyncPlayController.
  15. /// </summary>
  16. /// <remarks>
  17. /// Class is not thread-safe, external locking is required when accessing methods.
  18. /// </remarks>
  19. public class SyncPlayController : ISyncPlayController
  20. {
  21. /// <summary>
  22. /// Used to filter the sessions of a group.
  23. /// </summary>
  24. private enum BroadcastType
  25. {
  26. /// <summary>
  27. /// All sessions will receive the message.
  28. /// </summary>
  29. AllGroup = 0,
  30. /// <summary>
  31. /// Only the specified session will receive the message.
  32. /// </summary>
  33. CurrentSession = 1,
  34. /// <summary>
  35. /// All sessions, except the current one, will receive the message.
  36. /// </summary>
  37. AllExceptCurrentSession = 2,
  38. /// <summary>
  39. /// Only sessions that are not buffering will receive the message.
  40. /// </summary>
  41. AllReady = 3
  42. }
  43. /// <summary>
  44. /// The session manager.
  45. /// </summary>
  46. private readonly ISessionManager _sessionManager;
  47. /// <summary>
  48. /// The SyncPlay manager.
  49. /// </summary>
  50. private readonly ISyncPlayManager _syncPlayManager;
  51. /// <summary>
  52. /// The group to manage.
  53. /// </summary>
  54. private readonly GroupInfo _group = new GroupInfo();
  55. /// <summary>
  56. /// Initializes a new instance of the <see cref="SyncPlayController" /> class.
  57. /// </summary>
  58. /// <param name="sessionManager">The session manager.</param>
  59. /// <param name="syncPlayManager">The SyncPlay manager.</param>
  60. public SyncPlayController(
  61. ISessionManager sessionManager,
  62. ISyncPlayManager syncPlayManager)
  63. {
  64. _sessionManager = sessionManager;
  65. _syncPlayManager = syncPlayManager;
  66. }
  67. /// <inheritdoc />
  68. public Guid GetGroupId() => _group.GroupId;
  69. /// <inheritdoc />
  70. public Guid GetPlayingItemId() => _group.PlayingItem.Id;
  71. /// <inheritdoc />
  72. public bool IsGroupEmpty() => _group.IsEmpty();
  73. /// <summary>
  74. /// Converts DateTime to UTC string.
  75. /// </summary>
  76. /// <param name="date">The date to convert.</param>
  77. /// <value>The UTC string.</value>
  78. private string DateToUTCString(DateTime date)
  79. {
  80. return date.ToUniversalTime().ToString("o", CultureInfo.InvariantCulture);
  81. }
  82. /// <summary>
  83. /// Filters sessions of this group.
  84. /// </summary>
  85. /// <param name="from">The current session.</param>
  86. /// <param name="type">The filtering type.</param>
  87. /// <value>The array of sessions matching the filter.</value>
  88. private IEnumerable<SessionInfo> FilterSessions(SessionInfo from, BroadcastType type)
  89. {
  90. switch (type)
  91. {
  92. case BroadcastType.CurrentSession:
  93. return new SessionInfo[] { from };
  94. case BroadcastType.AllGroup:
  95. return _group.Participants.Values
  96. .Select(session => session.Session);
  97. case BroadcastType.AllExceptCurrentSession:
  98. return _group.Participants.Values
  99. .Select(session => session.Session)
  100. .Where(session => !session.Id.Equals(from.Id, StringComparison.Ordinal));
  101. case BroadcastType.AllReady:
  102. return _group.Participants.Values
  103. .Where(session => !session.IsBuffering)
  104. .Select(session => session.Session);
  105. default:
  106. return Array.Empty<SessionInfo>();
  107. }
  108. }
  109. /// <summary>
  110. /// Sends a GroupUpdate message to the interested sessions.
  111. /// </summary>
  112. /// <param name="from">The current session.</param>
  113. /// <param name="type">The filtering type.</param>
  114. /// <param name="message">The message to send.</param>
  115. /// <param name="cancellationToken">The cancellation token.</param>
  116. /// <value>The task.</value>
  117. private Task SendGroupUpdate<T>(SessionInfo from, BroadcastType type, GroupUpdate<T> message, CancellationToken cancellationToken)
  118. {
  119. IEnumerable<Task> GetTasks()
  120. {
  121. foreach (var session in FilterSessions(from, type))
  122. {
  123. yield return _sessionManager.SendSyncPlayGroupUpdate(session.Id, message, cancellationToken);
  124. }
  125. }
  126. return Task.WhenAll(GetTasks());
  127. }
  128. /// <summary>
  129. /// Sends a playback command to the interested sessions.
  130. /// </summary>
  131. /// <param name="from">The current session.</param>
  132. /// <param name="type">The filtering type.</param>
  133. /// <param name="message">The message to send.</param>
  134. /// <param name="cancellationToken">The cancellation token.</param>
  135. /// <value>The task.</value>
  136. private Task SendCommand(SessionInfo from, BroadcastType type, SendCommand message, CancellationToken cancellationToken)
  137. {
  138. IEnumerable<Task> GetTasks()
  139. {
  140. foreach (var session in FilterSessions(from, type))
  141. {
  142. yield return _sessionManager.SendSyncPlayCommand(session.Id, message, cancellationToken);
  143. }
  144. }
  145. return Task.WhenAll(GetTasks());
  146. }
  147. /// <summary>
  148. /// Builds a new playback command with some default values.
  149. /// </summary>
  150. /// <param name="type">The command type.</param>
  151. /// <value>The SendCommand.</value>
  152. private SendCommand NewSyncPlayCommand(SendCommandType type)
  153. {
  154. return new SendCommand()
  155. {
  156. GroupId = _group.GroupId.ToString(),
  157. Command = type,
  158. PositionTicks = _group.PositionTicks,
  159. When = DateToUTCString(_group.LastActivity),
  160. EmittedAt = DateToUTCString(DateTime.UtcNow)
  161. };
  162. }
  163. /// <summary>
  164. /// Builds a new group update message.
  165. /// </summary>
  166. /// <param name="type">The update type.</param>
  167. /// <param name="data">The data to send.</param>
  168. /// <value>The GroupUpdate.</value>
  169. private GroupUpdate<T> NewSyncPlayGroupUpdate<T>(GroupUpdateType type, T data)
  170. {
  171. return new GroupUpdate<T>()
  172. {
  173. GroupId = _group.GroupId.ToString(),
  174. Type = type,
  175. Data = data
  176. };
  177. }
  178. /// <inheritdoc />
  179. public void CreateGroup(SessionInfo session, CancellationToken cancellationToken)
  180. {
  181. _group.AddSession(session);
  182. _syncPlayManager.AddSessionToGroup(session, this);
  183. _group.PlayingItem = session.FullNowPlayingItem;
  184. _group.IsPaused = session.PlayState.IsPaused;
  185. _group.PositionTicks = session.PlayState.PositionTicks ?? 0;
  186. _group.LastActivity = DateTime.UtcNow;
  187. var updateSession = NewSyncPlayGroupUpdate(GroupUpdateType.GroupJoined, DateToUTCString(DateTime.UtcNow));
  188. SendGroupUpdate(session, BroadcastType.CurrentSession, updateSession, cancellationToken);
  189. }
  190. /// <inheritdoc />
  191. public void SessionJoin(SessionInfo session, JoinGroupRequest request, CancellationToken cancellationToken)
  192. {
  193. if (session.NowPlayingItem?.Id == _group.PlayingItem.Id)
  194. {
  195. _group.AddSession(session);
  196. _syncPlayManager.AddSessionToGroup(session, this);
  197. var updateSession = NewSyncPlayGroupUpdate(GroupUpdateType.GroupJoined, DateToUTCString(DateTime.UtcNow));
  198. SendGroupUpdate(session, BroadcastType.CurrentSession, updateSession, cancellationToken);
  199. var updateOthers = NewSyncPlayGroupUpdate(GroupUpdateType.UserJoined, session.UserName);
  200. SendGroupUpdate(session, BroadcastType.AllExceptCurrentSession, updateOthers, cancellationToken);
  201. // Syncing will happen client-side
  202. if (!_group.IsPaused)
  203. {
  204. var playCommand = NewSyncPlayCommand(SendCommandType.Play);
  205. SendCommand(session, BroadcastType.CurrentSession, playCommand, cancellationToken);
  206. }
  207. else
  208. {
  209. var pauseCommand = NewSyncPlayCommand(SendCommandType.Pause);
  210. SendCommand(session, BroadcastType.CurrentSession, pauseCommand, cancellationToken);
  211. }
  212. }
  213. else
  214. {
  215. var playRequest = new PlayRequest
  216. {
  217. ItemIds = new Guid[] { _group.PlayingItem.Id },
  218. StartPositionTicks = _group.PositionTicks
  219. };
  220. var update = NewSyncPlayGroupUpdate(GroupUpdateType.PrepareSession, playRequest);
  221. SendGroupUpdate(session, BroadcastType.CurrentSession, update, cancellationToken);
  222. }
  223. }
  224. /// <inheritdoc />
  225. public void SessionLeave(SessionInfo session, CancellationToken cancellationToken)
  226. {
  227. _group.RemoveSession(session);
  228. _syncPlayManager.RemoveSessionFromGroup(session, this);
  229. var updateSession = NewSyncPlayGroupUpdate(GroupUpdateType.GroupLeft, _group.PositionTicks);
  230. SendGroupUpdate(session, BroadcastType.CurrentSession, updateSession, cancellationToken);
  231. var updateOthers = NewSyncPlayGroupUpdate(GroupUpdateType.UserLeft, session.UserName);
  232. SendGroupUpdate(session, BroadcastType.AllExceptCurrentSession, updateOthers, cancellationToken);
  233. }
  234. /// <inheritdoc />
  235. public void HandleRequest(SessionInfo session, PlaybackRequest request, CancellationToken cancellationToken)
  236. {
  237. // The server's job is to maintain a consistent state for clients to reference
  238. // and notify clients of state changes. The actual syncing of media playback
  239. // happens client side. Clients are aware of the server's time and use it to sync.
  240. switch (request.Type)
  241. {
  242. case PlaybackRequestType.Play:
  243. HandlePlayRequest(session, request, cancellationToken);
  244. break;
  245. case PlaybackRequestType.Pause:
  246. HandlePauseRequest(session, request, cancellationToken);
  247. break;
  248. case PlaybackRequestType.Seek:
  249. HandleSeekRequest(session, request, cancellationToken);
  250. break;
  251. case PlaybackRequestType.Buffer:
  252. HandleBufferingRequest(session, request, cancellationToken);
  253. break;
  254. case PlaybackRequestType.Ready:
  255. HandleBufferingDoneRequest(session, request, cancellationToken);
  256. break;
  257. case PlaybackRequestType.Ping:
  258. HandlePingUpdateRequest(session, request);
  259. break;
  260. }
  261. }
  262. /// <summary>
  263. /// Handles a play action requested by a session.
  264. /// </summary>
  265. /// <param name="session">The session.</param>
  266. /// <param name="request">The play action.</param>
  267. /// <param name="cancellationToken">The cancellation token.</param>
  268. private void HandlePlayRequest(SessionInfo session, PlaybackRequest request, CancellationToken cancellationToken)
  269. {
  270. if (_group.IsPaused)
  271. {
  272. // Pick a suitable time that accounts for latency
  273. var delay = _group.GetHighestPing() * 2;
  274. delay = delay < _group.DefaultPing ? _group.DefaultPing : delay;
  275. // Unpause group and set starting point in future
  276. // Clients will start playback at LastActivity (datetime) from PositionTicks (playback position)
  277. // The added delay does not guarantee, of course, that the command will be received in time
  278. // Playback synchronization will mainly happen client side
  279. _group.IsPaused = false;
  280. _group.LastActivity = DateTime.UtcNow.AddMilliseconds(
  281. delay);
  282. var command = NewSyncPlayCommand(SendCommandType.Play);
  283. SendCommand(session, BroadcastType.AllGroup, command, cancellationToken);
  284. }
  285. else
  286. {
  287. // Client got lost, sending current state
  288. var command = NewSyncPlayCommand(SendCommandType.Play);
  289. SendCommand(session, BroadcastType.CurrentSession, command, cancellationToken);
  290. }
  291. }
  292. /// <summary>
  293. /// Handles a pause action requested by a session.
  294. /// </summary>
  295. /// <param name="session">The session.</param>
  296. /// <param name="request">The pause action.</param>
  297. /// <param name="cancellationToken">The cancellation token.</param>
  298. private void HandlePauseRequest(SessionInfo session, PlaybackRequest request, CancellationToken cancellationToken)
  299. {
  300. if (!_group.IsPaused)
  301. {
  302. // Pause group and compute the media playback position
  303. _group.IsPaused = true;
  304. var currentTime = DateTime.UtcNow;
  305. var elapsedTime = currentTime - _group.LastActivity;
  306. _group.LastActivity = currentTime;
  307. // Seek only if playback actually started
  308. // Pause request may be issued during the delay added to account for latency
  309. _group.PositionTicks += elapsedTime.Ticks > 0 ? elapsedTime.Ticks : 0;
  310. var command = NewSyncPlayCommand(SendCommandType.Pause);
  311. SendCommand(session, BroadcastType.AllGroup, command, cancellationToken);
  312. }
  313. else
  314. {
  315. // Client got lost, sending current state
  316. var command = NewSyncPlayCommand(SendCommandType.Pause);
  317. SendCommand(session, BroadcastType.CurrentSession, command, cancellationToken);
  318. }
  319. }
  320. /// <summary>
  321. /// Handles a seek action requested by a session.
  322. /// </summary>
  323. /// <param name="session">The session.</param>
  324. /// <param name="request">The seek action.</param>
  325. /// <param name="cancellationToken">The cancellation token.</param>
  326. private void HandleSeekRequest(SessionInfo session, PlaybackRequest request, CancellationToken cancellationToken)
  327. {
  328. // Sanitize PositionTicks
  329. var ticks = SanitizePositionTicks(request.PositionTicks);
  330. // Pause and seek
  331. _group.IsPaused = true;
  332. _group.PositionTicks = ticks;
  333. _group.LastActivity = DateTime.UtcNow;
  334. var command = NewSyncPlayCommand(SendCommandType.Seek);
  335. SendCommand(session, BroadcastType.AllGroup, command, cancellationToken);
  336. }
  337. /// <summary>
  338. /// Handles a buffering action requested by a session.
  339. /// </summary>
  340. /// <param name="session">The session.</param>
  341. /// <param name="request">The buffering action.</param>
  342. /// <param name="cancellationToken">The cancellation token.</param>
  343. private void HandleBufferingRequest(SessionInfo session, PlaybackRequest request, CancellationToken cancellationToken)
  344. {
  345. if (!_group.IsPaused)
  346. {
  347. // Pause group and compute the media playback position
  348. _group.IsPaused = true;
  349. var currentTime = DateTime.UtcNow;
  350. var elapsedTime = currentTime - _group.LastActivity;
  351. _group.LastActivity = currentTime;
  352. _group.PositionTicks += elapsedTime.Ticks > 0 ? elapsedTime.Ticks : 0;
  353. _group.SetBuffering(session, true);
  354. // Send pause command to all non-buffering sessions
  355. var command = NewSyncPlayCommand(SendCommandType.Pause);
  356. SendCommand(session, BroadcastType.AllReady, command, cancellationToken);
  357. var updateOthers = NewSyncPlayGroupUpdate(GroupUpdateType.GroupWait, session.UserName);
  358. SendGroupUpdate(session, BroadcastType.AllExceptCurrentSession, updateOthers, cancellationToken);
  359. }
  360. else
  361. {
  362. // Client got lost, sending current state
  363. var command = NewSyncPlayCommand(SendCommandType.Pause);
  364. SendCommand(session, BroadcastType.CurrentSession, command, cancellationToken);
  365. }
  366. }
  367. /// <summary>
  368. /// Handles a buffering-done action requested by a session.
  369. /// </summary>
  370. /// <param name="session">The session.</param>
  371. /// <param name="request">The buffering-done action.</param>
  372. /// <param name="cancellationToken">The cancellation token.</param>
  373. private void HandleBufferingDoneRequest(SessionInfo session, PlaybackRequest request, CancellationToken cancellationToken)
  374. {
  375. if (_group.IsPaused)
  376. {
  377. _group.SetBuffering(session, false);
  378. var requestTicks = SanitizePositionTicks(request.PositionTicks);
  379. var when = request.When ?? DateTime.UtcNow;
  380. var currentTime = DateTime.UtcNow;
  381. var elapsedTime = currentTime - when;
  382. var clientPosition = TimeSpan.FromTicks(requestTicks) + elapsedTime;
  383. var delay = _group.PositionTicks - clientPosition.Ticks;
  384. if (_group.IsBuffering())
  385. {
  386. // Others are still buffering, tell this client to pause when ready
  387. var command = NewSyncPlayCommand(SendCommandType.Pause);
  388. var pauseAtTime = currentTime.AddMilliseconds(delay);
  389. command.When = DateToUTCString(pauseAtTime);
  390. SendCommand(session, BroadcastType.CurrentSession, command, cancellationToken);
  391. }
  392. else
  393. {
  394. // Let other clients resume as soon as the buffering client catches up
  395. _group.IsPaused = false;
  396. if (delay > _group.GetHighestPing() * 2)
  397. {
  398. // Client that was buffering is recovering, notifying others to resume
  399. _group.LastActivity = currentTime.AddMilliseconds(
  400. delay);
  401. var command = NewSyncPlayCommand(SendCommandType.Play);
  402. SendCommand(session, BroadcastType.AllExceptCurrentSession, command, cancellationToken);
  403. }
  404. else
  405. {
  406. // Client, that was buffering, resumed playback but did not update others in time
  407. delay = _group.GetHighestPing() * 2;
  408. delay = delay < _group.DefaultPing ? _group.DefaultPing : delay;
  409. _group.LastActivity = currentTime.AddMilliseconds(
  410. delay);
  411. var command = NewSyncPlayCommand(SendCommandType.Play);
  412. SendCommand(session, BroadcastType.AllGroup, command, cancellationToken);
  413. }
  414. }
  415. }
  416. else
  417. {
  418. // Group was not waiting, make sure client has latest state
  419. var command = NewSyncPlayCommand(SendCommandType.Play);
  420. SendCommand(session, BroadcastType.CurrentSession, command, cancellationToken);
  421. }
  422. }
  423. /// <summary>
  424. /// Sanitizes the PositionTicks, considers the current playing item when available.
  425. /// </summary>
  426. /// <param name="positionTicks">The PositionTicks.</param>
  427. /// <value>The sanitized PositionTicks.</value>
  428. private long SanitizePositionTicks(long? positionTicks)
  429. {
  430. var ticks = positionTicks ?? 0;
  431. ticks = ticks >= 0 ? ticks : 0;
  432. if (_group.PlayingItem != null)
  433. {
  434. var runTimeTicks = _group.PlayingItem.RunTimeTicks ?? 0;
  435. ticks = ticks > runTimeTicks ? runTimeTicks : ticks;
  436. }
  437. return ticks;
  438. }
  439. /// <summary>
  440. /// Updates ping of a session.
  441. /// </summary>
  442. /// <param name="session">The session.</param>
  443. /// <param name="request">The update.</param>
  444. private void HandlePingUpdateRequest(SessionInfo session, PlaybackRequest request)
  445. {
  446. // Collected pings are used to account for network latency when unpausing playback
  447. _group.UpdatePing(session, request.Ping ?? _group.DefaultPing);
  448. }
  449. /// <inheritdoc />
  450. public GroupInfoView GetInfo()
  451. {
  452. return new GroupInfoView()
  453. {
  454. GroupId = GetGroupId().ToString(),
  455. PlayingItemName = _group.PlayingItem.Name,
  456. PlayingItemId = _group.PlayingItem.Id.ToString(),
  457. PositionTicks = _group.PositionTicks,
  458. Participants = _group.Participants.Values.Select(session => session.Session.UserName).Distinct().ToList()
  459. };
  460. }
  461. }
  462. }