Best Python code snippet using playwright-python
video-rename
Source: video-rename
#!/usr/bin/python3
# -*- coding: utf-8 -*-
# kate: space-indent on; indent-width 4; mixedindent off; indent-mode python; syntax Python;
"""video-rename: rename video files from their embedded meta-data title and
sort ("hash") them into per-topic directories based on word matching."""
import sys
import os
import shutil
import stat
import argparse
import json
import re
import string
import urllib.request
import urllib.error
import datetime
from arsoft.utils import *
from arsoft.inifile import IniFile

# Separators used when tokenizing a file name into words.
re_words = re.compile('[ \\.\\-&_,]+')


def isxdigit(value):
    """Return True if every character of *value* is a hexadecimal digit."""
    for c in value:
        if c not in string.hexdigits:
            return False
    return True


def iscamelcase(s):
    """Return True if *s* is a single CamelCase word: letters only and no two
    consecutive uppercase characters."""
    got_upper = False
    for c in s:
        if c in string.ascii_uppercase:
            if got_upper:
                # already upper, so at least two uppercase letters
                return False
            got_upper = True
        else:
            if c not in string.ascii_lowercase:
                return False
            got_upper = False
    return True


def split_camelcase(s):
    """Split a CamelCase word into its parts, e.g. 'FooBar' -> ['Foo', 'Bar']."""
    ret = []
    start = 0
    i = 0
    for i in range(len(s)):
        if s[i] in string.ascii_uppercase:
            if i > start:
                ret.append(s[start:i])
            start = i
    if i >= start:
        ret.append(s[start:])
    return ret


def convert_date(words):
    """Scan *words* for runs of three numeric/month-name tokens and replace
    each complete run by a single datetime.date object; all other words pass
    through unchanged."""
    ret = []
    int_values = []
    month_names = ['january', 'february', 'march', 'april', 'may', 'june',
                   'july', 'august', 'september', 'october', 'november', 'december']
    for w in words:
        w_int = None
        w_int_month = False
        w_int_year = False
        try:
            w_int = month_names.index(w.lower()) + 1
            w_int_month = True
        except ValueError:
            pass
        if w_int is None:
            try:
                w_int = int(w)
                if w_int > 2000:
                    w_int_year = True
            except ValueError:
                pass
        ret.append(w)
        if w_int is None:
            # a non-numeric word breaks any pending date sequence
            int_values = []
        else:
            int_values.append((w_int, w_int_month, w_int_year))
            if len(int_values) == 3:
                year = 0
                month = 0
                day = 0
                # first assign the unambiguous month/year tokens ...
                for (w_int, w_int_month, w_int_year) in int_values:
                    if w_int_month:
                        month = w_int
                    elif w_int_year:
                        year = w_int
                # ... then fill the remaining slots from the plain numbers
                for (w_int, w_int_month, w_int_year) in int_values:
                    if w_int_month or w_int_year:
                        continue
                    if year == 0:
                        year = w_int + 2000 if w_int < 100 else w_int
                    elif month == 0 and 1 <= w_int <= 12:
                        month = w_int
                    elif day == 0 and 1 <= w_int <= 31:
                        day = w_int
                if year and month and day:
                    try:
                        d = datetime.date(year=year, month=month, day=day)
                    except ValueError:
                        # impossible combination (e.g. February 31):
                        # keep the plain words
                        d = None
                    if d is not None:
                        # replace the three string tokens just appended
                        ret = ret[0:-3]
                        ret.append(d)
                        # the tokens are consumed; start a fresh sequence
                        int_values = []
    return ret


def replace_in_list(lst, needle, replacement):
    """Replace the word sequence *needle* inside *lst* by *replacement* and
    return the new list; *lst* is returned unchanged when not all words of
    *needle* are present (in order)."""
    start = -1
    end = 0
    for n in needle:
        try:
            i = lst.index(n, end)
            if start < 0:
                start = i
            end = i
        except ValueError:
            return lst
    # NOTE(review): the needle words are only required in order, not adjacent;
    # the slice below assumes they are contiguous starting at 'start'.
    return lst[0:start] + replacement + lst[start + len(needle):]


def remove_ignored(words, ignored):
    """Return *words* without any entry listed in *ignored*. Entries of
    *ignored* that are lists are matched as multi-word sequences."""
    if not words:
        return []
    ret = []
    index = 0
    words_len = len(words)
    while index < words_len:
        found = False
        for iw in ignored:
            if isinstance(iw, list):
                offset = 0
                found = True
                for iwe in iw:
                    if index + offset >= words_len or iwe != words[index + offset]:
                        found = False
                        break
                    offset += 1
                if found:
                    index += len(iw)
                    # stop scanning the patterns: without this break a later
                    # non-matching pattern clobbers 'found' and words[index]
                    # may already be out of range
                    break
            elif iw == words[index]:
                found = True
                index += 1
                break
        if not found:
            ret.append(words[index])
            index += 1
    return ret


def split_into_words(s, ignored=None, combined=None):
    """Tokenize *s* into a normalized (lower-case) word list.

    Bracketed runs are joined into one word, CamelCase words are split,
    words in *ignored* are dropped, word sequences in *combined* are replaced
    and numeric date tokens are folded into datetime.date objects.
    """
    ex = []
    in_brackets = 0
    brackets_list = []
    for w in re_words.split(s):
        if not w:
            continue
        if (w[0] == '(' and w[-1] == ')') or (w[0] == '[' and w[-1] == ']') or (w[0] == '{' and w[-1] == '}'):
            w = w[1:-1]
        elif w[0] == '(' or w[0] == '[':
            in_brackets += 1
            brackets_list.append(w[1:])
            continue
        elif w[-1] == ')' or w[-1] == ']':
            # closing bracket; the original tested w[0] == ']' here, which can
            # never be true for a closing bracket token
            in_brackets -= 1
            brackets_list.append(w[0:-1])
            if in_brackets == 0:
                w = ' '.join(brackets_list)
                brackets_list = []
            else:
                continue
        elif in_brackets > 0:
            brackets_list.append(w)
            continue
        if not w:
            # the token consisted only of brackets, e.g. '()'
            continue
        parts = split_camelcase(w) if iscamelcase(w) else [w]
        for wx in parts:
            if not wx:
                continue
            if wx[0].isalpha() and wx[-1].isdigit():
                ex.append(wx[0:-1])
            elif wx[0].isalpha() and wx[-1] == '+':
                ex.append(wx[0:-1])
            elif wx[0] == '#':
                ex.append(wx[1:].lower())
            else:
                ex.append(wx.lower())
    if ignored is not None:
        ret = remove_ignored(ex, ignored)
    else:
        ret = ex
    if combined:
        for (needle, replacement) in combined:
            ret = replace_in_list(ret, needle, replacement)
    ret = convert_date(ret)
    return ret


def expanduser_dirs(*args):
    """Expand '~' and return absolute paths for all given path arguments;
    arguments may be single paths, lists of paths or None (skipped)."""
    dirs = []
    for a in args:
        if a is None:
            continue
        if isinstance(a, list):
            for e in a:
                dirs.append(os.path.abspath(os.path.expanduser(e)))
        else:
            dirs.append(os.path.abspath(os.path.expanduser(a)))
    return dirs


def extract_html_title(data):
    """Return the text of the <title> element of the HTML string *data*,
    or None when no title was found."""
    from html.parser import HTMLParser

    class MyHTMLParser(HTMLParser):
        def __init__(self):
            super().__init__()
            # instance attributes; the original used shared *class* attributes
            # which leaked state between parser instances
            self._path = []
            self._title = None

        def handle_starttag(self, tag, attrs):
            self._path.append(tag)

        def handle_endtag(self, tag):
            # guard against malformed HTML with unbalanced end tags
            if self._path:
                self._path.pop()

        def handle_data(self, data):
            if len(self._path) > 1 and self._path[-1] == 'title':
                self._title = data

    parser = MyHTMLParser()
    parser.feed(data)
    return parser._title


def copyfile(src, dst, *, follow_symlinks=True, callback=None, use_sendfile=False):
    """Copy data from src to dst.

    If follow_symlinks is not set and src is a symbolic link, a new
    symlink will be created instead of copying the file it points to.
    Raises shutil.SameFileError when src and dst are the same file and
    shutil.SpecialFileError for named pipes.
    """
    # use the public API instead of the private shutil._samefile()
    try:
        same = os.path.samefile(src, dst)
    except OSError:
        # at least one of the files does not exist -> cannot be the same
        same = False
    if same:
        raise shutil.SameFileError("{!r} and {!r} are the same file".format(src, dst))
    for fn in [src, dst]:
        try:
            st = os.stat(fn)
        except OSError:
            # File most likely does not exist
            pass
        else:
            # XXX What about other special files? (sockets, devices...)
            if stat.S_ISFIFO(st.st_mode):
                raise shutil.SpecialFileError("`%s` is a named pipe" % fn)
    if not follow_symlinks and os.path.islink(src):
        os.symlink(os.readlink(src), dst)
    else:
        size = os.stat(src).st_size
        with open(src, 'rb') as fsrc:
            with open(dst, 'wb') as fdst:
                copyfileobj(fsrc, fdst, callback=callback, total=size, use_sendfile=use_sendfile)
    return dst


def copyfileobj(fsrc, fdst, callback, total, length=4096 * 1024, flush_blocks=64, use_sendfile=False):
    """Copy file object *fsrc* to *fdst*, reporting progress via
    callback(copied, total=total). With use_sendfile the kernel zero-copy
    os.sendfile() path is used, otherwise buffered read/write with a periodic
    flush+fsync every *flush_blocks* blocks."""
    copied = 0
    num_blocks = 0
    if use_sendfile:
        dest_fd = fdst.fileno()
        src_fd = fsrc.fileno()
        block_size = length * flush_blocks
        while True:
            bytes_sent = os.sendfile(dest_fd, src_fd, copied, block_size)
            if bytes_sent == 0:
                break
            copied += bytes_sent
            if callback is not None:
                callback(copied, total=total)
    else:
        while True:
            buf = fsrc.read(length)
            if not buf:
                break
            fdst.write(buf)
            copied += len(buf)
            num_blocks += 1
            if num_blocks >= flush_blocks:
                # bound the amount of dirty page cache for huge files
                fdst.flush()
                os.fsync(fdst.fileno())
                num_blocks = 0
            if callback is not None:
                callback(copied, total=total)


def copy_with_progress(src, dst, *, follow_symlinks=True, callback=None, use_sendfile=False):
    """Copy *src* to *dst* (file or directory) preserving mode and stat info,
    reporting progress through *callback*. Returns the destination path."""
    if os.path.isdir(dst):
        dst = os.path.join(dst, os.path.basename(src))
    copyfile(src, dst, follow_symlinks=follow_symlinks, callback=callback, use_sendfile=use_sendfile)
    shutil.copymode(src, dst, follow_symlinks=follow_symlinks)
    shutil.copystat(src, dst, follow_symlinks=follow_symlinks)
    return dst


def is_within_directory(f, d):
    """Return (True, relpath) when path *f* lies inside directory *d*,
    otherwise (False, None)."""
    relpath = os.path.relpath(f, d)
    if os.path.isabs(relpath):
        # if we got an abspath then f cannot be within d, otherwise we
        # would have gotten a relpath
        return False, None
    # slice compare avoids an IndexError when relpath is '.' (f == d);
    # a leading '..' means the file is outside the directory
    if relpath[0:2] == '..':
        return False, None
    return True, relpath


class video_rule(object):
    """A regular-expression rule mapping a video base name to an URL template."""

    def __init__(self, re_pattern, url_template, options=None):
        self.re_pattern = re_pattern
        self.re = re.compile(re_pattern)
        self.url_template = url_template
        # avoid the shared mutable default argument of the original ({})
        self.options = {} if options is None else options

    def match(self, basename):
        """Return the re match object for *basename* or None."""
        return self.re.match(basename)

    def __repr__(self):
        return '\"%s\" -> %s' % (self.re_pattern, self.url_template)


class hash_directory(object):
    """A target directory for hashed video files, optionally read-only."""

    def __init__(self, path=None, opts=None):
        self.path = path
        self.readonly = False
        if opts is not None:
            for o in opts.split(','):
                if o == 'read-only' or o == 'ro':
                    self.readonly = True

    def __str__(self):
        if self.readonly:
            return 'read-only:%s' % self.path
        return self.path

    def __repr__(self):
        return self.__str__()


class hash_entry(object):
    """A known topic, stored as a word list plus optional alias word lists,
    with a fuzzy word-matching score."""

    def __init__(self, base=None, opts=None, words=None):
        self._aliases = []
        if words is None:
            self._words = split_into_words(base)
        else:
            self._words = words
        self.directory = base
        self.debug = False
        if opts is not None:
            for o in opts.split(','):
                if ':' in o:
                    # split only once so alias values may contain ':'
                    k, v = o.split(':', 1)
                    if k == 'alias':
                        self._aliases.append(split_into_words(v))
                elif o == 'debug':
                    self.debug = True

    def equal(self, words):
        """Exact (ordered) word list comparison."""
        if len(words) != len(self._words):
            return False
        for i, w in enumerate(self._words):
            if w != words[i]:
                return False
        return True

    @property
    def aliases(self):
        return self._aliases

    @staticmethod
    def _match_words(lhs_words, rhs_words, full_match=False, partial_match=False, debug=False):
        """Score how well *lhs_words* occurs in *rhs_words*; -1 means no match.

        Scoring (preserved verbatim from the original): +1 per matched word,
        +10 per additional consecutively matching word, +1 bonus for a
        single-word lhs. With full_match, every rhs word must be consumed;
        with partial_match, at least len(lhs_words) matches are required.
        """
        ret = -1
        first = True
        i = 0
        i_max = len(lhs_words)
        j_max = len(rhs_words)
        num_matches = 0
        while i < i_max:
            rhs_words_start = 0
            while rhs_words_start < j_max and i < i_max:
                try:
                    j = rhs_words.index(lhs_words[i], rhs_words_start)
                    if first:
                        ret = 1
                        num_matches += 1
                        first = False
                    else:
                        ret += 1
                    rhs_words_start = j + 1
                    # count the run of consecutively matching words
                    k = 1
                    while i + k < i_max and j + k < j_max and lhs_words[i + k] == rhs_words[j + k]:
                        if debug:
                            print('xx k=%i, %s, %s' % (k, lhs_words[i + k], rhs_words[j + k]))
                        ret += 10
                        num_matches += 1
                        k += 1
                    if debug:
                        print('xx ret=%i' % (ret))
                    # add extra point for full match for single words
                    if i_max == 1:
                        ret += 1
                        if debug:
                            print('full i=%i/%i, ret=%i' % (i, i_max, ret))
                    i = i + k
                except ValueError:
                    break
            i += 1
        if full_match:
            if j_max == num_matches:
                if debug:
                    print('%s<>%s ret=%i (full match)' % (lhs_words, rhs_words, ret))
                return ret
            if debug:
                print('%s<>%s ret=-1 (no full match %i!=%i)' % (lhs_words, rhs_words, j_max, num_matches))
            return -1
        elif partial_match:
            if num_matches >= i_max:
                if debug:
                    print('%s<>%s ret=%i (partial match)' % (lhs_words, rhs_words, ret))
                return ret
            if debug:
                print('%s<>%s ret=-1 (no partial match %i!=%i)' % (lhs_words, rhs_words, i_max, num_matches))
            return -1
        else:
            if debug:
                print('%s<>%s %i/%i ret=%i' % (lhs_words, rhs_words, j_max, num_matches, ret))
            return ret

    def match(self, words, full_match=False, partial_match=False, debug=False):
        """Best score of *words* against this entry's words and all aliases."""
        if self.debug:
            debug = True
        ret = self._match_words(self._words, words, full_match=full_match, partial_match=partial_match, debug=debug)
        for alias in self._aliases:
            alias_score = self._match_words(alias, words, full_match=full_match, partial_match=partial_match, debug=debug)
            if alias_score > ret:
                ret = alias_score
        return ret

    def __eq__(self, rhs):
        return self._words == rhs._words

    def __hash__(self):
        # order-independent hash, consistent with __eq__ for equal word lists
        ret = 0
        for w in self._words:
            ret += hash(w)
        return ret

    def __iter__(self):
        return iter(self._words)

    @staticmethod
    def directory_name(words, delim='.'):
        """Join *words* (strings and datetime.date objects) into a directory name."""
        s = []
        for w in words:
            if isinstance(w, datetime.date):
                s.append(str(w))
            else:
                s.append(w)
        return delim.join(s)

    def __str__(self):
        return self.directory_name(self._words)

    def __repr__(self):
        return self.__str__()


class input_file(object):
    """A file (or directory) to process, together with the hash directory and
    hash entry it was discovered in."""

    def __init__(self, filename, hash_dir=None, entry=None, unknown=False, channels=False):
        self.hash_dir = hash_dir
        self.entry = entry
        self.filename = filename
        self.unknown = unknown
        self.channels = channels
        if not isinstance(self.filename, str):
            raise RuntimeError("self.filename is not a string: %s" % type(self.filename))

    def __str__(self):
        return self.filename

    def __repr__(self):
        return self.__str__()


class video_rename_app:
    """Command line application: renames video files from meta data and sorts
    them into hash directories."""

    def __init__(self):
        self._verbose = False
        self._debug = False
        self._noop = False
        self._files = []
        self._video_file_exts = []
        self._rules = []
        self._hash_threshold = 2
        self._hash_unknown_dir = '_unknown'
        # default needed when the config has no [hashes] section
        self._hash_channels_dir = '_channels'
        self._hashes = []
        self._input_dirs = []
        self._hash_dirs = []
        self._ignored_words = []
        self._combined_words = []
        self._channels = []
        self._disable_sendfile = False

    def _load_rules(self, section=None):
        """Load rename rules ('<name>.re', '<name>.url', ...) from the config section."""
        self._rules = []
        for (k, v) in section.get_all():
            if k is None or '.' not in k:
                continue
            k_name, k_ext = k.split('.', 1)
            if k_ext == 're':
                re_pattern = v
                url = section.get(k_name + '.url')
                webpage = section.getAsBoolean(k_name + '.webpage', False)
                enable = section.getAsBoolean(k_name + '.enable', True)
                if url and enable:
                    options = {'webpage': webpage}
                    r = video_rule(re_pattern, url, options)
                    self._rules.append(r)

    def _load_hashes(self, section):
        """Load hash entries and the unknown/channels directory names."""
        self._hashes = []
        for (k, v) in section.get_all():
            if k is None:
                continue
            self._hashes.append(hash_entry(k, v))
        self._hash_unknown_dir = section.get('unknown', '_unknown')
        self._hash_channels_dir = section.get('channels', '_channels')

    def _load_config(self, filename):
        """Load the INI configuration file; command line values take precedence."""
        filename = os.path.expanduser(filename)
        f = IniFile(filename, keyValueSeperator='=')
        if f.has_section('rules'):
            self._load_rules(f.section('rules'))
        if f.has_section('hashes'):
            self._load_hashes(f.section('hashes'))
        if self._input_dirs is None or not self._input_dirs:
            self._input_dirs = f.getAsArray(None, 'input')
        if self._hash_dirs is None or not self._hash_dirs:
            self._hash_dirs = []
            for hd in f.getAsArray(None, 'hash_dir'):
                if ',' in hd:
                    path, opts = hd.split(',', 1)
                else:
                    path = hd
                    opts = None
                self._hash_dirs.append(hash_directory(path, opts))
        self._video_file_exts = f.getAsArray(None, 'video_file_exts', ['.mp4', '.avi', '.wmv', '.mkv', '.mov', '.vid'])
        self._ignored_words = []
        ignored_words_multi = []
        for w in f.getAsArray(None, 'ignored_words', ['360p', '480p', '720p', '1080p', 'and', 'the', 'a', 'is', 'in', 'of']):
            ww = split_into_words(w)
            if len(ww) > 1:
                ignored_words_multi.append(ww)
            else:
                self._ignored_words.append(w)
        # put all multi words ignores to the beginning of the list
        for ww in ignored_words_multi:
            self._ignored_words.insert(0, ww)
        self._channels = f.getAsArray(None, 'channels', [])
        for s in self._channels:
            channel_names = s.split(',')
            main_channel_name = channel_names[0]
            for ch in channel_names:
                chw = split_into_words(ch)
                self._combined_words.append((chw, [main_channel_name]))

    def _add_file(self, f):
        """Add a single file when its extension is a known video extension."""
        fabs = os.path.abspath(f.filename)
        (name, ext) = os.path.splitext(fabs)
        if ext in self._video_file_exts:
            self._files.append(input_file(fabs, hash_dir=f.hash_dir, unknown=f.unknown, channels=f.channels, entry=f.entry))
        elif self._verbose:
            print('Skip non-video file %s' % fabs)

    def _add(self, f):
        """Add a file or recursively all files of a directory."""
        if not isinstance(f, input_file):
            raise RuntimeError('require input_file not %s' % type(f))
        if self._verbose:
            print('Process %s' % f)
        if os.path.isfile(f.filename):
            self._add_file(f)
        elif os.path.isdir(f.filename):
            for e in os.listdir(path=f.filename):
                full = os.path.join(f.filename, e)
                if os.path.isdir(full):
                    self._add(input_file(full, hash_dir=f.hash_dir, unknown=f.unknown, channels=f.channels, entry=f.entry))
                elif os.path.isfile(full):
                    self._add_file(input_file(full, hash_dir=f.hash_dir, unknown=f.unknown, channels=f.channels, entry=f.entry))

    def _mkdir(self, d):
        """Create directory *d* (with parents); return False on failure."""
        ret = True
        if not os.path.isdir(d):
            try:
                os.makedirs(d, exist_ok=True)
            except OSError:
                ret = False
        return ret

    def _rename_file(self, src, dst, overwrite=False):
        """Rename *src* to *dst*, falling back to copy+unlink across devices.
        Returns True on success."""
        ret = False
        dst_dir = os.path.dirname(dst)
        self._mkdir(dst_dir)
        need_copy = False
        try:
            do_rename = True
            if os.path.exists(dst):
                if os.path.islink(dst):
                    target = os.readlink(dst)
                    if not os.path.isabs(target):
                        # convert into absolute path
                        target = os.path.join(os.path.dirname(dst), target)
                    if os.path.samefile(src, target):
                        # dst is just a link back to src; drop it before rename
                        os.remove(dst)
                elif os.path.isdir(dst):
                    print('Destination %s exists as directory' % (dst), file=sys.stderr)
                    do_rename = False
                elif os.path.isfile(dst):
                    if overwrite:
                        print('Destination file %s already exists. Overwrite' % (dst), file=sys.stderr)
                        os.remove(dst)
                    else:
                        do_rename = False
                        print('Destination file %s already exists' % (dst), file=sys.stderr)
                else:
                    print('Destination %s already exists' % (dst), file=sys.stderr)
                    do_rename = False
            if do_rename:
                os.rename(src, dst)
                ret = True
        except OSError as e:
            # errno 18 == EXDEV: invalid cross-device link -> copy instead
            if e.errno == 18:
                need_copy = True
            else:
                print('Failed to rename %s to %s: %s' % (src, dst, e), file=sys.stderr)
        if need_copy:
            def _copy_progress(copied, total):
                precent = 100 * copied / total
                print('\r Copying %i %% ...' % (precent), end='')
            try:
                print(" Copying", end='')
                copy_with_progress(src, dst, callback=_copy_progress,
                                   use_sendfile=False if self._disable_sendfile else True)
                os.unlink(src)
                ret = True
                print("\r Copy complete")
            except OSError as e:
                print('Failed to copy %s to %s: %s' % (src, dst, e), file=sys.stderr)
        return ret

    def _mklink(self, src, dst):
        """Create (or replace) a relative symlink *dst* pointing to *src*."""
        ret = False
        dst_dir = os.path.dirname(dst)
        self._mkdir(dst_dir)
        r = os.path.relpath(src, dst_dir)
        try:
            if os.path.islink(dst):
                # remove old link first
                os.remove(dst)
            os.symlink(r, dst, target_is_directory=False)
            ret = True
        except OSError as e:
            print('Failed to create symlink %s to %s: %s' % (src, dst, e), file=sys.stderr)
        return ret

    def _get_html_title(self, url):
        """Fetch *url* and return its HTML title, or None on any HTTP error."""
        hdr = {'User-Agent': 'Mozilla/5.0', 'Accept': '*/*'}
        req = urllib.request.Request(url, headers=hdr)
        try:
            with urllib.request.urlopen(req) as response:
                if response.status == 200:
                    data = response.read()  # a `bytes` object
                    text = data.decode('utf-8')
                    return extract_html_title(text)
        except urllib.error.HTTPError as e:
            if self._verbose:
                print('HTTP Error %s: %s' % (url, e))
        return None

    def _clean_title(self, s):
        """Sanitize a title for use as a file name; returns None when the
        title contains no usable words. Long titles are trimmed to ~200 chars
        at word boundaries."""
        hash_idx = s.rfind('#')
        if hash_idx > 0:
            s = s[0:hash_idx - 1]
        s = self._clean_filename(s)
        words = split_into_words(s, ignored=self._ignored_words, combined=self._combined_words)
        if self._verbose:
            print('clean filename %s: %s' % (s, words))
        if not words:
            return None
        while len(s) > 200:
            elems = s.split()
            elems.pop()
            s = ' '.join(elems)
        return s

    def _clean_filename(self, s):
        """Replace characters that are unsafe in a file name."""
        s = s.replace('/', '_')
        s = s.replace('\r', '_')
        s = s.replace('\n', '_')
        s = s.replace('__', '_')
        s = s.strip()
        return s

    def _auto_rename_file(self, f, force=False):
        """Rename *f* from its embedded meta-data title (via ffprobe) or the
        title of the web page matched by a rule. Returns the new name or None."""
        if not isinstance(f, input_file):
            raise RuntimeError('require input_file not %s' % type(f))
        path, name = os.path.split(f.filename)
        (basename, ext) = os.path.splitext(name)
        found_rule = False
        found_url = None
        for rule in self._rules:
            m = rule.match(basename)
            if m is not None:
                found_rule = True
                found_url = m.expand(rule.url_template)
                if self._verbose:
                    print(' %s: Match rule %s/%s -> %s' % (basename, rule.re_pattern, rule.url_template, found_url))
                break
        got_title = False
        suggested_filename = None
        if found_rule:
            try:
                # runcmdAndGetData comes from arsoft.utils
                (sts, stdoutdata, stderrdata) = runcmdAndGetData(
                    args=['ffprobe', '-hide_banner', '-v', 'error', '-of', 'json', '-show_format', f.filename])
            except FileNotFoundError:
                print('Cannot execute ffprobe.', file=sys.stderr)
                sts = -1
            if sts == 0:
                file_format = json.loads(stdoutdata.decode('utf8'))
                if 'tags' in file_format['format']:
                    tags = file_format['format']['tags']
                    if 'title' in tags:
                        title = self._clean_title(tags['title'])
                        if title:
                            got_title = True
                            suggested_filename = path + '/' + title + ext
            if not got_title and found_url:
                title = self._get_html_title(found_url)
                if title:
                    title = self._clean_title(title)
                    if title:
                        suggested_filename = path + '/' + title + ('.%s' % basename) + ext
        else:
            new_basename = self._clean_filename(basename)
            if new_basename != basename:
                suggested_filename = path + '/' + new_basename + ext
        if suggested_filename is not None:
            if self._noop:
                print(' Rename to %s (noop)' % (suggested_filename))
            else:
                print(' Rename to %s' % (suggested_filename))
                self._rename_file(f.filename, suggested_filename)
        return suggested_filename

    def _find_hash_entry(self, basename, existing_entry=None, full_match=False, debug=False):
        """Return (score, entry) pairs for all matching hash entries, best
        first; *existing_entry* is promoted to the top with the best score."""
        words = split_into_words(basename, ignored=self._ignored_words, combined=self._combined_words)
        ret = []
        for e in self._hashes:
            score = e.match(words, full_match=full_match, debug=debug)
            if score >= 0:
                ret.append((score, e))
        ret = sorted(ret, key=lambda e: e[0], reverse=True)
        if ret and existing_entry is not None:
            (top_score, top_entry) = ret[0]
            ret.insert(0, (top_score, existing_entry))
        return ret

    def _auto_hash_file(self, f, force=False):
        """Move *f* into the hash directory of its best-matching entry (plus
        symlinks for equally-scored entries); unmatched files go to the
        'unknown' directory unless they already live inside a hash dir."""
        if not isinstance(f, input_file):
            raise RuntimeError('require input_file not %s' % type(f))
        path, name = os.path.split(f.filename)
        (basename, ext) = os.path.splitext(name)
        suggested_filename = None
        symlink_filenames = []
        equal_score_entries = []
        entries = self._find_hash_entry(basename, existing_entry=f.entry, debug=False)
        if entries:
            # materialize: a bare filter object is always truthy in Python 3,
            # so the original 'if entries:' check never failed
            entries = list(filter(lambda x: x[0] >= self._hash_threshold, entries))
            if entries:
                equal_score = None
                equal_score_entries = []
                # keep all entries sharing the top score
                for (score, e) in entries:
                    if equal_score is None:
                        equal_score = score
                        equal_score_entries.append(e)
                    elif equal_score == score:
                        equal_score_entries.append(e)
        if equal_score_entries:
            dest_filenames = []
            for e in equal_score_entries:
                if f.hash_dir is None:
                    for hd in self._hash_dirs:
                        if hd.readonly:
                            continue
                        suggested_dir = os.path.join(hd.path, e.directory)
                        dest_filenames.append((os.path.join(suggested_dir, name), hd))
                else:
                    suggested_dir = os.path.join(f.hash_dir.path, e.directory)
                    dest_filenames.append((os.path.join(suggested_dir, name), f.hash_dir))
            if dest_filenames:
                if self._verbose:
                    print(' dest filenames: %s' % (dest_filenames))
                for (new_filename, hd) in dest_filenames:
                    is_same = False
                    try:
                        is_same = os.path.samefile(new_filename, f.filename)
                    except OSError:
                        pass
                    if is_same:
                        suggested_filename = new_filename
                    else:
                        symlink_filenames.append(new_filename)
                if suggested_filename is None and symlink_filenames:
                    # first destination becomes the real file, the rest links
                    suggested_filename = symlink_filenames[0]
                    symlink_filenames = symlink_filenames[1:]
        if suggested_filename is None:
            # Do not move files if the file is within a hash_dir
            is_within_hash_dir = False
            if f.hash_dir is None:
                for hd in self._hash_dirs:
                    is_within_hash_dir, relpath = is_within_directory(f.filename, hd.path)
                    if is_within_hash_dir:
                        break
            else:
                is_within_hash_dir = True
            if not is_within_hash_dir:
                target_hd = None
                for hd in self._hash_dirs:
                    if hd.readonly:
                        continue
                    target_hd = hd
                    break
                if target_hd is not None:
                    suggested_filename = os.path.join(target_hd.path, self._hash_unknown_dir, name)
        if suggested_filename is not None:
            if f.filename != suggested_filename:
                if self._noop:
                    print(' Hash move to %s (noop)' % (suggested_filename))
                else:
                    print(' Hash move to %s' % (suggested_filename))
                    self._rename_file(f.filename, suggested_filename)
            for link_name in symlink_filenames:
                if self._noop:
                    print(' Hash symlink to %s (noop)' % (link_name))
                else:
                    print(' Hash symlink to %s' % (link_name))
                    self._mklink(suggested_filename, link_name)

    def _is_combined_word(self, w):
        """Return True when *w* is part of any configured combined-word sequence."""
        for (chw, main_channel_name) in self._combined_words:
            if w in chw:
                return True
        return False

    def main(self):
        #=============================================================================================
        # process command line
        #=============================================================================================
        parser = argparse.ArgumentParser(description='renames video files by extract the video title from the meta data')
        parser.add_argument('-v', '--verbose', dest='verbose', action='store_true', help='enable verbose output of this script.')
        parser.add_argument('-d', '--debug', dest='debug', action='store_true', help='enable debug output of this script.')
        parser.add_argument('-f', '--force', dest='force', action='store_true', help='force processing of given files.')
        parser.add_argument('-n', '--noop', dest='noop', action='store_true', help='only show what would be done.')
        parser.add_argument('--no-hash', dest='no_hash', action='store_true', help='do not perform hashing')
        parser.add_argument('--no-rename', dest='no_rename', action='store_true', help='do not rename files.')
        parser.add_argument('-H', '--hash', dest='hash_dirs', nargs='*', help='hash video files into given directory')
        parser.add_argument('-q', '--query', dest='queries', nargs='*', help='query for specific files')
        parser.add_argument('-r', '--rehash', dest='rehashs', nargs='*', help='rehash specific words')
        parser.add_argument('--unknown', dest='process_unknown', action='store_true', help='re-process files in unknown directory')
        parser.add_argument('--channels', dest='process_channels', action='store_true', help='re-process files in channels directory')
        parser.add_argument('-c', '--config', dest='config_file', default='~/.config/video-rename.conf', help='load configuration file')
        parser.add_argument('--test-words', dest='test_words', nargs='*', help='testing word processing')
        parser.add_argument('--test-match', dest='test_match', nargs='*', help='test match processing')
        parser.add_argument('--show-hashes', dest='show_hashes', action='store_true', help='show all configured hashes')
        parser.add_argument('--disable-sendfile', dest='disable_sendfile', action='store_true', help='disable usage of sendfile on Unix machines')
        parser.add_argument('files', metavar='FILE', type=str, nargs='*', help='video files or directories')
        args = parser.parse_args()

        self._verbose = args.verbose
        self._debug = args.debug
        self._noop = args.noop
        self._no_hash = args.no_hash
        self._no_rename = args.no_rename
        self._input_dirs = args.files
        self._hash_dirs = []
        for h in expanduser_dirs(args.hash_dirs):
            self._hash_dirs.append(hash_directory(h))
        self._queries = args.queries
        self._rehashs = args.rehashs
        self._process_unknown = args.process_unknown
        self._process_channels = args.process_channels
        self._disable_sendfile = args.disable_sendfile
        self._load_config(filename=args.config_file)

        if self._debug:
            print('Rules:')
            for r in self._rules:
                print(' %s' % r)
        if args.show_hashes:
            print('Hashes:')
            for h in self._hashes:
                print(' %s' % h)
                if h.aliases:
                    for a in h.aliases:
                        print(' %s' % hash_entry.directory_name(a))
            print('Hash unknown dir: %s' % self._hash_unknown_dir)
            print('Hash channels dir: %s' % self._hash_channels_dir)
            return 0
        if args.test_words:
            if self._debug:
                print('Ignored words:')
                for w in self._ignored_words:
                    print(' %s' % w)
                print('Combined words:')
                for (chw, main_channel_name) in self._combined_words:
                    print(' %s -> %s' % ('.'.join(chw), '.'.join(main_channel_name)))
            print('Results:')
            for w in args.test_words:
                if w.startswith('file://'):
                    w = w[7:]
                if os.path.exists(w):
                    (name, ext) = os.path.splitext(w)
                    w = os.path.basename(name)
                words = split_into_words(w, ignored=self._ignored_words, combined=self._combined_words)
                print(' %s' % words)
            return 0
        if args.test_match:
            for w in args.test_match:
                if w.startswith('file://'):
                    w = w[7:]
                if os.path.exists(w):
                    (name, ext) = os.path.splitext(w)
                    w = os.path.basename(name)
                entries = self._find_hash_entry(w, full_match=False, debug=self._debug)
                if entries:
                    # list() required: a filter object is always truthy
                    entries = list(filter(lambda x: x[0] >= self._hash_threshold, entries))
                    if entries:
                        for (score, e) in entries:
                            print('%03i %s' % (score, e))
            return 0
        input_dirs = []
        for f in expanduser_dirs(self._input_dirs):
            f_hd = None
            f_entry = None
            for hd in self._hash_dirs:
                within, relpath = is_within_directory(f, hd.path)
                if within:
                    if os.path.isfile(f):
                        f_dir = os.path.dirname(relpath)
                    else:
                        f_dir = relpath
                    f_hd = hd
                    entries = self._find_hash_entry(f_dir, full_match=True)
                    if entries:
                        f_score, f_entry = entries[0]
                    break
            input_dirs.append(input_file(f, hash_dir=f_hd, entry=f_entry))
        if self._verbose:
            if input_dirs:
                print('Input:')
                for i in input_dirs:
                    print(' %s' % i)
            if not self._no_hash:
                print('Hash dirs:')
                for hd in self._hash_dirs:
                    print(' %s' % hd)
        if self._process_unknown or self._process_channels:
            input_dirs = []
        elif self._queries:
            # query is a read-only operation
            self._no_hash = True
            self._no_rename = True
        elif self._rehashs:
            input_dirs = []
            # rehash, so required to do some hashing, but no rename
            self._no_hash = False
            self._no_rename = True
        if self._process_unknown:
            for h in self._hash_dirs:
                input_dirs.append(input_file(os.path.join(h.path, self._hash_unknown_dir), hash_dir=h, unknown=True))
        if self._process_channels:
            for h in self._hash_dirs:
                input_dirs.append(input_file(os.path.join(h.path, self._hash_channels_dir), hash_dir=h, channels=True))
        for f in input_dirs:
            self._add(f)
        if self._verbose:
            print('Files to process:')
            for i, f in enumerate(self._files):
                print(' [%03i]: %s' % (i, f))
        possible_words = []
        show_possible_words = False
        if self._process_unknown or self._process_channels:
            show_possible_words = True
            # collect two-word combinations and the files they appear in
            word_dict = {}
            for f in self._files:
                (name, ext) = os.path.splitext(f.filename)
                basename = os.path.basename(name)
                words = split_into_words(basename, ignored=self._ignored_words, combined=self._combined_words)
                num_words = len(words)
                for i in range(num_words):
                    if i + 1 < num_words:
                        if self._is_combined_word(words[i]) or self._is_combined_word(words[i + 1]):
                            continue
                        word_combo = [words[i], words[i + 1]]
                        h = hash_entry(words=word_combo)
                        if h in word_dict:
                            word_dict[h].add(f)
                        else:
                            word_dict[h] = set([f])
            existing_hashes = []
            for (h, files) in word_dict.items():
                if h in self._hashes:
                    existing_hashes.append((h, files))
                elif len(files) > 1:
                    possible_words.append((h, files))
            if existing_hashes:
                if self._verbose:
                    print('Find files for existing hashes:')
                for (h, files) in existing_hashes:
                    if self._verbose:
                        print(' %s: %i matches' % (h, len(files)))
                    for f in files:
                        if self._verbose:
                            print(' %s' % f)
                        self._files.append(f)
                if self._verbose:
                    if self._files:
                        print('Files for rehashing:')
                        # list the collected files, not the stale 'files'
                        # variable of the last loop iteration
                        for f in self._files:
                            print(' %s' % f)
        elif self._queries:
            for q in self._queries:
                entries = self._find_hash_entry(q, full_match=True, debug=self._debug)
                if entries:
                    for (score, e) in entries:
                        print('Found hash entry \'%s\':' % (e))
                        results = []
                        for hd in self._hash_dirs:
                            d = os.path.join(hd.path, str(e))
                            if os.path.isdir(d):
                                for fname in os.listdir(d):
                                    results.append((fname, os.path.join(d, fname), hd))
                        for f in self._files:
                            (name, ext) = os.path.splitext(f.filename)
                            basename = os.path.basename(name)
                            words = split_into_words(basename, ignored=self._ignored_words, combined=self._combined_words)
                            score = e.match(words, full_match=False)
                            if score >= 0:
                                results.append((basename, f.filename, None))
                        for (basename, full, hd) in sorted(results, key=lambda x: x[0]):
                            if hd is None:
                                print(' %s in %s' % (basename, os.path.dirname(full)))
                            else:
                                print(' %s' % basename)
                # also search the unknown directory for partial matches
                tmp_hash_entry = hash_entry(q)
                results = []
                for hd in self._hash_dirs:
                    d = os.path.join(hd.path, self._hash_unknown_dir)
                    if os.path.isdir(d):
                        for fname in os.listdir(d):
                            (basename, ext) = os.path.splitext(fname)
                            words = split_into_words(basename, ignored=self._ignored_words, combined=self._combined_words)
                            score = tmp_hash_entry.match(words, partial_match=True)
                            if score >= 0:
                                results.append((fname, os.path.join(d, fname), hd))
                for (basename, full, hd) in sorted(results, key=lambda x: x[0]):
                    print(' %s in %s' % (basename, os.path.dirname(full)))
        elif self._rehashs:
            for q in self._rehashs:
                entries = self._find_hash_entry(q, full_match=True)
                if entries:
                    for (score, e) in entries:
                        print('Found hash entry: %s:' % (e))
                        for hd in self._hash_dirs:
                            d = os.path.join(hd.path, str(e))
                            if os.path.isdir(d):
                                self._add(input_file(d, hash_dir=hd, entry=e))
        total = len(self._files)
        total_progress = 0
        current_progress = 0
        if not self._no_rename:
            total_progress += total
        if not self._no_hash:
            total_progress += total
        if not self._no_rename:
            for i in range(total):
                print('[%03i/%03i] Rename %s' % (current_progress + 1, total_progress, self._files[i]))
                current_progress += 1
                f_new = self._auto_rename_file(self._files[i], force=args.force)
                if f_new is not None:
                    # keep the hash_dir of the renamed file; the original read
                    # the stale loop variable 'f' here
                    self._files[i] = input_file(f_new, hash_dir=self._files[i].hash_dir)
        if not self._no_hash:
            for i in range(total):
                print('[%03i/%03i] Hash %s' % (current_progress + 1, total_progress, self._files[i]))
                current_progress += 1
                self._auto_hash_file(self._files[i], force=args.force)
        if possible_words and show_possible_words:
            possible_words = sorted(possible_words, key=lambda e: len(e[1]), reverse=False)
            print('Possible words:')
            for (h, files) in possible_words:
                print(' %s: %i matches' % (h, len(files)))
                if self._debug:
                    for f in files:
                        print(' %s' % f)
            for (h, files) in possible_words:
                if len(files) > 2:
                    print('%s=' % (h))
        ret = 0
        return ret


if __name__ == "__main__":
    app = video_rename_app()
    # NOTE(review): the original source was truncated after constructing the
    # app; forwarding main()'s return value as exit code is the conventional
    # completion - confirm against upstream.
    sys.exit(app.main())
x509helper.py
Source: x509helper.py
#!/usr/bin/env python2
# -*- coding: utf-8 -*-
"""
Copyright (C) 2011-2018
Adam Greene <copyright@mzpqnxow.com>
Please see LICENSE or LICENSE.md for terms
"""
from Crypto.Util import asn1
import base64
import OpenSSL
import os
import textwrap
import logging
import sys
from logger import LoggingMixin


class X509Helper(LoggingMixin):
    """
    Base class for X509HelperCertificate and X509HelperKey.

    Provides shared helpers: modulus formatting and the modulus blacklist.
    """
    # when producing suggested_filename, how many bytes of the modulus to use
    # in naming
    FILENAME_OCTETS = 6

    def __init__(
            self,
            logger=None,
            log_level=logging.CRITICAL,
            blacklist_file=''):
        """Set up logging (own mixin or caller-supplied logger) and load the blacklist."""
        self.modulus_blacklist = []
        self.modulus_blacklist_config_path = blacklist_file
        if logger is None:
            LoggingMixin.__init__(self, log_level=log_level)
        else:
            self.logger = logger
        self.load_modulus_blacklist()

    def load_modulus_blacklist(self):
        """Load one hex modulus per line (uppercased) from the blacklist file.

        No-op when no path is configured; exits the process with code 10 on
        any read error (original fail-fast behavior preserved).
        """
        if not self.modulus_blacklist_config_path:
            return
        try:
            f = open(self.modulus_blacklist_config_path, 'rb')
            for modulus_line in f.readlines():
                eline = modulus_line
                eline = eline.strip('\n')
                eline = eline.upper()
                self.modulus_blacklist.append(eline)
            f.close()
            self.logger.debug('Added {} items to modulus blacklist...'.format(
                len(self.modulus_blacklist)))
        except Exception as err:
            self.logger.error(
                'Fatal exception occurred while building blacklist...')
            self.logger.error(err)
            sys.exit(10)

    def is_blacklisted(self, modulus):
        """Return True if the hex-string modulus is on the blacklist."""
        return modulus.upper() in self.modulus_blacklist

    def printable_modulus(
            self,
            der_decimal,
            columns=16,
            prefix='\t',
            use_colons=True):
        """Render a long modulus as hex byte pairs, wrapped every `columns` chars."""
        modulus = hex(der_decimal).rstrip('L').lstrip('0x') or '0'
        printable_modulus = ''
        for i in xrange(0, len(modulus), 2):
            if i:
                if not (i % columns):
                    # start a new row
                    printable_modulus += '\n' + prefix
                else:
                    if use_colons:
                        printable_modulus += ':'
            else:
                # first pair of the first row
                printable_modulus += prefix
            printable_modulus += modulus[i:i + 2]
        return printable_modulus

    def modulus_long_to_string(self, der_decimal):
        """Return the modulus as a plain, unseparated hex string."""
        modulus = hex(der_decimal).rstrip('L').lstrip('0x') or '0'
        printable_modulus = ''
        for i in xrange(0, len(modulus), 2):
            printable_modulus += modulus[i:i + 2]
        return printable_modulus


class X509HelperKey(X509Helper):
    """Parse an RSA private key file (PEM or DER) and expose its components."""

    def __init__(
            self,
            key_file,
            blacklist_file=None,
            logger=None,
            password='password'):
        X509Helper.__init__(
            self, logger=logger, blacklist_file=blacklist_file)
        self.key_pem_buffer = None
        self.rsa_key = None
        self.key_private_asn1 = None
        self.key_private_der = None
        self.key_modulus = None
        self.key_public_exponent = None
        self.key_private_exponent = None
        self.key_private_prime1 = None
        self.key_private_prime2 = None
        self.key_private_exponent1 = None
        self.key_private_exponent2 = None
        self.key_private_coefficient = None
        self.key_file = key_file
        self.password = password
        if blacklist_file:
            self.modulus_blacklist_config_path = blacklist_file
        self.subjects = ''
        self.parsed_key = None
        self.parse_key_file_universal()

    def __eq__(self, obj):
        """A key equals a certificate whose modulus and exponent match it."""
        if isinstance(obj, X509HelperCertificate):
            return ((self.key_modulus == obj.certificate_public_modulus) and (
                obj.certificate_public_exponent == self.key_public_exponent))
        else:
            return False

    def passwd_cb(self):
        """Password callback: return the stored passphrase."""
        self.logger.info('Returning password "{}"'.format(self.password))
        return self.password

    def der_key_to_pem_key(self, der_buffer):
        """Wrap a DER key buffer into a PEM RSA PRIVATE KEY string."""
        PEM_HEADER = '-----BEGIN RSA PRIVATE KEY-----'
        PEM_FOOTER = '-----END RSA PRIVATE KEY-----'
        f = str(base64.standard_b64encode(der_buffer))
        return (PEM_HEADER + '\n' + textwrap.fill(f, 64) + '\n' + PEM_FOOTER)

    def write_to_file(self, dirname=''):
        """ also sets self.subject """
        if dirname and not dirname.endswith('/'):
            dirname += '/'
        try:
            os.stat(os.path.join(dirname, self.suggested_filename))
            return ''  # dup, already processed this key
        except (IOError, OSError):
            pass  # file doesn't exist, process this entry
        with open(os.path.join(dirname, self.suggested_filename), 'wb') as filefd:
            # dos2unix and add a trailing newline
            filefd.write(self.key_pem_buffer.replace('\r\n', '\n') + '\n')
        # BUG FIX: original used '%d' with str.format(), so the bit size was
        # never substituted into the summary string.
        self.subjects = self.suggested_filename.ljust(
            25) + ' - {} bit RSA Private Key (PEM format)'.format(self.key_bitsize)
        return self.subjects

    def key(self):
        """Return the parsed key dict, or None if parsing failed/blacklisted."""
        return self.parsed_key

    def parse_key_file_universal(self):
        """Load self.key_file as PEM, falling back to DER, and decompose it.

        On success populates self.parsed_key and the key_* attributes; on any
        failure (or a blacklisted modulus) leaves self.parsed_key as None.
        """
        try:
            self.logger.info('Parsing key {}'.format(self.key_file))
            self.key_buffer = open(self.key_file, 'rb').read()
            self.crypto = OpenSSL.crypto
            try:
                self.logger.warning(
                    'Trying to load {} data as PEM...'.format(
                        self.key_file))
                self.rsa_key = self.crypto.load_privatekey(
                    self.crypto.FILETYPE_PEM, self.key_buffer, 'password')
                self.key_pem_buffer = self.key_buffer
            except Exception as err:
                pass  # not PEM; fall through to the DER attempt below
            if not self.rsa_key or not self.key_pem_buffer:
                try:
                    self.logger.warning(
                        'Trying to load {} data as DER...'.format(
                            self.key_file))
                    self.rsa_key = self.crypto.load_privatekey(
                        self.crypto.FILETYPE_ASN1, self.key_buffer, 'password')
                    self.key_pem_buffer = self.der_key_to_pem_key(
                        self.key_buffer)
                except Exception as err:
                    self.logger.warning(
                        'Failure to parse {} as DER/PEM format key, skipping...'.format(self.key_file))
                    raise err
            self.key_bitsize = self.rsa_key.bits()
            self.key_private_asn1 = self.crypto.dump_privatekey(
                self.crypto.FILETYPE_ASN1, self.rsa_key)
            self.key_private_der = asn1.DerSequence()
            self.key_private_der.decode(self.key_private_asn1)
            # PKCS#1 RSAPrivateKey layout: [version, n, e, d, p, q, d mod (p-1),
            # d mod (q-1), q^-1 mod p]
            self.key_modulus = self.key_private_der[1]
            self.key_printable_private_modulus = self.printable_modulus(
                self.key_modulus)
            d = self.modulus_long_to_string(self.key_modulus)
            if self.is_blacklisted(d):
                self.logger.info('found blacklisted key...')
                self.parsed_key = None
                return
            self.key_public_exponent = self.key_private_der[2]
            self.key_private_exponent = self.key_private_der[3]
            self.key_private_prime1 = self.key_private_der[4]
            self.key_private_prime2 = self.key_private_der[5]
            self.key_private_exponent1 = self.key_private_der[6]
            self.key_private_exponent2 = self.key_private_der[7]
            self.key_private_coefficient = self.key_private_der[8]
            # name the file after the trailing FILENAME_OCTETS bytes of the modulus
            self.suggested_filename = self.key_printable_private_modulus.replace(
                '\t', '')
            self.suggested_filename = self.suggested_filename[len(
                self.suggested_filename) - (3 * self.FILENAME_OCTETS) + 1:]
            self.suggested_filename += '.key'
            self.suggested_filename = self.suggested_filename.replace(':', '_')
            self.suggested_filename = self.suggested_filename.replace('\r', '')
            self.suggested_filename = self.suggested_filename.replace('\n', '')
            self.parsed_key = {}
            self.parsed_key['public_exponent'] = self.key_public_exponent
            self.parsed_key['private_exponent'] = self.key_private_exponent
            self.parsed_key['private_prime1'] = self.key_private_prime1
            self.parsed_key['private_prime2'] = self.key_private_prime2
            self.parsed_key['private_exponent1'] = self.key_private_exponent1
            self.parsed_key['private_exponent2'] = self.key_private_exponent2
            self.parsed_key['private_coefficient'] = self.key_private_coefficient
            self.parsed_key['key_bitsize'] = self.key_bitsize
            # NOTE(review): 'suggested_filenmame' typo preserved deliberately —
            # downstream consumers may already read this misspelled key; confirm
            # before correcting it.
            self.parsed_key['suggested_filenmame'] = self.suggested_filename
            # BUG FIX: original used '%d' with str.format() here as well.
            self.logger.critical(
                'Success - {} bit RSA Private Key (PEM format)'.format(self.key_bitsize))
            return
        except Exception as err:
            self.parsed_key = None
            self.logger.debug(err)
            self.logger.debug(
                'Exception with {} as an RSA key file, skipping...'.format(
                    self.key_file))
            return


class X509HelperCertificate(X509Helper):
    """Parse an X.509 certificate (PEM or DER) and derive subject/naming info."""

    def __init__(self, certificate_file, blacklist_file=None, logger=None):
        X509Helper.__init__(self, logger=logger, blacklist_file=blacklist_file)
        self.certificate_pem_buffer = None
        self.certificate_pubkey_PKey = None
        self.certificate_pubkey_asn1 = None
        self.certificate_pubkey_der = None
        self.certificate_public_modulus = None
        self.certificate_public_exponent = None
        self.text_subject = ''
        self.subject_dict = {}
        self.certificate_file = certificate_file
        self.parsed_certificate = None
        self.parse_certificate_file_universal()

    def __eq__(self, obj):
        """A certificate equals a key whose modulus and exponent match it."""
        if isinstance(obj, X509HelperKey):
            return (obj.key_modulus == self.certificate_public_modulus) and (
                obj.key_public_exponent == self.certificate_public_exponent)
        else:
            return False

    def OID_tuple_to_string(self, oid_tuple):
        """Render an OID tuple such as (2, 5, 4, 3) as '2.5.4.3'."""
        i = 0
        s = ''
        for v in oid_tuple:
            if i:
                s = s + '.'
            i += 1
            s = s + str(v)
        return s

    def certificate(self):
        """Return the parsed certificate dict, or None on failure/blacklist."""
        return self.parsed_certificate

    def lookup_OID(self, oid_tuple):
        """Map an OID in tuple form to a symbolic name.

        We know a few; for unknown OIDs, translate the OID to a dotted string
        and use that, a la OpenSSL.
        Fill these in yourself if you want, the documentation is listed in the
        comments.
        """
        lookup = {
            # Organization - http://www.oid-info.com/get/2.5.4.10
            # http://www.oid-info.com/get/0.9.2342.19200300.100.1.1
            (2, ): 'ISO/ITU-T',
            (2, 5): 'X.500 Directory Services',
            (2, 5, 4): 'X.500 Attribute Types',
            (2, 5, 4, 0): 'id-at-objectClass',
            (2, 5, 4, 1): 'id-at-aliasedEntryName',
            (2, 5, 4, 2): 'id-at-knowldgeinformation',
            (2, 5, 4, 3): 'id-at-commonName',
            (2, 5, 4, 4): 'id-at-surname',
            (2, 5, 4, 5): 'id-at-serialNumber',
            (2, 5, 4, 6): 'id-at-countryName',
            (2, 5, 4, 7): 'id-at-localityName',
            (2, 5, 4, 8): 'id-at-stateOrProvinceName',
            (2, 5, 4, 9): 'id-at-streetAddress',
            (2, 5, 4, 10): 'id-at-organizationName',
            (2, 5, 4, 11): 'id-at-organizationalUnitName',
            (2, 5, 4, 12): 'id-at-title',
            (2, 5, 4, 13): 'id-at-description',
            (2, 5, 4, 14): 'id-at-searchGuide',
            (2, 5, 4, 15): 'id-at-businessCategory',
            (2, 5, 4, 16): 'id-at-postalAddress',
            (2, 5, 4, 17): 'id-at-postalCode',
            (2, 5, 4, 18): 'id-at-postOfficeBox',
            (2, 5, 4, 19): 'id-at-physicalDeliveryOfficeName',
            (2, 5, 4, 20): 'id-at-telephoneNumber',
            (2, 5, 4, 21): 'id-at-telexNumber',
            (2, 5, 4, 22): 'id-at-teletexTerminalIdentifier',
            (2, 5, 4, 23): 'id-at-facsimileTelephoneNumber',
            (2, 5, 4, 24): 'id-at-x121Address',
            (2, 5, 4, 25): 'id-at-internationalISDNNumber',
            (2, 5, 4, 26): 'id-at-registeredAddress',
            (2, 5, 4, 27): 'id-at-destinationIndicator',
            (2, 5, 4, 28): 'id-at-preferredDeliveryMethod',
            (2, 5, 4, 29): 'id-at-presentationAddress',
            (2, 5, 4, 30): 'id-at-supportedApplicationContext',
            (2, 5, 4, 31): 'id-at-member',
            (2, 5, 4, 32): 'id-at-owner',
            (2, 5, 4, 33): 'id-at-roleOccupant',
            (2, 5, 4, 34): 'id-at-seeAlso',
            (2, 5, 4, 35): 'id-at-userPassword',
            (2, 5, 4, 36): 'id-at-userCertificate',
            (2, 5, 4, 37): 'id-at-cACertificate',
            (2, 5, 4, 38): 'id-at-authorityRevocationList',
            (2, 5, 4, 39): 'id-at-certificateRevocationList',
            (2, 5, 4, 40): 'id-at-crossCertificatePair',
            (2, 5, 4, 41): 'id-at-name',
            (2, 5, 4, 42): 'id-at-givenName',
            (2, 5, 4, 43): 'id-at-initials',
            (2, 5, 4, 44): 'id-at-generationQualifier',
            (2, 5, 4, 45): 'id-at-uniqueIdentifier',
            (2, 5, 4, 46): 'id-at-dnQualifier',
            (2, 5, 4, 47): 'id-at-enhancedSearchGuide',
            (2, 5, 4, 48): 'id-at-protocolInformation',
            (2, 5, 4, 49): 'id-at-distinguishedName',
            (2, 5, 4, 50): 'id-at-uniqueMember',
            (2, 5, 4, 51): 'id-at-houseIdentifier',
            (2, 5, 4, 52): 'id-at-supportedAlgorithms',
            (2, 5, 4, 53): 'id-at-deltaRevocationList',
            (2, 5, 4, 58): 'Attribute Certificate attribute (id-at-attributeCertificate)',
            (2, 5, 4, 65): 'id-at-pseudonym'}
        return lookup.get(oid_tuple, self.OID_tuple_to_string(oid_tuple))

    def handle_custom_oids(self):
        """
        Process self.subject (asn1/der) in order to produce a subject string
        for humans to read.

        This is called for non-standard Subject strings, especially the custom
        OIDs. OpenSSL native can parse this fine, but python bindings can't;
        they assign the subject a field named 'UNDEF'. It is OK if this
        function fails, it is just best effort to improve the 'UNDEF'
        description...
        """
        # NOTE(review): `Name` is not defined or imported in this module
        # (presumably pyasn1's rfc2459.Name), and Crypto.Util.asn1 exposes no
        # `codec` attribute — any failure here is swallowed by the caller's
        # except handler in parse_subject_components; confirm intent.
        certType = Name()
        derData = self.subject.der()
        cert, rest = asn1.codec.der.decoder(derData, asn1spec=certType)
        try:
            subject = ''
            extensions = cert.getComponentByPosition(0)
            i = 0
            while True:
                # walk RDN pairs until getComponentByPosition raises
                pair = extensions.getComponentByPosition(i)
                pair = pair.getComponentByPosition(0)
                name = pair.getComponentByPosition(0).asTuple()
                value = pair.getComponentByPosition(1).getComponent()
                name = self.lookup_OID(name)
                if i != 0:
                    subject += '/'
                subject += '{}={}'.format(name, value)
                i += 1
        except Exception as err:
            self.logger.debug('expected exception, ignoring...')
            self.logger.debug(err)
        return subject

    def write_to_file(self, dirname=''):
        """ also sets self.summary """
        if dirname and not dirname.endswith('/'):
            dirname += '/'
        try:
            os.stat(os.path.join(dirname, self.suggested_filename))
            return ''  # dup, already processed this key
        except (IOError, OSError):
            pass  # file doesn't exist, process this entry
        with open(os.path.join(dirname, self.suggested_filename), 'wb') as filefd:
            # dos2unix and add a trailing newline
            filefd.write(
                self.certificate_pem_buffer.replace(
                    '\r\n', '\n') + '\n')
        self.summary = self.suggested_filename.ljust(
            25) + ' - ' + self.text_subject
        return self.summary

    def parse_subject_components(self):
        """Convert subject component pairs into a dict and a printable string."""
        s = {}
        for c in self.subject_components:
            s[c[0]] = c[1]
        self.subject_components = s
        if 'UNDEF' in self.subject_components:
            try:
                self.text_subject += self.handle_custom_oids()
            except Exception:
                self.logger.error(
                    'unexpected exception in handle_custom_oids!')
                self.text_subject += 'UNDEF=0'
        else:
            for key in self.subject_components:
                self.text_subject += key + '=' + \
                    self.subject_components[key] + '/'
        return

    def get_subject_field(self, field):
        """Return a subject component by name, or None if absent."""
        if field in self.subject_components:
            return self.subject_components[field]
        else:
            return None

    def der_cert_to_pem_cert(self, der_buffer):
        """
        Takes a certificate in binary DER format and returns the
        PEM version of it as a string.
        """
        PEM_HEADER = '-----BEGIN CERTIFICATE-----'
        PEM_FOOTER = '-----END CERTIFICATE-----'
        b64 = str(base64.standard_b64encode(der_buffer))
        return (PEM_HEADER + '\n' + textwrap.fill(b64, 64) + '\n' + PEM_FOOTER)

    def parse_certificate_file_universal(self):
        """Load the certificate as PEM, falling back to DER, and decompose it.

        On success populates self.parsed_certificate; failures are logged and
        leave self.parsed_certificate as None.
        """
        try:
            # BUG FIX: original used '%s' with str.format(), so the filename
            # was never substituted into the log message.
            self.logger.warning(
                'OK, trying to process certificate file {}...'.format(
                    self.certificate_file))
            self.c = OpenSSL.crypto
            self.certificate_buffer = open(
                self.certificate_file,
                'rb').read()
            try:  # assume it is PEM first
                self.x509_certificate = self.c.load_certificate(
                    self.c.FILETYPE_PEM,
                    self.certificate_buffer)
                self.certificate_pem_buffer = self.certificate_buffer
            except Exception as err:  # not PEM, try to treat it as DER
                self.x509_certificate = None
                self.certificate_der_buffer = self.certificate_buffer
                self.certificate_pem_buffer = self.der_cert_to_pem_cert(
                    self.certificate_der_buffer)
            if not self.x509_certificate:
                self.x509_certificate = self.c.load_certificate(
                    self.c.FILETYPE_PEM,
                    self.certificate_pem_buffer)
            self.certificate_pubkey_PKey = self.x509_certificate.get_pubkey()
            self.subject = self.x509_certificate.get_subject()
            self.subject_components = self.subject.get_components()
            # dump_privatekey on a public PKey yields the public key DER
            self.certificate_pubkey_asn1 = self.c.dump_privatekey(
                self.c.FILETYPE_ASN1,
                self.certificate_pubkey_PKey)
            self.certificate_pubkey_der = asn1.DerSequence()
            self.certificate_pubkey_der.decode(self.certificate_pubkey_asn1)
            self.certificate_public_modulus = self.certificate_pubkey_der[1]
            self.certificate_key_bitsize = self.certificate_public_modulus.bit_length()
            d = self.modulus_long_to_string(self.certificate_public_modulus)
            if self.is_blacklisted(d):
                self.logger.info('found blacklisted certificate...')
                return
            self.certificate_printable_public_modulus = self.printable_modulus(
                self.certificate_pubkey_der[1])
            self.certificate_public_exponent = self.certificate_pubkey_der[2]
            # name the file after the trailing FILENAME_OCTETS bytes of the modulus
            self.suggested_filename = self.certificate_printable_public_modulus.replace(
                '\t', '')
            self.suggested_filename = self.suggested_filename[len(
                self.suggested_filename) - (3 * self.FILENAME_OCTETS) + 1:]
            self.suggested_filename += '.cert'
            self.suggested_filename = self.suggested_filename.replace(':', '_')
            self.suggested_filename = self.suggested_filename.replace('\r', '')
            self.suggested_filename = self.suggested_filename.replace('\n', '')
            self.parse_subject_components()
            self.parsed_certificate = {}
            self.parsed_certificate['key_bitsize'] = self.certificate_key_bitsize
            self.parsed_certificate['public_modulus'] = self.certificate_public_modulus
            self.parsed_certificate['text_subject'] = self.text_subject
            self.parsed_certificate['suggested_filename'] = self.suggested_filename
            self.logger.critical('Success - %s', self.text_subject)
            return
        except Exception as err:
            self.logger.warning(err)
            self.logger.warning(
                'Failure to parse {} as DER/PEM format certificate, skipping...'.format(
                    self.certificate_file))
api.py
Source: api.py
import subprocess
import os
import unicodedata
import json
import falcon
from datetime import date, datetime
from butterknife.pool import Subvol


class MyEncoder(json.JSONEncoder):
    """JSON encoder aware of datetime/date, map objects and Subvol values."""

    def default(self, obj):
        if isinstance(obj, datetime):
            # millisecond precision with a Zulu suffix
            return obj.strftime('%Y-%m-%dT%H:%M:%S.%f')[:-3] + "Z"
        if isinstance(obj, date):
            return obj.strftime('%Y-%m-%d')
        if isinstance(obj, map):
            return tuple(obj)
        if isinstance(obj, Subvol):
            return obj.version
        return json.JSONEncoder.default(self, obj)


def parse_subvol(func):
    """Decorator: convert the `subvol` URL slug into a Subvol instance."""
    def wrapped(instance, req, resp, subvol, *args, **kwargs):
        return func(instance, req, resp, Subvol("@" + subvol), *args, **kwargs)
    return wrapped


def serialize(func):
    """
    Falcon response serialization
    """
    def wrapped(instance, req, resp, **kwargs):
        assert not req.get_param("unicode") or req.get_param("unicode") == u"â", "Unicode sanity check failed"
        resp.set_header("Cache-Control", "no-cache, no-store, must-revalidate")
        resp.set_header("Pragma", "no-cache")
        resp.set_header("Expires", "0")
        r = func(instance, req, resp, **kwargs)
        if not resp.body:
            if not req.client_accepts_json:
                raise falcon.HTTPUnsupportedMediaType(
                    'This API only supports the JSON media type.',
                    href='http://docs.examples.com/api/json')
            resp.set_header('Content-Type', 'application/json')
            resp.body = json.dumps(r, cls=MyEncoder)
        return r
    return wrapped


from jinja2 import Environment, PackageLoader, FileSystemLoader
env = Environment(loader=PackageLoader('butterknife', 'templates'))


def templatize(path):
    """Decorator factory: render `path` as HTML, or emit JSON when requested."""
    template = env.get_template(path)

    def wrapper(func):
        def wrapped(instance, req, resp, **kwargs):
            assert not req.get_param("unicode") or req.get_param("unicode") == u"â", "Unicode sanity check failed"
            r = func(instance, req, resp, **kwargs)
            r.pop("self", None)  # drop locals() artifact before serializing
            if not resp.body:
                if req.get_header("Accept") == "application/json":
                    resp.set_header("Cache-Control", "no-cache, no-store, must-revalidate")
                    resp.set_header("Pragma", "no-cache")
                    resp.set_header("Expires", "0")
                    resp.set_header('Content-Type', 'application/json')
                    resp.body = json.dumps(r, cls=MyEncoder)
                    return r
                else:
                    resp.set_header('Content-Type', 'text/html')
                    resp.body = template.render(request=req, **r)
                    return r
        return wrapped
    return wrapper


class PoolResource(object):
    """Base resource holding the subvolume pool and the access filter."""

    def __init__(self, pool, subvol_filter):
        self.pool = pool
        self.subvol_filter = subvol_filter


class SubvolResource(PoolResource):
    """List subvolumes, optionally filtered by ?architecture=."""

    @templatize("index.html")
    def on_get(self, req, resp):
        def subvol_generator():
            for subvol in sorted(self.pool.subvol_list(), reverse=True):
                if req.get_param("architecture"):
                    if req.get_param("architecture") != subvol.architecture:
                        continue
                yield subvol
        return {"subvolumes": tuple(subvol_generator())}


class TemplateResource(PoolResource):
    """List templates available in the pool."""

    @serialize
    def on_get(self, req, resp):
        return {"templates": map(
            lambda j: {"namespace": j[0], "identifier": j[1], "architectures": j[2]},
            self.pool.template_list(self.subvol_filter))}


class VersionResource(PoolResource):
    """List versions of one template for a given architecture."""

    @serialize
    def on_get(self, req, resp, name, arch):
        namespace, identifier = name.rsplit(".", 1)
        subset_filter = self.subvol_filter.subset(namespace=namespace,
            identifier=identifier, architecture=arch)
        return {"versions": map(
            lambda v: {"identifier": v, "signed": v.signed},
            sorted(subset_filter.apply(self.pool.subvol_list()), reverse=True, key=lambda j: j.numeric_version))}


class LegacyStreamingResource(PoolResource):
    """Stream a btrfs send of a template version (legacy URL scheme)."""

    def on_get(self, req, resp, name, arch, version):
        parent_version = req.get_param("parent")
        subvol = "@template:%(name)s:%(arch)s:%(version)s" % locals()
        if not self.subvol_filter.match(Subvol(subvol)):
            raise Exception("Not going to happen")
        suggested_filename = "%(name)s:%(arch)s:%(version)s" % locals()
        if parent_version:
            parent_subvol = "@template:%(name)s:%(arch)s:%(parent_version)s" % locals()
            if not self.subvol_filter.match(Subvol(parent_subvol)):
                # BUG FIX: original had a bare `raise` with no active
                # exception (RuntimeError at runtime); mirror the sibling
                # check above instead.
                raise Exception("Not going to happen")
            suggested_filename += ":" + parent_version
        else:
            parent_subvol = None
        suggested_filename += ".far"
        resp.set_header("Content-Disposition", "attachment; filename=\"%s\"" % suggested_filename)
        resp.set_header('Content-Type', 'application/btrfs-stream')
        streamer = self.pool.send(subvol, parent_subvol)
        resp.stream = streamer.stdout
        accepted_encodings = req.get_header("Accept-Encoding") or ""
        accepted_encodings = [j.strip() for j in accepted_encodings.lower().split(",")]
        if "gzip" in accepted_encodings:
            for cmd in "/usr/bin/pigz", "/bin/gzip":
                if os.path.exists(cmd):
                    resp.set_header('Content-Encoding', 'gzip')
                    print("Compressing with %s" % cmd)
                    compressor = subprocess.Popen((cmd, "--fast"), bufsize=-1, stdin=streamer.stdout, stdout=subprocess.PIPE)
                    resp.stream = compressor.stdout
                    break
            else:
                print("No gzip compressors found, falling back to no compression")
        else:
            print("Client did not ask for compression")


class StreamResource(PoolResource):
    """Stream a subvolume as a btrfs send stream or a tarball."""

    @parse_subvol
    def on_get(self, req, resp, subvol):
        if not self.subvol_filter.match(subvol):
            resp.body = "Subvolume does not match filter"
            resp.status = falcon.HTTP_403
            return
        # renamed from `format` to avoid shadowing the builtin
        fmt = req.get_param("format") or "btrfs-stream"
        if fmt == "btrfs-stream":
            parent_slug = req.get_param("parent")
            suggested_filename = "%s.%s-%s-%s" % (subvol.namespace, subvol.identifier, subvol.architecture, subvol.version)
            if parent_slug:
                parent_subvol = Subvol(parent_slug) if parent_slug else None
                if not self.subvol_filter.match(parent_subvol):
                    resp.body = "Subvolume does not match filter"
                    resp.status = falcon.HTTP_403
                    return
                suggested_filename += "-" + parent_subvol.version
            else:
                parent_subvol = None
            suggested_filename += ".far"
            resp.set_header('Content-Type', 'application/btrfs-stream')
            try:
                streamer = self.pool.send(subvol, parent_subvol)
            # NOTE(review): SubvolNotFound is never imported in this module, so
            # reaching this handler raises NameError; it presumably lives in
            # butterknife.pool next to Subvol — confirm and import it.
            except SubvolNotFound as e:
                resp.body = "Could not find subvolume %s\n" % str(e)
                resp.status = falcon.HTTP_403
                return
        elif fmt == "tar":
            suggested_filename = "%s.%s-%s-%s.tar" % (subvol.namespace, subvol.identifier, subvol.architecture, subvol.version)
            try:
                streamer = self.pool.tar(subvol)
            except SubvolNotFound as e:
                resp.body = "Could not find subvolume %s\n" % str(e)
                resp.status = falcon.HTTP_403
                return
        else:
            resp.body = "Requested unknown format"
            resp.status = falcon.HTTP_403
            return
        resp.stream = streamer.stdout
        resp.set_header("Content-Disposition", "attachment; filename=\"%s\"" % suggested_filename)
        accepted_encodings = req.get_header("Accept-Encoding") or ""
        accepted_encodings = [j.strip() for j in accepted_encodings.lower().split(",")]
        if "gzip" in accepted_encodings:
            for cmd in "/usr/bin/pigz", "/bin/gzip":
                if os.path.exists(cmd):
                    resp.set_header('Content-Encoding', 'gzip')
                    print("Compressing with %s" % cmd)
                    compressor = subprocess.Popen((cmd, "--fast"), bufsize=-1, stdin=streamer.stdout, stdout=subprocess.PIPE)
                    resp.stream = compressor.stdout
                    return
            else:
                print("No gzip compressors found, falling back to no compression")
        else:
            print("Client did not ask for compression")


class ManifestResource(PoolResource):
    """
    Generate manifest for a subvolume
    """

    @parse_subvol
    def on_get(self, req, resp, subvol):
        if not self.subvol_filter.match(subvol):
            resp.body = "Subvolume does not match filter"
            resp.status = falcon.HTTP_403
            return
        # NOTE(review): suggested_filename is computed but never sent as a
        # Content-Disposition header — confirm whether that was intended.
        suggested_filename = "%s.%s-%s-%s.csv" % (subvol.namespace, subvol.identifier, subvol.architecture, subvol.version)
        resp.set_header('Content-Type', 'text/plain')
        resp.stream = self.pool.manifest(subvol)


class KeyringResource(object):
    """Serve the GPG keyring file."""

    def __init__(self, filename):
        self.filename = filename

    def on_get(self, req, resp):
        resp.set_header("Content-Type", "application/x-gnupg-keyring")
        resp.set_header("Content-Disposition", "attachment; filename=\"%s.gpg\"" % req.env["SERVER_NAME"].replace(".", "_"))  # HTTP_HOST instead? Underscore *should* not be allowed in hostname
        resp.stream = open(self.filename, "rb")


class SignatureResource(PoolResource):
    """Serve the detached signature of a subvolume."""

    @parse_subvol
    def on_get(self, req, resp, subvol):
        if not self.subvol_filter.match(subvol):
            resp.body = "Subvolume does not match filter"
            resp.status = falcon.HTTP_403
            return
        try:
            resp.stream = self.pool.signature(subvol)
            # NOTE(review): suggested_filename is unused here as well — confirm.
            suggested_filename = "%s.%s-%s-%s.asc" % (subvol.namespace, subvol.identifier, subvol.architecture, subvol.version)
            resp.set_header('Content-Type', 'text/plain')
            resp.set_header("Cache-Control", "public")
        except FileNotFoundError:
            resp.body = "Signature for %s not found" % subvol
            resp.status = falcon.HTTP_404


class PackageDiff(PoolResource):
    """Diff installed dpkg packages between a subvolume and its parent."""

    @templatize("packages.html")
    @parse_subvol
    def on_get(self, req, resp, subvol):
        print(subvol.domain)
        if not self.subvol_filter.match(subvol):
            resp.body = "Subvolume does not match filter"
            resp.status = falcon.HTTP_403
            return
        parent_subvol = req.get_param("parent")

        # TODO: Add heuristics to determine package management system,
        # at least don't die with RPM systems
        def dpkg_list(root):
            """
            Return dict of package names and versions corresponding to a
            Debian/Ubuntu etc root filesystem
            """
            package_name = None
            package_version = None
            versions = {}
            for line in open(os.path.join(root, "var/lib/dpkg/status")):
                line = line[:-1]
                if not line:
                    # blank line ends a stanza; record the package
                    assert package_name, "No package name specified!"
                    assert package_version, "No package version specified!"
                    versions[package_name] = package_version
                    package_name = None
                    package_version = None
                    continue
                if ": " not in line:
                    continue
                key, value = line.split(": ", 1)
                if key == "Package":
                    package_name = value
                    continue
                if key == "Version":
                    package_version = value
                    continue
            return versions

        new = dpkg_list("/var/lib/butterknife/pool/%s" % subvol)
        if not parent_subvol:
            packages_diff = False
            packages_intact = sorted(new.items())
        else:
            packages_diff = True
            if not self.subvol_filter.match(Subvol(parent_subvol)):
                resp.body = "Parent subvolume does not match filter"
                resp.status = falcon.HTTP_403
                return
            old = dpkg_list("/var/lib/butterknife/pool/%s" % parent_subvol)
            packages_added = []
            packages_removed = []
            packages_updated = []
            packages_intact = []
            for key in sorted(set(new) & set(old)):
                old_version = old[key]
                new_version = new[key]
                if old_version != new_version:
                    packages_updated.append((key, old_version, new_version))
                else:
                    packages_intact.append((key, old_version))
            for key in sorted(set(new) - set(old)):
                packages_added.append((key, new[key]))
            for key in sorted(set(old) - set(new)):
                packages_removed.append((key, old[key]))
        # NOTE(review): the source listing is truncated at this point; the
        # original presumably returns the template context dict expected by
        # @templatize("packages.html") — confirm against the upstream file.
importer.py
Source: importer.py
1"""Import logic for blueprint."""2from __future__ import annotations3from dataclasses import dataclass4import html5import re6import voluptuous as vol7import yarl8from homeassistant.core import HomeAssistant9from homeassistant.exceptions import HomeAssistantError10from homeassistant.helpers import aiohttp_client, config_validation as cv11from homeassistant.util import yaml12from .models import Blueprint13from .schemas import is_blueprint_config14COMMUNITY_TOPIC_PATTERN = re.compile(15 r"^https://community.home-assistant.io/t/[a-z0-9-]+/(?P<topic>\d+)(?:/(?P<post>\d+)|)$"16)17COMMUNITY_CODE_BLOCK = re.compile(18 r'<code class="lang-(?P<syntax>[a-z]+)">(?P<content>(?:.|\n)*)</code>', re.MULTILINE19)20GITHUB_FILE_PATTERN = re.compile(21 r"^https://github.com/(?P<repository>.+)/blob/(?P<path>.+)$"22)23COMMUNITY_TOPIC_SCHEMA = vol.Schema(24 {25 "slug": str,26 "title": str,27 "post_stream": {"posts": [{"updated_at": cv.datetime, "cooked": str}]},28 },29 extra=vol.ALLOW_EXTRA,30)31class UnsupportedUrl(HomeAssistantError):32 """When the function doesn't support the url."""33@dataclass(frozen=True)34class ImportedBlueprint:35 """Imported blueprint."""36 suggested_filename: str37 raw_data: str38 blueprint: Blueprint39def _get_github_import_url(url: str) -> str:40 """Convert a GitHub url to the raw content.41 Async friendly.42 """43 if url.startswith("https://raw.githubusercontent.com/"):44 return url45 match = GITHUB_FILE_PATTERN.match(url)46 if match is None:47 raise UnsupportedUrl("Not a GitHub file url")48 repo, path = match.groups()49 return f"https://raw.githubusercontent.com/{repo}/{path}"50def _get_community_post_import_url(url: str) -> str:51 """Convert a forum post url to an import url.52 Async friendly.53 """54 match = COMMUNITY_TOPIC_PATTERN.match(url)55 if match is None:56 raise UnsupportedUrl("Not a topic url")57 _topic, post = match.groups()58 json_url = url59 if post is not None:60 # Chop off post part, ie /261 json_url = json_url[: -len(post) - 1]62 json_url 
+= ".json"63 return json_url64def _extract_blueprint_from_community_topic(65 url: str,66 topic: dict,67) -> ImportedBlueprint | None:68 """Extract a blueprint from a community post JSON.69 Async friendly.70 """71 block_content = None72 blueprint = None73 post = topic["post_stream"]["posts"][0]74 for match in COMMUNITY_CODE_BLOCK.finditer(post["cooked"]):75 block_syntax, block_content = match.groups()76 if block_syntax not in ("auto", "yaml"):77 continue78 block_content = html.unescape(block_content.strip())79 try:80 data = yaml.parse_yaml(block_content)81 except HomeAssistantError:82 if block_syntax == "yaml":83 raise84 continue85 if not is_blueprint_config(data):86 continue87 blueprint = Blueprint(data)88 break89 if blueprint is None:90 raise HomeAssistantError(91 "No valid blueprint found in the topic. Blueprint syntax blocks need to be marked as YAML or no syntax."92 )93 return ImportedBlueprint(94 f'{post["username"]}/{topic["slug"]}', block_content, blueprint95 )96async def fetch_blueprint_from_community_post(97 hass: HomeAssistant, url: str98) -> ImportedBlueprint | None:99 """Get blueprints from a community post url.100 Method can raise aiohttp client exceptions, vol.Invalid.101 Caller needs to implement own timeout.102 """103 import_url = _get_community_post_import_url(url)104 session = aiohttp_client.async_get_clientsession(hass)105 resp = await session.get(import_url, raise_for_status=True)106 json_resp = await resp.json()107 json_resp = COMMUNITY_TOPIC_SCHEMA(json_resp)108 return _extract_blueprint_from_community_topic(url, json_resp)109async def fetch_blueprint_from_github_url(110 hass: HomeAssistant, url: str111) -> ImportedBlueprint:112 """Get a blueprint from a github url."""113 import_url = _get_github_import_url(url)114 session = aiohttp_client.async_get_clientsession(hass)115 resp = await session.get(import_url, raise_for_status=True)116 raw_yaml = await resp.text()117 data = yaml.parse_yaml(raw_yaml)118 blueprint = Blueprint(data)119 
parsed_import_url = yarl.URL(import_url)120 suggested_filename = f"{parsed_import_url.parts[1]}/{parsed_import_url.parts[-1]}"121 if suggested_filename.endswith(".yaml"):122 suggested_filename = suggested_filename[:-5]123 return ImportedBlueprint(suggested_filename, raw_yaml, blueprint)124async def fetch_blueprint_from_github_gist_url(125 hass: HomeAssistant, url: str126) -> ImportedBlueprint:127 """Get a blueprint from a Github Gist."""128 if not url.startswith("https://gist.github.com/"):129 raise UnsupportedUrl("Not a GitHub gist url")130 parsed_url = yarl.URL(url)131 session = aiohttp_client.async_get_clientsession(hass)132 resp = await session.get(133 f"https://api.github.com/gists/{parsed_url.parts[2]}",134 headers={"Accept": "application/vnd.github.v3+json"},135 raise_for_status=True,136 )137 gist = await resp.json()138 blueprint = None139 filename = None140 content = None141 for filename, info in gist["files"].items():142 if not filename.endswith(".yaml"):143 continue144 content = info["content"]145 data = yaml.parse_yaml(content)146 if not is_blueprint_config(data):147 continue148 blueprint = Blueprint(data)149 break150 if blueprint is None:151 raise HomeAssistantError(152 "No valid blueprint found in the gist. The blueprint file needs to end with '.yaml'"153 )154 return ImportedBlueprint(155 f"{gist['owner']['login']}/{filename[:-5]}", content, blueprint156 )157async def fetch_blueprint_from_url(hass: HomeAssistant, url: str) -> ImportedBlueprint:158 """Get a blueprint from a url."""159 for func in (160 fetch_blueprint_from_community_post,161 fetch_blueprint_from_github_url,162 fetch_blueprint_from_github_gist_url,163 ):164 try:165 imported_bp = await func(hass, url)166 imported_bp.blueprint.update_metadata(source_url=url)167 return imported_bp168 except UnsupportedUrl:169 pass...
LambdaTest’s Playwright tutorial will give you a broader idea about the Playwright automation framework, its unique features, and use cases with examples to deepen your understanding of Playwright testing. This tutorial provides A-to-Z guidance, from installing the Playwright framework to best practices and advanced concepts.
Get 100 minutes of automation testing FREE!