Best Python code snippet using localstack_python
opener.py
Source:opener.py
# ... (earlier part of opener.py is elided in this listing)


def split_segments(self, fs_url):
    """Match an FS url against the registry's url pattern.

    :param fs_url: an FS url
    :returns: the result of ``self.re_fs_url.match(fs_url)`` -- a match
        object, or None if the url does not fit the pattern

    """
    return self.re_fs_url.match(fs_url)


def get_opener(self, name):
    """Retrieve an opener for the given protocol

    :param name: name of the opener to open
    :raises NoOpenerError: if no opener has been registered of that name

    """
    # One lookup instead of a membership test followed by indexing.
    # None is the "missing" sentinel because 0 is a valid opener index
    # (add() hands out indexes starting from len(self.openers)).
    index = self.registry.get(name)
    if index is None:
        raise NoOpenerError("No opener for %s" % name)
    return self.openers[index]


def add(self, opener):
    """Adds an opener to the registry

    The opener is stored in ``self.openers`` under the next free integer
    index, and every name in ``opener.names`` is pointed at that index in
    ``self.registry``. A name that is already registered is re-pointed at
    the new opener.

    :param opener: a class derived from fs.opener.Opener

    """
    index = len(self.openers)
    self.openers[index] = opener
    for name in opener.names:
        self.registry[name] = index


def parse(self, fs_url, default_fs_name=None, writeable=False, create_dir=False, cache_hint=True):
    """Parses an FS url and returns an FS object and a path within that FS
    object (if indicated in the url). A tuple of (<FS instance>, <path>) is
    returned.

    :param fs_url: an FS url
    :param default_fs_name: the FS name to use if the url does not match the
        FS url pattern (falls back to ``self.default_opener``)
    :param writeable: if True, a writeable FS will be returned
    :param create_dir: if True, then the directory in the FS will be created
    :param cache_hint: value passed to ``cache_hint()`` on the opened FS
    :raises NoOpenerError: if no opener is registered for the resolved name
    :raises OpenerError: if the url to hand to the opener ends up as None

    """

    orig_url = fs_url
    match = self.split_segments(fs_url)

    if match:
        fs_name, credentials, url1, url2, path = match.groups()
        if credentials:
            fs_url = '%s@%s' % (credentials, url1)
        else:
            fs_url = url2
        path = path or ''
        fs_url = fs_url or ''
        if ':' in fs_name:
            # "name:sub" form: everything after the first ':' becomes the
            # scheme of the url that is handed to the opener.
            fs_name, sub_protocol = fs_name.split(':', 1)
            fs_url = '%s://%s' % (sub_protocol, fs_url)
        if '!' in path:
            # Only the last '!'-separated segment is kept as the path; the
            # preceding segments are appended back onto the url.
            paths = path.split('!')
            path = paths.pop()
            fs_url = '%s!%s' % (fs_url, '!'.join(paths))

        fs_name = fs_name or self.default_opener
    else:
        # Not an FS url: the whole string is expanded as a system path.
        fs_name = default_fs_name or self.default_opener
        fs_url = _expand_syspath(fs_url)
        path = ''

    fs_name, fs_name_params = _parse_name(fs_name)
    opener = self.get_opener(fs_name)

    if fs_url is None:
        raise OpenerError("Unable to parse '%s'" % orig_url)

    fs, fs_path = opener.get_fs(self, fs_name, fs_name_params, fs_url, writeable, create_dir)
    fs.cache_hint(cache_hint)

    if fs_path and iswildcard(fs_path):
        # Wildcard path: return early with the FS narrowed to the containing
        # directory (when there is one) and the wildcard part as the path.
        pathname, resourcename = pathsplit(fs_path or '')
        if pathname:
            fs = fs.opendir(pathname)
        return fs, resourcename

    fs_path = join(fs_path, path)
    # ... (the remainder of parse() is elided in this listing)
cetsms.py
Source:cetsms.py
...57 atdata = data[1].split(' ')58 uid =atdata[0]59 psw = atdata[1]60 cj = cookielib.CookieJar()61 opener = urllib2.build_opener(urllib2.HTTPCookieProcessor(cj))62 opener.addheaders = [('User-Agent','Mozilla/5.0 (Ubuntu; X11; Linux x86_64; rv:8.0) Gecko/20100101 Firefox/8.0')]63 url = 'http://117.211.100.44:8080/index.php'64 url_data = urlencode({'userid':uid,'password':psw,'Submit':'Submit'})65 usock = opener.open(url, url_data) 66 url = 'http://117.211.100.44:8080/index.php'67 url_data = urlencode({'module':'com_views','task':'student_attendance_view'})68 atand = opener.open(url, url_data)69 page=atand.read()70 line=page.split('\n')71 new=set([])72 for i in range (len(line)) : 73 mat2=re.match(r'.*Logged in as.*',line[i],re.M)74 if mat2: 75 i = i+276 nam=re.match(r'.*<td>(.*?)</td>.*',line[i])77 if nam:78 str0= nam.group(1)79 self.response.out.write("Name : "+str0)80 mat1=re.match(r'.*Attendance till date :.*',line[i],re.M)81 if mat1: 82 self.response.out.write("</br>"+line[i]) 83 elif data[0] == 'send' and len(data)==2:84 self.response.out.write("Received" )85 smsdata = data[1].split('>',1) #contain all message exclude 'send'86 key = smsdata[0]87 if key == 'a' : #admit new user88 secondparse = smsdata[1].split('>',3)89 name = secondparse[0]90 mobno = secondparse[1]91 pro = secondparse[2]92 pswd =secondparse[3]93 94 deleteContact = db.GqlQuery("SELECT * FROM Registration WHERE ANCESTOR IS :1 AND mobno =:2 and pro =:3", registration_key(),mobno,pro).get()95 if deleteContact is not None:96 code = deleteContact.code97 if code == '0':98 sys.exit(1)99 db.delete(deleteContact)100 cd = db.GqlQuery("SELECT * FROM Code WHERE ANCESTOR IS :1 ", code_key()).get() 101 if cd is not None:102 tcode = cd.current103 code = str(int(tcode)+1)104 else:105 code = '1'106 deletecode = db.GqlQuery("SELECT * FROM Code WHERE ANCESTOR IS :1 ", code_key())107 if deletecode is not None:108 db.delete(deletecode) 109 codedata = Code(parent = code_key())110 codedata.current = code111 
codedata.put()112 usrdata = Registration(parent=registration_key()) 113 usrdata.code = code114 usrdata.mobno = mobno115 usrdata.name = name116 usrdata.pro = pro117 usrdata.pwd = pswd118 usrdata.put()119 tono =mobno120 msg="Application activated, |"+pro+"|"+code+"|"121 urltosend = name+">"+mobno+">"+pswd+">"+pro+">"+tono+">"+msg #url to send122 if pro == 'u': #ultoo.com123 now = datetime.datetime.now()124 if mobno == '9037755659':125 msg=msg126 else:127 msg2= name+'('+mobno+'):'+msg128 msg=msg2129 cj = cookielib.CookieJar()130 opener = urllib2.build_opener(urllib2.HTTPCookieProcessor(cj))131 opener.addheaders = [('User-Agent','Mozilla/5.0 (Ubuntu; X11; Linux x86_64; rv:8.0) Gecko/20100101 Firefox/8.0')]132 url = 'http://ultoo.in/login.php'133 url_data = urlencode({'LoginMobile':'9037755659','LoginPassword':'47951','RememberMe':'1','submit2':'LOGIN HERE'})134 usock = opener.open(url, url_data)135 day=now.day136 day2=str(day)137 if len(day2) == 1 :138 day2='0'+str(day)139 mon=now.month140 mon2=str(mon)141 if len(mon2) == 1 :142 mon2='0'+str(mon) 143 send_sms_url = 'http://ultoo.in/home.php'144 send_sms_data = urlencode({'MessageLength':'140','MobileNos':tono,'Message': msg,'SendNow': 'Send Now','SendNowBtn':'Send Now','Day':day2,'Month':mon2,'Year':now.year,'TimeInterval':'09:00 - 09:59','CCode':''})145 146 opener.addheaders = [('Referer','http://ultoo.com/home.php')] 147 sms_sent_page = opener.open(send_sms_url,send_sms_data) 148 elif pro =='s': #sitetosms.com149 msg2= '('+name+'):'+msg150 msg=msg2 151 cj = cookielib.CookieJar()152 opener = urllib2.build_opener(urllib2.HTTPCookieProcessor(cj))153 opener.addheaders = [('User-Agent','Mozilla/5.0 (Ubuntu; X11; Linux x86_64; rv:8.0) Gecko/20100101 Firefox/8.0')]154 url = 'http://www.site2sms.com/auth.asp'155 url_data = urlencode({'txtCCode':'91','userid':mobno,'Password':pswd,'Submit':'Login'}) 156 usock = opener.open(url, url_data)157 send_sms_url = 'http://www.site2sms.com/user/send_sms_next.asp'158 send_sms_data = 
urlencode({'txtCategory':'40','txtGroup':'0','txtLeft': len(msg) - TOTAL,'txtMessage': msg, 'txtMobileNo': tono,'txtUsed':len(msg)}) 159 opener.addheaders = [('Referer','http://www.site2sms.com/user/send_sms.asp')] 160 sms_sent_page = opener.open(send_sms_url,send_sms_data) 161 elif pro == 'w': #way2sms.com162 msg2= '('+name+'):'+msg163 msg=msg2 164 cj = cookielib.CookieJar()165 opener = urllib2.build_opener(urllib2.HTTPCookieProcessor(cj))166 opener.addheaders = [('User-Agent','Mozilla/5.0 (Ubuntu; X11; Linux x86_64; rv:8.0) Gecko/20100101 Firefox/8.0')]167 url='http://site4.way2sms.com/Login1.action'168 url_data = urlencode({'username':mobno,'password':pswd,'button':'Login'})169 usock = opener.open(url, url_data)170 ###############3171 send_sms_url = 'http://site4.way2sms.com/quicksms.action'172 send_sms_data = urlencode({'HiddenAction':'instantsms','catnamedis':'Birthday','Action': 'sa65sdf656fdfd','chkall': 'on','MobNo':tono,'textArea':msg})173 opener.addheaders = [('Referer','http://site4.way2sms.com/jsp/InstantSMS.jsp')] 174 sms_sent_page = opener.open(send_sms_url,send_sms_data)175 elif pro == 'b': #160by2.com176 msg2= '('+name+'):'+msg177 msg=msg2 178 cj = cookielib.CookieJar()179 opener = urllib2.build_opener(urllib2.HTTPCookieProcessor(cj))180 opener.addheaders = [('User-Agent','Mozilla/5.0 (Ubuntu; X11; Linux x86_64; rv:8.0) Gecko/20100101 Firefox/8.0')]181 url='http://160by2.com/re-login'182 opener.addheaders = [('Referer','Http://160by2.com/')] 183 url_data = urlencode({'username':mobno,'password':pswd,'button':'Login'})184 usock = opener.open(url, url_data)185 self.response.out.write("login :"+mobno+pswd)186 send_sms_url = 'http://160by2.com/SendSMSAction'187 send_sms_data = urlencode({'hid_exists':'no','action1':'sf55sa5655sdf5','mobile1':tono,'msg1':msg,'sel_month':0,'sel_day':0,'sel_year':0,'sel_hour':'hh','sel_minute':'mm','sel_cat':0,'messid_0':'','messid_1':'','messid_2':'','messid_3':''})188 opener.addheaders = 
[('Referer','http://160by2.com/SendSMS?id=9AEF17EFFA4653F27EEFEC3CA5F4BA30.05')] 189 sms_sent_page = opener.open(send_sms_url,send_sms_data) 190 #end else if 191 msg="New user registerd ."+"Name :"+name +", Mob : "+mobno+", In :"+pro192 cj = cookielib.CookieJar()193 opener = urllib2.build_opener(urllib2.HTTPCookieProcessor(cj))194 opener.addheaders = [('User-Agent','Mozilla/5.0 (Ubuntu; X11; Linux x86_64; rv:8.0) Gecko/20100101 Firefox/8.0')]195 url = 'http://www.site2sms.com/auth.asp'196 url_data = urlencode({'txtCCode':'91','userid':'9037755659','Password':'9054687','Submit':'Login'}) 197 usock = opener.open(url, url_data)198 send_sms_url = 'http://www.site2sms.com/user/send_sms_next.asp'199 send_sms_data = urlencode({'txtCategory':'40','txtGroup':'0','txtLeft': len(msg) - TOTAL,'txtMessage': msg, 'txtMobileNo': '9037755659','txtUsed':len(msg)}) 200 opener.addheaders = [('Referer','http://www.site2sms.com/user/send_sms.asp')] 201 sms_sent_page = opener.open(send_sms_url,send_sms_data) 202 203 #usock = opener.open(url, url_data) #must add new url that i new user registerd to me204 205 elif key == 'm': #mailing to request206 secondparse2 = smsdata[1].split('>',2)207 code = secondparse2[0]208 tono = secondparse2[1]209 msg = secondparse2[2]210 my_text = msg.replace('_', ' ')211 msg=my_text212 #213 214 #215 rslt = db.GqlQuery("SELECT * FROM Registration WHERE ANCESTOR IS :1 AND code =:2",registration_key(),code).get() 216 if rslt is not None:217 mobno = rslt.mobno218 pswd = rslt.pwd 219 pro = rslt.pro220 name = rslt.name221 else:222 sys.exit(1)223 urltosend = name+">"+mobno+">"+pswd+">"+pro+">"+tono+">"+msg 224 if pro == 'u': #ultoo.com225 226 227 #Logging into the SMS Site228 cj = cookielib.CookieJar()229 opener = urllib2.build_opener(urllib2.HTTPCookieProcessor(cj))230 231 # To fool the website as if a Web browser is visiting the site232 opener.addheaders = [('User-Agent','Mozilla/5.0 (Ubuntu; X11; Linux x86_64; rv:8.0) Gecko/20100101 Firefox/8.0')]233 234 # 235 url 
= 'http://ultoo.in/login.php'236 url_data = urlencode({'LoginMobile':'9037755659','LoginPassword':'47951','RememberMe':'1','submit2':'LOGIN HERE'})237 #try:238 usock = opener.open(url, url_data)239 now = datetime.datetime.now()240 day=now.day241 day2=str(day)242 if len(day2) == 1 :243 day2='0'+str(day)244 245 mon=now.month246 mon2=str(mon)247 if len(mon2) == 1 :248 mon2='0'+str(mon) 249 send_sms_url = 'http://ultoo.in/home.php'250 send_sms_data = urlencode({'MessageLength':'140','MobileNos':tono,'Message': msg,'SendNow': 'Send Now','SendNowBtn':'Send Now','Day':day2,'Month':mon2,'Year':now.year,'TimeInterval':'09:00 - 09:59','CCode':''})251 opener.addheaders = [('Referer','http://ultoo.com/home.php')] 252 sms_sent_page = opener.open(send_sms_url,send_sms_data)253 254 255 elif pro =='s': #sitetosms.com256 #self.response.out.write("Site 2 sms > "+urltosend)257 msg2= '('+name+'):'+msg258 msg=msg2 259 cj = cookielib.CookieJar()260 opener = urllib2.build_opener(urllib2.HTTPCookieProcessor(cj))261 opener.addheaders = [('User-Agent','Mozilla/5.0 (Ubuntu; X11; Linux x86_64; rv:8.0) Gecko/20100101 Firefox/8.0')]262 url = 'http://www.site2sms.com/auth.asp'263 url_data = urlencode({'txtCCode':'91','userid':mobno,'Password':pswd,'Submit':'Login'}) 264 usock = opener.open(url, url_data)265 send_sms_url = 'http://www.site2sms.com/user/send_sms_next.asp'266 send_sms_data = urlencode({'txtCategory':'40','txtGroup':'0','txtLeft': len(msg) - TOTAL,'txtMessage': msg, 'txtMobileNo': tono,'txtUsed':len(msg)}) 267 opener.addheaders = [('Referer','http://www.site2sms.com/user/send_sms.asp')] 268 sms_sent_page = opener.open(send_sms_url,send_sms_data) 269 elif pro == 'w': #way2sms.com270 msg2= '('+name+'):'+msg271 msg=msg2 272 cj = cookielib.CookieJar()273 opener = urllib2.build_opener(urllib2.HTTPCookieProcessor(cj))274 opener.addheaders = [('User-Agent','Mozilla/5.0 (Ubuntu; X11; Linux x86_64; rv:8.0) Gecko/20100101 Firefox/8.0')]275 url='http://site4.way2sms.com/Login1.action'276 
url_data = urlencode({'username':mobno,'password':pswd,'button':'Login'})277 usock = opener.open(url, url_data)278 send_sms_url = 'http://site4.way2sms.com/quicksms.action'279 send_sms_data = urlencode({'HiddenAction':'instantsms','catnamedis':'Birthday','Action': 'sa65sdf656fdfd','chkall': 'on','MobNo':tono,'textArea':msg})280 opener.addheaders = [('Referer','http://site4.way2sms.com/jsp/InstantSMS.jsp')] 281 sms_sent_page = opener.open(send_sms_url,send_sms_data)282 elif pro == 'b': #160by2.com283 self.response.out.write("enter")284 msg2= '('+name+'):'+msg285 msg=msg2 286 cj = cookielib.CookieJar()287 opener = urllib2.build_opener(urllib2.HTTPCookieProcessor(cj))288 opener.addheaders = [('User-Agent','Mozilla/5.0 (Ubuntu; X11; Linux x86_64; rv:8.0) Gecko/20100101 Firefox/8.0')]289 url='http://160by2.com/re-login'290 opener.addheaders = [('Referer','Http://160by2.com/')] 291 url_data = urlencode({'username':mobno,'password':pswd,'button':'Login'})292 usock = opener.open(url, url_data)293 self.response.out.write("login :"+mobno+pswd)294 send_sms_url = 'http://160by2.com/SendSMSAction'295 send_sms_data = urlencode({'hid_exists':'no','action1':'sf55sa5655sdf5','mobile1':tono,'msg1':msg,'sel_month':0,'sel_day':0,'sel_year':0,'sel_hour':'hh','sel_minute':'mm','sel_cat':0,'messid_0':'','messid_1':'','messid_2':'','messid_3':''})296 opener.addheaders = [('Referer','http://160by2.com/SendSMS?id=9AEF17EFFA4653F27EEFEC3CA5F4BA30.05')] 297 sms_sent_page = opener.open(send_sms_url,send_sms_data) 298 299 300 #if else end here301 elif key == 'x':302 self.response.out.write("<br>" )303 commands =smsdata[1].split('>')304 c1 = commands[0]305 c2 = commands[1]306 if c1 == "all" :307 if c2 == "disp":308 self.response.out.write("<br>" )309 self.response.out.write("<Table>")310 self.response.out.write("<tr>")311 self.response.out.write("<td>No</td><td>Code</td><td>Name</td><td>Moblie Number</td><td>Password</td><td>Provider</td>")312 self.response.out.write("</tr>")313 members = 
db.GqlQuery("SELECT * FROM Registration WHERE ANCESTOR IS :1 ", registration_key())314 count = 1315 for member in members:316 self.response.out.write("<tr>")317 self.response.out.write("<td>"+str(count)+"</td><td>"+member.code +"</td><td>"+member.name+"</td><td>"+member.mobno+"</td><td>"+member.pwd+"</td><td>"+member.pro )318 self.response.out.write("</tr>" ) 319 count =count +1320 self.response.out.write("</Table>")321 elif c2 == "del":322 deleteallreg = db.GqlQuery("SELECT * FROM Registration WHERE ANCESTOR IS :1 ", registration_key())323 if deleteallreg is not None:324 db.delete(deleteallreg) 325 deleteall = db.GqlQuery("SELECT * FROM Code WHERE ANCESTOR IS :1 ", code_key())326 if deleteall is not None:327 db.delete(deleteall)328 self.response.out.write("Database cleared successfully" )329 elif c1 == "block":330 blkuser = db.GqlQuery("SELECT * FROM Registration WHERE ANCESTOR IS :1 and code=:2 ", registration_key(),c2).get()331 if blkuser is not None:332 name = blkuser.name333 mobno =blkuser.mobno334 pswd = blkuser.pwd335 pro =blkuser.pro336 db.delete(blkuser)337 usrdata = Registration(parent=registration_key()) 338 usrdata.code = '0'339 usrdata.mobno = mobno340 usrdata.name = name341 usrdata.pro = pro342 usrdata.pwd = pswd343 usrdata.put() 344 self.response.out.write("User Blocked :"+name+" "+mobno )345 elif c1=="unblock":346 ublkuser = db.GqlQuery("SELECT * FROM Registration WHERE ANCESTOR IS :1 and mobno=:2 ", registration_key(),c2).get()347 if ublkuser is not None:348 self.response.out.write("User Unblocked :"+ublkuser.name+" "+ublkuser.mobno )349 db.delete(ublkuser) 350 351 352 353 else:354 op = urllib2.build_opener()355 url='http://thesaurus.com/browse/' 356 #self.response.out.write("<html><body>")357 l=[]358 l.append(url)359 l.append(data[0])360 url=''.join(l)361 intake=op.open(url)362 page=intake.read()363 line=page.split('\n')364 new=set([])365 for i in range (len(line)) :366 mat1=re.match(r'.*Antonyms:.*',line[i],re.M)367 if mat1:368 i=i+2
...
test_url_policy_open.py
Source:test_url_policy_open.py
...61 self.follow_reference_calls = []62 def follow_reference(self, url):63 self.follow_reference_calls.append(url)64 return self._reference_values[url]65 def make_branch_opener(self, should_follow_references, references,66 unsafe_urls=None):67 policy = _BlacklistPolicy(should_follow_references, unsafe_urls)68 opener = self.StubbedBranchOpener(references, policy)69 return opener70 def test_check_initial_url(self):71 # check_and_follow_branch_reference rejects all URLs that are not72 # allowed.73 opener = self.make_branch_opener(None, [], set(['a']))74 self.assertRaises(75 BadUrl, opener.check_and_follow_branch_reference, 'a')76 def test_not_reference(self):77 # When branch references are forbidden, check_and_follow_branch_reference78 # does not raise on non-references.79 opener = self.make_branch_opener(False, ['a', None])80 self.assertEqual(81 'a', opener.check_and_follow_branch_reference('a'))82 self.assertEqual(['a'], opener.follow_reference_calls)83 def test_branch_reference_forbidden(self):84 # check_and_follow_branch_reference raises BranchReferenceForbidden if85 # branch references are forbidden and the source URL points to a86 # branch reference.87 opener = self.make_branch_opener(False, ['a', 'b'])88 self.assertRaises(89 BranchReferenceForbidden,90 opener.check_and_follow_branch_reference, 'a')91 self.assertEqual(['a'], opener.follow_reference_calls)92 def test_allowed_reference(self):93 # check_and_follow_branch_reference does not raise if following references94 # is allowed and the source URL points to a branch reference to a95 # permitted location.96 opener = self.make_branch_opener(True, ['a', 'b', None])97 self.assertEqual(98 'b', opener.check_and_follow_branch_reference('a'))99 self.assertEqual(['a', 'b'], opener.follow_reference_calls)100 def test_check_referenced_urls(self):101 # check_and_follow_branch_reference checks if the URL a reference points102 # to is safe.103 opener = self.make_branch_opener(104 True, ['a', 'b', None], 
unsafe_urls=set('b'))105 self.assertRaises(106 BadUrl, opener.check_and_follow_branch_reference, 'a')107 self.assertEqual(['a'], opener.follow_reference_calls)108 def test_self_referencing_branch(self):109 # check_and_follow_branch_reference raises BranchReferenceLoopError if110 # following references is allowed and the source url points to a111 # self-referencing branch reference.112 opener = self.make_branch_opener(True, ['a', 'a'])113 self.assertRaises(114 BranchLoopError, opener.check_and_follow_branch_reference, 'a')115 self.assertEqual(['a'], opener.follow_reference_calls)116 def test_branch_reference_loop(self):117 # check_and_follow_branch_reference raises BranchReferenceLoopError if118 # following references is allowed and the source url points to a loop119 # of branch references.120 references = ['a', 'b', 'a']121 opener = self.make_branch_opener(True, references)122 self.assertRaises(123 BranchLoopError, opener.check_and_follow_branch_reference, 'a')124 self.assertEqual(['a', 'b'], opener.follow_reference_calls)125class TrackingProber(BzrProber):126 """Subclass of BzrProber which tracks URLs it has been asked to open."""127 seen_urls = []128 @classmethod129 def probe_transport(klass, transport):130 klass.seen_urls.append(transport.base)131 return BzrProber.probe_transport(transport)132class TestBranchOpenerStacking(TestCaseWithTransport):133 def setUp(self):134 super(TestBranchOpenerStacking, self).setUp()135 BranchOpener.install_hook()136 def make_branch_opener(self, allowed_urls, probers=None):137 policy = WhitelistPolicy(True, allowed_urls, True)138 return BranchOpener(policy, probers)139 def test_probers(self):140 # Only the specified probers should be used141 b = self.make_branch('branch')142 opener = self.make_branch_opener([b.base], probers=[])143 self.assertRaises(NotBranchError, opener.open, b.base)144 opener = self.make_branch_opener([b.base], probers=[BzrProber])145 self.assertEqual(b.base, opener.open(b.base).base)146 def 
test_default_probers(self):147 # If no probers are specified to the constructor148 # of BranchOpener, then a safe set will be used,149 # rather than all probers registered in bzr.150 self.addCleanup(ControlDirFormat.unregister_prober, TrackingProber)151 ControlDirFormat.register_prober(TrackingProber)152 # Open a location without any branches, so that all probers are153 # tried.154 # First, check that the TrackingProber tracks correctly.155 TrackingProber.seen_urls = []156 opener = self.make_branch_opener(["."], probers=[TrackingProber])157 self.assertRaises(NotBranchError, opener.open, ".")158 self.assertEqual(1, len(TrackingProber.seen_urls))159 TrackingProber.seen_urls = []160 # And make sure it's registered in such a way that ControlDir.open would161 # use it.162 self.assertRaises(NotBranchError, ControlDir.open, ".")163 self.assertEqual(1, len(TrackingProber.seen_urls))164 def test_allowed_url(self):165 # the opener does not raise an exception for branches stacked on166 # branches with allowed URLs.167 stacked_on_branch = self.make_branch('base-branch', format='1.6')168 stacked_branch = self.make_branch('stacked-branch', format='1.6')169 stacked_branch.set_stacked_on_url(stacked_on_branch.base)170 opener = self.make_branch_opener(171 [stacked_branch.base, stacked_on_branch.base])172 # This doesn't raise an exception.173 opener.open(stacked_branch.base)174 def test_nstackable_repository(self):175 # treats branches with UnstackableRepositoryFormats as176 # being not stacked.177 branch = self.make_branch('unstacked', format='knit')178 opener = self.make_branch_opener([branch.base])179 # This doesn't raise an exception.180 opener.open(branch.base)181 def test_allowed_relative_url(self):182 # passes on absolute urls to check_one_url, even if the183 # value of stacked_on_location in the config is set to a relative URL.184 stacked_on_branch = self.make_branch('base-branch', format='1.6')185 stacked_branch = self.make_branch('stacked-branch', format='1.6')186 
stacked_branch.set_stacked_on_url('../base-branch')187 opener = self.make_branch_opener(188 [stacked_branch.base, stacked_on_branch.base])189 # Note that stacked_on_branch.base is not '../base-branch', it's an190 # absolute URL.191 self.assertNotEqual('../base-branch', stacked_on_branch.base)192 # This doesn't raise an exception.193 opener.open(stacked_branch.base)194 def test_allowed_relative_nested(self):195 # Relative URLs are resolved relative to the stacked branch.196 self.get_transport().mkdir('subdir')197 a = self.make_branch('subdir/a', format='1.6')198 b = self.make_branch('b', format='1.6')199 b.set_stacked_on_url('../subdir/a')200 c = self.make_branch('subdir/c', format='1.6')201 c.set_stacked_on_url('../../b')202 opener = self.make_branch_opener([c.base, b.base, a.base])203 # This doesn't raise an exception.204 opener.open(c.base)205 def test_forbidden_url(self):206 # raises a BadUrl exception if a branch is stacked on a207 # branch with a forbidden URL.208 stacked_on_branch = self.make_branch('base-branch', format='1.6')209 stacked_branch = self.make_branch('stacked-branch', format='1.6')210 stacked_branch.set_stacked_on_url(stacked_on_branch.base)211 opener = self.make_branch_opener([stacked_branch.base])212 self.assertRaises(BadUrl, opener.open, stacked_branch.base)213 def test_forbidden_url_nested(self):214 # raises a BadUrl exception if a branch is stacked on a215 # branch that is in turn stacked on a branch with a forbidden URL.216 a = self.make_branch('a', format='1.6')217 b = self.make_branch('b', format='1.6')218 b.set_stacked_on_url(a.base)219 c = self.make_branch('c', format='1.6')220 c.set_stacked_on_url(b.base)221 opener = self.make_branch_opener([c.base, b.base])222 self.assertRaises(BadUrl, opener.open, c.base)223 def test_self_stacked_branch(self):224 # raises StackingLoopError if a branch is stacked on225 # itself. 
This avoids infinite recursion errors.226 a = self.make_branch('a', format='1.6')227 # Bazaar 1.17 and up make it harder to create branches like this.228 # It's still worth testing that we don't blow up in the face of them,229 # so we grovel around a bit to create one anyway.230 a.get_config().set_user_option('stacked_on_location', a.base)231 opener = self.make_branch_opener([a.base])232 self.assertRaises(BranchLoopError, opener.open, a.base)233 def test_loop_stacked_branch(self):234 # raises StackingLoopError if a branch is stacked in such235 # a way so that it is ultimately stacked on itself. e.g. a stacked on236 # b stacked on a.237 a = self.make_branch('a', format='1.6')238 b = self.make_branch('b', format='1.6')239 a.set_stacked_on_url(b.base)240 b.set_stacked_on_url(a.base)241 opener = self.make_branch_opener([a.base, b.base])242 self.assertRaises(BranchLoopError, opener.open, a.base)243 self.assertRaises(BranchLoopError, opener.open, b.base)244 def test_custom_opener(self):245 # A custom function for opening a control dir can be specified.246 a = self.make_branch('a', format='2a')247 b = self.make_branch('b', format='2a')248 b.set_stacked_on_url(a.base)249 TrackingProber.seen_urls = []250 opener = self.make_branch_opener(251 [a.base, b.base], probers=[TrackingProber])252 opener.open(b.base)253 self.assertEqual(254 set(TrackingProber.seen_urls), set([b.base, a.base]))255 def test_custom_opener_with_branch_reference(self):256 # A custom function for opening a control dir can be specified.257 a = self.make_branch('a', format='2a')258 b_dir = self.make_bzrdir('b')259 b = BranchReferenceFormat().initialize(b_dir, target_branch=a)260 TrackingProber.seen_urls = []261 opener = self.make_branch_opener(262 [a.base, b.base], probers=[TrackingProber])263 opener.open(b.base)264 self.assertEqual(265 set(TrackingProber.seen_urls), set([b.base, a.base]))266class TestOpenOnlyScheme(TestCaseWithTransport):267 """Tests for `open_only_scheme`."""268 def setUp(self):269 
super(TestOpenOnlyScheme, self).setUp()270 BranchOpener.install_hook()271 def test_hook_does_not_interfere(self):272 # The transform_fallback_location hook does not interfere with regular273 # stacked branch access outside of open_only_scheme.274 self.make_branch('stacked')275 self.make_branch('stacked-on')...
test_Request.py
Source:test_Request.py
# -*- coding: utf-8 -*-
# (c) 2018 Matt Martz <matt@sivel.net>
# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)

from __future__ import absolute_import, division, print_function
__metaclass__ = type

import datetime
import os

from ansible.module_utils.urls import (Request, open_url, urllib_request, HAS_SSLCONTEXT, cookiejar, RequestWithMethod,
                                       UnixHTTPHandler, UnixHTTPSConnection, httplib)
from ansible.module_utils.urls import SSLValidationHandler, HTTPSClientAuthHandler, RedirectHandlerFactory

import pytest
from mock import call

if HAS_SSLCONTEXT:
    import ssl


@pytest.fixture
def urlopen_mock(mocker):
    """Patch ``urllib_request.urlopen`` in ansible.module_utils.urls and return the mock."""
    return mocker.patch('ansible.module_utils.urls.urllib_request.urlopen')


@pytest.fixture
def install_opener_mock(mocker):
    """Patch ``urllib_request.install_opener`` in ansible.module_utils.urls and return the mock."""
    return mocker.patch('ansible.module_utils.urls.urllib_request.install_opener')


def test_Request_fallback(urlopen_mock, install_opener_mock, mocker):
    """Settings given to the Request constructor are resolved through ``_fallback`` on open().

    open() is called with no per-call overrides, so each ``_fallback`` call is
    expected to receive ``None`` plus the constructor value.
    """
    cookies = cookiejar.CookieJar()
    request = Request(
        headers={'foo': 'bar'},
        use_proxy=False,
        force=True,
        timeout=100,
        validate_certs=False,
        url_username='user',
        url_password='passwd',
        http_agent='ansible-tests',
        force_basic_auth=True,
        follow_redirects='all',
        client_cert='/tmp/client.pem',
        client_key='/tmp/client.key',
        cookies=cookies,
        unix_socket='/foo/bar/baz.sock',
        ca_path='/foo/bar/baz.pem',
    )
    # Spy (not stub) so the real _fallback still runs while calls are recorded.
    fallback_mock = mocker.spy(request, '_fallback')

    r = request.open('GET', 'https://ansible.com')

    calls = [
        call(None, False),  # use_proxy
        call(None, True),  # force
        call(None, 100),  # timeout
        call(None, False),  # validate_certs
        call(None, 'user'),  # url_username
        call(None, 'passwd'),  # url_password
        call(None, 'ansible-tests'),  # http_agent
        call(None, True),  # force_basic_auth
        call(None, 'all'),  # follow_redirects
        call(None, '/tmp/client.pem'),  # client_cert
        call(None, '/tmp/client.key'),  # client_key
        call(None, cookies),  # cookies
        call(None, '/foo/bar/baz.sock'),  # unix_socket
        call(None, '/foo/bar/baz.pem'),  # ca_path
    ]
    fallback_mock.assert_has_calls(calls)

    assert fallback_mock.call_count == 14  # All but headers use fallback

    # Positional arguments of the (mocked) urlopen call: request, data, timeout.
    args = urlopen_mock.call_args[0]
    assert args[1] is None  # data, this is handled in the Request not urlopen
    assert args[2] == 100  # timeout

    req = args[0]
    assert req.headers == {
        'Authorization': b'Basic dXNlcjpwYXNzd2Q=',
        'Cache-control': 'no-cache',
        'Foo': 'bar',
        'User-agent': 'ansible-tests'
    }
    assert req.data is None
    assert req.get_method() == 'GET'


def test_Request_open(urlopen_mock, install_opener_mock):
    """A default https GET passes no data, a timeout of 10, no headers, and installs the expected handlers."""
    r = Request().open('GET', 'https://ansible.com/')
    args = urlopen_mock.call_args[0]
    assert args[1] is None  # data, this is handled in the Request not urlopen
    assert args[2] == 10  # timeout

    req = args[0]
    assert req.headers == {}
    assert req.data is None
    assert req.get_method() == 'GET'

    # The opener that Request handed to install_opener.
    opener = install_opener_mock.call_args[0][0]
    handlers = opener.handlers

    # Only the length of expected_handlers is used below.
    if not HAS_SSLCONTEXT:
        expected_handlers = (
            SSLValidationHandler,
            RedirectHandlerFactory(),  # factory, get handler
        )
    else:
        expected_handlers = (
            RedirectHandlerFactory(),  # factory, get handler
        )

    found_handlers = []
    for handler in handlers:
        # The redirect handler is matched by class name because its class comes from a factory call.
        if isinstance(handler, SSLValidationHandler) or handler.__class__.__name__ == 'RedirectHandler':
            found_handlers.append(handler)

    assert len(found_handlers) == len(expected_handlers)


def test_Request_open_http(urlopen_mock, install_opener_mock):
    """A plain http request installs no SSLValidationHandler."""
    r = Request().open('GET', 'http://ansible.com/')
    args = urlopen_mock.call_args[0]

    opener = install_opener_mock.call_args[0][0]
    handlers = opener.handlers

    found_handlers = []
    for handler in handlers:
        if isinstance(handler, SSLValidationHandler):
            found_handlers.append(handler)

    assert len(found_handlers) == 0


def test_Request_open_unix_socket(urlopen_mock, install_opener_mock):
    """An http request with ``unix_socket`` installs exactly one UnixHTTPHandler."""
    r = Request().open('GET', 'http://ansible.com/', unix_socket='/foo/bar/baz.sock')
    args = urlopen_mock.call_args[0]

    opener = install_opener_mock.call_args[0][0]
    handlers = opener.handlers

    found_handlers = []
    for handler in handlers:
        if isinstance(handler, UnixHTTPHandler):
            found_handlers.append(handler)

    assert len(found_handlers) == 1


def test_Request_open_https_unix_socket(urlopen_mock, install_opener_mock):
    """An https request with ``unix_socket`` installs one HTTPSClientAuthHandler that builds a UnixHTTPSConnection."""
    r = Request().open('GET', 'https://ansible.com/', unix_socket='/foo/bar/baz.sock')
    args = urlopen_mock.call_args[0]

    opener = install_opener_mock.call_args[0][0]
    handlers = opener.handlers

    found_handlers = []
    for handler in handlers:
        if isinstance(handler, HTTPSClientAuthHandler):
            found_handlers.append(handler)

    assert len(found_handlers) == 1

    inst = found_handlers[0]._build_https_connection('foo')
    assert isinstance(inst, UnixHTTPSConnection)


def test_Request_open_ftp(urlopen_mock, install_opener_mock, mocker):
    """Opening an ftp URL must not call ``ParseResultDottedDict.as_list`` (patched here to raise)."""
    mocker.patch('ansible.module_utils.urls.ParseResultDottedDict.as_list', side_effect=AssertionError)

    # Using ftp scheme should prevent the AssertionError side effect to fire
    r = Request().open('GET', 'ftp://foo@ansible.com/')


def test_Request_open_headers(urlopen_mock, install_opener_mock):
    """Headers passed to open() end up on the request object given to urlopen."""
    r = Request().open('GET', 'http://ansible.com/', headers={'Foo': 'bar'})
    args = urlopen_mock.call_args[0]
    req = args[0]
    assert req.headers == {'Foo': 'bar'}


def test_Request_open_username(urlopen_mock, install_opener_mock):
    """``url_username`` without a password installs basic and digest auth handlers with a None password."""
    r = Request().open('GET', 'http://ansible.com/', url_username='user')

    opener = install_opener_mock.call_args[0][0]
    handlers = opener.handlers

    expected_handlers = (
        urllib_request.HTTPBasicAuthHandler,
        urllib_request.HTTPDigestAuthHandler,
    )

    found_handlers = []
    for handler in handlers:
        if isinstance(handler, expected_handlers):
            found_handlers.append(handler)
    assert len(found_handlers) == 2
    assert found_handlers[0].passwd.passwd[None] == {(('ansible.com', '/'),): ('user', None)}


def test_Request_open_username_in_url(urlopen_mock, install_opener_mock):
    """A username embedded in the URL is registered with the auth handler, with an empty-string password."""
    r = Request().open('GET', 'http://user2@ansible.com/')

    opener = install_opener_mock.call_args[0][0]
    handlers = opener.handlers

    expected_handlers = (
        urllib_request.HTTPBasicAuthHandler,
        urllib_request.HTTPDigestAuthHandler,
    )

    found_handlers = []
    for handler in handlers:
        if isinstance(handler, expected_handlers):
            found_handlers.append(handler)
    # NOTE(review): unlike the sibling tests, the number of handlers found is
    # not asserted here; only the first one is inspected.
    assert found_handlers[0].passwd.passwd[None] == {(('ansible.com', '/'),): ('user2', '')}


def test_Request_open_username_force_basic(urlopen_mock, install_opener_mock):
    """With ``force_basic_auth`` no auth handlers are installed; an Authorization header is set instead."""
    r = Request().open('GET', 'http://ansible.com/', url_username='user', url_password='passwd', force_basic_auth=True)

    opener = install_opener_mock.call_args[0][0]
    handlers = opener.handlers

    expected_handlers = (
        urllib_request.HTTPBasicAuthHandler,
        urllib_request.HTTPDigestAuthHandler,
    )

    found_handlers = []
    for handler in handlers:
        if isinstance(handler, expected_handlers):
            found_handlers.append(handler)

    assert len(found_handlers) == 0

    args = urlopen_mock.call_args[0]
    req = args[0]
    assert req.headers.get('Authorization') == b'Basic dXNlcjpwYXNzd2Q='


def test_Request_open_auth_in_netloc(urlopen_mock, install_opener_mock):
    """``user:passwd@`` is stripped from the requested URL and both auth handlers are installed."""
    r = Request().open('GET', 'http://user:passwd@ansible.com/')
    args = urlopen_mock.call_args[0]
    req = args[0]
    assert req.get_full_url() == 'http://ansible.com/'

    opener = install_opener_mock.call_args[0][0]
    handlers = opener.handlers

    expected_handlers = (
        urllib_request.HTTPBasicAuthHandler,
        urllib_request.HTTPDigestAuthHandler,
    )

    found_handlers = []
    for handler in handlers:
        if isinstance(handler, expected_handlers):
            found_handlers.append(handler)

    assert len(found_handlers) == 2


def test_Request_open_netrc(urlopen_mock, install_opener_mock, monkeypatch):
    """The file named by the NETRC environment variable controls whether an Authorization header is sent."""
    here = os.path.dirname(__file__)

    # First case: the fixture netrc, requesting ansible.com -> header expected.
    monkeypatch.setenv('NETRC', os.path.join(here, 'fixtures/netrc'))
    r = Request().open('GET', 'http://ansible.com/')
    args = urlopen_mock.call_args[0]
    req = args[0]
    assert req.headers.get('Authorization') == b'Basic dXNlcjpwYXNzd2Q='

    # Second case: same netrc, different host -> no header expected.
    r = Request().open('GET', 'http://foo.ansible.com/')
    args = urlopen_mock.call_args[0]
    req = args[0]
    assert 'Authorization' not in req.headers

    # Third case: NETRC points at a path named "netrc.nonexistant" -> no header expected.
    monkeypatch.setenv('NETRC', os.path.join(here, 'fixtures/netrc.nonexistant'))
    r = Request().open('GET', 'http://ansible.com/')
    args = urlopen_mock.call_args[0]
    req = args[0]
    assert 'Authorization' not in req.headers


def test_Request_open_no_proxy(urlopen_mock, install_opener_mock, mocker):
    """With ``use_proxy=False`` exactly one ProxyHandler is passed to build_opener."""
    build_opener_mock = mocker.patch('ansible.module_utils.urls.urllib_request.build_opener')

    r = Request().open('GET', 'http://ansible.com/', use_proxy=False)

    # Here the handlers are the positional arguments given to build_opener.
    handlers = build_opener_mock.call_args[0]
    found_handlers = []
    for handler in handlers:
        if isinstance(handler, urllib_request.ProxyHandler):
            found_handlers.append(handler)

    assert len(found_handlers) == 1


@pytest.mark.skipif(not HAS_SSLCONTEXT, reason="requires SSLContext")
def test_Request_open_no_validate_certs(urlopen_mock, install_opener_mock):
    """With ``validate_certs=False`` the HTTPS handler's SSL context disables verification and hostname checks."""
    r = Request().open('GET', 'https://ansible.com/', validate_certs=False)

    opener = install_opener_mock.call_args[0][0]
    handlers = opener.handlers

    ssl_handler = None
    for handler in handlers:
        if isinstance(handler, HTTPSClientAuthHandler):
            ssl_handler = handler
            break

    assert ssl_handler is not None

    inst = ssl_handler._build_https_connection('foo')
    assert isinstance(inst, httplib.HTTPSConnection)

    context = ssl_handler._context
    assert context.protocol == ssl.PROTOCOL_SSLv23
    # The SSLv2 check is skipped when ssl.OP_NO_SSLv2 is falsy.
    if ssl.OP_NO_SSLv2:
        assert context.options & ssl.OP_NO_SSLv2
    assert context.options & ssl.OP_NO_SSLv3
    assert context.verify_mode == ssl.CERT_NONE
    assert context.check_hostname is False


def test_Request_open_client_cert(urlopen_mock, install_opener_mock):
    # NOTE(review): the listing is cut off in the middle of the assignment
    # below; the remainder of this test is not visible here.
    here = 
os.path.dirname(__file__)244 client_cert = os.path.join(here, 'fixtures/client.pem')245 client_key = os.path.join(here, 'fixtures/client.key')246 r = Request().open('GET', 'https://ansible.com/', client_cert=client_cert, client_key=client_key)247 opener = install_opener_mock.call_args[0][0]248 handlers = opener.handlers249 ssl_handler = None250 for handler in handlers:251 if isinstance(handler, HTTPSClientAuthHandler):252 ssl_handler = handler253 break254 assert ssl_handler is not None255 assert ssl_handler.client_cert == client_cert256 assert ssl_handler.client_key == client_key257 https_connection = ssl_handler._build_https_connection('ansible.com')258 assert https_connection.key_file == client_key259 assert https_connection.cert_file == client_cert260def test_Request_open_cookies(urlopen_mock, install_opener_mock):261 r = Request().open('GET', 'https://ansible.com/', cookies=cookiejar.CookieJar())262 opener = install_opener_mock.call_args[0][0]263 handlers = opener.handlers264 cookies_handler = None265 for handler in handlers:266 if isinstance(handler, urllib_request.HTTPCookieProcessor):267 cookies_handler = handler268 break269 assert cookies_handler is not None270def test_Request_open_invalid_method(urlopen_mock, install_opener_mock):271 r = Request().open('UNKNOWN', 'https://ansible.com/')272 args = urlopen_mock.call_args[0]273 req = args[0]274 assert req.data is None275 assert req.get_method() == 'UNKNOWN'276 # assert r.status == 504277def test_Request_open_custom_method(urlopen_mock, install_opener_mock):278 r = Request().open('DELETE', 'https://ansible.com/')279 args = urlopen_mock.call_args[0]280 req = args[0]281 assert isinstance(req, RequestWithMethod)282def test_Request_open_user_agent(urlopen_mock, install_opener_mock):283 r = Request().open('GET', 'https://ansible.com/', http_agent='ansible-tests')284 args = urlopen_mock.call_args[0]285 req = args[0]286 assert req.headers.get('User-agent') == 'ansible-tests'287def 
test_Request_open_force(urlopen_mock, install_opener_mock):288 r = Request().open('GET', 'https://ansible.com/', force=True, last_mod_time=datetime.datetime.now())289 args = urlopen_mock.call_args[0]290 req = args[0]291 assert req.headers.get('Cache-control') == 'no-cache'292 assert 'If-modified-since' not in req.headers293def test_Request_open_last_mod(urlopen_mock, install_opener_mock):294 now = datetime.datetime.now()295 r = Request().open('GET', 'https://ansible.com/', last_mod_time=now)296 args = urlopen_mock.call_args[0]297 req = args[0]298 assert req.headers.get('If-modified-since') == now.strftime('%a, %d %b %Y %H:%M:%S -0000')299def test_Request_open_headers_not_dict(urlopen_mock, install_opener_mock):300 with pytest.raises(ValueError):301 Request().open('GET', 'https://ansible.com/', headers=['bob'])302def test_Request_init_headers_not_dict(urlopen_mock, install_opener_mock):303 with pytest.raises(ValueError):304 Request(headers=['bob'])305@pytest.mark.parametrize('method,kwargs', [306 ('get', {}),307 ('options', {}),308 ('head', {}),309 ('post', {'data': None}),310 ('put', {'data': None}),311 ('patch', {'data': None}),312 ('delete', {}),313])314def test_methods(method, kwargs, mocker):315 expected = method.upper()316 open_mock = mocker.patch('ansible.module_utils.urls.Request.open')317 request = Request()318 getattr(request, method)('https://ansible.com')319 open_mock.assert_called_once_with(expected, 'https://ansible.com', **kwargs)320def test_open_url(urlopen_mock, install_opener_mock, mocker):321 req_mock = mocker.patch('ansible.module_utils.urls.Request.open')322 open_url('https://ansible.com/')323 req_mock.assert_called_once_with('GET', 'https://ansible.com/', data=None, headers=None, use_proxy=True,324 force=False, last_mod_time=None, timeout=10, validate_certs=True,325 url_username=None, url_password=None, http_agent=None,326 force_basic_auth=False, follow_redirects='urllib2',327 client_cert=None, client_key=None, cookies=None, use_gssapi=False,...
Learn to execute automation testing from scratch with the LambdaTest Learning Hub, from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. The LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, and TestNG.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 minutes of automation testing FREE!!