import os
import re
import json
import requests
import dateutil.parser
from datetime import datetime, timezone
from xml.etree import ElementTree
from urllib.parse import parse_qs
from configparser import ConfigParser

cf = ConfigParser()
config_filename = os.environ.get('YT_CONFIG', '/etc/yt/config.ini')
cf.read(config_filename)

def fetch_xml(feed_type, feed_id):
    r = requests.get(f"https://www.youtube.com/feeds/videos.xml?{feed_type}={feed_id}")
    if not r.ok:
        return None

    return r.text

def parse_xml(xmldata):
    ns = {
        'atom': "http://www.w3.org/2005/Atom",
        'yt': "http://www.youtube.com/xml/schemas/2015",
        'media': "http://search.yahoo.com/mrss/",
    }

    feed = ElementTree.fromstring(xmldata)
    # note: Elements are falsy when they have no children, so test against None explicitly.
    author_el = feed.find('atom:author/atom:name', ns)
    author = author_el.text if author_el is not None else None
    if feed.find('yt:channelId', ns) is not None:
        channel_id = feed.find('yt:channelId', ns).text
    else:  # TODO: clean this up (websub has no yt:channelId; this should be adapted for playlists)
        self_link = feed.find('atom:link[@rel="self"]', ns).get('href')
        channel_id = parse_qs(self_link.split('?')[1])['channel_id'][0]
    title = feed.find('atom:title', ns).text
    videos = []
    for entry in feed.findall('atom:entry', ns):
        videos.append({
            'video_id': entry.find('yt:videoId', ns).text,
            'title': entry.find('atom:title', ns).text,
            'published': entry.find('atom:published', ns).text,
            'channel_id': entry.find('yt:channelId', ns).text,
            'author': entry.find('atom:author/atom:name', ns).text,
            # extra fields for pull_subs/webhook:
            'updated': entry.find('atom:updated', ns).text,
            #'description': entry.find('media:group/media:description', ns).text  # XXX: missing for websub
        })

    return title, author, channel_id, videos

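# Usage sketch for the two functions above. The channel id is a placeholder,
# not a real channel; fetch_xml() returns None on HTTP errors, so the result
# must be checked before it is handed to parse_xml().
def _example_fetch_and_parse(feed_id='UCxxxxxxxxxxxxxxxxxxxxxx'):
    xmldata = fetch_xml('channel_id', feed_id)
    if xmldata is None:
        return None
    title, author, channel_id, videos = parse_xml(xmldata)
    return title, author, channel_id, len(videos)
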
def update_channel(db, xmldata):
    """
    returns True on success, False on failure. rigorous error checking is
    required, otherwise data will be lost! the caller MUST (as per RFC 2119)
    write (append) the xmlfeed into a file on error.
    """
    if not xmldata: return False

    # note: websub does not return a global author
    title, author, channel_id, videos = parse_xml(xmldata)  # XXX: perl code had this eval'd for a die

    c = db.cursor()
    for video in videos:
        now = datetime.now(timezone.utc)
        updated = dateutil.parser.parse(video['updated'])
        published = dateutil.parser.parse(video['published'])
        # if the updated and published times are near-identical, the entry is
        # new: use the crawl time if it was published within the last week.
        # otherwise it's just an update to an older video (from before we
        # subscribed), so keep the original upload time.
        # (total_seconds(), not .seconds: the latter ignores the days part.)
        if (updated - published).total_seconds() < 60 and (now - published).days < 7:
            timestamp = now
        else:
            timestamp = published

        c.execute("""
            INSERT OR IGNORE INTO videos (id, channel_id, title, published, crawled)
            VALUES (?, ?, ?, datetime(?), datetime(?))
        """, (video['video_id'], video['channel_id'], video['title'], video['published'], timestamp))  # XXX: errorcheck

        # update the channel name (we don't fetch it on subscribing)
        author = video['author']  # XXX: doing this once per channel is enough (for pull-subs.pl)
        c.execute("""
            INSERT OR REPLACE INTO channels (id, name)
            VALUES (?, ?)
        """, (channel_id, author))  # XXX: errorcheck

    return True

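# Sketch of the caller contract from update_channel()'s docstring: on failure,
# the raw feed MUST be appended to a file so no data is lost. The dump path
# below is an assumption for illustration only, not something this repo defines.
def _example_update_or_dump(db, xmldata, dumpfile='/tmp/feed-errors.xml'):
    if not update_channel(db, xmldata):
        with open(dumpfile, 'a') as f:  # append, as the docstring requires
            f.write(xmldata or '')
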
def get_video_info(video_id, sts=0, algo=""):
    """
    returns the best-quality muxed video stream, the player_response, and an
    error type/message.
    error types: 'initial': the request to get_video_info was malformed
                 'player': playabilityStatus != OK
                 'internal': [livestream, geolocked, exhausted]
    """
    # TODO: caching, e.g. beaker? must not cache premiering-soon videos/livestreams/etc, though.
    #       responses are apparently valid for 6h; maybe cache for (video_length - 2h)
    # TODO: error types? ["invalid parameters", playabilitystatus, own]
    # TODO: a bit messy; should return all unscrambled video urls in best->worst quality

    # we try to fetch the video multiple times using different origins
    for el in ['embedded', 'detailpage']:  # also: el-completely-absent, info, leanback, editpage, adunit, previewpage, profilepage
        r = requests.get("https://www.youtube.com/get_video_info"
                         f"?video_id={video_id}"
                         f"&eurl=https://youtube.googleapis.com/v/{video_id}"
                         f"&el={el}"
                         f"&sts={sts}"
                         f"&hl=en_US")  # "&hl=en&gl=US"
        params = parse_qs(r.text)
        if 'errorcode' in params:  # status=fail
            # TODO: assuming we haven't fucked it up, this error comes up if the
            # video id is garbage; give a better error message.
            return None, None, 'initial', f"MALFORMED: {params['reason'][0]}"

        metadata = json.loads(params.get('player_response')[0])
        if metadata['playabilityStatus']['status'] != "OK":
            if metadata['playabilityStatus']['status'] == "UNPLAYABLE":
                # try again with a different 'el' value. if none succeeds, we
                # fall into the "exhausted" path below, which returns the last
                # tried metadata, from which playabilityStatus.reason can be
                # extracted. according to jwz/youtubedown, the worst error
                # message comes from 'embedded' (tried first), so it should be
                # overwritten by a better message.
                continue
            return None, None, 'player', f"{metadata['playabilityStatus']['status']}: {metadata['playabilityStatus']['reason']}"
        if 'liveStreamability' in metadata['playabilityStatus']:
            return None, metadata, 'internal', "livestream"  # can also check .microformat.liveBroadcastDetails.isLiveNow

        if 'formats' not in metadata['streamingData']:
            # TODO: hls-only video with these params (kAZCrtJJaAo):
            #   "videoDetails": {
            #     "isLiveDefaultBroadcast": true,
            #     "isLowLatencyLiveStream": true,
            #     "isLiveContent": true,
            #     "isPostLiveDvr": true
            continue  # no urls

        formats = metadata['streamingData']['formats']
        for (i, v) in enumerate(formats):
            if not ('cipher' in v or 'signatureCipher' in v): continue
            cipher = parse_qs(v.get('cipher') or v.get('signatureCipher'))
            formats[i]['url'] = unscramble(cipher, algo)

        # TODO: check if we have urls or try again
        url = sorted(formats, key=lambda k: k['height'], reverse=True)[0]['url']

        if 'gcr' in parse_qs(url):
            return None, metadata, 'internal', "geolocked"

        return url, metadata, None, None
    else:  # loop ran out of 'el' values without returning a usable response
        return None, metadata, 'internal', "exhausted"

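# Usage sketch for get_video_info(): the four return values map directly onto
# the error types listed in the docstring. sts/algo (normally extracted from
# the player js) are left at their defaults here, so ciphered urls would
# remain scrambled.
def _example_resolve_video(video_id):
    url, metadata, error, errdetails = get_video_info(video_id)
    if error == 'initial':   # malformed request, e.g. a garbage video id
        return f"bad request: {errdetails}"
    if error == 'player':    # playabilityStatus != OK
        return f"unplayable: {errdetails}"
    if error == 'internal':  # livestream, geolocked or exhausted
        return f"not supported: {errdetails}"
    return url  # best-quality muxed stream
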
def unscramble(cipher, algo):  # test video id: UxxajLWwzqY
    signature = list(cipher['s'][0])
    for c in algo.split():
        m = re.match(r"([rsw])(\d+)?", c)
        if not m: continue  # skip tokens that aren't one of the three operations
        op, ix = m.groups()
        ix = int(ix) if ix else 0
        if op == 'r': signature = list(reversed(signature))
        if op == 's': signature = signature[ix:]
        if op == 'w': signature[0], signature[ix % len(signature)] = signature[ix % len(signature)], signature[0]
    sp = cipher.get('sp', ['signature'])[0]
    sig = cipher['sig'][0] if 'sig' in cipher else ''.join(signature)
    return f"{cipher['url'][0]}&{sp}={sig}"

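# Worked example for unscramble(). Both the cipher and the algo string are
# made up to illustrate the three operations; real values come from YouTube
# and the player js, respectively.
def _example_unscramble():
    cipher = {'s': ['abcdefgh'], 'sp': ['sig'], 'url': ['https://example.com/video']}
    # r: abcdefgh -> hgfedcba;  s2: -> fedcba;  w3: swap indices 0 and 3 -> cedfba
    return unscramble(cipher, "r s2 w3")  # 'https://example.com/video&sig=cedfba'
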
def prepare_metadata(metadata):
    meta1 = metadata['videoDetails']
    meta2 = metadata['microformat']['playerMicroformatRenderer']
    cards = metadata['cards']['cardCollectionRenderer']['cards'] if 'cards' in metadata else []
    endsc = metadata['endscreen']['endscreenRenderer']['elements'] if 'endscreen' in metadata else []

    #aspect_ratio = meta2['embed']['width'] / meta2['embed']['height']  # sometimes absent
    aspect_ratio = meta2['thumbnail']['thumbnails'][0]['width'] / meta2['thumbnail']['thumbnails'][0]['height']

    if 'captions' in metadata and 'captionTracks' in metadata['captions']['playerCaptionsTracklistRenderer']:
        subtitles = sorted([
            {'url': cc['baseUrl'],
             'code': cc['languageCode'],
             'autogenerated': cc.get('kind') == "asr",
             'name': cc['name']['simpleText']}
            for cc in metadata['captions']['playerCaptionsTracklistRenderer']['captionTracks']
        ], key=lambda cc: cc['autogenerated'])  # human-made tracks first
    else:
        subtitles = []

    def parse_infocard(card):
        card = card['cardRenderer']
        teaser = card['teaser']['simpleCardTeaserRenderer']['message']['simpleText']  # not used
        ctype = list(card['content'].keys())[0]
        content = card['content'][ctype]
        if ctype == "pollRenderer":
            ctype = "POLL"
            content = {
                'question': content['question']['simpleText'],
                'answers': [(a['text']['simpleText'], a['numVotes']) for a in content['choices']],
            }
        elif ctype == "videoInfoCardContentRenderer":
            ctype = "VIDEO"
            content = {
                'video_id': content['action']['watchEndpoint']['videoId'],
                'title': content['videoTitle']['simpleText'],
                'author': content['channelName']['simpleText'],  # 'by xXxXx'
                'length': content['lengthString']['simpleText'],  # '23:03'
                'views': content['viewCountText']['simpleText'],  # '421,248 views'
            }
        elif ctype == "playlistInfoCardContentRenderer":
            ctype = "PLAYLIST"
            content = {
                'playlist_id': content['action']['watchEndpoint']['playlistId'],
                'video_id': content['action']['watchEndpoint']['videoId'],
                'title': content['playlistTitle']['simpleText'],
                'author': content['channelName']['simpleText'],
                'n_videos': content['playlistVideoCount']['simpleText'],  # '21'
            }
        elif ctype == "simpleCardContentRenderer" and 'urlEndpoint' in content.get('command', {}):
            ctype = "WEBSITE"
            content = {
                'url': parse_qs(content['command']['urlEndpoint']['url'].split('?')[1])['q'][0],
                'domain': content['displayDomain']['simpleText'],
                'title': content['title']['simpleText'],
                'text': content['actionButton']['simpleCardButtonRenderer']['text']['simpleText'],
            }
        else:
            import pprint
            content = {'error': f"{ctype} is not implemented; <pre>{pprint.pformat(card)}</pre>"}

        return {'teaser': teaser, 'type': ctype, 'content': content}

    def parse_endcard(card):
        card = card.get('endscreenElementRenderer', card)
        ctype = card['style']
        if ctype == "CHANNEL":
            content = {
                'channel_id': card['endpoint']['browseEndpoint']['browseId'],
                'title': card['title']['simpleText'],
                'icons': {e['height']: e['url'] for e in card['image']['thumbnails']},
            }
        elif ctype == "VIDEO":
            content = {
                'video_id': card['endpoint']['watchEndpoint']['videoId'],
                'title': card['title']['simpleText'],
                'length': card['videoDuration']['simpleText'],  # '12:21'
                'views': card['metadata']['simpleText'],  # '51,649 views'
            }
        elif ctype == "PLAYLIST":
            content = {
                'playlist_id': card['endpoint']['watchEndpoint']['playlistId'],
                'video_id': card['endpoint']['watchEndpoint']['videoId'],
                'title': card['title']['simpleText'],
                'author': card['metadata']['simpleText'],
                'n_videos': card['playlistLength']['simpleText'].replace(" videos", ""),
            }
        elif ctype == "WEBSITE":
            content = {
                'url': parse_qs(card['endpoint']['urlEndpoint']['url'].split('?')[1])['q'][0],
                'domain': card['metadata']['simpleText'],
                'title': card['title']['simpleText'],
                'icons': {e['height']: e['url'] for e in card['image']['thumbnails']},
            }
        else:
            import pprint
            content = {'error': f"{ctype} is not implemented; <pre>{pprint.pformat(card)}</pre>"}

        return {'type': ctype, 'content': content}

    return {
        'title': meta1['title'],
        'author': meta1['author'],
        'channel_id': meta1['channelId'],
        'description': meta1['shortDescription'],
        'published': meta2['publishDate'],
        'views': meta1['viewCount'],
        'length': int(meta1['lengthSeconds']),
        'rating': meta1['averageRating'],
        'category': meta2['category'],
        'aspectr': aspect_ratio,
        'unlisted': meta2['isUnlisted'],
        'countries': meta2['availableCountries'],
        'poster': meta2['thumbnail']['thumbnails'][0]['url'],
        'infocards': [parse_infocard(card) for card in cards],
        'endcards': [parse_endcard(card) for card in endsc],
        'subtitles': subtitles,
    }

def pp(*args):
    from pprint import pprint
    import sys, codecs
    pprint(args, stream=codecs.getwriter("utf-8")(sys.stderr.buffer))
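
# End-to-end sketch: resolve a video id given on the command line, then massage
# the player_response for display. Running this module directly is assumed here
# for debugging only; the app imports it as a library.
if __name__ == '__main__':
    import sys
    url, metadata, error, errdetails = get_video_info(sys.argv[1])
    if error:
        pp(error, errdetails)
    else:
        pp(url, prepare_metadata(metadata))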