| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272 | #!/usr/bin/env python# coding: utf-8from __future__ import unicode_literals# Allow direct executionimport osimport reimport sysimport subprocessimport unittestsys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))from test.helper import (    FakeLogger,    FakeYDL,    http_server_port,    try_rm,)from youtube_dl import YoutubeDLfrom youtube_dl.compat import (    compat_contextlib_suppress,    compat_http_cookiejar_Cookie,    compat_http_server,    compat_kwargs,)from youtube_dl.utils import (    encodeFilename,    join_nonempty,)from youtube_dl.downloader.external import (    Aria2cFD,    Aria2pFD,    AxelFD,    CurlFD,    FFmpegFD,    HttpieFD,    WgetFD,)from youtube_dl.postprocessor import (    FFmpegPostProcessor,)import threadingTEST_SIZE = 10 * 1024TEST_COOKIE = {    'version': 0,    'name': 'test',    'value': 'ytdlp',    'port': None,    'port_specified': False,    'domain': '.example.com',    'domain_specified': True,    'domain_initial_dot': False,    'path': '/',    'path_specified': True,    'secure': False,    'expires': None,    'discard': False,    'comment': None,    'comment_url': None,    'rest': {},}TEST_COOKIE_VALUE = join_nonempty('name', 'value', delim='=', from_dict=TEST_COOKIE)TEST_INFO = {'url': 'http://www.example.com/'}def cookiejar_Cookie(**cookie_args):    return compat_http_cookiejar_Cookie(**compat_kwargs(cookie_args))def ifExternalFDAvailable(externalFD):    return unittest.skipUnless(externalFD.available(),                               externalFD.get_basename() + ' not found')class HTTPTestRequestHandler(compat_http_server.BaseHTTPRequestHandler):    def log_message(self, format, *args):        pass    def send_content_range(self, total=None):        range_header = self.headers.get('Range')        start = end = None        if range_header:            mobj = re.match(r'bytes=(\d+)-(\d+)', range_header)            if mobj:                start, end = (int(mobj.group(i)) for i in (1, 2))        valid_range = start is not None and end is not None        if valid_range:            content_range = 'bytes %d-%d' % (start, end)            if total:                content_range += '/%d' % total            self.send_header('Content-Range', content_range)        return (end - start + 1) if valid_range else total    def serve(self, range=True, content_length=True):        self.send_response(200)        self.send_header('Content-Type', 'video/mp4')        size = TEST_SIZE        if range:            size = self.send_content_range(TEST_SIZE)        if content_length:            self.send_header('Content-Length', size)        self.end_headers()        self.wfile.write(b'#' * size)    def do_GET(self):        if self.path == '/regular':            self.serve()        elif self.path == '/no-content-length':            self.serve(content_length=False)        elif self.path == '/no-range':            self.serve(range=False)        elif self.path == '/no-range-no-content-length':            self.serve(range=False, content_length=False)        else:            assert False, 'unrecognised server path'@ifExternalFDAvailable(Aria2pFD)class TestAria2pFD(unittest.TestCase):    def setUp(self):        self.httpd = compat_http_server.HTTPServer(            ('127.0.0.1', 0), HTTPTestRequestHandler)        self.port = http_server_port(self.httpd)        self.server_thread = threading.Thread(target=self.httpd.serve_forever)        self.server_thread.daemon = True        self.server_thread.start()    def download(self, params, ep):        with subprocess.Popen(            ['aria2c', '--enable-rpc'],            stdout=subprocess.DEVNULL,            stderr=subprocess.DEVNULL        ) as process:            if not process.poll():                filename = 'testfile.mp4'                params['logger'] = FakeLogger()                params['outtmpl'] = filename                ydl = YoutubeDL(params)                try_rm(encodeFilename(filename))                self.assertEqual(ydl.download(['http://127.0.0.1:%d/%s' % (self.port, ep)]), 0)                self.assertEqual(os.path.getsize(encodeFilename(filename)), TEST_SIZE)                try_rm(encodeFilename(filename))            process.kill()    def download_all(self, params):        for ep in ('regular', 'no-content-length', 'no-range', 'no-range-no-content-length'):            self.download(params, ep)    def test_regular(self):        self.download_all({'external_downloader': 'aria2p'})    def test_chunked(self):        self.download_all({            'external_downloader': 'aria2p',            'http_chunk_size': 1000,        })@ifExternalFDAvailable(HttpieFD)class TestHttpieFD(unittest.TestCase):    def test_make_cmd(self):        with FakeYDL() as ydl:            downloader = HttpieFD(ydl, {})            self.assertEqual(                downloader._make_cmd('test', TEST_INFO),                ['http', '--download', '--output', 'test', 'http://www.example.com/'])            # Test cookie header is added            ydl.cookiejar.set_cookie(cookiejar_Cookie(**TEST_COOKIE))            self.assertEqual(                downloader._make_cmd('test', TEST_INFO),                ['http', '--download', '--output', 'test',                 'http://www.example.com/', 'Cookie:' + TEST_COOKIE_VALUE])@ifExternalFDAvailable(AxelFD)class TestAxelFD(unittest.TestCase):    def test_make_cmd(self):        with FakeYDL() as ydl:            downloader = AxelFD(ydl, {})            self.assertEqual(                downloader._make_cmd('test', TEST_INFO),                ['axel', '-o', 'test', '--', 'http://www.example.com/'])            # Test cookie header is added            ydl.cookiejar.set_cookie(cookiejar_Cookie(**TEST_COOKIE))            self.assertEqual(                downloader._make_cmd('test', TEST_INFO),                ['axel', '-o', 'test', '-H', 'Cookie: ' + TEST_COOKIE_VALUE,                 '--max-redirect=0', '--', 'http://www.example.com/'])@ifExternalFDAvailable(WgetFD)class TestWgetFD(unittest.TestCase):    def test_make_cmd(self):        with FakeYDL() as ydl:            downloader = WgetFD(ydl, {})            self.assertNotIn('--load-cookies', downloader._make_cmd('test', TEST_INFO))            # Test cookiejar tempfile arg is added            ydl.cookiejar.set_cookie(cookiejar_Cookie(**TEST_COOKIE))            self.assertIn('--load-cookies', downloader._make_cmd('test', TEST_INFO))@ifExternalFDAvailable(CurlFD)class TestCurlFD(unittest.TestCase):    def test_make_cmd(self):        with FakeYDL() as ydl:            downloader = CurlFD(ydl, {})            self.assertNotIn('--cookie', downloader._make_cmd('test', TEST_INFO))            # Test cookie header is added            ydl.cookiejar.set_cookie(cookiejar_Cookie(**TEST_COOKIE))            self.assertIn('--cookie', downloader._make_cmd('test', TEST_INFO))            self.assertIn(TEST_COOKIE_VALUE, downloader._make_cmd('test', TEST_INFO))@ifExternalFDAvailable(Aria2cFD)class TestAria2cFD(unittest.TestCase):    def test_make_cmd(self):        with FakeYDL() as ydl:            downloader = Aria2cFD(ydl, {})            downloader._make_cmd('test', TEST_INFO)            self.assertFalse(hasattr(downloader, '_cookies_tempfile'))            # Test cookiejar tempfile arg is added            ydl.cookiejar.set_cookie(cookiejar_Cookie(**TEST_COOKIE))            cmd = downloader._make_cmd('test', TEST_INFO)            self.assertIn('--load-cookies=%s' % downloader._cookies_tempfile, cmd)# Handle delegated availabilitydef ifFFmpegFDAvailable(externalFD):    # raise SkipTest, or set False!    avail = ifExternalFDAvailable(externalFD) and False    with compat_contextlib_suppress(Exception):        avail = FFmpegPostProcessor(downloader=None).available    return unittest.skipUnless(        avail, externalFD.get_basename() + ' not found')@ifFFmpegFDAvailable(FFmpegFD)class TestFFmpegFD(unittest.TestCase):    _args = []    def _test_cmd(self, args):        self._args = args    def test_make_cmd(self):        with FakeYDL() as ydl:            downloader = FFmpegFD(ydl, {})            downloader._debug_cmd = self._test_cmd            info_dict = TEST_INFO.copy()            info_dict['ext'] = 'mp4'            downloader._call_downloader('test', info_dict)            self.assertEqual(self._args, [                'ffmpeg', '-y', '-i', 'http://www.example.com/',                '-c', 'copy', '-f', 'mp4', 'file:test'])            # Test cookies arg is added            ydl.cookiejar.set_cookie(cookiejar_Cookie(**TEST_COOKIE))            downloader._call_downloader('test', info_dict)            self.assertEqual(self._args, [                'ffmpeg', '-y', '-cookies', TEST_COOKIE_VALUE + '; path=/; domain=.example.com;\r\n',                '-i', 'http://www.example.com/', '-c', 'copy', '-f', 'mp4', 'file:test'])if __name__ == '__main__':    unittest.main()
 |