Best Python code snippet using playwright-internal
opener.py
Source:opener.py
...  # earlier part of opener.py is elided in this listing
# NOTE(review): the functions below take ``self`` and read ``self.registry``,
# ``self.openers``, ``self.re_fs_url`` and ``self.default_opener``; they are
# presumably methods of an opener-registry class whose header is not shown.
# The listing's original indentation was lost, so nesting is reconstructed
# from the syntax.


def split_segments(self, fs_url):
    """Match an FS url against the registry's url pattern.

    :param fs_url: an FS url
    :returns: whatever ``self.re_fs_url.match(fs_url)`` returns (for a
        compiled regular expression: a match object, or None if the url
        does not fit the pattern)

    """
    match = self.re_fs_url.match(fs_url)
    return match


def get_opener(self, name):
    """Retrieve an opener for the given protocol

    :param name: name of the opener to open
    :raises NoOpenerError: if no opener has been registered of that name

    """
    if name not in self.registry:
        raise NoOpenerError("No opener for %s" % name)
    # The registry maps a protocol name to an index into self.openers.
    index = self.registry[name]
    return self.openers[index]


def add(self, opener):
    """Adds an opener to the registry

    Every name in ``opener.names`` is mapped to the same opener.

    :param opener: a class derived from fs.opener.Opener

    """

    # The next free slot is the current number of stored openers.
    index = len(self.openers)
    self.openers[index] = opener
    for name in opener.names:
        self.registry[name] = index


def parse(self, fs_url, default_fs_name=None, writeable=False, create_dir=False, cache_hint=True):
    """Parses a FS url and returns an fs object and a path within that FS
    object (if indicated in the path). A tuple of (<FS instance>, <path>)
    is returned.

    :param fs_url: an FS url
    :param default_fs_name: the default FS to use if none is indicated
        (falls back to ``self.default_opener``)
    :param writeable: if True, a writeable FS will be returned
    :param create_dir: if True, then the directory in the FS will be created
    :param cache_hint: value passed to ``cache_hint()`` on the opened FS
    :raises NoOpenerError: if no opener is registered for the FS name
    :raises OpenerError: if the url could not be turned into an FS url

    """

    orig_url = fs_url
    match = self.split_segments(fs_url)

    if match:
        fs_name, credentials, url1, url2, path = match.groups()
        if credentials:
            fs_url = '%s@%s' % (credentials, url1)
        else:
            fs_url = url2
        path = path or ''
        fs_url = fs_url or ''
        # "outer:inner://..." -- the part after the first ':' becomes the
        # protocol of the url handed on to the outer opener.
        if ':' in fs_name:
            fs_name, sub_protocol = fs_name.split(':', 1)
            fs_url = '%s://%s' % (sub_protocol, fs_url)
        # Only the last '!'-separated segment is kept as the path; the
        # earlier segments are appended back onto the FS url.
        if '!' in path:
            paths = path.split('!')
            path = paths.pop()
            fs_url = '%s!%s' % (fs_url, '!'.join(paths))

        fs_name = fs_name or self.default_opener
    else:
        # Not an FS url: treat the whole string as a system path.
        fs_name = default_fs_name or self.default_opener
        fs_url = _expand_syspath(fs_url)
        path = ''

    fs_name, fs_name_params = _parse_name(fs_name)
    opener = self.get_opener(fs_name)

    if fs_url is None:
        raise OpenerError("Unable to parse '%s'" % orig_url)
    fs, fs_path = opener.get_fs(self, fs_name, fs_name_params, fs_url, writeable, create_dir)
    fs.cache_hint(cache_hint)

    # For a wildcard path, open the containing directory and return the
    # wildcard resource name unchanged.
    if fs_path and iswildcard(fs_path):
        pathname, resourcename = pathsplit(fs_path or '')
        if pathname:
            fs = fs.opendir(pathname)
        return fs, resourcename

    fs_path = join(fs_path, path)
    ...  # the listing is truncated here; the remainder of parse() is not shown
cetsms.py
Source:cetsms.py
...57 atdata = data[1].split(' ')58 uid =atdata[0]59 psw = atdata[1]60 cj = cookielib.CookieJar()61 opener = urllib2.build_opener(urllib2.HTTPCookieProcessor(cj))62 opener.addheaders = [('User-Agent','Mozilla/5.0 (Ubuntu; X11; Linux x86_64; rv:8.0) Gecko/20100101 Firefox/8.0')]63 url = 'http://117.211.100.44:8080/index.php'64 url_data = urlencode({'userid':uid,'password':psw,'Submit':'Submit'})65 usock = opener.open(url, url_data) 66 url = 'http://117.211.100.44:8080/index.php'67 url_data = urlencode({'module':'com_views','task':'student_attendance_view'})68 atand = opener.open(url, url_data)69 page=atand.read()70 line=page.split('\n')71 new=set([])72 for i in range (len(line)) : 73 mat2=re.match(r'.*Logged in as.*',line[i],re.M)74 if mat2: 75 i = i+276 nam=re.match(r'.*<td>(.*?)</td>.*',line[i])77 if nam:78 str0= nam.group(1)79 self.response.out.write("Name : "+str0)80 mat1=re.match(r'.*Attendance till date :.*',line[i],re.M)81 if mat1: 82 self.response.out.write("</br>"+line[i]) 83 elif data[0] == 'send' and len(data)==2:84 self.response.out.write("Received" )85 smsdata = data[1].split('>',1) #contain all message exclude 'send'86 key = smsdata[0]87 if key == 'a' : #admit new user88 secondparse = smsdata[1].split('>',3)89 name = secondparse[0]90 mobno = secondparse[1]91 pro = secondparse[2]92 pswd =secondparse[3]93 94 deleteContact = db.GqlQuery("SELECT * FROM Registration WHERE ANCESTOR IS :1 AND mobno =:2 and pro =:3", registration_key(),mobno,pro).get()95 if deleteContact is not None:96 code = deleteContact.code97 if code == '0':98 sys.exit(1)99 db.delete(deleteContact)100 cd = db.GqlQuery("SELECT * FROM Code WHERE ANCESTOR IS :1 ", code_key()).get() 101 if cd is not None:102 tcode = cd.current103 code = str(int(tcode)+1)104 else:105 code = '1'106 deletecode = db.GqlQuery("SELECT * FROM Code WHERE ANCESTOR IS :1 ", code_key())107 if deletecode is not None:108 db.delete(deletecode) 109 codedata = Code(parent = code_key())110 codedata.current = code111 
codedata.put()112 usrdata = Registration(parent=registration_key()) 113 usrdata.code = code114 usrdata.mobno = mobno115 usrdata.name = name116 usrdata.pro = pro117 usrdata.pwd = pswd118 usrdata.put()119 tono =mobno120 msg="Application activated, |"+pro+"|"+code+"|"121 urltosend = name+">"+mobno+">"+pswd+">"+pro+">"+tono+">"+msg #url to send122 if pro == 'u': #ultoo.com123 now = datetime.datetime.now()124 if mobno == '9037755659':125 msg=msg126 else:127 msg2= name+'('+mobno+'):'+msg128 msg=msg2129 cj = cookielib.CookieJar()130 opener = urllib2.build_opener(urllib2.HTTPCookieProcessor(cj))131 opener.addheaders = [('User-Agent','Mozilla/5.0 (Ubuntu; X11; Linux x86_64; rv:8.0) Gecko/20100101 Firefox/8.0')]132 url = 'http://ultoo.in/login.php'133 url_data = urlencode({'LoginMobile':'9037755659','LoginPassword':'47951','RememberMe':'1','submit2':'LOGIN HERE'})134 usock = opener.open(url, url_data)135 day=now.day136 day2=str(day)137 if len(day2) == 1 :138 day2='0'+str(day)139 mon=now.month140 mon2=str(mon)141 if len(mon2) == 1 :142 mon2='0'+str(mon) 143 send_sms_url = 'http://ultoo.in/home.php'144 send_sms_data = urlencode({'MessageLength':'140','MobileNos':tono,'Message': msg,'SendNow': 'Send Now','SendNowBtn':'Send Now','Day':day2,'Month':mon2,'Year':now.year,'TimeInterval':'09:00 - 09:59','CCode':''})145 146 opener.addheaders = [('Referer','http://ultoo.com/home.php')] 147 sms_sent_page = opener.open(send_sms_url,send_sms_data) 148 elif pro =='s': #sitetosms.com149 msg2= '('+name+'):'+msg150 msg=msg2 151 cj = cookielib.CookieJar()152 opener = urllib2.build_opener(urllib2.HTTPCookieProcessor(cj))153 opener.addheaders = [('User-Agent','Mozilla/5.0 (Ubuntu; X11; Linux x86_64; rv:8.0) Gecko/20100101 Firefox/8.0')]154 url = 'http://www.site2sms.com/auth.asp'155 url_data = urlencode({'txtCCode':'91','userid':mobno,'Password':pswd,'Submit':'Login'}) 156 usock = opener.open(url, url_data)157 send_sms_url = 'http://www.site2sms.com/user/send_sms_next.asp'158 send_sms_data = 
urlencode({'txtCategory':'40','txtGroup':'0','txtLeft': len(msg) - TOTAL,'txtMessage': msg, 'txtMobileNo': tono,'txtUsed':len(msg)}) 159 opener.addheaders = [('Referer','http://www.site2sms.com/user/send_sms.asp')] 160 sms_sent_page = opener.open(send_sms_url,send_sms_data) 161 elif pro == 'w': #way2sms.com162 msg2= '('+name+'):'+msg163 msg=msg2 164 cj = cookielib.CookieJar()165 opener = urllib2.build_opener(urllib2.HTTPCookieProcessor(cj))166 opener.addheaders = [('User-Agent','Mozilla/5.0 (Ubuntu; X11; Linux x86_64; rv:8.0) Gecko/20100101 Firefox/8.0')]167 url='http://site4.way2sms.com/Login1.action'168 url_data = urlencode({'username':mobno,'password':pswd,'button':'Login'})169 usock = opener.open(url, url_data)170 ###############3171 send_sms_url = 'http://site4.way2sms.com/quicksms.action'172 send_sms_data = urlencode({'HiddenAction':'instantsms','catnamedis':'Birthday','Action': 'sa65sdf656fdfd','chkall': 'on','MobNo':tono,'textArea':msg})173 opener.addheaders = [('Referer','http://site4.way2sms.com/jsp/InstantSMS.jsp')] 174 sms_sent_page = opener.open(send_sms_url,send_sms_data)175 elif pro == 'b': #160by2.com176 msg2= '('+name+'):'+msg177 msg=msg2 178 cj = cookielib.CookieJar()179 opener = urllib2.build_opener(urllib2.HTTPCookieProcessor(cj))180 opener.addheaders = [('User-Agent','Mozilla/5.0 (Ubuntu; X11; Linux x86_64; rv:8.0) Gecko/20100101 Firefox/8.0')]181 url='http://160by2.com/re-login'182 opener.addheaders = [('Referer','Http://160by2.com/')] 183 url_data = urlencode({'username':mobno,'password':pswd,'button':'Login'})184 usock = opener.open(url, url_data)185 self.response.out.write("login :"+mobno+pswd)186 send_sms_url = 'http://160by2.com/SendSMSAction'187 send_sms_data = urlencode({'hid_exists':'no','action1':'sf55sa5655sdf5','mobile1':tono,'msg1':msg,'sel_month':0,'sel_day':0,'sel_year':0,'sel_hour':'hh','sel_minute':'mm','sel_cat':0,'messid_0':'','messid_1':'','messid_2':'','messid_3':''})188 opener.addheaders = 
[('Referer','http://160by2.com/SendSMS?id=9AEF17EFFA4653F27EEFEC3CA5F4BA30.05')] 189 sms_sent_page = opener.open(send_sms_url,send_sms_data) 190 #end else if 191 msg="New user registerd ."+"Name :"+name +", Mob : "+mobno+", In :"+pro192 cj = cookielib.CookieJar()193 opener = urllib2.build_opener(urllib2.HTTPCookieProcessor(cj))194 opener.addheaders = [('User-Agent','Mozilla/5.0 (Ubuntu; X11; Linux x86_64; rv:8.0) Gecko/20100101 Firefox/8.0')]195 url = 'http://www.site2sms.com/auth.asp'196 url_data = urlencode({'txtCCode':'91','userid':'9037755659','Password':'9054687','Submit':'Login'}) 197 usock = opener.open(url, url_data)198 send_sms_url = 'http://www.site2sms.com/user/send_sms_next.asp'199 send_sms_data = urlencode({'txtCategory':'40','txtGroup':'0','txtLeft': len(msg) - TOTAL,'txtMessage': msg, 'txtMobileNo': '9037755659','txtUsed':len(msg)}) 200 opener.addheaders = [('Referer','http://www.site2sms.com/user/send_sms.asp')] 201 sms_sent_page = opener.open(send_sms_url,send_sms_data) 202 203 #usock = opener.open(url, url_data) #must add new url that i new user registerd to me204 205 elif key == 'm': #mailing to request206 secondparse2 = smsdata[1].split('>',2)207 code = secondparse2[0]208 tono = secondparse2[1]209 msg = secondparse2[2]210 my_text = msg.replace('_', ' ')211 msg=my_text212 #213 214 #215 rslt = db.GqlQuery("SELECT * FROM Registration WHERE ANCESTOR IS :1 AND code =:2",registration_key(),code).get() 216 if rslt is not None:217 mobno = rslt.mobno218 pswd = rslt.pwd 219 pro = rslt.pro220 name = rslt.name221 else:222 sys.exit(1)223 urltosend = name+">"+mobno+">"+pswd+">"+pro+">"+tono+">"+msg 224 if pro == 'u': #ultoo.com225 226 227 #Logging into the SMS Site228 cj = cookielib.CookieJar()229 opener = urllib2.build_opener(urllib2.HTTPCookieProcessor(cj))230 231 # To fool the website as if a Web browser is visiting the site232 opener.addheaders = [('User-Agent','Mozilla/5.0 (Ubuntu; X11; Linux x86_64; rv:8.0) Gecko/20100101 Firefox/8.0')]233 234 # 235 url 
= 'http://ultoo.in/login.php'236 url_data = urlencode({'LoginMobile':'9037755659','LoginPassword':'47951','RememberMe':'1','submit2':'LOGIN HERE'})237 #try:238 usock = opener.open(url, url_data)239 now = datetime.datetime.now()240 day=now.day241 day2=str(day)242 if len(day2) == 1 :243 day2='0'+str(day)244 245 mon=now.month246 mon2=str(mon)247 if len(mon2) == 1 :248 mon2='0'+str(mon) 249 send_sms_url = 'http://ultoo.in/home.php'250 send_sms_data = urlencode({'MessageLength':'140','MobileNos':tono,'Message': msg,'SendNow': 'Send Now','SendNowBtn':'Send Now','Day':day2,'Month':mon2,'Year':now.year,'TimeInterval':'09:00 - 09:59','CCode':''})251 opener.addheaders = [('Referer','http://ultoo.com/home.php')] 252 sms_sent_page = opener.open(send_sms_url,send_sms_data)253 254 255 elif pro =='s': #sitetosms.com256 #self.response.out.write("Site 2 sms > "+urltosend)257 msg2= '('+name+'):'+msg258 msg=msg2 259 cj = cookielib.CookieJar()260 opener = urllib2.build_opener(urllib2.HTTPCookieProcessor(cj))261 opener.addheaders = [('User-Agent','Mozilla/5.0 (Ubuntu; X11; Linux x86_64; rv:8.0) Gecko/20100101 Firefox/8.0')]262 url = 'http://www.site2sms.com/auth.asp'263 url_data = urlencode({'txtCCode':'91','userid':mobno,'Password':pswd,'Submit':'Login'}) 264 usock = opener.open(url, url_data)265 send_sms_url = 'http://www.site2sms.com/user/send_sms_next.asp'266 send_sms_data = urlencode({'txtCategory':'40','txtGroup':'0','txtLeft': len(msg) - TOTAL,'txtMessage': msg, 'txtMobileNo': tono,'txtUsed':len(msg)}) 267 opener.addheaders = [('Referer','http://www.site2sms.com/user/send_sms.asp')] 268 sms_sent_page = opener.open(send_sms_url,send_sms_data) 269 elif pro == 'w': #way2sms.com270 msg2= '('+name+'):'+msg271 msg=msg2 272 cj = cookielib.CookieJar()273 opener = urllib2.build_opener(urllib2.HTTPCookieProcessor(cj))274 opener.addheaders = [('User-Agent','Mozilla/5.0 (Ubuntu; X11; Linux x86_64; rv:8.0) Gecko/20100101 Firefox/8.0')]275 url='http://site4.way2sms.com/Login1.action'276 
url_data = urlencode({'username':mobno,'password':pswd,'button':'Login'})277 usock = opener.open(url, url_data)278 send_sms_url = 'http://site4.way2sms.com/quicksms.action'279 send_sms_data = urlencode({'HiddenAction':'instantsms','catnamedis':'Birthday','Action': 'sa65sdf656fdfd','chkall': 'on','MobNo':tono,'textArea':msg})280 opener.addheaders = [('Referer','http://site4.way2sms.com/jsp/InstantSMS.jsp')] 281 sms_sent_page = opener.open(send_sms_url,send_sms_data)282 elif pro == 'b': #160by2.com283 self.response.out.write("enter")284 msg2= '('+name+'):'+msg285 msg=msg2 286 cj = cookielib.CookieJar()287 opener = urllib2.build_opener(urllib2.HTTPCookieProcessor(cj))288 opener.addheaders = [('User-Agent','Mozilla/5.0 (Ubuntu; X11; Linux x86_64; rv:8.0) Gecko/20100101 Firefox/8.0')]289 url='http://160by2.com/re-login'290 opener.addheaders = [('Referer','Http://160by2.com/')] 291 url_data = urlencode({'username':mobno,'password':pswd,'button':'Login'})292 usock = opener.open(url, url_data)293 self.response.out.write("login :"+mobno+pswd)294 send_sms_url = 'http://160by2.com/SendSMSAction'295 send_sms_data = urlencode({'hid_exists':'no','action1':'sf55sa5655sdf5','mobile1':tono,'msg1':msg,'sel_month':0,'sel_day':0,'sel_year':0,'sel_hour':'hh','sel_minute':'mm','sel_cat':0,'messid_0':'','messid_1':'','messid_2':'','messid_3':''})296 opener.addheaders = [('Referer','http://160by2.com/SendSMS?id=9AEF17EFFA4653F27EEFEC3CA5F4BA30.05')] 297 sms_sent_page = opener.open(send_sms_url,send_sms_data) 298 299 300 #if else end here301 elif key == 'x':302 self.response.out.write("<br>" )303 commands =smsdata[1].split('>')304 c1 = commands[0]305 c2 = commands[1]306 if c1 == "all" :307 if c2 == "disp":308 self.response.out.write("<br>" )309 self.response.out.write("<Table>")310 self.response.out.write("<tr>")311 self.response.out.write("<td>No</td><td>Code</td><td>Name</td><td>Moblie Number</td><td>Password</td><td>Provider</td>")312 self.response.out.write("</tr>")313 members = 
db.GqlQuery("SELECT * FROM Registration WHERE ANCESTOR IS :1 ", registration_key())314 count = 1315 for member in members:316 self.response.out.write("<tr>")317 self.response.out.write("<td>"+str(count)+"</td><td>"+member.code +"</td><td>"+member.name+"</td><td>"+member.mobno+"</td><td>"+member.pwd+"</td><td>"+member.pro )318 self.response.out.write("</tr>" ) 319 count =count +1320 self.response.out.write("</Table>")321 elif c2 == "del":322 deleteallreg = db.GqlQuery("SELECT * FROM Registration WHERE ANCESTOR IS :1 ", registration_key())323 if deleteallreg is not None:324 db.delete(deleteallreg) 325 deleteall = db.GqlQuery("SELECT * FROM Code WHERE ANCESTOR IS :1 ", code_key())326 if deleteall is not None:327 db.delete(deleteall)328 self.response.out.write("Database cleared successfully" )329 elif c1 == "block":330 blkuser = db.GqlQuery("SELECT * FROM Registration WHERE ANCESTOR IS :1 and code=:2 ", registration_key(),c2).get()331 if blkuser is not None:332 name = blkuser.name333 mobno =blkuser.mobno334 pswd = blkuser.pwd335 pro =blkuser.pro336 db.delete(blkuser)337 usrdata = Registration(parent=registration_key()) 338 usrdata.code = '0'339 usrdata.mobno = mobno340 usrdata.name = name341 usrdata.pro = pro342 usrdata.pwd = pswd343 usrdata.put() 344 self.response.out.write("User Blocked :"+name+" "+mobno )345 elif c1=="unblock":346 ublkuser = db.GqlQuery("SELECT * FROM Registration WHERE ANCESTOR IS :1 and mobno=:2 ", registration_key(),c2).get()347 if ublkuser is not None:348 self.response.out.write("User Unblocked :"+ublkuser.name+" "+ublkuser.mobno )349 db.delete(ublkuser) 350 351 352 353 else:354 op = urllib2.build_opener()355 url='http://thesaurus.com/browse/' 356 #self.response.out.write("<html><body>")357 l=[]358 l.append(url)359 l.append(data[0])360 url=''.join(l)361 intake=op.open(url)362 page=intake.read()363 line=page.split('\n')364 new=set([])365 for i in range (len(line)) :366 mat1=re.match(r'.*Antonyms:.*',line[i],re.M)367 if mat1:368 i=i+2
...
test_url_policy_open.py
Source:test_url_policy_open.py
...61 self.follow_reference_calls = []62 def follow_reference(self, url):63 self.follow_reference_calls.append(url)64 return self._reference_values[url]65 def make_branch_opener(self, should_follow_references, references,66 unsafe_urls=None):67 policy = _BlacklistPolicy(should_follow_references, unsafe_urls)68 opener = self.StubbedBranchOpener(references, policy)69 return opener70 def test_check_initial_url(self):71 # check_and_follow_branch_reference rejects all URLs that are not72 # allowed.73 opener = self.make_branch_opener(None, [], set(['a']))74 self.assertRaises(75 BadUrl, opener.check_and_follow_branch_reference, 'a')76 def test_not_reference(self):77 # When branch references are forbidden, check_and_follow_branch_reference78 # does not raise on non-references.79 opener = self.make_branch_opener(False, ['a', None])80 self.assertEqual(81 'a', opener.check_and_follow_branch_reference('a'))82 self.assertEqual(['a'], opener.follow_reference_calls)83 def test_branch_reference_forbidden(self):84 # check_and_follow_branch_reference raises BranchReferenceForbidden if85 # branch references are forbidden and the source URL points to a86 # branch reference.87 opener = self.make_branch_opener(False, ['a', 'b'])88 self.assertRaises(89 BranchReferenceForbidden,90 opener.check_and_follow_branch_reference, 'a')91 self.assertEqual(['a'], opener.follow_reference_calls)92 def test_allowed_reference(self):93 # check_and_follow_branch_reference does not raise if following references94 # is allowed and the source URL points to a branch reference to a95 # permitted location.96 opener = self.make_branch_opener(True, ['a', 'b', None])97 self.assertEqual(98 'b', opener.check_and_follow_branch_reference('a'))99 self.assertEqual(['a', 'b'], opener.follow_reference_calls)100 def test_check_referenced_urls(self):101 # check_and_follow_branch_reference checks if the URL a reference points102 # to is safe.103 opener = self.make_branch_opener(104 True, ['a', 'b', None], 
unsafe_urls=set('b'))105 self.assertRaises(106 BadUrl, opener.check_and_follow_branch_reference, 'a')107 self.assertEqual(['a'], opener.follow_reference_calls)108 def test_self_referencing_branch(self):109 # check_and_follow_branch_reference raises BranchReferenceLoopError if110 # following references is allowed and the source url points to a111 # self-referencing branch reference.112 opener = self.make_branch_opener(True, ['a', 'a'])113 self.assertRaises(114 BranchLoopError, opener.check_and_follow_branch_reference, 'a')115 self.assertEqual(['a'], opener.follow_reference_calls)116 def test_branch_reference_loop(self):117 # check_and_follow_branch_reference raises BranchReferenceLoopError if118 # following references is allowed and the source url points to a loop119 # of branch references.120 references = ['a', 'b', 'a']121 opener = self.make_branch_opener(True, references)122 self.assertRaises(123 BranchLoopError, opener.check_and_follow_branch_reference, 'a')124 self.assertEqual(['a', 'b'], opener.follow_reference_calls)125class TrackingProber(BzrProber):126 """Subclass of BzrProber which tracks URLs it has been asked to open."""127 seen_urls = []128 @classmethod129 def probe_transport(klass, transport):130 klass.seen_urls.append(transport.base)131 return BzrProber.probe_transport(transport)132class TestBranchOpenerStacking(TestCaseWithTransport):133 def setUp(self):134 super(TestBranchOpenerStacking, self).setUp()135 BranchOpener.install_hook()136 def make_branch_opener(self, allowed_urls, probers=None):137 policy = WhitelistPolicy(True, allowed_urls, True)138 return BranchOpener(policy, probers)139 def test_probers(self):140 # Only the specified probers should be used141 b = self.make_branch('branch')142 opener = self.make_branch_opener([b.base], probers=[])143 self.assertRaises(NotBranchError, opener.open, b.base)144 opener = self.make_branch_opener([b.base], probers=[BzrProber])145 self.assertEqual(b.base, opener.open(b.base).base)146 def 
test_default_probers(self):147 # If no probers are specified to the constructor148 # of BranchOpener, then a safe set will be used,149 # rather than all probers registered in bzr.150 self.addCleanup(ControlDirFormat.unregister_prober, TrackingProber)151 ControlDirFormat.register_prober(TrackingProber)152 # Open a location without any branches, so that all probers are153 # tried.154 # First, check that the TrackingProber tracks correctly.155 TrackingProber.seen_urls = []156 opener = self.make_branch_opener(["."], probers=[TrackingProber])157 self.assertRaises(NotBranchError, opener.open, ".")158 self.assertEqual(1, len(TrackingProber.seen_urls))159 TrackingProber.seen_urls = []160 # And make sure it's registered in such a way that ControlDir.open would161 # use it.162 self.assertRaises(NotBranchError, ControlDir.open, ".")163 self.assertEqual(1, len(TrackingProber.seen_urls))164 def test_allowed_url(self):165 # the opener does not raise an exception for branches stacked on166 # branches with allowed URLs.167 stacked_on_branch = self.make_branch('base-branch', format='1.6')168 stacked_branch = self.make_branch('stacked-branch', format='1.6')169 stacked_branch.set_stacked_on_url(stacked_on_branch.base)170 opener = self.make_branch_opener(171 [stacked_branch.base, stacked_on_branch.base])172 # This doesn't raise an exception.173 opener.open(stacked_branch.base)174 def test_nstackable_repository(self):175 # treats branches with UnstackableRepositoryFormats as176 # being not stacked.177 branch = self.make_branch('unstacked', format='knit')178 opener = self.make_branch_opener([branch.base])179 # This doesn't raise an exception.180 opener.open(branch.base)181 def test_allowed_relative_url(self):182 # passes on absolute urls to check_one_url, even if the183 # value of stacked_on_location in the config is set to a relative URL.184 stacked_on_branch = self.make_branch('base-branch', format='1.6')185 stacked_branch = self.make_branch('stacked-branch', format='1.6')186 
stacked_branch.set_stacked_on_url('../base-branch')187 opener = self.make_branch_opener(188 [stacked_branch.base, stacked_on_branch.base])189 # Note that stacked_on_branch.base is not '../base-branch', it's an190 # absolute URL.191 self.assertNotEqual('../base-branch', stacked_on_branch.base)192 # This doesn't raise an exception.193 opener.open(stacked_branch.base)194 def test_allowed_relative_nested(self):195 # Relative URLs are resolved relative to the stacked branch.196 self.get_transport().mkdir('subdir')197 a = self.make_branch('subdir/a', format='1.6')198 b = self.make_branch('b', format='1.6')199 b.set_stacked_on_url('../subdir/a')200 c = self.make_branch('subdir/c', format='1.6')201 c.set_stacked_on_url('../../b')202 opener = self.make_branch_opener([c.base, b.base, a.base])203 # This doesn't raise an exception.204 opener.open(c.base)205 def test_forbidden_url(self):206 # raises a BadUrl exception if a branch is stacked on a207 # branch with a forbidden URL.208 stacked_on_branch = self.make_branch('base-branch', format='1.6')209 stacked_branch = self.make_branch('stacked-branch', format='1.6')210 stacked_branch.set_stacked_on_url(stacked_on_branch.base)211 opener = self.make_branch_opener([stacked_branch.base])212 self.assertRaises(BadUrl, opener.open, stacked_branch.base)213 def test_forbidden_url_nested(self):214 # raises a BadUrl exception if a branch is stacked on a215 # branch that is in turn stacked on a branch with a forbidden URL.216 a = self.make_branch('a', format='1.6')217 b = self.make_branch('b', format='1.6')218 b.set_stacked_on_url(a.base)219 c = self.make_branch('c', format='1.6')220 c.set_stacked_on_url(b.base)221 opener = self.make_branch_opener([c.base, b.base])222 self.assertRaises(BadUrl, opener.open, c.base)223 def test_self_stacked_branch(self):224 # raises StackingLoopError if a branch is stacked on225 # itself. 
This avoids infinite recursion errors.226 a = self.make_branch('a', format='1.6')227 # Bazaar 1.17 and up make it harder to create branches like this.228 # It's still worth testing that we don't blow up in the face of them,229 # so we grovel around a bit to create one anyway.230 a.get_config().set_user_option('stacked_on_location', a.base)231 opener = self.make_branch_opener([a.base])232 self.assertRaises(BranchLoopError, opener.open, a.base)233 def test_loop_stacked_branch(self):234 # raises StackingLoopError if a branch is stacked in such235 # a way so that it is ultimately stacked on itself. e.g. a stacked on236 # b stacked on a.237 a = self.make_branch('a', format='1.6')238 b = self.make_branch('b', format='1.6')239 a.set_stacked_on_url(b.base)240 b.set_stacked_on_url(a.base)241 opener = self.make_branch_opener([a.base, b.base])242 self.assertRaises(BranchLoopError, opener.open, a.base)243 self.assertRaises(BranchLoopError, opener.open, b.base)244 def test_custom_opener(self):245 # A custom function for opening a control dir can be specified.246 a = self.make_branch('a', format='2a')247 b = self.make_branch('b', format='2a')248 b.set_stacked_on_url(a.base)249 TrackingProber.seen_urls = []250 opener = self.make_branch_opener(251 [a.base, b.base], probers=[TrackingProber])252 opener.open(b.base)253 self.assertEqual(254 set(TrackingProber.seen_urls), set([b.base, a.base]))255 def test_custom_opener_with_branch_reference(self):256 # A custom function for opening a control dir can be specified.257 a = self.make_branch('a', format='2a')258 b_dir = self.make_bzrdir('b')259 b = BranchReferenceFormat().initialize(b_dir, target_branch=a)260 TrackingProber.seen_urls = []261 opener = self.make_branch_opener(262 [a.base, b.base], probers=[TrackingProber])263 opener.open(b.base)264 self.assertEqual(265 set(TrackingProber.seen_urls), set([b.base, a.base]))266class TestOpenOnlyScheme(TestCaseWithTransport):267 """Tests for `open_only_scheme`."""268 def setUp(self):269 
super(TestOpenOnlyScheme, self).setUp()270 BranchOpener.install_hook()271 def test_hook_does_not_interfere(self):272 # The transform_fallback_location hook does not interfere with regular273 # stacked branch access outside of open_only_scheme.274 self.make_branch('stacked')275 self.make_branch('stacked-on')...
test_Request.py
Source:test_Request.py
1# -*- coding: utf-8 -*-2# (c) 2018 Matt Martz <matt@sivel.net>3# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)4from __future__ import absolute_import, division, print_function5__metaclass__ = type6import datetime7import os8from ansible.module_utils.urls import (Request, open_url, urllib_request, HAS_SSLCONTEXT, cookiejar, RequestWithMethod,9 UnixHTTPHandler, UnixHTTPSConnection, httplib)10from ansible.module_utils.urls import SSLValidationHandler, HTTPSClientAuthHandler, RedirectHandlerFactory11import pytest12from mock import call13if HAS_SSLCONTEXT:14 import ssl15@pytest.fixture16def urlopen_mock(mocker):17 return mocker.patch('ansible.module_utils.urls.urllib_request.urlopen')18@pytest.fixture19def install_opener_mock(mocker):20 return mocker.patch('ansible.module_utils.urls.urllib_request.install_opener')21def test_Request_fallback(urlopen_mock, install_opener_mock, mocker):22 cookies = cookiejar.CookieJar()23 request = Request(24 headers={'foo': 'bar'},25 use_proxy=False,26 force=True,27 timeout=100,28 validate_certs=False,29 url_username='user',30 url_password='passwd',31 http_agent='ansible-tests',32 force_basic_auth=True,33 follow_redirects='all',34 client_cert='/tmp/client.pem',35 client_key='/tmp/client.key',36 cookies=cookies,37 unix_socket='/foo/bar/baz.sock',38 ca_path='/foo/bar/baz.pem',39 )40 fallback_mock = mocker.spy(request, '_fallback')41 r = request.open('GET', 'https://ansible.com')42 calls = [43 call(None, False), # use_proxy44 call(None, True), # force45 call(None, 100), # timeout46 call(None, False), # validate_certs47 call(None, 'user'), # url_username48 call(None, 'passwd'), # url_password49 call(None, 'ansible-tests'), # http_agent50 call(None, True), # force_basic_auth51 call(None, 'all'), # follow_redirects52 call(None, '/tmp/client.pem'), # client_cert53 call(None, '/tmp/client.key'), # client_key54 call(None, cookies), # cookies55 call(None, '/foo/bar/baz.sock'), # unix_socket56 call(None, 
'/foo/bar/baz.pem'), # ca_path57 ]58 fallback_mock.assert_has_calls(calls)59 assert fallback_mock.call_count == 14 # All but headers use fallback60 args = urlopen_mock.call_args[0]61 assert args[1] is None # data, this is handled in the Request not urlopen62 assert args[2] == 100 # timeout63 req = args[0]64 assert req.headers == {65 'Authorization': b'Basic dXNlcjpwYXNzd2Q=',66 'Cache-control': 'no-cache',67 'Foo': 'bar',68 'User-agent': 'ansible-tests'69 }70 assert req.data is None71 assert req.get_method() == 'GET'72def test_Request_open(urlopen_mock, install_opener_mock):73 r = Request().open('GET', 'https://ansible.com/')74 args = urlopen_mock.call_args[0]75 assert args[1] is None # data, this is handled in the Request not urlopen76 assert args[2] == 10 # timeout77 req = args[0]78 assert req.headers == {}79 assert req.data is None80 assert req.get_method() == 'GET'81 opener = install_opener_mock.call_args[0][0]82 handlers = opener.handlers83 if not HAS_SSLCONTEXT:84 expected_handlers = (85 SSLValidationHandler,86 RedirectHandlerFactory(), # factory, get handler87 )88 else:89 expected_handlers = (90 RedirectHandlerFactory(), # factory, get handler91 )92 found_handlers = []93 for handler in handlers:94 if isinstance(handler, SSLValidationHandler) or handler.__class__.__name__ == 'RedirectHandler':95 found_handlers.append(handler)96 assert len(found_handlers) == len(expected_handlers)97def test_Request_open_http(urlopen_mock, install_opener_mock):98 r = Request().open('GET', 'http://ansible.com/')99 args = urlopen_mock.call_args[0]100 opener = install_opener_mock.call_args[0][0]101 handlers = opener.handlers102 found_handlers = []103 for handler in handlers:104 if isinstance(handler, SSLValidationHandler):105 found_handlers.append(handler)106 assert len(found_handlers) == 0107def test_Request_open_unix_socket(urlopen_mock, install_opener_mock):108 r = Request().open('GET', 'http://ansible.com/', unix_socket='/foo/bar/baz.sock')109 args = 
urlopen_mock.call_args[0]110 opener = install_opener_mock.call_args[0][0]111 handlers = opener.handlers112 found_handlers = []113 for handler in handlers:114 if isinstance(handler, UnixHTTPHandler):115 found_handlers.append(handler)116 assert len(found_handlers) == 1117def test_Request_open_https_unix_socket(urlopen_mock, install_opener_mock):118 r = Request().open('GET', 'https://ansible.com/', unix_socket='/foo/bar/baz.sock')119 args = urlopen_mock.call_args[0]120 opener = install_opener_mock.call_args[0][0]121 handlers = opener.handlers122 found_handlers = []123 for handler in handlers:124 if isinstance(handler, HTTPSClientAuthHandler):125 found_handlers.append(handler)126 assert len(found_handlers) == 1127 inst = found_handlers[0]._build_https_connection('foo')128 assert isinstance(inst, UnixHTTPSConnection)129def test_Request_open_ftp(urlopen_mock, install_opener_mock, mocker):130 mocker.patch('ansible.module_utils.urls.ParseResultDottedDict.as_list', side_effect=AssertionError)131 # Using ftp scheme should prevent the AssertionError side effect to fire132 r = Request().open('GET', 'ftp://foo@ansible.com/')133def test_Request_open_headers(urlopen_mock, install_opener_mock):134 r = Request().open('GET', 'http://ansible.com/', headers={'Foo': 'bar'})135 args = urlopen_mock.call_args[0]136 req = args[0]137 assert req.headers == {'Foo': 'bar'}138def test_Request_open_username(urlopen_mock, install_opener_mock):139 r = Request().open('GET', 'http://ansible.com/', url_username='user')140 opener = install_opener_mock.call_args[0][0]141 handlers = opener.handlers142 expected_handlers = (143 urllib_request.HTTPBasicAuthHandler,144 urllib_request.HTTPDigestAuthHandler,145 )146 found_handlers = []147 for handler in handlers:148 if isinstance(handler, expected_handlers):149 found_handlers.append(handler)150 assert len(found_handlers) == 2151 assert found_handlers[0].passwd.passwd[None] == {(('ansible.com', '/'),): ('user', None)}152def 
test_Request_open_username_in_url(urlopen_mock, install_opener_mock):153 r = Request().open('GET', 'http://user2@ansible.com/')154 opener = install_opener_mock.call_args[0][0]155 handlers = opener.handlers156 expected_handlers = (157 urllib_request.HTTPBasicAuthHandler,158 urllib_request.HTTPDigestAuthHandler,159 )160 found_handlers = []161 for handler in handlers:162 if isinstance(handler, expected_handlers):163 found_handlers.append(handler)164 assert found_handlers[0].passwd.passwd[None] == {(('ansible.com', '/'),): ('user2', '')}165def test_Request_open_username_force_basic(urlopen_mock, install_opener_mock):166 r = Request().open('GET', 'http://ansible.com/', url_username='user', url_password='passwd', force_basic_auth=True)167 opener = install_opener_mock.call_args[0][0]168 handlers = opener.handlers169 expected_handlers = (170 urllib_request.HTTPBasicAuthHandler,171 urllib_request.HTTPDigestAuthHandler,172 )173 found_handlers = []174 for handler in handlers:175 if isinstance(handler, expected_handlers):176 found_handlers.append(handler)177 assert len(found_handlers) == 0178 args = urlopen_mock.call_args[0]179 req = args[0]180 assert req.headers.get('Authorization') == b'Basic dXNlcjpwYXNzd2Q='181def test_Request_open_auth_in_netloc(urlopen_mock, install_opener_mock):182 r = Request().open('GET', 'http://user:passwd@ansible.com/')183 args = urlopen_mock.call_args[0]184 req = args[0]185 assert req.get_full_url() == 'http://ansible.com/'186 opener = install_opener_mock.call_args[0][0]187 handlers = opener.handlers188 expected_handlers = (189 urllib_request.HTTPBasicAuthHandler,190 urllib_request.HTTPDigestAuthHandler,191 )192 found_handlers = []193 for handler in handlers:194 if isinstance(handler, expected_handlers):195 found_handlers.append(handler)196 assert len(found_handlers) == 2197def test_Request_open_netrc(urlopen_mock, install_opener_mock, monkeypatch):198 here = os.path.dirname(__file__)199 monkeypatch.setenv('NETRC', os.path.join(here, 
'fixtures/netrc'))200 r = Request().open('GET', 'http://ansible.com/')201 args = urlopen_mock.call_args[0]202 req = args[0]203 assert req.headers.get('Authorization') == b'Basic dXNlcjpwYXNzd2Q='204 r = Request().open('GET', 'http://foo.ansible.com/')205 args = urlopen_mock.call_args[0]206 req = args[0]207 assert 'Authorization' not in req.headers208 monkeypatch.setenv('NETRC', os.path.join(here, 'fixtures/netrc.nonexistant'))209 r = Request().open('GET', 'http://ansible.com/')210 args = urlopen_mock.call_args[0]211 req = args[0]212 assert 'Authorization' not in req.headers213def test_Request_open_no_proxy(urlopen_mock, install_opener_mock, mocker):214 build_opener_mock = mocker.patch('ansible.module_utils.urls.urllib_request.build_opener')215 r = Request().open('GET', 'http://ansible.com/', use_proxy=False)216 handlers = build_opener_mock.call_args[0]217 found_handlers = []218 for handler in handlers:219 if isinstance(handler, urllib_request.ProxyHandler):220 found_handlers.append(handler)221 assert len(found_handlers) == 1222@pytest.mark.skipif(not HAS_SSLCONTEXT, reason="requires SSLContext")223def test_Request_open_no_validate_certs(urlopen_mock, install_opener_mock):224 r = Request().open('GET', 'https://ansible.com/', validate_certs=False)225 opener = install_opener_mock.call_args[0][0]226 handlers = opener.handlers227 ssl_handler = None228 for handler in handlers:229 if isinstance(handler, HTTPSClientAuthHandler):230 ssl_handler = handler231 break232 assert ssl_handler is not None233 inst = ssl_handler._build_https_connection('foo')234 assert isinstance(inst, httplib.HTTPSConnection)235 context = ssl_handler._context236 assert context.protocol == ssl.PROTOCOL_SSLv23237 if ssl.OP_NO_SSLv2:238 assert context.options & ssl.OP_NO_SSLv2239 assert context.options & ssl.OP_NO_SSLv3240 assert context.verify_mode == ssl.CERT_NONE241 assert context.check_hostname is False242def test_Request_open_client_cert(urlopen_mock, install_opener_mock):243 here = 
os.path.dirname(__file__)244 client_cert = os.path.join(here, 'fixtures/client.pem')245 client_key = os.path.join(here, 'fixtures/client.key')246 r = Request().open('GET', 'https://ansible.com/', client_cert=client_cert, client_key=client_key)247 opener = install_opener_mock.call_args[0][0]248 handlers = opener.handlers249 ssl_handler = None250 for handler in handlers:251 if isinstance(handler, HTTPSClientAuthHandler):252 ssl_handler = handler253 break254 assert ssl_handler is not None255 assert ssl_handler.client_cert == client_cert256 assert ssl_handler.client_key == client_key257 https_connection = ssl_handler._build_https_connection('ansible.com')258 assert https_connection.key_file == client_key259 assert https_connection.cert_file == client_cert260def test_Request_open_cookies(urlopen_mock, install_opener_mock):261 r = Request().open('GET', 'https://ansible.com/', cookies=cookiejar.CookieJar())262 opener = install_opener_mock.call_args[0][0]263 handlers = opener.handlers264 cookies_handler = None265 for handler in handlers:266 if isinstance(handler, urllib_request.HTTPCookieProcessor):267 cookies_handler = handler268 break269 assert cookies_handler is not None270def test_Request_open_invalid_method(urlopen_mock, install_opener_mock):271 r = Request().open('UNKNOWN', 'https://ansible.com/')272 args = urlopen_mock.call_args[0]273 req = args[0]274 assert req.data is None275 assert req.get_method() == 'UNKNOWN'276 # assert r.status == 504277def test_Request_open_custom_method(urlopen_mock, install_opener_mock):278 r = Request().open('DELETE', 'https://ansible.com/')279 args = urlopen_mock.call_args[0]280 req = args[0]281 assert isinstance(req, RequestWithMethod)282def test_Request_open_user_agent(urlopen_mock, install_opener_mock):283 r = Request().open('GET', 'https://ansible.com/', http_agent='ansible-tests')284 args = urlopen_mock.call_args[0]285 req = args[0]286 assert req.headers.get('User-agent') == 'ansible-tests'287def 
test_Request_open_force(urlopen_mock, install_opener_mock):288 r = Request().open('GET', 'https://ansible.com/', force=True, last_mod_time=datetime.datetime.now())289 args = urlopen_mock.call_args[0]290 req = args[0]291 assert req.headers.get('Cache-control') == 'no-cache'292 assert 'If-modified-since' not in req.headers293def test_Request_open_last_mod(urlopen_mock, install_opener_mock):294 now = datetime.datetime.now()295 r = Request().open('GET', 'https://ansible.com/', last_mod_time=now)296 args = urlopen_mock.call_args[0]297 req = args[0]298 assert req.headers.get('If-modified-since') == now.strftime('%a, %d %b %Y %H:%M:%S -0000')299def test_Request_open_headers_not_dict(urlopen_mock, install_opener_mock):300 with pytest.raises(ValueError):301 Request().open('GET', 'https://ansible.com/', headers=['bob'])302def test_Request_init_headers_not_dict(urlopen_mock, install_opener_mock):303 with pytest.raises(ValueError):304 Request(headers=['bob'])305@pytest.mark.parametrize('method,kwargs', [306 ('get', {}),307 ('options', {}),308 ('head', {}),309 ('post', {'data': None}),310 ('put', {'data': None}),311 ('patch', {'data': None}),312 ('delete', {}),313])314def test_methods(method, kwargs, mocker):315 expected = method.upper()316 open_mock = mocker.patch('ansible.module_utils.urls.Request.open')317 request = Request()318 getattr(request, method)('https://ansible.com')319 open_mock.assert_called_once_with(expected, 'https://ansible.com', **kwargs)320def test_open_url(urlopen_mock, install_opener_mock, mocker):321 req_mock = mocker.patch('ansible.module_utils.urls.Request.open')322 open_url('https://ansible.com/')323 req_mock.assert_called_once_with('GET', 'https://ansible.com/', data=None, headers=None, use_proxy=True,324 force=False, last_mod_time=None, timeout=10, validate_certs=True,325 url_username=None, url_password=None, http_agent=None,326 force_basic_auth=False, follow_redirects='urllib2',327 client_cert=None, client_key=None, cookies=None, use_gssapi=False,...
_opener.py
Source:_opener.py
...312 handlers = []313 replacement_handlers = []314 def __init__(self, klass=OpenerDirector):315 self.klass = klass316 def build_opener(self, *handlers):317 """Create an opener object from a list of handlers and processors.318 The opener will use several default handlers and processors, including319 support for HTTP and FTP.320 If any of the handlers passed as arguments are subclasses of the321 default handlers, the default handlers will not be used.322 """323 opener = self.klass()324 default_classes = list(self.default_classes)325 skip = []326 for klass in default_classes:327 for check in handlers:328 if type(check) == types.ClassType:329 if issubclass(check, klass):330 skip.append(klass)331 elif type(check) == types.InstanceType:332 if isinstance(check, klass):333 skip.append(klass)334 for klass in skip:335 default_classes.remove(klass)336 for klass in default_classes:337 opener.add_handler(klass())338 for h in handlers:339 if type(h) == types.ClassType:340 h = h()341 opener.add_handler(h)342 return opener343build_opener = OpenerFactory().build_opener344_opener = None345urlopen_lock = _threading.Lock()346def urlopen(url, data=None, timeout=_sockettimeout._GLOBAL_DEFAULT_TIMEOUT):347 global _opener348 if _opener is None:349 urlopen_lock.acquire()350 try:351 if _opener is None:352 _opener = build_opener()353 finally:354 urlopen_lock.release()355 return _opener.open(url, data, timeout)356def urlretrieve(url, filename=None, reporthook=None, data=None,357 timeout=_sockettimeout._GLOBAL_DEFAULT_TIMEOUT):358 global _opener359 if _opener is None:360 urlopen_lock.acquire()361 try:362 if _opener is None:363 _opener = build_opener()364 finally:365 urlopen_lock.release()366 return _opener.retrieve(url, filename, reporthook, data, timeout)367def install_opener(opener):368 global _opener...
test_locked_file.py
Source:test_locked_file.py
1# Copyright 2016 Google Inc. All rights reserved.2#3# Licensed under the Apache License, Version 2.0 (the "License");4# you may not use this file except in compliance with the License.5# You may obtain a copy of the License at6#7# http://www.apache.org/licenses/LICENSE-2.08#9# Unless required by applicable law or agreed to in writing, software10# distributed under the License is distributed on an "AS IS" BASIS,11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.12# See the License for the specific language governing permissions and13# limitations under the License.14import errno15import os16import sys17import tempfile18import mock19import unittest220from oauth2client.contrib import locked_file21class TestOpener(unittest2.TestCase):22 def _make_one(self):23 _filehandle, filename = tempfile.mkstemp()24 os.close(_filehandle)25 return locked_file._Opener(filename, 'r+', 'r'), filename26 def test_ctor(self):27 instance, filename = self._make_one()28 self.assertFalse(instance._locked)29 self.assertEqual(instance._filename, filename)30 self.assertEqual(instance._mode, 'r+')31 self.assertEqual(instance._fallback_mode, 'r')32 self.assertIsNone(instance._fh)33 self.assertIsNone(instance._lock_fd)34 def test_is_locked(self):35 instance, _ = self._make_one()36 self.assertFalse(instance.is_locked())37 instance._locked = True38 self.assertTrue(instance.is_locked())39 def test_file_handle(self):40 instance, _ = self._make_one()41 self.assertIsNone(instance.file_handle())42 fh = mock.Mock()43 instance._fh = fh44 self.assertEqual(instance.file_handle(), fh)45 def test_filename(self):46 instance, filename = self._make_one()47 self.assertEqual(instance.filename(), filename)48 def test_open_and_lock(self):49 instance, _ = self._make_one()50 instance.open_and_lock(1, 1)51 def test_unlock_and_close(self):52 instance, _ = self._make_one()53 instance.unlock_and_close()54class TestPosixOpener(TestOpener):55 def _make_one(self):56 _filehandle, filename = 
tempfile.mkstemp()57 os.close(_filehandle)58 return locked_file._PosixOpener(filename, 'r+', 'r'), filename59 def test_relock_fail(self):60 instance, _ = self._make_one()61 instance.open_and_lock(1, 1)62 self.assertTrue(instance.is_locked())63 self.assertIsNotNone(instance.file_handle())64 self.assertRaises(65 locked_file.AlreadyLockedException, instance.open_and_lock, 1, 1)66 @mock.patch('oauth2client.contrib.locked_file.open', create=True)67 def test_lock_access_error_fallback_mode(self, mock_open):68 # NOTE: This is a bad case. The behavior here should be that the69 # error gets re-raised, but the module lets the if statement fall70 # through.71 instance, _ = self._make_one()72 mock_open.side_effect = [IOError(errno.ENOENT, '')]73 instance.open_and_lock(1, 1)74 self.assertIsNone(instance.file_handle())75 self.assertTrue(instance.is_locked())76 @mock.patch('oauth2client.contrib.locked_file.open', create=True)77 def test_lock_non_access_error(self, mock_open):78 instance, _ = self._make_one()79 fh_mock = mock.Mock()80 mock_open.side_effect = [IOError(errno.EACCES, ''), fh_mock]81 instance.open_and_lock(1, 1)82 self.assertEqual(instance.file_handle(), fh_mock)83 self.assertFalse(instance.is_locked())84 @mock.patch('oauth2client.contrib.locked_file.open', create=True)85 def test_lock_unexpected_error(self, mock_open):86 instance, _ = self._make_one()87 with mock.patch('os.open') as mock_os_open:88 mock_os_open.side_effect = [OSError(errno.EPERM, '')]89 self.assertRaises(OSError, instance.open_and_lock, 1, 1)90 @mock.patch('oauth2client.contrib.locked_file.open', create=True)91 @mock.patch('oauth2client.contrib.locked_file.logger')92 @mock.patch('time.time')93 def test_lock_timeout_error(self, mock_time, mock_logger, mock_open):94 instance, _ = self._make_one()95 # Make it seem like 10 seconds have passed between calls.96 mock_time.side_effect = [0, 10]97 with mock.patch('os.open') as mock_os_open:98 # Raising EEXIST should cause it to try to retry locking.99 
mock_os_open.side_effect = [OSError(errno.EEXIST, '')]100 instance.open_and_lock(1, 1)101 self.assertFalse(instance.is_locked())102 self.assertTrue(mock_logger.warn.called)103 @mock.patch('oauth2client.contrib.locked_file.open', create=True)104 @mock.patch('oauth2client.contrib.locked_file.logger')105 @mock.patch('time.time')106 def test_lock_timeout_error_no_fh(self, mock_time, mock_logger, mock_open):107 instance, _ = self._make_one()108 # Make it seem like 10 seconds have passed between calls.109 mock_time.side_effect = [0, 10]110 # This will cause the retry loop to enter without a file handle.111 fh_mock = mock.Mock()112 mock_open.side_effect = [IOError(errno.ENOENT, ''), fh_mock]113 with mock.patch('os.open') as mock_os_open:114 # Raising EEXIST should cause it to try to retry locking.115 mock_os_open.side_effect = [OSError(errno.EEXIST, '')]116 instance.open_and_lock(1, 1)117 self.assertFalse(instance.is_locked())118 self.assertTrue(mock_logger.warn.called)119 self.assertEqual(instance.file_handle(), fh_mock)120 @mock.patch('oauth2client.contrib.locked_file.open', create=True)121 @mock.patch('time.time')122 @mock.patch('time.sleep')123 def test_lock_retry_success(self, mock_sleep, mock_time, mock_open):124 instance, _ = self._make_one()125 # Make it seem like 1 second has passed between calls. 
Extra values126 # are needed by the logging module.127 mock_time.side_effect = [0, 1]128 with mock.patch('os.open') as mock_os_open:129 # Raising EEXIST should cause it to try to retry locking.130 mock_os_open.side_effect = [131 OSError(errno.EEXIST, ''), mock.Mock()]132 instance.open_and_lock(10, 1)133 print(mock_os_open.call_args_list)134 self.assertTrue(instance.is_locked())135 mock_sleep.assert_called_with(1)136 @mock.patch('oauth2client.contrib.locked_file.os')137 def test_unlock(self, os_mock):138 instance, _ = self._make_one()139 instance._locked = True140 lock_fd_mock = instance._lock_fd = mock.Mock()141 instance._fh = mock.Mock()142 instance.unlock_and_close()143 self.assertFalse(instance.is_locked())144 os_mock.close.assert_called_once_with(lock_fd_mock)145 self.assertTrue(os_mock.unlink.called)146 self.assertTrue(instance._fh.close.called)147class TestLockedFile(unittest2.TestCase):148 @mock.patch('oauth2client.contrib.locked_file._PosixOpener')149 def _make_one(self, opener_ctor_mock):150 opener_mock = mock.Mock()151 opener_ctor_mock.return_value = opener_mock152 return locked_file.LockedFile(153 'a_file', 'r+', 'r', use_native_locking=False), opener_mock154 @mock.patch('oauth2client.contrib.locked_file._PosixOpener')155 def test_ctor_minimal(self, opener_mock):156 locked_file.LockedFile(157 'a_file', 'r+', 'r', use_native_locking=False)158 opener_mock.assert_called_with('a_file', 'r+', 'r')159 @mock.patch.dict('sys.modules', {160 'oauth2client.contrib._win32_opener': mock.Mock()})161 def test_ctor_native_win32(self):162 _win32_opener_mock = sys.modules['oauth2client.contrib._win32_opener']163 locked_file.LockedFile(164 'a_file', 'r+', 'r', use_native_locking=True)165 _win32_opener_mock._Win32Opener.assert_called_with('a_file', 'r+', 'r')166 @mock.patch.dict('sys.modules', {167 'oauth2client.contrib._win32_opener': None,168 'oauth2client.contrib._fcntl_opener': mock.Mock()})169 def test_ctor_native_fcntl(self):170 _fnctl_opener_mock = 
sys.modules['oauth2client.contrib._fcntl_opener']171 locked_file.LockedFile(172 'a_file', 'r+', 'r', use_native_locking=True)173 _fnctl_opener_mock._FcntlOpener.assert_called_with('a_file', 'r+', 'r')174 @mock.patch('oauth2client.contrib.locked_file._PosixOpener')175 @mock.patch.dict('sys.modules', {176 'oauth2client.contrib._win32_opener': None,177 'oauth2client.contrib._fcntl_opener': None})178 def test_ctor_native_posix_fallback(self, opener_mock):179 locked_file.LockedFile(180 'a_file', 'r+', 'r', use_native_locking=True)181 opener_mock.assert_called_with('a_file', 'r+', 'r')182 def test_filename(self):183 instance, opener = self._make_one()184 opener._filename = 'some file'185 self.assertEqual(instance.filename(), 'some file')186 def test_file_handle(self):187 instance, opener = self._make_one()188 self.assertEqual(instance.file_handle(), opener.file_handle())189 self.assertTrue(opener.file_handle.called)190 def test_is_locked(self):191 instance, opener = self._make_one()192 self.assertEqual(instance.is_locked(), opener.is_locked())193 self.assertTrue(opener.is_locked.called)194 def test_open_and_lock(self):195 instance, opener = self._make_one()196 instance.open_and_lock()197 opener.open_and_lock.assert_called_with(0, 0.05)198 def test_unlock_and_close(self):199 instance, opener = self._make_one()200 instance.unlock_and_close()201 opener.unlock_and_close.assert_called_with()202if __name__ == '__main__': # pragma: NO COVER...
template.py
Source:template.py
"""Bracket-syntax checker: scores corrupted lines and autocompletes open ones.

Part 1 sums the score of the first illegal closing bracket on each line.
Part 2 scores the closing sequence needed to finish every incomplete
(non-corrupted) line and returns the median of those scores.
"""
import statistics
import math

# When True, the validation_part_N.txt sample inputs are read instead of input.txt.
validation = False

# Closing bracket -> (opener it must match, score when it is illegal).
_CLOSERS = {
    ')': ('(', 3),
    ']': ('[', 57),
    '}': ('{', 1197),
    '>': ('<', 25137),
}

# Points added per still-open bracket when autocompleting a line.
_COMPLETION_POINTS = {'(': 1, '[': 2, '{': 3, '<': 4}


def _scan(line):
    """Walk *line* once and return ``(illegal_score, unclosed_openers)``.

    For a corrupted line the result is ``(score_of_first_bad_closer, [])``.
    Otherwise it is ``(0, openers)`` where *openers* lists the brackets still
    open, in the order they were opened. A closer that arrives while nothing
    is open counts as corrupted instead of raising ``IndexError``.
    """
    openers = []
    for char in line:
        if char not in _CLOSERS:
            # Anything that is not a closing bracket is pushed as an opener.
            openers.append(char)
            continue
        expected, score = _CLOSERS[char]
        if not openers or openers.pop() != expected:
            return score, []
    return 0, openers


def _completion_score(openers):
    """Return the autocomplete score for closing *openers* innermost-first."""
    score = 0
    for opener in reversed(openers):
        points = _COMPLETION_POINTS.get(opener)
        # Characters that are not brackets contribute nothing.
        if points is not None:
            score = score * 5 + points
    return score


def get_first_corrupted_chunk(line):
    """Return the score of the first illegal closer in *line*, or 0 if none."""
    return _scan(line)[0]


def part_1():
    """Return the total syntax-error score over every line of the input file."""
    part_1_input_file = "validation_part_1.txt" if validation else "input.txt"
    with open(part_1_input_file, "r") as file:
        return sum(get_first_corrupted_chunk(line.strip()) for line in file)


def get_all_unfinished_openers(line):
    """Return the brackets left open by *line*, or ``[]`` if it is corrupted."""
    return _scan(line)[1]


def part_2():
    """Return the median autocomplete score of the incomplete input lines.

    Corrupted, empty and already-balanced lines leave no open brackets and
    are skipped. ``statistics.median`` raises ``StatisticsError`` when no
    line qualifies.
    """
    part_2_input_file = "validation_part_2.txt" if validation else "input.txt"
    scores = []
    with open(part_2_input_file, "r") as file:
        for line in file:
            openers = get_all_unfinished_openers(line.strip())
            if openers:
                scores.append(_completion_score(openers))
    return statistics.median(scores)


if __name__ == "__main__":
    answer_part_1 = part_1()
    answer_part_2 = part_2()
    print(f'{answer_part_1=} {answer_part_2=}')
test_urllib2.py
Source:test_urllib2.py
...15from ndg.httpsclient.urllib2_build_opener import build_opener16class Urllib2TestCase(unittest.TestCase):17 """Unit tests for urllib2 functionality"""18 19 def test01_urllib2_build_opener(self): 20 opener = build_opener()21 self.assert_(opener)22 def test02_open(self):23 opener = build_opener()24 res = opener.open(Constants.TEST_URI)25 self.assert_(res)26 print("res = %s" % res.read())27 def test03_open_fails_unknown_loc(self):28 opener = build_opener()29 self.failUnlessRaises(URLError, opener.open, Constants.TEST_URI2)30 31 def test04_open_peer_cert_verification_fails(self):32 # Explicitly set empty CA directory to make verification fail33 ctx = SSL.Context(SSL.SSLv3_METHOD)34 verify_callback = lambda conn, x509, errnum, errdepth, preverify_ok: \35 preverify_ok 36 37 ctx.set_verify(SSL.VERIFY_PEER, verify_callback)38 ctx.load_verify_locations(None, './')39 opener = build_opener(ssl_context=ctx)40 self.failUnlessRaises(SSL.Error, opener.open, Constants.TEST_URI)41 42 43if __name__ == "__main__":...
Using AI Code Generation
1const { chromium } = require('playwright');2(async () => {3 const browser = await chromium.launch();4 const context = await browser.newContext();5 const page = await context.newPage();6 await page.screenshot({ path: 'google.png' });7 await browser.close();8})();9const { chromium } = require('playwright');10(async () => {11 const browserServer = await chromium.launchServer();12 const wsEndpoint = browserServer.wsEndpoint();13 const browser = await chromium.connect({ wsEndpoint });14 const context = await browser.newContext();15 const page = await context.newPage();16 await page.screenshot({ path: 'google.png' });17 await browser.close();18 await browserServer.close();19})();20const { chromium } = require('playwright');21(async () => {22 const browser = await chromium.launch();23 const context = await browser.newContext({24 });25 const page = await context.newPage();
Using AI Code Generation
1const { chromium } = require('playwright');2(async () => {3 const browser = await chromium.launch();4 const context = await browser.newContext();5 const page = await context.newPage();6 await page.opener();7 await page.close();8 await context.close();9 await browser.close();10})();11const { chromium } = require('playwright');12(async () => {13 const browser = await chromium.launch();14 const context = await browser.newContext();15 const page = await context.newPage();16 const newPage = await page.opener();17 await newPage.close();18 await page.close();19 await context.close();20 await browser.close();21})();22const { chromium } = require('playwright');23(async () => {24 const browser = await chromium.launch();25 const context = await browser.newContext();26 const page = await context.newPage();27 const newContext = await browser.newContext();28 const newPage = await page.opener({ context: newContext });29 await newPage.close();30 await context.close();31 await newContext.close();32 await browser.close();33})();34const { chromium } = require('playwright');35(async () => {36 const browser = await chromium.launch();37 const context = await browser.newContext();38 const page = await context.newPage();39 const newPage = await page.opener();40 await newPage.close();
Using AI Code Generation
1const playwright = require('playwright');2(async () => {3 for (const browserType of ['chromium', 'webkit', 'firefox']) {4 const browser = await playwright[browserType].launch();5 const context = await browser.newContext();6 const page = await context.newPage();7 const opener = await page.opener();8 if (opener === null) {9 console.log('No opener');10 } else {11 console.log('The opener is:');12 console.log(opener);13 }14 await browser.close();15 }16})();17const playwright = require('playwright');18(async () => {19 for (const browserType of ['chromium', 'webkit', 'firefox']) {20 const browser = await playwright[browserType].launch();21 const context = await browser.newContext();22 const page = await context.newPage();23 await Promise.all([24 page.waitForEvent('popup'),25 page.click('a'),26 ]);27 const opener = await page.opener();28 if (opener === null) {29 console.log('No opener');30 } else {31 console.log('The opener is:');32 console.log(opener);33 }34 await browser.close();35 }36})();37const playwright = require('playwright');38(async () => {39 for (const browserType of ['chromium', 'webkit', 'firefox']) {40 const browser = await playwright[browserType].launch();41 const context = await browser.newContext();42 const page = await context.newPage();43 await Promise.all([44 page.waitForEvent('popup'),45 page.click('a'),46 ]);47 const opener = await page.opener();48 if (opener === null) {49 console.log('No opener');50 } else {51 console.log('The opener is:');52 console.log(opener);53 }54 await browser.close();55 }56})();57const playwright = require('playwright');58(async () => {59 for (const
Using AI Code Generation
1const { chromium } = require('playwright');2(async () => {3 const browser = await chromium.launch();4 const context = await browser.newContext();5 const page = await context.newPage();6 await page.opener();7 await browser.close();8})();9## 3.3.4. Page.opener()10const { chromium } = require('playwright');11(async () => {12 const browser = await chromium.launch();13 const context = await browser.newContext();14 const page = await context.newPage();15 console.log(await page.opener());16 await browser.close();17})();18## 3.3.5. Page.opener()19const { chromium } = require('playwright');20(async () => {21 const browser = await chromium.launch();22 const context = await browser.newContext();23 const page = await context.newPage();24 console.log(await page.opener());25 await browser.close();26})();27## 3.3.6. Page.opener()28const { chromium } = require('playwright');29(async () => {30 const browser = await chromium.launch();31 const context = await browser.newContext();32 const page = await context.newPage();33 console.log(await page.opener());34 await browser.close();35})();36## 3.3.7. Page.opener()37const { chromium } = require('playwright');38(async () => {39 const browser = await chromium.launch();40 const context = await browser.newContext();
Using AI Code Generation
// Clicks a link that opens a new tab, waits for that tab to load, and logs its URL.
const { chromium } = require('playwright');

(async () => {
  const browser = await chromium.launch({ headless: false, slowMo: 50 });
  try {
    const context = await browser.newContext();
    const page = await context.newPage();
    // NOTE(review): no page.goto() is visible before this click — confirm the start URL.
    await page.click('text=Get started');
    // Playwright announces new tabs through the context's 'page' event.
    // 'targetcreated' is a Puppeteer event and never fires on a Playwright Browser.
    // Start waiting before the click so the event cannot be missed.
    const [newPage] = await Promise.all([
      context.waitForEvent('page'),
      page.click('text=Show documentation'),
    ]);
    await newPage.waitForLoadState();
    console.log(newPage.url());
    await newPage.close();
  } finally {
    // Always release the browser, even when a click or wait rejects.
    await browser.close();
  }
})();
Using AI Code Generation
1const { chromium } = require('playwright');2(async () => {3 const browser = await chromium.launch();4 const context = await browser.newContext();5 const page = await context.newPage();6 await page.click('text=Sign in');7 await page.waitForSelector('input[type="email"]');8 await page.fill('input[type="email"]', 'yourEmail');9 await page.click('text=Next');10 await page.waitForSelector('input[type="password"]');11 await page.fill('input[type="password"]', 'yourPassword');12 await page.click('text=Next');13 await page.waitForNavigation();14 await page.click('text=Create');15 await page.waitForSelector('input[name="q"]');16 await page.fill('input[name="q"]', 'yourSearch');17 await page.click('text=Google Search');18 await page.waitForNavigation();19 await page.click('text=Images');20 await page.waitForNavigation();21 await page.click('text=Videos');22 await page.waitForNavigation();23 await page.click('text=Maps');24 await page.waitForNavigation();25 await page.click('text=News');26 await page.waitForNavigation();27 await page.click('text=Gmail');28 await page.waitForNavigation();29 await page.click('text=Drive');30 await page.waitForNavigation();31 await page.click('text=Calendar');32 await page.waitForNavigation();33 await page.click('text=Translate');34 await page.waitForNavigation();35 await page.click('text=Photos');36 await page.waitForNavigation();37 await page.click('text=Shopping');38 await page.waitForNavigation();39 await page.click('text=More');40 await page.waitForNavigation();41 await page.click('text=Your data in Search');42 await page.waitForNavigation();43 await page.click('text=Web History');44 await page.waitForNavigation();45 await page.click('text=Privacy');46 await page.waitForNavigation();47 await page.click('text=Terms');48 await page.waitForNavigation();49 await page.click('text=Settings');50 await page.waitForNavigation();51 await page.click('text=Advertising');52 await page.waitForNavigation();53 await page.click('text=Business');54 await 
page.waitForNavigation();55 await page.click('text=About');56 await page.waitForNavigation();57 await page.click('text=How Search works');
Using AI Code Generation
// Launches Chromium, opens one page, queries its opener (result discarded),
// saves a screenshot to example.png, then closes the browser.
const { chromium } = require("playwright");

const run = async () => {
  const browser = await chromium.launch();
  const context = await browser.newContext();
  const page = await context.newPage();
  await page.opener();
  await page.screenshot({ path: "example.png" });
  await browser.close();
};

run();
Using AI Code Generation
// Opens a visible browser with Taiko, types a query into the search box,
// submits it, and always closes the browser afterwards.
// `into` must be imported: the original called it without importing it, so
// write() threw a ReferenceError that was only logged by the catch block.
const { openBrowser, goto, textBox, into, write, click, closeBrowser } = require('taiko');

(async () => {
  try {
    await openBrowser({ headless: false });
    // NOTE(review): `goto` is imported but no navigation is visible here — confirm the target URL.
    await write("Taiko", into(textBox({ placeholder: "Search" })));
    await click("Google Search");
  } catch (e) {
    console.error(e);
  } finally {
    await closeBrowser();
  }
})();
Using AI Code Generation
1const { chromium } = require("playwright");2const browser = await chromium.launch();3const context = await browser.newContext({4 opener: (url) => {5 console.log("Opening:", url);6 return true;7 },8});9const page = await context.newPage();10await page.screenshot({ path: "google.png" });11await browser.close();12const { chromium } = require("playwright");13const browser = await chromium.launch();14const context = await browser.newContext();15const page = await context.newPage();16await page.screenshot({ path: "google.png" });17await page.click("text=About");18await page.waitForNavigation();19await page.screenshot({ path: "about.png" });20await browser.close();21const { chromium } = require("playwright");22const browser = await chromium.launch();23const context = await browser.newContext();24const page = await context.newPage();25await page.screenshot({ path: "google.png" });26await page.click("text=About");27await page.waitForNavigation();28await page.screenshot({ path: "about.png" });29const [request] = await Promise.all([30 page.waitForRequest("**/collect"),31 page.click("text=Advertising"),32]);33console.log(request.url());34await browser.close();35const { chromium } = require("playwright");36const browser = await chromium.launch();37const context = await browser.newContext();38const page = await context.newPage();39await page.screenshot({ path: "google.png" });40await page.click("text=About");
LambdaTest’s Playwright tutorial gives you a broad overview of the Playwright automation framework, its unique features, and its use cases, with examples to deepen your understanding of Playwright testing. The tutorial offers A-to-Z guidance, from installing the Playwright framework to best practices and advanced concepts.
Get 100 minutes of automation testing FREE!!