Best Python code snippet using pom_python
test_ping.py
Source:test_ping.py
from landscape.client.tests.helpers import LandscapeTest
from twisted.internet.defer import fail
from landscape.lib import bpickle
from landscape.lib.fetch import fetch
from landscape.lib.testing import FakeReactor
from landscape.client.broker.ping import PingClient, Pinger
from landscape.client.broker.tests.helpers import ExchangeHelper


class FakePageGetter(object):
    """A fake web client that records requests instead of sending them."""

    def __init__(self, response):
        # Object that get_page() serializes and hands back on every call.
        self.response = response
        # One (url, post, headers, data) tuple per get_page() call.
        self.fetches = []

    def get_page(self, url, post, headers, data):
        """
        A method which is supposed to act like a limited version of
        L{landscape.lib.fetch.fetch}.

        Record the attempt to get a page and return the pre-cooked
        response, serialized with L{bpickle}.
        """
        self.fetches.append((url, post, headers, data))
        return bpickle.dumps(self.response)

    def failing_get_page(self, url, post, headers, data):
        """
        A replacement for L{get_page} that always fails.

        Nothing is recorded; an C{AssertionError} is raised so that callers
        can exercise their error handling.
        """
        raise AssertionError("That's a failure!")


class PingClientTest(LandscapeTest):

    def setUp(self):
        super(PingClientTest, self).setUp()
        self.reactor = FakeReactor()

    def test_default_get_page(self):
        """
        The C{get_page} argument to L{PingClient} should be optional, and
        default to L{landscape.lib.fetch.fetch}.
        """
        client = PingClient(self.reactor)
        self.assertEqual(client.get_page, fetch)

    def test_ping(self):
        """
        L{PingClient} should be able to send a web request to a specified URL
        about a particular insecure ID.
        """
        client = FakePageGetter(None)
        url = "http://localhost/ping"
        insecure_id = 10
        pinger = PingClient(self.reactor, get_page=client.get_page)
        pinger.ping(url, insecure_id)
        self.assertEqual(
            client.fetches,
            [(url, True, {"Content-Type": "application/x-www-form-urlencoded"},
              "insecure_id=10")])

    def test_ping_no_insecure_id(self):
        """
        If a L{PingClient} does not have an insecure-id yet, then the ping
        should not happen.
        """
        client = FakePageGetter(None)
        pinger = PingClient(self.reactor, get_page=client.get_page)
        d = pinger.ping("http://ping/url", None)
        # Collect the result and assert on it here, so that a wrong value
        # fails this test directly instead of being buried in the Deferred.
        # Like test_errback, this relies on the Deferred having fired by now.
        results = []
        d.addCallback(results.append)
        self.assertEqual(results, [False])
        self.assertEqual(client.fetches, [])

    def test_respond(self):
        """
        L{PingClient.ping} fires the Deferred it returns with True if the
        web request indicates that the computer has messages.
        """
        client = FakePageGetter({"messages": True})
        pinger = PingClient(self.reactor, get_page=client.get_page)
        d = pinger.ping("http://ping/url", 23)
        # See test_ping_no_insecure_id for why the result is collected.
        results = []
        d.addCallback(results.append)
        self.assertEqual(results, [True])

    def test_errback(self):
        """
        If the HTTP request fails the deferred returned by L{PingClient.ping}
        fires back with an error.
        """
        client = FakePageGetter(None)
        pinger = PingClient(self.reactor, get_page=client.failing_get_page)
        d = pinger.ping("http://ping/url", 23)
        failures = []

        def errback(failure):
            failures.append(failure)

        d.addErrback(errback)
        self.assertEqual(len(failures), 1)
        self.assertEqual(failures[0].getErrorMessage(), "That's a failure!")
        self.assertEqual(failures[0].type, AssertionError)


class PingerTest(LandscapeTest):

    helpers = [ExchangeHelper]

    # Tell the Plugin helper to not add a MessageExchange plugin, so that it
    # does not interfere with our code which asserts stuff about when *our*
    # plugin fires exchanges.
    install_exchanger = False

    def setUp(self):
        super(PingerTest, self).setUp()
        self.page_getter = FakePageGetter(None)

        def factory(reactor):
            return PingClient(reactor, get_page=self.page_getter.get_page)

        self.config.ping_url = "http://localhost:8081/whatever"
        self.config.ping_interval = 10
        self.pinger = Pinger(self.reactor,
                             self.identity,
                             self.exchanger,
                             self.config,
                             ping_client_factory=factory)

    def test_default_ping_client(self):
        """
        The C{ping_client_factory} argument to L{Pinger} should be optional,
        and default to L{PingClient}.
        """
        pinger = Pinger(self.reactor,
                        self.identity,
                        self.exchanger,
                        self.config)
        self.assertEqual(pinger.ping_client_factory, PingClient)

    def test_occasional_ping(self):
        """
        The L{Pinger} should be able to occasionally ask if there are
        messages.
        """
        self.pinger.start()
        self.identity.insecure_id = 23
        self.reactor.advance(9)
        self.assertEqual(len(self.page_getter.fetches), 0)
        self.reactor.advance(1)
        self.assertEqual(len(self.page_getter.fetches), 1)

    def test_load_insecure_id(self):
        """
        If the insecure-id has already been saved when the plugin is
        registered, it should immediately start pinging.
        """
        self.identity.insecure_id = 42
        self.pinger.start()
        self.reactor.advance(10)
        self.assertEqual(len(self.page_getter.fetches), 1)

    def test_response(self):
        """
        When a ping indicates there are messages, an exchange should occur.
        """
        self.pinger.start()
        self.identity.insecure_id = 42
        self.page_getter.response = {"messages": True}
        # 70 = ping delay + urgent exchange delay
        self.reactor.advance(70)
        self.assertEqual(len(self.transport.payloads), 1)

    def test_negative_response(self):
        """
        When a ping indicates there are no messages, no exchange should occur.
        """
        self.pinger.start()
        self.identity.insecure_id = 42
        self.page_getter.response = {"messages": False}
        self.reactor.advance(10)
        self.assertEqual(len(self.transport.payloads), 0)

    def test_ping_error(self):
        """
        When the web interaction fails for some reason, a message
        should be logged.
        """
        self.log_helper.ignore_errors(ZeroDivisionError)
        self.identity.insecure_id = 42

        class BadPingClient(object):
            """A ping client whose every ping fails."""

            def __init__(self, *args, **kwargs):
                pass

            def ping(self, url, secure_id):
                self.url = url
                return fail(ZeroDivisionError("Couldn't fetch page"))

        self.config.ping_url = "http://foo.com/"
        pinger = Pinger(self.reactor,
                        self.identity,
                        self.exchanger,
                        self.config,
                        ping_client_factory=BadPingClient)
        pinger.start()
        self.reactor.advance(30)
        log = self.logfile.getvalue()
        self.assertIn("Error contacting ping server at http://foo.com/", log)
        self.assertIn("ZeroDivisionError", log)
        self.assertIn("Couldn't fetch page", log)

    def test_get_interval(self):
        """
        L{Pinger.get_interval} returns the ping interval set in the
        configuration.
        """
        self.assertEqual(self.pinger.get_interval(), 10)

    def test_set_intervals_handling(self):
        """
        A C{set-intervals} message carrying a C{ping} value changes the ping
        interval; one without a C{ping} value leaves it untouched.
        """
        self.pinger.start()
        self.reactor.fire("message", {"type": "set-intervals", "ping": 73})
        self.assertEqual(self.pinger.get_interval(), 73)
        # The server may set specific intervals only, not including the ping.
        self.reactor.fire("message", {"type": "set-intervals"})
        self.assertEqual(self.pinger.get_interval(), 73)
        self.identity.insecure_id = 23
        self.reactor.advance(72)
        self.assertEqual(len(self.page_getter.fetches), 0)
        self.reactor.advance(1)
        self.assertEqual(len(self.page_getter.fetches), 1)

    def test_get_url(self):
        """
        L{Pinger.get_url} returns the ping URL set in the configuration.
        """
        self.assertEqual(self.pinger.get_url(),
                         "http://localhost:8081/whatever")

    def test_config_url(self):
        """
        The L{Pinger} uses the ping URL set in the given configuration.
        """
        self.identity.insecure_id = 23
        url = "http://example.com/mysuperping"
        self.config.ping_url = url
        self.pinger.start()
        self.reactor.advance(10)
        self.assertEqual(self.page_getter.fetches[0][0], url)

    def test_reschedule(self):
        """
        Each time a ping is completed the L{Pinger} schedules a new ping using
        the current ping interval.
        """
        self.identity.insecure_id = 23
        self.pinger.start()
        self.reactor.advance(10)
        self.assertEqual(1, len(self.page_getter.fetches))
        self.reactor.advance(10)
        self.assertEqual(2, len(self.page_getter.fetches))

    def test_reschedule_with_ping_interval_change(self):
        """
        If the ping interval changes, new pings will be scheduled accordingly.
        """
        self.identity.insecure_id = 23
        self.pinger.start()
        self.reactor.advance(5)
        # Simulate interval changing in the meantime
        self.config.ping_interval = 20
        self.reactor.advance(5)
        self.assertEqual(1, len(self.page_getter.fetches))
        # The new interval is 20, so after only 10 seconds nothing happens
        self.reactor.advance(10)
        self.assertEqual(1, len(self.page_getter.fetches))
        # After another 10 seconds we reach the 20 seconds interval and the
        # ping is triggered
        self.reactor.advance(10)
        self.assertEqual(2, len(self.page_getter.fetches))

    def test_change_url_after_start(self):
        """
        If the C{ping_url} set in the configuration is changed after the
        pinger has started, the target HTTP url will adjust accordingly.
        """
        url = "http://example.com/mysuperping"
        self.pinger.start()
        self.config.ping_url = url
        self.identity.insecure_id = 23
        self.reactor.advance(10)
        self.assertEqual(self.page_getter.fetches[0][0], url)

    def test_ping_doesnt_ping_if_stopped(self):
        """If the L{Pinger} is stopped, no pings are performed."""
        self.pinger.start()
        self.pinger.stop()
        self.reactor.advance(10)
        # NOTE(review): the snippet is truncated at this point in the
        # source; the remaining statements of this test are not visible.
call_classify_spider.py
Source:call_classify_spider.py
import csv
import json
import math
import re

from icecream import ic
from bs4 import BeautifulSoup

from dao.MySqlConn import MyPymysqlPool
from spiders.page_getter import PageGetter


class Spider:
    """
    Concrete spider object.

    Scrapes category listings, call summaries and call details from pages
    under ``http://12345.chengdu.gov.cn/`` and appends each scraped call to
    a JSON-lines-like text file.
    """

    # Prefix prepended to every relative link found in the scraped pages.
    BASE_URL = "http://12345.chengdu.gov.cn/"

    def get_classify(self, url):
        """
        Crawl all the categories.

        :param url: URL of the page listing the categories.
        :return: list of dicts with keys ``type_url`` (absolute URL ending
            in ``&page=``, ready to have a page number appended) and
            ``type_name`` (text of the category's ``<nobr>`` element).
        """
        page = PageGetter().get_page(url=url)
        soup = BeautifulSoup(page, 'html.parser')
        res = []
        # Query each matched <div> directly; there is no need to serialize
        # it and parse it again.
        for class_div in soup.select(".listL div"):
            type_url = class_div.select_one("a")['href']
            type_name = class_div.select_one("nobr").text
            res.append({
                "type_url": f"{self.BASE_URL}{type_url}&page=",
                "type_name": type_name
            })
        return res

    def get_type_total_pages(self, url, page_size=15):
        """
        Get how many pages there are in total under one category.

        The record count is read from the ``var iRecCount = <digits>``
        assignment embedded in the page.

        :param url: URL of a category page.
        :param page_size: number of records shown per page (15 by default).
        :return: number of pages, rounded up.
        :raises IndexError: if the page holds no ``iRecCount`` assignment.
        :raises ValueError: if the assignment carries no digits.
        """
        ic(url)
        page = PageGetter().get_page(url=url)
        rec_counts = re.findall(r'var iRecCount = ([0-9]*)', page)
        news_count = int(rec_counts[0])
        return math.ceil(news_count / page_size)

    def get_call_base_info(self, url, type_name):
        """
        Get the basic information of every call listed on one page.

        For each ``li.f12px`` entry the summary fields are read, the detail
        page is fetched through :meth:`get_call_detail_info`, and the
        completed item is stored through :meth:`save_in_mysql`.

        :param url: URL of one listing page.
        :param type_name: category name recorded on every item.
        :return: None
        """
        page_getter = PageGetter()
        page = page_getter.get_page(url=url)
        soup = BeautifulSoup(page, 'html.parser')
        # Get every call entry.
        for li in soup.select("li.f12px"):
            a = li.select_one("a")
            cell = li.select("div")
            item = {
                "url": f"{self.BASE_URL}{a['href']}",
                "call_title": cell[0].text,  # call title
                "handling_unit": cell[1].text,  # handling unit
                "status": cell[2].text,  # status
                "category": cell[3].text,  # category
                "visit_volume": cell[4].text,  # number of visits
                # time; NOTE(review): the key looks like a typo of "time",
                # but it is part of the stored data format, so it is kept.
                "tile": cell[5].text,
                "type_name": type_name  # category name
            }
            # Get the call details, i.e. the page the entry links to.
            # The item is completed in place and returned.
            res_item = self.get_call_detail_info(item, page_getter)
            # Save the result.
            self.save_in_mysql(item)
            ic(res_item['call_title'])

    def get_call_detail_info(self, item, page_getter):
        """
        Crawl the detailed information of one call.

        :param item: dict holding at least ``url``; it is updated in place
            with ``call_content`` and ``handle_result``.
        :param page_getter: object whose ``get_page`` fetches the detail page.
        :return: the same ``item`` dict.
        """
        detail_page = page_getter.get_page(item['url'])
        detail_soup = BeautifulSoup(str(detail_page), 'html.parser')
        detail_cells = detail_soup.select(".tb .td2")
        # Call content.
        item['call_content'] = detail_cells[2].text
        # Handling result: the last cell of the table.
        item['handle_result'] = detail_soup.select(".tb td")[-1].text
        return item

    def save_in_mysql(self, item):
        """
        Save one result.

        Despite its name, this currently appends ``item`` as JSON, followed
        by ``,`` and a newline, to a text file; the MySQL variant is kept
        below as commented-out code.

        :param item: JSON-serializable dict describing one call.
        :return: None
        """
        # NOTE(review): the file name was mis-decoded in the source; it is
        # restored here to the UTF-8 text it decodes to.
        # The context manager closes the file even if json.dump raises.
        with open("政府来电结果.json", 'a+', encoding='utf-8') as f:
            json.dump(item, f, ensure_ascii=False)
            f.write(",\n")
        # mysql = MyPymysqlPool()
        # sql = "insert into call_info(url,call_title,handling_unit,status,category,visit_volume,tile,type_name,call_content,handle_result) values(%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)"
        # param = (item['url'], item['call_title'], item['handling_unit'], item['status'],
        #          item['category'], item['visit_volume'], item['tile'],
        #          item['type_name'], item['call_content'], item['handle_result'])
        # NOTE(review): the snippet is truncated at this point in the source.
base.py
Source:base.py
...42 cls._registered_pages.sort(43 key=lambda page: len(page.url), reverse=True)44 for page in pages:45 func_name = camel2snake(page.__name__)46 def page_getter(self, page=page):47 return page(self)48 page_getter.__name__ = func_name49 page_getter = property(cache(page_getter))50 setattr(cls, func_name, page_getter)51 return cls52 return wrapper53class App(object):54 """Web application."""55 _registered_pages = []56 def __init__(self, url, browser, *args, **kwgs):57 """Constructor."""58 self.app_url = url.strip('/')59 LOGGER.info('Start {!r} browser'.format(browser))60 self.webdriver = browsers[browser](*args, **kwgs)...
Learn to execute automation testing from scratch with LambdaTest Learning Hub: from setting up the prerequisites and running your first automation test, to following best practices and diving deeper into advanced test scenarios. The LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, TestNG, etc.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 automation test minutes FREE!!