]> git.gir.st - subscriptionfeed.git/blob - app/common.py
move non-web related subroutines out of frontend
[subscriptionfeed.git] / app / common.py
1 import os
2 import re
3 import json
4 import requests
5 import dateutil.parser
6 from datetime import datetime, timezone
7 from xml.etree import ElementTree
8 from urllib.parse import parse_qs
9 from configparser import ConfigParser
10
# Global configuration, loaded once at import time.
# The config file path can be overridden via the YT_CONFIG environment variable.
cf = ConfigParser()
config_filename = os.environ.get('YT_CONFIG', '/etc/yt/config.ini')
cf.read(config_filename)  # NOTE: ConfigParser.read silently ignores a missing file
14
def fetch_xml(feed_type, feed_id, timeout=10):
    """
    Fetch a YouTube Atom feed.

    feed_type: query parameter name ('channel_id' or 'playlist_id')
    feed_id:   the channel or playlist identifier
    timeout:   seconds before the HTTP request is aborted (new, default keeps
               callers working; without it a stalled server hangs us forever)

    Returns the XML body as text, or None on any non-2xx response.
    """
    r = requests.get(
        f"https://www.youtube.com/feeds/videos.xml?{feed_type}={feed_id}",
        timeout=timeout)
    if not r.ok:
        return None

    return r.text
21
def parse_xml(xmldata):
    """
    Parse a YouTube Atom feed (from fetch_xml or a websub push).

    Returns (title, author, channel_id, videos) where videos is a list of
    dicts with keys video_id/title/published/channel_id/author/updated.
    Note: websub pushes carry no global <author>, so author may be None.
    """
    ns = {
        'atom':"http://www.w3.org/2005/Atom",
        'yt': "http://www.youtube.com/xml/schemas/2015",
        'media':"http://search.yahoo.com/mrss/"
    }

    feed = ElementTree.fromstring(xmldata)
    # ElementTree Elements are falsy when they have no children, so existence
    # checks MUST use "is not None" -- bare truthiness (as before) wrongly
    # treated a present-but-childless <yt:channelId> as missing.
    author_el = feed.find('atom:author', ns)
    author = author_el.find('atom:name', ns).text if author_el is not None else None
    channel_el = feed.find('yt:channelId', ns)
    if channel_el is not None:
        channel_id = channel_el.text
    else: # TODO: clean this up (websub has no yt:channelId, this should be adapted for playlists)
        self_href = feed.find('atom:link[@rel="self"]', ns).get('href')
        channel_id = parse_qs(self_href.split('?')[1]).get('channel_id')[0]
    title = feed.find('atom:title', ns).text
    videos = []
    for entry in feed.findall('atom:entry', ns):
        videos.append({
            'video_id': entry.find('yt:videoId',ns).text,
            'title': entry.find('atom:title',ns).text,
            'published': entry.find('atom:published',ns).text,
            'channel_id': entry.find('yt:channelId',ns).text,
            'author': entry.find('atom:author',ns).find('atom:name',ns).text,
            # extra fields for pull_subs/webhook:
            'updated': entry.find('atom:updated',ns).text,
            #'description': entry.find('media:group',ns).find('media:description',ns).text ##xxx:missing for websub
        })

    return title, author, channel_id, videos
51
def update_channel(db, xmldata):
    """
    Parse an xml feed and insert its videos into the database.

    returns True on success, False on failure. rigorous error checking is required,
    otherwise data will be lost! the caller MUST (as per RFC 2119) write (append)
    the xmlfeed into a file on error.
    """
    if not xmldata: return False

    # Note: websub does not return global author
    title, author, channel_id, videos = parse_xml(xmldata) #xxx: perl-code had this eval'd for a die

    c = db.cursor()
    for video in videos:
        now = datetime.now(timezone.utc)
        updated = dateutil.parser.parse(video['updated'])
        # bugfix: this used to parse video['updated'] again (copy-paste),
        # which made the "near-identical" check below trivially true.
        published = dateutil.parser.parse(video['published'])
        # if update and published time are near-identical, it's new. use crawl time if it was published within a week.
        # else, it's just an update to an older video (before we subscribed, so use original upload time).
        # total_seconds() instead of .seconds: the latter ignores the days part
        # and wraps around for differences over a day.
        if (updated - published).total_seconds() < 60 and (now - published).days < 7:
            timestamp = now
        else:
            timestamp = published

        c.execute("""
            INSERT OR IGNORE INTO videos (id, channel_id, title, published, crawled)
            VALUES (?, ?, ?, datetime(?), datetime(?))
        """, (video['video_id'], video['channel_id'], video['title'], video['published'], timestamp)) #XXX:errorcheck

        # update channel name (we don't fetch it on subscribing)
        author = video['author'] # XXX: doing this once per channel is enough (for pull-subs.pl)
    c.execute("""
        INSERT OR REPLACE INTO channels (id, name)
        VALUES (?, ?)
    """, (channel_id, author)) #XXX:errorcheck

    return True
87
def get_video_info(video_id, sts=0, algo=""):
    """
    returns the best-quality muxed video stream, the player_response, error-type/-message
    error types: 'initial': the request to get_video_info was malformed
                 'player': playabilityStatus != OK
                 'internal': [livestream, geolocked, exhausted]

    sts:  signature timestamp forwarded to get_video_info
    algo: descrambling algorithm string, passed through to unscramble()
    """
    # TODO: caching, e.g. beaker? need to not cache premiering-soon videos/livestreams/etc, though
    # responses are apparently valid for 6h; maybe cache for (video_length - 2h)
    # TODO: errro types? ["invalid parameters", playabilitystatus, own]
    # todo: a bit messy; should return all unscrambled video urls in best->worst quality

    # we try to fetch the video multiple times using different origins
    for el in ['embedded', 'detailpage']: # ['el-completely-absent',info,leanback,editpage,adunit,previewpage,profilepage]
        r = requests.get(f"https://www.youtube.com/get_video_info"+
            f"?video_id={video_id}"+
            f"&eurl=https://youtube.googleapis.com/v/{video_id}"+
            f"&el={el}"+
            f"&sts={sts}"+
            f"&hl=en_US") #"&hl=en&gl=US"
        # the response body is itself urlencoded key=value pairs
        params = parse_qs(r.text)
        if 'errorcode' in params: # status=fail
            return None, None, 'initial', f"MALFORMED: {params['reason'][0]}"

        metadata = json.loads(params.get('player_response')[0])
        if metadata['playabilityStatus']['status'] != "OK":
            if metadata['playabilityStatus']['status'] == "UNPLAYABLE":
                continue # try again with different 'el' value. if none succeeds, we fall into "exhausted" path, which returns last tried metadata, from which the playabilityStatus.reason can be extracted. according to jwz/youtubedown, the worst error message comes from embedded, which is tried first, so it should be overwritten by a better message.
            return None, None, 'player', f"{metadata['playabilityStatus']['status']}: {metadata['playabilityStatus']['reason']}"
        if 'liveStreamability' in metadata['playabilityStatus']:
            return None, metadata, 'internal', "livestream" # can also check .microformat.liveBroadcastDetails.isLiveNow

        # unscramble any signature-protected stream urls in place
        formats = metadata['streamingData']['formats']
        for (i,v) in enumerate(formats):
            if not ('cipher' in v or 'signatureCipher' in v): continue
            cipher = parse_qs(v.get('cipher') or v.get('signatureCipher'))
            formats[i]['url'] = unscramble(cipher, algo)

        # todo: check if we have urls or try again
        # pick the highest-resolution muxed stream
        url = sorted(formats, key=lambda k: k['height'], reverse=True)[0]['url']

        if 'gcr' in parse_qs(url):
            return None, metadata, 'internal', "geolocked"

        return url, metadata, None, None
    else:
        # for/else: every 'el' attempt was UNPLAYABLE; metadata still holds the
        # last attempt's player_response, so the caller can extract a reason.
        return None, metadata, 'internal', "exhausted"
135
def unscramble(cipher, algo): # test video id: UxxajLWwzqY
    """
    Apply the signature-descrambling algorithm to cipher['s'] and return the
    final stream URL.

    cipher: parse_qs dict from a 'cipher'/'signatureCipher' value
            (keys 's', 'url', optionally 'sp' and pre-descrambled 'sig')
    algo:   space-separated ops: 'r' reverse, 'sN' drop first N chars,
            'wN' swap positions 0 and N
    """
    signature = list(cipher['s'][0])
    for step in algo.split():
        m = re.match(r"([rsw])(\d+)?", step)
        if m is None: continue  # bugfix: unknown token used to crash on .groups()
        op, ix = m.groups()
        if op == 'r': signature = list(reversed(signature))
        if op == 's': signature = signature[int(ix):]
        if op == 'w': signature[0], signature[int(ix)%len(signature)] = signature[int(ix)%len(signature)], signature[0]
    sp = cipher.get('sp', ['signature'])[0]
    # a ready-made 'sig' takes precedence over our descrambled one
    sig = cipher['sig'][0] if 'sig' in cipher else ''.join(signature)
    return f"{cipher['url'][0]}&{sp}={sig}"
147
def prepare_metadata(metadata):
    """
    Flatten a player_response dict (as returned by get_video_info) into a
    plain dict for the frontend templates: title, author, channel_id,
    description, published, views, length, rating, category, aspectr,
    unlisted, countries, poster, infocards, endcards, subtitles.
    """
    meta1 = metadata['videoDetails']
    meta2 = metadata['microformat']['playerMicroformatRenderer']
    # cards, endscreen and captions are all optional; guard each consistently.
    # (captions used to be accessed unguarded and raised KeyError on videos
    # without any caption tracks.)
    cards = metadata['cards']['cardCollectionRenderer']['cards'] if 'cards' in metadata else []
    endsc = metadata['endscreen']['endscreenRenderer']['elements'] if 'endscreen' in metadata else []
    ctracks = metadata.get('captions', {}) \
        .get('playerCaptionsTracklistRenderer', {}) \
        .get('captionTracks', [])

    #aspect_ratio = meta2['embed']['width'] / meta2['embed']['height'], # sometimes absent
    aspect_ratio = meta2['thumbnail']['thumbnails'][0]['width'] / meta2['thumbnail']['thumbnails'][0]['height']

    # human-made subtitle tracks sort before auto-generated ('asr') ones
    subtitles = sorted([
        {'url':cc['baseUrl'],
         'code':cc['languageCode'],
         'autogenerated':cc.get('kind')=="asr",
         'name':cc['name']['simpleText']}
        for cc in ctracks
    ], key=lambda cc: cc['autogenerated'])

    def parse_infocard(card):
        # Convert one infocard renderer into {'teaser', 'type', 'content'}.
        card = card['cardRenderer']
        teaser = card['teaser']['simpleCardTeaserRenderer']['message']['simpleText'] # not used
        ctype = list(card['content'].keys())[0]
        content = card['content'][ctype]
        if ctype == "pollRenderer":
            ctype = "POLL"
            content = {
                'question': content['question']['simpleText'],
                'answers': [(a['text']['simpleText'],a['numVotes']) for a in content['choices']],
            }
        elif ctype == "videoInfoCardContentRenderer":
            ctype = "VIDEO"
            content = {
                'video_id': content['action']['watchEndpoint']['videoId'],
                'title': content['videoTitle']['simpleText'],
                'author': content['channelName']['simpleText'], # 'by xXxXx'
                'length': content['lengthString']['simpleText'], # '23:03'
                'views': content['viewCountText']['simpleText'], # '421,248 views'
            }
        elif ctype == "playlistInfoCardContentRenderer":
            ctype = "PLAYLIST"
            content = {
                'playlist_id': content['action']['watchEndpoint']['playlistId'],
                'video_id': content['action']['watchEndpoint']['videoId'],
                'title': content['playlistTitle']['simpleText'],
                'author': content['channelName']['simpleText'],
                'n_videos': content['playlistVideoCount']['simpleText'], # '21'
            }
        elif ctype == "simpleCardContentRenderer" and 'urlEndpoint' in content.get('command',{}).keys():
            ctype = "WEBSITE"
            content = {
                # the url is wrapped in a youtube.com/redirect?q=... indirection
                'url': parse_qs(content['command']['urlEndpoint']['url'].split('?')[1])['q'][0],
                'domain': content['displayDomain']['simpleText'],
                'title': content['title']['simpleText'],
                'text': content['actionButton']['simpleCardButtonRenderer']['text']['simpleText'],
            }
        else:
            import pprint
            content = {'error': f"{ctype} is not implemented; <pre>{pprint.pformat(card)}</pre>"}

        return {'teaser': teaser, 'type': ctype, 'content': content}

    def parse_endcard(card):
        # Convert one endscreen element renderer into {'type', 'content'}.
        card = card['endscreenElementRenderer'] if 'endscreenElementRenderer' in card.keys() else card
        ctype = card['style']
        if ctype == "CHANNEL":
            content = {
                'channel_id': card['endpoint']['browseEndpoint']['browseId'],
                'title': card['title']['simpleText'],
                'icons': {e['height']: e['url'] for e in card['image']['thumbnails']},
            }
        elif ctype == "VIDEO":
            content = {
                'video_id': card['endpoint']['watchEndpoint']['videoId'],
                'title': card['title']['simpleText'],
                'length': card['videoDuration']['simpleText'], # '12:21'
                'views': card['metadata']['simpleText'], # '51,649 views'
            }
        elif ctype == "PLAYLIST":
            content = {
                'playlist_id': card['endpoint']['watchEndpoint']['playlistId'],
                'video_id': card['endpoint']['watchEndpoint']['videoId'],
                'title': card['title']['simpleText'],
                'author': card['metadata']['simpleText'],
                'n_videos': card['playlistLength']['simpleText'].replace(" videos", ""),
            }
        elif ctype == "WEBSITE":
            content = {
                'url': parse_qs(card['endpoint']['urlEndpoint']['url'].split('?')[1])['q'][0],
                'domain': card['metadata']['simpleText'],
                'title': card['title']['simpleText'],
                'icons': {e['height']: e['url'] for e in card['image']['thumbnails']},
            }
        else:
            import pprint
            content = {'error': f"{ctype} is not implemented; <pre>{pprint.pformat(card)}</pre>"}

        return {'type': ctype, 'content': content}

    return {
        'title': meta1['title'],
        'author': meta1['author'],
        'channel_id': meta1['channelId'],
        'description': meta1['shortDescription'],
        'published': meta2['publishDate'],
        'views': meta1['viewCount'],
        'length': int(meta1['lengthSeconds']),
        'rating': meta1['averageRating'],
        'category': meta2['category'],
        'aspectr': aspect_ratio,
        'unlisted': meta2['isUnlisted'],
        'countries': meta2['availableCountries'],
        'poster': meta2['thumbnail']['thumbnails'][0]['url'],
        'infocards': [parse_infocard(card) for card in cards],
        'endcards': [parse_endcard(card) for card in endsc],
        'subtitles': subtitles,
    }
263
def pp(*args):
    """Pretty-print *args to stderr as UTF-8; debugging helper."""
    import codecs
    import sys
    from pprint import pprint

    sink = codecs.getwriter("utf-8")(sys.stderr.buffer)
    pprint(args, stream=sink)
Imprint / Impressum