Best Python code snippet using playwright-python
test_selectors.py
Source:test_selectors.py
"""Tests for the ``selectors`` module, run once per available selector class."""
import errno
import os
import random
import selectors
import signal
import socket
import sys
from test import support
from time import sleep
import unittest
import unittest.mock
import tempfile
from time import monotonic as time
try:
    import resource
except ImportError:
    resource = None


if hasattr(socket, 'socketpair'):
    socketpair = socket.socketpair
else:
    def socketpair(family=socket.AF_INET, type=socket.SOCK_STREAM, proto=0):
        """Emulate socket.socketpair() with a loopback listener.

        Returns a ``(client, accepted)`` pair of connected sockets.
        """
        with socket.socket(family, type, proto) as l:
            l.bind((support.HOST, 0))
            l.listen()
            c = socket.socket(family, type, proto)
            try:
                c.connect(l.getsockname())
                caddr = c.getsockname()
                while True:
                    a, addr = l.accept()
                    # check that we've got the correct client
                    if addr == caddr:
                        return c, a
                    a.close()
            except OSError:
                c.close()
                raise


def find_ready_matching(ready, flag):
    """Return the file objects in *ready* whose event mask intersects *flag*.

    *ready* is an iterable of ``(key, events)`` pairs as returned by
    ``select()``; the result preserves their order.
    """
    return [key.fileobj for key, events in ready if events & flag]


class BaseSelectorTestCase(unittest.TestCase):
    """Selector-agnostic tests; subclasses set ``SELECTOR`` to the class under test."""

    def make_socketpair(self):
        # Both ends are closed automatically when the test finishes.
        rd, wr = socketpair()
        self.addCleanup(rd.close)
        self.addCleanup(wr.close)
        return rd, wr

    def test_register(self):
        s = self.SELECTOR()
        self.addCleanup(s.close)

        rd, wr = self.make_socketpair()

        key = s.register(rd, selectors.EVENT_READ, "data")
        self.assertIsInstance(key, selectors.SelectorKey)
        self.assertEqual(key.fileobj, rd)
        self.assertEqual(key.fd, rd.fileno())
        self.assertEqual(key.events, selectors.EVENT_READ)
        self.assertEqual(key.data, "data")

        # register an unknown event
        self.assertRaises(ValueError, s.register, 0, 999999)

        # register an invalid FD
        self.assertRaises(ValueError, s.register, -10, selectors.EVENT_READ)

        # register twice
        self.assertRaises(KeyError, s.register, rd, selectors.EVENT_READ)

        # register the same FD, but with a different object
        self.assertRaises(KeyError, s.register, rd.fileno(),
                          selectors.EVENT_READ)

    def test_unregister(self):
        s = self.SELECTOR()
        self.addCleanup(s.close)

        rd, wr = self.make_socketpair()

        s.register(rd, selectors.EVENT_READ)
        s.unregister(rd)

        # unregister an unknown file obj
        self.assertRaises(KeyError, s.unregister, 999999)

        # unregister twice
        self.assertRaises(KeyError, s.unregister, rd)

    def test_unregister_after_fd_close(self):
        s = self.SELECTOR()
        self.addCleanup(s.close)
        rd, wr = self.make_socketpair()
        r, w = rd.fileno(), wr.fileno()
        s.register(r, selectors.EVENT_READ)
        s.register(w, selectors.EVENT_WRITE)
        rd.close()
        wr.close()
        s.unregister(r)
        s.unregister(w)

    @unittest.skipUnless(os.name == 'posix', "requires posix")
    def test_unregister_after_fd_close_and_reuse(self):
        s = self.SELECTOR()
        self.addCleanup(s.close)
        rd, wr = self.make_socketpair()
        r, w = rd.fileno(), wr.fileno()
        s.register(r, selectors.EVENT_READ)
        s.register(w, selectors.EVENT_WRITE)
        rd2, wr2 = self.make_socketpair()
        rd.close()
        wr.close()
        # Re-occupy the just-closed descriptor numbers with other sockets.
        os.dup2(rd2.fileno(), r)
        os.dup2(wr2.fileno(), w)
        self.addCleanup(os.close, r)
        self.addCleanup(os.close, w)
        s.unregister(r)
        s.unregister(w)

    def test_unregister_after_socket_close(self):
        s = self.SELECTOR()
        self.addCleanup(s.close)
        rd, wr = self.make_socketpair()
        s.register(rd, selectors.EVENT_READ)
        s.register(wr, selectors.EVENT_WRITE)
        rd.close()
        wr.close()
        s.unregister(rd)
        s.unregister(wr)

    def test_modify(self):
        s = self.SELECTOR()
        self.addCleanup(s.close)

        rd, wr = self.make_socketpair()

        key = s.register(rd, selectors.EVENT_READ)

        # modify events
        key2 = s.modify(rd, selectors.EVENT_WRITE)
        self.assertNotEqual(key.events, key2.events)
        self.assertEqual(key2, s.get_key(rd))

        s.unregister(rd)

        # modify data
        d1 = object()
        d2 = object()

        key = s.register(rd, selectors.EVENT_READ, d1)
        key2 = s.modify(rd, selectors.EVENT_READ, d2)
        self.assertEqual(key.events, key2.events)
        self.assertNotEqual(key.data, key2.data)
        self.assertEqual(key2, s.get_key(rd))
        self.assertEqual(key2.data, d2)

        # modify unknown file obj
        self.assertRaises(KeyError, s.modify, 999999, selectors.EVENT_READ)

        # modify use a shortcut
        d3 = object()
        s.register = unittest.mock.Mock()
        s.unregister = unittest.mock.Mock()

        s.modify(rd, selectors.EVENT_READ, d3)
        self.assertFalse(s.register.called)
        self.assertFalse(s.unregister.called)

    def test_modify_unregister(self):
        # Make sure the fd is unregister()ed in case of error on
        # modify(): http://bugs.python.org/issue30014
        selector_name = self.SELECTOR.__name__
        if selector_name not in ('EpollSelector', 'PollSelector',
                                 'DevpollSelector'):
            # skipTest() raises by itself; no explicit ``raise`` is needed.
            self.skipTest("%s has no patchable _selector_cls" % selector_name)
        patch = unittest.mock.patch(
            'selectors.%s._selector_cls' % selector_name)

        with patch as m:
            m.return_value.modify = unittest.mock.Mock(
                side_effect=ZeroDivisionError)
            s = self.SELECTOR()
            self.addCleanup(s.close)
            rd, wr = self.make_socketpair()
            s.register(rd, selectors.EVENT_READ)
            self.assertEqual(len(s._map), 1)
            with self.assertRaises(ZeroDivisionError):
                s.modify(rd, selectors.EVENT_WRITE)
            self.assertEqual(len(s._map), 0)

    def test_close(self):
        s = self.SELECTOR()
        self.addCleanup(s.close)

        mapping = s.get_map()
        rd, wr = self.make_socketpair()

        s.register(rd, selectors.EVENT_READ)
        s.register(wr, selectors.EVENT_WRITE)

        s.close()
        self.assertRaises(RuntimeError, s.get_key, rd)
        self.assertRaises(RuntimeError, s.get_key, wr)
        self.assertRaises(KeyError, mapping.__getitem__, rd)
        self.assertRaises(KeyError, mapping.__getitem__, wr)

    def test_get_key(self):
        s = self.SELECTOR()
        self.addCleanup(s.close)

        rd, wr = self.make_socketpair()

        key = s.register(rd, selectors.EVENT_READ, "data")
        self.assertEqual(key, s.get_key(rd))

        # unknown file obj
        self.assertRaises(KeyError, s.get_key, 999999)

    def test_get_map(self):
        s = self.SELECTOR()
        self.addCleanup(s.close)

        rd, wr = self.make_socketpair()

        keys = s.get_map()
        self.assertFalse(keys)
        self.assertEqual(len(keys), 0)
        self.assertEqual(list(keys), [])
        key = s.register(rd, selectors.EVENT_READ, "data")
        self.assertIn(rd, keys)
        self.assertEqual(key, keys[rd])
        self.assertEqual(len(keys), 1)
        self.assertEqual(list(keys), [rd.fileno()])
        self.assertEqual(list(keys.values()), [key])

        # unknown file obj
        with self.assertRaises(KeyError):
            keys[999999]

        # Read-only mapping
        with self.assertRaises(TypeError):
            del keys[rd]

    def test_select(self):
        s = self.SELECTOR()
        self.addCleanup(s.close)

        rd, wr = self.make_socketpair()

        s.register(rd, selectors.EVENT_READ)
        wr_key = s.register(wr, selectors.EVENT_WRITE)

        result = s.select()
        for key, events in result:
            self.assertIsInstance(key, selectors.SelectorKey)
            self.assertTrue(events)
            self.assertFalse(events & ~(selectors.EVENT_READ |
                                        selectors.EVENT_WRITE))

        # Nothing was written, so only the write end may be ready.
        self.assertEqual([(wr_key, selectors.EVENT_WRITE)], result)

    def test_context_manager(self):
        s = self.SELECTOR()
        self.addCleanup(s.close)

        rd, wr = self.make_socketpair()

        with s as sel:
            sel.register(rd, selectors.EVENT_READ)
            sel.register(wr, selectors.EVENT_WRITE)

        # Leaving the ``with`` block must have closed the selector.
        self.assertRaises(RuntimeError, s.get_key, rd)
        self.assertRaises(RuntimeError, s.get_key, wr)

    def test_fileno(self):
        s = self.SELECTOR()
        self.addCleanup(s.close)

        if hasattr(s, 'fileno'):
            fd = s.fileno()
            self.assertIsInstance(fd, int)
            self.assertGreaterEqual(fd, 0)

    def test_selector(self):
        s = self.SELECTOR()
        self.addCleanup(s.close)

        NUM_SOCKETS = 12
        MSG = b" This is a test."
        MSG_LEN = len(MSG)
        writers = []
        r2w = {}
        w2r = {}

        for i in range(NUM_SOCKETS):
            rd, wr = self.make_socketpair()
            s.register(rd, selectors.EVENT_READ)
            s.register(wr, selectors.EVENT_WRITE)
            writers.append(wr)
            r2w[rd] = wr
            w2r[wr] = rd

        bufs = []

        while writers:
            ready = s.select()
            ready_writers = find_ready_matching(ready, selectors.EVENT_WRITE)
            if not ready_writers:
                self.fail("no sockets ready for writing")
            wr = random.choice(ready_writers)
            wr.send(MSG)

            for i in range(10):
                ready = s.select()
                ready_readers = find_ready_matching(ready,
                                                    selectors.EVENT_READ)
                if ready_readers:
                    break
                # there might be a delay between the write to the write end and
                # the read end is reported ready
                sleep(0.1)
            else:
                self.fail("no sockets ready for reading")
            self.assertEqual([w2r[wr]], ready_readers)
            rd = ready_readers[0]
            buf = rd.recv(MSG_LEN)
            self.assertEqual(len(buf), MSG_LEN)
            bufs.append(buf)
            s.unregister(r2w[rd])
            s.unregister(rd)
            writers.remove(r2w[rd])

        self.assertEqual(bufs, [MSG] * NUM_SOCKETS)

    @unittest.skipIf(sys.platform == 'win32',
                     'select.select() cannot be used with empty fd sets')
    def test_empty_select(self):
        # Issue #23009: Make sure EpollSelector.select() works when no FD is
        # registered.
        s = self.SELECTOR()
        self.addCleanup(s.close)
        self.assertEqual(s.select(timeout=0), [])

    def test_timeout(self):
        s = self.SELECTOR()
        self.addCleanup(s.close)

        rd, wr = self.make_socketpair()

        s.register(wr, selectors.EVENT_WRITE)
        t = time()
        self.assertEqual(1, len(s.select(0)))
        self.assertEqual(1, len(s.select(-1)))
        self.assertLess(time() - t, 0.5)

        s.unregister(wr)
        s.register(rd, selectors.EVENT_READ)
        t = time()
        self.assertFalse(s.select(0))
        self.assertFalse(s.select(-1))
        self.assertLess(time() - t, 0.5)

        t0 = time()
        self.assertFalse(s.select(1))
        t1 = time()
        dt = t1 - t0
        # Tolerate 2.0 seconds for very slow buildbots
        self.assertTrue(0.8 <= dt <= 2.0, dt)

    @unittest.skipUnless(hasattr(signal, "alarm"),
                         "signal.alarm() required for this test")
    def test_select_interrupt_exc(self):
        s = self.SELECTOR()
        self.addCleanup(s.close)

        rd, wr = self.make_socketpair()

        class InterruptSelect(Exception):
            pass

        def handler(*args):
            raise InterruptSelect

        orig_alrm_handler = signal.signal(signal.SIGALRM, handler)
        self.addCleanup(signal.signal, signal.SIGALRM, orig_alrm_handler)

        try:
            signal.alarm(1)

            s.register(rd, selectors.EVENT_READ)
            t = time()
            # select() is interrupted by a signal which raises an exception
            with self.assertRaises(InterruptSelect):
                s.select(30)
            # select() was interrupted before the timeout of 30 seconds
            self.assertLess(time() - t, 5.0)
        finally:
            signal.alarm(0)

    @unittest.skipUnless(hasattr(signal, "alarm"),
                         "signal.alarm() required for this test")
    def test_select_interrupt_noraise(self):
        s = self.SELECTOR()
        self.addCleanup(s.close)

        rd, wr = self.make_socketpair()

        orig_alrm_handler = signal.signal(signal.SIGALRM, lambda *args: None)
        self.addCleanup(signal.signal, signal.SIGALRM, orig_alrm_handler)

        try:
            signal.alarm(1)

            s.register(rd, selectors.EVENT_READ)
            t = time()
            # select() is interrupted by a signal, but the signal handler
            # doesn't raise an exception, so select() should be retried with
            # a recomputed timeout
            self.assertFalse(s.select(1.5))
            self.assertGreaterEqual(time() - t, 1.0)
        finally:
            signal.alarm(0)


class ScalableSelectorMixIn:
    """Extra test for selectors that are not bound by FD_SETSIZE."""

    # see issue #18963 for why it's skipped on older OS X versions
    @support.requires_mac_ver(10, 5)
    @unittest.skipUnless(resource, "Test needs resource module")
    def test_above_fd_setsize(self):
        # A scalable implementation should have no problem with more than
        # FD_SETSIZE file descriptors. Since we don't know the value, we just
        # try to set the soft RLIMIT_NOFILE to the hard RLIMIT_NOFILE ceiling.
        soft, hard = resource.getrlimit(resource.RLIMIT_NOFILE)
        try:
            resource.setrlimit(resource.RLIMIT_NOFILE, (hard, hard))
            self.addCleanup(resource.setrlimit, resource.RLIMIT_NOFILE,
                            (soft, hard))
            NUM_FDS = min(hard, 2**16)
        except (OSError, ValueError):
            NUM_FDS = soft

        # guard for already allocated FDs (stdin, stdout...)
        NUM_FDS -= 32

        s = self.SELECTOR()
        self.addCleanup(s.close)

        for i in range(NUM_FDS // 2):
            try:
                rd, wr = self.make_socketpair()
            except OSError:
                # too many FDs, skip - note that we should only catch EMFILE
                # here, but apparently *BSD and Solaris can fail upon connect()
                # or bind() with EADDRNOTAVAIL, so let's be safe
                self.skipTest("FD limit reached")

            try:
                s.register(rd, selectors.EVENT_READ)
                s.register(wr, selectors.EVENT_WRITE)
            except OSError as e:
                if e.errno == errno.ENOSPC:
                    # this can be raised by epoll if we go over
                    # fs.epoll.max_user_watches sysctl
                    self.skipTest("FD limit reached")
                raise

        try:
            fds = s.select()
        except OSError as e:
            if e.errno == errno.EINVAL and sys.platform == 'darwin':
                # unexplainable errors on macOS don't need to fail the test
                self.skipTest("Invalid argument error calling poll()")
            raise
        self.assertEqual(NUM_FDS // 2, len(fds))


class DefaultSelectorTestCase(BaseSelectorTestCase):

    SELECTOR = selectors.DefaultSelector


class SelectSelectorTestCase(BaseSelectorTestCase):

    SELECTOR = selectors.SelectSelector


@unittest.skipUnless(hasattr(selectors, 'PollSelector'),
                     "Test needs selectors.PollSelector")
class PollSelectorTestCase(BaseSelectorTestCase, ScalableSelectorMixIn):

    SELECTOR = getattr(selectors, 'PollSelector', None)


@unittest.skipUnless(hasattr(selectors, 'EpollSelector'),
                     "Test needs selectors.EpollSelector")
class EpollSelectorTestCase(BaseSelectorTestCase, ScalableSelectorMixIn):

    SELECTOR = getattr(selectors, 'EpollSelector', None)

    def test_register_file(self):
        # epoll(7) returns EPERM when given a file to watch
        s = self.SELECTOR()
        # Close the selector even though registration fails (avoids leaking
        # its file descriptor).
        self.addCleanup(s.close)
        with tempfile.NamedTemporaryFile() as f:
            with self.assertRaises(OSError):
                s.register(f, selectors.EVENT_READ)
            # the SelectorKey has been removed
            with self.assertRaises(KeyError):
                s.get_key(f)


@unittest.skipUnless(hasattr(selectors, 'KqueueSelector'),
                     "Test needs selectors.KqueueSelector")
class KqueueSelectorTestCase(BaseSelectorTestCase, ScalableSelectorMixIn):

    SELECTOR = getattr(selectors, 'KqueueSelector', None)

    def test_register_bad_fd(self):
        # a file descriptor that's been closed should raise an OSError
        # with EBADF
        s = self.SELECTOR()
        # Close the selector even though registration fails (avoids leaking
        # its file descriptor).
        self.addCleanup(s.close)
        bad_f = support.make_bad_fd()
        with self.assertRaises(OSError) as cm:
            s.register(bad_f, selectors.EVENT_READ)
        self.assertEqual(cm.exception.errno, errno.EBADF)
        # the SelectorKey has been removed
        with self.assertRaises(KeyError):
            s.get_key(bad_f)


@unittest.skipUnless(hasattr(selectors, 'DevpollSelector'),
                     "Test needs selectors.DevpollSelector")
class DevpollSelectorTestCase(BaseSelectorTestCase, ScalableSelectorMixIn):

    SELECTOR = getattr(selectors, 'DevpollSelector', None)


def test_main():
    """Run every selector test case, then reap any leftover child processes."""
    tests = [DefaultSelectorTestCase, SelectSelectorTestCase,
             PollSelectorTestCase, EpollSelectorTestCase,
             KqueueSelectorTestCase, DevpollSelectorTestCase]
    support.run_unittest(*tests)
    support.reap_children()


if __name__ == "__main__":
    test_main()
stores.py
Source:stores.py
"""Per-competitor scraping routines.

Every public function takes a scraper object ``obj`` and drives it with the
CSS selectors of one store.  The functions only rely on the attributes used
below: ``pricing``, ``log``, ``kill_driver``, ``set_out_of_stock``,
``set_third_party``, ``set_price``, ``set_undefined``, ``_find_data``,
``_retrieve_data``, ``_driver`` and ``comp_id``.

Selector dictionaries map a CSS selector to the element attribute to read.
NOTE(review): values of the form ``"attr|||text"`` look like "attribute plus
expected substring" -- confirm against ``obj._find_data`` / ``obj.pricing``.
"""
from selenium import webdriver
from selenium.common.exceptions import TimeoutException
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.common.by import By
from selenium.webdriver.common.keys import Keys
from selenium.webdriver.support.ui import WebDriverWait
import loc_data
import aux_func


def _run_pricing(obj, *pricing_args):
    """Call ``obj.pricing(*pricing_args)``, logging instead of raising.

    A pricing failure must not prevent the later stock checks or the driver
    teardown, so any ``Exception`` is reported through ``obj.log`` only.
    """
    try:
        obj.pricing(*pricing_args)
    except Exception:
        obj.log("Failed to acquire pricing data")


def _run_located_pricing(obj, locations, price_selectors, sale_selectors,
                         broken_link_selectors):
    """Run pricing with the location instruction mapped to ``obj.comp_id``.

    ``locations`` maps a competitor id to the instruction string handed to
    ``obj.pricing`` as its fourth argument.  An unmapped id is logged and
    pricing is skipped, rather than surfacing as a swallowed NameError.
    """
    loc_ins = locations.get(obj.comp_id)
    if loc_ins is None:
        obj.log("No store location defined for competitor: %s" % obj.comp_id)
        return
    _run_pricing(obj, price_selectors, sale_selectors, broken_link_selectors,
                 loc_ins)


def _kill_driver(obj):
    """Shut the browser driver down, logging (not raising) on failure."""
    try:
        obj.kill_driver()
    except Exception:
        obj.log("Error Deleting Driver")


def academy(obj):
    """Price an Academy page; out of stock when ``button#add2CartBtn`` says so."""
    price_selectors = {"input#dlItemPrice": "value"}
    sale_selectors = {"span#currentPrice": "innerHTML"}
    broken_link_selectors = {"p#search_results_total_count": "innerHTML"}
    try:
        _run_pricing(obj, price_selectors, sale_selectors,
                     broken_link_selectors)
        # No third party check.
        try:
            try:
                oos = obj._driver.find_element_by_css_selector(
                    "button#add2CartBtn").get_attribute("innerHTML")
            except Exception:
                # Element could not be read: treat the item as in stock.
                oos = "in stock"
            if "Out of Stock" in oos:
                obj.set_out_of_stock()
        except Exception:
            obj.log("Out of stock check failed")
    finally:
        _kill_driver(obj)


def acehardware(obj):
    """Price an Ace Hardware page (no third-party or stock checks)."""
    price_selectors = {"div.productPrice span script": "innerHTML"}
    try:
        _run_pricing(obj, price_selectors)
    finally:
        _kill_driver(obj)


def autozone(obj):
    """Price an AutoZone page and flag items marked "Not Available"."""
    price_selectors = {
        "table.part-pricing-info td.price.base-price": "innerText",
    }
    sale_selectors = {"span.price.light-gray>strong": "innerHTML"}
    broken_link_selectors = {"": ""}
    loc_ins = "loc_data.autozone(self)"
    try:
        _run_pricing(obj, price_selectors, sale_selectors,
                     broken_link_selectors, loc_ins)
        # No third party check.
        try:
            if obj._find_data("div.button-bar-msg-out-of-stock",
                              "innerText|||Not Available"):
                obj.set_out_of_stock()
        except Exception:
            obj.log("out of stock check failed")
    finally:
        _kill_driver(obj)


def basspro(obj):
    """Price a Bass Pro page (no third-party or stock checks)."""
    # NOTE(review): this instruction string is built but never passed to
    # obj.pricing, so the "{}" placeholders in the selectors below are not
    # filled in by this function.  Kept unused to preserve behavior; its
    # original indentation could not be recovered -- confirm before wiring up.
    loc_ins = """
bpsku = loc_data.basspro(obj)
for p,value in price_dict.iteritems():
price_dict[p.format(bpsku)] = price_dict.pop(p)
for p,value in sale_dict.iteritems():
sale_dict[p.format(bpsku)] = sale_dict.pop(p)"""
    price_selectors = {
        "div.top.namePartPriceContainer span[itemprop=price]": "innerHTML",
        "span#listPrice_{}.old_price": "innerHTML",
        "span#offerPrice_{} > span": "innerHTML",
        "div[itemprop=offers]>span[itemprop=price]": "content",
    }
    sale_selectors = {"span#offerPrice_{}.price.sale > span": "innerHTML"}
    broken_link_selectors = {"": ""}
    try:
        _run_pricing(obj, price_selectors, sale_selectors,
                     broken_link_selectors)
    finally:
        _kill_driver(obj)


def blain(obj):
    """Price a Blain's page; out of stock when no in-stock message is found."""
    price_selectors = {
        "meta[itemprop=lowprice]": "content",
        "div.active-price>div.price>span": "innerHTML",
        "div.original-price>span.price>span": "innerHTML",
    }
    sale_selectors = {
        "div.active-price.promo > div.price > span:not([class])": "innerHTML",
    }
    broken_link_selectors = {"div.list-header-text > span": "innerHTML"}
    try:
        _run_pricing(obj, price_selectors, sale_selectors,
                     broken_link_selectors)
        # No third party check.
        try:
            if not obj._find_data("span.stock-msg.in-stock"):
                obj.set_out_of_stock()
        except Exception:
            obj.log("Out of stock check failed")
    finally:
        _kill_driver(obj)


def bootbarn(obj):
    """Price a Boot Barn page (no third-party or stock checks)."""
    price_selectors = {
        "span.price-original.price-holder-alt": "innerHTML",
        "h6.product-callout-title > strong": "innerHTML",
    }
    sale_selectors = {"h6.product-callout-title > strong": "innerHTML"}
    broken_link_selectors = {"": ""}
    try:
        _run_pricing(obj, price_selectors, sale_selectors,
                     broken_link_selectors)
    finally:
        _kill_driver(obj)


def cabela(obj):
    """Price a Cabela's page (no third-party or stock checks)."""
    price_selectors = {
        "dd.regularnprange": "innerHTML",
        "div.price > dl > dd.nprange": "innerHTML",
        "div.price > dl > dd.prange": "innerHTML",
    }
    sale_selectors = {"dd.saleprice": "innerHTML"}
    broken_link_selectors = {"": ""}
    try:
        _run_pricing(obj, price_selectors, sale_selectors,
                     broken_link_selectors)
    finally:
        _kill_driver(obj)


def Chewy(obj):
    """Price a Chewy page and flag items showing an "unavailable" marker.

    A pricing failure is logged like in every other store; it no longer
    propagates and skips the stock check and the driver teardown.
    """
    price_selectors = {"span.ga-eec__price": "innerHTML"}
    sale_selectors = {"p.autoship-pricing": "innerHTML"}
    broken_link_selectors = {"": ""}
    try:
        _run_pricing(obj, price_selectors, sale_selectors,
                     broken_link_selectors)
        # No third party check.
        try:
            if obj._find_data("div#availability span.unavailable"):
                obj.set_out_of_stock()
        except Exception:
            obj.log("Out of Stock check failed")
    finally:
        _kill_driver(obj)


def dickeybub(obj):
    """Price a Dickey Bub page (no third-party or stock checks)."""
    price_selectors = {
        "p.price > del > span.woocommerce-Price-amount.amount": "innerHTML",
        "p.price > span.woocommerce-Price-amount.amount": "innerHTML",
    }
    sale_selectors = {
        "p.price > ins > span.woocommerce-Price-amount.amount": "innerHTML",
    }
    try:
        _run_pricing(obj, price_selectors, sale_selectors)
    finally:
        _kill_driver(obj)


def farm_and_home(obj):
    """Price a Farm and Home page for the store mapped to ``obj.comp_id``."""
    locations = {
        25: "loc_data.farm_and_home(self,10)",
        4: "loc_data.farm_and_home(self,2)",
    }
    price_selectors = {
        "div.product-main-info > div.price-box > span.regular-price > span.price": "innerHTML",
        "div.product-info-price span.price": "innerHTML",
    }
    sale_selectors = {}
    broken_link_selectors = {}
    try:
        _run_located_pricing(obj, locations, price_selectors, sale_selectors,
                             broken_link_selectors)
    finally:
        _kill_driver(obj)


def home_depot(obj):
    """Price a Home Depot page for the mapped store and check availability.

    Two independent stock signals are consulted: the hidden
    ``input#availableInLocalStore`` field and the ``span.quantity`` box.
    """
    locations = {
        23: "loc_data.home_depot(self,62226)",
        5: "loc_data.home_depot(self,63028)",
        17: "loc_data.home_depot(self,62650)",
    }
    price_selectors = {
        "input#ciItemPrice": "value",
        "span#ajaxPriceStrikeThru": "innerHTML",
        "span#ajaxPriceAlt": "innerHTML",
        "span#ajaxPrice": "content",
    }
    sale_selectors = {"span#ajaxPrice": "content"}
    broken_link_selectors = {
        "div.buybelt__flex-wrapper.buybelt__store-wrapper span.u__text--danger": "innerHTML|||Unavailable",
        "div#productinfo_ctn > div.error >p": "innerHTML|||not currently available",
    }
    try:
        _run_located_pricing(obj, locations, price_selectors, sale_selectors,
                             broken_link_selectors)
        # Out of stock check
        try:
            # Checking for hidden field that indicates availability
            try:
                if obj._find_data("input#availableInLocalStore",
                                  "value|||false"):
                    obj.set_out_of_stock()
            except Exception:
                # Deliberately best-effort: the quantity check below still
                # runs when the hidden field cannot be read.
                pass
            # Check for if the quantity box under Pick Up In Store is zero
            try:
                WebDriverWait(obj._driver, 10).until(
                    EC.presence_of_element_located(
                        (By.CSS_SELECTOR, "div.buybelt__box")))
                if '0' == str(obj._retrieve_data("span.quantity",
                                                 "innerHTML")):
                    obj.set_out_of_stock()
            except TimeoutException:
                # The box never appeared within 10 seconds: nothing to check.
                pass
        except Exception:
            obj.log("Out of stock check failed")
    finally:
        _kill_driver(obj)


def lowes(obj):
    """Price a Lowe's page for the mapped store and flag "Unavailable" items.

    A pricing failure is logged like in every other store; it no longer
    propagates and skips the stock check and the driver teardown.
    """
    locations = {
        6: "loc_data.lowes(self,63028,'Festus')",
        15: "loc_data.lowes(self,63701,'Cape Girardeau')",
        16: "loc_data.lowes(self,62704,'Springfield')",
        24: "loc_data.lowes(self,62221,'Belleville')",
    }
    price_selectors = {
        "input[name=productId]": "data-productprice",
        "span.secondary-text.small-type.art-pd-wasPriceLbl": "innerHTML",
        "span[itemprop=price]": "content",
        "span.primary-font.jumbo.strong.art-pd-price": "innerHTML",
    }
    sale_selectors = {
        "span.primary-font.jumbo.strong.art-pd-contractPricing": "innerHTML",
    }
    broken_link_selectors = {
        "div.alert.alert-warning i.icon-error-outline.red": "",
        " div.pd-shipping-delivery.met-fulfillment-delivery.grid-50.tablet-grid-50 div.media-body > p": "innerHTML|||unavailable",
    }
    try:
        _run_located_pricing(obj, locations, price_selectors, sale_selectors,
                             broken_link_selectors)
        # Out of stock check
        try:
            if obj._find_data("div.fulfillment-method div.media-body >p",
                              "innerHTML|||Unavailable"):
                obj.set_out_of_stock()
        except Exception:
            obj.log("Out of stock check failed")
    finally:
        _kill_driver(obj)


def menards(obj):
    """Price a Menards page for the store mapped to ``obj.comp_id``."""
    locations = {
        7: "loc_data.menards(self,'3286')",
        26: "loc_data.menards(self,'3334')",
        27: "loc_data.menards(self,'3293')",
    }
    price_selectors = {
        "span.bargainStrike": "innerHTML",
        "span.EDLP.fontSize16.fontBold.alignRight": "innerHTML",
        "span#totalItemPriceFloater": "innerHTML",
        "span.finalPriceSpan.leftFloat": "innerText",
    }
    sale_selectors = {
        "span.bargainPrice": "innerHTML",
        "span#totalItemPriceFloater": "innerHTML",
        "span.finalPriceSpan.leftFloat": "innerText",
    }
    broken_link_selectors = {"h3.resettitle": "innerHTML"}
    try:
        _run_located_pricing(obj, locations, price_selectors, sale_selectors,
                             broken_link_selectors)
    finally:
        _kill_driver(obj)


def orscheln(obj):
    """Price an Orscheln page and flag items marked "Out of Stock Online"."""
    price_selectors = {
        "span.product_unit_price": "innerHTML",
        "meta[itemprop=price]": "content",
    }
    sale_selectors = {"": ""}
    broken_link_selectors = {"": ""}
    try:
        _run_pricing(obj, price_selectors, sale_selectors,
                     broken_link_selectors)
        # No third party check.
        try:
            if obj._find_data("div.product-info-stock-sku",
                              "innerHTML|||Out of Stock Online"):
                obj.set_out_of_stock()
        except Exception:
            obj.log("Out of Stock check failed")
    finally:
        _kill_driver(obj)


def petsense(obj):
    """Mark the competitor as undefined; Petsense scraping is disabled."""
    # Selectors drafted for when this competitor is enabled (previously
    # unreachable code after the return):
    #   price:       "div#product_price span.money" -> innerHTML
    #   broken link: "div.selector-wrapper>select.single-option-selector"
    #                -> innerHTML
    obj.log("Competitor: %d not yet defined" % obj.comp_id)
    obj.set_undefined()


def ruralking(obj):
    """Price a Rural King page; out of stock on "OUT OF STOCK" or backorder."""
    price_selectors = {
        "meta[itemprop=price]": "content",
        "meta[property='product:price:amount']": "content",
        "span.price": "innerHTML",
    }
    sale_selectors = {"": ""}
    broken_link_selectors = {
        "div.bluefoot-row.bluefoot-structural.with-media-background.not-found-404-page span": "innerHTML|||SORRY",
        "div.page-head-alt >h2": "innerHTML|||Sorry",
        "div.page-head-alt >h3": "innerHTML|||Sorry",
    }
    try:
        _run_pricing(obj, price_selectors, sale_selectors,
                     broken_link_selectors)
        try:
            if "OUT OF STOCK" in obj._retrieve_data(
                    "div.product-shop h1[style]", "innerHTML"):
                obj.set_out_of_stock()
            elif obj._find_data("p.prod_availability >span.backorder"):
                obj.set_out_of_stock()
        except Exception:
            obj.log("Out of stock check failed")
    finally:
        _kill_driver(obj)


def sears(obj):
    """Price a Sears page (no third-party or stock checks)."""
    price_selectors = {"span.price-wrapper": "innerHTML"}
    sale_selectors = {"h4.redSalePrice span.price-wrapper": "innerHTML"}
    broken_link_selectors = {"": ""}
    try:
        _run_pricing(obj, price_selectors, sale_selectors,
                     broken_link_selectors)
    finally:
        _kill_driver(obj)


def shelper(obj):
    """Price a Shelper page (no third-party or stock checks)."""
    price_selectors = {
        "div.product-content-inner > div.product-price > span.price-original.price-holder-alt > strong": "innerHTML",
    }
    sale_selectors = {
        "div.product-content-inner > div.product-callout > h6.product-callout-title > strong": "innerHTML",
    }
    # NOTE(review): the original defined broken_link_selectors = {"": ""} here
    # but never passed it to obj.pricing; the two-argument call is preserved.
    try:
        _run_pricing(obj, price_selectors, sale_selectors)
    finally:
        _kill_driver(obj)


def tsc(obj):
    """Price a Tractor Supply page for the store mapped to ``obj.comp_id``."""
    # view in cart item
    # https://www.tractorsupply.com/tsc/product/jonsered-502cc-gas-chainsaw-cs2250s?cm_vc=-10005
    locations = {
        73: "loc_data.tsc(self,'63049')",
        74: "loc_data.tsc(self,'63701')",
        8: "loc_data.tsc(self,'63640')",
        124: "loc_data.tsc(self,'63801')",
    }
    price_selectors = {
        "div.was_save_sku span.was_text": "innerHTML",
        "span.dollar_price": "innerHTML",
    }
    sale_selectors = {"span.dollar_price": "innerHTML"}
    broken_link_selectors = {
        "div#WC_GenericError_6.info": "innerHTML",
        "div#UnpubProductErrormsg": "innerHTML|||selection below",
    }
    try:
        _run_located_pricing(obj, locations, price_selectors, sale_selectors,
                             broken_link_selectors)
    finally:
        _kill_driver(obj)


def valleyvet(obj):
    """Mark the competitor as undefined; Valley Vet scraping is not implemented."""
    obj.log("Competitor: %d not yet defined" % obj.comp_id)
    obj.set_undefined()


def walmart(obj):
    """Price a Walmart page, then run the third-party and stock checks.

    When the main seller is not Walmart, the marketplace offers are scanned:
    the first offer shipped by Walmart supplies the price, otherwise the item
    is flagged as third party.
    """
    price_selectors = {
        "span.price-characteristic[itemprop=price]": "content",
        "div.Price-old.display-inline-block.arrange-fill.font-normal.u-textNavyBlue.display-inline > span.Price-group": "title",
        "div.prod-BotRow.prod-showBottomBorder.prod-OfferSection.prod-OfferSection-twoPriceDisplay div.Grid-col:nth-child(4) span.Price-group": "title",
        "span.display-inline-block.arrange-fit.Price.Price-enhanced.u-textNavyBlue > span.Price-group": "title",
        "span.display-inline-block.arrange-fit.Price.Price-enhanced.u-textGray > span.Price-group": "title",
    }
    broken_link_selectors = {
        "div.font-semibold.prod-Bot-partial-head": "innerHTML",
        "p.error-ErrorPage-copy": "innerHTML",
    }
    sale_selectors = {}
    try:
        _run_pricing(obj, price_selectors, sale_selectors,
                     broken_link_selectors)
        # check for Third party
        try:
            if not obj._find_data(
                    "a[data-tl-id=ProductSellerInfo-SellerName]",
                    "innerHTML|||Walmart"):
                sellers = obj._driver.find_elements_by_css_selector(
                    "div.marketplace-body")
                # NOTE(review): for/else reconstructed from collapsed
                # indentation (``break`` directly precedes ``else``) --
                # confirm against the original file.
                for sell in sellers:
                    shipper = sell.find_element_by_css_selector(
                        "span.seller-shipping-msg.u-textBlue"
                    ).get_attribute("innerHTML")
                    # Compare as text: encoding to bytes first never equals a
                    # str on Python 3.
                    if shipper == 'Walmart':
                        obj.set_price(aux_func.clean(
                            sell.find_element_by_css_selector(
                                "span.Price-group").get_attribute('title')))
                        break
                else:
                    obj.set_third_party()
        except Exception:
            obj.log("Third party check failed")
        # check Out of stock
        try:
            try:
                oos = obj._driver.find_element_by_css_selector(
                    "div.prod-ProductOffer-oosMsg.prod-PaddingTop--xxs > span"
                ).get_attribute("innerHTML")
            except Exception:
                # Message element could not be read: treat as in stock.
                oos = "in stock"
            if "Out of stock" in oos:
                obj.set_out_of_stock()
        except Exception:
            obj.log("Out of stock check failed")
    finally:
        _kill_driver(obj)


def _default(obj):
    """Fallback for unrecognised competitors: log and mark as undefined."""
    obj.log("Unknown Competitor ID")
    obj.set_undefined()
flowablebaseandselectors.py
Source:flowablebaseandselectors.py
...79 # is untouched.80 if isinstance(result.right, IdentitySeqMapInfo):81 # extend right selector with the observable that maps the selector_base to base82 if isinstance(result.left, ObservableSeqMapInfo):83 selector_map = ObservableSeqMapInfo(merge_selectors(84 result.left.observable,85 selector_obs,86 subscribe_scheduler=subscriber.scheduler,87 stack=stack,88 ))89 if other.selectors is not None:90 def gen_new_selectors():91 for key, val in other.selectors.items():92 selector = merge_selectors(93 left=merge_selectors(94 left=val,95 right=result.left.observable,96 subscribe_scheduler=subscriber.scheduler,97 stack=stack,98 ),99 right=selector_obs,100 subscribe_scheduler=subscriber.scheduler,101 stack=stack,102 )103 yield key, selector104 selectors = dict(gen_new_selectors())105 else:106 selectors = None107 elif isinstance(result.left, IdentitySeqMapInfo):108 selector_map = ObservableSeqMapInfo(selector_obs)109 if other.selectors is not None:110 def gen_new_selectors():111 for key, val in other.selectors.items():112 selector = merge_selectors(113 left=val,114 right=selector_obs,115 subscribe_scheduler=subscriber.scheduler,116 stack=stack,117 )118 yield key, selector119 selectors = dict(gen_new_selectors())120 else:121 selectors = None122 else:123 raise Exception(f'illegal result "{result.left}"')124 if selectors is not None or self.selectors is not None:125 if selectors is not None:126 if self.selectors is not None:127 selectors = {**self.selectors, **selectors}128 else:129 selectors = selectors130 else:131 selectors = self.selectors132 return FlowableBaseAndSelectors(self.base, selectors), result.right, selector_map133 return None134 def match_with(135 self,136 other: 'FlowableBaseAndSelectors',137 subscriber: Subscriber,138 stack: List[FrameSummary],139 ) -> Optional[FlowableBaseAndSelectorsMatch]:140 # bases are of type Optional[Base], therefore check first if base is not None141 if self.base is not None and other.base is not None:142 result = 
self.base.match_with(other.base, subscriber=subscriber, stack=stack)143 # this BaseAndSelectors and the other BaseAndSelectors match directly with144 # their bases145 if isinstance(result, FlowableBaseMatch):146 # the selectors can be taken over to the new base selector tuple147 if self.selectors is not None and other.selectors is not None:148 if self.selectors is not None:149 if other.selectors is None:150 selectors = self.selectors151 else:152 selectors = {**self.selectors, **other.selectors}153 else:154 selectors = other.selectors155 else:156 selectors = {}157 return FlowableBaseAndSelectorsMatch(158 base_selectors=FlowableBaseAndSelectors(159 base=result.base,160 selectors=selectors,161 ),162 left=result.left,163 right=result.right,164 )165 result = self.get_selector_maps(166 other=other,167 subscriber=subscriber,168 stack=stack,169 )170 if result is not None:171 base_selectors, left_selector_map, right_selector_map = result172 return FlowableBaseAndSelectorsMatch(173 left=left_selector_map,174 right=right_selector_map,175 base_selectors=base_selectors,176 )177 result = other.get_selector_maps(178 other=self,179 subscriber=subscriber,180 stack=stack,181 )182 if result is not None:183 base_selectors, right_selector_map, left_selector_map = result184 return FlowableBaseAndSelectorsMatch(185 left=left_selector_map,186 right=right_selector_map,187 base_selectors=base_selectors,188 )189 # two bases B1, B2 defined in selectors match, the following map is created190 # - define a new anonymous base B_match191 # - define two selectors that map a sequence with base B1 and B2 to base B_match192 # in case B1 and B2 match with identity, only define no selector193 # - define two selectors that map the two sequences of the match operator to base B_match194 if self.selectors is not None and other.selectors is not None:195 # check if some tuple of bases matches196 for sel_base_1, sel_observable_1 in self.selectors.items():197 for sel_base_2, sel_observable_2 in 
other.selectors.items():198 result = sel_base_1.match_with(199 sel_base_2,200 subscriber=subscriber,201 stack=stack,202 )203 # if two bases match ...204 if isinstance(result, FlowableBaseMatch):205 # once right is completed, keep consuming left side until it is completed as well206 def request_left(left, right):207 return not (isinstance(left, SelectCompleted) and isinstance(right, SelectNext))208 def request_right(left, right):209 return not (isinstance(right, SelectCompleted) and isinstance(left, SelectNext))210 if isinstance(result.left, IdentitySeqMapInfo) and isinstance(result.right, IdentitySeqMapInfo):211 merge_sel = RefCountObservable(212 source=ControlledZipObservable(213 left=sel_observable_1, #init_debug_observable(sel_observable_1, name='d1', stack=stack, subscriber=subscriber),214 right=sel_observable_2, #init_debug_observable(sel_observable_2, name='d1', stack=stack, subscriber=subscriber),215 request_left=request_left,216 request_right=request_right,217 match_func=lambda _, __: True,218 scheduler=subscriber.scheduler,219 stack=stack,220 ), #name='d2', stack=stack, subscriber=subscriber),221 subject=PublishObservableSubject(),222 subscribe_scheduler=subscriber.subscribe_scheduler,223 stack=stack,224 )225 def left_selector(t):226 if isinstance(t[1], SelectNext):227 return [select_next, select_completed]228 else:229 return [select_completed]230 def right_selector(t):231 if isinstance(t[0], SelectNext):232 return [select_next, select_completed]233 else:234 return [select_completed]235 left_sel = RefCountObservable(236 source=MapToIteratorObservable(237 source=FilterObservable(238 source=merge_sel,239 predicate=lambda t: isinstance(t[0], SelectNext),240 ),241 func=left_selector,242 ),243 subject=PublishObservableSubject(),244 subscribe_scheduler=subscriber.subscribe_scheduler,245 stack=stack,246 )247 right_sel = RefCountObservable(248 source=MapToIteratorObservable(249 source=FilterObservable(250 source=merge_sel,251 predicate=lambda t: isinstance(t[1], 
SelectNext),252 ),253 func=right_selector,254 ),255 subject=PublishObservableSubject(),256 subscribe_scheduler=subscriber.scheduler,257 stack=stack,258 )259 right_selector_map = MapObservable(260 source=FilterObservable(261 source=merge_sel,262 predicate=lambda t: type(t[0]) == type(t[1]),263 ),264 func=lambda t: t[0],265 )266 return FlowableBaseAndSelectorsMatch(267 left=ObservableSeqMapInfo(left_sel),268 right=ObservableSeqMapInfo(right_sel),269 base_selectors=FlowableBaseAndSelectors(270 base=None,271 selectors={272 sel_base_1: right_selector_map,273 **{k: merge_selectors(v, left_sel, subscribe_scheduler=subscriber.scheduler, stack=stack) for k, v in274 self.selectors.items() if k != sel_base_1},275 **{k: merge_selectors(v, right_sel, subscribe_scheduler=subscriber.scheduler, stack=stack) for k, v in276 other.selectors.items() if k != sel_base_2}277 },278 ),279 )...
test_baseselectortuple.py
Source:test_baseselectortuple.py
...93 # #94 # #95 # # t1 = BaseAndSelectors(base=TestBase(self.num_base_1), selectors={self.num_base_2: self.sel2})96 # # t2 = BaseAndSelectors(base=self.num_base_2, selectors={self.num_base_3: self.sel3})97 # # result: Optional[BaseSelectorAndSelectorMaps] = t1.get_selectors(t2, self.subscriber)98 # #99 # # self.assertIsInstance(result, BaseSelectorAndSelectorMaps)100 # # self.assertIsInstance(result.right, IdentitySelectorMap)101 # # self.assertIsInstance(result.left, IdentitySelectorMap)102 # # self.assertIsInstance(result.base_selectors.base, NumericalBase)103 # # self.assertEqual(1, result.base_selectors.base.num)104 # # self.assertEqual({2: None, 3: None}, result.base_selectors.selectors)105 # def test_not_matching(self):106 # t1 = BaseAndSelectors(base=NumericalBase(1))107 # t2 = BaseAndSelectors(base=NumericalBase(2))108 # result: Optional[BaseSelectorsAndSelectorMaps] = t1.match_with(t2, self.subscriber)...
LambdaTest’s Playwright tutorial gives you a broader view of the Playwright automation framework, its unique features, and its use cases, with examples to deepen your understanding of Playwright testing. The tutorial offers A-to-Z guidance, from installing the Playwright framework to best practices and advanced concepts.
Get 100 minutes of automation testing FREE!!