SyncRepository.cs 32 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847
  1. using System;
  2. using System.Collections.Generic;
  3. using System.Globalization;
  4. using System.IO;
  5. using System.Linq;
  6. using System.Threading.Tasks;
  7. using Emby.Server.Implementations.Data;
  8. using MediaBrowser.Controller;
  9. using MediaBrowser.Controller.Sync;
  10. using MediaBrowser.Model.Dto;
  11. using MediaBrowser.Model.Logging;
  12. using MediaBrowser.Model.Querying;
  13. using MediaBrowser.Model.Serialization;
  14. using MediaBrowser.Model.Sync;
  15. using SQLitePCL.pretty;
  16. namespace Emby.Server.Implementations.Sync
  17. {
  18. public class SyncRepository : BaseSqliteRepository, ISyncRepository
  19. {
  20. private readonly CultureInfo _usCulture = new CultureInfo("en-US");
  21. private readonly IJsonSerializer _json;
  22. public SyncRepository(ILogger logger, IJsonSerializer json, IServerApplicationPaths appPaths)
  23. : base(logger)
  24. {
  25. _json = json;
  26. DbFilePath = Path.Combine(appPaths.DataPath, "sync14.db");
  27. }
  28. private class SyncSummary
  29. {
  30. public Dictionary<string, int> Items { get; set; }
  31. public SyncSummary()
  32. {
  33. Items = new Dictionary<string, int>();
  34. }
  35. }
  36. public void Initialize()
  37. {
  38. using (var connection = CreateConnection())
  39. {
  40. RunDefaultInitialization(connection);
  41. string[] queries = {
  42. "create table if not exists SyncJobs (Id GUID PRIMARY KEY, TargetId TEXT NOT NULL, Name TEXT NOT NULL, Profile TEXT, Quality TEXT, Bitrate INT, Status TEXT NOT NULL, Progress FLOAT, UserId TEXT NOT NULL, ItemIds TEXT NOT NULL, Category TEXT, ParentId TEXT, UnwatchedOnly BIT, ItemLimit INT, SyncNewContent BIT, DateCreated DateTime, DateLastModified DateTime, ItemCount int)",
  43. "create table if not exists SyncJobItems (Id GUID PRIMARY KEY, ItemId TEXT, ItemName TEXT, MediaSourceId TEXT, JobId TEXT, TemporaryPath TEXT, OutputPath TEXT, Status TEXT, TargetId TEXT, DateCreated DateTime, Progress FLOAT, AdditionalFiles TEXT, MediaSource TEXT, IsMarkedForRemoval BIT, JobItemIndex INT, ItemDateModifiedTicks BIGINT)",
  44. "drop index if exists idx_SyncJobItems2",
  45. "drop index if exists idx_SyncJobItems3",
  46. "drop index if exists idx_SyncJobs1",
  47. "drop index if exists idx_SyncJobs",
  48. "drop index if exists idx_SyncJobItems1",
  49. "create index if not exists idx_SyncJobItems4 on SyncJobItems(TargetId,ItemId,Status,Progress,DateCreated)",
  50. "create index if not exists idx_SyncJobItems5 on SyncJobItems(TargetId,Status,ItemId,Progress)",
  51. "create index if not exists idx_SyncJobs2 on SyncJobs(TargetId,Status,ItemIds,Progress)",
  52. "pragma shrink_memory"
  53. };
  54. connection.RunQueries(queries);
  55. connection.RunInTransaction(db =>
  56. {
  57. var existingColumnNames = GetColumnNames(db, "SyncJobs");
  58. AddColumn(db, "SyncJobs", "Profile", "TEXT", existingColumnNames);
  59. AddColumn(db, "SyncJobs", "Bitrate", "INT", existingColumnNames);
  60. existingColumnNames = GetColumnNames(db, "SyncJobItems");
  61. AddColumn(db, "SyncJobItems", "ItemDateModifiedTicks", "BIGINT", existingColumnNames);
  62. }, TransactionMode);
  63. }
  64. }
  65. protected override bool EnableTempStoreMemory
  66. {
  67. get
  68. {
  69. return true;
  70. }
  71. }
  72. private const string BaseJobSelectText = "select Id, TargetId, Name, Profile, Quality, Bitrate, Status, Progress, UserId, ItemIds, Category, ParentId, UnwatchedOnly, ItemLimit, SyncNewContent, DateCreated, DateLastModified, ItemCount from SyncJobs";
  73. private const string BaseJobItemSelectText = "select Id, ItemId, ItemName, MediaSourceId, JobId, TemporaryPath, OutputPath, Status, TargetId, DateCreated, Progress, AdditionalFiles, MediaSource, IsMarkedForRemoval, JobItemIndex, ItemDateModifiedTicks from SyncJobItems";
  74. public SyncJob GetJob(string id)
  75. {
  76. if (string.IsNullOrEmpty(id))
  77. {
  78. throw new ArgumentNullException("id");
  79. }
  80. CheckDisposed();
  81. var guid = new Guid(id);
  82. if (guid == Guid.Empty)
  83. {
  84. throw new ArgumentNullException("id");
  85. }
  86. using (WriteLock.Read())
  87. {
  88. using (var connection = CreateConnection(true))
  89. {
  90. var commandText = BaseJobSelectText + " where Id=?";
  91. var paramList = new List<object>();
  92. paramList.Add(guid.ToGuidParamValue());
  93. foreach (var row in connection.Query(commandText, paramList.ToArray()))
  94. {
  95. return GetJob(row);
  96. }
  97. return null;
  98. }
  99. }
  100. }
  101. private SyncJob GetJob(IReadOnlyList<IResultSetValue> reader)
  102. {
  103. var info = new SyncJob
  104. {
  105. Id = reader[0].ReadGuid().ToString("N"),
  106. TargetId = reader[1].ToString(),
  107. Name = reader[2].ToString()
  108. };
  109. if (reader[3].SQLiteType != SQLiteType.Null)
  110. {
  111. info.Profile = reader[3].ToString();
  112. }
  113. if (reader[4].SQLiteType != SQLiteType.Null)
  114. {
  115. info.Quality = reader[4].ToString();
  116. }
  117. if (reader[5].SQLiteType != SQLiteType.Null)
  118. {
  119. info.Bitrate = reader[5].ToInt();
  120. }
  121. if (reader[6].SQLiteType != SQLiteType.Null)
  122. {
  123. info.Status = (SyncJobStatus)Enum.Parse(typeof(SyncJobStatus), reader[6].ToString(), true);
  124. }
  125. if (reader[7].SQLiteType != SQLiteType.Null)
  126. {
  127. info.Progress = reader[7].ToDouble();
  128. }
  129. if (reader[8].SQLiteType != SQLiteType.Null)
  130. {
  131. info.UserId = reader[8].ToString();
  132. }
  133. if (reader[9].SQLiteType != SQLiteType.Null)
  134. {
  135. info.RequestedItemIds = reader[9].ToString().Split(new[] { ',' }, StringSplitOptions.RemoveEmptyEntries).ToList();
  136. }
  137. if (reader[10].SQLiteType != SQLiteType.Null)
  138. {
  139. info.Category = (SyncCategory)Enum.Parse(typeof(SyncCategory), reader[10].ToString(), true);
  140. }
  141. if (reader[11].SQLiteType != SQLiteType.Null)
  142. {
  143. info.ParentId = reader[11].ToString();
  144. }
  145. if (reader[12].SQLiteType != SQLiteType.Null)
  146. {
  147. info.UnwatchedOnly = reader[12].ToBool();
  148. }
  149. if (reader[13].SQLiteType != SQLiteType.Null)
  150. {
  151. info.ItemLimit = reader[13].ToInt();
  152. }
  153. info.SyncNewContent = reader[14].ToBool();
  154. info.DateCreated = reader[15].ReadDateTime();
  155. info.DateLastModified = reader[16].ReadDateTime();
  156. info.ItemCount = reader[17].ToInt();
  157. return info;
  158. }
  159. public Task Create(SyncJob job)
  160. {
  161. return InsertOrUpdate(job, true);
  162. }
  163. public Task Update(SyncJob job)
  164. {
  165. return InsertOrUpdate(job, false);
  166. }
  167. private async Task InsertOrUpdate(SyncJob job, bool insert)
  168. {
  169. if (job == null)
  170. {
  171. throw new ArgumentNullException("job");
  172. }
  173. CheckDisposed();
  174. using (WriteLock.Write())
  175. {
  176. using (var connection = CreateConnection())
  177. {
  178. string commandText;
  179. if (insert)
  180. {
  181. commandText = "insert into SyncJobs (Id, TargetId, Name, Profile, Quality, Bitrate, Status, Progress, UserId, ItemIds, Category, ParentId, UnwatchedOnly, ItemLimit, SyncNewContent, DateCreated, DateLastModified, ItemCount) values (@Id, @TargetId, @Name, @Profile, @Quality, @Bitrate, @Status, @Progress, @UserId, @ItemIds, @Category, @ParentId, @UnwatchedOnly, @ItemLimit, @SyncNewContent, @DateCreated, @DateLastModified, @ItemCount)";
  182. }
  183. else
  184. {
  185. commandText = "update SyncJobs set TargetId=@TargetId,Name=@Name,Profile=@Profile,Quality=@Quality,Bitrate=@Bitrate,Status=@Status,Progress=@Progress,UserId=@UserId,ItemIds=@ItemIds,Category=@Category,ParentId=@ParentId,UnwatchedOnly=@UnwatchedOnly,ItemLimit=@ItemLimit,SyncNewContent=@SyncNewContent,DateCreated=@DateCreated,DateLastModified=@DateLastModified,ItemCount=@ItemCount where Id=@Id";
  186. }
  187. connection.RunInTransaction(conn =>
  188. {
  189. using (var statement = PrepareStatementSafe(connection, commandText))
  190. {
  191. statement.TryBind("@TargetId", job.TargetId);
  192. statement.TryBind("@Name", job.Name);
  193. statement.TryBind("@Profile", job.Profile);
  194. statement.TryBind("@Quality", job.Quality);
  195. statement.TryBind("@Bitrate", job.Bitrate);
  196. statement.TryBind("@Status", job.Status.ToString());
  197. statement.TryBind("@Progress", job.Progress);
  198. statement.TryBind("@UserId", job.UserId);
  199. statement.TryBind("@ItemIds", string.Join(",", job.RequestedItemIds.ToArray()));
  200. if (job.Category.HasValue)
  201. {
  202. statement.TryBind("@Category", job.Category.Value.ToString());
  203. }
  204. else
  205. {
  206. statement.TryBindNull("@Category");
  207. }
  208. if (!string.IsNullOrWhiteSpace(job.ParentId))
  209. {
  210. statement.TryBind("@ParentId", job.ParentId);
  211. }
  212. else
  213. {
  214. statement.TryBindNull("@ParentId");
  215. }
  216. statement.TryBind("@UnwatchedOnly", job.UnwatchedOnly);
  217. if (job.ItemLimit.HasValue)
  218. {
  219. statement.TryBind("@ItemLimit", job.ItemLimit);
  220. }
  221. else
  222. {
  223. statement.TryBindNull("@ItemLimit");
  224. }
  225. statement.TryBind("@SyncNewContent", job.SyncNewContent);
  226. statement.TryBind("@DateCreated", job.DateCreated.ToDateTimeParamValue());
  227. statement.TryBind("@DateLastModified", job.DateLastModified.ToDateTimeParamValue());
  228. statement.TryBind("@ItemCount", job.ItemCount);
  229. statement.TryBind("@Id", job.Id.ToGuidParamValue());
  230. statement.MoveNext();
  231. }
  232. }, TransactionMode);
  233. }
  234. }
  235. }
  236. public async Task DeleteJob(string id)
  237. {
  238. if (string.IsNullOrWhiteSpace(id))
  239. {
  240. throw new ArgumentNullException("id");
  241. }
  242. CheckDisposed();
  243. using (WriteLock.Write())
  244. {
  245. using (var connection = CreateConnection())
  246. {
  247. connection.RunInTransaction(conn =>
  248. {
  249. conn.Execute("delete from SyncJobs where Id=?", id.ToGuidParamValue());
  250. conn.Execute("delete from SyncJobItems where JobId=?", id);
  251. }, TransactionMode);
  252. }
  253. }
  254. }
  255. public QueryResult<SyncJob> GetJobs(SyncJobQuery query)
  256. {
  257. if (query == null)
  258. {
  259. throw new ArgumentNullException("query");
  260. }
  261. CheckDisposed();
  262. using (WriteLock.Read())
  263. {
  264. using (var connection = CreateConnection(true))
  265. {
  266. var commandText = BaseJobSelectText;
  267. var paramList = new List<object>();
  268. var whereClauses = new List<string>();
  269. if (query.Statuses.Length > 0)
  270. {
  271. var statuses = string.Join(",", query.Statuses.Select(i => "'" + i.ToString() + "'").ToArray());
  272. whereClauses.Add(string.Format("Status in ({0})", statuses));
  273. }
  274. if (!string.IsNullOrWhiteSpace(query.TargetId))
  275. {
  276. whereClauses.Add("TargetId=?");
  277. paramList.Add(query.TargetId);
  278. }
  279. if (!string.IsNullOrWhiteSpace(query.ExcludeTargetIds))
  280. {
  281. var excludeIds = (query.ExcludeTargetIds ?? string.Empty).Split(new[] { ',' }, StringSplitOptions.RemoveEmptyEntries);
  282. if (excludeIds.Length == 1)
  283. {
  284. whereClauses.Add("TargetId<>?");
  285. paramList.Add(excludeIds[0]);
  286. }
  287. else if (excludeIds.Length > 1)
  288. {
  289. whereClauses.Add("TargetId<>?");
  290. paramList.Add(excludeIds[0]);
  291. }
  292. }
  293. if (!string.IsNullOrWhiteSpace(query.UserId))
  294. {
  295. whereClauses.Add("UserId=?");
  296. paramList.Add(query.UserId);
  297. }
  298. if (!string.IsNullOrWhiteSpace(query.ItemId))
  299. {
  300. whereClauses.Add("ItemIds like ?");
  301. paramList.Add("%" + query.ItemId + "%");
  302. }
  303. if (query.SyncNewContent.HasValue)
  304. {
  305. whereClauses.Add("SyncNewContent=?");
  306. paramList.Add(query.SyncNewContent.Value);
  307. }
  308. commandText += " mainTable";
  309. var whereTextWithoutPaging = whereClauses.Count == 0 ?
  310. string.Empty :
  311. " where " + string.Join(" AND ", whereClauses.ToArray());
  312. var startIndex = query.StartIndex ?? 0;
  313. if (startIndex > 0)
  314. {
  315. whereClauses.Add(string.Format("Id NOT IN (SELECT Id FROM SyncJobs ORDER BY (Select Max(DateLastModified) from SyncJobs where TargetId=mainTable.TargetId) DESC, DateLastModified DESC LIMIT {0})",
  316. startIndex.ToString(_usCulture)));
  317. }
  318. if (whereClauses.Count > 0)
  319. {
  320. commandText += " where " + string.Join(" AND ", whereClauses.ToArray());
  321. }
  322. commandText += " ORDER BY (Select Max(DateLastModified) from SyncJobs where TargetId=mainTable.TargetId) DESC, DateLastModified DESC";
  323. if (query.Limit.HasValue)
  324. {
  325. commandText += " LIMIT " + query.Limit.Value.ToString(_usCulture);
  326. }
  327. var list = new List<SyncJob>();
  328. var count = connection.Query("select count (Id) from SyncJobs" + whereTextWithoutPaging, paramList.ToArray())
  329. .SelectScalarInt()
  330. .First();
  331. foreach (var row in connection.Query(commandText, paramList.ToArray()))
  332. {
  333. list.Add(GetJob(row));
  334. }
  335. return new QueryResult<SyncJob>()
  336. {
  337. Items = list.ToArray(),
  338. TotalRecordCount = count
  339. };
  340. }
  341. }
  342. }
  343. public SyncJobItem GetJobItem(string id)
  344. {
  345. if (string.IsNullOrEmpty(id))
  346. {
  347. throw new ArgumentNullException("id");
  348. }
  349. CheckDisposed();
  350. var guid = new Guid(id);
  351. using (WriteLock.Read())
  352. {
  353. using (var connection = CreateConnection(true))
  354. {
  355. var commandText = BaseJobItemSelectText + " where Id=?";
  356. var paramList = new List<object>();
  357. paramList.Add(guid.ToGuidParamValue());
  358. foreach (var row in connection.Query(commandText, paramList.ToArray()))
  359. {
  360. return GetJobItem(row);
  361. }
  362. return null;
  363. }
  364. }
  365. }
  366. private QueryResult<T> GetJobItemReader<T>(SyncJobItemQuery query, string baseSelectText, Func<IReadOnlyList<IResultSetValue>, T> itemFactory)
  367. {
  368. if (query == null)
  369. {
  370. throw new ArgumentNullException("query");
  371. }
  372. using (WriteLock.Read())
  373. {
  374. using (var connection = CreateConnection(true))
  375. {
  376. var commandText = baseSelectText;
  377. var paramList = new List<object>();
  378. var whereClauses = new List<string>();
  379. if (!string.IsNullOrWhiteSpace(query.JobId))
  380. {
  381. whereClauses.Add("JobId=?");
  382. paramList.Add(query.JobId);
  383. }
  384. if (!string.IsNullOrWhiteSpace(query.ItemId))
  385. {
  386. whereClauses.Add("ItemId=?");
  387. paramList.Add(query.ItemId);
  388. }
  389. if (!string.IsNullOrWhiteSpace(query.TargetId))
  390. {
  391. whereClauses.Add("TargetId=?");
  392. paramList.Add(query.TargetId);
  393. }
  394. if (query.Statuses.Length > 0)
  395. {
  396. var statuses = string.Join(",", query.Statuses.Select(i => "'" + i.ToString() + "'").ToArray());
  397. whereClauses.Add(string.Format("Status in ({0})", statuses));
  398. }
  399. var whereTextWithoutPaging = whereClauses.Count == 0 ?
  400. string.Empty :
  401. " where " + string.Join(" AND ", whereClauses.ToArray());
  402. var startIndex = query.StartIndex ?? 0;
  403. if (startIndex > 0)
  404. {
  405. whereClauses.Add(string.Format("Id NOT IN (SELECT Id FROM SyncJobItems ORDER BY JobItemIndex, DateCreated LIMIT {0})",
  406. startIndex.ToString(_usCulture)));
  407. }
  408. if (whereClauses.Count > 0)
  409. {
  410. commandText += " where " + string.Join(" AND ", whereClauses.ToArray());
  411. }
  412. commandText += " ORDER BY JobItemIndex, DateCreated";
  413. if (query.Limit.HasValue)
  414. {
  415. commandText += " LIMIT " + query.Limit.Value.ToString(_usCulture);
  416. }
  417. var list = new List<T>();
  418. var count = connection.Query("select count (Id) from SyncJobItems" + whereTextWithoutPaging, paramList.ToArray())
  419. .SelectScalarInt()
  420. .First();
  421. foreach (var row in connection.Query(commandText, paramList.ToArray()))
  422. {
  423. list.Add(itemFactory(row));
  424. }
  425. return new QueryResult<T>()
  426. {
  427. Items = list.ToArray(),
  428. TotalRecordCount = count
  429. };
  430. }
  431. }
  432. }
  433. public Dictionary<string, SyncedItemProgress> GetSyncedItemProgresses(SyncJobItemQuery query)
  434. {
  435. var result = new Dictionary<string, SyncedItemProgress>();
  436. var now = DateTime.UtcNow;
  437. using (WriteLock.Read())
  438. {
  439. using (var connection = CreateConnection(true))
  440. {
  441. var commandText = "select ItemId,Status,Progress from SyncJobItems";
  442. var whereClauses = new List<string>();
  443. if (!string.IsNullOrWhiteSpace(query.TargetId))
  444. {
  445. whereClauses.Add("TargetId=@TargetId");
  446. }
  447. if (query.Statuses.Length > 0)
  448. {
  449. var statuses = string.Join(",", query.Statuses.Select(i => "'" + i.ToString() + "'").ToArray());
  450. whereClauses.Add(string.Format("Status in ({0})", statuses));
  451. }
  452. if (whereClauses.Count > 0)
  453. {
  454. commandText += " where " + string.Join(" AND ", whereClauses.ToArray());
  455. }
  456. var statementTexts = new List<string>
  457. {
  458. commandText
  459. };
  460. commandText = commandText
  461. .Replace("select ItemId,Status,Progress from SyncJobItems", "select ItemIds,Status,Progress from SyncJobs")
  462. .Replace("'Synced'", "'Completed','CompletedWithError'");
  463. statementTexts.Add(commandText);
  464. var statements = connection.PrepareAll(string.Join(";", statementTexts.ToArray()))
  465. .ToList();
  466. using (var statement = statements[0])
  467. {
  468. if (!string.IsNullOrWhiteSpace(query.TargetId))
  469. {
  470. statement.TryBind("@TargetId", query.TargetId);
  471. }
  472. foreach (var row in statement.ExecuteQuery())
  473. {
  474. AddStatusResult(row, result, false);
  475. }
  476. LogQueryTime("GetSyncedItemProgresses", commandText, now);
  477. }
  478. now = DateTime.UtcNow;
  479. using (var statement = statements[1])
  480. {
  481. if (!string.IsNullOrWhiteSpace(query.TargetId))
  482. {
  483. statement.TryBind("@TargetId", query.TargetId);
  484. }
  485. foreach (var row in statement.ExecuteQuery())
  486. {
  487. AddStatusResult(row, result, true);
  488. }
  489. LogQueryTime("GetSyncedItemProgresses", commandText, now);
  490. }
  491. }
  492. }
  493. return result;
  494. }
  495. private void LogQueryTime(string methodName, string commandText, DateTime startDate)
  496. {
  497. var elapsed = (DateTime.UtcNow - startDate).TotalMilliseconds;
  498. var slowThreshold = 1000;
  499. #if DEBUG
  500. slowThreshold = 50;
  501. #endif
  502. if (elapsed >= slowThreshold)
  503. {
  504. Logger.Debug("{2} query time (slow): {0}ms. Query: {1}",
  505. Convert.ToInt32(elapsed),
  506. commandText,
  507. methodName);
  508. }
  509. else
  510. {
  511. //Logger.Debug("{2} query time: {0}ms. Query: {1}",
  512. // Convert.ToInt32(elapsed),
  513. // cmd.CommandText,
  514. // methodName);
  515. }
  516. }
  517. private void AddStatusResult(IReadOnlyList<IResultSetValue> reader, Dictionary<string, SyncedItemProgress> result, bool multipleIds)
  518. {
  519. if (reader[0].SQLiteType == SQLiteType.Null)
  520. {
  521. return;
  522. }
  523. var itemIds = new List<string>();
  524. var ids = reader[0].ToString();
  525. if (multipleIds)
  526. {
  527. itemIds = ids.Split(new[] { ',' }, StringSplitOptions.RemoveEmptyEntries).ToList();
  528. }
  529. else
  530. {
  531. itemIds.Add(ids);
  532. }
  533. if (reader[1].SQLiteType != SQLiteType.Null)
  534. {
  535. SyncJobItemStatus status;
  536. var statusString = reader[1].ToString();
  537. if (string.Equals(statusString, "Completed", StringComparison.OrdinalIgnoreCase) ||
  538. string.Equals(statusString, "CompletedWithError", StringComparison.OrdinalIgnoreCase))
  539. {
  540. status = SyncJobItemStatus.Synced;
  541. }
  542. else
  543. {
  544. status = (SyncJobItemStatus)Enum.Parse(typeof(SyncJobItemStatus), statusString, true);
  545. }
  546. if (status == SyncJobItemStatus.Synced)
  547. {
  548. foreach (var itemId in itemIds)
  549. {
  550. result[itemId] = new SyncedItemProgress
  551. {
  552. Status = SyncJobItemStatus.Synced
  553. };
  554. }
  555. }
  556. else
  557. {
  558. double progress = reader[2].SQLiteType == SQLiteType.Null ? 0.0 : reader[2].ToDouble();
  559. foreach (var itemId in itemIds)
  560. {
  561. SyncedItemProgress currentStatus;
  562. if (!result.TryGetValue(itemId, out currentStatus) || (currentStatus.Status != SyncJobItemStatus.Synced && progress >= currentStatus.Progress))
  563. {
  564. result[itemId] = new SyncedItemProgress
  565. {
  566. Status = status,
  567. Progress = progress
  568. };
  569. }
  570. }
  571. }
  572. }
  573. }
  574. public QueryResult<SyncJobItem> GetJobItems(SyncJobItemQuery query)
  575. {
  576. return GetJobItemReader(query, BaseJobItemSelectText, GetJobItem);
  577. }
  578. public Task Create(SyncJobItem jobItem)
  579. {
  580. return InsertOrUpdate(jobItem, true);
  581. }
  582. public Task Update(SyncJobItem jobItem)
  583. {
  584. return InsertOrUpdate(jobItem, false);
  585. }
  586. private async Task InsertOrUpdate(SyncJobItem jobItem, bool insert)
  587. {
  588. if (jobItem == null)
  589. {
  590. throw new ArgumentNullException("jobItem");
  591. }
  592. CheckDisposed();
  593. using (WriteLock.Write())
  594. {
  595. using (var connection = CreateConnection())
  596. {
  597. string commandText;
  598. if (insert)
  599. {
  600. commandText = "insert into SyncJobItems (Id, ItemId, ItemName, MediaSourceId, JobId, TemporaryPath, OutputPath, Status, TargetId, DateCreated, Progress, AdditionalFiles, MediaSource, IsMarkedForRemoval, JobItemIndex, ItemDateModifiedTicks) values (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)";
  601. }
  602. else
  603. {
  604. // cmd
  605. commandText = "update SyncJobItems set ItemId=?,ItemName=?,MediaSourceId=?,JobId=?,TemporaryPath=?,OutputPath=?,Status=?,TargetId=?,DateCreated=?,Progress=?,AdditionalFiles=?,MediaSource=?,IsMarkedForRemoval=?,JobItemIndex=?,ItemDateModifiedTicks=? where Id=?";
  606. }
  607. var paramList = new List<object>();
  608. paramList.Add(jobItem.ItemId);
  609. paramList.Add(jobItem.ItemName);
  610. paramList.Add(jobItem.MediaSourceId);
  611. paramList.Add(jobItem.JobId);
  612. paramList.Add(jobItem.TemporaryPath);
  613. paramList.Add(jobItem.OutputPath);
  614. paramList.Add(jobItem.Status.ToString());
  615. paramList.Add(jobItem.TargetId);
  616. paramList.Add(jobItem.DateCreated.ToDateTimeParamValue());
  617. paramList.Add(jobItem.Progress);
  618. paramList.Add(_json.SerializeToString(jobItem.AdditionalFiles));
  619. paramList.Add(jobItem.MediaSource == null ? null : _json.SerializeToString(jobItem.MediaSource));
  620. paramList.Add(jobItem.IsMarkedForRemoval);
  621. paramList.Add(jobItem.JobItemIndex);
  622. paramList.Add(jobItem.ItemDateModifiedTicks);
  623. if (insert)
  624. {
  625. paramList.Insert(0, jobItem.Id.ToGuidParamValue());
  626. }
  627. else
  628. {
  629. paramList.Add(jobItem.Id.ToGuidParamValue());
  630. }
  631. connection.RunInTransaction(conn =>
  632. {
  633. conn.Execute(commandText, paramList.ToArray());
  634. }, TransactionMode);
  635. }
  636. }
  637. }
  638. private SyncJobItem GetJobItem(IReadOnlyList<IResultSetValue> reader)
  639. {
  640. var info = new SyncJobItem
  641. {
  642. Id = reader[0].ReadGuid().ToString("N"),
  643. ItemId = reader[1].ToString()
  644. };
  645. if (reader[2].SQLiteType != SQLiteType.Null)
  646. {
  647. info.ItemName = reader[2].ToString();
  648. }
  649. if (reader[3].SQLiteType != SQLiteType.Null)
  650. {
  651. info.MediaSourceId = reader[3].ToString();
  652. }
  653. info.JobId = reader[4].ToString();
  654. if (reader[5].SQLiteType != SQLiteType.Null)
  655. {
  656. info.TemporaryPath = reader[5].ToString();
  657. }
  658. if (reader[6].SQLiteType != SQLiteType.Null)
  659. {
  660. info.OutputPath = reader[6].ToString();
  661. }
  662. if (reader[7].SQLiteType != SQLiteType.Null)
  663. {
  664. info.Status = (SyncJobItemStatus)Enum.Parse(typeof(SyncJobItemStatus), reader[7].ToString(), true);
  665. }
  666. info.TargetId = reader[8].ToString();
  667. info.DateCreated = reader[9].ReadDateTime();
  668. if (reader[10].SQLiteType != SQLiteType.Null)
  669. {
  670. info.Progress = reader[10].ToDouble();
  671. }
  672. if (reader[11].SQLiteType != SQLiteType.Null)
  673. {
  674. var json = reader[11].ToString();
  675. if (!string.IsNullOrWhiteSpace(json))
  676. {
  677. info.AdditionalFiles = _json.DeserializeFromString<List<ItemFileInfo>>(json);
  678. }
  679. }
  680. if (reader[12].SQLiteType != SQLiteType.Null)
  681. {
  682. var json = reader[12].ToString();
  683. if (!string.IsNullOrWhiteSpace(json))
  684. {
  685. info.MediaSource = _json.DeserializeFromString<MediaSourceInfo>(json);
  686. }
  687. }
  688. info.IsMarkedForRemoval = reader[13].ToBool();
  689. info.JobItemIndex = reader[14].ToInt();
  690. if (reader[15].SQLiteType != SQLiteType.Null)
  691. {
  692. info.ItemDateModifiedTicks = reader[15].ToInt64();
  693. }
  694. return info;
  695. }
  696. }
  697. }