How to use system_encode method in Robotframework

Best Python code snippet using robotframework

request.py

Source: request.py Github

copy

Full Screen

...130 if self.args.debug:131 self.lock.acquire()132 print '*' * self.console_width133 print '[Response headers and response text]\n'134 print res_headers + '\n' + system_encode(html_doc)135 print '\n' + '*' * self.console_width136 self.lock.release()137 if self.args.rtxt and html_doc.find(self.args.rtxt) >= 0:138 raise Exception('Retry for <%s>' % system_encode(self.args.rtxt) )139 if self.args.rntxt and html_doc.find(self.args.rntxt) < 0:140 raise Exception('Retry for no <%s>' % system_encode(self.args.rntxt))141 if self.args.rheader and res_headers.find(self.args.rheader) >= 0:142 raise Exception('Retry for header <%s>' % self.args.rheader)143 if self.args.rnheader and res_headers.find(self.args.rnheader) < 0:144 raise Exception('Retry for no header <%s>' % self.args.rnheader)145 found_err_tag = False146 for tag in self.args.err:147 if html_doc.find(tag) >= 0:148 found_err_tag = True149 found_suc_tag = False150 suc_tag_matched = ''151 for tag in self.args.suc:152 if html_doc.find(tag) >= 0:153 suc_tag_matched += tag + ' '154 found_suc_tag = True155 suc_tag_matched = suc_tag_matched.strip()156 data = urllib.unquote(data)157 cracked_msg = ''158 if (not self.args.no302 and response.status == 302):159 cracked_msg = '[+]%s \t\t{302 redirect}' % data160 if response.status == 200 and self.args.err and not found_err_tag:161 cracked_msg = '[+]%s \t\t{%s not found}' % (data, self.args.err)162 if self.args.suc and found_suc_tag:163 cracked_msg = '[+]%s \t\t[Found %s]' % (data, suc_tag_matched)164 if self.args.herr and res_headers.find(self.args.herr) < 0:165 cracked_msg = '[+]%s \t\t[%s not found in headers]' % (data, self.args.herr)166 if self.args.hsuc and res_headers.find(self.args.hsuc) >=0:167 cracked_msg = '[+]%s \t\t[Found %s in headers]' % (data, self.args.hsuc)168 if self.args.basic and response.status < 400:169 local_headers['Authorization'] = ''170 cracked_msg = '[+][Basic Auth] %s %s' % (params, self.args.u)171 if cracked_msg:172 add_cracked_count(self)173 if self.args.checkproxy:174 self.print_s('[+OK] %s' % cur_proxy, color_red=True)175 with open('001.proxy.servers.txt', 'a') as outFile:176 outFile.write(cur_proxy + '\n')177 else:178 self.print_s(system_encode('[+OK]%s' % data_print), color_red=True)179 with open(self.args.o, 'a') as outFile:180 outFile.write(cracked_msg + '\n')181 if err_count == max_err_count: self.queue.put(origin_params) # put in queue again182 if self.args.sleep: time.sleep( float(self.args.sleep) )183 break184 except Exception, e:185 err_count += 1186 if not self.args.checkproxy: self.print_s('[Exception in do_request] %s' % e)187 try:188 conn.close()189 except:190 pass191 time.sleep(3.0)192 self.queue.task_done()

Full Screen

Full Screen

weiboPicDownloader.py

Source: weiboPicDownloader.py Github

copy

Full Screen

1# -*- coding: utf-8 -*-2"""3Created on Fri Sep 15 01:38:07 20174@author: nondanee5"""6import os, sys, locale7import json, re, time8import concurrent.futures9import requests10try:11 requests.packages.urllib3.disable_warnings(requests.packages.urllib3.exceptions.InsecureRequestWarning)12except:13 pass14try:15 reload(sys)16 sys.setdefaultencoding('utf8') 17except:18 pass19try:20 input = raw_input21except NameError:22 pass23IS_PYTHON2 = sys.version[0] == "2"24SYSTEM_ENCODE = sys.stdin.encoding or locale.getpreferredencoding(True)25USER_AGENT = "Mozilla/​5.0 (Windows NT 6.1; Win64; x64) AppleWebKit/​537.36 (KHTML, like Gecko) Chrome/​59.0.3071.115 Safari/​537.36"26PIC_AMOUNT = 027SAVE_PATH = ""28def print_fit(string,flush=False):29 if IS_PYTHON2:30 string = string.encode(SYSTEM_ENCODE)31 if flush == True:32 sys.stdout.write("\r"+string)33 sys.stdout.flush()34 else:35 sys.stdout.write(string+"\n")36def raw_input_fit(string=""):37 if IS_PYTHON2:38 return input(string.encode(SYSTEM_ENCODE)).decode(SYSTEM_ENCODE)39 else:40 return input(string)41def requests_retry(url,max_retry=0):42 retry = 043 while True:44 if retry > max_retry: 45 return46 try:47 response = requests.request("GET",url,headers={"User-Agent":USER_AGENT},timeout=5,verify=False)48 return response49 except:50 retry = retry + 151def get_img_urls(containerid):52 page = 153 amount = 054 total = 055 urls = []56 while True:57 url = "https:/​/​m.weibo.cn/​api/​container/​getIndex?count={}&page={}&containerid={}".format(25,page,containerid)58 response = requests_retry(url=url,max_retry=3)59 if response == None: continue60 json_data = json.loads(response.text)61 if json_data["ok"] != 1: break62 total = json_data["data"]["cardlistInfo"]["total"]63 cards = json_data["data"]["cards"]64 for card in cards:65 amount = amount + 166 print_fit("分析微博中... {}".format(amount),flush=True)67 if "mblog" in card:68 if "pics" in card["mblog"]:69 for pic in card["mblog"]["pics"]:70 if "large" in pic:71 urls.append(pic["large"]["url"])72 page = page + 173 time.sleep(1)74 75 print_fit("\n分析完毕, 微博总数 {}, 实际获得 {}".format(total,amount))76 return urls77def uid_to_containerid(uid):78 if re.search(r'^\d{10}$',uid):79 return "107603" + uid 80def nickname_to_containerid(nickname):81 url = "https:/​/​m.weibo.com/​n/​{}".format(nickname)82 response = requests_retry(url=url) 83 uid_check = re.search(r'(\d{16})',response.url)84 if uid_check:85 return "107603" + uid_check.group(1)[-10:]86def get_containerid(account_type):87 input_tips = ["请输入用户ID: ","请输入用户昵称: "]88 error_info = ["用户ID无效\n","无法找到该用户\n"]89 functions = [uid_to_containerid,nickname_to_containerid]90 input_string = raw_input_fit(input_tips[account_type]).strip()91 if input_string == "":92 return93 containerid = functions[account_type](input_string)94 if containerid == None:95 print_fit(error_info[account_type])96 return97 else:98 return containerid 99def download_and_save(url,index):100 file_type = url[-3:]101 file_name = str(index + 1).zfill(len(str(PIC_AMOUNT))) + "." + file_type 102 file_path = os.path.join(SAVE_PATH,file_name) 103 response = requests_retry(url=url)104 if response == None:105 return index,0106 else:107 f = open(file_path,"wb")108 f.write(response.content)109 f.close()110 return index,1111 112def get_task(urls,miss):113 task = []114 for index in miss:115 task.append(urls[index])116 return task117def main():118 while True:119 home_path = os.path.realpath(raw_input_fit("请输入图片存储位置: "))120 print_fit("选择的目录为 {}".format(home_path))121 if os.path.exists(home_path) == True:122 break123 confirm = raw_input_fit("该目录不存在, 是否创建?(Y/​n): ").strip()124 if confirm == "y" or confirm == "Y" or confirm == "":125 try:126 os.makedirs(home_path)127 except:128 print_fit("目录创建失败, 请尝试手动创建, 或者更改目录\n")129 else:130 print_fit("目录已创建")131 break132 else:133 print_fit("请手动创建, 或者更改目录\n")134 135 while True:136 print_fit("请输入要下载的账号类型:\n[1]用户ID [2]用户昵称")137 choice = raw_input_fit("(1/​2): ")138 try:139 choice = int(choice)140 except:141 choice = 2142 if choice not in [1,2]:143 choice = 2144 containerid = get_containerid(choice - 1)145 if containerid != None:146 break 147 global SAVE_PATH148 SAVE_PATH = os.path.join(home_path,containerid[6:]) 149 if os.path.exists(SAVE_PATH) == False:150 os.mkdir(SAVE_PATH)151 urls = get_img_urls(containerid)152 153 global PIC_AMOUNT154 PIC_AMOUNT = len(urls)155 print_fit("图片数量 {}".format(PIC_AMOUNT))156 157 max_workers = raw_input_fit("设置下载线程数(1-20): ")158 try:159 max_workers = int(max_workers)160 except:161 max_workers = 20162 if max_workers > 20:163 max_workers = 20164 elif max_workers < 1:165 max_workers = 1166 miss = range(0,PIC_AMOUNT) 167 while True:168 task = get_task(urls,miss)169 executor = concurrent.futures.ThreadPoolExecutor(max_workers = max_workers)170 results = executor.map(download_and_save,task,miss)171 172 miss = []173 for result in results:174 index = result[0]175 status = result[1]176 if status == 0: miss.append(index)177 print_fit("已处理 {dealt}/​{amount}, 下载失败 {failed}/​{amount}".format(dealt=index+1,failed=len(miss),amount=PIC_AMOUNT),flush=True)178 179 if len(miss) != 0:180 confirm = raw_input_fit("是否继续尝试?(Y/​n): ").strip()181 if confirm == "y" or confirm == "Y" or confirm == "":182 continue183 else:184 break185 else:186 print_fit("全部完成")187 break188 189 for index in miss:190 print_fit("图片 {} 下载失败 {}".format(index + 1,urls[index]))191 192 print_fit("下载结束, 路径是 {}".format(SAVE_PATH))193 sys.stdin.read()194 exit()195 196if __name__ == "__main__":...

Full Screen

Full Screen

weibo.py

Source: weibo.py Github

copy

Full Screen

1# -*- coding: utf-8 -*-2import os3import sys4import locale5import json6import re7import time8import requests9import threading10import datetime11from localdb import DB12try:13 requests.packages.urllib3.disable_warnings(14 requests.packages.urllib3.exceptions.InsecureRequestWarning)15except:16 pass17try:18 reload(sys)19 sys.setdefaultencoding('utf8')20except:21 pass22try:23 input = raw_input24except NameError:25 pass26IS_PYTHON2 = sys.version[0] == "2"27SYSTEM_ENCODE = sys.stdin.encoding or locale.getpreferredencoding(True)28USER_AGENT = "Mozilla/​5.0 (Windows NT 6.1; Win64; x64) AppleWebKit/​537.36 (KHTML, like Gecko) Chrome/​59.0.3071.115 Safari/​537.36"29PIC_AMOUNT = 030total = 031containerid=032SAVE_PATH = ""33pictures = {}34post_db=DB('Post')35picture_db=DB('Picture')36id_db=DB('ID')37def print_fit(string, flush=False):38 if IS_PYTHON2:39 string = string.encode(SYSTEM_ENCODE)40 if flush == True:41 sys.stdout.write("\r" + string)42 sys.stdout.flush()43 else:44 sys.stdout.write(string + "\n")45def requests_retry(url, max_retry=0):46 retry = 047 while True:48 if retry > max_retry:49 return50 try:51 response = requests.request(52 "GET", url, headers={"User-Agent": USER_AGENT}, timeout=5, verify=False)53 return response54 except:55 retry = retry + 156def uid_to_containerid(uid):57 if re.search(r'^\d{10}$', uid):58 return "107603" + uid59def nickname_to_containerid(nickname):60 url = "https:/​/​m.weibo.com/​n/​{}".format(nickname)61 response = requests_retry(url=url)62 uid_check = re.search(r'(\d{16})', response.url)63 if uid_check:64 return "107603" + uid_check.group(1)[-10:]65def get_pic_and_video(card):66 global pictures67 if 'mblog' in card.keys():68 blog = card['mblog']69 try:70 posttime = datetime.datetime.strptime(71 blog['created_at'], '%Y-%m-%d').strftime('%Y%m%d')72 except:73 try:74 year = str(datetime.datetime.now().year)75 posttime = datetime.datetime.strptime(76 year + '-' + blog['created_at'], '%Y-%m-%d').strftime('%Y%m%d')77 except:78 posttime=datetime.datetime.now().strftime('%Y%m%d')79 description = ''.join(re.findall(u'[\u4e00-\u9fa5]',blog['text']))80 pid=blog['id']81 if int(posttime)>=20171001:82 if 'retweeted_status' in blog.keys():83 if "pics" in blog["retweeted_status"]:84 pictures.setdefault(posttime,[])85 for pic in blog["retweeted_status"]["pics"]:86 if "large" in pic:87 picture = pic["large"]["url"]88 pictures.setdefault(posttime,[]).append(picture)89 # elif "page_info" in blog["retweeted_status"]:90 # if "media_info" in blog["retweeted_status"]["page_info"]:91 # video = blog["retweeted_status"]["page_info"]["media_info"]["stream_url"]92 # poster = blog["retweeted_status"]["page_info"]["page_pic"]["url"]93 # videos.append((pid,posttime, description, poster, video))94 else:95 if "pics" in blog:96 pictures.setdefault(posttime,[])97 for pic in blog["pics"]:98 if "large" in pic:99 picture = pic["large"]["url"]100 pictures.setdefault(posttime,[]).append(picture)101 # elif "page_info" in blog:102 # if "media_info" in blog["page_info"]:103 # video = blog["page_info"]["media_info"]["stream_url"]104 # poster = blog["page_info"]["page_pic"]["url"]105 # videos.append((pid,posttime, description, poster, video))106def parse_url(url):107 response = requests_retry(url=url, max_retry=3)108 json_data = response.json()109 try:110 cards = json_data["data"]["cards"]111 for card in cards:112 get_pic_and_video(card)113 except Exception as e:114 print(e)115 print('empty')116def get_img_urls(containerid):117 global pictures118 global total119 id = id_db.select(table='ID',id=containerid)[0]120 page = 1121 amount = 0122 url = "https:/​/​m.weibo.cn/​api/​container/​getIndex?count={}&page={}&containerid={}".format(123 25, 1, containerid)124 response = requests_retry(url=url, max_retry=3)125 json_data = response.json()126 total = json_data["data"]["cardlistInfo"]["total"]127 if id is None:128 pages = int(round(total /​ 25.0, 0))129 elif id.postnum is None:130 pages = int(round(total /​ 25.0, 0))131 else:132 pages = int(round((total - id.postnum) /​ 25.0, 0))133 urls = ["https:/​/​m.weibo.cn/​api/​container/​getIndex?count={}&page={}&containerid={}".format(134 25, i, containerid) for i in range(1, pages + 1)]135 threads = []136 for url in urls:137 t = threading.Thread(target=parse_url, args=(url,))138 threads.append(t)139 for t in threads:140 t.start()141 for t in threads:142 t.join()143 print_fit("\n分析完毕,图片数 {}".format( len(pictures.items())))144def write(name,containerid):145 global pictures146 for k,picture in pictures.items():147 id=str(containerid)+str(k)148 title=u'微博采集-'+name+'-'+str(k)149 cate=u'微博'150 tags=name151 poster=picture[0]152 post_db.insertnew(id=id,name=title,poster=poster,category=cate,tags=tags,status=False)153 for idx,url in enumerate(picture):154 sys.stdout.write('weibo {} day {} insert {} picture\r'.format(name,k,idx))155 sys.stdout.flush()156 data=picture_db.insertnew(pid=id,subid=idx,url=url)157def main(name):158 global containerid159 global total160 containerid = nickname_to_containerid(name)161 #containerid = name162 get_img_urls(containerid)163 if id_db.select(table='ID',id=containerid)[0]==None:164 id_db.insertnew(id=containerid,postnum=total)165 else:166 id_db.update(id=containerid,postnum=total)167 write(name,containerid)168if __name__ == "__main__":169 #name = sys.argv[1]170 #name = unicode(name.strip())171 #main(name)172 namelist=[]173 with open('wblist.txt','r') as f:174 for name in f.readlines():175 namelist.append(unicode(name.strip()))176 for name in namelist:177 print 'start crawler weibo user:{}'.format(name)...

Full Screen

Full Screen

encoding.py

Source: encoding.py Github

copy

Full Screen

...64# These interpreters handle communication with system APIs using Unicode.65if PY3 or JYTHON or IRONPYTHON:66 def system_decode(string):67 return string if is_unicode(string) else unic(string)68 def system_encode(string, errors='replace'):69 return string if is_unicode(string) else unic(string)70else:71 def system_decode(string):72 """Decodes bytes from system (e.g. cli args or env vars) to Unicode."""73 try:74 return string.decode(SYSTEM_ENCODING)75 except UnicodeError:76 return unic(string)77 def system_encode(string, errors='replace'):78 """Encodes Unicode to system encoding (e.g. cli args and env vars).79 Non-Unicode values are first converted to Unicode.80 """81 if not is_unicode(string):82 string = unic(string)...

Full Screen

Full Screen

Blogs

Check out the latest blogs from LambdaTest on this topic:

Testing in Production: A Detailed Guide

When most firms employed a waterfall development model, it was widely joked about in the industry that Google kept its products in beta forever. Google has been a pioneer in making the case for in-production testing. Traditionally, before a build could go live, a tester was responsible for testing all scenarios, both defined and extempore, in a testing environment. However, this concept is evolving on multiple fronts today. For example, the tester is no longer testing alone. Developers, designers, build engineers, other stakeholders, and end users, both inside and outside the product team, are testing the product and providing feedback.

Why Agile Is Great for Your Business

Agile project management is a great alternative to traditional methods, to address the customer’s needs and the delivery of business value from the beginning of the project. This blog describes the main benefits of Agile for both the customer and the business.

How To Test React Native Apps On iOS And Android

As everyone knows, the mobile industry has taken over the world and is the fastest emerging industry in terms of technology and business. It is possible to do all the tasks using a mobile phone, for which earlier we had to use a computer. According to Statista, in 2021, smartphone vendors sold around 1.43 billion smartphones worldwide. The smartphone penetration rate has been continuously rising, reaching 78.05 percent in 2020. By 2025, it is expected that almost 87 percent of all mobile users in the United States will own a smartphone.

10 Best Software Testing Certifications To Take In 2021

Software testing is fueling the IT sector forward by scaling up the test process and continuous product delivery. Currently, this profession is in huge demand, as it needs certified testers with expertise in automation testing. When it comes to outsourcing software testing jobs, whether it’s an IT company or an individual customer, they all look for accredited professionals. That’s why having an software testing certification has become the need of the hour for the folks interested in the test automation field. A well-known certificate issued by an authorized institute kind vouches that the certificate holder is skilled in a specific technology.

The Art of Testing the Untestable

It’s strange to hear someone declare, “This can’t be tested.” In reply, I contend that everything can be tested. However, one must be pleased with the outcome of testing, which might include failure, financial loss, or personal injury. Could anything be tested when a claim is made with this understanding?

Automation Testing Tutorials

Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.

LambdaTest Learning Hubs:

YouTube

You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.

Run Robotframework automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 minutes of automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

NotHelpful