Best Python code snippet using playwright-python
comments.py
Source:comments.py
import scrapy

from scrapy.loader import ItemLoader
from scrapy.exceptions import CloseSpider
from fbcrawl.spiders.fbcrawl import FacebookSpider
from fbcrawl.items import CommentsItem, parse_date, parse_date2

from datetime import datetime

class CommentsSpider(FacebookSpider):
    """
    Parse FB comments, given a post (needs credentials)
    """
    name = "comments"
    custom_settings = {
        'FEED_EXPORT_FIELDS': ['source','reply_to','date','reactions','text', \
                               'source_url','url'],
        'DUPEFILTER_CLASS' : 'scrapy.dupefilters.BaseDupeFilter',
        'CONCURRENT_REQUESTS' : 1
    }

    def __init__(self, *args, **kwargs):
        if 'post' in kwargs and 'page' in kwargs:
            raise AttributeError('You need to specify only one between post and page')
        elif 'post' in kwargs:
            self.page = kwargs['post']
            self.type = 'post'
        elif 'page' in kwargs:
            self.type = 'page'

        super().__init__(*args,**kwargs)

    def parse_page(self, response):
        '''
        '''
        if self.type == 'post':
            yield scrapy.Request(url=response.url,
                                 callback=self.parse_post,
                                 priority=10,
                                 meta={'index':1})
        elif self.type == 'page':
            #select all posts
            for post in response.xpath("//div[contains(@data-ft,'top_level_post_id')]"):
                many_features = post.xpath('./@data-ft').get()
                date = []
                date.append(many_features)
                date = parse_date(date,{'lang':self.lang})
                current_date = datetime.strptime(date,'%Y-%m-%d %H:%M:%S') if date is not None else date

                if current_date is None:
                    date_string = post.xpath('.//abbr/text()').get()
                    date = parse_date2([date_string],{'lang':self.lang})
                    current_date = datetime(date.year,date.month,date.day) if date is not None else date
                    date = str(date)
                if abs(self.count) + 1 > self.max:
                    raise CloseSpider('Reached max num of post: {}. Crawling finished'.format(abs(self.count)))
                self.logger.info('Parsing post n = {}, post_date = {}'.format(abs(self.count)+1,date))
                #returns full post-link in a list
                post = post.xpath(".//a[contains(@href,'footer')]/@href").extract()
                temp_post = response.urljoin(post[0])
                self.count -= 1
                yield scrapy.Request(temp_post,
                                     self.parse_post,
                                     priority = self.count,
                                     meta={'index':1})

            #load following page, try to click on "more"
            #after a few pages have been scraped, the "more" link might disappear
            #if not present look for the highest year not parsed yet
            #click once on the year and go back to clicking "more"
            #new_page is different for groups
            if self.group == 1:
                new_page = response.xpath("//div[contains(@id,'stories_container')]/div[2]/a/@href").extract()
            else:
                new_page = response.xpath("//div[2]/a[contains(@href,'timestart=') and not(contains(text(),'ent')) and not(contains(text(),number()))]/@href").extract()
                #this is why lang is needed
            if not new_page:
                self.logger.info('[!] "more" link not found, will look for a "year" link')
                #self.k is the year link that we look for
                if response.meta['flag'] == self.k and self.k >= self.year:
                    xpath = "//div/a[contains(@href,'time') and contains(text(),'" + str(self.k) + "')]/@href"
                    new_page = response.xpath(xpath).extract()
                    if new_page:
                        new_page = response.urljoin(new_page[0])
                        self.k -= 1
                        self.logger.info('Found a link for year "{}", new_page = {}'.format(self.k,new_page))
                        yield scrapy.Request(new_page,
                                             callback=self.parse_page,
                                             priority = -1000,
                                             meta={'flag':self.k})
                    else:
                        while not new_page: #sometimes the years are skipped, this handles small year gaps
                            self.logger.info('Link not found for year {}, trying with previous year {}'.format(self.k,self.k-1))
                            self.k -= 1
                            if self.k < self.year:
                                raise CloseSpider('Reached date: {}. Crawling finished'.format(self.date))
                            xpath = "//div/a[contains(@href,'time') and contains(text(),'" + str(self.k) + "')]/@href"
                            new_page = response.xpath(xpath).extract()
                        self.logger.info('Found a link for year "{}", new_page = {}'.format(self.k,new_page))
                        new_page = response.urljoin(new_page[0])
                        self.k -= 1
                        yield scrapy.Request(new_page,
                                             callback=self.parse_page,
                                             priority = -1000,
                                             meta={'flag':self.k})
                else:
                    self.logger.info('Crawling has finished with no errors!')
            else:
                new_page = response.urljoin(new_page[0])
                if 'flag' in response.meta:
                    self.logger.info('Page scraped, clicking on "more"! new_page = {}'.format(new_page))
                    yield scrapy.Request(new_page,
                                         callback=self.parse_page,
                                         priority = -1000,
                                         meta={'flag':response.meta['flag']})
                else:
                    self.logger.info('First page scraped, clicking on "more"! new_page = {}'.format(new_page))
                    yield scrapy.Request(new_page,
                                         callback=self.parse_page,
                                         priority = -1000,
                                         meta={'flag':self.k})

    def parse_post(self, response):
        '''
        parse post does multiple things:
            1) loads replied-to-comments page one-by-one (for DFS)
            2) call parse_reply on the nested comments
            3) adds simple (not-replied-to) comments
            4) follows to new comment page
        '''
        #load replied-to comments pages
        #select nested comment one-by-one matching with the index: response.meta['index']
        path = './/div[string-length(@class) = 2 and count(@id)=1 and contains("0123456789", substring(@id,1,1)) and .//div[contains(@id,"comment_replies")]]' + '['+ str(response.meta['index']) + ']'
        group_flag = response.meta['group'] if 'group' in response.meta else None
        for reply in response.xpath(path):
            source = reply.xpath('.//h3/a/text()').extract()
            answer = reply.xpath('.//a[contains(@href,"repl")]/@href').extract()
            ans = response.urljoin(answer[::-1][0])
            self.logger.info('{} nested comment'.format(str(response.meta['index'])))
            yield scrapy.Request(ans,
                                 callback=self.parse_reply,
                                 priority=1000,
                                 meta={'reply_to':source,
                                       'url':response.url,
                                       'index':response.meta['index'],
                                       'flag':'init',
                                       'group':group_flag})
        #load regular comments
        if not response.xpath(path): #prevents from exec
            path2 = './/div[string-length(@class) = 2 and count(@id)=1 and contains("0123456789", substring(@id,1,1)) and not(.//div[contains(@id,"comment_replies")])]'
            for i,reply in enumerate(response.xpath(path2)):
                self.logger.info('{} regular comment'.format(i+1))
                new = ItemLoader(item=CommentsItem(),selector=reply)
                new.context['lang'] = self.lang
                new.add_xpath('source','.//h3/a/text()')
                new.add_xpath('source_url','.//h3/a/@href')
                new.add_xpath('text','.//div[h3]/div[1]//text()')
                new.add_xpath('date','.//abbr/text()')
                new.add_xpath('reactions','.//a[contains(@href,"reaction/profile")]//text()')
                new.add_value('url',response.url)
                yield new.load_item()

        #new comment page
        if not response.xpath(path):
            #for groups
            next_xpath = './/div[contains(@id,"see_next")]'
            prev_xpath = './/div[contains(@id,"see_prev")]'
            if not response.xpath(next_xpath) or group_flag == 1:
                for next_page in response.xpath(prev_xpath):
                    new_page = next_page.xpath('.//@href').extract()
                    new_page = response.urljoin(new_page[0])
                    self.logger.info('New page to be crawled {}'.format(new_page))
                    yield scrapy.Request(new_page,
                                         callback=self.parse_post,
                                         meta={'index':1,
                                               'group':1})
            else:
                for next_page in response.xpath(next_xpath):
                    new_page = next_page.xpath('.//@href').extract()
                    new_page = response.urljoin(new_page[0])
                    self.logger.info('New page to be crawled {}'.format(new_page))
                    yield scrapy.Request(new_page,
                                         callback=self.parse_post,
                                         meta={'index':1,
                                               'group':group_flag})

    def parse_reply(self,response):
        '''
        parse reply to comments, root comment is added if flag
        '''
#        from scrapy.utils.response import open_in_browser
#        open_in_browser(response)

        if response.meta['flag'] == 'init':
            #parse root comment
            for root in response.xpath('//div[contains(@id,"root")]/div/div/div[count(@id)!=1 and contains("0123456789", substring(@id,1,1))]'):
                new = ItemLoader(item=CommentsItem(),selector=root)
                new.context['lang'] = self.lang
                new.add_xpath('source','.//h3/a/text()')
                new.add_xpath('source_url','.//h3/a/@href')
                new.add_value('reply_to','ROOT')
                new.add_xpath('text','.//div[1]//text()')
                new.add_xpath('date','.//abbr/text()')
                new.add_xpath('reactions','.//a[contains(@href,"reaction/profile")]//text()')
                new.add_value('url',response.url)
                yield new.load_item()
            #parse all replies in the page
            for reply in response.xpath('//div[contains(@id,"root")]/div/div/div[count(@id)=1 and contains("0123456789", substring(@id,1,1))]'):
                new = ItemLoader(item=CommentsItem(),selector=reply)
                new.context['lang'] = self.lang
                new.add_xpath('source','.//h3/a/text()')
                new.add_xpath('source_url','.//h3/a/@href')
                new.add_value('reply_to',response.meta['reply_to'])
                new.add_xpath('text','.//div[h3]/div[1]//text()')
                new.add_xpath('date','.//abbr/text()')
                new.add_xpath('reactions','.//a[contains(@href,"reaction/profile")]//text()')
                new.add_value('url',response.url)
                yield new.load_item()

            back = response.xpath('//div[contains(@id,"comment_replies_more_1")]/a/@href').extract()
            if back:
                self.logger.info('Back found, more nested comments')
                back_page = response.urljoin(back[0])
                yield scrapy.Request(back_page,
                                     callback=self.parse_reply,
                                     priority = 1000,
                                     meta={'reply_to':response.meta['reply_to'],
                                           'flag':'back',
                                           'url':response.meta['url'],
                                           'index':response.meta['index'],
                                           'group':response.meta['group']})
            else:
                next_reply = response.meta['url']
                self.logger.info('Nested comments crawl finished, heading to proper page: {}'.format(response.meta['url']))
                yield scrapy.Request(next_reply,
                                     callback=self.parse_post,
                                     meta={'index':response.meta['index']+1,
                                           'group':response.meta['group']})

        elif response.meta['flag'] == 'back':
            #parse all comments
            for reply in response.xpath('//div[contains(@id,"root")]/div/div/div[count(@id)=1 and contains("0123456789", substring(@id,1,1))]'):
                new = ItemLoader(item=CommentsItem(),selector=reply)
                new.context['lang'] = self.lang
                new.add_xpath('source','.//h3/a/text()')
                new.add_xpath('source_url','.//h3/a/@href')
                new.add_value('reply_to',response.meta['reply_to'])
                new.add_xpath('text','.//div[h3]/div[1]//text()')
                new.add_xpath('date','.//abbr/text()')
                new.add_xpath('reactions','.//a[contains(@href,"reaction/profile")]//text()')
                new.add_value('url',response.url)
                yield new.load_item()
            #keep going backwards
            back = response.xpath('//div[contains(@id,"comment_replies_more_1")]/a/@href').extract()
            self.logger.info('Back found, more nested comments')
            if back:
                back_page = response.urljoin(back[0])
                yield scrapy.Request(back_page,
                                     callback=self.parse_reply,
                                     priority=1000,
                                     meta={'reply_to':response.meta['reply_to'],
                                           'flag':'back',
                                           'url':response.meta['url'],
                                           'index':response.meta['index'],
                                           'group':response.meta['group']})
            else:
                next_reply = response.meta['url']
                self.logger.info('Nested comments crawl finished, heading to home page: {}'.format(response.meta['url']))
                yield scrapy.Request(next_reply,
                                     callback=self.parse_post,
                                     meta={'index':response.meta['index']+1,
                                           'group':response.meta['group']})

# =============================================================================
# CRAWL REACTIONS
# =============================================================================
#    def parse_reactions(self,response):
#        new = ItemLoader(item=CommentsItem(),response=response, parent=response.meta['item'])
#        new.context['lang'] = self.lang
#        new.add_xpath('likes',"//a[contains(@href,'reaction_type=1')]/span/text()")
#        new.add_xpath('ahah',"//a[contains(@href,'reaction_type=4')]/span/text()")
#        new.add_xpath('love',"//a[contains(@href,'reaction_type=2')]/span/text()")
#        new.add_xpath('wow',"//a[contains(@href,'reaction_type=3')]/span/text()")
#        new.add_xpath('sigh',"//a[contains(@href,'reaction_type=7')]/span/text()")
#        new.add_xpath('grrr',"//a[contains(@href,'reaction_type=8')]/span/text()")
#        yield new.load_item()
#
#        #substitute
#        yield new.load_item()
#        ‾‾‾‾‾‾‾‾‾|‾‾‾‾‾‾‾‾‾‾‾
#        _________v___
#        #response --> reply/root
#        reactions = response.xpath(".//a[contains(@href,'reaction/profile')]/@href")
#        reactions = response.urljoin(reactions[0].extract())
#        if reactions:
#            yield scrapy.Request(reactions, callback=self.parse_reactions, meta={'item':new})
#        else:
...
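For reference, a spider like this is usually launched from the Scrapy command line, but it can also be run programmatically. Below is a minimal, untested sketch using scrapy.crawler.CrawlerProcess; the import path and the email, password, post and lang arguments are assumptions inferred from the listing above and from the "needs credentials" docstring, not documented parameters of the base FacebookSpider.

# Hypothetical runner sketch: export scraped comments to CSV.
# The keyword arguments below are assumptions, not confirmed spider options.
from scrapy.crawler import CrawlerProcess
from fbcrawl.spiders.comments import CommentsSpider  # assumed module path

process = CrawlerProcess(settings={
    'FEED_FORMAT': 'csv',        # older-style feed export settings
    'FEED_URI': 'comments.csv',
})
process.crawl(
    CommentsSpider,
    email='you@example.com',     # hypothetical credential handled by FacebookSpider
    password='your-password',    # hypothetical
    post='https://mbasic.facebook.com/<page>/posts/<post_id>',  # placeholder URL
    lang='en',                   # hypothetical, used by parse_date/parse_date2
)
process.start()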
profiles.py
Source:profiles.py
import scrapy

from scrapy.loader import ItemLoader
from scrapy.exceptions import CloseSpider
from fbcrawl.spiders.fbcrawl import FacebookSpider
from fbcrawl.items import ProfileItem, parse_date, parse_date2

from datetime import datetime

class ProfileSpider(FacebookSpider):
    """
    Parse FB profiles
    """
    name = "profiles"
    custom_settings = {
        'FEED_EXPORT_FIELDS': ['name','gender','birthday','current_city',
                               'hometown','work','education','interested_in',
                               'page'],
        'DUPEFILTER_CLASS' : 'scrapy.dupefilters.BaseDupeFilter',
        'CONCURRENT_REQUESTS' : 1
    }

    def __init__(self, *args, **kwargs):
        if 'post' in kwargs and 'page' in kwargs:
            raise AttributeError('You need to specify only one between post and page')
        elif 'post' in kwargs:
            self.page = kwargs['post']
            self.type = 'post'
        elif 'page' in kwargs:
            self.type = 'page'

        super().__init__(*args,**kwargs)

    def parse_page(self, response):
        '''
        '''
        if self.type == 'post':
            yield scrapy.Request(url=response.url,
                                 callback=self.parse_post,
                                 priority=10,
                                 meta={'index':1})
        elif self.type == 'page':
            #select all posts
            for post in response.xpath("//div[contains(@data-ft,'top_level_post_id')]"):
                many_features = post.xpath('./@data-ft').get()
                date = []
                date.append(many_features)
                date = parse_date(date,{'lang':self.lang})
                current_date = datetime.strptime(date,'%Y-%m-%d %H:%M:%S') if date is not None else date

                if current_date is None:
                    date_string = post.xpath('.//abbr/text()').get()
                    date = parse_date2([date_string],{'lang':self.lang})
                    current_date = datetime(date.year,date.month,date.day) if date is not None else date
                    date = str(date)
                if abs(self.count) + 1 > self.max:
                    raise CloseSpider('Reached max num of post: {}. Crawling finished'.format(abs(self.count)))
                self.logger.info('Parsing post n = {}, post_date = {}'.format(abs(self.count)+1,date))
                #returns full post-link in a list
                post = post.xpath(".//a[contains(@href,'footer')]/@href").extract()
                temp_post = response.urljoin(post[0])
                self.count -= 1
                yield scrapy.Request(temp_post,
                                     self.parse_post,
                                     priority = self.count,
                                     meta={'index':1})

            #load following page, try to click on "more"
            #after a few pages have been scraped, the "more" link might disappear
            #if not present look for the highest year not parsed yet
            #click once on the year and go back to clicking "more"
            #new_page is different for groups
            if self.group == 1:
                new_page = response.xpath("//div[contains(@id,'stories_container')]/div[2]/a/@href").extract()
            else:
                new_page = response.xpath("//div[2]/a[contains(@href,'timestart=') and not(contains(text(),'ent')) and not(contains(text(),number()))]/@href").extract()
                #this is why lang is needed
            if not new_page:
                self.logger.info('[!] "more" link not found, will look for a "year" link')
                #self.k is the year link that we look for
                if response.meta['flag'] == self.k and self.k >= self.year:
                    xpath = "//div/a[contains(@href,'time') and contains(text(),'" + str(self.k) + "')]/@href"
                    new_page = response.xpath(xpath).extract()
                    if new_page:
                        new_page = response.urljoin(new_page[0])
                        self.k -= 1
                        self.logger.info('Found a link for year "{}", new_page = {}'.format(self.k,new_page))
                        yield scrapy.Request(new_page,
                                             callback=self.parse_page,
                                             priority = -1000,
                                             meta={'flag':self.k})
                    else:
                        while not new_page: #sometimes the years are skipped, this handles small year gaps
                            self.logger.info('Link not found for year {}, trying with previous year {}'.format(self.k,self.k-1))
                            self.k -= 1
                            if self.k < self.year:
                                raise CloseSpider('Reached date: {}. Crawling finished'.format(self.date))
                            xpath = "//div/a[contains(@href,'time') and contains(text(),'" + str(self.k) + "')]/@href"
                            new_page = response.xpath(xpath).extract()
                        self.logger.info('Found a link for year "{}", new_page = {}'.format(self.k,new_page))
                        new_page = response.urljoin(new_page[0])
                        self.k -= 1
                        yield scrapy.Request(new_page,
                                             callback=self.parse_page,
                                             priority = -1000,
                                             meta={'flag':self.k})
                else:
                    self.logger.info('Crawling has finished with no errors!')
            else:
                new_page = response.urljoin(new_page[0])
                if 'flag' in response.meta:
                    self.logger.info('Page scraped, clicking on "more"! new_page = {}'.format(new_page))
                    yield scrapy.Request(new_page,
                                         callback=self.parse_page,
                                         priority = -1000,
                                         meta={'flag':response.meta['flag']})
                else:
                    self.logger.info('First page scraped, clicking on "more"! new_page = {}'.format(new_page))
                    yield scrapy.Request(new_page,
                                         callback=self.parse_page,
                                         priority = -1000,
                                         meta={'flag':self.k})

    def parse_post(self, response):
        '''
        parse post does multiple things:
            1) loads replied-to-comments page one-by-one (for DFS)
            2) call parse_reply on the nested comments
            3) adds simple (not-replied-to) comments
            4) follows to new comment page
        '''
        #load replied-to comments pages
        #select nested comment one-by-one matching with the index: response.meta['index']
        path = './/div[string-length(@class) = 2 and count(@id)=1 and contains("0123456789", substring(@id,1,1)) and .//div[contains(@id,"comment_replies")]]' + '['+ str(response.meta['index']) + ']'
        group_flag = response.meta['group'] if 'group' in response.meta else None
        for reply in response.xpath(path):
            rep = reply.xpath('.//h3/a/@href').get()
            profile = 'https://mbasic.facebook.com' + rep[:rep.find('?rc')] + '/about'
            yield scrapy.Request(profile,
                                 callback=self.parse_profile,
                                 priority=1000,
                                 meta={'url':response.url,
                                       'index':response.meta['index'],
                                       'flag':'init',
                                       'group':group_flag})
        #load regular comments
        if not response.xpath(path): #prevents from exec
            path2 = './/div[string-length(@class) = 2 and count(@id)=1 and contains("0123456789", substring(@id,1,1)) and not(.//div[contains(@id,"comment_replies")])]'
            for i,reply in enumerate(response.xpath(path2)):
                self.logger.info('{} regular comment'.format(i+1))
                rep = reply.xpath('.//h3/a/@href').get()
                profile = 'https://mbasic.facebook.com' + rep[:rep.find('?rc')] + '/about'
                yield scrapy.Request(profile,
                                     callback=self.parse_profile,
                                     priority=1000,
                                     meta={'url':response.url,
                                           'index':response.meta['index'],
                                           'flag':'init',
                                           'group':group_flag})

        #new comment page
        if not response.xpath(path):
            #for groups
            next_xpath = './/div[contains(@id,"see_next")]'
            prev_xpath = './/div[contains(@id,"see_prev")]'
            if not response.xpath(next_xpath) or group_flag == 1:
                for next_page in response.xpath(prev_xpath):
                    new_page = next_page.xpath('.//@href').extract()
                    new_page = response.urljoin(new_page[0])
                    self.logger.info('New page to be crawled {}'.format(new_page))
                    yield scrapy.Request(new_page,
                                         callback=self.parse_post,
                                         meta={'index':1,
                                               'group':1})
            else:
                for next_page in response.xpath(next_xpath):
                    new_page = next_page.xpath('.//@href').extract()
                    new_page = response.urljoin(new_page[0])
                    self.logger.info('New page to be crawled {}'.format(new_page))
                    yield scrapy.Request(new_page,
                                         callback=self.parse_post,
                                         meta={'index':1,
                                               'group':group_flag})

    def parse_reply(self,response):
        '''
        parse reply to comments, root comment is added if flag
        '''
#        from scrapy.utils.response import open_in_browser
#        open_in_browser(response)

        if response.meta['flag'] == 'init':
            #parse root comment
            for root in response.xpath('//div[contains(@id,"root")]/div/div/div[count(@id)!=1 and contains("0123456789", substring(@id,1,1))]'):
                rep = root.xpath('.//h3/a/@href').get()
                profile = 'https://mbasic.facebook.com' + rep[:rep.find('?rc')] + '/about'
                yield scrapy.Request(profile,
                                     callback=self.parse_profile,
                                     priority=1000,
                                     meta={'url':response.url,
                                           'index':response.meta['index'],
                                           'flag':'init',
                                           'group':response.meta['group']})
            #parse all replies in the page
            for reply in response.xpath('//div[contains(@id,"root")]/div/div/div[count(@id)=1 and contains("0123456789", substring(@id,1,1))]'):
                rep = reply.xpath('.//h3/a/@href').get()
                profile = 'https://mbasic.facebook.com' + rep[:rep.find('?rc')] + '/about'
                yield scrapy.Request(profile,
                                     callback=self.parse_profile,
                                     priority=1000,
                                     meta={'url':response.url,
                                           'index':response.meta['index'],
                                           'flag':'init',
                                           'group':response.meta['group']})

            back = response.xpath('//div[contains(@id,"comment_replies_more_1")]/a/@href').extract()
            if back:
                self.logger.info('Back found, more nested comments')
                back_page = response.urljoin(back[0])
                yield scrapy.Request(back_page,
                                     callback=self.parse_reply,
                                     priority = 1000,
                                     meta={'reply_to':response.meta['reply_to'],
                                           'flag':'back',
                                           'url':response.meta['url'],
                                           'index':response.meta['index'],
                                           'group':response.meta['group']})
            else:
                next_reply = response.meta['url']
                self.logger.info('Nested comments crawl finished, heading to proper page: {}'.format(response.meta['url']))
                yield scrapy.Request(next_reply,
                                     callback=self.parse_post,
                                     meta={'index':response.meta['index']+1,
                                           'group':response.meta['group']})

        elif response.meta['flag'] == 'back':
            #parse all comments
            for reply in response.xpath('//div[contains(@id,"root")]/div/div/div[count(@id)=1 and contains("0123456789", substring(@id,1,1))]'):
                rep = reply.xpath('.//h3/a/@href').extract()[0]
                profile = 'https://mbasic.facebook.com' + rep[:rep.find('?rc')] + '/about'
                yield scrapy.Request(profile,
                                     callback=self.parse_profile,
                                     priority=1000,
                                     meta={'url':response.url,
                                           'index':response.meta['index'],
                                           'flag':'init',
                                           'group':response.meta['group']})
            #keep going backwards
            back = response.xpath('//div[contains(@id,"comment_replies_more_1")]/a/@href').extract()
            self.logger.info('Back found, more nested comments')
            if back:
                back_page = response.urljoin(back[0])
                yield scrapy.Request(back_page,
                                     callback=self.parse_reply,
                                     priority=1000,
                                     meta={'reply_to':response.meta['reply_to'],
                                           'flag':'back',
                                           'url':response.meta['url'],
                                           'index':response.meta['index'],
                                           'group':response.meta['group']})
            else:
                next_reply = response.meta['url']
                self.logger.info('Nested comments crawl finished, heading to home page: {}'.format(response.meta['url']))
                yield scrapy.Request(next_reply,
                                     callback=self.parse_post,
                                     meta={'index':response.meta['index']+1,
                                           'group':response.meta['group']})

    def parse_profile(self,response):
        new = ItemLoader(item=ProfileItem(),response=response)
        self.logger.info('Crawling profile info')
        new.add_xpath('name','//span/div/span/strong/text()')
        new.add_xpath('gender',"//div[@id='basic-info']//div[@title='Gender']//div/text()")
        new.add_xpath('birthday',"//div[@id='basic-info']//div[@title='Birthday']//div/text()")
        new.add_xpath('current_city',"//div[@id='living']//div[@title='Current City']//a/text()")
        new.add_xpath('hometown',"//div[@id='living']//div[@title='Hometown']//a/text()")
        new.add_xpath('work',"//div[@id='work']//a/text()")
        new.add_xpath('education',"//div[@id='education']//a/text()")
        new.add_xpath('interested_in',"//div[@id='interested-in']//div[not(contains(text(),'Interested In'))]/text()")
        new.add_xpath('page',"//div[@id='contact-info']//div[@title='Facebook']//div/text()")
...
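A small point worth calling out from the listing above: the profile URL is built by cutting the comment author's href at the '?rc' query string and appending '/about'. A minimal sketch of that string handling, with a made-up href for illustration:

# Made-up example href; shows how the '/about' profile URL is derived above.
rep = '/some.user?rc=p&refid=18'
profile = 'https://mbasic.facebook.com' + rep[:rep.find('?rc')] + '/about'
print(profile)  # https://mbasic.facebook.com/some.user/about

Note that str.find returns -1 when '?rc' is absent, which would clip the last character of the href, so guarding that result may be worthwhile.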
helpers.py
Source:helpers.py
...
from v1.models.sublanding_page import SublandingPage

def save_page(page):
    page.save()
    return page.save_revision()

def save_new_page(child, root=None):
    if not root:
        root = HomePage.objects.get(title="CFGov")
    root.add_child(instance=child)
    return save_page(page=child)

def publish_page(child):
    revision = save_new_page(child=child)
    revision.publish()

def publish_changes(child):
    revision = save_page(page=child)
    revision.publish()

def get_parent_route(site, parent_path=None):
    # return list of route paths
    root = site.root_page
    # since parent was not provided, make root
    parent = root
    # if a parent path is provided, use that as parent
    if parent_path:
        path_components = [
            component for component in parent_path.split("/") if component
        ]
...
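As a rough illustration of how these helpers might be called, here is a hypothetical sketch (not part of the source file) that creates and publishes a page under the "CFGov" home page; it assumes a configured Django/Wagtail environment and uses the SublandingPage model imported at the top of helpers.py with standard Wagtail title/slug fields.

# Hypothetical usage sketch, assuming a working cfgov Django/Wagtail setup.
from v1.models.sublanding_page import SublandingPage

def create_and_publish_example():
    page = SublandingPage(title="Example sublanding page", slug="example-sublanding")
    publish_page(child=page)  # adds the page under the CFGov root and publishes the revision
    return page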
test_infobase.py
Source:test_infobase.py
from infogami.infobase import server
import web
import unittest
import urllib, urllib2
import simplejson

def browser():
    if web.config.get('test_url'):
        b = web.browser.Browser()
        b.open('http://0.0.0.0:8080')
        return b
    else:
        return server.app.browser()

b = browser()

def request(path, method="GET", data=None, headers={}):
    if method == 'GET' and data is not None:
        path = path + '?' + urllib.urlencode(data)
        data = None
    if isinstance(data, dict):
        data = simplejson.dumps(data)
    url = urllib.basejoin(b.url, path)
    req = urllib2.Request(url, data, headers)
    req.get_method = lambda: method
    b.do_request(req)
    if b.status == 200:
        return b.data and simplejson.loads(b.data)
    else:
        return None

def get(key):
    d = request('/test/get?key=' + key)
    return d

def echo(msg):
    request('/_echo', method='POST', data=msg)

def save(query):
    return request('/test/save' + query['key'], method='POST', data=query)

def save_many(query, comment=''):
    return request('/test/save_many', method='POST', data=urllib.urlencode({'query': simplejson.dumps(query), 'comment': comment}))

class DatabaseTest(unittest.TestCase):
    pass

class InfobaseTestCase(unittest.TestCase):
    def clear_threadlocal(self):
        import threading
        t = threading.currentThread()
        if hasattr(t, '_d'):
            del t._d

    def setUp(self):
        self.clear_threadlocal()
        global b
        b = browser()
        try:
            # create new database with name "test"
            self.assertEquals2(request("/test", method="PUT"), {"ok": True})
        except Exception:
            self.tearDown()
            raise
        # reset browser cookies
        b.reset()

    def tearDown(self):
        self.clear_threadlocal()
        # delete test database
        request('/test', method="DELETE")

    def assertEquals2(self, a, b):
        """Asserts two objects are same.
        """
        # special case to say don't worry about this value.
        if b == '*':
            return True
        elif isinstance(a, dict):
            self.assertTrue(isinstance(b, dict))
            # key '*' means skip additional keys.
            skip_additional = b.pop('*', False)
            if not skip_additional:
                self.assertEquals(a.keys(), b.keys())
            for k in b.keys():
                self.assertEquals2(a[k], b[k])
        elif isinstance(a, list):
            self.assertEquals(len(a), len(b))
            for x, y in zip(a, b):
                self.assertEquals2(x, y)
        else:
            self.assertEquals(a, b)

class DocumentTest(InfobaseTestCase):
    def test_simple(self):
        self.assertEquals2(request('/'), {'infobase': 'welcome', 'version': '*'})
        self.assertEquals2(request('/test'), {'name': 'test'})
        self.assertEquals2(request('/test/get?key=/type/type'), {'key': '/type/type', 'type': {'key': '/type/type'}, '*': True})

        request('/test/get?key=/not-there')
        self.assertEquals(b.status, 404)

    def test_save(self):
        x = {'key': '/new_page', 'type': {'key': '/type/object'}, 'x': 1, 's': 'hello'}
        d = request('/test/save/new_page', method="POST", data=x)
        self.assertEquals(b.status, 200)
        self.assertEquals(d, {'key': '/new_page', 'revision': 1})

        # verify data
        d = request('/test/get?key=/new_page')
        expected = dict({'latest_revision': 1, 'revision': 1, '*': True}, **d)
        self.assertEquals2(d, expected)

        # nothing should be modified when saved with the same data.
        d = request('/test/save/new_page', method="POST", data=x)
        self.assertEquals(b.status, 200)
        self.assertEquals(d, {})

    def test_versions(self):
        x = {'key': '/new_page', 'type': {'key': '/type/object'}, 'x': 1, 's': 'hello'}
        d = request('/test/save/new_page', method="POST", data=x)

        # verify revisions
        q = {'key': '/new_page'}
        d = request('/test/versions', method='GET', data={'query': simplejson.dumps({'key': '/new_page'})})
        self.assertEquals2(d, [{'key': '/new_page', 'revision': 1, '*': True}])
        d = request('/test/versions', method='GET', data={'query': simplejson.dumps({'limit': 1})})
        self.assertEquals2(d, [{'key': '/new_page', 'revision': 1, '*': True}])

        # try a failed save and make sure new revisions are not created
        request('/test/save/new_page', method='POST', data={'key': '/new_page', 'type': '/type/no-such-type'})
        self.assertNotEquals(b.status, 200)
        q = {'key': '/new_page'}
        d = request('/test/versions', method='GET', data={'query': simplejson.dumps({'key': '/new_page'})})
        self.assertEquals2(d, [{'key': '/new_page', 'revision': 1, '*': True}])
        d = request('/test/versions', method='GET', data={'query': simplejson.dumps({'limit': 1})})
        self.assertEquals2(d, [{'key': '/new_page', 'revision': 1, '*': True}])

        # save the page and make sure new revision is created.
        d = request('/test/save/new_page', method='POST', data=dict(x, title='foo'))
        self.assertEquals(d, {'key': '/new_page', 'revision': 2})
        d = request('/test/versions', method='GET', data={'query': simplejson.dumps({'key': '/new_page'})})
        self.assertEquals2(d, [{'key': '/new_page', 'revision': 2, '*': True}, {'key': '/new_page', 'revision': 1, '*': True}])

    def test_save_many(self):
        q = [
            {'key': '/one', 'type': {'key': '/type/object'}, 'n': 1},
            {'key': '/two', 'type': {'key': '/type/object'}, 'n': 2}
        ]
        d = request('/test/save_many', method='POST', data=urllib.urlencode({'query': simplejson.dumps(q)}))
        self.assertEquals(d, [{'key': '/one', 'revision': 1}, {'key': '/two', 'revision': 1}])
        self.assertEquals2(get('/one'), {'key': '/one', 'type': {'key': '/type/object'}, 'n': 1, 'revision': 1, '*': True})
        self.assertEquals2(get('/two'), {'key': '/two', 'type': {'key': '/type/object'}, 'n': 2, 'revision': 1, '*': True})

        # saving with same data should not create new revisions
        d = request('/test/save_many', method='POST', data=urllib.urlencode({'query': simplejson.dumps(q)}))
        self.assertEquals(d, [])

        # try bad query
        q = [
            {'key': '/zero', 'type': {'key': '/type/object'}, 'n': 0},
            {'key': '/one', 'type': {'key': '/type/object'}, 'n': 11},
            {'key': '/two', 'type': {'key': '/type/no-such-type'}, 'n': 2}
        ]
        d = request('/test/save_many', method='POST', data=urllib.urlencode({'query': simplejson.dumps(q)}))
        self.assertNotEquals(b.status, 200)
        d = get('/zero')
        self.assertEquals(b.status, 404)

# create author, book and collection types to test validations
types = [{
    "key": "/type/author",
    "type": "/type/type",
    "kind": "regular",
    "properties": [{
        "name": "name",
        "expected_type": {"key": "/type/string"},
        "unique": True
    }, {
        "name": "bio",
        "expected_type": {"key": "/type/text"},
        "unique": True
    }]
}, {
    "key": "/type/book",
    "type": "/type/type",
    "kind": "regular",
    "properties": [{
        "name": "title",
        "expected_type": {"key": "/type/string"},
        "unique": True
    }, {
        "name": "authors",
        "expected_type": {"key": "/type/author"},
        "unique": False
    }, {
        "name": "publisher",
        "expected_type": {"key": "/type/string"},
        "unique": True
    }, {
        "name": "description",
        "expected_type": {"key": "/type/text"},
        "unique": True
    }]
}, {
    "key": "/type/collection",
    "type": "/type/type",
    "kind": "regular",
    "properties": [{
        "name": "name",
        "expected_type": {"key": "/type/string"},
        "unique": True
    }, {
        "name": "books",
        "expected_type": {"key": "/type/book"},
        "unique": False
    }]
}]

class MoreDocumentTest(DocumentTest):
    def setUp(self):
        DocumentTest.setUp(self)
        save_many(types)

    def test_save_validation(self):
        # ok: name is string
        d = save({'key': '/author/x', 'type': '/type/author', 'name': 'x'})
        self.assertEquals(b.status, 200)
        self.assertEquals(d, {"key": "/author/x", "revision": 1})

        # error: name is int instead of string
        d = save({'key': '/author/x', 'type': '/type/author', 'name': 42})
        self.assertEquals(b.status, 400)

        # error: name is list instead of single value
        d = save({'key': '/author/x', 'type': '/type/author', 'name': ['x', 'y']})
        self.assertEquals(b.status, 400)

    def test_validation_when_type_changes(self):
        # create an author and a book
        save({'key': '/author/x', 'type': '/type/author', 'name': 'x'})
        save({'key': '/book/x', 'type': '/type/book', 'title': 'x', 'authors': [{'key': '/author/x'}], 'publisher': 'publisher_x'})

        # change schema of "/type/book" and make expected_type of "publisher" as "/type/publisher"
        save({
            "key": "/type/publisher",
            "type": "/type/type",
            "kind": "regular",
            "properties": [{
                "name": "name",
                "expected_type": "/type/string",
                "unique": True
            }]
        })
        d = get('/type/book')
        assert d['properties'][2]['name'] == "publisher"
        d['properties'][2]['expected_type'] = {"key": "/type/publisher"}
        save(d)

        # now changing just the title of the book should not fail.
        d = get('/book/x')
        d['title'] = 'xx'
        save(d)
        self.assertEquals(b.status, 200)

if __name__ == "__main__":
...