test_download.py 3.1 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495
  1. #!/usr/bin/env python
  2. import hashlib
  3. import io
  4. import os
  5. import json
  6. import unittest
  7. import sys
  8. import socket
  9. # Allow direct execution
  10. sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
  11. import youtube_dl.FileDownloader
  12. import youtube_dl.InfoExtractors
  13. from youtube_dl.utils import *
  14. DEF_FILE = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'tests.json')
  15. PARAMETERS_FILE = os.path.join(os.path.dirname(os.path.abspath(__file__)), "parameters.json")
  16. # General configuration (from __init__, not very elegant...)
  17. jar = compat_cookiejar.CookieJar()
  18. cookie_processor = compat_urllib_request.HTTPCookieProcessor(jar)
  19. proxy_handler = compat_urllib_request.ProxyHandler()
  20. opener = compat_urllib_request.build_opener(proxy_handler, cookie_processor, YoutubeDLHandler())
  21. compat_urllib_request.install_opener(opener)
  22. socket.setdefaulttimeout(300) # 5 minutes should be enough (famous last words)
  23. class FileDownloader(youtube_dl.FileDownloader):
  24. def __init__(self, *args, **kwargs):
  25. youtube_dl.FileDownloader.__init__(self, *args, **kwargs)
  26. self.to_stderr = self.to_screen
  27. def _file_md5(fn):
  28. with open(fn, 'rb') as f:
  29. return hashlib.md5(f.read()).hexdigest()
  30. with io.open(DEF_FILE, encoding='utf-8') as deff:
  31. defs = json.load(deff)
  32. with io.open(PARAMETERS_FILE, encoding='utf-8') as pf:
  33. parameters = json.load(pf)
  34. class TestDownload(unittest.TestCase):
  35. def setUp(self):
  36. self.parameters = parameters
  37. self.defs = defs
  38. # Clear old files
  39. self.tearDown()
  40. def tearDown(self):
  41. for fn in [ test.get('file', False) for test in self.defs ]:
  42. if fn and os.path.exists(fn):
  43. os.remove(fn)
  44. ### Dinamically generate tests
  45. def generator(test_case):
  46. def test_template(self):
  47. ie = getattr(youtube_dl.InfoExtractors, test_case['name'] + 'IE')
  48. if not ie._WORKING:
  49. print('Skipping: IE marked as not _WORKING')
  50. return
  51. if not test_case['file']:
  52. print('Skipping: No output file specified')
  53. return
  54. if 'skip' in test_case:
  55. print('Skipping: {0}'.format(test_case['skip']))
  56. return
  57. params = dict(self.parameters) # Duplicate it locally
  58. for p in test_case.get('params', {}):
  59. params[p] = test_case['params'][p]
  60. fd = FileDownloader(params)
  61. fd.add_info_extractor(ie())
  62. for ien in test_case.get('add_ie', []):
  63. fd.add_info_extractor(getattr(youtube_dl.InfoExtractors, ien + 'IE')())
  64. fd.download([test_case['url']])
  65. self.assertTrue(os.path.exists(test_case['file']))
  66. if 'md5' in test_case:
  67. md5_for_file = _file_md5(test_case['file'])
  68. self.assertEqual(md5_for_file, test_case['md5'])
  69. return test_template
  70. ### And add them to TestDownload
  71. for test_case in defs:
  72. test_method = generator(test_case)
  73. test_method.__name__ = "test_{0}".format(test_case["name"])
  74. setattr(TestDownload, test_method.__name__, test_method)
  75. del test_method
  76. if __name__ == '__main__':
  77. unittest.main()