Browse Source

[YouTube] Cache signature timestamp from player JS

* if the YT webpage can't be loaded, getting the `sts` requires loading the
player JS: this caches it
* based on yt-dlp/yt-dlp#13047, thx bashonly
dirkf 4 months ago
parent
commit
3a42f6ad37
1 changed files with 104 additions and 38 deletions
  1. 104 38
      youtube_dl/extractor/youtube.py

+ 104 - 38
youtube_dl/extractor/youtube.py

@@ -49,6 +49,7 @@ from ..utils import (
     parse_duration,
     parse_qs,
     qualities,
+    remove_end,
     remove_start,
     smuggle_url,
     str_or_none,
@@ -1584,6 +1585,15 @@ class YoutubeIE(YoutubeBaseInfoExtractor):
         '397': {'acodec': 'none', 'vcodec': 'av01.0.05M.08'},
     }
 
+    _PLAYER_JS_VARIANT_MAP = (
+        ('main', 'player_ias.vflset/en_US/base.js'),
+        ('tce', 'player_ias_tce.vflset/en_US/base.js'),
+        ('tv', 'tv-player-ias.vflset/tv-player-ias.js'),
+        ('tv_es6', 'tv-player-es6.vflset/tv-player-es6.js'),
+        ('phone', 'player-plasma-ias-phone-en_US.vflset/base.js'),
+        ('tablet', 'player-plasma-ias-tablet-en_US.vflset/base.js'),
+    )
+
     @classmethod
     def suitable(cls, url):
         if parse_qs(url).get('list', [None])[0]:
@@ -1631,36 +1641,83 @@ class YoutubeIE(YoutubeBaseInfoExtractor):
             raise ExtractorError(
                 'Cannot identify player %r' % (player_url,), cause=e)
 
-    def _load_player(self, video_id, player_url, fatal=True, player_id=None):
-        if not player_id:
+    def _player_js_cache_key(self, player_url, extra_id=None, _cache={}):
+        if player_url not in _cache:
             player_id = self._extract_player_info(player_url)
-        if player_id not in self._code_cache:
+            player_path = remove_start(
+                compat_urllib_parse.urlparse(player_url).path,
+                '/s/player/{0}/'.format(player_id))
+            variant = next((k for k, v in self._PLAYER_JS_VARIANT_MAP
+                           if v == player_path), None)
+            if not variant:
+                variant = next(
+                    (k for k, v in self._PLAYER_JS_VARIANT_MAP
+                     if re.match(re.escape(v).replace('en_US', r'\w+') + '$', player_path)),
+                    None)
+            if not variant:
+                self.write_debug(
+                    'Unable to determine player JS variant\n'
+                    '        player = {0}'.format(player_url), only_once=True)
+                variant = re.sub(r'[^a-zA-Z0-9]', '_', remove_end(player_path, '.js'))
+            _cache[player_url] = join_nonempty(player_id, variant)
+
+        if extra_id:
+            extra_id = '-'.join((_cache[player_url], extra_id))
+            assert os.path.basename(extra_id) == extra_id
+            return extra_id
+        return _cache[player_url]
+
+    def _load_player(self, video_id, player_url, fatal=True):
+        player_js_key = self._player_js_cache_key(player_url)
+        if player_js_key not in self._code_cache:
             code = self._download_webpage(
                 player_url, video_id, fatal=fatal,
-                note='Downloading player ' + player_id,
-                errnote='Download of %s failed' % player_url)
+                note='Downloading player {0}'.format(player_js_key),
+                errnote='Download of {0} failed'.format(player_url))
             if code:
-                self._code_cache[player_id] = code
-        return self._code_cache[player_id] if fatal else self._code_cache.get(player_id)
+                self._code_cache[player_js_key] = code
+        return self._code_cache.get(player_js_key)
+
+    def _load_player_data_from_cache(self, name, player_url, extra_id=None):
+        cache_id = ('youtube-{0}'.format(name), self._player_js_cache_key(player_url, extra_id))
+        data = self._player_cache.get(cache_id)
+        if data:
+            return data
+
+        data = self.cache.load(*cache_id, min_ver='2025.04.07')
+        if data:
+            self._player_cache[cache_id] = data
+        return data
+
+    def _store_player_data_to_cache(self, name, player_url, data, extra_id=None):
+        cache_id = ('youtube-{0}'.format(name), self._player_js_cache_key(player_url, extra_id))
+
+        if cache_id not in self._player_cache:
+            self.cache.store(cache_id[0], cache_id[1], data)
+            self._player_cache[cache_id] = data
 
     def _extract_signature_function(self, video_id, player_url, example_sig):
-        player_id = self._extract_player_info(player_url)
+        # player_id = self._extract_player_info(player_url)
 
         # Read from filesystem cache
-        func_id = 'js_{0}_{1}'.format(
-            player_id, self._signature_cache_id(example_sig))
-        assert os.path.basename(func_id) == func_id
-
-        self.write_debug('Extracting signature function {0}'.format(func_id))
-        cache_spec, code = self.cache.load('youtube-sigfuncs', func_id, min_ver='2025.04.07'), None
+        extra_id = self._signature_cache_id(example_sig)
+        self.write_debug('Extracting signature function {0}-{1}'.format(player_url, extra_id))
+        cache_spec, code = self._load_player_data_from_cache(
+            'sigfuncs', player_url, extra_id=extra_id, min_ver='2025.04.07'
+        ), None
 
         if not cache_spec:
-            code = self._load_player(video_id, player_url, player_id)
-        if code:
-            res = self._parse_sig_js(code)
-            test_string = ''.join(map(compat_chr, range(len(example_sig))))
-            cache_spec = [ord(c) for c in res(test_string)]
-            self.cache.store('youtube-sigfuncs', func_id, cache_spec)
+            code = self._load_player(video_id, player_url)
+            if code:
+                res = self._parse_sig_js(code)
+                test_string = ''.join(map(compat_chr, range(len(example_sig))))
+                cache_spec = [ord(c) for c in res(test_string)]
+                self._store_player_data_to_cache(
+                    'sigfuncs', player_url, cache_spec, extra_id=extra_id)
+            else:
+                self.report_warning(
+                    'Failed to compute signature function {0}-{1}'.format(
+                        player_url, extra_id))
 
         return lambda s: ''.join(s[i] for i in cache_spec)
 
@@ -1885,22 +1942,21 @@ class YoutubeIE(YoutubeBaseInfoExtractor):
 
     def _extract_n_function_code(self, video_id, player_url):
         player_id = self._extract_player_info(player_url)
-        func_code = self.cache.load('youtube-nsig', player_id, min_ver='2025.04.07')
+        func_code = self._load_player_data_from_cache('nsig', player_url)
         jscode = func_code or self._load_player(video_id, player_url)
         jsi = JSInterpreter(jscode)
 
         if func_code:
             return jsi, player_id, func_code
-        return self._extract_n_function_code_jsi(video_id, jsi, player_id)
 
-    def _extract_n_function_code_jsi(self, video_id, jsi, player_id=None):
+        return self._extract_n_function_code_jsi(video_id, jsi, player_id, player_url)
 
+    def _extract_n_function_code_jsi(self, video_id, jsi, player_id=None, player_url=None):
         func_name = self._extract_n_function_name(jsi.code)
 
         func_code = self._extract_sig_fn(jsi, func_name)
-
-        if player_id:
-            self.cache.store('youtube-nsig', player_id, func_code)
+        if player_url:
+            self._store_player_data_to_cache('nsig', player_url, func_code)
         return jsi, player_id, func_code
 
     def _extract_n_function_from_code(self, jsi, func_code):
@@ -1944,18 +2000,28 @@ class YoutubeIE(YoutubeBaseInfoExtractor):
         Required to tell API what sig/player version is in use.
         """
         sts = traverse_obj(ytcfg, 'STS', expected_type=int)
-        if not sts:
-            # Attempt to extract from player
-            if player_url is None:
-                error_msg = 'Cannot extract signature timestamp without player_url.'
-                if fatal:
-                    raise ExtractorError(error_msg)
-                self.report_warning(error_msg)
-                return
-            code = self._load_player(video_id, player_url, fatal=fatal)
-            sts = int_or_none(self._search_regex(
-                r'(?:signatureTimestamp|sts)\s*:\s*(?P<sts>[0-9]{5})', code or '',
-                'JS player signature timestamp', group='sts', fatal=fatal))
+        if sts:
+            return sts
+
+        if not player_url:
+            error_msg = 'Cannot extract signature timestamp without player url'
+            if fatal:
+                raise ExtractorError(error_msg)
+            self.report_warning(error_msg)
+            return None
+
+        sts = self._load_player_data_from_cache('sts', player_url)
+        if sts:
+            return sts
+
+        # Attempt to extract from player
+        code = self._load_player(video_id, player_url, fatal=fatal)
+        sts = int_or_none(self._search_regex(
+            r'(?:signatureTimestamp|sts)\s*:\s*(?P<sts>[0-9]{5})', code or '',
+            'JS player signature timestamp', group='sts', fatal=fatal))
+        if sts:
+            self._store_player_data_to_cache('sts', player_url, sts)
+
         return sts
 
     def _mark_watched(self, video_id, player_response):