| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319 | using MediaBrowser.Model.Logging;using System;using System.Collections.Concurrent;using System.Data;using System.Data.Common;using System.Data.SQLite;using System.IO;using System.Threading;using System.Threading.Tasks;namespace MediaBrowser.Server.Implementations.Sqlite{    /// <summary>    /// Class SqliteRepository    /// </summary>    public abstract class SqliteRepository : IDisposable    {        /// <summary>        /// The db file name        /// </summary>        protected string dbFileName;        /// <summary>        /// The connection        /// </summary>        protected SQLiteConnection connection;        /// <summary>        /// The delayed commands        /// </summary>        protected ConcurrentQueue<SQLiteCommand> delayedCommands = new ConcurrentQueue<SQLiteCommand>();        /// <summary>        /// The flush interval        /// </summary>        private const int FlushInterval = 5000;        /// <summary>        /// The flush timer        /// </summary>        private Timer FlushTimer;        /// <summary>        /// Gets the logger.        /// </summary>        /// <value>The logger.</value>        protected ILogger Logger { get; private set; }        /// <summary>        /// Initializes a new instance of the <see cref="SqliteRepository" /> class.        /// </summary>        /// <param name="logger">The logger.</param>        /// <exception cref="System.ArgumentNullException">logger</exception>        protected SqliteRepository(ILogger logger)        {            if (logger == null)            {                throw new ArgumentNullException("logger");            }            Logger = logger;        }        /// <summary>        /// Connects to DB.        /// </summary>        /// <param name="dbPath">The db path.</param>        /// <returns>Task{System.Boolean}.</returns>        /// <exception cref="System.ArgumentNullException">dbPath</exception>        protected async Task ConnectToDB(string dbPath)        {            if (string.IsNullOrEmpty(dbPath))            {                throw new ArgumentNullException("dbPath");            }            dbFileName = dbPath;            var connectionstr = new SQLiteConnectionStringBuilder            {                PageSize = 4096,                CacheSize = 40960,                SyncMode = SynchronizationModes.Off,                DataSource = dbPath,                JournalMode = SQLiteJournalModeEnum.Memory            };            connection = new SQLiteConnection(connectionstr.ConnectionString);            await connection.OpenAsync().ConfigureAwait(false);            // Run once            FlushTimer = new Timer(Flush, null, TimeSpan.FromMilliseconds(FlushInterval), TimeSpan.FromMilliseconds(-1));        }        /// <summary>        /// Runs the queries.        /// </summary>        /// <param name="queries">The queries.</param>        /// <returns><c>true</c> if XXXX, <c>false</c> otherwise</returns>        /// <exception cref="System.ArgumentNullException">queries</exception>        protected void RunQueries(string[] queries)        {            if (queries == null)            {                throw new ArgumentNullException("queries");            }            using (var tran = connection.BeginTransaction())            {                try                {                    var cmd = connection.CreateCommand();                    foreach (var query in queries)                    {                        cmd.Transaction = tran;                        cmd.CommandText = query;                        cmd.ExecuteNonQuery();                    }                    tran.Commit();                }                catch (Exception e)                {                    Logger.ErrorException("Error running queries", e);                    tran.Rollback();                    throw;                }            }        }        /// <summary>        /// Performs application-defined tasks associated with freeing, releasing, or resetting unmanaged resources.        /// </summary>        public void Dispose()        {            Dispose(true);            GC.SuppressFinalize(this);        }        /// <summary>        /// Releases unmanaged and - optionally - managed resources.        /// </summary>        /// <param name="dispose"><c>true</c> to release both managed and unmanaged resources; <c>false</c> to release only unmanaged resources.</param>        protected virtual void Dispose(bool dispose)        {            if (dispose)            {                try                {                    if (connection != null)                    {                        // If we're not already flushing, do it now                        if (!IsFlushing)                        {                            Flush(null);                        }                        // Don't dispose in the middle of a flush                        while (IsFlushing)                        {                            Thread.Sleep(25);                        }                                                if (connection.IsOpen())                        {                            connection.Close();                        }                        connection.Dispose();                    }                    if (FlushTimer != null)                    {                        FlushTimer.Dispose();                        FlushTimer = null;                    }                }                catch (Exception ex)                {                    Logger.ErrorException("Error disposing database", ex);                }            }        }        /// <summary>        /// Queues the command.        /// </summary>        /// <param name="cmd">The CMD.</param>        /// <exception cref="System.ArgumentNullException">cmd</exception>        protected void QueueCommand(SQLiteCommand cmd)        {            if (cmd == null)            {                throw new ArgumentNullException("cmd");            }            delayedCommands.Enqueue(cmd);        }        /// <summary>        /// The is flushing        /// </summary>        private bool IsFlushing;        /// <summary>        /// Flushes the specified sender.        /// </summary>        /// <param name="sender">The sender.</param>        private void Flush(object sender)        {            // Cannot call Count on a ConcurrentQueue since it's an O(n) operation            // Use IsEmpty instead            if (delayedCommands.IsEmpty)            {                FlushTimer.Change(TimeSpan.FromMilliseconds(FlushInterval), TimeSpan.FromMilliseconds(-1));                return;            }            if (IsFlushing)            {                return;            }            IsFlushing = true;            var numCommands = 0;            using (var tran = connection.BeginTransaction())            {                try                {                    while (!delayedCommands.IsEmpty)                    {                        SQLiteCommand command;                        delayedCommands.TryDequeue(out command);                        command.Connection = connection;                        command.Transaction = tran;                        command.ExecuteNonQuery();                        numCommands++;                    }                    tran.Commit();                }                catch (Exception e)                {                    Logger.ErrorException("Failed to commit transaction.", e);                    tran.Rollback();                }            }            Logger.Info("SQL Delayed writer executed " + numCommands + " commands");            FlushTimer.Change(TimeSpan.FromMilliseconds(FlushInterval), TimeSpan.FromMilliseconds(-1));            IsFlushing = false;        }        /// <summary>        /// Executes the command.        /// </summary>        /// <param name="cmd">The CMD.</param>        /// <returns>Task.</returns>        /// <exception cref="System.ArgumentNullException">cmd</exception>        public async Task ExecuteCommand(DbCommand cmd)        {            if (cmd == null)            {                throw new ArgumentNullException("cmd");            }            using (var tran = connection.BeginTransaction())            {                try                {                    cmd.Connection = connection;                    cmd.Transaction = tran;                    await cmd.ExecuteNonQueryAsync().ConfigureAwait(false);                    tran.Commit();                }                catch (Exception e)                {                    Logger.ErrorException("Failed to commit transaction.", e);                    tran.Rollback();                }            }        }        /// <summary>        /// Gets a stream from a DataReader at a given ordinal        /// </summary>        /// <param name="reader">The reader.</param>        /// <param name="ordinal">The ordinal.</param>        /// <returns>Stream.</returns>        /// <exception cref="System.ArgumentNullException">reader</exception>        protected static Stream GetStream(IDataReader reader, int ordinal)        {            if (reader == null)            {                throw new ArgumentNullException("reader");            }            var memoryStream = new MemoryStream();            var num = 0L;            var array = new byte[4096];            long bytes;            do            {                bytes = reader.GetBytes(ordinal, num, array, 0, array.Length);                memoryStream.Write(array, 0, (int)bytes);                num += bytes;            }            while (bytes > 0L);            memoryStream.Position = 0;            return memoryStream;        }    }}
 |