| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459 | 
							- using System;
 
- using System.IO;
 
- using System.Threading;
 
- using System.Threading.Tasks;
 
- namespace Emby.Server.Implementations.IO
 
- {
 
-     public class AsyncStreamCopier : IDisposable
 
-     {
 
-         // size in bytes of the buffers in the buffer pool
 
-         private const int DefaultBufferSize = 81920;
 
-         private readonly int _bufferSize;
 
-         // number of buffers in the pool
 
-         private const int DefaultBufferCount = 4;
 
-         private readonly int _bufferCount;
 
-         // indexes of the next buffer to read into/write from
 
-         private int _nextReadBuffer = -1;
 
-         private int _nextWriteBuffer = -1;
 
-         // the buffer pool, implemented as an array, and used in a cyclic way
 
-         private readonly byte[][] _buffers;
 
-         // sizes in bytes of the available (read) data in the buffers
 
-         private readonly int[] _sizes;
 
-         // the streams...
 
-         private Stream _source;
 
-         private Stream _target;
 
-         private readonly bool _closeStreamsOnEnd;
 
-         // number of buffers that are ready to be written
 
-         private int _buffersToWrite;
 
-         // flag indicating that there is still a read operation to be scheduled
 
-         // (source end of stream not reached)
 
-         private volatile bool _moreDataToRead;
 
-         // the result of the whole operation, returned by BeginCopy()
 
-         private AsyncResult _asyncResult;
 
-         // any exception that occurs during an async operation
 
-         // stored here for rethrow
 
-         private Exception _exception;
 
-         public TaskCompletionSource<long> TaskCompletionSource;
 
-         private long _bytesToRead;
 
-         private long _totalBytesWritten;
 
-         private CancellationToken _cancellationToken;
 
-         public int IndividualReadOffset = 0;
 
-         public AsyncStreamCopier(Stream source,
 
-                                  Stream target,
 
-                                  long bytesToRead,
 
-                                  CancellationToken cancellationToken,
 
-                                  bool closeStreamsOnEnd = false,
 
-                                  int bufferSize = DefaultBufferSize,
 
-                                  int bufferCount = DefaultBufferCount)
 
-         {
 
-             if (source == null)
 
-                 throw new ArgumentNullException("source");
 
-             if (target == null)
 
-                 throw new ArgumentNullException("target");
 
-             if (!source.CanRead)
 
-                 throw new ArgumentException("Cannot copy from a non-readable stream.");
 
-             if (!target.CanWrite)
 
-                 throw new ArgumentException("Cannot copy to a non-writable stream.");
 
-             _source = source;
 
-             _target = target;
 
-             _moreDataToRead = true;
 
-             _closeStreamsOnEnd = closeStreamsOnEnd;
 
-             _bufferSize = bufferSize;
 
-             _bufferCount = bufferCount;
 
-             _buffers = new byte[_bufferCount][];
 
-             _sizes = new int[_bufferCount];
 
-             _bytesToRead = bytesToRead;
 
-             _cancellationToken = cancellationToken;
 
-         }
 
-         ~AsyncStreamCopier()
 
-         {
 
-             // ensure any exception cannot be ignored
 
-             ThrowExceptionIfNeeded();
 
-         }
 
-         public static Task<long> CopyStream(Stream source, Stream target, int bufferSize, int bufferCount, CancellationToken cancellationToken)
 
-         {
 
-             return CopyStream(source, target, 0, bufferSize, bufferCount, cancellationToken);
 
-         }
 
-         public static Task<long> CopyStream(Stream source, Stream target, long size, int bufferSize, int bufferCount, CancellationToken cancellationToken)
 
-         {
 
-             var copier = new AsyncStreamCopier(source, target, size, cancellationToken, false, bufferSize, bufferCount);
 
-             var taskCompletion = new TaskCompletionSource<long>();
 
-             copier.TaskCompletionSource = taskCompletion;
 
-             var result = copier.BeginCopy(StreamCopyCallback, copier);
 
-             if (result.CompletedSynchronously)
 
-             {
 
-                 StreamCopyCallback(result);
 
-             }
 
-             cancellationToken.Register(() => taskCompletion.TrySetCanceled());
 
-             return taskCompletion.Task;
 
-         }
 
-         private static void StreamCopyCallback(IAsyncResult result)
 
-         {
 
-             var copier = (AsyncStreamCopier)result.AsyncState;
 
-             var taskCompletion = copier.TaskCompletionSource;
 
-             try
 
-             {
 
-                 copier.EndCopy(result);
 
-                 taskCompletion.TrySetResult(copier._totalBytesWritten);
 
-             }
 
-             catch (Exception ex)
 
-             {
 
-                 taskCompletion.TrySetException(ex);
 
-             }
 
-         }
 
-         public void Dispose()
 
-         {
 
-             if (_asyncResult != null)
 
-                 _asyncResult.Dispose();
 
-             if (_closeStreamsOnEnd)
 
-             {
 
-                 if (_source != null)
 
-                 {
 
-                     _source.Dispose();
 
-                     _source = null;
 
-                 }
 
-                 if (_target != null)
 
-                 {
 
-                     _target.Dispose();
 
-                     _target = null;
 
-                 }
 
-             }
 
-             GC.SuppressFinalize(this);
 
-             ThrowExceptionIfNeeded();
 
-         }
 
-         public IAsyncResult BeginCopy(AsyncCallback callback, object state)
 
-         {
 
-             // avoid concurrent start of the copy on separate threads
 
-             if (Interlocked.CompareExchange(ref _asyncResult, new AsyncResult(callback, state), null) != null)
 
-                 throw new InvalidOperationException("A copy operation has already been started on this object.");
 
-             // allocate buffers
 
-             for (int i = 0; i < _bufferCount; i++)
 
-                 _buffers[i] = new byte[_bufferSize];
 
-             // we pass false to BeginRead() to avoid completing the async result
 
-             // immediately which would result in invoking the callback
 
-             // when the method fails synchronously
 
-             BeginRead(false);
 
-             // throw exception synchronously if there is one
 
-             ThrowExceptionIfNeeded();
 
-             return _asyncResult;
 
-         }
 
-         public void EndCopy(IAsyncResult ar)
 
-         {
 
-             if (ar != _asyncResult)
 
-                 throw new InvalidOperationException("Invalid IAsyncResult object.");
 
-             if (!_asyncResult.IsCompleted)
 
-                 _asyncResult.AsyncWaitHandle.WaitOne();
 
-             if (_closeStreamsOnEnd)
 
-             {
 
-                 _source.Close();
 
-                 _source = null;
 
-                 _target.Close();
 
-                 _target = null;
 
-             }
 
-             //_logger.Info("AsyncStreamCopier {0} bytes requested. {1} bytes transferred", _bytesToRead, _totalBytesWritten);
 
-             ThrowExceptionIfNeeded();
 
-         }
 
-         /// <summary>
 
-         /// Here we'll throw a pending exception if there is one, 
 
-         /// and remove it from our instance, so we know it has been consumed.
 
-         /// </summary>
 
-         private void ThrowExceptionIfNeeded()
 
-         {
 
-             if (_exception != null)
 
-             {
 
-                 var exception = _exception;
 
-                 _exception = null;
 
-                 throw exception;
 
-             }
 
-         }
 
-         private void BeginRead(bool completeOnError = true)
 
-         {
 
-             if (!_moreDataToRead)
 
-             {
 
-                 return;
 
-             }
 
-             if (_asyncResult.IsCompleted)
 
-                 return;
 
-             int bufferIndex = Interlocked.Increment(ref _nextReadBuffer) % _bufferCount;
 
-             try
 
-             {
 
-                 _source.BeginRead(_buffers[bufferIndex], 0, _bufferSize, EndRead, bufferIndex);
 
-             }
 
-             catch (Exception exception)
 
-             {
 
-                 _exception = exception;
 
-                 if (completeOnError)
 
-                     _asyncResult.Complete(false);
 
-             }
 
-         }
 
-         private void BeginWrite()
 
-         {
 
-             if (_asyncResult.IsCompleted)
 
-                 return;
 
-             // this method can actually be called concurrently!!
 
-             // indeed, let's say we call a BeginWrite, and the thread gets interrupted 
 
-             // just after making the IO request.
 
-             // At that moment, the thread is still in the method. And then the IO request
 
-             // ends (extremely fast io, or caching...), EndWrite gets called
 
-             // on another thread, and calls BeginWrite again! There we have it!
 
-             // That is the reason why an Interlocked is needed here.
 
-             int bufferIndex = Interlocked.Increment(ref _nextWriteBuffer) % _bufferCount;
 
-             try
 
-             {
 
-                 int bytesToWrite;
 
-                 if (_bytesToRead > 0)
 
-                 {
 
-                     var bytesLeftToWrite = _bytesToRead - _totalBytesWritten;
 
-                     bytesToWrite = Convert.ToInt32(Math.Min(_sizes[bufferIndex], bytesLeftToWrite));
 
-                 }
 
-                 else
 
-                 {
 
-                     bytesToWrite = _sizes[bufferIndex];
 
-                 }
 
-                 _target.BeginWrite(_buffers[bufferIndex], IndividualReadOffset, bytesToWrite - IndividualReadOffset, EndWrite, null);
 
-                 _totalBytesWritten += bytesToWrite;
 
-             }
 
-             catch (Exception exception)
 
-             {
 
-                 _exception = exception;
 
-                 _asyncResult.Complete(false);
 
-             }
 
-         }
 
-         private void EndRead(IAsyncResult ar)
 
-         {
 
-             try
 
-             {
 
-                 int read = _source.EndRead(ar);
 
-                 _moreDataToRead = read > 0;
 
-                 var bufferIndex = (int)ar.AsyncState;
 
-                 _sizes[bufferIndex] = read;
 
-             }
 
-             catch (Exception exception)
 
-             {
 
-                 _exception = exception;
 
-                 _asyncResult.Complete(false);
 
-                 return;
 
-             }
 
-             if (_moreDataToRead && !_cancellationToken.IsCancellationRequested)
 
-             {
 
-                 int usedBuffers = Interlocked.Increment(ref _buffersToWrite);
 
-                 // if we incremented from zero to one, then it means we just 
 
-                 // added the single buffer to write, so a writer could not 
 
-                 // be busy, and we have to schedule one.
 
-                 if (usedBuffers == 1)
 
-                     BeginWrite();
 
-                 // test if there is at least a free buffer, and schedule
 
-                 // a read, as we have read some data
 
-                 if (usedBuffers < _bufferCount)
 
-                     BeginRead();
 
-             }
 
-             else
 
-             {
 
-                 // we did not add a buffer, because no data was read, and 
 
-                 // there is no buffer left to write so this is the end...
 
-                 if (Thread.VolatileRead(ref _buffersToWrite) == 0)
 
-                 {
 
-                     _asyncResult.Complete(false);
 
-                 }
 
-             }
 
-         }
 
-         private void EndWrite(IAsyncResult ar)
 
-         {
 
-             try
 
-             {
 
-                 _target.EndWrite(ar);
 
-             }
 
-             catch (Exception exception)
 
-             {
 
-                 _exception = exception;
 
-                 _asyncResult.Complete(false);
 
-                 return;
 
-             }
 
-             int buffersLeftToWrite = Interlocked.Decrement(ref _buffersToWrite);
 
-             // no reader could be active if all buffers were full of data waiting to be written
 
-             bool noReaderIsBusy = buffersLeftToWrite == _bufferCount - 1;
 
-             // note that it is possible that both a reader and
 
-             // a writer see the end of the copy and call Complete
 
-             // on the _asyncResult object. That race condition is handled by
 
-             // Complete that ensures it is only executed fully once.
 
-             long bytesLeftToWrite;
 
-             if (_bytesToRead > 0)
 
-             {
 
-                 bytesLeftToWrite = _bytesToRead - _totalBytesWritten;
 
-             }
 
-             else
 
-             {
 
-                 bytesLeftToWrite = 1;
 
-             }
 
-             if (!_moreDataToRead || bytesLeftToWrite <= 0 || _cancellationToken.IsCancellationRequested)
 
-             {
 
-                 // at this point we know no reader can schedule a read or write
 
-                 if (Thread.VolatileRead(ref _buffersToWrite) == 0)
 
-                 {
 
-                     // nothing left to write, so it is the end
 
-                     _asyncResult.Complete(false);
 
-                     return;
 
-                 }
 
-             }
 
-             else
 
-                 // here, we know we have something left to read, 
 
-                 // so schedule a read if no read is busy
 
-                 if (noReaderIsBusy)
 
-                 BeginRead();
 
-             // also schedule a write if we are sure we did not write the last buffer
 
-             // note that if buffersLeftToWrite is zero and a reader has put another
 
-             // buffer to write between the time we decremented _buffersToWrite 
 
-             // and now, that reader will also schedule another write,
 
-             // as it will increment _buffersToWrite from zero to one
 
-             if (buffersLeftToWrite > 0)
 
-                 BeginWrite();
 
-         }
 
-     }
 
-     internal class AsyncResult : IAsyncResult, IDisposable
 
-     {
 
-         // Fields set at construction which never change while
 
-         // operation is pending
 
-         private readonly AsyncCallback _asyncCallback;
 
-         private readonly object _asyncState;
 
-         // Fields set at construction which do change after
 
-         // operation completes
 
-         private const int StatePending = 0;
 
-         private const int StateCompletedSynchronously = 1;
 
-         private const int StateCompletedAsynchronously = 2;
 
-         private int _completedState = StatePending;
 
-         // Field that may or may not get set depending on usage
 
-         private ManualResetEvent _waitHandle;
 
-         internal AsyncResult(
 
-             AsyncCallback asyncCallback,
 
-             object state)
 
-         {
 
-             _asyncCallback = asyncCallback;
 
-             _asyncState = state;
 
-         }
 
-         internal bool Complete(bool completedSynchronously)
 
-         {
 
-             bool result = false;
 
-             // The _completedState field MUST be set prior calling the callback
 
-             int prevState = Interlocked.CompareExchange(ref _completedState,
 
-                 completedSynchronously ? StateCompletedSynchronously :
 
-                 StateCompletedAsynchronously, StatePending);
 
-             if (prevState == StatePending)
 
-             {
 
-                 // If the event exists, set it
 
-                 if (_waitHandle != null)
 
-                     _waitHandle.Set();
 
-                 if (_asyncCallback != null)
 
-                     _asyncCallback(this);
 
-                 result = true;
 
-             }
 
-             return result;
 
-         }
 
-         #region Implementation of IAsyncResult
 
-         public Object AsyncState { get { return _asyncState; } }
 
-         public bool CompletedSynchronously
 
-         {
 
-             get
 
-             {
 
-                 return Thread.VolatileRead(ref _completedState) ==
 
-                     StateCompletedSynchronously;
 
-             }
 
-         }
 
-         public WaitHandle AsyncWaitHandle
 
-         {
 
-             get
 
-             {
 
-                 if (_waitHandle == null)
 
-                 {
 
-                     bool done = IsCompleted;
 
-                     var mre = new ManualResetEvent(done);
 
-                     if (Interlocked.CompareExchange(ref _waitHandle,
 
-                         mre, null) != null)
 
-                     {
 
-                         // Another thread created this object's event; dispose
 
-                         // the event we just created
 
-                         mre.Close();
 
-                     }
 
-                     else
 
-                     {
 
-                         if (!done && IsCompleted)
 
-                         {
 
-                             // If the operation wasn't done when we created
 
-                             // the event but now it is done, set the event
 
-                             _waitHandle.Set();
 
-                         }
 
-                     }
 
-                 }
 
-                 return _waitHandle;
 
-             }
 
-         }
 
-         public bool IsCompleted
 
-         {
 
-             get
 
-             {
 
-                 return Thread.VolatileRead(ref _completedState) !=
 
-                     StatePending;
 
-             }
 
-         }
 
-         #endregion
 
-         public void Dispose()
 
-         {
 
-             if (_waitHandle != null)
 
-             {
 
-                 _waitHandle.Dispose();
 
-                 _waitHandle = null;
 
-             }
 
-         }
 
-     }
 
- }
 
 
  |