test_downloader_external.py 3.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115
  1. #!/usr/bin/env python
  2. # coding: utf-8
  3. from __future__ import unicode_literals
  4. # Allow direct execution
  5. import os
  6. import re
  7. import sys
  8. import subprocess
  9. import unittest
  10. sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
  11. from test.helper import (
  12. FakeLogger,
  13. http_server_port,
  14. try_rm,
  15. )
  16. from youtube_dl import YoutubeDL
  17. from youtube_dl.compat import compat_http_server
  18. from youtube_dl.utils import encodeFilename
  19. from youtube_dl.downloader.external import Aria2pFD
  20. import threading
  21. TEST_DIR = os.path.dirname(os.path.abspath(__file__))
  22. TEST_SIZE = 10 * 1024
  23. class HTTPTestRequestHandler(compat_http_server.BaseHTTPRequestHandler):
  24. def log_message(self, format, *args):
  25. pass
  26. def send_content_range(self, total=None):
  27. range_header = self.headers.get('Range')
  28. start = end = None
  29. if range_header:
  30. mobj = re.match(r'bytes=(\d+)-(\d+)', range_header)
  31. if mobj:
  32. start, end = (int(mobj.group(i)) for i in (1, 2))
  33. valid_range = start is not None and end is not None
  34. if valid_range:
  35. content_range = 'bytes %d-%d' % (start, end)
  36. if total:
  37. content_range += '/%d' % total
  38. self.send_header('Content-Range', content_range)
  39. return (end - start + 1) if valid_range else total
  40. def serve(self, range=True, content_length=True):
  41. self.send_response(200)
  42. self.send_header('Content-Type', 'video/mp4')
  43. size = TEST_SIZE
  44. if range:
  45. size = self.send_content_range(TEST_SIZE)
  46. if content_length:
  47. self.send_header('Content-Length', size)
  48. self.end_headers()
  49. self.wfile.write(b'#' * size)
  50. def do_GET(self):
  51. if self.path == '/regular':
  52. self.serve()
  53. elif self.path == '/no-content-length':
  54. self.serve(content_length=False)
  55. elif self.path == '/no-range':
  56. self.serve(range=False)
  57. elif self.path == '/no-range-no-content-length':
  58. self.serve(range=False, content_length=False)
  59. else:
  60. assert False, 'unrecognised server path'
  61. @unittest.skipUnless(Aria2pFD.available(), 'aria2p module not found')
  62. class TestAria2pFD(unittest.TestCase):
  63. def setUp(self):
  64. self.httpd = compat_http_server.HTTPServer(
  65. ('127.0.0.1', 0), HTTPTestRequestHandler)
  66. self.port = http_server_port(self.httpd)
  67. self.server_thread = threading.Thread(target=self.httpd.serve_forever)
  68. self.server_thread.daemon = True
  69. self.server_thread.start()
  70. def download(self, params, ep):
  71. with subprocess.Popen(
  72. ['aria2c', '--enable-rpc'],
  73. stdout=subprocess.DEVNULL,
  74. stderr=subprocess.DEVNULL
  75. ) as process:
  76. if not process.poll():
  77. filename = 'testfile.mp4'
  78. params['logger'] = FakeLogger()
  79. params['outtmpl'] = filename
  80. ydl = YoutubeDL(params)
  81. try_rm(encodeFilename(filename))
  82. self.assertEqual(ydl.download(['http://127.0.0.1:%d/%s' % (self.port, ep)]), 0)
  83. self.assertEqual(os.path.getsize(encodeFilename(filename)), TEST_SIZE)
  84. try_rm(encodeFilename(filename))
  85. process.kill()
  86. def download_all(self, params):
  87. for ep in ('regular', 'no-content-length', 'no-range', 'no-range-no-content-length'):
  88. self.download(params, ep)
  89. def test_regular(self):
  90. self.download_all({'external_downloader': 'aria2p'})
  91. def test_chunked(self):
  92. self.download_all({
  93. 'external_downloader': 'aria2p',
  94. 'http_chunk_size': 1000,
  95. })
  96. if __name__ == '__main__':
  97. unittest.main()