Browse Source

update async stream writing

Luke Pulverenti 9 years ago
parent
commit
2e91d69d20

+ 7 - 4
MediaBrowser.Api/Playback/Progressive/BaseProgressiveStreamingService.cs

@@ -13,6 +13,7 @@ using ServiceStack.Web;
 using System;
 using System.Collections.Generic;
 using System.Globalization;
+using System.IO;
 using System.Threading;
 using System.Threading.Tasks;
 using CommonIO;
@@ -336,17 +337,19 @@ namespace MediaBrowser.Api.Playback.Progressive
                     state.Dispose();
                 }
 
-                var result = new ProgressiveStreamWriter(outputPath, Logger, FileSystem, job);
+                var outputHeaders = new Dictionary<string, string>(StringComparer.OrdinalIgnoreCase);
 
-                result.Options["Content-Type"] = contentType;
+                outputHeaders["Content-Type"] = contentType;
 
                 // Add the response headers to the result object
                 foreach (var item in responseHeaders)
                 {
-                    result.Options[item.Key] = item.Value;
+                    outputHeaders[item.Key] = item.Value;
                 }
 
-                return result;
+                Func<Stream,Task> streamWriter = stream => new ProgressiveFileCopier(FileSystem, job, Logger).StreamFile(outputPath, stream);
+
+                return ResultFactory.GetAsyncStreamWriter(streamWriter, outputHeaders);
             }
             finally
             {

+ 5 - 7
MediaBrowser.Api/Playback/Progressive/ProgressiveStreamWriter.cs

@@ -48,21 +48,19 @@ namespace MediaBrowser.Api.Playback.Progressive
         /// <param name="responseStream">The response stream.</param>
         public void WriteTo(Stream responseStream)
         {
-            WriteToInternal(responseStream);
+            var task = WriteToAsync(responseStream);
+            Task.WaitAll(task);
         }
 
         /// <summary>
-        /// Writes to async.
+        /// Writes to.
         /// </summary>
         /// <param name="responseStream">The response stream.</param>
-        /// <returns>Task.</returns>
-        private void WriteToInternal(Stream responseStream)
+        public async Task WriteToAsync(Stream responseStream)
         {
             try
             {
-                var task = new ProgressiveFileCopier(_fileSystem, _job, Logger).StreamFile(Path, responseStream);
-
-                Task.WaitAll(task);
+                await new ProgressiveFileCopier(_fileSystem, _job, Logger).StreamFile(Path, responseStream).ConfigureAwait(false);
             }
             catch (IOException)
             {

+ 2 - 0
MediaBrowser.Controller/Net/IHttpResultFactory.cs

@@ -28,6 +28,8 @@ namespace MediaBrowser.Controller.Net
         /// <returns>System.Object.</returns>
         object GetResult(object content, string contentType, IDictionary<string,string> responseHeaders = null);
 
+        object GetAsyncStreamWriter(Func<Stream,Task> streamWriter, IDictionary<string, string> responseHeaders = null);
+
         /// <summary>
         /// Gets the optimized result.
         /// </summary>

+ 56 - 0
MediaBrowser.Server.Implementations/HttpServer/AsyncStreamWriterFunc.cs

@@ -0,0 +1,56 @@
+using System;
+using System.Collections.Generic;
+using System.IO;
+using System.Threading.Tasks;
+using ServiceStack;
+using ServiceStack.Web;
+
+namespace MediaBrowser.Server.Implementations.HttpServer
+{
+    public class AsyncStreamWriterFunc : IStreamWriter, IAsyncStreamWriter, IHasOptions
+    {
+        /// <summary>
+        /// Gets or sets the source stream.
+        /// </summary>
+        /// <value>The source stream.</value>
+        private Func<Stream, Task> Writer { get; set; }
+
+        /// <summary>
+        /// Gets the options.
+        /// </summary>
+        /// <value>The options.</value>
+        public IDictionary<string, string> Options { get; }
+
+        public Action OnComplete { get; set; }
+        public Action OnError { get; set; }
+
+        /// <summary>
+        /// Initializes a new instance of the <see cref="StreamWriter" /> class.
+        /// </summary>
+        public AsyncStreamWriterFunc(Func<Stream, Task> writer, IDictionary<string, string> headers)
+        {
+            Writer = writer;
+
+            if (headers == null)
+            {
+                headers = new Dictionary<string, string>(StringComparer.OrdinalIgnoreCase);
+            }
+            Options = headers;
+        }
+
+        /// <summary>
+        /// Writes to.
+        /// </summary>
+        /// <param name="responseStream">The response stream.</param>
+        public void WriteTo(Stream responseStream)
+        {
+            var task = Writer(responseStream);
+            Task.WaitAll(task);
+        }
+
+        public async Task WriteToAsync(Stream responseStream)
+        {
+            await Writer(responseStream).ConfigureAwait(false);
+        }
+    }
+}

+ 5 - 0
MediaBrowser.Server.Implementations/HttpServer/HttpResultFactory.cs

@@ -699,5 +699,10 @@ namespace MediaBrowser.Server.Implementations.HttpServer
 
             throw error;
         }
+
+        public object GetAsyncStreamWriter(Func<Stream, Task> streamWriter, IDictionary<string, string> responseHeaders = null)
+        {
+            return new AsyncStreamWriterFunc(streamWriter, responseHeaders);
+        }
     }
 }

+ 0 - 285
MediaBrowser.Server.Implementations/HttpServer/NetListener/HttpListenerServer.cs

@@ -1,285 +0,0 @@
-using MediaBrowser.Controller.Net;
-using MediaBrowser.Model.Logging;
-using ServiceStack;
-using ServiceStack.Host.HttpListener;
-using ServiceStack.Web;
-using System;
-using System.Collections.Generic;
-using System.Linq;
-using System.Net;
-using System.Text;
-using System.Threading;
-using System.Threading.Tasks;
-
-namespace MediaBrowser.Server.Implementations.HttpServer.NetListener
-{
-    public class HttpListenerServer : IHttpListener
-    {
-        private readonly ILogger _logger;
-        private HttpListener _listener;
-        private readonly ManualResetEventSlim _listenForNextRequest = new ManualResetEventSlim(false);
-
-        public Action<Exception, IRequest> ErrorHandler { get; set; }
-        public Action<WebSocketConnectEventArgs> WebSocketHandler { get; set; }
-        public Func<IHttpRequest, Uri, Task> RequestHandler { get; set; }
-
-        private readonly Action<string> _endpointListener;
-
-        public HttpListenerServer(ILogger logger, Action<string> endpointListener)
-        {
-            _logger = logger;
-            _endpointListener = endpointListener;
-        }
-
-        private List<string> UrlPrefixes { get; set; }
-
-        public void Start(IEnumerable<string> urlPrefixes)
-        {
-            UrlPrefixes = urlPrefixes.ToList();
-
-            if (_listener == null)
-                _listener = new HttpListener();
-
-            //HostContext.Config.HandlerFactoryPath = ListenerRequest.GetHandlerPathIfAny(UrlPrefixes.First());
-
-            foreach (var prefix in UrlPrefixes)
-            {
-                _logger.Info("Adding HttpListener prefix " + prefix);
-                _listener.Prefixes.Add(prefix);
-            }
-
-            _listener.Start();
-
-            Task.Factory.StartNew(Listen, TaskCreationOptions.LongRunning);
-        }
-
-        private bool IsListening
-        {
-            get { return _listener != null && _listener.IsListening; }
-        }
-
-        // Loop here to begin processing of new requests.
-        private void Listen()
-        {
-            while (IsListening)
-            {
-                if (_listener == null) return;
-                _listenForNextRequest.Reset();
-
-                try
-                {
-                    _listener.BeginGetContext(ListenerCallback, _listener);
-                    _listenForNextRequest.Wait();
-                }
-                catch (Exception ex)
-                {
-                    _logger.Error("Listen()", ex);
-                    return;
-                }
-                if (_listener == null) return;
-            }
-        }
-
-        // Handle the processing of a request in here.
-        private void ListenerCallback(IAsyncResult asyncResult)
-        {
-            _listenForNextRequest.Set();
-
-            var listener = asyncResult.AsyncState as HttpListener;
-            HttpListenerContext context;
-
-            if (listener == null) return;
-            var isListening = listener.IsListening;
-
-            try
-            {
-                if (!isListening)
-                {
-                    _logger.Debug("Ignoring ListenerCallback() as HttpListener is no longer listening"); return;
-                }
-                // The EndGetContext() method, as with all Begin/End asynchronous methods in the .NET Framework,
-                // blocks until there is a request to be processed or some type of data is available.
-                context = listener.EndGetContext(asyncResult);
-            }
-            catch (Exception ex)
-            {
-                // You will get an exception when httpListener.Stop() is called
-                // because there will be a thread stopped waiting on the .EndGetContext()
-                // method, and again, that is just the way most Begin/End asynchronous
-                // methods of the .NET Framework work.
-                var errMsg = ex + ": " + IsListening;
-                _logger.Warn(errMsg);
-                return;
-            }
-
-            Task.Factory.StartNew(() => InitTask(context));
-        }
-
-        private void InitTask(HttpListenerContext context)
-        {
-            try
-            {
-                var task = this.ProcessRequestAsync(context);
-                task.ContinueWith(x => HandleError(x.Exception, context), TaskContinuationOptions.OnlyOnFaulted | TaskContinuationOptions.AttachedToParent);
-
-                if (task.Status == TaskStatus.Created)
-                {
-                    task.RunSynchronously();
-                }
-            }
-            catch (Exception ex)
-            {
-                HandleError(ex, context);
-            }
-        }
-
-        private Task ProcessRequestAsync(HttpListenerContext context)
-        {
-            var request = context.Request;
-
-            LogHttpRequest(request);
-
-            if (request.IsWebSocketRequest)
-            {
-                return ProcessWebSocketRequest(context);
-            }
-
-            if (string.IsNullOrEmpty(context.Request.RawUrl))
-                return ((object)null).AsTaskResult();
-
-            var operationName = context.Request.GetOperationName();
-
-            var httpReq = GetRequest(context, operationName);
-
-            return RequestHandler(httpReq, request.Url);
-        }
-
-        /// <summary>
-        /// Processes the web socket request.
-        /// </summary>
-        /// <param name="ctx">The CTX.</param>
-        /// <returns>Task.</returns>
-        private async Task ProcessWebSocketRequest(HttpListenerContext ctx)
-        {
-#if !__MonoCS__
-            try
-            {
-                var webSocketContext = await ctx.AcceptWebSocketAsync(null).ConfigureAwait(false);
-
-                if (WebSocketHandler != null)
-                {
-                    WebSocketHandler(new WebSocketConnectEventArgs
-                    {
-                        WebSocket = new NativeWebSocket(webSocketContext.WebSocket, _logger),
-                        Endpoint = ctx.Request.RemoteEndPoint.ToString()
-                    });
-                }
-            }
-            catch (Exception ex)
-            {
-                _logger.ErrorException("AcceptWebSocketAsync error", ex);
-                ctx.Response.StatusCode = 500;
-                ctx.Response.Close();
-            }
-#endif
-        }
-
-        private void HandleError(Exception ex, HttpListenerContext context)
-        {
-            var operationName = context.Request.GetOperationName();
-            var httpReq = GetRequest(context, operationName);
-
-            if (ErrorHandler != null)
-            {
-                ErrorHandler(ex, httpReq);
-            }
-        }
-
-        private static ListenerRequest GetRequest(HttpListenerContext httpContext, string operationName)
-        {
-            var req = new ListenerRequest(httpContext, operationName, RequestAttributes.None);
-            req.RequestAttributes = req.GetAttributes();
-
-            return req;
-        }
-
-        /// <summary>
-        /// Logs the HTTP request.
-        /// </summary>
-        /// <param name="request">The request.</param>
-        private void LogHttpRequest(HttpListenerRequest request)
-        {
-            var endpoint = request.LocalEndPoint;
-
-            if (endpoint != null)
-            {
-                var address = endpoint.ToString();
-
-                _endpointListener(address);
-            }
-
-            LogRequest(_logger, request);
-        }
-
-        /// <summary>
-        /// Logs the request.
-        /// </summary>
-        /// <param name="logger">The logger.</param>
-        /// <param name="request">The request.</param>
-        private static void LogRequest(ILogger logger, HttpListenerRequest request)
-        {
-            var log = new StringBuilder();
-
-            var logHeaders = true;
-
-            if (logHeaders)
-            {
-                var headers = string.Join(",", request.Headers.AllKeys.Where(i => !string.Equals(i, "cookie", StringComparison.OrdinalIgnoreCase) && !string.Equals(i, "Referer", StringComparison.OrdinalIgnoreCase)).Select(k => k + "=" + request.Headers[k]));
-
-                log.AppendLine("Ip: " + request.RemoteEndPoint + ". Headers: " + headers);
-            }
-
-            var type = request.IsWebSocketRequest ? "Web Socket" : "HTTP " + request.HttpMethod;
-
-            logger.LogMultiline(type + " " + request.Url, LogSeverity.Debug, log);
-        }
-
-        public void Stop()
-        {
-            if (_listener != null)
-            {
-                foreach (var prefix in UrlPrefixes)
-                {
-                    _listener.Prefixes.Remove(prefix);
-                }
-
-                _listener.Close();
-            }
-        }
-
-        public void Dispose()
-        {
-            Dispose(true);
-        }
-
-        private bool _disposed;
-        private readonly object _disposeLock = new object();
-        protected virtual void Dispose(bool disposing)
-        {
-            if (_disposed) return;
-
-            lock (_disposeLock)
-            {
-                if (_disposed) return;
-
-                if (disposing)
-                {
-                    Stop();
-                }
-
-                //release unmanaged resources here...
-                _disposed = true;
-            }
-        }
-    }
-}

+ 63 - 11
MediaBrowser.Server.Implementations/HttpServer/RangeRequestWriter.cs

@@ -5,10 +5,12 @@ using System.Collections.Generic;
 using System.Globalization;
 using System.IO;
 using System.Net;
+using System.Threading.Tasks;
+using ServiceStack;
 
 namespace MediaBrowser.Server.Implementations.HttpServer
 {
-    public class RangeRequestWriter : IStreamWriter, IHttpResult
+    public class RangeRequestWriter : IStreamWriter, IAsyncStreamWriter, IHttpResult
     {
         /// <summary>
         /// Gets or sets the source stream.
@@ -168,16 +170,6 @@ namespace MediaBrowser.Server.Implementations.HttpServer
         /// </summary>
         /// <param name="responseStream">The response stream.</param>
         public void WriteTo(Stream responseStream)
-        {
-            WriteToInternal(responseStream);
-        }
-
-        /// <summary>
-        /// Writes to async.
-        /// </summary>
-        /// <param name="responseStream">The response stream.</param>
-        /// <returns>Task.</returns>
-        private void WriteToInternal(Stream responseStream)
         {
             try
             {
@@ -237,6 +229,66 @@ namespace MediaBrowser.Server.Implementations.HttpServer
             }
         }
 
+        public async Task WriteToAsync(Stream responseStream)
+        {
+            try
+            {
+                // Headers only
+                if (IsHeadRequest)
+                {
+                    return;
+                }
+
+                using (var source = SourceStream)
+                {
+                    // If the requested range is "0-", we can optimize by just doing a stream copy
+                    if (RangeEnd >= TotalContentLength - 1)
+                    {
+                        await source.CopyToAsync(responseStream, BufferSize).ConfigureAwait(false);
+                    }
+                    else
+                    {
+                        await CopyToInternalAsync(source, responseStream, RangeLength).ConfigureAwait(false);
+                    }
+                }
+            }
+            catch (IOException ex)
+            {
+                throw;
+            }
+            catch (Exception ex)
+            {
+                _logger.ErrorException("Error in range request writer", ex);
+                throw;
+            }
+            finally
+            {
+                if (OnComplete != null)
+                {
+                    OnComplete();
+                }
+            }
+        }
+
+        private async Task CopyToInternalAsync(Stream source, Stream destination, long copyLength)
+        {
+            var array = new byte[BufferSize];
+            int count;
+            while ((count = await source.ReadAsync(array, 0, array.Length).ConfigureAwait(false)) != 0)
+            {
+                var bytesToCopy = Math.Min(count, copyLength);
+
+                await destination.WriteAsync(array, 0, Convert.ToInt32(bytesToCopy)).ConfigureAwait(false);
+
+                copyLength -= bytesToCopy;
+
+                if (copyLength <= 0)
+                {
+                    break;
+                }
+            }
+        }
+
         public string ContentType { get; set; }
 
         public IRequest RequestContext { get; set; }

+ 1 - 1
MediaBrowser.Server.Implementations/HttpServer/StreamWriter.cs

@@ -12,7 +12,7 @@ namespace MediaBrowser.Server.Implementations.HttpServer
     /// <summary>
     /// Class StreamWriter
     /// </summary>
-    public class StreamWriter : IStreamWriter, /*IAsyncStreamWriter,*/ IHasOptions
+    public class StreamWriter : IStreamWriter, IAsyncStreamWriter, IHasOptions
     {
         private ILogger Logger { get; set; }
 

+ 2 - 3
MediaBrowser.Server.Implementations/MediaBrowser.Server.Implementations.csproj

@@ -156,6 +156,7 @@
     <Compile Include="EntryPoints\ServerEventNotifier.cs" />
     <Compile Include="EntryPoints\UserDataChangeNotifier.cs" />
     <Compile Include="FileOrganization\OrganizerScheduledTask.cs" />
+    <Compile Include="HttpServer\AsyncStreamWriterFunc.cs" />
     <Compile Include="HttpServer\IHttpListener.cs" />
     <Compile Include="HttpServer\Security\AuthorizationContext.cs" />
     <Compile Include="HttpServer\ContainerAdapter.cs" />
@@ -757,9 +758,7 @@
     <EmbeddedResource Include="Localization\iso6392.txt" />
     <EmbeddedResource Include="Localization\Ratings\be.txt" />
   </ItemGroup>
-  <ItemGroup>
-    <Folder Include="HttpServer\NetListener\" />
-  </ItemGroup>
+  <ItemGroup />
   <Import Project="$(MSBuildToolsPath)\Microsoft.CSharp.targets" />
   <!-- To modify your build process, add your task inside one of the targets below and uncomment it. 
        Other similar extension points exist, see Microsoft.Common.targets.