Best Python code snippet using playwright-python
opener.py
Source:opener.py
# Excerpt from opener.py. These are methods of the opener registry class; the
# class header and everything before split_segments() lie outside the excerpt.

def split_segments(self, fs_url):
    """Match an FS url against the registry's url pattern.

    :param fs_url: an FS url
    :returns: the result of ``self.re_fs_url.match(fs_url)``; ``parse``
        treats a falsy result as "not in FS url syntax"

    """
    match = self.re_fs_url.match(fs_url)
    return match


def get_opener(self, name):
    """Retrieve an opener for the given protocol

    :param name: name of the opener to open
    :raises NoOpenerError: if no opener has been registered of that name

    """
    if name not in self.registry:
        raise NoOpenerError("No opener for %s" % name)
    # The registry maps a protocol name to an index into self.openers.
    index = self.registry[name]
    return self.openers[index]


def add(self, opener):
    """Adds an opener to the registry

    :param opener: a class derived from fs.opener.Opener

    """
    # The next free index doubles as the key under which the opener is stored;
    # every name the opener answers to is pointed at that same index.
    index = len(self.openers)
    self.openers[index] = opener
    for name in opener.names:
        self.registry[name] = index


def parse(self, fs_url, default_fs_name=None, writeable=False, create_dir=False, cache_hint=True):
    """Parses a FS url and returns an fs object and a path within that FS object
    (if indicated in the path). A tuple of (<FS instance>, <path>) is returned.

    :param fs_url: an FS url
    :param default_fs_name: the default FS to use if none is indicated
        (falls back to ``self.default_opener``)
    :param writeable: if True, a writeable FS will be returned
    :param create_dir: if True, then the directory in the FS will be created
    :param cache_hint: value passed to the ``cache_hint`` method of the opened FS
    :raises NoOpenerError: if no opener is registered for the resolved FS name
    :raises OpenerError: if no url could be derived from ``fs_url``

    """
    orig_url = fs_url
    match = self.split_segments(fs_url)

    if match:
        fs_name, credentials, url1, url2, path = match.groups()
        if credentials:
            fs_url = '%s@%s' % (credentials, url1)
        else:
            fs_url = url2
        path = path or ''
        fs_url = fs_url or ''
        if ':' in fs_name:
            # "outer:inner" names: the part after the colon becomes the
            # protocol of the url handed to the outer opener.
            fs_name, sub_protocol = fs_name.split(':', 1)
            fs_url = '%s://%s' % (sub_protocol, fs_url)
        if '!' in path:
            # Only the segment after the last '!' stays as the path; the
            # earlier segments are appended to the url.
            paths = path.split('!')
            path = paths.pop()
            fs_url = '%s!%s' % (fs_url, '!'.join(paths))

        fs_name = fs_name or self.default_opener
    else:
        # Not in FS url syntax: treat the whole string as a system path.
        fs_name = default_fs_name or self.default_opener
        fs_url = _expand_syspath(fs_url)
        path = ''

    fs_name, fs_name_params = _parse_name(fs_name)
    opener = self.get_opener(fs_name)

    if fs_url is None:
        raise OpenerError("Unable to parse '%s'" % orig_url)

    fs, fs_path = opener.get_fs(self, fs_name, fs_name_params, fs_url, writeable, create_dir)
    fs.cache_hint(cache_hint)

    if fs_path and iswildcard(fs_path):
        # Wildcard paths return early: open the containing directory (if any)
        # and hand back the wildcard resource name untouched.
        pathname, resourcename = pathsplit(fs_path or '')
        if pathname:
            fs = fs.opendir(pathname)
        return fs, resourcename

    fs_path = join(fs_path, path)
    # NOTE: the excerpt ends here; the remainder of parse() is not shown.
cetsms.py
Source:cetsms.py
...57 atdata = data[1].split(' ')58 uid =atdata[0]59 psw = atdata[1]60 cj = cookielib.CookieJar()61 opener = urllib2.build_opener(urllib2.HTTPCookieProcessor(cj))62 opener.addheaders = [('User-Agent','Mozilla/5.0 (Ubuntu; X11; Linux x86_64; rv:8.0) Gecko/20100101 Firefox/8.0')]63 url = 'http://117.211.100.44:8080/index.php'64 url_data = urlencode({'userid':uid,'password':psw,'Submit':'Submit'})65 usock = opener.open(url, url_data) 66 url = 'http://117.211.100.44:8080/index.php'67 url_data = urlencode({'module':'com_views','task':'student_attendance_view'})68 atand = opener.open(url, url_data)69 page=atand.read()70 line=page.split('\n')71 new=set([])72 for i in range (len(line)) : 73 mat2=re.match(r'.*Logged in as.*',line[i],re.M)74 if mat2: 75 i = i+276 nam=re.match(r'.*<td>(.*?)</td>.*',line[i])77 if nam:78 str0= nam.group(1)79 self.response.out.write("Name : "+str0)80 mat1=re.match(r'.*Attendance till date :.*',line[i],re.M)81 if mat1: 82 self.response.out.write("</br>"+line[i]) 83 elif data[0] == 'send' and len(data)==2:84 self.response.out.write("Received" )85 smsdata = data[1].split('>',1) #contain all message exclude 'send'86 key = smsdata[0]87 if key == 'a' : #admit new user88 secondparse = smsdata[1].split('>',3)89 name = secondparse[0]90 mobno = secondparse[1]91 pro = secondparse[2]92 pswd =secondparse[3]93 94 deleteContact = db.GqlQuery("SELECT * FROM Registration WHERE ANCESTOR IS :1 AND mobno =:2 and pro =:3", registration_key(),mobno,pro).get()95 if deleteContact is not None:96 code = deleteContact.code97 if code == '0':98 sys.exit(1)99 db.delete(deleteContact)100 cd = db.GqlQuery("SELECT * FROM Code WHERE ANCESTOR IS :1 ", code_key()).get() 101 if cd is not None:102 tcode = cd.current103 code = str(int(tcode)+1)104 else:105 code = '1'106 deletecode = db.GqlQuery("SELECT * FROM Code WHERE ANCESTOR IS :1 ", code_key())107 if deletecode is not None:108 db.delete(deletecode) 109 codedata = Code(parent = code_key())110 codedata.current = code111 
codedata.put()112 usrdata = Registration(parent=registration_key()) 113 usrdata.code = code114 usrdata.mobno = mobno115 usrdata.name = name116 usrdata.pro = pro117 usrdata.pwd = pswd118 usrdata.put()119 tono =mobno120 msg="Application activated, |"+pro+"|"+code+"|"121 urltosend = name+">"+mobno+">"+pswd+">"+pro+">"+tono+">"+msg #url to send122 if pro == 'u': #ultoo.com123 now = datetime.datetime.now()124 if mobno == '9037755659':125 msg=msg126 else:127 msg2= name+'('+mobno+'):'+msg128 msg=msg2129 cj = cookielib.CookieJar()130 opener = urllib2.build_opener(urllib2.HTTPCookieProcessor(cj))131 opener.addheaders = [('User-Agent','Mozilla/5.0 (Ubuntu; X11; Linux x86_64; rv:8.0) Gecko/20100101 Firefox/8.0')]132 url = 'http://ultoo.in/login.php'133 url_data = urlencode({'LoginMobile':'9037755659','LoginPassword':'47951','RememberMe':'1','submit2':'LOGIN HERE'})134 usock = opener.open(url, url_data)135 day=now.day136 day2=str(day)137 if len(day2) == 1 :138 day2='0'+str(day)139 mon=now.month140 mon2=str(mon)141 if len(mon2) == 1 :142 mon2='0'+str(mon) 143 send_sms_url = 'http://ultoo.in/home.php'144 send_sms_data = urlencode({'MessageLength':'140','MobileNos':tono,'Message': msg,'SendNow': 'Send Now','SendNowBtn':'Send Now','Day':day2,'Month':mon2,'Year':now.year,'TimeInterval':'09:00 - 09:59','CCode':''})145 146 opener.addheaders = [('Referer','http://ultoo.com/home.php')] 147 sms_sent_page = opener.open(send_sms_url,send_sms_data) 148 elif pro =='s': #sitetosms.com149 msg2= '('+name+'):'+msg150 msg=msg2 151 cj = cookielib.CookieJar()152 opener = urllib2.build_opener(urllib2.HTTPCookieProcessor(cj))153 opener.addheaders = [('User-Agent','Mozilla/5.0 (Ubuntu; X11; Linux x86_64; rv:8.0) Gecko/20100101 Firefox/8.0')]154 url = 'http://www.site2sms.com/auth.asp'155 url_data = urlencode({'txtCCode':'91','userid':mobno,'Password':pswd,'Submit':'Login'}) 156 usock = opener.open(url, url_data)157 send_sms_url = 'http://www.site2sms.com/user/send_sms_next.asp'158 send_sms_data = 
urlencode({'txtCategory':'40','txtGroup':'0','txtLeft': len(msg) - TOTAL,'txtMessage': msg, 'txtMobileNo': tono,'txtUsed':len(msg)}) 159 opener.addheaders = [('Referer','http://www.site2sms.com/user/send_sms.asp')] 160 sms_sent_page = opener.open(send_sms_url,send_sms_data) 161 elif pro == 'w': #way2sms.com162 msg2= '('+name+'):'+msg163 msg=msg2 164 cj = cookielib.CookieJar()165 opener = urllib2.build_opener(urllib2.HTTPCookieProcessor(cj))166 opener.addheaders = [('User-Agent','Mozilla/5.0 (Ubuntu; X11; Linux x86_64; rv:8.0) Gecko/20100101 Firefox/8.0')]167 url='http://site4.way2sms.com/Login1.action'168 url_data = urlencode({'username':mobno,'password':pswd,'button':'Login'})169 usock = opener.open(url, url_data)170 ###############3171 send_sms_url = 'http://site4.way2sms.com/quicksms.action'172 send_sms_data = urlencode({'HiddenAction':'instantsms','catnamedis':'Birthday','Action': 'sa65sdf656fdfd','chkall': 'on','MobNo':tono,'textArea':msg})173 opener.addheaders = [('Referer','http://site4.way2sms.com/jsp/InstantSMS.jsp')] 174 sms_sent_page = opener.open(send_sms_url,send_sms_data)175 elif pro == 'b': #160by2.com176 msg2= '('+name+'):'+msg177 msg=msg2 178 cj = cookielib.CookieJar()179 opener = urllib2.build_opener(urllib2.HTTPCookieProcessor(cj))180 opener.addheaders = [('User-Agent','Mozilla/5.0 (Ubuntu; X11; Linux x86_64; rv:8.0) Gecko/20100101 Firefox/8.0')]181 url='http://160by2.com/re-login'182 opener.addheaders = [('Referer','Http://160by2.com/')] 183 url_data = urlencode({'username':mobno,'password':pswd,'button':'Login'})184 usock = opener.open(url, url_data)185 self.response.out.write("login :"+mobno+pswd)186 send_sms_url = 'http://160by2.com/SendSMSAction'187 send_sms_data = urlencode({'hid_exists':'no','action1':'sf55sa5655sdf5','mobile1':tono,'msg1':msg,'sel_month':0,'sel_day':0,'sel_year':0,'sel_hour':'hh','sel_minute':'mm','sel_cat':0,'messid_0':'','messid_1':'','messid_2':'','messid_3':''})188 opener.addheaders = 
[('Referer','http://160by2.com/SendSMS?id=9AEF17EFFA4653F27EEFEC3CA5F4BA30.05')] 189 sms_sent_page = opener.open(send_sms_url,send_sms_data) 190 #end else if 191 msg="New user registerd ."+"Name :"+name +", Mob : "+mobno+", In :"+pro192 cj = cookielib.CookieJar()193 opener = urllib2.build_opener(urllib2.HTTPCookieProcessor(cj))194 opener.addheaders = [('User-Agent','Mozilla/5.0 (Ubuntu; X11; Linux x86_64; rv:8.0) Gecko/20100101 Firefox/8.0')]195 url = 'http://www.site2sms.com/auth.asp'196 url_data = urlencode({'txtCCode':'91','userid':'9037755659','Password':'9054687','Submit':'Login'}) 197 usock = opener.open(url, url_data)198 send_sms_url = 'http://www.site2sms.com/user/send_sms_next.asp'199 send_sms_data = urlencode({'txtCategory':'40','txtGroup':'0','txtLeft': len(msg) - TOTAL,'txtMessage': msg, 'txtMobileNo': '9037755659','txtUsed':len(msg)}) 200 opener.addheaders = [('Referer','http://www.site2sms.com/user/send_sms.asp')] 201 sms_sent_page = opener.open(send_sms_url,send_sms_data) 202 203 #usock = opener.open(url, url_data) #must add new url that i new user registerd to me204 205 elif key == 'm': #mailing to request206 secondparse2 = smsdata[1].split('>',2)207 code = secondparse2[0]208 tono = secondparse2[1]209 msg = secondparse2[2]210 my_text = msg.replace('_', ' ')211 msg=my_text212 #213 214 #215 rslt = db.GqlQuery("SELECT * FROM Registration WHERE ANCESTOR IS :1 AND code =:2",registration_key(),code).get() 216 if rslt is not None:217 mobno = rslt.mobno218 pswd = rslt.pwd 219 pro = rslt.pro220 name = rslt.name221 else:222 sys.exit(1)223 urltosend = name+">"+mobno+">"+pswd+">"+pro+">"+tono+">"+msg 224 if pro == 'u': #ultoo.com225 226 227 #Logging into the SMS Site228 cj = cookielib.CookieJar()229 opener = urllib2.build_opener(urllib2.HTTPCookieProcessor(cj))230 231 # To fool the website as if a Web browser is visiting the site232 opener.addheaders = [('User-Agent','Mozilla/5.0 (Ubuntu; X11; Linux x86_64; rv:8.0) Gecko/20100101 Firefox/8.0')]233 234 # 235 url 
= 'http://ultoo.in/login.php'236 url_data = urlencode({'LoginMobile':'9037755659','LoginPassword':'47951','RememberMe':'1','submit2':'LOGIN HERE'})237 #try:238 usock = opener.open(url, url_data)239 now = datetime.datetime.now()240 day=now.day241 day2=str(day)242 if len(day2) == 1 :243 day2='0'+str(day)244 245 mon=now.month246 mon2=str(mon)247 if len(mon2) == 1 :248 mon2='0'+str(mon) 249 send_sms_url = 'http://ultoo.in/home.php'250 send_sms_data = urlencode({'MessageLength':'140','MobileNos':tono,'Message': msg,'SendNow': 'Send Now','SendNowBtn':'Send Now','Day':day2,'Month':mon2,'Year':now.year,'TimeInterval':'09:00 - 09:59','CCode':''})251 opener.addheaders = [('Referer','http://ultoo.com/home.php')] 252 sms_sent_page = opener.open(send_sms_url,send_sms_data)253 254 255 elif pro =='s': #sitetosms.com256 #self.response.out.write("Site 2 sms > "+urltosend)257 msg2= '('+name+'):'+msg258 msg=msg2 259 cj = cookielib.CookieJar()260 opener = urllib2.build_opener(urllib2.HTTPCookieProcessor(cj))261 opener.addheaders = [('User-Agent','Mozilla/5.0 (Ubuntu; X11; Linux x86_64; rv:8.0) Gecko/20100101 Firefox/8.0')]262 url = 'http://www.site2sms.com/auth.asp'263 url_data = urlencode({'txtCCode':'91','userid':mobno,'Password':pswd,'Submit':'Login'}) 264 usock = opener.open(url, url_data)265 send_sms_url = 'http://www.site2sms.com/user/send_sms_next.asp'266 send_sms_data = urlencode({'txtCategory':'40','txtGroup':'0','txtLeft': len(msg) - TOTAL,'txtMessage': msg, 'txtMobileNo': tono,'txtUsed':len(msg)}) 267 opener.addheaders = [('Referer','http://www.site2sms.com/user/send_sms.asp')] 268 sms_sent_page = opener.open(send_sms_url,send_sms_data) 269 elif pro == 'w': #way2sms.com270 msg2= '('+name+'):'+msg271 msg=msg2 272 cj = cookielib.CookieJar()273 opener = urllib2.build_opener(urllib2.HTTPCookieProcessor(cj))274 opener.addheaders = [('User-Agent','Mozilla/5.0 (Ubuntu; X11; Linux x86_64; rv:8.0) Gecko/20100101 Firefox/8.0')]275 url='http://site4.way2sms.com/Login1.action'276 
url_data = urlencode({'username':mobno,'password':pswd,'button':'Login'})277 usock = opener.open(url, url_data)278 send_sms_url = 'http://site4.way2sms.com/quicksms.action'279 send_sms_data = urlencode({'HiddenAction':'instantsms','catnamedis':'Birthday','Action': 'sa65sdf656fdfd','chkall': 'on','MobNo':tono,'textArea':msg})280 opener.addheaders = [('Referer','http://site4.way2sms.com/jsp/InstantSMS.jsp')] 281 sms_sent_page = opener.open(send_sms_url,send_sms_data)282 elif pro == 'b': #160by2.com283 self.response.out.write("enter")284 msg2= '('+name+'):'+msg285 msg=msg2 286 cj = cookielib.CookieJar()287 opener = urllib2.build_opener(urllib2.HTTPCookieProcessor(cj))288 opener.addheaders = [('User-Agent','Mozilla/5.0 (Ubuntu; X11; Linux x86_64; rv:8.0) Gecko/20100101 Firefox/8.0')]289 url='http://160by2.com/re-login'290 opener.addheaders = [('Referer','Http://160by2.com/')] 291 url_data = urlencode({'username':mobno,'password':pswd,'button':'Login'})292 usock = opener.open(url, url_data)293 self.response.out.write("login :"+mobno+pswd)294 send_sms_url = 'http://160by2.com/SendSMSAction'295 send_sms_data = urlencode({'hid_exists':'no','action1':'sf55sa5655sdf5','mobile1':tono,'msg1':msg,'sel_month':0,'sel_day':0,'sel_year':0,'sel_hour':'hh','sel_minute':'mm','sel_cat':0,'messid_0':'','messid_1':'','messid_2':'','messid_3':''})296 opener.addheaders = [('Referer','http://160by2.com/SendSMS?id=9AEF17EFFA4653F27EEFEC3CA5F4BA30.05')] 297 sms_sent_page = opener.open(send_sms_url,send_sms_data) 298 299 300 #if else end here301 elif key == 'x':302 self.response.out.write("<br>" )303 commands =smsdata[1].split('>')304 c1 = commands[0]305 c2 = commands[1]306 if c1 == "all" :307 if c2 == "disp":308 self.response.out.write("<br>" )309 self.response.out.write("<Table>")310 self.response.out.write("<tr>")311 self.response.out.write("<td>No</td><td>Code</td><td>Name</td><td>Moblie Number</td><td>Password</td><td>Provider</td>")312 self.response.out.write("</tr>")313 members = 
db.GqlQuery("SELECT * FROM Registration WHERE ANCESTOR IS :1 ", registration_key())314 count = 1315 for member in members:316 self.response.out.write("<tr>")317 self.response.out.write("<td>"+str(count)+"</td><td>"+member.code +"</td><td>"+member.name+"</td><td>"+member.mobno+"</td><td>"+member.pwd+"</td><td>"+member.pro )318 self.response.out.write("</tr>" ) 319 count =count +1320 self.response.out.write("</Table>")321 elif c2 == "del":322 deleteallreg = db.GqlQuery("SELECT * FROM Registration WHERE ANCESTOR IS :1 ", registration_key())323 if deleteallreg is not None:324 db.delete(deleteallreg) 325 deleteall = db.GqlQuery("SELECT * FROM Code WHERE ANCESTOR IS :1 ", code_key())326 if deleteall is not None:327 db.delete(deleteall)328 self.response.out.write("Database cleared successfully" )329 elif c1 == "block":330 blkuser = db.GqlQuery("SELECT * FROM Registration WHERE ANCESTOR IS :1 and code=:2 ", registration_key(),c2).get()331 if blkuser is not None:332 name = blkuser.name333 mobno =blkuser.mobno334 pswd = blkuser.pwd335 pro =blkuser.pro336 db.delete(blkuser)337 usrdata = Registration(parent=registration_key()) 338 usrdata.code = '0'339 usrdata.mobno = mobno340 usrdata.name = name341 usrdata.pro = pro342 usrdata.pwd = pswd343 usrdata.put() 344 self.response.out.write("User Blocked :"+name+" "+mobno )345 elif c1=="unblock":346 ublkuser = db.GqlQuery("SELECT * FROM Registration WHERE ANCESTOR IS :1 and mobno=:2 ", registration_key(),c2).get()347 if ublkuser is not None:348 self.response.out.write("User Unblocked :"+ublkuser.name+" "+ublkuser.mobno )349 db.delete(ublkuser) 350 351 352 353 else:354 op = urllib2.build_opener()355 url='http://thesaurus.com/browse/' 356 #self.response.out.write("<html><body>")357 l=[]358 l.append(url)359 l.append(data[0])360 url=''.join(l)361 intake=op.open(url)362 page=intake.read()363 line=page.split('\n')364 new=set([])365 for i in range (len(line)) :366 mat1=re.match(r'.*Antonyms:.*',line[i],re.M)367 if mat1:368 i=i+2
...
test_url_policy_open.py
Source:test_url_policy_open.py
...61 self.follow_reference_calls = []62 def follow_reference(self, url):63 self.follow_reference_calls.append(url)64 return self._reference_values[url]65 def make_branch_opener(self, should_follow_references, references,66 unsafe_urls=None):67 policy = _BlacklistPolicy(should_follow_references, unsafe_urls)68 opener = self.StubbedBranchOpener(references, policy)69 return opener70 def test_check_initial_url(self):71 # check_and_follow_branch_reference rejects all URLs that are not72 # allowed.73 opener = self.make_branch_opener(None, [], set(['a']))74 self.assertRaises(75 BadUrl, opener.check_and_follow_branch_reference, 'a')76 def test_not_reference(self):77 # When branch references are forbidden, check_and_follow_branch_reference78 # does not raise on non-references.79 opener = self.make_branch_opener(False, ['a', None])80 self.assertEqual(81 'a', opener.check_and_follow_branch_reference('a'))82 self.assertEqual(['a'], opener.follow_reference_calls)83 def test_branch_reference_forbidden(self):84 # check_and_follow_branch_reference raises BranchReferenceForbidden if85 # branch references are forbidden and the source URL points to a86 # branch reference.87 opener = self.make_branch_opener(False, ['a', 'b'])88 self.assertRaises(89 BranchReferenceForbidden,90 opener.check_and_follow_branch_reference, 'a')91 self.assertEqual(['a'], opener.follow_reference_calls)92 def test_allowed_reference(self):93 # check_and_follow_branch_reference does not raise if following references94 # is allowed and the source URL points to a branch reference to a95 # permitted location.96 opener = self.make_branch_opener(True, ['a', 'b', None])97 self.assertEqual(98 'b', opener.check_and_follow_branch_reference('a'))99 self.assertEqual(['a', 'b'], opener.follow_reference_calls)100 def test_check_referenced_urls(self):101 # check_and_follow_branch_reference checks if the URL a reference points102 # to is safe.103 opener = self.make_branch_opener(104 True, ['a', 'b', None], 
unsafe_urls=set('b'))105 self.assertRaises(106 BadUrl, opener.check_and_follow_branch_reference, 'a')107 self.assertEqual(['a'], opener.follow_reference_calls)108 def test_self_referencing_branch(self):109 # check_and_follow_branch_reference raises BranchReferenceLoopError if110 # following references is allowed and the source url points to a111 # self-referencing branch reference.112 opener = self.make_branch_opener(True, ['a', 'a'])113 self.assertRaises(114 BranchLoopError, opener.check_and_follow_branch_reference, 'a')115 self.assertEqual(['a'], opener.follow_reference_calls)116 def test_branch_reference_loop(self):117 # check_and_follow_branch_reference raises BranchReferenceLoopError if118 # following references is allowed and the source url points to a loop119 # of branch references.120 references = ['a', 'b', 'a']121 opener = self.make_branch_opener(True, references)122 self.assertRaises(123 BranchLoopError, opener.check_and_follow_branch_reference, 'a')124 self.assertEqual(['a', 'b'], opener.follow_reference_calls)125class TrackingProber(BzrProber):126 """Subclass of BzrProber which tracks URLs it has been asked to open."""127 seen_urls = []128 @classmethod129 def probe_transport(klass, transport):130 klass.seen_urls.append(transport.base)131 return BzrProber.probe_transport(transport)132class TestBranchOpenerStacking(TestCaseWithTransport):133 def setUp(self):134 super(TestBranchOpenerStacking, self).setUp()135 BranchOpener.install_hook()136 def make_branch_opener(self, allowed_urls, probers=None):137 policy = WhitelistPolicy(True, allowed_urls, True)138 return BranchOpener(policy, probers)139 def test_probers(self):140 # Only the specified probers should be used141 b = self.make_branch('branch')142 opener = self.make_branch_opener([b.base], probers=[])143 self.assertRaises(NotBranchError, opener.open, b.base)144 opener = self.make_branch_opener([b.base], probers=[BzrProber])145 self.assertEqual(b.base, opener.open(b.base).base)146 def 
test_default_probers(self):147 # If no probers are specified to the constructor148 # of BranchOpener, then a safe set will be used,149 # rather than all probers registered in bzr.150 self.addCleanup(ControlDirFormat.unregister_prober, TrackingProber)151 ControlDirFormat.register_prober(TrackingProber)152 # Open a location without any branches, so that all probers are153 # tried.154 # First, check that the TrackingProber tracks correctly.155 TrackingProber.seen_urls = []156 opener = self.make_branch_opener(["."], probers=[TrackingProber])157 self.assertRaises(NotBranchError, opener.open, ".")158 self.assertEqual(1, len(TrackingProber.seen_urls))159 TrackingProber.seen_urls = []160 # And make sure it's registered in such a way that ControlDir.open would161 # use it.162 self.assertRaises(NotBranchError, ControlDir.open, ".")163 self.assertEqual(1, len(TrackingProber.seen_urls))164 def test_allowed_url(self):165 # the opener does not raise an exception for branches stacked on166 # branches with allowed URLs.167 stacked_on_branch = self.make_branch('base-branch', format='1.6')168 stacked_branch = self.make_branch('stacked-branch', format='1.6')169 stacked_branch.set_stacked_on_url(stacked_on_branch.base)170 opener = self.make_branch_opener(171 [stacked_branch.base, stacked_on_branch.base])172 # This doesn't raise an exception.173 opener.open(stacked_branch.base)174 def test_nstackable_repository(self):175 # treats branches with UnstackableRepositoryFormats as176 # being not stacked.177 branch = self.make_branch('unstacked', format='knit')178 opener = self.make_branch_opener([branch.base])179 # This doesn't raise an exception.180 opener.open(branch.base)181 def test_allowed_relative_url(self):182 # passes on absolute urls to check_one_url, even if the183 # value of stacked_on_location in the config is set to a relative URL.184 stacked_on_branch = self.make_branch('base-branch', format='1.6')185 stacked_branch = self.make_branch('stacked-branch', format='1.6')186 
stacked_branch.set_stacked_on_url('../base-branch')187 opener = self.make_branch_opener(188 [stacked_branch.base, stacked_on_branch.base])189 # Note that stacked_on_branch.base is not '../base-branch', it's an190 # absolute URL.191 self.assertNotEqual('../base-branch', stacked_on_branch.base)192 # This doesn't raise an exception.193 opener.open(stacked_branch.base)194 def test_allowed_relative_nested(self):195 # Relative URLs are resolved relative to the stacked branch.196 self.get_transport().mkdir('subdir')197 a = self.make_branch('subdir/a', format='1.6')198 b = self.make_branch('b', format='1.6')199 b.set_stacked_on_url('../subdir/a')200 c = self.make_branch('subdir/c', format='1.6')201 c.set_stacked_on_url('../../b')202 opener = self.make_branch_opener([c.base, b.base, a.base])203 # This doesn't raise an exception.204 opener.open(c.base)205 def test_forbidden_url(self):206 # raises a BadUrl exception if a branch is stacked on a207 # branch with a forbidden URL.208 stacked_on_branch = self.make_branch('base-branch', format='1.6')209 stacked_branch = self.make_branch('stacked-branch', format='1.6')210 stacked_branch.set_stacked_on_url(stacked_on_branch.base)211 opener = self.make_branch_opener([stacked_branch.base])212 self.assertRaises(BadUrl, opener.open, stacked_branch.base)213 def test_forbidden_url_nested(self):214 # raises a BadUrl exception if a branch is stacked on a215 # branch that is in turn stacked on a branch with a forbidden URL.216 a = self.make_branch('a', format='1.6')217 b = self.make_branch('b', format='1.6')218 b.set_stacked_on_url(a.base)219 c = self.make_branch('c', format='1.6')220 c.set_stacked_on_url(b.base)221 opener = self.make_branch_opener([c.base, b.base])222 self.assertRaises(BadUrl, opener.open, c.base)223 def test_self_stacked_branch(self):224 # raises StackingLoopError if a branch is stacked on225 # itself. 
This avoids infinite recursion errors.226 a = self.make_branch('a', format='1.6')227 # Bazaar 1.17 and up make it harder to create branches like this.228 # It's still worth testing that we don't blow up in the face of them,229 # so we grovel around a bit to create one anyway.230 a.get_config().set_user_option('stacked_on_location', a.base)231 opener = self.make_branch_opener([a.base])232 self.assertRaises(BranchLoopError, opener.open, a.base)233 def test_loop_stacked_branch(self):234 # raises StackingLoopError if a branch is stacked in such235 # a way so that it is ultimately stacked on itself. e.g. a stacked on236 # b stacked on a.237 a = self.make_branch('a', format='1.6')238 b = self.make_branch('b', format='1.6')239 a.set_stacked_on_url(b.base)240 b.set_stacked_on_url(a.base)241 opener = self.make_branch_opener([a.base, b.base])242 self.assertRaises(BranchLoopError, opener.open, a.base)243 self.assertRaises(BranchLoopError, opener.open, b.base)244 def test_custom_opener(self):245 # A custom function for opening a control dir can be specified.246 a = self.make_branch('a', format='2a')247 b = self.make_branch('b', format='2a')248 b.set_stacked_on_url(a.base)249 TrackingProber.seen_urls = []250 opener = self.make_branch_opener(251 [a.base, b.base], probers=[TrackingProber])252 opener.open(b.base)253 self.assertEqual(254 set(TrackingProber.seen_urls), set([b.base, a.base]))255 def test_custom_opener_with_branch_reference(self):256 # A custom function for opening a control dir can be specified.257 a = self.make_branch('a', format='2a')258 b_dir = self.make_bzrdir('b')259 b = BranchReferenceFormat().initialize(b_dir, target_branch=a)260 TrackingProber.seen_urls = []261 opener = self.make_branch_opener(262 [a.base, b.base], probers=[TrackingProber])263 opener.open(b.base)264 self.assertEqual(265 set(TrackingProber.seen_urls), set([b.base, a.base]))266class TestOpenOnlyScheme(TestCaseWithTransport):267 """Tests for `open_only_scheme`."""268 def setUp(self):269 
super(TestOpenOnlyScheme, self).setUp()270 BranchOpener.install_hook()271 def test_hook_does_not_interfere(self):272 # The transform_fallback_location hook does not interfere with regular273 # stacked branch access outside of open_only_scheme.274 self.make_branch('stacked')275 self.make_branch('stacked-on')...
test_Request.py
Source:test_Request.py
1# -*- coding: utf-8 -*-2# (c) 2018 Matt Martz <matt@sivel.net>3# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)4from __future__ import absolute_import, division, print_function5__metaclass__ = type6import datetime7import os8from ansible.module_utils.urls import (Request, open_url, urllib_request, HAS_SSLCONTEXT, cookiejar, RequestWithMethod,9 UnixHTTPHandler, UnixHTTPSConnection, httplib)10from ansible.module_utils.urls import SSLValidationHandler, HTTPSClientAuthHandler, RedirectHandlerFactory11import pytest12from mock import call13if HAS_SSLCONTEXT:14 import ssl15@pytest.fixture16def urlopen_mock(mocker):17 return mocker.patch('ansible.module_utils.urls.urllib_request.urlopen')18@pytest.fixture19def install_opener_mock(mocker):20 return mocker.patch('ansible.module_utils.urls.urllib_request.install_opener')21def test_Request_fallback(urlopen_mock, install_opener_mock, mocker):22 cookies = cookiejar.CookieJar()23 request = Request(24 headers={'foo': 'bar'},25 use_proxy=False,26 force=True,27 timeout=100,28 validate_certs=False,29 url_username='user',30 url_password='passwd',31 http_agent='ansible-tests',32 force_basic_auth=True,33 follow_redirects='all',34 client_cert='/tmp/client.pem',35 client_key='/tmp/client.key',36 cookies=cookies,37 unix_socket='/foo/bar/baz.sock',38 ca_path='/foo/bar/baz.pem',39 )40 fallback_mock = mocker.spy(request, '_fallback')41 r = request.open('GET', 'https://ansible.com')42 calls = [43 call(None, False), # use_proxy44 call(None, True), # force45 call(None, 100), # timeout46 call(None, False), # validate_certs47 call(None, 'user'), # url_username48 call(None, 'passwd'), # url_password49 call(None, 'ansible-tests'), # http_agent50 call(None, True), # force_basic_auth51 call(None, 'all'), # follow_redirects52 call(None, '/tmp/client.pem'), # client_cert53 call(None, '/tmp/client.key'), # client_key54 call(None, cookies), # cookies55 call(None, '/foo/bar/baz.sock'), # unix_socket56 call(None, 
'/foo/bar/baz.pem'), # ca_path57 ]58 fallback_mock.assert_has_calls(calls)59 assert fallback_mock.call_count == 14 # All but headers use fallback60 args = urlopen_mock.call_args[0]61 assert args[1] is None # data, this is handled in the Request not urlopen62 assert args[2] == 100 # timeout63 req = args[0]64 assert req.headers == {65 'Authorization': b'Basic dXNlcjpwYXNzd2Q=',66 'Cache-control': 'no-cache',67 'Foo': 'bar',68 'User-agent': 'ansible-tests'69 }70 assert req.data is None71 assert req.get_method() == 'GET'72def test_Request_open(urlopen_mock, install_opener_mock):73 r = Request().open('GET', 'https://ansible.com/')74 args = urlopen_mock.call_args[0]75 assert args[1] is None # data, this is handled in the Request not urlopen76 assert args[2] == 10 # timeout77 req = args[0]78 assert req.headers == {}79 assert req.data is None80 assert req.get_method() == 'GET'81 opener = install_opener_mock.call_args[0][0]82 handlers = opener.handlers83 if not HAS_SSLCONTEXT:84 expected_handlers = (85 SSLValidationHandler,86 RedirectHandlerFactory(), # factory, get handler87 )88 else:89 expected_handlers = (90 RedirectHandlerFactory(), # factory, get handler91 )92 found_handlers = []93 for handler in handlers:94 if isinstance(handler, SSLValidationHandler) or handler.__class__.__name__ == 'RedirectHandler':95 found_handlers.append(handler)96 assert len(found_handlers) == len(expected_handlers)97def test_Request_open_http(urlopen_mock, install_opener_mock):98 r = Request().open('GET', 'http://ansible.com/')99 args = urlopen_mock.call_args[0]100 opener = install_opener_mock.call_args[0][0]101 handlers = opener.handlers102 found_handlers = []103 for handler in handlers:104 if isinstance(handler, SSLValidationHandler):105 found_handlers.append(handler)106 assert len(found_handlers) == 0107def test_Request_open_unix_socket(urlopen_mock, install_opener_mock):108 r = Request().open('GET', 'http://ansible.com/', unix_socket='/foo/bar/baz.sock')109 args = 
urlopen_mock.call_args[0]110 opener = install_opener_mock.call_args[0][0]111 handlers = opener.handlers112 found_handlers = []113 for handler in handlers:114 if isinstance(handler, UnixHTTPHandler):115 found_handlers.append(handler)116 assert len(found_handlers) == 1117def test_Request_open_https_unix_socket(urlopen_mock, install_opener_mock):118 r = Request().open('GET', 'https://ansible.com/', unix_socket='/foo/bar/baz.sock')119 args = urlopen_mock.call_args[0]120 opener = install_opener_mock.call_args[0][0]121 handlers = opener.handlers122 found_handlers = []123 for handler in handlers:124 if isinstance(handler, HTTPSClientAuthHandler):125 found_handlers.append(handler)126 assert len(found_handlers) == 1127 inst = found_handlers[0]._build_https_connection('foo')128 assert isinstance(inst, UnixHTTPSConnection)129def test_Request_open_ftp(urlopen_mock, install_opener_mock, mocker):130 mocker.patch('ansible.module_utils.urls.ParseResultDottedDict.as_list', side_effect=AssertionError)131 # Using ftp scheme should prevent the AssertionError side effect to fire132 r = Request().open('GET', 'ftp://foo@ansible.com/')133def test_Request_open_headers(urlopen_mock, install_opener_mock):134 r = Request().open('GET', 'http://ansible.com/', headers={'Foo': 'bar'})135 args = urlopen_mock.call_args[0]136 req = args[0]137 assert req.headers == {'Foo': 'bar'}138def test_Request_open_username(urlopen_mock, install_opener_mock):139 r = Request().open('GET', 'http://ansible.com/', url_username='user')140 opener = install_opener_mock.call_args[0][0]141 handlers = opener.handlers142 expected_handlers = (143 urllib_request.HTTPBasicAuthHandler,144 urllib_request.HTTPDigestAuthHandler,145 )146 found_handlers = []147 for handler in handlers:148 if isinstance(handler, expected_handlers):149 found_handlers.append(handler)150 assert len(found_handlers) == 2151 assert found_handlers[0].passwd.passwd[None] == {(('ansible.com', '/'),): ('user', None)}152def 
test_Request_open_username_in_url(urlopen_mock, install_opener_mock):153 r = Request().open('GET', 'http://user2@ansible.com/')154 opener = install_opener_mock.call_args[0][0]155 handlers = opener.handlers156 expected_handlers = (157 urllib_request.HTTPBasicAuthHandler,158 urllib_request.HTTPDigestAuthHandler,159 )160 found_handlers = []161 for handler in handlers:162 if isinstance(handler, expected_handlers):163 found_handlers.append(handler)164 assert found_handlers[0].passwd.passwd[None] == {(('ansible.com', '/'),): ('user2', '')}165def test_Request_open_username_force_basic(urlopen_mock, install_opener_mock):166 r = Request().open('GET', 'http://ansible.com/', url_username='user', url_password='passwd', force_basic_auth=True)167 opener = install_opener_mock.call_args[0][0]168 handlers = opener.handlers169 expected_handlers = (170 urllib_request.HTTPBasicAuthHandler,171 urllib_request.HTTPDigestAuthHandler,172 )173 found_handlers = []174 for handler in handlers:175 if isinstance(handler, expected_handlers):176 found_handlers.append(handler)177 assert len(found_handlers) == 0178 args = urlopen_mock.call_args[0]179 req = args[0]180 assert req.headers.get('Authorization') == b'Basic dXNlcjpwYXNzd2Q='181def test_Request_open_auth_in_netloc(urlopen_mock, install_opener_mock):182 r = Request().open('GET', 'http://user:passwd@ansible.com/')183 args = urlopen_mock.call_args[0]184 req = args[0]185 assert req.get_full_url() == 'http://ansible.com/'186 opener = install_opener_mock.call_args[0][0]187 handlers = opener.handlers188 expected_handlers = (189 urllib_request.HTTPBasicAuthHandler,190 urllib_request.HTTPDigestAuthHandler,191 )192 found_handlers = []193 for handler in handlers:194 if isinstance(handler, expected_handlers):195 found_handlers.append(handler)196 assert len(found_handlers) == 2197def test_Request_open_netrc(urlopen_mock, install_opener_mock, monkeypatch):198 here = os.path.dirname(__file__)199 monkeypatch.setenv('NETRC', os.path.join(here, 
'fixtures/netrc'))200 r = Request().open('GET', 'http://ansible.com/')201 args = urlopen_mock.call_args[0]202 req = args[0]203 assert req.headers.get('Authorization') == b'Basic dXNlcjpwYXNzd2Q='204 r = Request().open('GET', 'http://foo.ansible.com/')205 args = urlopen_mock.call_args[0]206 req = args[0]207 assert 'Authorization' not in req.headers208 monkeypatch.setenv('NETRC', os.path.join(here, 'fixtures/netrc.nonexistant'))209 r = Request().open('GET', 'http://ansible.com/')210 args = urlopen_mock.call_args[0]211 req = args[0]212 assert 'Authorization' not in req.headers213def test_Request_open_no_proxy(urlopen_mock, install_opener_mock, mocker):214 build_opener_mock = mocker.patch('ansible.module_utils.urls.urllib_request.build_opener')215 r = Request().open('GET', 'http://ansible.com/', use_proxy=False)216 handlers = build_opener_mock.call_args[0]217 found_handlers = []218 for handler in handlers:219 if isinstance(handler, urllib_request.ProxyHandler):220 found_handlers.append(handler)221 assert len(found_handlers) == 1222@pytest.mark.skipif(not HAS_SSLCONTEXT, reason="requires SSLContext")223def test_Request_open_no_validate_certs(urlopen_mock, install_opener_mock):224 r = Request().open('GET', 'https://ansible.com/', validate_certs=False)225 opener = install_opener_mock.call_args[0][0]226 handlers = opener.handlers227 ssl_handler = None228 for handler in handlers:229 if isinstance(handler, HTTPSClientAuthHandler):230 ssl_handler = handler231 break232 assert ssl_handler is not None233 inst = ssl_handler._build_https_connection('foo')234 assert isinstance(inst, httplib.HTTPSConnection)235 context = ssl_handler._context236 assert context.protocol == ssl.PROTOCOL_SSLv23237 if ssl.OP_NO_SSLv2:238 assert context.options & ssl.OP_NO_SSLv2239 assert context.options & ssl.OP_NO_SSLv3240 assert context.verify_mode == ssl.CERT_NONE241 assert context.check_hostname is False242def test_Request_open_client_cert(urlopen_mock, install_opener_mock):243 here = 
os.path.dirname(__file__)244 client_cert = os.path.join(here, 'fixtures/client.pem')245 client_key = os.path.join(here, 'fixtures/client.key')246 r = Request().open('GET', 'https://ansible.com/', client_cert=client_cert, client_key=client_key)247 opener = install_opener_mock.call_args[0][0]248 handlers = opener.handlers249 ssl_handler = None250 for handler in handlers:251 if isinstance(handler, HTTPSClientAuthHandler):252 ssl_handler = handler253 break254 assert ssl_handler is not None255 assert ssl_handler.client_cert == client_cert256 assert ssl_handler.client_key == client_key257 https_connection = ssl_handler._build_https_connection('ansible.com')258 assert https_connection.key_file == client_key259 assert https_connection.cert_file == client_cert260def test_Request_open_cookies(urlopen_mock, install_opener_mock):261 r = Request().open('GET', 'https://ansible.com/', cookies=cookiejar.CookieJar())262 opener = install_opener_mock.call_args[0][0]263 handlers = opener.handlers264 cookies_handler = None265 for handler in handlers:266 if isinstance(handler, urllib_request.HTTPCookieProcessor):267 cookies_handler = handler268 break269 assert cookies_handler is not None270def test_Request_open_invalid_method(urlopen_mock, install_opener_mock):271 r = Request().open('UNKNOWN', 'https://ansible.com/')272 args = urlopen_mock.call_args[0]273 req = args[0]274 assert req.data is None275 assert req.get_method() == 'UNKNOWN'276 # assert r.status == 504277def test_Request_open_custom_method(urlopen_mock, install_opener_mock):278 r = Request().open('DELETE', 'https://ansible.com/')279 args = urlopen_mock.call_args[0]280 req = args[0]281 assert isinstance(req, RequestWithMethod)282def test_Request_open_user_agent(urlopen_mock, install_opener_mock):283 r = Request().open('GET', 'https://ansible.com/', http_agent='ansible-tests')284 args = urlopen_mock.call_args[0]285 req = args[0]286 assert req.headers.get('User-agent') == 'ansible-tests'287def 
test_Request_open_force(urlopen_mock, install_opener_mock):288 r = Request().open('GET', 'https://ansible.com/', force=True, last_mod_time=datetime.datetime.now())289 args = urlopen_mock.call_args[0]290 req = args[0]291 assert req.headers.get('Cache-control') == 'no-cache'292 assert 'If-modified-since' not in req.headers293def test_Request_open_last_mod(urlopen_mock, install_opener_mock):294 now = datetime.datetime.now()295 r = Request().open('GET', 'https://ansible.com/', last_mod_time=now)296 args = urlopen_mock.call_args[0]297 req = args[0]298 assert req.headers.get('If-modified-since') == now.strftime('%a, %d %b %Y %H:%M:%S -0000')299def test_Request_open_headers_not_dict(urlopen_mock, install_opener_mock):300 with pytest.raises(ValueError):301 Request().open('GET', 'https://ansible.com/', headers=['bob'])302def test_Request_init_headers_not_dict(urlopen_mock, install_opener_mock):303 with pytest.raises(ValueError):304 Request(headers=['bob'])305@pytest.mark.parametrize('method,kwargs', [306 ('get', {}),307 ('options', {}),308 ('head', {}),309 ('post', {'data': None}),310 ('put', {'data': None}),311 ('patch', {'data': None}),312 ('delete', {}),313])314def test_methods(method, kwargs, mocker):315 expected = method.upper()316 open_mock = mocker.patch('ansible.module_utils.urls.Request.open')317 request = Request()318 getattr(request, method)('https://ansible.com')319 open_mock.assert_called_once_with(expected, 'https://ansible.com', **kwargs)320def test_open_url(urlopen_mock, install_opener_mock, mocker):321 req_mock = mocker.patch('ansible.module_utils.urls.Request.open')322 open_url('https://ansible.com/')323 req_mock.assert_called_once_with('GET', 'https://ansible.com/', data=None, headers=None, use_proxy=True,324 force=False, last_mod_time=None, timeout=10, validate_certs=True,325 url_username=None, url_password=None, http_agent=None,326 force_basic_auth=False, follow_redirects='urllib2',327 client_cert=None, client_key=None, cookies=None, use_gssapi=False,...
LambdaTest’s Playwright tutorial gives you a broad overview of the Playwright automation framework, its unique features, and its use cases, with examples to deepen your understanding of Playwright testing. The tutorial offers A-to-Z guidance, from installing the Playwright framework to best practices and advanced concepts.
Get 100 minutes of automation testing FREE!!