| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459 | using System;using System.IO;using System.Threading;using System.Threading.Tasks;namespace Emby.Server.Implementations.IO{    public class AsyncStreamCopier : IDisposable    {        // size in bytes of the buffers in the buffer pool        private const int DefaultBufferSize = 81920;        private readonly int _bufferSize;        // number of buffers in the pool        private const int DefaultBufferCount = 4;        private readonly int _bufferCount;        // indexes of the next buffer to read into/write from        private int _nextReadBuffer = -1;        private int _nextWriteBuffer = -1;        // the buffer pool, implemented as an array, and used in a cyclic way        private readonly byte[][] _buffers;        // sizes in bytes of the available (read) data in the buffers        private readonly int[] _sizes;        // the streams...        private Stream _source;        private Stream _target;        private readonly bool _closeStreamsOnEnd;        // number of buffers that are ready to be written        private int _buffersToWrite;        // flag indicating that there is still a read operation to be scheduled        // (source end of stream not reached)        private volatile bool _moreDataToRead;        // the result of the whole operation, returned by BeginCopy()        private AsyncResult _asyncResult;        // any exception that occurs during an async operation        // stored here for rethrow        private Exception _exception;        public TaskCompletionSource<long> TaskCompletionSource;        private long _bytesToRead;        private long _totalBytesWritten;        private CancellationToken _cancellationToken;        public int IndividualReadOffset = 0;        public AsyncStreamCopier(Stream source,                                 Stream target,                                 long bytesToRead,                                 CancellationToken cancellationToken,                                 bool closeStreamsOnEnd = false,                                 int bufferSize = DefaultBufferSize,                                 int bufferCount = DefaultBufferCount)        {            if (source == null)                throw new ArgumentNullException("source");            if (target == null)                throw new ArgumentNullException("target");            if (!source.CanRead)                throw new ArgumentException("Cannot copy from a non-readable stream.");            if (!target.CanWrite)                throw new ArgumentException("Cannot copy to a non-writable stream.");            _source = source;            _target = target;            _moreDataToRead = true;            _closeStreamsOnEnd = closeStreamsOnEnd;            _bufferSize = bufferSize;            _bufferCount = bufferCount;            _buffers = new byte[_bufferCount][];            _sizes = new int[_bufferCount];            _bytesToRead = bytesToRead;            _cancellationToken = cancellationToken;        }        ~AsyncStreamCopier()        {            // ensure any exception cannot be ignored            ThrowExceptionIfNeeded();        }        public static Task<long> CopyStream(Stream source, Stream target, int bufferSize, int bufferCount, CancellationToken cancellationToken)        {            return CopyStream(source, target, 0, bufferSize, bufferCount, cancellationToken);        }        public static Task<long> CopyStream(Stream source, Stream target, long size, int bufferSize, int bufferCount, CancellationToken cancellationToken)        {            var copier = new AsyncStreamCopier(source, target, size, cancellationToken, false, bufferSize, bufferCount);            var taskCompletion = new TaskCompletionSource<long>();            copier.TaskCompletionSource = taskCompletion;            var result = copier.BeginCopy(StreamCopyCallback, copier);            if (result.CompletedSynchronously)            {                StreamCopyCallback(result);            }            cancellationToken.Register(() => taskCompletion.TrySetCanceled());            return taskCompletion.Task;        }        private static void StreamCopyCallback(IAsyncResult result)        {            var copier = (AsyncStreamCopier)result.AsyncState;            var taskCompletion = copier.TaskCompletionSource;            try            {                copier.EndCopy(result);                taskCompletion.TrySetResult(copier._totalBytesWritten);            }            catch (Exception ex)            {                taskCompletion.TrySetException(ex);            }        }        public void Dispose()        {            if (_asyncResult != null)                _asyncResult.Dispose();            if (_closeStreamsOnEnd)            {                if (_source != null)                {                    _source.Dispose();                    _source = null;                }                if (_target != null)                {                    _target.Dispose();                    _target = null;                }            }            GC.SuppressFinalize(this);            ThrowExceptionIfNeeded();        }        public IAsyncResult BeginCopy(AsyncCallback callback, object state)        {            // avoid concurrent start of the copy on separate threads            if (Interlocked.CompareExchange(ref _asyncResult, new AsyncResult(callback, state), null) != null)                throw new InvalidOperationException("A copy operation has already been started on this object.");            // allocate buffers            for (int i = 0; i < _bufferCount; i++)                _buffers[i] = new byte[_bufferSize];            // we pass false to BeginRead() to avoid completing the async result            // immediately which would result in invoking the callback            // when the method fails synchronously            BeginRead(false);            // throw exception synchronously if there is one            ThrowExceptionIfNeeded();            return _asyncResult;        }        public void EndCopy(IAsyncResult ar)        {            if (ar != _asyncResult)                throw new InvalidOperationException("Invalid IAsyncResult object.");            if (!_asyncResult.IsCompleted)                _asyncResult.AsyncWaitHandle.WaitOne();            if (_closeStreamsOnEnd)            {                _source.Close();                _source = null;                _target.Close();                _target = null;            }            //_logger.Info("AsyncStreamCopier {0} bytes requested. {1} bytes transferred", _bytesToRead, _totalBytesWritten);            ThrowExceptionIfNeeded();        }        /// <summary>        /// Here we'll throw a pending exception if there is one,         /// and remove it from our instance, so we know it has been consumed.        /// </summary>        private void ThrowExceptionIfNeeded()        {            if (_exception != null)            {                var exception = _exception;                _exception = null;                throw exception;            }        }        private void BeginRead(bool completeOnError = true)        {            if (!_moreDataToRead)            {                return;            }            if (_asyncResult.IsCompleted)                return;            int bufferIndex = Interlocked.Increment(ref _nextReadBuffer) % _bufferCount;            try            {                _source.BeginRead(_buffers[bufferIndex], 0, _bufferSize, EndRead, bufferIndex);            }            catch (Exception exception)            {                _exception = exception;                if (completeOnError)                    _asyncResult.Complete(false);            }        }        private void BeginWrite()        {            if (_asyncResult.IsCompleted)                return;            // this method can actually be called concurrently!!            // indeed, let's say we call a BeginWrite, and the thread gets interrupted             // just after making the IO request.            // At that moment, the thread is still in the method. And then the IO request            // ends (extremely fast io, or caching...), EndWrite gets called            // on another thread, and calls BeginWrite again! There we have it!            // That is the reason why an Interlocked is needed here.            int bufferIndex = Interlocked.Increment(ref _nextWriteBuffer) % _bufferCount;            try            {                int bytesToWrite;                if (_bytesToRead > 0)                {                    var bytesLeftToWrite = _bytesToRead - _totalBytesWritten;                    bytesToWrite = Convert.ToInt32(Math.Min(_sizes[bufferIndex], bytesLeftToWrite));                }                else                {                    bytesToWrite = _sizes[bufferIndex];                }                _target.BeginWrite(_buffers[bufferIndex], IndividualReadOffset, bytesToWrite - IndividualReadOffset, EndWrite, null);                _totalBytesWritten += bytesToWrite;            }            catch (Exception exception)            {                _exception = exception;                _asyncResult.Complete(false);            }        }        private void EndRead(IAsyncResult ar)        {            try            {                int read = _source.EndRead(ar);                _moreDataToRead = read > 0;                var bufferIndex = (int)ar.AsyncState;                _sizes[bufferIndex] = read;            }            catch (Exception exception)            {                _exception = exception;                _asyncResult.Complete(false);                return;            }            if (_moreDataToRead && !_cancellationToken.IsCancellationRequested)            {                int usedBuffers = Interlocked.Increment(ref _buffersToWrite);                // if we incremented from zero to one, then it means we just                 // added the single buffer to write, so a writer could not                 // be busy, and we have to schedule one.                if (usedBuffers == 1)                    BeginWrite();                // test if there is at least a free buffer, and schedule                // a read, as we have read some data                if (usedBuffers < _bufferCount)                    BeginRead();            }            else            {                // we did not add a buffer, because no data was read, and                 // there is no buffer left to write so this is the end...                if (Thread.VolatileRead(ref _buffersToWrite) == 0)                {                    _asyncResult.Complete(false);                }            }        }        private void EndWrite(IAsyncResult ar)        {            try            {                _target.EndWrite(ar);            }            catch (Exception exception)            {                _exception = exception;                _asyncResult.Complete(false);                return;            }            int buffersLeftToWrite = Interlocked.Decrement(ref _buffersToWrite);            // no reader could be active if all buffers were full of data waiting to be written            bool noReaderIsBusy = buffersLeftToWrite == _bufferCount - 1;            // note that it is possible that both a reader and            // a writer see the end of the copy and call Complete            // on the _asyncResult object. That race condition is handled by            // Complete that ensures it is only executed fully once.            long bytesLeftToWrite;            if (_bytesToRead > 0)            {                bytesLeftToWrite = _bytesToRead - _totalBytesWritten;            }            else            {                bytesLeftToWrite = 1;            }            if (!_moreDataToRead || bytesLeftToWrite <= 0 || _cancellationToken.IsCancellationRequested)            {                // at this point we know no reader can schedule a read or write                if (Thread.VolatileRead(ref _buffersToWrite) == 0)                {                    // nothing left to write, so it is the end                    _asyncResult.Complete(false);                    return;                }            }            else                // here, we know we have something left to read,                 // so schedule a read if no read is busy                if (noReaderIsBusy)                BeginRead();            // also schedule a write if we are sure we did not write the last buffer            // note that if buffersLeftToWrite is zero and a reader has put another            // buffer to write between the time we decremented _buffersToWrite             // and now, that reader will also schedule another write,            // as it will increment _buffersToWrite from zero to one            if (buffersLeftToWrite > 0)                BeginWrite();        }    }    internal class AsyncResult : IAsyncResult, IDisposable    {        // Fields set at construction which never change while        // operation is pending        private readonly AsyncCallback _asyncCallback;        private readonly object _asyncState;        // Fields set at construction which do change after        // operation completes        private const int StatePending = 0;        private const int StateCompletedSynchronously = 1;        private const int StateCompletedAsynchronously = 2;        private int _completedState = StatePending;        // Field that may or may not get set depending on usage        private ManualResetEvent _waitHandle;        internal AsyncResult(            AsyncCallback asyncCallback,            object state)        {            _asyncCallback = asyncCallback;            _asyncState = state;        }        internal bool Complete(bool completedSynchronously)        {            bool result = false;            // The _completedState field MUST be set prior calling the callback            int prevState = Interlocked.CompareExchange(ref _completedState,                completedSynchronously ? StateCompletedSynchronously :                StateCompletedAsynchronously, StatePending);            if (prevState == StatePending)            {                // If the event exists, set it                if (_waitHandle != null)                    _waitHandle.Set();                if (_asyncCallback != null)                    _asyncCallback(this);                result = true;            }            return result;        }        #region Implementation of IAsyncResult        public Object AsyncState { get { return _asyncState; } }        public bool CompletedSynchronously        {            get            {                return Thread.VolatileRead(ref _completedState) ==                    StateCompletedSynchronously;            }        }        public WaitHandle AsyncWaitHandle        {            get            {                if (_waitHandle == null)                {                    bool done = IsCompleted;                    var mre = new ManualResetEvent(done);                    if (Interlocked.CompareExchange(ref _waitHandle,                        mre, null) != null)                    {                        // Another thread created this object's event; dispose                        // the event we just created                        mre.Close();                    }                    else                    {                        if (!done && IsCompleted)                        {                            // If the operation wasn't done when we created                            // the event but now it is done, set the event                            _waitHandle.Set();                        }                    }                }                return _waitHandle;            }        }        public bool IsCompleted        {            get            {                return Thread.VolatileRead(ref _completedState) !=                    StatePending;            }        }        #endregion        public void Dispose()        {            if (_waitHandle != null)            {                _waitHandle.Dispose();                _waitHandle = null;            }        }    }}
 |