AsyncStreamCopier.cs 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459
  1. using System;
  2. using System.IO;
  3. using System.Threading;
  4. using System.Threading.Tasks;
  5. namespace Emby.Server.Implementations.IO
  6. {
  7. public class AsyncStreamCopier : IDisposable
  8. {
  9. // size in bytes of the buffers in the buffer pool
  10. private const int DefaultBufferSize = 81920;
  11. private readonly int _bufferSize;
  12. // number of buffers in the pool
  13. private const int DefaultBufferCount = 4;
  14. private readonly int _bufferCount;
  15. // indexes of the next buffer to read into/write from
  16. private int _nextReadBuffer = -1;
  17. private int _nextWriteBuffer = -1;
  18. // the buffer pool, implemented as an array, and used in a cyclic way
  19. private readonly byte[][] _buffers;
  20. // sizes in bytes of the available (read) data in the buffers
  21. private readonly int[] _sizes;
  22. // the streams...
  23. private Stream _source;
  24. private Stream _target;
  25. private readonly bool _closeStreamsOnEnd;
  26. // number of buffers that are ready to be written
  27. private int _buffersToWrite;
  28. // flag indicating that there is still a read operation to be scheduled
  29. // (source end of stream not reached)
  30. private volatile bool _moreDataToRead;
  31. // the result of the whole operation, returned by BeginCopy()
  32. private AsyncResult _asyncResult;
  33. // any exception that occurs during an async operation
  34. // stored here for rethrow
  35. private Exception _exception;
  36. public TaskCompletionSource<long> TaskCompletionSource;
  37. private long _bytesToRead;
  38. private long _totalBytesWritten;
  39. private CancellationToken _cancellationToken;
  40. public int IndividualReadOffset = 0;
  41. public AsyncStreamCopier(Stream source,
  42. Stream target,
  43. long bytesToRead,
  44. CancellationToken cancellationToken,
  45. bool closeStreamsOnEnd = false,
  46. int bufferSize = DefaultBufferSize,
  47. int bufferCount = DefaultBufferCount)
  48. {
  49. if (source == null)
  50. throw new ArgumentNullException("source");
  51. if (target == null)
  52. throw new ArgumentNullException("target");
  53. if (!source.CanRead)
  54. throw new ArgumentException("Cannot copy from a non-readable stream.");
  55. if (!target.CanWrite)
  56. throw new ArgumentException("Cannot copy to a non-writable stream.");
  57. _source = source;
  58. _target = target;
  59. _moreDataToRead = true;
  60. _closeStreamsOnEnd = closeStreamsOnEnd;
  61. _bufferSize = bufferSize;
  62. _bufferCount = bufferCount;
  63. _buffers = new byte[_bufferCount][];
  64. _sizes = new int[_bufferCount];
  65. _bytesToRead = bytesToRead;
  66. _cancellationToken = cancellationToken;
  67. }
  68. ~AsyncStreamCopier()
  69. {
  70. // ensure any exception cannot be ignored
  71. ThrowExceptionIfNeeded();
  72. }
  73. public static Task<long> CopyStream(Stream source, Stream target, int bufferSize, int bufferCount, CancellationToken cancellationToken)
  74. {
  75. return CopyStream(source, target, 0, bufferSize, bufferCount, cancellationToken);
  76. }
  77. public static Task<long> CopyStream(Stream source, Stream target, long size, int bufferSize, int bufferCount, CancellationToken cancellationToken)
  78. {
  79. var copier = new AsyncStreamCopier(source, target, size, cancellationToken, false, bufferSize, bufferCount);
  80. var taskCompletion = new TaskCompletionSource<long>();
  81. copier.TaskCompletionSource = taskCompletion;
  82. var result = copier.BeginCopy(StreamCopyCallback, copier);
  83. if (result.CompletedSynchronously)
  84. {
  85. StreamCopyCallback(result);
  86. }
  87. cancellationToken.Register(() => taskCompletion.TrySetCanceled());
  88. return taskCompletion.Task;
  89. }
  90. private static void StreamCopyCallback(IAsyncResult result)
  91. {
  92. var copier = (AsyncStreamCopier)result.AsyncState;
  93. var taskCompletion = copier.TaskCompletionSource;
  94. try
  95. {
  96. copier.EndCopy(result);
  97. taskCompletion.TrySetResult(copier._totalBytesWritten);
  98. }
  99. catch (Exception ex)
  100. {
  101. taskCompletion.TrySetException(ex);
  102. }
  103. }
  104. public void Dispose()
  105. {
  106. if (_asyncResult != null)
  107. _asyncResult.Dispose();
  108. if (_closeStreamsOnEnd)
  109. {
  110. if (_source != null)
  111. {
  112. _source.Dispose();
  113. _source = null;
  114. }
  115. if (_target != null)
  116. {
  117. _target.Dispose();
  118. _target = null;
  119. }
  120. }
  121. GC.SuppressFinalize(this);
  122. ThrowExceptionIfNeeded();
  123. }
  124. public IAsyncResult BeginCopy(AsyncCallback callback, object state)
  125. {
  126. // avoid concurrent start of the copy on separate threads
  127. if (Interlocked.CompareExchange(ref _asyncResult, new AsyncResult(callback, state), null) != null)
  128. throw new InvalidOperationException("A copy operation has already been started on this object.");
  129. // allocate buffers
  130. for (int i = 0; i < _bufferCount; i++)
  131. _buffers[i] = new byte[_bufferSize];
  132. // we pass false to BeginRead() to avoid completing the async result
  133. // immediately which would result in invoking the callback
  134. // when the method fails synchronously
  135. BeginRead(false);
  136. // throw exception synchronously if there is one
  137. ThrowExceptionIfNeeded();
  138. return _asyncResult;
  139. }
  140. public void EndCopy(IAsyncResult ar)
  141. {
  142. if (ar != _asyncResult)
  143. throw new InvalidOperationException("Invalid IAsyncResult object.");
  144. if (!_asyncResult.IsCompleted)
  145. _asyncResult.AsyncWaitHandle.WaitOne();
  146. if (_closeStreamsOnEnd)
  147. {
  148. _source.Close();
  149. _source = null;
  150. _target.Close();
  151. _target = null;
  152. }
  153. //_logger.Info("AsyncStreamCopier {0} bytes requested. {1} bytes transferred", _bytesToRead, _totalBytesWritten);
  154. ThrowExceptionIfNeeded();
  155. }
  156. /// <summary>
  157. /// Here we'll throw a pending exception if there is one,
  158. /// and remove it from our instance, so we know it has been consumed.
  159. /// </summary>
  160. private void ThrowExceptionIfNeeded()
  161. {
  162. if (_exception != null)
  163. {
  164. var exception = _exception;
  165. _exception = null;
  166. throw exception;
  167. }
  168. }
  169. private void BeginRead(bool completeOnError = true)
  170. {
  171. if (!_moreDataToRead)
  172. {
  173. return;
  174. }
  175. if (_asyncResult.IsCompleted)
  176. return;
  177. int bufferIndex = Interlocked.Increment(ref _nextReadBuffer) % _bufferCount;
  178. try
  179. {
  180. _source.BeginRead(_buffers[bufferIndex], 0, _bufferSize, EndRead, bufferIndex);
  181. }
  182. catch (Exception exception)
  183. {
  184. _exception = exception;
  185. if (completeOnError)
  186. _asyncResult.Complete(false);
  187. }
  188. }
  189. private void BeginWrite()
  190. {
  191. if (_asyncResult.IsCompleted)
  192. return;
  193. // this method can actually be called concurrently!!
  194. // indeed, let's say we call a BeginWrite, and the thread gets interrupted
  195. // just after making the IO request.
  196. // At that moment, the thread is still in the method. And then the IO request
  197. // ends (extremely fast io, or caching...), EndWrite gets called
  198. // on another thread, and calls BeginWrite again! There we have it!
  199. // That is the reason why an Interlocked is needed here.
  200. int bufferIndex = Interlocked.Increment(ref _nextWriteBuffer) % _bufferCount;
  201. try
  202. {
  203. int bytesToWrite;
  204. if (_bytesToRead > 0)
  205. {
  206. var bytesLeftToWrite = _bytesToRead - _totalBytesWritten;
  207. bytesToWrite = Convert.ToInt32(Math.Min(_sizes[bufferIndex], bytesLeftToWrite));
  208. }
  209. else
  210. {
  211. bytesToWrite = _sizes[bufferIndex];
  212. }
  213. _target.BeginWrite(_buffers[bufferIndex], IndividualReadOffset, bytesToWrite - IndividualReadOffset, EndWrite, null);
  214. _totalBytesWritten += bytesToWrite;
  215. }
  216. catch (Exception exception)
  217. {
  218. _exception = exception;
  219. _asyncResult.Complete(false);
  220. }
  221. }
  222. private void EndRead(IAsyncResult ar)
  223. {
  224. try
  225. {
  226. int read = _source.EndRead(ar);
  227. _moreDataToRead = read > 0;
  228. var bufferIndex = (int)ar.AsyncState;
  229. _sizes[bufferIndex] = read;
  230. }
  231. catch (Exception exception)
  232. {
  233. _exception = exception;
  234. _asyncResult.Complete(false);
  235. return;
  236. }
  237. if (_moreDataToRead && !_cancellationToken.IsCancellationRequested)
  238. {
  239. int usedBuffers = Interlocked.Increment(ref _buffersToWrite);
  240. // if we incremented from zero to one, then it means we just
  241. // added the single buffer to write, so a writer could not
  242. // be busy, and we have to schedule one.
  243. if (usedBuffers == 1)
  244. BeginWrite();
  245. // test if there is at least a free buffer, and schedule
  246. // a read, as we have read some data
  247. if (usedBuffers < _bufferCount)
  248. BeginRead();
  249. }
  250. else
  251. {
  252. // we did not add a buffer, because no data was read, and
  253. // there is no buffer left to write so this is the end...
  254. if (Thread.VolatileRead(ref _buffersToWrite) == 0)
  255. {
  256. _asyncResult.Complete(false);
  257. }
  258. }
  259. }
  260. private void EndWrite(IAsyncResult ar)
  261. {
  262. try
  263. {
  264. _target.EndWrite(ar);
  265. }
  266. catch (Exception exception)
  267. {
  268. _exception = exception;
  269. _asyncResult.Complete(false);
  270. return;
  271. }
  272. int buffersLeftToWrite = Interlocked.Decrement(ref _buffersToWrite);
  273. // no reader could be active if all buffers were full of data waiting to be written
  274. bool noReaderIsBusy = buffersLeftToWrite == _bufferCount - 1;
  275. // note that it is possible that both a reader and
  276. // a writer see the end of the copy and call Complete
  277. // on the _asyncResult object. That race condition is handled by
  278. // Complete that ensures it is only executed fully once.
  279. long bytesLeftToWrite;
  280. if (_bytesToRead > 0)
  281. {
  282. bytesLeftToWrite = _bytesToRead - _totalBytesWritten;
  283. }
  284. else
  285. {
  286. bytesLeftToWrite = 1;
  287. }
  288. if (!_moreDataToRead || bytesLeftToWrite <= 0 || _cancellationToken.IsCancellationRequested)
  289. {
  290. // at this point we know no reader can schedule a read or write
  291. if (Thread.VolatileRead(ref _buffersToWrite) == 0)
  292. {
  293. // nothing left to write, so it is the end
  294. _asyncResult.Complete(false);
  295. return;
  296. }
  297. }
  298. else
  299. // here, we know we have something left to read,
  300. // so schedule a read if no read is busy
  301. if (noReaderIsBusy)
  302. BeginRead();
  303. // also schedule a write if we are sure we did not write the last buffer
  304. // note that if buffersLeftToWrite is zero and a reader has put another
  305. // buffer to write between the time we decremented _buffersToWrite
  306. // and now, that reader will also schedule another write,
  307. // as it will increment _buffersToWrite from zero to one
  308. if (buffersLeftToWrite > 0)
  309. BeginWrite();
  310. }
  311. }
  312. internal class AsyncResult : IAsyncResult, IDisposable
  313. {
  314. // Fields set at construction which never change while
  315. // operation is pending
  316. private readonly AsyncCallback _asyncCallback;
  317. private readonly object _asyncState;
  318. // Fields set at construction which do change after
  319. // operation completes
  320. private const int StatePending = 0;
  321. private const int StateCompletedSynchronously = 1;
  322. private const int StateCompletedAsynchronously = 2;
  323. private int _completedState = StatePending;
  324. // Field that may or may not get set depending on usage
  325. private ManualResetEvent _waitHandle;
  326. internal AsyncResult(
  327. AsyncCallback asyncCallback,
  328. object state)
  329. {
  330. _asyncCallback = asyncCallback;
  331. _asyncState = state;
  332. }
  333. internal bool Complete(bool completedSynchronously)
  334. {
  335. bool result = false;
  336. // The _completedState field MUST be set prior calling the callback
  337. int prevState = Interlocked.CompareExchange(ref _completedState,
  338. completedSynchronously ? StateCompletedSynchronously :
  339. StateCompletedAsynchronously, StatePending);
  340. if (prevState == StatePending)
  341. {
  342. // If the event exists, set it
  343. if (_waitHandle != null)
  344. _waitHandle.Set();
  345. if (_asyncCallback != null)
  346. _asyncCallback(this);
  347. result = true;
  348. }
  349. return result;
  350. }
  351. #region Implementation of IAsyncResult
  352. public Object AsyncState { get { return _asyncState; } }
  353. public bool CompletedSynchronously
  354. {
  355. get
  356. {
  357. return Thread.VolatileRead(ref _completedState) ==
  358. StateCompletedSynchronously;
  359. }
  360. }
  361. public WaitHandle AsyncWaitHandle
  362. {
  363. get
  364. {
  365. if (_waitHandle == null)
  366. {
  367. bool done = IsCompleted;
  368. var mre = new ManualResetEvent(done);
  369. if (Interlocked.CompareExchange(ref _waitHandle,
  370. mre, null) != null)
  371. {
  372. // Another thread created this object's event; dispose
  373. // the event we just created
  374. mre.Close();
  375. }
  376. else
  377. {
  378. if (!done && IsCompleted)
  379. {
  380. // If the operation wasn't done when we created
  381. // the event but now it is done, set the event
  382. _waitHandle.Set();
  383. }
  384. }
  385. }
  386. return _waitHandle;
  387. }
  388. }
  389. public bool IsCompleted
  390. {
  391. get
  392. {
  393. return Thread.VolatileRead(ref _completedState) !=
  394. StatePending;
  395. }
  396. }
  397. #endregion
  398. public void Dispose()
  399. {
  400. if (_waitHandle != null)
  401. {
  402. _waitHandle.Dispose();
  403. _waitHandle = null;
  404. }
  405. }
  406. }
  407. }