Best Python code snippet using playwright-python
speech_utils.py
Source:speech_utils.py
1#!/usr/bin/env python2# -*- encoding: utf-8 -*-3'''4@File : speech_utils.py5@Description: ææ¥é³ç®±åº6@Date : 2022/02/11 14:34:457@Author : guoliang.wgl8@version : 1.09'''10import time11import math12import http13import json14import time15AUDIO_HEADER = 'fs:'16from audio import Player, Snd17on_callback = False18on_download = False19cb_data = None20class Speaker:21 tonenameSuffix = [".wav", ".mp3"]22 tonenameNumb = ["SYS_TONE_0", "SYS_TONE_1", "SYS_TONE_2", "SYS_TONE_3", "SYS_TONE_4", "SYS_TONE_5", "SYS_TONE_6", "SYS_TONE_7", "SYS_TONE_8", "SYS_TONE_9"]23 tonenameNumb1 = "SYS_TONE_yao"24 tonenameDot = "SYS_TONE_dian"25 tonenameUnit = ["SYS_TONE_MEASURE_WORD_ge", "SYS_TONE_MEASURE_WORD_shi", "SYS_TONE_MEASURE_WORD_bai", "SYS_TONE_MEASURE_WORD_qian"]26 tonenameHunit = ["SYS_TONE_MEASURE_WORD_wan", "SYS_TONE_MEASURE_WORD_yi", "SYS_TONE_MEASURE_WORD_sw", "SYS_TONE_MEASURE_WORD_bw", "SYS_TONE_MEASURE_WORD_qw"]27 def __init__(self,res_dir):28 self.toneDir = res_dir29 self._create_player()30 def _create_player(self):31 Snd.init()32 player = Player()33 player.open()34 player.setVolume(8)35 self._player = player36 def play(self,path):37 self._player.play(path)38 self._player.waitComplete()39 def playlist(self,pathlist):40 for path in pathlist:41 self.play(AUDIO_HEADER + path)42 def play_voice(self,data,dir_info):43 format = data['format']44 audioResFormat = 045 if (format == 'mp3'):46 audioResFormat = 147 speechs = data['speechs']48 toneList = []49 for speech in speechs:50 print(speech)51 # length = len(speech)52 if speech.endswith('}') and speech.startswith('{') and (speech[1] == '$'):53 speech_num = speech.strip('{').strip('$').strip('}')54 toneList = self.add_amount(speech_num,toneList,audioResFormat)55 else:56 toneList.append(self.toneDir + speech + self.tonenameSuffix[audioResFormat])57 print(toneList)58 self.playlist(toneList)59 def add_amount(self,num_str, toneList, formatFlag):60 num_f = float(num_str)61 numb = int(num_f)62 deci = num_f - numb63 target = numb64 subTarget = 065 subNumber = None66 slot = 067 factor = 068 count = 069 prevSlotZero = False70 hundredMillionExist = False71 tenThousandExist = False72 if (numb < 0 or numb >= 1000000000000):73 print('amount overrange')74 return toneList75 if (deci < 0.0001 and deci > 0.0):76 deci = 0.000177 i = 278 while(i >= 0):79 factor = math.pow(10000,i)80 if target < factor:81 i = i -182 continue83 subTarget = int(target / factor)84 target %= factor85 if (subTarget == 0):86 i = i -187 continue88 if (i == 2):89 hundredMillionExist = True90 elif (i == 1):91 tenThousandExist = True92 subNumber = subTarget93 prevSlotZero = False94 depth = 395 while(depth >= 0):96 if(subNumber == 0):97 break98 factor = math.pow(10, depth)99 if ((hundredMillionExist == True or tenThousandExist == True) and i == 0):100 pass101 elif (hundredMillionExist == True and tenThousandExist == True and depth > 0 and subTarget < factor):102 pass103 elif (subTarget < factor):104 depth = depth - 1105 continue106 slot = int(subNumber / factor)107 subNumber %= factor108 if (slot == 0 and depth == 0):109 depth = depth - 1110 continue111 if ((subTarget < 20 and depth == 1) or (slot == 0 and prevSlotZero) or (slot == 0 and depth == 0)):112 pass113 else:114 toneList.append(self.toneDir + self.tonenameNumb[slot] + self.tonenameSuffix[formatFlag])115 count += 1116 if (slot == 0 and prevSlotZero == False):117 prevSlotZero = True118 elif (prevSlotZero == True and slot != 0):119 prevSlotZero = False120 if (slot > 0 and depth > 0) :121 toneList.append(self.toneDir + self.tonenameUnit[depth] + self.tonenameSuffix[formatFlag])122 count += 1123 depth = depth - 1124 if (i > 0):125 toneList.append(self.toneDir + self.tonenameHunit[i - 1] + self.tonenameSuffix[formatFlag])126 count += 1127 i = i - 1128 if (count == 0 and numb == 0):129 toneList.append(self.toneDir + self.tonenameNumb[0] + self.tonenameSuffix[formatFlag])130 if (deci >= 0.0001) :131 toneList.append(self.toneDir + self.tonenameDot + self.tonenameSuffix[formatFlag])132 deci ="{:.4f}".format(deci)133 deci_tmp = str(deci).strip().rstrip('0')134 deci_str = ''135 got_dot = False136 for j in range(len(deci_tmp)):137 if(got_dot):138 deci_str = deci_str + deci_tmp[j]139 elif deci_tmp[j] == '.':140 got_dot = True141 deciArray = deci_str142 for item in deciArray:143 if (item >= '0' and item <= '9'):144 print(self.tonenameNumb[int(item)])145 toneList.append(self.toneDir + self.tonenameNumb[int(item)] + self.tonenameSuffix[formatFlag])146 return toneList147 def download_resource_file(self,on_request,resDir):148 global on_callback,on_download,cb_data149 data = {150 'url':on_request['url'],151 'method': 'GET',152 'headers': {153 },154 'timeout': 30000,155 'params' : ''156 }157 def cb(data):158 global on_callback,cb_data159 on_callback = True160 cb_data = data161 http.request(data,cb)162 while True:163 if on_callback:164 on_callback = False165 break166 else:167 time.sleep(1)168 response = json.loads(cb_data['body'])169 audio = response['audios'][0]170 format = audio['format']171 id = audio['id']172 size = audio['size']173 path = self.toneDir +id+'.'+format174 print('************ begin to download: ' + path)175 d_data = {176 'url': audio['url'],177 'filepath': path178 }179 def d_cb(data):180 global on_download181 on_download = True182 http.download(d_data,d_cb)183 while True:184 if on_download:185 on_download = False186 break187 else:188 time.sleep(1)...
file_transfer.py
Source:file_transfer.py
1import fnmatch2import logging3import subprocess4from typing import List5from .base import BaseFilePlugin, HandlerExit, HandlerFail6from ..common import *7class FileTransferPlugin(BaseFilePlugin):8 """Receive file from phone"""9 MARK = b'file'10 NAME = 'FileTransferPlugin'11 MAIN_CONF = dict()12 DEVICE_CONFS = dict()13 CONFIG_SCHEMA = DictEntry('file.conf.json', 'Common configuration for file transfer plugin', False, entries=(14 IntEntry('uin', 'UIN of device for which config will be applied', True, 1, 0xFFFFFFF, None),15 DirEntry('download_directory', 'Directory to save downloaded files', False, '/tmp/dcnnt', True, False),16 TemplateEntry('on_download', 'Template of command executed for every saved file',17 True, 0, 4096, None, replacements=(Rep('path', 'Path to saved file', True),)),18 ListEntry('shared_dirs', 'Directories shared to client', False, 0, 1073741824, (),19 entry=DictEntry('shared_dirs[]', 'Description of shared directory', False, entries=(20 DirEntry('path', 'Path to shared directory', False, '/tmp/dcnnt', True, False),21 StringEntry('name', 'Name using for directory instead of path', True, 0, 60, None),22 StringEntry('glob', 'UNIX glob to filter visible files in directory', False, 0, 1073741824, '*'),23 IntEntry('deep', 'Recursion deep for subdirectories', False, 1, 1024, 1)24 ))),25 ListEntry('shared_dirs_external', 'Lists of directories shared to client', True, 0, 1073741824, (),26 entry=DictEntry('shared_dirs_external[]', 'Description of shared directory list', False, entries=(27 FileEntry('path', 'Path to list of shared directories', False,28 '$DCNNT_CONFIG_DIR/shared.list.txt', True, False),29 StringEntry('glob', 'UNIX glob to filter visible files in directories',30 False, 0, 1073741824, '*'),31 IntEntry('deep', 'Recursion deep for subdirectories', False, 1, 1024, 1)32 ))),33 ))34 shared_files_index = list()35 @staticmethod36 def check_file_filter(path: str, glob_str: str) -> bool:37 """Check if file allowed for sharing by filter"""38 return fnmatch.fnmatch(path, glob_str)39 def shared_directory_list(self, directory: str, filter_data, max_deep, current_deep):40 """Create information node for one shared directory"""41 res = list()42 try:43 dir_list = os.listdir(directory)44 except (PermissionError, OSError) as e:45 self.log(f'Could not list content of directory "{directory}" ({e})')46 return res47 for name in dir_list:48 path = os.path.join(directory, name)49 if os.path.isdir(path):50 if current_deep < max_deep and max_deep > 0:51 dir_list = self.shared_directory_list(path, filter_data, max_deep, current_deep + 1)52 res.append(dict(name=name, node_type='directory', size=len(dir_list), children=dir_list))53 elif os.path.isfile(path):54 if self.check_file_filter(path, filter_data):55 self.shared_files_index.append(path)56 index = len(self.shared_files_index) - 157 res.append(dict(name=name, node_type='file', size=os.path.getsize(path), index=index))58 return res59 def process_shared_directory(self, path: str, name: Optional[str], glob: str, deep: int,60 res: List[Dict[str, Any]], names: Dict[str, int]):61 """Process one shared directory record"""62 if not os.path.isdir(path):63 self.log(f'Shared directory "{path}" not found', logging.WARN)64 return65 if name is None:66 name = os.path.basename(path)67 if name in names:68 names[name] += 169 name += f' ({names[name]})'70 else:71 names[name] = 072 dir_list = self.shared_directory_list(path, glob, deep, 1)73 res.append(dict(name=name, node_type='directory', size=len(dir_list), children=dir_list))74 def shared_files_info(self) -> list:75 """Create tree structure of shared directories"""76 self.shared_files_index.clear()77 res = list()78 names = dict()79 for shared_dir in self.conf('shared_dirs'):80 self.process_shared_directory(shared_dir['path'], shared_dir['name'], shared_dir['glob'],81 shared_dir.get('deep', 0), res, names)82 for shared_dirs_import in self.conf('shared_dirs_external'):83 import_path, glob = shared_dirs_import['path'], shared_dirs_import['glob']84 deep = shared_dirs_import.get('deep', 0)85 if not os.path.isfile(import_path):86 self.log(f'List of shared directories "{import_path}" not found', logging.INFO)87 continue88 with open(import_path) as f:89 for path in f.read().splitlines(keepends=False):90 path = Template(path).safe_substitute(self.app.environment)91 self.process_shared_directory(path, None, glob, deep, res, names)92 return res93 def handle_upload(self, request: RPCRequest):94 """Receive and save file from client"""95 path = self.receive_file(request, self.conf('download_directory'))96 on_download = self.conf('on_download')97 if isinstance(on_download, str):98 command = on_download.format(path=path)99 self.log('Execute: "{}"'.format(command))100 subprocess.call(command, shell=True)101 def handle_list_shared(self, request: RPCRequest):102 """Create shared files info and return as JSON"""103 try:104 result = self.shared_files_info()105 except Exception as e:106 self.logger.exception('[FileTransferPlugin] {}'.format(e))107 result = INTERNAL_ERROR108 self.rpc_send(RPCResponse(request.id, result))109 def handle_download(self, request):110 """Handle try of device to download file from server"""111 try:112 index, size = request.params['index'], request.params['size']113 except KeyError as e:114 self.log('KeyError {}'.format(e), logging.WARN)115 else:116 self.log('Download request is correct')117 if 0 <= index < len(self.shared_files_index):118 path = self.shared_files_index[index]119 self.send_file(request, path, size)120 else:121 self.rpc_send(RPCResponse(request.id, dict(code=1, message='No such index: {}'.format(index))))122 def process_request(self, request: RPCRequest):123 if request.method == 'list':124 self.handle_list_shared(request)125 elif request.method == 'download':126 self.handle_download(request)127 elif request.method == 'upload':...
GantMonitor.py
Source:GantMonitor.py
...129 self.set_tooltip('Gant Monitor')130 self.set_visible(True)131 self.connect('activate', self.on_download)132 self.connect('popup-menu', self.on_popup_menu)133 def on_download(self, data):134 downloadDialog=DownloadDialog()135 downloadDialog.run()136 137 def on_exit(self, data):138 gtk.main_quit()139 def on_popup_menu(self, status, button, time):140 self.menu.popup(None, None, None, button, time)141 # configure default cmd line params and serialize to xml?142 def on_preferences(self, data):143 print 'Gant preferences - todo'144 settings = SettingsDialog()145 settings.run()146 print settings.result147 def on_about(self, data):...
test_download.py
Source:test_download.py
...21 super(TestDownload, self).setUpPreSession()22 self.config.set_libtorrent(True)23 self.config.set_dispersy(False)24 self.config.set_libtorrent_max_conn_download(2)25 def on_download(self, download):26 self._logger.debug("Download started: %s", download)27 download.set_state_callback(self.downloader_state_callback)28 @deferred(timeout=60)29 def test_download_torrent_from_url(self):30 # Setup file server to serve torrent file31 files_path = os.path.join(self.session_base_dir, 'http_torrent_files')32 os.mkdir(files_path)33 shutil.copyfile(TORRENT_UBUNTU_FILE, os.path.join(files_path, 'ubuntu.torrent'))34 file_server_port = get_random_port()35 self.setUpFileServer(file_server_port, files_path)36 d = self.session.start_download_from_uri('http://localhost:%s/ubuntu.torrent' % file_server_port)37 d.addCallback(self.on_download)38 return self.test_deferred39 @skip("Fetching a torrent from the external network is unreliable")...
LambdaTest’s Playwright tutorial will give you a broader idea about the Playwright automation framework, its unique features, and use cases with examples to exceed your understanding of Playwright testing. This tutorial will give A to Z guidance, from installing the Playwright framework to some best practices and advanced concepts.
Get 100 minutes of automation test minutes FREE!!