| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108 | #!/usr/bin/env python# coding: utf-8from __future__ import unicode_literals# Allow direct executionimport osimport reimport sysimport unittestsys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))from test.helper import (    FakeLogger,    http_server_port,    try_rm,)from youtube_dl import YoutubeDLfrom youtube_dl.compat import compat_http_serverfrom youtube_dl.downloader.http import HttpFDfrom youtube_dl.utils import encodeFilenameimport threadingTEST_DIR = os.path.dirname(os.path.abspath(__file__))TEST_SIZE = 10 * 1024class HTTPTestRequestHandler(compat_http_server.BaseHTTPRequestHandler):    def log_message(self, format, *args):        pass    def send_content_range(self, total=None):        range_header = self.headers.get('Range')        start = end = None        if range_header:            mobj = re.search(r'^bytes=(\d+)-(\d+)', range_header)            if mobj:                start = int(mobj.group(1))                end = int(mobj.group(2))        valid_range = start is not None and end is not None        if valid_range:            content_range = 'bytes %d-%d' % (start, end)            if total:                content_range += '/%d' % total            self.send_header('Content-Range', content_range)        return (end - start + 1) if valid_range else total    def serve(self, range=True, content_length=True):        self.send_response(200)        self.send_header('Content-Type', 'video/mp4')        size = TEST_SIZE        if range:            size = self.send_content_range(TEST_SIZE)        if content_length:            self.send_header('Content-Length', size)        self.end_headers()        self.wfile.write(b'#' * size)    def do_GET(self):        if self.path == '/regular':            self.serve()        elif self.path == '/no-content-length':            self.serve(content_length=False)        elif self.path == '/no-range':            self.serve(range=False)        elif self.path == '/no-range-no-content-length':            self.serve(range=False, content_length=False)        else:            assert Falseclass TestHttpFD(unittest.TestCase):    def setUp(self):        self.httpd = compat_http_server.HTTPServer(            ('127.0.0.1', 0), HTTPTestRequestHandler)        self.port = http_server_port(self.httpd)        self.server_thread = threading.Thread(target=self.httpd.serve_forever)        self.server_thread.daemon = True        self.server_thread.start()    def download(self, params, ep):        params['logger'] = FakeLogger()        ydl = YoutubeDL(params)        downloader = HttpFD(ydl, params)        filename = 'testfile.mp4'        try_rm(encodeFilename(filename))        self.assertTrue(downloader.real_download(filename, {            'url': 'http://127.0.0.1:%d/%s' % (self.port, ep),        }))        self.assertEqual(os.path.getsize(encodeFilename(filename)), TEST_SIZE, ep)        try_rm(encodeFilename(filename))    def download_all(self, params):        for ep in ('regular', 'no-content-length', 'no-range', 'no-range-no-content-length'):            self.download(params, ep)    def test_regular(self):        self.download_all({})    def test_chunked(self):        self.download_all({            'http_chunk_size': 1000,        })if __name__ == '__main__':    unittest.main()
 |