How to use it method in Mamba

Best Python code snippet using mamba

test.py

Source:test.py Github

copy

Full Screen

...16 def __init__(self):17 with open('start_urls.txt','r',encoding='utf-8') as f:18 sus = f.readlines()19 for s in sus:20 start_url = s.split()[0]21 self.start_urls.append(start_url)22 self.schools.append(s.split()[1])23 # 载入已经爬过的学院ID24 with open("traveled.txt", "r", encoding='utf-8') as f:25 tmp = f.readlines()26 for t in tmp:27 self.traveled.append(t.rstrip('\n').split(' '))28 # 得到每个学校里院系所导航页面的页数29 def parse(self, response):30 item = person()31 url = response.url32 id_index = self.start_urls.index(url)33 school_name = self.schools[id_index]34 # print(school_name+str(id_index)+' :'+id)35 item['university'] = school_name36 #print(url + item['university'])37 sel =Selector(response)38 col_pages = sel.xpath('/html/body/div/div/span/text()').extract_first()39 col_pages = col_pages.split('/')[-1].strip()40 if int(col_pages):41 for x in range(int(col_pages)):42 # x = 143 col_page = str(x+1)44 col_page_url = (url+'&pageIndex='+col_page)45 yield Request(col_page_url, callback=self.parse_getcol, meta={'item_l': item})46 # 进入每个学院47 def parse_getcol(self, response):48 item_l = response.meta['item_l']49 # print(response.url)50 # items = []51 sel = Selector(response)52 cols = sel.xpath('/html/body/div/ul/li/a')53 for col in cols[1:]:54 url = col.xpath('./@href').extract_first()55 # print(url)56 col_name_code = re.search('organname=(.*?)&cpage',url).group(1)57 col_name = unquote(col_name_code)58 # print(col_name)59 # # 检测是否爬过该学院60 sid = url.split("/author.aspx?idlevel=")[0].lstrip('/')61 traveled_flag = False62 for t in self.traveled:63 if sid == t[0] and col_name == t[1]:64 traveled_flag = True65 break66 if not traveled_flag:67 item = person()68 item['university'] = item_l['university']69 item['college'] = col_name70 # print(item['college'])71 item['col_url'] = 'http://www.irtree.cn' + url72 # print(item)73 yield Request(item['col_url'], callback=self.parse_college, meta={'item_l': item}, dont_filter=True)74 # 获取每个专家的url75 def parse_college(self, response):76 item_l = response.meta['item_l']77 page_url = response.url78 # if page_url == 'http://www.irtree.cn/tootip.html':79 # university = item_l['university']80 # index = school_names.index(university)81 # id = school_ids[index]82 # for_url = 'http://www.irtree.cn/Template/t5/UserControls/CollegeNavigator.ascx?id=' + id83 # flag = False84 # with open('forbidden.txt', 'r', encoding='utf-8') as f:85 # tmp = f.readlines()86 # for t in tmp:87 # if university in t:88 # flag = True89 # if not flag:90 # with open('forbidden.txt', 'a', encoding='utf-8') as file:91 # tmp = '没有权限:' + university + '\n'+for_url + '\n'92 # file.write(tmp)93 # else:94 print(page_url)95 items = []96 sel = Selector(response)97 urls = sel.xpath('//*[@id="author"]/div[1]/dl/dt/a[1]/@href').extract()98 if not urls:99 urls = sel.xpath('//*[@id="author"]/div/div[1]/dl/dt/a[1]/@href').extract()100 for url in urls:101 #print(url)102 item = person()103 item['university'] = item_l['university']104 item['college'] = item_l['college']105 item['expert_url'] = 'http://www.irtree.cn'+url106 item['expert_id'] = re.search('writer/(.*?)/rw_zp.aspx',url).group(1)107 # print(item)108 items.append(item)109 for item in items:110 # print(item['university']+' '+item['college']+' '+item['expert_url'])111 yield Request(url=item['expert_url'], callback=self.parse_content, meta={'item_l': item})112 #翻页113 next_page = sel.xpath('//*[@id="author"]/div[2]/div[2]/span[2]/a[3]/@href').extract_first()114 if next_page:115 next_page = re.search(r"g_GetGotoPage\('(.*?)'\)", next_page).group(1)116 next_url = page_url.split('&q=%7B%22page')[0]+'&q=%7B"page"%3A"'+next_page+'"%7D'117 yield Request(next_url, callback=self.parse_college, meta={'item_l':item_l})118 else:119 # 该学院已爬完,添加至traveled.txt中120 with open("traveled.txt", "a", encoding='utf-8') as f:121 sid = page_url.split("/author.aspx?idlevel=")[0].lstrip('http://www.irtree.cn/')122 cid = item_l['college']123 tmp = sid + " " + cid + "\n"124 f.write(tmp)125 # 分析每个专家主页(发文量需要大于等于3)126 def parse_content(self, response):127 item_l = response.meta['item_l']128 item = person()129 item = item_l130 sel = Selector(response)131 paper_count = sel.xpath('/html/body/div[2]/div/div[2]/div[2]/div[2]/div[1]/div[3]/p/i/text()').extract_first()132 paper_count = int(paper_count.strip())133 if paper_count < 3:134 name = sel.xpath('/html/body/div[2]/div/div[1]/h1/text()').extract_first()135 item['expert_name'] = name.strip()136 item['amount1'] = str(paper_count)137 ## 研究主题138 themes = sel.xpath('//*[@class="summary"]/p[4]/text()').extract_first()139 if themes:140 tmp = themes.strip().lstrip(" 研究主题:").split()141 theme_list ='、'.join(tmp)142 #print(theme_list)143 item['theme_list'] = theme_list144 ## 研究学科145 subs = sel.xpath('//*[@class="summary"]/p[5]/text()').extract_first()146 if subs:147 tmp = subs.lstrip(" 研究学科:").rstrip(" ").split()148 sub_list ='、'.join(tmp)149 #print(sub_list)150 item['sub_list'] = sub_list151 # ## 发文量152 # amount1 = sel.xpath('//*[@class="search_count"]/p/i/text()').extract_first().replace(' ', '').replace('\n',153 # '')154 # amount1 = amount1.lstrip('\r')155 # # print(amount1)156 # item['amount1'] = amount1157 ## 被引量158 amount2 = sel.xpath('//span[@class="zps"]/i/a/text()').extract_first()159 if amount2:160 amount2 = amount2.replace(',', '')161 #print(amount2)162 else:163 amount2 = sel.xpath('//span[@class="zps"]/i/text()').extract_first()164 amount2 = amount2.replace(',', '')165 item['amount2'] = amount2166 ## H指数167 h_index = sel.xpath('//span[@class="hzs"]/i/text()').extract_first()168 #print(h_index)169 item['h_index'] = h_index170 tags = sel.xpath('//p[@class="data"]/span')171 #nums = sel.xpath('//p[@class="data"]/span/i/a/text()').extract()[1:]172 item['core'] = ''173 item['cssci'] = ''174 item['rdfybkzl'] = ''175 for tag in tags:176 tag_text = tag.xpath('./text()').extract_first()177 if tag_text == '北大核心: ':178 core = tag.xpath('./i/a/text()').extract_first()179 item['core'] = core180 if tag_text == 'CSSCI: ':181 cssci = tag.xpath('./i/a/text()').extract_first()182 item['cssci'] = cssci183 if tag_text == 'RDFYBKZL: ':184 rdf = tag.xpath('./i/a/text()').extract_first()185 item['rdfybkzl'] = rdf186 #print(item['university'] + ' ' + item['college'] + ' ' + item['expert_url']+' '+item['amount1']+' '+item['h_index'])187 # 总页数188 tpagenum = sel.xpath('//*[@class="pages"]/span[1]/text()').extract_first()189 if tpagenum:190 pagenum = int(tpagenum.lstrip('共').rstrip('页'))191 #print(pagenum)192 for i in range(1, pagenum+1):193 paper_url = item['expert_url'] + '?q=%7B%22page%22%3A%22' + str(i) + '%22%7D'194 yield Request(paper_url, callback=self.get_papers, meta={'expert_name': item['expert_name'],195 'expert_id': item['expert_id']})196 tp_url = item['expert_url'].rstrip("zp.aspx") + "tp.aspx"197 # print(tp_url)198 yield Request(url=tp_url, callback=self.parse_tp, meta = {'item_l': item})199 def get_papers(self, response):200 sel = Selector(response)201 expert_name = response.meta['expert_name']202 expert_id = response.meta['expert_id']203 urls = sel.xpath('//a[@class="title"]/@href').extract()204 # ## 所有论文链接205 #url_list = []206 for url in urls:207 if 'article_detail.aspx' in url:208 tmp = "http://www.irtree.cn" + url209 #url_list.append(tmp)210 yield Request(tmp, callback=self.parse_paper, meta={'expert_name': expert_name,211 'expert_id': expert_id}, dont_filter=True)212 def parse_paper(self, response):213 sel = Selector(response)214 expert_name = response.meta['expert_name']215 expert_id = response.meta['expert_id']216 #print(expert_name)217 #print(expert_id)218 url = response.url219 #print(url)220 it = paper()221 ## 论文ID222 paper_id = url.lstrip("http://www.irtree.cn/").rstrip("/article_detail.aspx").split("/articles/")[1].strip()223 it['paper_id'] = paper_id224 title = sel.xpath('//*[@class="summary"]/h1/text()').extract_first().strip()225 it['paper_title'] = title226 ## 文献类型227 paper_type = sel.xpath('//p[@class="class"]/text()').extract_first().strip()228 it['paper_type']=paper_type229 ## 出处230 source = 'Null'231 it['data1'] = ''232 it['data2'] = ''233 it['data3'] = ''234 it['data4'] = ''235 it['data5'] = ''236 it['category'] = ''237 date = 'Null'238 if paper_type == '期刊文章':239 p_list = sel.xpath('//div[@class="m"]/div[2]/p')240 for p in p_list:241 p_title = p.xpath('./strong/text()').extract_first()242 if p_title == '出  处:':243 tmp = p.xpath('./text()').extract_first()244 if tmp:245 source = tmp.strip()246 if p_title == '基  金:':247 tmp = p.xpath('./text()').extract_first()248 if tmp:249 it['data1'] = tmp.strip()250 if p_title == '卷  号:':251 tmp = p.xpath('./text()').extract_first()252 if tmp:253 it['data2'] = tmp.strip()254 if p_title == '期  号:':255 tmp = p.xpath('./text()').extract_first()256 if tmp:257 it['data3'] = tmp.strip()258 if p_title == '起止页码:':259 tmp = p.xpath('./text()').extract_first()260 if tmp:261 it['data4'] = tmp.strip()262 if p_title == '收录情况:':263 tmp = p.xpath('./text()').extract_first()264 if tmp:265 it['data5'] = tmp.strip()266 if p_title == '年  份:':267 tmp = p.xpath('./text()').extract_first()268 if tmp:269 datet = tmp.strip()270 if datet != '0':271 date = datet272 if p_title == '分 类 号:':273 tmp = p.xpath('./text()').extract_first()274 if tmp:275 it['category'] = tmp.strip()276 if paper_type == '会议':277 p_list = sel.xpath('//div[@class="m"]/div[2]/p')278 for p in p_list:279 p_title = p.xpath('./strong/text()').extract_first()280 if p_title == '会议名称:':281 tmp = p.xpath('./text()').extract_first()282 if tmp:283 source = tmp.strip()284 if p_title == '会议文献:':285 tmp = p.xpath('./text()').extract_first()286 if tmp:287 it['data1'] = tmp.strip()288 if p_title == '会议地点:':289 tmp = p.xpath('./text()').extract_first()290 if tmp:291 it['data2'] = tmp.strip()292 if p_title == '主办单位:':293 tmp = p.xpath('./text()').extract_first()294 if tmp:295 it['data3'] = tmp.strip()296 if p_title == '收录情况:':297 tmp = p.xpath('./text()').extract_first()298 if tmp:299 it['data5'] = tmp.strip()300 if p_title == '会议日期:':301 tmp = p.xpath('./text()').extract_first()302 if tmp:303 datet = tmp.strip()304 if datet != '0':305 date = datet306 if p_title == '分 类 号:':307 tmp = p.xpath('./text()').extract_first()308 if tmp:309 it['category'] = tmp.strip()310 if paper_type == '学位论文':311 p_list = sel.xpath('//div[@class="m"]/div[2]/p')312 for p in p_list:313 p_title = p.xpath('./strong/text()').extract_first()314 if p_title == '导  师:':315 tmp = p.xpath('./text()').extract_first()316 if tmp:317 it['data1'] = tmp.strip()318 if p_title == '学科专业:':319 tmp = p.xpath('./text()').extract_first()320 if tmp:321 it['data2'] = tmp.strip()322 if p_title == '授予学位:':323 tmp = p.xpath('./text()').extract_first()324 if tmp:325 it['data3'] = tmp.strip()326 if p_title == '学位年度:':327 tmp = p.xpath('./text()').extract_first()328 if tmp:329 datet = tmp.strip()330 if datet != '0':331 date = datet332 if p_title == '分 类 号:':333 tmp = p.xpath('./text()').extract_first()334 if tmp:335 it['category'] = tmp.strip()336 if paper_type == '成果':337 p_list = sel.xpath('//div[@class="m"]/div[2]/p')338 for p in p_list:339 p_title = p.xpath('./strong/text()').extract_first()340 if p_title == '项目年度编号:':341 tmp = p.xpath('./text()').extract_first()342 if tmp:343 it['data1'] = tmp.strip()344 if p_title == '登 记 号:':345 tmp = p.xpath('./text()').extract_first()346 if tmp:347 it['data2'] = tmp.strip()348 if p_title == '成果类别:':349 tmp = p.xpath('./text()').extract_first()350 if tmp:351 it['data3'] = tmp.strip()352 if p_title == '应用行业:':353 tmp = p.xpath('./text()').extract_first()354 if tmp:355 it['data4'] = tmp.strip()356 if p_title == '公布年份:':357 tmp = p.xpath('./text()').extract_first()358 if tmp:359 datet = tmp.strip()360 if datet != '0':361 date = datet362 if p_title == '分 类 号:':363 tmp = p.xpath('./text()').extract_first()364 if tmp:365 it['category'] = tmp.strip()366 if paper_type == '专利':367 p_list = sel.xpath('//div[@class="m"]/div[2]/p')368 for p in p_list:369 p_title = p.xpath('./strong/text()').extract_first()370 if p_title == '公 开 号:':371 tmp = p.xpath('./text()').extract_first()372 if tmp:373 it['data1'] = tmp.strip()374 if p_title == '申 请 号:':375 tmp = p.xpath('./text()').extract_first()376 if tmp:377 it['data2'] = tmp.strip()378 if p_title == '专利类型:':379 tmp = p.xpath('./text()').extract_first()380 if tmp:381 it['data3'] = tmp.strip()382 if p_title == '代 理 人:':383 tmp = p.xpath('./text()').extract_first()384 if tmp:385 it['data4'] = tmp.strip()386 if p_title == '代理机构:':387 tmp = p.xpath('./text()').extract_first()388 if tmp:389 it['data5'] = tmp.strip()390 if p_title == '申 请 日:':391 tmp = p.xpath('./text()').extract_first()392 if tmp:393 datet = tmp.strip()394 if datet != '0':395 date = datet396 if p_title == '公 开 日:':397 tmp = p.xpath('./text()').extract_first()398 if tmp:399 datet = tmp.strip()400 if datet != '0':401 date = date + '-' + datet402 if p_title == 'IPC专利分类号:':403 tmp = p.xpath('./text()').extract_first()404 if tmp:405 it['category'] = tmp.strip()406 if paper_type == '专著':407 p_list = sel.xpath('//div[@class="m"]/div[2]/p')408 for p in p_list:409 p_title = p.xpath('./strong/text()').extract_first()410 if p_title == 'ISBN号:':411 tmp = p.xpath('./text()').extract_first()412 if tmp:413 it['data1'] = tmp.strip()414 if p_title == '出 版 社:':415 tmp = p.xpath('./text()').extract_first()416 if tmp:417 it['data2'] = tmp.strip()418 if p_title == '页  数:':419 tmp = p.xpath('./text()').extract_first()420 if tmp:421 it['data3'] = tmp.strip()422 if p_title == '出版日期:':423 tmp = p.xpath('./text()').extract_first()424 if tmp:425 datet = tmp.strip()426 if datet != '0':427 date = datet428 if paper_type == '标准':429 p_list = sel.xpath('//div[@class="m"]/div[2]/p')430 for p in p_list:431 p_title = p.xpath('./strong/text()').extract_first()432 if p_title == '标 准 号:':433 tmp = p.xpath('./text()').extract_first()434 if tmp:435 it['data1'] = tmp.strip()436 if p_title == '发布单位:':437 tmp = p.xpath('./text()').extract_first()438 if tmp:439 it['data2'] = tmp.strip()440 if p_title == '标准类型:':441 tmp = p.xpath('./text()').extract_first()442 if tmp:443 it['data3'] = tmp.strip()444 if p_title == '国际标准分类号:':445 tmp = p.xpath('./text()').extract_first()446 if tmp:447 it['data4'] = tmp.strip()448 if p_title == '标准技术委员会:':449 tmp = p.xpath('./text()').extract_first()450 if tmp:451 it['data5'] = tmp.strip()452 if p_title == '发布日期:':453 tmp = p.xpath('./text()').extract_first()454 if tmp:455 datet = tmp.strip()456 if datet != '0':457 date = datet458 if p_title == '实施试行日期:':459 tmp = p.xpath('./text()').extract_first()460 if tmp:461 datet = tmp.strip()462 if datet != '0':463 date = date + '-' + datet464 if p_title == '中国标准分类号:':465 tmp = p.xpath('./text()').extract_first()466 if tmp:467 it['category'] = tmp.strip()468 it['source'] = source469 it['date'] = date470 #print(paper_id+' '+title+' '+type+' '+source)471 ## 摘要472 abstract = ''473 abstrack = sel.xpath('//p[@class="abstrack"]/text()').extract_first()474 if abstrack:475 abstract = abstrack.strip()476 else:477 abstrack2 = sel.xpath('//p[@class="abstrack cboth"]/text()').extract_first()478 if abstrack2:479 abstract = abstrack2.strip()480 it['abstract'] = abstract481 ## 关键词482 keywords = ''483 tmp = sel.xpath('//p[@class="subject"]/text()').extract()484 if tmp:485 keywords = ' '.join(tmp)486 keywords = re.sub(r"\s{2,}", " ", keywords).strip()487 it['keyword'] = keywords488 ## 作者489 it['p_author1'] = ''490 it['p_author2'] = ''491 it['p_author3'] = ''492 it['p_author4'] = ''493 it['p_author5'] = ''494 tmp = sel.xpath('//p[@class="author"]/text()').extract()495 authors = ' '.join(tmp)496 authors = re.sub(r'\[.*?\]','',authors)497 authors = re.sub(r"\s{2,}", " ", authors).strip()498 it['p_authors'] = authors499 #authors = authors.split()500 authors_list = authors.split()501 for a in authors_list:502 if a == expert_name:503 num = authors_list.index(a)504 if num == 0:505 it['p_author1'] = expert_id506 #print('author1='+author1)507 elif num == 1:508 it['p_author2'] = expert_id509 #print('author2='+author2)510 elif num == 2:511 it['p_author3'] = expert_id512 #print('author3='+author3)513 elif num == 3:514 it['p_author4'] = expert_id515 #print('author4='+author4)516 elif num == 4:517 it['p_author5'] = expert_id518 #print('author5='+author5)519 yield it520 # print(expert_name+':'+it['paper_id']+'--'+it['p_author1']+' '+it['p_author2'])521 #print('-------------------------------------------------------')522 def parse_tp(self, response):523 item_l = response.meta['item_l']524 item = person()525 item = item_l526 sel = Selector(response)527 url = response.url528 ## 合作学者529 co_experts_urls = sel.xpath('//*[@class="list_writer"]//dt//@href').extract()530 co_experts_list = []531 for url in co_experts_urls:532 tmp = url.rstrip("/rw.aspx").split("/writer/")[1]533 co_experts_list.append(tmp)534 item['co_experts'] = str(co_experts_list)535 ## 合作机构536 co_agency_list = sel.xpath('//*[@class="list organ"]//li//@title').extract()537 item['co_agencies'] = str(co_agency_list)538 #print(item)...

Full Screen

Full Screen

queries.py

Source:queries.py Github

copy

Full Screen

1# Copyright 2017-2020 Siemens AG2# 3# Permission is hereby granted, free of charge, to any person obtaining4# a copy of this software and associated documentation files (the5# "Software"), to deal in the Software without restriction, including without6# limitation the rights to use, copy, modify, merge, publish, distribute,7# sublicense, and/or sell copies of the Software, and to permit persons to whom the8# Software is furnished to do so, subject to the following conditions:9# 10# The above copyright notice and this permission notice shall be11# included in all copies or substantial portions of the Software.12# 13# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,14# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF15# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT16# SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR17# OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE,18# ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER19# DEALINGS IN THE SOFTWARE.20# 21# Author(s): Junes Najah, Pascal Eckmann, Thomas Riedmaier, Abian Blome22INSERT_SETTINGS = (23 "INSERT INTO settings(SettingName, SettingValue) VALUES(:SettingName, :SettingValue)")24INSERT_MODULE = (25 "INSERT INTO target_modules(ModuleName, ModulePath, RawBytes) VALUES(:ModuleName, :ModulePath, :RawBytes)")26INSERT_BLOCK_TO_COVER = (27 "INSERT INTO blocks_to_cover(ModuleID, Offset) VALUES(:ModuleID, :Offset) ON DUPLICATE KEY UPDATE ModuleID = :ModuleID, Offset = :Offset")28NUMBER_OF_NO_LONGER_LISTED = (29 "SELECT Amount FROM billing WHERE Resource='RunTestcasesNoLongerListed'")30COMPLETED_TESTCASES_COUNT = (31 "SELECT COUNT(*) FROM completed_testcases")32NUM_BLOCKS = (33 "SELECT COUNT(DISTINCT ModuleID, Offset) FROM covered_blocks")34GET_SETTINGS = (35 "SELECT ID, SettingName, SettingValue FROM settings")36GET_RUNNERTYPE = (37 "SELECT SettingValue FROM settings WHERE SettingName='runnerType'")38GET_TESTCASE_AND_PARENT = (39 "SELECT it.CreatorLocalID, it.CreatorServiceDescriptorGUID, it.ParentLocalID, it.ParentServiceDescriptorGUID, nn.NiceName, pnn.NiceName AS ParentNiceName, "40 "pnnmi.NiceName AS ParentNiceNameMI, nnmi.NiceName AS NiceNameMI "41 "FROM interesting_testcases AS it "42 "LEFT JOIN nice_names_testcase AS nn ON nn.CreatorLocalID = it.CreatorLocalID AND nn.CreatorServiceDescriptorGUID = it.CreatorServiceDescriptorGUID "43 "LEFT JOIN nice_names_testcase AS pnn ON pnn.CreatorLocalID = it.ParentLocalID AND nn.CreatorServiceDescriptorGUID = it.ParentServiceDescriptorGUID "44 "LEFT JOIN nice_names_managed_instance as nnmi on it.CreatorServiceDescriptorGUID = nnmi.ServiceDescriptorGUID "45 "LEFT JOIN nice_names_managed_instance as pnnmi on it.ParentServiceDescriptorGUID = pnnmi.ServiceDescriptorGUID "46 "WHERE it.ID=:ID;")47GET_PARENT_ID = (48 "SELECT ID FROM interesting_testcases "49 "WHERE CreatorLocalID=:parentID AND CreatorServiceDescriptorGUID=:parentSdGuid;")50GET_COUNT_OF_COVERED_BLOCKS = (51 "SELECT COUNT(DISTINCT ModuleID, Offset) AS CoveredBlocks FROM covered_blocks")52GET_TARGET_MODULES = (53 "SELECT tm.ID, tm.ModuleName, tm.ModulePath, cbc.CoveredBlocks "54 "FROM target_modules AS tm "55 "LEFT JOIN (SELECT ModuleID, COUNT(DISTINCT ModuleID, Offset) AS CoveredBlocks FROM covered_blocks GROUP BY ModuleID) as cbc ON tm.ID = cbc.ModuleID ORDER BY CoveredBlocks DESC;")56GET_COVERED_BLOCKS_OF_TESTCASE_FOR_EVERY_MODULE = (57 "SELECT tm.ModuleName, COUNT(DISTINCT ModuleID, Offset) AS CoveredBlocks "58 "FROM target_modules AS tm "59 "LEFT JOIN covered_blocks as cb ON tm.ID = cb.ModuleID "60 "WHERE CreatorTestcaseID=:ctID "61 "GROUP BY ModuleID;")62DELETE_TESTCASES = (63 "DELETE FROM interesting_testcases WHERE CreatorServiceDescriptorGUID <> 'initial'")64RESET_RATING = (65 "UPDATE interesting_testcases SET Rating = 10000 WHERE TestCaseType = 0")66DELETE_TESTCASES_WITHOUT_POPULATION = (67 "DELETE FROM interesting_testcases WHERE TestCaseType <> 0")68GET_MI_HOST_AND_PORT = (69 "SELECT managed_instances.ServiceDescriptorHostAndPort FROM managed_instances;")70GET_MAX_LOCALID = (71 "SELECT MAX(CreatorLocalID) FROM interesting_testcases WHERE CreatorServiceDescriptorGUID='initial'")72UPDATE_SETTINGS = (73 "UPDATE settings SET SettingValue=:SettingValue WHERE ID=:ID")74UPDATE_NICE_NAME_TESTCASE = (75 "UPDATE nice_names_testcase SET NiceName=:newName WHERE CreatorServiceDescriptorGUID=:guid AND CreatorLocalID=:localId")76UPDATE_NICE_NAME_MANAGED_INSTANACE = (77 "UPDATE nice_names_managed_instance SET NiceName=:newName WHERE ServiceDescriptorGUID=:sdguid")78DELETE_TC_WTIH_LOCALID = (79 "DELETE FROM interesting_testcases WHERE CreatorServiceDescriptorGUID=:guid AND CreatorLocalID=:localId")80DELETE_MODULE_BY_ID = (81 "DELETE FROM target_modules WHERE ID=:ID")82DELETE_SETTING_BY_ID = (83 "DELETE FROM settings WHERE ID=:ID")84GET_TOTAL_CPU_SECONDS = (85 "SELECT Amount FROM billing WHERE Resource = 'RunnerSeconds'")86GET_TESTCASE_ID = (87 "SELECT ID FROM interesting_testcases WHERE CreatorServiceDescriptorGUID='initial' "88 "AND CreatorLocalID=:creatorlocalID")89GET_LOCAL_MANAGERS = (90 "SELECT ServiceDescriptorGUID, ServiceDescriptorHostAndPort FROM localmanagers WHERE FuzzJob=:fuzzjobID")91GET_MANAGED_INSTANCES = (92 "SELECT managed_instances.ServiceDescriptorGUID, managed_instances.ServiceDescriptorHostAndPort, "93 "managed_instances.AgentType, managed_instances.Location, mis.TimeOfStatus, mis.Status, "94 "nice_names_managed_instance.NiceName FROM managed_instances "95 "LEFT JOIN (SELECT ServiceDescriptorGUID, Status, TimeOfStatus FROM managed_instances_statuses t1 "96 "WHERE TimeOfStatus = (SELECT MAX(TimeOfStatus) "97 "FROM managed_instances_statuses "98 "WHERE t1.ServiceDescriptorGUID = managed_instances_statuses.ServiceDescriptorGUID) "99 "GROUP BY ServiceDescriptorGUID "100 "ORDER BY TimeOfStatus DESC) AS mis "101 "ON managed_instances.ServiceDescriptorGUID = mis.ServiceDescriptorGUID "102 "LEFT JOIN nice_names_managed_instance "103 "ON managed_instances.ServiceDescriptorGUID = nice_names_managed_instance.ServiceDescriptorGUID " 104 "ORDER BY managed_instances.AgentType;")105GET_MANAGED_INSTANCE_LOGS = (106 "SELECT ServiceDescriptorGUID, TimeOfInsertion, LogMessage FROM managed_instances_logmessages "107 "WHERE ServiceDescriptorGUID=:sdguid ORDER BY TimeOfInsertion DESC LIMIT :limit OFFSET :offset;")108GET_LOCALMANAGER_LOGS = (109 "SELECT lmlogs.ServiceDescriptorGUID, fj.name, lm.ServiceDescriptorHostAndPort, lmlogs.LogMessage "110 "FROM localmanagers_logmessages AS lmlogs "111 "LEFT JOIN localmanagers AS lm "112 "ON lm.ServiceDescriptorGUID = lmlogs.ServiceDescriptorGUID "113 "LEFT JOIN fuzzjob AS fj "114 "ON fj.ID = lm.FuzzJob "115 "ORDER BY lmlogs.TimeOfInsertion DESC;")116GET_COUNT_OF_LOCALMANAGER_LOGS = (117 "SELECT COUNT(*) FROM localmanagers_logmessages;")118GET_COUNT_OF_MANAGED_INSTANCE_LOGS = (119 "SELECT COUNT(ServiceDescriptorGUID) FROM managed_instances_logmessages "120 "WHERE ServiceDescriptorGUID=:sdguid;")121GET_VIOLATIONS_AND_CRASHES = (122 "SELECT count(*), cd.CrashFootprint, it.TestCaseType, MIN(it.ID) AS group_min FROM interesting_testcases AS it "123 "JOIN crash_descriptions AS cd ON it.ID = cd.CreatorTestcaseID "124 "GROUP BY cd.CrashFootprint, it.TestCaseType ORDER BY group_min;")125GET_SMALLEST_VIO_OR_CRASH_TC = (126 "SELECT it.RawBytes "127 "FROM interesting_testcases AS it "128 "JOIN crash_descriptions AS cd ON it.ID = cd.CreatorTestcaseID "129 "WHERE cd.CrashFootprint=:footprint ORDER BY LENGTH(it.RawBytes) ASC LIMIT 1;")130NUM_UNIQUE_ACCESS_VIOLATION = (131 "SELECT count(*) FROM "132 "(SELECT cd.CrashFootprint, it.TestCaseType "133 "FROM interesting_testcases AS it "134 "JOIN crash_descriptions AS cd ON it.ID = cd.CreatorTestcaseID "135 "WHERE it.TestCaseType=2 GROUP BY cd.CrashFootprint) "136 "violations;")137UNIQUE_ACCESS_VIOLATION = (138 "SELECT av.CrashFootprint, av.TestCaseType, av.CreatorServiceDescriptorGUID, av.CreatorLocalID, av.Rating, "139 "av.TimeOfInsertion, av.ID, av.RawBytes, nn.NiceName "140 "FROM "141 "(SELECT cd.CrashFootprint, it.TestCaseType, it.CreatorServiceDescriptorGUID, it.CreatorLocalID, it.Rating, "142 "it.TimeOfInsertion, it.ID, it.RawBytes "143 " FROM interesting_testcases AS it "144 " JOIN crash_descriptions AS cd ON it.ID = cd.CreatorTestcaseID "145 " WHERE it.TestCaseType=2 Group by cd.CrashFootprint) av " 146 "LEFT JOIN nice_names_testcase AS nn ON (av.CreatorServiceDescriptorGUID = nn.CreatorServiceDescriptorGUID AND av.CreatorLocalID = nn.CreatorLocalID);")147UNIQUE_ACCESS_VIOLATION_NO_RAW = (148 "SELECT av.ID, av.CrashFootprint, av.TestCaseType, av.CreatorServiceDescriptorGUID, av.CreatorLocalID, av.Rating, "149 "av.TimeOfInsertion, cbc.CoveredBlocks, nn.NiceName, nnmi.NiceName as NiceNameMI "150 "FROM (SELECT cd.CrashFootprint, it.TestCaseType, it.CreatorServiceDescriptorGUID, it.CreatorLocalID, "151 "MIN(it.TimeOfInsertion) as TimeOfInsertion, it.ID, count(cd.CrashFootprint) as Rating "152 "FROM interesting_testcases AS it "153 "JOIN crash_descriptions AS cd ON it.ID = cd.CreatorTestcaseID "154 "WHERE it.TestCaseType=2 "155 "Group by cd.CrashFootprint) as av "156 "LEFT JOIN nice_names_managed_instance as nnmi on av.CreatorServiceDescriptorGUID = nnmi.ServiceDescriptorGUID "157 "LEFT JOIN nice_names_testcase AS nn ON (av.CreatorServiceDescriptorGUID = nn.CreatorServiceDescriptorGUID AND av.CreatorLocalID = nn.CreatorLocalID) "158 "LEFT JOIN (SELECT CreatorTestcaseID, COUNT(DISTINCT ModuleID, Offset) AS CoveredBlocks FROM covered_blocks GROUP BY CreatorTestcaseID) as cbc on av.ID = cbc.CreatorTestcaseID "159 "ORDER BY av.TimeOfInsertion asc;")160NUM_UNIQUE_CRASH = (161 "SELECT count(*) "162 "FROM (SELECT cd.CrashFootprint, it.TestCaseType FROM interesting_testcases AS it "163 "JOIN crash_descriptions AS cd ON it.ID = cd.CreatorTestcaseID "164 "WHERE TestCaseType=3 "165 "GROUP BY cd.CrashFootprint) observedCrashes;")166UNIQUE_CRASHES = (167 "SELECT oc.CrashFootprint, oc.ID, oc.TestCaseType, oc.RawBytes, oc.CreatorServiceDescriptorGUID, "168 "oc.CreatorLocalID, oc.Rating, oc.TimeOfInsertion, nn.NiceName "169 "FROM "170 "(SELECT cd.CrashFootprint, it.TestCaseType, it.CreatorServiceDescriptorGUID, it.CreatorLocalID, "171 "it.Rating, it.TimeOfInsertion, it.ID, it.RawBytes "172 " FROM interesting_testcases AS it "173 " JOIN crash_descriptions AS cd "174 " ON it.ID = cd.CreatorTestcaseID "175 " GROUP BY cd.CrashFootprint) oc " 176 "LEFT JOIN nice_names_testcase AS nn ON (oc.CreatorServiceDescriptorGUID = nn.CreatorServiceDescriptorGUID AND oc.CreatorLocalID = nn.CreatorLocalID) "177 "WHERE TestCaseType=3;")178UNIQUE_CRASHES_NO_RAW = (179 "SELECT oc.ID, oc.CrashFootprint, oc.TestCaseType, oc.CreatorServiceDescriptorGUID, oc.CreatorLocalID, oc.Rating, "180 "oc.TimeOfInsertion, cbc.CoveredBlocks, nn.NiceName, nnmi.NiceName as NiceNameMI "181 "FROM (SELECT cd.CrashFootprint, it.TestCaseType, it.CreatorServiceDescriptorGUID, it.CreatorLocalID, "182 "MIN(it.TimeOfInsertion) as TimeOfInsertion, it.ID, count(cd.CrashFootprint) as Rating "183 "FROM interesting_testcases AS it "184 "JOIN crash_descriptions AS cd "185 "ON it.ID = cd.CreatorTestcaseID "186 "WHERE TestCaseType=3 "187 "GROUP BY cd.CrashFootprint) as oc "188 "LEFT JOIN nice_names_managed_instance as nnmi on oc.CreatorServiceDescriptorGUID = nnmi.ServiceDescriptorGUID "189 "LEFT JOIN nice_names_testcase AS nn ON (oc.CreatorServiceDescriptorGUID = nn.CreatorServiceDescriptorGUID AND oc.CreatorLocalID = nn.CreatorLocalID) "190 "LEFT JOIN (SELECT CreatorTestcaseID, COUNT(DISTINCT ModuleID, Offset) AS CoveredBlocks FROM covered_blocks GROUP BY CreatorTestcaseID) as cbc on oc.ID = cbc.CreatorTestcaseID "191 "ORDER BY oc.TimeOfInsertion asc;")192MANAGED_INSTANCES_HOST_AND_PORT_AGENT_TYPE = (193 "SELECT managed_instances.ServiceDescriptorHostAndPort "194 "FROM managed_instances "195 "WHERE managed_instances.AgentType =:myType;")196MANAGED_INSTANCES_HOST_AND_PORT = (197 "SELECT managed_instances.ServiceDescriptorHostAndPort "198 "FROM managed_instances "199 "WHERE managed_instances.ServiceDescriptorGUID =:guid LIMIT 1;")200INSERT_TESTCASE_POPULATION = (201 "INSERT INTO interesting_testcases(CreatorServiceDescriptorGUID, CreatorLocalID, ParentServiceDescriptorGUID, "202 "ParentLocalID, RawBytes, Rating, TestCaseType) "203 "VALUES('initial', :localId, 'initial', :localId, :rawData, 10000, 0)")204INSERT_NICE_NAME_TESTCASE = (205 "INSERT INTO nice_names_testcase(NiceName, CreatorServiceDescriptorGUID, CreatorLocalID) "206 "VALUES(:newName, :guid, :localId)")207INSERT_NICE_NAME_MANAGED_INSTANACE = (208 "INSERT INTO nice_names_managed_instance(NiceName, ServiceDescriptorGUID) "209 "VALUES(:newName, :sdguid)")210GET_POPULATION_DETAILS = (211 "SELECT DISTINCT it.CreatorServiceDescriptorGUID, it.CreatorLocalID, it.ParentServiceDescriptorGUID, "212 "it.ParentLocalID, nnt.NiceName as NiceNameTC, nnmi.NiceName as NiceNameMI, it.TestCaseType "213 "FROM interesting_testcases as it "214 "LEFT JOIN nice_names_testcase as nnt on it.CreatorServiceDescriptorGUID = nnt.CreatorServiceDescriptorGUID AND it.CreatorLocalID = nnt.CreatorLocalID "215 "LEFT JOIN nice_names_managed_instance as nnmi on it.CreatorServiceDescriptorGUID = nnmi.ServiceDescriptorGUID "216 "WHERE (it.TestCaseType=0 OR it.TestCaseType=5);")217GET_CHECKED_RATING = (218 "SELECT it.Rating "219 "FROM interesting_testcases AS it "220 "WHERE TestCaseType=0 AND Rating > (SELECT o.Value FROM fluffi_gm.gm_options AS o WHERE Setting = 'checkrating')"221 "LIMIT 1")222GET_CRASH_DETAILS = (223 "SELECT DISTINCT cd.CrashFootprint FROM crash_descriptions as cd;")224GET_CRASH_PARENTS = (225 "SELECT it.ParentServiceDescriptorGUID, it.ParentLocalID, COUNT(*) AS NumberEdges FROM interesting_testcases as it "226 "JOIN crash_descriptions as cd ON it.ID = cd.CreatorTestcaseID "227 "WHERE cd.CrashFootprint = :CrashFootprint AND (it.TestCaseType=3 OR it.TestCaseType=2) "228 "GROUP BY it.ParentServiceDescriptorGUID, it.ParentLocalID;")229GET_NN_TESTCASE_RAWBYTES = (230 "SELECT it.RawBytes, nnt.NiceName FROM interesting_testcases as it LEFT JOIN nice_names_testcase as nnt "231 "ON (it.CreatorServiceDescriptorGUID = nnt.CreatorServiceDescriptorGUID AND it.CreatorLocalID = nnt.CreatorLocalID) "232 "WHERE it.CreatorServiceDescriptorGUID=:guid AND it.CreatorLocalID=:localId;")233GET_TESTCASE_DUMP = (234 "SELECT SUBSTR(it.RawBytes, :offset, 960), LENGTH(it.RawBytes), it.ParentLocalID, it.ParentServiceDescriptorGUID "235 "FROM interesting_testcases as it LEFT JOIN nice_names_testcase as nnt "236 "ON (it.CreatorServiceDescriptorGUID = nnt.CreatorServiceDescriptorGUID AND it.CreatorLocalID = nnt.CreatorLocalID) "237 "WHERE it.ID=:testcaseID ;")238GET_TESTCASE_PARENT = (239 "SELECT it.ParentLocalID, it.ParentServiceDescriptorGUID "240 "FROM interesting_testcases as it LEFT JOIN nice_names_testcase as nnt "241 "ON (it.CreatorServiceDescriptorGUID = nnt.CreatorServiceDescriptorGUID AND it.CreatorLocalID = nnt.CreatorLocalID) "242 "WHERE it.ID=:testcaseID ;")243GET_TESTCASE_PARENT_ID = (244 "SELECT it.ID "245 "FROM interesting_testcases as it LEFT JOIN nice_names_testcase as nnt "246 "ON (it.CreatorServiceDescriptorGUID = nnt.CreatorServiceDescriptorGUID AND it.CreatorLocalID = nnt.CreatorLocalID) "247 "WHERE it.CreatorLocalID=:parentID and it.CreatorServiceDescriptorGUID=:parentGUID;")248GET_PROJECTS = (249 "SELECT"250 "(SELECT COUNT(*) FROM completed_testcases), "251 "(SELECT Amount FROM billing WHERE Resource='RunTestcasesNoLongerListed'), "252 "SUM(CASE WHEN TestCaseType = 0 THEN 1 ELSE 0 END), "253 "SUM(CASE WHEN TestCaseType = 1 THEN 1 ELSE 0 END), "254 "SUM(CASE WHEN TestCaseType = 2 THEN 1 ELSE 0 END), "255 "SUM(CASE WHEN TestCaseType = 3 THEN 1 ELSE 0 END), "256 "SUM(CASE WHEN TestCaseType = 4 THEN 1 ELSE 0 END), "257 "(SELECT Rating FROM interesting_testcases WHERE TestCaseType=0 AND Rating > "258 "(SELECT o.Value FROM fluffi_gm.gm_options AS o WHERE Setting = 'checkrating') LIMIT 1) "259 "FROM interesting_testcases;"260)261ResetFuzzjobStmts = [262 "UPDATE billing SET amount = 0",263 "TRUNCATE TABLE completed_testcases",264 "TRUNCATE TABLE covered_blocks",265 "TRUNCATE TABLE crash_descriptions",266 "TRUNCATE TABLE managed_instances",267 "TRUNCATE TABLE managed_instances_statuses",268 "TRUNCATE TABLE nice_names_managed_instance"]269def getCrashesOrViosOfFootprint(footprint):270 return (271 "SELECT it.CreatorServiceDescriptorGUID, it.RawBytes, it.ID, nn.NiceName "272 "FROM interesting_testcases AS it "273 "JOIN crash_descriptions AS cd ON it.ID = cd.CreatorTestcaseID "274 "LEFT JOIN nice_names_testcase AS nn (ON it.CreatorServiceDescriptorGUID = nn.CreatorServiceDescriptorGUID AND it.CreatorLocalID = nn.CreatorLocalID) "275 "WHERE cd.CrashFootprint='{}' AND (it.TestCaseType=2 OR it.TestCaseType=3);".format(footprint)276 )277def getCrashesOrViosOfFootprintCount(footprint):278 return (279 "SELECT count(*) "280 "FROM interesting_testcases AS it "281 "JOIN crash_descriptions AS cd ON it.ID = cd.CreatorTestcaseID "282 "LEFT JOIN nice_names_testcase AS nn ON (it.CreatorServiceDescriptorGUID = nn.CreatorServiceDescriptorGUID AND it.CreatorLocalID = nn.CreatorLocalID) "283 "WHERE cd.CrashFootprint='{}' AND (it.TestCaseType=2 OR it.TestCaseType=3);".format(footprint)284 )285def getCrashesQuery(footprint, testCaseType):286 return (287 "SELECT it.CreatorServiceDescriptorGUID, it.RawBytes, it.ID, nn.NiceName "288 "FROM interesting_testcases AS it "289 "JOIN crash_descriptions AS cd ON it.ID = cd.CreatorTestcaseID "290 "LEFT JOIN nice_names_testcase AS nn ON (it.CreatorServiceDescriptorGUID = nn.CreatorServiceDescriptorGUID AND it.CreatorLocalID = nn.CreatorLocalID) "291 "WHERE cd.CrashFootprint='{}' AND it.TestCaseType={};".format(footprint, testCaseType)292 )293def getCrashesQueryCount(footprint, testCaseType):294 return (295 "SELECT COUNT(*) "296 "FROM interesting_testcases AS it "297 "JOIN crash_descriptions AS cd ON it.ID = cd.CreatorTestcaseID "298 "LEFT JOIN nice_names_testcase AS nn ON (it.CreatorServiceDescriptorGUID = nn.CreatorServiceDescriptorGUID AND it.CreatorLocalID = nn.CreatorLocalID) "299 "WHERE cd.CrashFootprint='{}' AND it.TestCaseType={};".format(footprint, testCaseType)300 )301def getITCountOfTypeQuery(n):302 return "SELECT COUNT(*) FROM interesting_testcases WHERE TestCaseType=" + str(n)303def getITQueryOfType(n):304 return (305 "SELECT it.ID, it.RawBytes, it.CreatorServiceDescriptorGUID, it.CreatorLocalID, cbc.CoveredBlocks, it.Rating, it.TimeOfInsertion, "306 "nn.NiceName, nnmi.NiceName as NiceNameMI "307 "FROM interesting_testcases AS it "308 "LEFT JOIN nice_names_testcase AS nn ON (it.CreatorServiceDescriptorGUID = nn.CreatorServiceDescriptorGUID AND it.CreatorLocalID = nn.CreatorLocalID) "309 "LEFT JOIN nice_names_managed_instance as nnmi on it.CreatorServiceDescriptorGUID = nnmi.ServiceDescriptorGUID "310 "LEFT JOIN (SELECT CreatorTestcaseID, COUNT(DISTINCT ModuleID, Offset) AS CoveredBlocks FROM covered_blocks GROUP BY CreatorTestcaseID) as cbc "311 "on it.ID = cbc.CreatorTestcaseID "312 "WHERE TestCaseType={};".format(n)313 )314def getITQueryOfTypeNoRaw(n):315 return (316 "SELECT it.ID, it.CreatorServiceDescriptorGUID, it.CreatorLocalID, cbc.CoveredBlocks, it.Rating, it.TimeOfInsertion, "317 "nn.NiceName, nnmi.NiceName as NiceNameMI "318 "FROM interesting_testcases AS it "319 "LEFT JOIN nice_names_testcase AS nn ON (it.CreatorServiceDescriptorGUID = nn.CreatorServiceDescriptorGUID AND it.CreatorLocalID = nn.CreatorLocalID) "320 "LEFT JOIN nice_names_managed_instance as nnmi on it.CreatorServiceDescriptorGUID = nnmi.ServiceDescriptorGUID "321 "LEFT JOIN (SELECT CreatorTestcaseID, COUNT(DISTINCT ModuleID, Offset) AS CoveredBlocks FROM covered_blocks GROUP BY CreatorTestcaseID) as cbc on it.ID = cbc.CreatorTestcaseID "322 "WHERE TestCaseType={};".format(n)323 )324def getMICountOfTypeQuery(n):325 return "SELECT COUNT(*) FROM managed_instances WHERE AgentType={};".format(n)326def getLatestTestcaseOfType(n):327 return (328 "SELECT TimeOfInsertion FROM interesting_testcases "329 "WHERE TestCaseType={} "330 "ORDER BY TimeOfInsertion DESC LIMIT 1;".format(n)331 )...

Full Screen

Full Screen

test_iterlen.py

Source:test_iterlen.py Github

copy

Full Screen

1""" Test Iterator Length Transparency2Some functions or methods which accept general iterable arguments have3optional, more efficient code paths if they know how many items to expect.4For instance, map(func, iterable), will pre-allocate the exact amount of5space required whenever the iterable can report its length.6The desired invariant is: len(it)==len(list(it)).7A complication is that an iterable and iterator can be the same object. To8maintain the invariant, an iterator needs to dynamically update its length.9For instance, an iterable such as range(10) always reports its length as ten,10but it=iter(range(10)) starts at ten, and then goes to nine after next(it).11Having this capability means that map() can ignore the distinction between12map(func, iterable) and map(func, iter(iterable)).13When the iterable is immutable, the implementation can straight-forwardly14report the original length minus the cumulative number of calls to next().15This is the case for tuples, range objects, and itertools.repeat().16Some containers become temporarily immutable during iteration. This includes17dicts, sets, and collections.deque. Their implementation is equally simple18though they need to permanently set their length to zero whenever there is19an attempt to iterate after a length mutation.20The situation slightly more involved whenever an object allows length mutation21during iteration. Lists and sequence iterators are dynamically updatable.22So, if a list is extended during iteration, the iterator will continue through23the new items. If it shrinks to a point before the most recent iteration,24then no further items are available and the length is reported at zero.25Reversed objects can also be wrapped around mutable objects; however, any26appends after the current position are ignored. Any other approach leads27to confusion and possibly returning the same item more than once.28The iterators not listed above, such as enumerate and the other itertools,29are not length transparent because they have no way to distinguish between30iterables that report static length and iterators whose length changes with31each call (i.e. the difference between enumerate('abc') and32enumerate(iter('abc')).33"""34import unittest35from itertools import repeat36from collections import deque37from operator import length_hint38n = 1039class TestInvariantWithoutMutations:40 def test_invariant(self):41 it = self.it42 for i in reversed(range(1, n+1)):43 self.assertEqual(length_hint(it), i)44 next(it)45 self.assertEqual(length_hint(it), 0)46 self.assertRaises(StopIteration, next, it)47 self.assertEqual(length_hint(it), 0)48class TestTemporarilyImmutable(TestInvariantWithoutMutations):49 def test_immutable_during_iteration(self):50 # objects such as deques, sets, and dictionaries enforce51 # length immutability during iteration52 it = self.it53 self.assertEqual(length_hint(it), n)54 next(it)55 self.assertEqual(length_hint(it), n-1)56 self.mutate()57 self.assertRaises(RuntimeError, next, it)58 self.assertEqual(length_hint(it), 0)59## ------- Concrete Type Tests -------60class TestRepeat(TestInvariantWithoutMutations, unittest.TestCase):61 def setUp(self):62 self.it = repeat(None, n)63class TestXrange(TestInvariantWithoutMutations, unittest.TestCase):64 def setUp(self):65 self.it = iter(range(n))66class TestXrangeCustomReversed(TestInvariantWithoutMutations, unittest.TestCase):67 def setUp(self):68 self.it = reversed(range(n))69class TestTuple(TestInvariantWithoutMutations, unittest.TestCase):70 def setUp(self):71 self.it = iter(tuple(range(n)))72## ------- Types that should not be mutated during iteration -------73class TestDeque(TestTemporarilyImmutable, unittest.TestCase):74 def setUp(self):75 d = deque(range(n))76 self.it = iter(d)77 self.mutate = d.pop78class TestDequeReversed(TestTemporarilyImmutable, unittest.TestCase):79 def setUp(self):80 d = deque(range(n))81 self.it = reversed(d)82 self.mutate = d.pop83class TestDictKeys(TestTemporarilyImmutable, unittest.TestCase):84 def setUp(self):85 d = dict.fromkeys(range(n))86 self.it = iter(d)87 self.mutate = d.popitem88class TestDictItems(TestTemporarilyImmutable, unittest.TestCase):89 def setUp(self):90 d = dict.fromkeys(range(n))91 self.it = iter(d.items())92 self.mutate = d.popitem93class TestDictValues(TestTemporarilyImmutable, unittest.TestCase):94 def setUp(self):95 d = dict.fromkeys(range(n))96 self.it = iter(d.values())97 self.mutate = d.popitem98class TestSet(TestTemporarilyImmutable, unittest.TestCase):99 def setUp(self):100 d = set(range(n))101 self.it = iter(d)102 self.mutate = d.pop103## ------- Types that can mutate during iteration -------104class TestList(TestInvariantWithoutMutations, unittest.TestCase):105 def setUp(self):106 self.it = iter(range(n))107 def test_mutation(self):108 d = list(range(n))109 it = iter(d)110 next(it)111 next(it)112 self.assertEqual(length_hint(it), n - 2)113 d.append(n)114 self.assertEqual(length_hint(it), n - 1) # grow with append115 d[1:] = []116 self.assertEqual(length_hint(it), 0)117 self.assertEqual(list(it), [])118 d.extend(range(20))119 self.assertEqual(length_hint(it), 0)120class TestListReversed(TestInvariantWithoutMutations, unittest.TestCase):121 def setUp(self):122 self.it = reversed(range(n))123 def test_mutation(self):124 d = list(range(n))125 it = reversed(d)126 next(it)127 next(it)128 self.assertEqual(length_hint(it), n - 2)129 d.append(n)130 self.assertEqual(length_hint(it), n - 2) # ignore append131 d[1:] = []132 self.assertEqual(length_hint(it), 0)133 self.assertEqual(list(it), []) # confirm invariant134 d.extend(range(20))135 self.assertEqual(length_hint(it), 0)136## -- Check to make sure exceptions are not suppressed by __length_hint__()137class BadLen(object):138 def __iter__(self):139 return iter(range(10))140 def __len__(self):141 raise RuntimeError('hello')142class BadLengthHint(object):143 def __iter__(self):144 return iter(range(10))145 def __length_hint__(self):146 raise RuntimeError('hello')147class NoneLengthHint(object):148 def __iter__(self):149 return iter(range(10))150 def __length_hint__(self):151 return NotImplemented152class TestLengthHintExceptions(unittest.TestCase):153 def test_issue1242657(self):154 self.assertRaises(RuntimeError, list, BadLen())155 self.assertRaises(RuntimeError, list, BadLengthHint())156 self.assertRaises(RuntimeError, [].extend, BadLen())157 self.assertRaises(RuntimeError, [].extend, BadLengthHint())158 b = bytearray(range(10))159 self.assertRaises(RuntimeError, b.extend, BadLen())160 self.assertRaises(RuntimeError, b.extend, BadLengthHint())161 def test_invalid_hint(self):162 # Make sure an invalid result doesn't muck-up the works163 self.assertEqual(list(NoneLengthHint()), list(range(10)))164if __name__ == "__main__":...

Full Screen

Full Screen

linesearch.py

Source:linesearch.py Github

copy

Full Screen

1import os2import numpy as np3from .log import write_log, write_status4from .wolfe import wolfe_conditions, update_alpha5from .cost import read_cost6from .descent import read_descent7from .gradient import read_gradient8from .metadata import read_optparams9def write_optvals(optvals, outdir, it, ls=None):10 """writes the optimization parameters to file11 Parameters12 ----------13 optvals : list14 the ndarray contains q, alphaleft, alpharight, alpha, w1, w2, w3.15 optdir : str16 optimization directory17 it : int18 iteration number19 ls : int, optional20 iteration number, by default None21 """22 # Get dir23 optdir = os.path.join(outdir, 'opt')24 if ls is not None:25 fname = f"optvals_it{it:05d}_ls{ls:05d}.npy"26 else:27 fname = f"optvals_it{it:05d}.npy"28 file = os.path.join(optdir, fname)29 np.save(file, optvals)30def read_optvals(outdir, it, ls=None):31 """Reads the optimization values q, alpha, alpha left, and alpha right,32 and the three wolf condition number w1,w2,w3. into a tuple33 Parameters34 ----------35 optdir : str36 optimization directory37 it : int38 iteration number39 ls : int, optional40 linesearch number, by default None41 """42 # Get dir43 optdir = os.path.join(outdir, 'opt')44 if ls is not None:45 fname = f"optvals_it{it:05d}_ls{ls:05d}.npy"46 else:47 fname = f"optvals_it{it:05d}.npy"48 file = os.path.join(optdir, fname)49 optvals = np.load(file)50 optvals = optvals.tolist()51 # Convert the wolfe conditions to booleans52 optvals[-1] = bool(optvals[-1])53 optvals[-2] = bool(optvals[-2])54 optvals[-3] = bool(optvals[-3])55 return optvals56def check_optvals(outdir, it, ls, status=True):57 """58 status flag, because checkopvals is called once at the end of the linesearch 59 and then another time at the end of the iteration"""60 # Read optimizaiton parameters61 optparams = read_optparams(outdir)62 # Get Max linesearch number63 nls_max = optparams["nls_max"]64 # Read previous set of optimization values65 _, alpha_l, alpha_r, alpha, w1, w2, w3 = read_optvals(66 outdir, it, ls)67 # Linesearch failed if w3 is False68 if w3 is False:69 if status:70 write_status(71 outdir,72 f"FAIL: NOT A DESCENT DIRECTION at it {it:05d} and ls {ls:05d}.")73 return "FAIL"74 # Line search successful75 elif (w1 is True) and (w2 is True):76 # Read initial cost and final cost77 initcost = read_cost(outdir, 0, 0)78 cost = read_cost(outdir, it, ls)79 # Write log message80 if status:81 write_log(outdir,82 f"iter = {it}, "83 f"f/fo={cost/initcost:5.4e}, "84 f"nls = {ls}, wolfe1 = {w1} wolfe2 = {w2}, "85 f"a={alpha}, al={alpha_l}, ar={alpha_r}")86 write_status(87 outdir,88 f"SUCCESS: it {it:05d} and ls {ls:05d}.")89 return "SUCCESS"90 # Check linesearch91 elif ls == (nls_max-1) and ((w1 is False) or (w2 is False)):92 if status:93 write_status(94 outdir,95 f"FAIL: LS ENDED at it {it:05d} and ls {ls:05d}.")96 return "FAIL"97 else:98 if status:99 write_status(100 outdir,101 f"ADDSTEP: it {it:05d} and ls {ls:05d}.")102 return "ADDSTEP"103def linesearch(outdir, it, ls):104 # Get the model update and grad105 dm = read_descent(outdir, it, 0)106 g = read_gradient(outdir, it, ls)107 # Compute q descent dot grad108 q = np.sum(dm*g)109 # Write first set of linesearch parameters110 if ls == 0:111 # Set all values to the initial values112 alpha = 1113 alpha_l = 0114 alpha_r = 0115 w1, w2, w3 = True, True, True116 # If linesearch is in progress117 else:118 # Read iteration's q119 q_old, _, _, _, _, _, _ = read_optvals(120 outdir, it, 0)121 # Read previous set of optimization values122 _, alpha_l, alpha_r, alpha, _, _, _ = read_optvals(123 outdir, it, ls-1)124 # Read current q and new queue125 cost = read_cost(outdir, it, ls)126 # Read current q and new queue127 cost_old = read_cost(outdir, it, 0)128 # Safeguard check for inf and nans...129 if np.isnan(cost) or np.isinf(cost):130 # assume we've been too far and reduce step131 alpha_r = alpha132 alpha = (alpha_l + alpha_r)*0.5133 w1, w2, w3 = False, False, True134 write_optvals(135 [q, alpha_l, alpha_r, alpha, w1, w2, w3], outdir, it, ls)136 else:137 # Compute wolfe conditions138 w1, w2, w3 = wolfe_conditions(139 q_old, q, cost_old, cost, alpha)140 if w3 is False or ((w1 is True) and (w2 is True)):141 # Write to optimization values to file142 write_optvals(143 [q, alpha_l, alpha_r, alpha, w1, w2, w3], outdir, it, ls)144 else:145 # Write to optimization values to file146 alpha_l, alpha_r, alpha = update_alpha(147 w1, w2, alpha_l, alpha_r, alpha, factor=10.0)148 # Write to optimization values to file149 write_optvals([q, alpha_l, alpha_r, alpha, w1, w2, w3], outdir, it, ls)150 print(" optvals:", np.array2string(...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.

LambdaTest Learning Hubs:

YouTube

You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.

Run Mamba automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 minutes of automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

NotHelpful