SyncRepository.cs 33 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849
  1. using MediaBrowser.Controller;
  2. using MediaBrowser.Controller.Sync;
  3. using MediaBrowser.Model.Dto;
  4. using MediaBrowser.Model.Logging;
  5. using MediaBrowser.Model.Querying;
  6. using MediaBrowser.Model.Serialization;
  7. using MediaBrowser.Model.Sync;
  8. using MediaBrowser.Server.Implementations.Persistence;
  9. using System;
  10. using System.Collections.Generic;
  11. using System.Data;
  12. using System.Globalization;
  13. using System.IO;
  14. using System.Linq;
  15. using System.Threading;
  16. using System.Threading.Tasks;
  17. namespace MediaBrowser.Server.Implementations.Sync
  18. {
  19. public class SyncRepository : ISyncRepository, IDisposable
  20. {
  21. private IDbConnection _connection;
  22. private readonly ILogger _logger;
  23. private readonly SemaphoreSlim _writeLock = new SemaphoreSlim(1, 1);
  24. private readonly IServerApplicationPaths _appPaths;
  25. private readonly CultureInfo _usCulture = new CultureInfo("en-US");
  26. private IDbCommand _insertJobCommand;
  27. private IDbCommand _updateJobCommand;
  28. private IDbCommand _deleteJobCommand;
  29. private IDbCommand _deleteJobItemsCommand;
  30. private IDbCommand _insertJobItemCommand;
  31. private IDbCommand _updateJobItemCommand;
  32. private readonly IJsonSerializer _json;
  33. public SyncRepository(ILogger logger, IServerApplicationPaths appPaths, IJsonSerializer json)
  34. {
  35. _logger = logger;
  36. _appPaths = appPaths;
  37. _json = json;
  38. }
  39. public async Task Initialize()
  40. {
  41. var dbFile = Path.Combine(_appPaths.DataPath, "sync14.db");
  42. _connection = await SqliteExtensions.ConnectToDb(dbFile, _logger).ConfigureAwait(false);
  43. string[] queries = {
  44. "create table if not exists SyncJobs (Id GUID PRIMARY KEY, TargetId TEXT NOT NULL, Name TEXT NOT NULL, Profile TEXT, Quality TEXT, Bitrate INT, Status TEXT NOT NULL, Progress FLOAT, UserId TEXT NOT NULL, ItemIds TEXT NOT NULL, Category TEXT, ParentId TEXT, UnwatchedOnly BIT, ItemLimit INT, SyncNewContent BIT, DateCreated DateTime, DateLastModified DateTime, ItemCount int)",
  45. "create index if not exists idx_SyncJobs on SyncJobs(Id)",
  46. "create table if not exists SyncJobItems (Id GUID PRIMARY KEY, ItemId TEXT, ItemName TEXT, MediaSourceId TEXT, JobId TEXT, TemporaryPath TEXT, OutputPath TEXT, Status TEXT, TargetId TEXT, DateCreated DateTime, Progress FLOAT, AdditionalFiles TEXT, MediaSource TEXT, IsMarkedForRemoval BIT, JobItemIndex INT)",
  47. "create index if not exists idx_SyncJobItems on SyncJobs(Id)",
  48. //pragmas
  49. "pragma temp_store = memory",
  50. "pragma shrink_memory"
  51. };
  52. _connection.RunQueries(queries, _logger);
  53. _connection.AddColumn(_logger, "SyncJobs", "Profile", "TEXT");
  54. _connection.AddColumn(_logger, "SyncJobs", "Bitrate", "INT");
  55. PrepareStatements();
  56. }
  57. private void PrepareStatements()
  58. {
  59. // _deleteJobCommand
  60. _deleteJobCommand = _connection.CreateCommand();
  61. _deleteJobCommand.CommandText = "delete from SyncJobs where Id=@Id";
  62. _deleteJobCommand.Parameters.Add(_deleteJobCommand, "@Id");
  63. // _deleteJobItemsCommand
  64. _deleteJobItemsCommand = _connection.CreateCommand();
  65. _deleteJobItemsCommand.CommandText = "delete from SyncJobItems where JobId=@JobId";
  66. _deleteJobItemsCommand.Parameters.Add(_deleteJobItemsCommand, "@JobId");
  67. // _insertJobCommand
  68. _insertJobCommand = _connection.CreateCommand();
  69. _insertJobCommand.CommandText = "insert into SyncJobs (Id, TargetId, Name, Profile, Quality, Bitrate, Status, Progress, UserId, ItemIds, Category, ParentId, UnwatchedOnly, ItemLimit, SyncNewContent, DateCreated, DateLastModified, ItemCount) values (@Id, @TargetId, @Name, @Profile, @Quality, @Bitrate, @Status, @Progress, @UserId, @ItemIds, @Category, @ParentId, @UnwatchedOnly, @ItemLimit, @SyncNewContent, @DateCreated, @DateLastModified, @ItemCount)";
  70. _insertJobCommand.Parameters.Add(_insertJobCommand, "@Id");
  71. _insertJobCommand.Parameters.Add(_insertJobCommand, "@TargetId");
  72. _insertJobCommand.Parameters.Add(_insertJobCommand, "@Name");
  73. _insertJobCommand.Parameters.Add(_insertJobCommand, "@Profile");
  74. _insertJobCommand.Parameters.Add(_insertJobCommand, "@Quality");
  75. _insertJobCommand.Parameters.Add(_insertJobCommand, "@Bitrate");
  76. _insertJobCommand.Parameters.Add(_insertJobCommand, "@Status");
  77. _insertJobCommand.Parameters.Add(_insertJobCommand, "@Progress");
  78. _insertJobCommand.Parameters.Add(_insertJobCommand, "@UserId");
  79. _insertJobCommand.Parameters.Add(_insertJobCommand, "@ItemIds");
  80. _insertJobCommand.Parameters.Add(_insertJobCommand, "@Category");
  81. _insertJobCommand.Parameters.Add(_insertJobCommand, "@ParentId");
  82. _insertJobCommand.Parameters.Add(_insertJobCommand, "@UnwatchedOnly");
  83. _insertJobCommand.Parameters.Add(_insertJobCommand, "@ItemLimit");
  84. _insertJobCommand.Parameters.Add(_insertJobCommand, "@SyncNewContent");
  85. _insertJobCommand.Parameters.Add(_insertJobCommand, "@DateCreated");
  86. _insertJobCommand.Parameters.Add(_insertJobCommand, "@DateLastModified");
  87. _insertJobCommand.Parameters.Add(_insertJobCommand, "@ItemCount");
  88. // _updateJobCommand
  89. _updateJobCommand = _connection.CreateCommand();
  90. _updateJobCommand.CommandText = "update SyncJobs set TargetId=@TargetId,Name=@Name,Profile=@Profile,Quality=@Quality,Bitrate=@Bitrate,Status=@Status,Progress=@Progress,UserId=@UserId,ItemIds=@ItemIds,Category=@Category,ParentId=@ParentId,UnwatchedOnly=@UnwatchedOnly,ItemLimit=@ItemLimit,SyncNewContent=@SyncNewContent,DateCreated=@DateCreated,DateLastModified=@DateLastModified,ItemCount=@ItemCount where Id=@ID";
  91. _updateJobCommand.Parameters.Add(_updateJobCommand, "@Id");
  92. _updateJobCommand.Parameters.Add(_updateJobCommand, "@TargetId");
  93. _updateJobCommand.Parameters.Add(_updateJobCommand, "@Name");
  94. _updateJobCommand.Parameters.Add(_updateJobCommand, "@Profile");
  95. _updateJobCommand.Parameters.Add(_updateJobCommand, "@Quality");
  96. _updateJobCommand.Parameters.Add(_updateJobCommand, "@Bitrate");
  97. _updateJobCommand.Parameters.Add(_updateJobCommand, "@Status");
  98. _updateJobCommand.Parameters.Add(_updateJobCommand, "@Progress");
  99. _updateJobCommand.Parameters.Add(_updateJobCommand, "@UserId");
  100. _updateJobCommand.Parameters.Add(_updateJobCommand, "@ItemIds");
  101. _updateJobCommand.Parameters.Add(_updateJobCommand, "@Category");
  102. _updateJobCommand.Parameters.Add(_updateJobCommand, "@ParentId");
  103. _updateJobCommand.Parameters.Add(_updateJobCommand, "@UnwatchedOnly");
  104. _updateJobCommand.Parameters.Add(_updateJobCommand, "@ItemLimit");
  105. _updateJobCommand.Parameters.Add(_updateJobCommand, "@SyncNewContent");
  106. _updateJobCommand.Parameters.Add(_updateJobCommand, "@DateCreated");
  107. _updateJobCommand.Parameters.Add(_updateJobCommand, "@DateLastModified");
  108. _updateJobCommand.Parameters.Add(_updateJobCommand, "@ItemCount");
  109. // _insertJobItemCommand
  110. _insertJobItemCommand = _connection.CreateCommand();
  111. _insertJobItemCommand.CommandText = "insert into SyncJobItems (Id, ItemId, ItemName, MediaSourceId, JobId, TemporaryPath, OutputPath, Status, TargetId, DateCreated, Progress, AdditionalFiles, MediaSource, IsMarkedForRemoval, JobItemIndex) values (@Id, @ItemId, @ItemName, @MediaSourceId, @JobId, @TemporaryPath, @OutputPath, @Status, @TargetId, @DateCreated, @Progress, @AdditionalFiles, @MediaSource, @IsMarkedForRemoval, @JobItemIndex)";
  112. _insertJobItemCommand.Parameters.Add(_insertJobItemCommand, "@Id");
  113. _insertJobItemCommand.Parameters.Add(_insertJobItemCommand, "@ItemId");
  114. _insertJobItemCommand.Parameters.Add(_insertJobItemCommand, "@ItemName");
  115. _insertJobItemCommand.Parameters.Add(_insertJobItemCommand, "@MediaSourceId");
  116. _insertJobItemCommand.Parameters.Add(_insertJobItemCommand, "@JobId");
  117. _insertJobItemCommand.Parameters.Add(_insertJobItemCommand, "@TemporaryPath");
  118. _insertJobItemCommand.Parameters.Add(_insertJobItemCommand, "@OutputPath");
  119. _insertJobItemCommand.Parameters.Add(_insertJobItemCommand, "@Status");
  120. _insertJobItemCommand.Parameters.Add(_insertJobItemCommand, "@TargetId");
  121. _insertJobItemCommand.Parameters.Add(_insertJobItemCommand, "@DateCreated");
  122. _insertJobItemCommand.Parameters.Add(_insertJobItemCommand, "@Progress");
  123. _insertJobItemCommand.Parameters.Add(_insertJobItemCommand, "@AdditionalFiles");
  124. _insertJobItemCommand.Parameters.Add(_insertJobItemCommand, "@MediaSource");
  125. _insertJobItemCommand.Parameters.Add(_insertJobItemCommand, "@IsMarkedForRemoval");
  126. _insertJobItemCommand.Parameters.Add(_insertJobItemCommand, "@JobItemIndex");
  127. // _updateJobItemCommand
  128. _updateJobItemCommand = _connection.CreateCommand();
  129. _updateJobItemCommand.CommandText = "update SyncJobItems set ItemId=@ItemId,ItemName=@ItemName,MediaSourceId=@MediaSourceId,JobId=@JobId,TemporaryPath=@TemporaryPath,OutputPath=@OutputPath,Status=@Status,TargetId=@TargetId,DateCreated=@DateCreated,Progress=@Progress,AdditionalFiles=@AdditionalFiles,MediaSource=@MediaSource,IsMarkedForRemoval=@IsMarkedForRemoval,JobItemIndex=@JobItemIndex where Id=@Id";
  130. _updateJobItemCommand.Parameters.Add(_updateJobItemCommand, "@Id");
  131. _updateJobItemCommand.Parameters.Add(_updateJobItemCommand, "@ItemId");
  132. _updateJobItemCommand.Parameters.Add(_updateJobItemCommand, "@ItemName");
  133. _updateJobItemCommand.Parameters.Add(_updateJobItemCommand, "@MediaSourceId");
  134. _updateJobItemCommand.Parameters.Add(_updateJobItemCommand, "@JobId");
  135. _updateJobItemCommand.Parameters.Add(_updateJobItemCommand, "@TemporaryPath");
  136. _updateJobItemCommand.Parameters.Add(_updateJobItemCommand, "@OutputPath");
  137. _updateJobItemCommand.Parameters.Add(_updateJobItemCommand, "@Status");
  138. _updateJobItemCommand.Parameters.Add(_updateJobItemCommand, "@TargetId");
  139. _updateJobItemCommand.Parameters.Add(_updateJobItemCommand, "@DateCreated");
  140. _updateJobItemCommand.Parameters.Add(_updateJobItemCommand, "@Progress");
  141. _updateJobItemCommand.Parameters.Add(_updateJobItemCommand, "@AdditionalFiles");
  142. _updateJobItemCommand.Parameters.Add(_updateJobItemCommand, "@MediaSource");
  143. _updateJobItemCommand.Parameters.Add(_updateJobItemCommand, "@IsMarkedForRemoval");
  144. _updateJobItemCommand.Parameters.Add(_updateJobItemCommand, "@JobItemIndex");
  145. }
  146. private const string BaseJobSelectText = "select Id, TargetId, Name, Profile, Quality, Bitrate, Status, Progress, UserId, ItemIds, Category, ParentId, UnwatchedOnly, ItemLimit, SyncNewContent, DateCreated, DateLastModified, ItemCount from SyncJobs";
  147. private const string BaseJobItemSelectText = "select Id, ItemId, ItemName, MediaSourceId, JobId, TemporaryPath, OutputPath, Status, TargetId, DateCreated, Progress, AdditionalFiles, MediaSource, IsMarkedForRemoval, JobItemIndex from SyncJobItems";
  148. public SyncJob GetJob(string id)
  149. {
  150. if (string.IsNullOrEmpty(id))
  151. {
  152. throw new ArgumentNullException("id");
  153. }
  154. CheckDisposed();
  155. var guid = new Guid(id);
  156. if (guid == Guid.Empty)
  157. {
  158. throw new ArgumentNullException("id");
  159. }
  160. using (var cmd = _connection.CreateCommand())
  161. {
  162. cmd.CommandText = BaseJobSelectText + " where Id=@Id";
  163. cmd.Parameters.Add(cmd, "@Id", DbType.Guid).Value = guid;
  164. using (var reader = cmd.ExecuteReader(CommandBehavior.SequentialAccess | CommandBehavior.SingleResult | CommandBehavior.SingleRow))
  165. {
  166. if (reader.Read())
  167. {
  168. return GetJob(reader);
  169. }
  170. }
  171. }
  172. return null;
  173. }
  174. private SyncJob GetJob(IDataReader reader)
  175. {
  176. var info = new SyncJob
  177. {
  178. Id = reader.GetGuid(0).ToString("N"),
  179. TargetId = reader.GetString(1),
  180. Name = reader.GetString(2)
  181. };
  182. if (!reader.IsDBNull(3))
  183. {
  184. info.Profile = reader.GetString(3);
  185. }
  186. if (!reader.IsDBNull(4))
  187. {
  188. info.Quality = reader.GetString(4);
  189. }
  190. if (!reader.IsDBNull(5))
  191. {
  192. info.Bitrate = reader.GetInt32(5);
  193. }
  194. if (!reader.IsDBNull(6))
  195. {
  196. info.Status = (SyncJobStatus)Enum.Parse(typeof(SyncJobStatus), reader.GetString(6), true);
  197. }
  198. if (!reader.IsDBNull(7))
  199. {
  200. info.Progress = reader.GetDouble(7);
  201. }
  202. if (!reader.IsDBNull(8))
  203. {
  204. info.UserId = reader.GetString(8);
  205. }
  206. if (!reader.IsDBNull(9))
  207. {
  208. info.RequestedItemIds = reader.GetString(9).Split(new[] { ',' }, StringSplitOptions.RemoveEmptyEntries).ToList();
  209. }
  210. if (!reader.IsDBNull(10))
  211. {
  212. info.Category = (SyncCategory)Enum.Parse(typeof(SyncCategory), reader.GetString(10), true);
  213. }
  214. if (!reader.IsDBNull(11))
  215. {
  216. info.ParentId = reader.GetString(11);
  217. }
  218. if (!reader.IsDBNull(12))
  219. {
  220. info.UnwatchedOnly = reader.GetBoolean(12);
  221. }
  222. if (!reader.IsDBNull(13))
  223. {
  224. info.ItemLimit = reader.GetInt32(13);
  225. }
  226. info.SyncNewContent = reader.GetBoolean(14);
  227. info.DateCreated = reader.GetDateTime(15).ToUniversalTime();
  228. info.DateLastModified = reader.GetDateTime(16).ToUniversalTime();
  229. info.ItemCount = reader.GetInt32(17);
  230. return info;
  231. }
  232. public Task Create(SyncJob job)
  233. {
  234. return InsertOrUpdate(job, _insertJobCommand);
  235. }
  236. public Task Update(SyncJob job)
  237. {
  238. return InsertOrUpdate(job, _updateJobCommand);
  239. }
  240. private async Task InsertOrUpdate(SyncJob job, IDbCommand cmd)
  241. {
  242. if (job == null)
  243. {
  244. throw new ArgumentNullException("job");
  245. }
  246. CheckDisposed();
  247. await _writeLock.WaitAsync().ConfigureAwait(false);
  248. IDbTransaction transaction = null;
  249. try
  250. {
  251. transaction = _connection.BeginTransaction();
  252. var index = 0;
  253. cmd.GetParameter(index++).Value = new Guid(job.Id);
  254. cmd.GetParameter(index++).Value = job.TargetId;
  255. cmd.GetParameter(index++).Value = job.Name;
  256. cmd.GetParameter(index++).Value = job.Profile;
  257. cmd.GetParameter(index++).Value = job.Quality;
  258. cmd.GetParameter(index++).Value = job.Bitrate;
  259. cmd.GetParameter(index++).Value = job.Status.ToString();
  260. cmd.GetParameter(index++).Value = job.Progress;
  261. cmd.GetParameter(index++).Value = job.UserId;
  262. cmd.GetParameter(index++).Value = string.Join(",", job.RequestedItemIds.ToArray());
  263. cmd.GetParameter(index++).Value = job.Category;
  264. cmd.GetParameter(index++).Value = job.ParentId;
  265. cmd.GetParameter(index++).Value = job.UnwatchedOnly;
  266. cmd.GetParameter(index++).Value = job.ItemLimit;
  267. cmd.GetParameter(index++).Value = job.SyncNewContent;
  268. cmd.GetParameter(index++).Value = job.DateCreated;
  269. cmd.GetParameter(index++).Value = job.DateLastModified;
  270. cmd.GetParameter(index++).Value = job.ItemCount;
  271. cmd.Transaction = transaction;
  272. cmd.ExecuteNonQuery();
  273. transaction.Commit();
  274. }
  275. catch (OperationCanceledException)
  276. {
  277. if (transaction != null)
  278. {
  279. transaction.Rollback();
  280. }
  281. throw;
  282. }
  283. catch (Exception e)
  284. {
  285. _logger.ErrorException("Failed to save record:", e);
  286. if (transaction != null)
  287. {
  288. transaction.Rollback();
  289. }
  290. throw;
  291. }
  292. finally
  293. {
  294. if (transaction != null)
  295. {
  296. transaction.Dispose();
  297. }
  298. _writeLock.Release();
  299. }
  300. }
  301. public async Task DeleteJob(string id)
  302. {
  303. if (string.IsNullOrWhiteSpace(id))
  304. {
  305. throw new ArgumentNullException("id");
  306. }
  307. CheckDisposed();
  308. await _writeLock.WaitAsync().ConfigureAwait(false);
  309. IDbTransaction transaction = null;
  310. try
  311. {
  312. transaction = _connection.BeginTransaction();
  313. var index = 0;
  314. _deleteJobCommand.GetParameter(index++).Value = new Guid(id);
  315. _deleteJobCommand.Transaction = transaction;
  316. _deleteJobCommand.ExecuteNonQuery();
  317. index = 0;
  318. _deleteJobItemsCommand.GetParameter(index++).Value = id;
  319. _deleteJobItemsCommand.Transaction = transaction;
  320. _deleteJobItemsCommand.ExecuteNonQuery();
  321. transaction.Commit();
  322. }
  323. catch (OperationCanceledException)
  324. {
  325. if (transaction != null)
  326. {
  327. transaction.Rollback();
  328. }
  329. throw;
  330. }
  331. catch (Exception e)
  332. {
  333. _logger.ErrorException("Failed to save record:", e);
  334. if (transaction != null)
  335. {
  336. transaction.Rollback();
  337. }
  338. throw;
  339. }
  340. finally
  341. {
  342. if (transaction != null)
  343. {
  344. transaction.Dispose();
  345. }
  346. _writeLock.Release();
  347. }
  348. }
  349. public QueryResult<SyncJob> GetJobs(SyncJobQuery query)
  350. {
  351. if (query == null)
  352. {
  353. throw new ArgumentNullException("query");
  354. }
  355. CheckDisposed();
  356. using (var cmd = _connection.CreateCommand())
  357. {
  358. cmd.CommandText = BaseJobSelectText;
  359. var whereClauses = new List<string>();
  360. if (query.Statuses.Count > 0)
  361. {
  362. var statuses = string.Join(",", query.Statuses.Select(i => "'" + i.ToString() + "'").ToArray());
  363. whereClauses.Add(string.Format("Status in ({0})", statuses));
  364. }
  365. if (!string.IsNullOrWhiteSpace(query.TargetId))
  366. {
  367. whereClauses.Add("TargetId=@TargetId");
  368. cmd.Parameters.Add(cmd, "@TargetId", DbType.String).Value = query.TargetId;
  369. }
  370. if (!string.IsNullOrWhiteSpace(query.UserId))
  371. {
  372. whereClauses.Add("UserId=@UserId");
  373. cmd.Parameters.Add(cmd, "@UserId", DbType.String).Value = query.UserId;
  374. }
  375. if (query.SyncNewContent.HasValue)
  376. {
  377. whereClauses.Add("SyncNewContent=@SyncNewContent");
  378. cmd.Parameters.Add(cmd, "@SyncNewContent", DbType.Boolean).Value = query.SyncNewContent.Value;
  379. }
  380. cmd.CommandText += " mainTable";
  381. var whereTextWithoutPaging = whereClauses.Count == 0 ?
  382. string.Empty :
  383. " where " + string.Join(" AND ", whereClauses.ToArray());
  384. var startIndex = query.StartIndex ?? 0;
  385. if (startIndex > 0)
  386. {
  387. whereClauses.Add(string.Format("Id NOT IN (SELECT Id FROM SyncJobs ORDER BY (Select Max(DateLastModified) from SyncJobs where TargetId=mainTable.TargetId) DESC, DateLastModified DESC LIMIT {0})",
  388. startIndex.ToString(_usCulture)));
  389. }
  390. if (whereClauses.Count > 0)
  391. {
  392. cmd.CommandText += " where " + string.Join(" AND ", whereClauses.ToArray());
  393. }
  394. cmd.CommandText += " ORDER BY (Select Max(DateLastModified) from SyncJobs where TargetId=mainTable.TargetId) DESC, DateLastModified DESC";
  395. if (query.Limit.HasValue)
  396. {
  397. cmd.CommandText += " LIMIT " + query.Limit.Value.ToString(_usCulture);
  398. }
  399. cmd.CommandText += "; select count (Id) from SyncJobs" + whereTextWithoutPaging;
  400. var list = new List<SyncJob>();
  401. var count = 0;
  402. using (var reader = cmd.ExecuteReader(CommandBehavior.SequentialAccess))
  403. {
  404. while (reader.Read())
  405. {
  406. list.Add(GetJob(reader));
  407. }
  408. if (reader.NextResult() && reader.Read())
  409. {
  410. count = reader.GetInt32(0);
  411. }
  412. }
  413. return new QueryResult<SyncJob>()
  414. {
  415. Items = list.ToArray(),
  416. TotalRecordCount = count
  417. };
  418. }
  419. }
  420. public SyncJobItem GetJobItem(string id)
  421. {
  422. if (string.IsNullOrEmpty(id))
  423. {
  424. throw new ArgumentNullException("id");
  425. }
  426. CheckDisposed();
  427. var guid = new Guid(id);
  428. using (var cmd = _connection.CreateCommand())
  429. {
  430. cmd.CommandText = BaseJobItemSelectText + " where Id=@Id";
  431. cmd.Parameters.Add(cmd, "@Id", DbType.Guid).Value = guid;
  432. using (var reader = cmd.ExecuteReader(CommandBehavior.SequentialAccess | CommandBehavior.SingleResult | CommandBehavior.SingleRow))
  433. {
  434. if (reader.Read())
  435. {
  436. return GetJobItem(reader);
  437. }
  438. }
  439. }
  440. return null;
  441. }
  442. private QueryResult<T> GetJobItemReader<T>(SyncJobItemQuery query, string baseSelectText, Func<IDataReader, T> itemFactory)
  443. {
  444. if (query == null)
  445. {
  446. throw new ArgumentNullException("query");
  447. }
  448. using (var cmd = _connection.CreateCommand())
  449. {
  450. cmd.CommandText = baseSelectText;
  451. var whereClauses = new List<string>();
  452. if (!string.IsNullOrWhiteSpace(query.JobId))
  453. {
  454. whereClauses.Add("JobId=@JobId");
  455. cmd.Parameters.Add(cmd, "@JobId", DbType.String).Value = query.JobId;
  456. }
  457. if (!string.IsNullOrWhiteSpace(query.ItemId))
  458. {
  459. whereClauses.Add("ItemId=@ItemId");
  460. cmd.Parameters.Add(cmd, "@ItemId", DbType.String).Value = query.ItemId;
  461. }
  462. if (!string.IsNullOrWhiteSpace(query.TargetId))
  463. {
  464. whereClauses.Add("TargetId=@TargetId");
  465. cmd.Parameters.Add(cmd, "@TargetId", DbType.String).Value = query.TargetId;
  466. }
  467. if (query.Statuses.Count > 0)
  468. {
  469. var statuses = string.Join(",", query.Statuses.Select(i => "'" + i.ToString() + "'").ToArray());
  470. whereClauses.Add(string.Format("Status in ({0})", statuses));
  471. }
  472. var whereTextWithoutPaging = whereClauses.Count == 0 ?
  473. string.Empty :
  474. " where " + string.Join(" AND ", whereClauses.ToArray());
  475. var startIndex = query.StartIndex ?? 0;
  476. if (startIndex > 0)
  477. {
  478. whereClauses.Add(string.Format("Id NOT IN (SELECT Id FROM SyncJobItems ORDER BY JobItemIndex, DateCreated LIMIT {0})",
  479. startIndex.ToString(_usCulture)));
  480. }
  481. if (whereClauses.Count > 0)
  482. {
  483. cmd.CommandText += " where " + string.Join(" AND ", whereClauses.ToArray());
  484. }
  485. cmd.CommandText += " ORDER BY JobItemIndex, DateCreated";
  486. if (query.Limit.HasValue)
  487. {
  488. cmd.CommandText += " LIMIT " + query.Limit.Value.ToString(_usCulture);
  489. }
  490. cmd.CommandText += "; select count (Id) from SyncJobItems" + whereTextWithoutPaging;
  491. var list = new List<T>();
  492. var count = 0;
  493. using (var reader = cmd.ExecuteReader(CommandBehavior.SequentialAccess))
  494. {
  495. while (reader.Read())
  496. {
  497. list.Add(itemFactory(reader));
  498. }
  499. if (reader.NextResult() && reader.Read())
  500. {
  501. count = reader.GetInt32(0);
  502. }
  503. }
  504. return new QueryResult<T>()
  505. {
  506. Items = list.ToArray(),
  507. TotalRecordCount = count
  508. };
  509. }
  510. }
  511. public QueryResult<string> GetLibraryItemIds(SyncJobItemQuery query)
  512. {
  513. return GetJobItemReader(query, "select ItemId from SyncJobItems", GetItemId);
  514. }
  515. public QueryResult<SyncJobItem> GetJobItems(SyncJobItemQuery query)
  516. {
  517. return GetJobItemReader(query, BaseJobItemSelectText, GetJobItem);
  518. }
  519. public Task Create(SyncJobItem jobItem)
  520. {
  521. return InsertOrUpdate(jobItem, _insertJobItemCommand);
  522. }
  523. public Task Update(SyncJobItem jobItem)
  524. {
  525. return InsertOrUpdate(jobItem, _updateJobItemCommand);
  526. }
  527. private async Task InsertOrUpdate(SyncJobItem jobItem, IDbCommand cmd)
  528. {
  529. if (jobItem == null)
  530. {
  531. throw new ArgumentNullException("jobItem");
  532. }
  533. CheckDisposed();
  534. await _writeLock.WaitAsync().ConfigureAwait(false);
  535. IDbTransaction transaction = null;
  536. try
  537. {
  538. transaction = _connection.BeginTransaction();
  539. var index = 0;
  540. cmd.GetParameter(index++).Value = new Guid(jobItem.Id);
  541. cmd.GetParameter(index++).Value = jobItem.ItemId;
  542. cmd.GetParameter(index++).Value = jobItem.ItemName;
  543. cmd.GetParameter(index++).Value = jobItem.MediaSourceId;
  544. cmd.GetParameter(index++).Value = jobItem.JobId;
  545. cmd.GetParameter(index++).Value = jobItem.TemporaryPath;
  546. cmd.GetParameter(index++).Value = jobItem.OutputPath;
  547. cmd.GetParameter(index++).Value = jobItem.Status.ToString();
  548. cmd.GetParameter(index++).Value = jobItem.TargetId;
  549. cmd.GetParameter(index++).Value = jobItem.DateCreated;
  550. cmd.GetParameter(index++).Value = jobItem.Progress;
  551. cmd.GetParameter(index++).Value = _json.SerializeToString(jobItem.AdditionalFiles);
  552. cmd.GetParameter(index++).Value = jobItem.MediaSource == null ? null : _json.SerializeToString(jobItem.MediaSource);
  553. cmd.GetParameter(index++).Value = jobItem.IsMarkedForRemoval;
  554. cmd.GetParameter(index++).Value = jobItem.JobItemIndex;
  555. cmd.Transaction = transaction;
  556. cmd.ExecuteNonQuery();
  557. transaction.Commit();
  558. }
  559. catch (OperationCanceledException)
  560. {
  561. if (transaction != null)
  562. {
  563. transaction.Rollback();
  564. }
  565. throw;
  566. }
  567. catch (Exception e)
  568. {
  569. _logger.ErrorException("Failed to save record:", e);
  570. if (transaction != null)
  571. {
  572. transaction.Rollback();
  573. }
  574. throw;
  575. }
  576. finally
  577. {
  578. if (transaction != null)
  579. {
  580. transaction.Dispose();
  581. }
  582. _writeLock.Release();
  583. }
  584. }
  585. private SyncJobItem GetJobItem(IDataReader reader)
  586. {
  587. var info = new SyncJobItem
  588. {
  589. Id = reader.GetGuid(0).ToString("N"),
  590. ItemId = reader.GetString(1)
  591. };
  592. if (!reader.IsDBNull(2))
  593. {
  594. info.ItemName = reader.GetString(2);
  595. }
  596. if (!reader.IsDBNull(3))
  597. {
  598. info.MediaSourceId = reader.GetString(3);
  599. }
  600. info.JobId = reader.GetString(4);
  601. if (!reader.IsDBNull(5))
  602. {
  603. info.TemporaryPath = reader.GetString(5);
  604. }
  605. if (!reader.IsDBNull(6))
  606. {
  607. info.OutputPath = reader.GetString(6);
  608. }
  609. if (!reader.IsDBNull(7))
  610. {
  611. info.Status = (SyncJobItemStatus)Enum.Parse(typeof(SyncJobItemStatus), reader.GetString(7), true);
  612. }
  613. info.TargetId = reader.GetString(8);
  614. info.DateCreated = reader.GetDateTime(9).ToUniversalTime();
  615. if (!reader.IsDBNull(10))
  616. {
  617. info.Progress = reader.GetDouble(10);
  618. }
  619. if (!reader.IsDBNull(11))
  620. {
  621. var json = reader.GetString(11);
  622. if (!string.IsNullOrWhiteSpace(json))
  623. {
  624. info.AdditionalFiles = _json.DeserializeFromString<List<ItemFileInfo>>(json);
  625. }
  626. }
  627. if (!reader.IsDBNull(12))
  628. {
  629. var json = reader.GetString(12);
  630. if (!string.IsNullOrWhiteSpace(json))
  631. {
  632. info.MediaSource = _json.DeserializeFromString<MediaSourceInfo>(json);
  633. }
  634. }
  635. info.IsMarkedForRemoval = reader.GetBoolean(13);
  636. info.JobItemIndex = reader.GetInt32(14);
  637. return info;
  638. }
  639. private string GetItemId(IDataReader reader)
  640. {
  641. return reader.GetString(0);
  642. }
  643. /// <summary>
  644. /// Performs application-defined tasks associated with freeing, releasing, or resetting unmanaged resources.
  645. /// </summary>
  646. public void Dispose()
  647. {
  648. Dispose(true);
  649. GC.SuppressFinalize(this);
  650. }
  651. private bool _disposed;
  652. private void CheckDisposed()
  653. {
  654. if (_disposed)
  655. {
  656. throw new ObjectDisposedException(GetType().Name + " has been disposed and cannot be accessed.");
  657. }
  658. }
  659. private readonly object _disposeLock = new object();
  660. /// <summary>
  661. /// Releases unmanaged and - optionally - managed resources.
  662. /// </summary>
  663. /// <param name="dispose"><c>true</c> to release both managed and unmanaged resources; <c>false</c> to release only unmanaged resources.</param>
  664. protected virtual void Dispose(bool dispose)
  665. {
  666. if (dispose)
  667. {
  668. _disposed = true;
  669. try
  670. {
  671. lock (_disposeLock)
  672. {
  673. if (_connection != null)
  674. {
  675. if (_connection.IsOpen())
  676. {
  677. _connection.Close();
  678. }
  679. _connection.Dispose();
  680. _connection = null;
  681. }
  682. }
  683. }
  684. catch (Exception ex)
  685. {
  686. _logger.ErrorException("Error disposing database", ex);
  687. }
  688. }
  689. }
  690. }
  691. }