test_downloader_http.py 3.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123
  1. #!/usr/bin/env python
  2. # coding: utf-8
  3. from __future__ import unicode_literals
  4. # Allow direct execution
  5. import os
  6. import re
  7. import sys
  8. import unittest
  9. sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
  10. from youtube_dl import YoutubeDL
  11. from youtube_dl.compat import compat_http_server
  12. from youtube_dl.downloader.http import HttpFD
  13. from youtube_dl.utils import encodeFilename
  14. import ssl
  15. import threading
  16. TEST_DIR = os.path.dirname(os.path.abspath(__file__))
  17. def http_server_port(httpd):
  18. if os.name == 'java' and isinstance(httpd.socket, ssl.SSLSocket):
  19. # In Jython SSLSocket is not a subclass of socket.socket
  20. sock = httpd.socket.sock
  21. else:
  22. sock = httpd.socket
  23. return sock.getsockname()[1]
  24. TEST_SIZE = 10 * 1024
  25. class HTTPTestRequestHandler(compat_http_server.BaseHTTPRequestHandler):
  26. def log_message(self, format, *args):
  27. pass
  28. def send_content_range(self, total=None):
  29. range_header = self.headers.get('Range')
  30. start = end = None
  31. if range_header:
  32. mobj = re.search(r'^bytes=(\d+)-(\d+)', range_header)
  33. if mobj:
  34. start = int(mobj.group(1))
  35. end = int(mobj.group(2))
  36. valid_range = start is not None and end is not None
  37. if valid_range:
  38. content_range = 'bytes %d-%d' % (start, end)
  39. if total:
  40. content_range += '/%d' % total
  41. self.send_header('Content-Range', content_range)
  42. return (end - start + 1) if valid_range else total
  43. def serve(self, range=True, content_length=True):
  44. self.send_response(200)
  45. self.send_header('Content-Type', 'video/mp4')
  46. size = TEST_SIZE
  47. if range:
  48. size = self.send_content_range(TEST_SIZE)
  49. if content_length:
  50. self.send_header('Content-Length', size)
  51. self.end_headers()
  52. self.wfile.write(b'#' * size)
  53. def do_GET(self):
  54. if self.path == '/regular':
  55. self.serve()
  56. elif self.path == '/no-content-length':
  57. self.serve(content_length=False)
  58. elif self.path == '/no-range':
  59. self.serve(range=False)
  60. elif self.path == '/no-range-no-content-length':
  61. self.serve(range=False, content_length=False)
  62. else:
  63. assert False
  64. class FakeLogger(object):
  65. def debug(self, msg):
  66. pass
  67. def warning(self, msg):
  68. pass
  69. def error(self, msg):
  70. pass
  71. class TestHttpFD(unittest.TestCase):
  72. def setUp(self):
  73. self.httpd = compat_http_server.HTTPServer(
  74. ('127.0.0.1', 0), HTTPTestRequestHandler)
  75. self.port = http_server_port(self.httpd)
  76. self.server_thread = threading.Thread(target=self.httpd.serve_forever)
  77. self.server_thread.daemon = True
  78. self.server_thread.start()
  79. def download(self, params, ep):
  80. params['logger'] = FakeLogger()
  81. ydl = YoutubeDL(params)
  82. downloader = HttpFD(ydl, params)
  83. filename = 'testfile.mp4'
  84. self.assertTrue(downloader.real_download(filename, {
  85. 'url': 'http://127.0.0.1:%d/%s' % (self.port, ep),
  86. }))
  87. self.assertEqual(os.path.getsize(encodeFilename(filename)), TEST_SIZE)
  88. os.remove(encodeFilename(filename))
  89. def download_all(self, params):
  90. for ep in ('regular', 'no-content-length', 'no-range', 'no-range-no-content-length'):
  91. self.download(params, ep)
  92. def test_regular(self):
  93. self.download_all({})
  94. def test_chunked(self):
  95. self.download_all({
  96. 'http_chunk_size': 1000,
  97. })
  98. if __name__ == '__main__':
  99. unittest.main()