Best Python code snippet using playwright-python
feed.py
Source:feed.py
...206 break207 if not systempost:208 self.sys_posts_count = len(sys_posts_content)209 return systempost210 def _on_page(self, data, add_bottom=True, **kwargs):211 if not data:212 self.loading = False213 return214 # only insert when getting next page -- i.e. adding from the bottom215 systempost = self._on_page_add_system_post() if add_bottom else None216 removed = removed_ds.data()217 systempost_place = 0218 for pdata in data:219 systempost_place += 1220 if systempost and systempost_place == 4:221 self._add_system_post(222 key='system_post_%s' % (self.sys_posts_count + 1),223 index=0,224 **systempost)225 post = pdata.get('post', None)226 hash = pdata.get('hash', None)227 if post:228 key = post.get('key', None)229 if key in removed:230 continue231 d = convert_json_from_unicode(post)232 d['hash'] = hash233 if not key:234 continue235 if self.stream_system and self.stream_system.key in d['channels']:236 print 'Got system post'237 self.stream_system.hash_bounds = hash238 if len(d['content']) > 2 and d['content'][0] == '#':239 print 'Systempost -- deep link'240 key = d['content'][1:]241 def on_get_deeplink(data, hash):242 print 'Got deep links data'243 if len(data) > 0:244 wrapper = {'hash': hash, 'post': data[0]}245 self._on_page([wrapper], add_bottom=False)246 from api.items import get as getPostData247 getPostData(keys=[key], on_items=on_get_deeplink)248 continue249 d['background'] = 'bkg.jpg'250 d['when'] = 'once'251 if d['role_text'].lower() == 'invite':252 d['action'] = 'invite'253 d['button'] = 'send invitation'254 elif d['role_text'].lower() == 'linkedin':255 d['action'] = 'linkedin'256 d['button'] = 'Login with linkedin'257 else:258 d['action'] = 'close'259 d['key'] = 'system_post_%s' % (self.sys_posts_count + 1)260 d['index'] = -1261 self._add_system_post(**d)262 continue263 if add_bottom:264 self.container.enqueue(key, d, index=0)265 else:266 self.container.enqueue(key, d, index=-1)267 self.loading = False268 def _on_updates(self, data, hash=None):269 
self.last_update_hash = hash270 removed = removed_ds.data()271 for pdata in data:272 post = pdata.get('obj', None)273 if post:274 key = post.get('key', None)275 if key and key not in removed:276 self.container.enqueue(key, post)277 def _update(self, *largs):278 if not self.stream:279 return280 keys = self.container.get_widget_keys()281 self._get_next_page()282 args = {'hash': self.last_update_hash} if self.last_update_hash else {}283 get_updates(284 keys=keys,285 on_updates=self._on_updates,286 on_error=self.feed_network_error_message,287 **args)288 def _on_nextpage_getsystemposts(self, data, add_bottom=True):289 if self.stream_system and self.stream:290 if not self.stream_system.hash_bounds:291 self.stream_system.hash_bounds = self.stream.hash_bounds292 self.stream_system.get_next_page(293 on_page=partial(self._on_page, add_bottom=False),294 on_error=self.feed_network_error_message)295 self._on_page(data, add_bottom=add_bottom)296 def _get_page(self, *args):297 if self.stream:298 self.stream.get_current_page(299 on_page=self._on_nextpage_getsystemposts,300 on_error=self.feed_network_error_message)301 def _get_prev_page(self, *args):302 LogTestFairy('Feed prev page')303 if self.stream:304 self.stream.get_prev_page(305 on_page=self._on_page,306 on_error=self.feed_network_error_message)307 @staticmethod308 def feed_network_error_message(*args):309 return...
data_manager.py
Source:data_manager.py
# This class will deal with the low-level data management, saving and loading to files.
import enum
import glob
import math
import os
import random
from typing import Any, Iterator, List, Tuple

import attr
import numpy as np

from go_space import exceptions
from . import datum_lib

Batch = Any  # get_batch returns a (features, targets) tuple of np.ndarray
Data = List[datum_lib.Datum]
PAGE_SIZE = 200  # entries written to a page file before a new one is started
PAGES_IN_MEMORY = 10  # maximum number of pages kept in the read cache


class TrainTest(enum.Enum):
    """Which side of the train/test split a read should come from."""

    TRAIN = 1
    TEST = 2


@attr.s
class Page(object):
    """The parsed content of one page file."""

    page_num: int = attr.ib()
    content: Data = attr.ib()

    def __len__(self) -> int:
        return len(self.content)


# TODO: Clean up
class DataManager(object):
    """Stores data as numbered "<page>.txt" files, one JSON datum per line.

    Write position is tracked by ``page_cursor`` / ``entry_cursor``; reading
    for batches is tracked separately by the state that ``reset`` initialises.
    """

    def __init__(self, tgt_dir) -> None:
        """Point the manager at ``tgt_dir`` and pick up existing page files."""
        # TODO: Rename cursors to be include "write". These are a mess.
        self.page_cursor = -1
        self.entry_cursor = 0
        self.test_pages = set()
        self._page_cache = []
        self.data_path = tgt_dir
        # Used in the course of generating batches
        self.reset()
        self._note_existing_pages()

    def _note_existing_pages(self) -> None:
        """Set the write cursors from the ``*.txt`` files already on disk."""
        # Assumes files are written 1, 2, ..., n
        # NOTE(review): save_datum names its first file after a -1 -> 0
        # increment ("0.txt"), which disagrees with the 1-based assumption
        # above -- confirm which numbering is intended.
        # NOTE(review): page_cursor is always >= 0 after this call, so the
        # `page_cursor == -1` guards elsewhere cannot trigger once __init__
        # has run.
        num_files = len(glob.glob(os.path.join(self.data_path, "*.txt")))
        self.page_cursor = num_files
        self.entry_cursor = 0
        if os.path.exists(os.path.join(self.data_path, str(self.page_cursor) + ".txt")):
            self.entry_cursor = len(self._read_page(self.page_cursor))

    def _read_page(self, page_num: int) -> Page:
        """Return page ``page_num``, from the cache or from its file.

        Raises:
            exceptions.DataException: if ``page_num`` is beyond the write
                cursor.
        """
        if page_num > self.page_cursor:
            raise exceptions.DataException(f"Page {page_num} doesn't exist")
        # Check cache first
        for position, cached in enumerate(self._page_cache):
            if cached.page_num == page_num:
                if position:
                    # Refresh recency on a hit so that eviction really is
                    # least-recently-used (the original never reordered).
                    self._page_cache.insert(0, self._page_cache.pop(position))
                return cached
        # Read with an LRU cache
        with open(os.path.join(self.data_path, str(page_num) + ".txt"), "r") as f:
            page_data = [datum_lib.Datum.from_json(line) for line in f]
        page = Page(page_num=page_num, content=page_data)
        self._page_cache = ([page] + self._page_cache)[:PAGES_IN_MEMORY]
        return page

    def _read_entry(self, page_num: int, entry_num: int) -> datum_lib.Datum:
        """Return entry ``entry_num`` of page ``page_num``.

        Raises:
            exceptions.DataException: if the page has no such entry (or the
                page itself is rejected by ``_read_page``).
        """
        page = self._read_page(page_num)
        if entry_num >= len(page):
            raise exceptions.DataException(
                f"Trying to read entry {entry_num} off of page {page_num}, but entries only go to {len(page)-1}."
            )
        return page.content[entry_num]

    def _choose_next(self, data_split: TrainTest) -> Tuple[int, int]:
        """Return the (page, entry) position of the next datum to read.

        Raises:
            exceptions.DataException: when every page of the requested split
                has already been handed out since the last ``reset``.
        """
        # Pick a random page, then go through the data on that page in order. Subject to change, I suppose.
        def choose_new_page() -> int:
            # Draw uniformly from the pages that belong to the split and
            # have not been read yet. This is the same distribution as
            # re-rolling random page numbers until one fits, but it always
            # terminates.
            if data_split == TrainTest.TRAIN:
                seen = self._read_train_pages
                candidates = [
                    num for num in range(self.page_cursor)
                    if num not in self.test_pages and num not in seen
                ]
            else:
                seen = self._read_test_pages
                candidates = [
                    num for num in range(self.page_cursor)
                    if num in self.test_pages and num not in seen
                ]
            if not candidates:
                raise exceptions.DataException("Tried to read too many pages.")
            chosen = random.choice(candidates)
            # Mark as read
            seen.add(chosen)
            return chosen

        if self._on_page == -1:
            self._on_page = choose_new_page()
            self._read_cursor = 0
        # Kept as a separate check: a freshly chosen page that turns out to
        # be empty is replaced once here as well.
        if self._read_cursor >= len(self._read_page(self._on_page)):
            self._on_page = choose_new_page()
            self._read_cursor = 0
        result = (self._on_page, self._read_cursor)
        self._read_cursor += 1
        return result

    def size(self) -> int:
        # NOTE(review): save_datum writes entry k of page p with
        # page_cursor == p, which suggests page_cursor * PAGE_SIZE here;
        # left unchanged because the intended cursor convention cannot be
        # settled from this excerpt -- confirm.
        return (self.page_cursor - 1) * PAGE_SIZE + self.entry_cursor

    def save_datum(self, datum: datum_lib.Datum) -> None:
        """Append ``datum`` as one JSON line to the current page file.

        A new page is started when the current one holds PAGE_SIZE entries.
        """
        if self.page_cursor == -1 or self.entry_cursor == PAGE_SIZE:
            self.page_cursor += 1
            self.entry_cursor = 0
        with open(
            os.path.join(self.data_path, str(self.page_cursor) + ".txt"), "a"
        ) as f:
            f.write(datum.to_json() + "\n")
        self.entry_cursor += 1
        # The file just grew; drop any cached copy of this page so a later
        # read does not return the shorter, stale content.
        self._page_cache = [
            cached for cached in self._page_cache
            if cached.page_num != self.page_cursor
        ]

    def train_test_split(self, portion_test: float) -> None:
        """Randomly mark ``ceil(page_cursor * portion_test)`` pages as test.

        Raises:
            exceptions.DataException: if a split was already made or no data
                has been saved.
        """
        # Will split on a page level
        if self.test_pages:
            raise exceptions.DataException("Ran train_test_split multiple times.")
        if self.page_cursor == -1:
            raise exceptions.DataException("No data saved.")
        num_test_pages = math.ceil(self.page_cursor * portion_test)
        self.test_pages.update(
            random.sample(range(self.page_cursor), num_test_pages)
        )

    def get_batch(
        self, batch_size: int, data_split: TrainTest, reset: bool = True
    ) -> Batch:
        """Return ``batch_size`` data as stacked (features, targets) arrays.

        With ``reset`` true (the default) the read state is cleared first.

        Raises:
            exceptions.DataException: if no data has been saved, or the
                split runs out of pages while the batch is being filled.
        """
        # Should be semi-random.
        if self.page_cursor == -1:
            raise exceptions.DataException("No data saved.")
        if reset:
            self.reset()
        features, targets = [], []
        for _ in range(batch_size):
            next_datum = self._read_entry(*self._choose_next(data_split))
            features.append(next_datum.np_feature())
            targets.append(next_datum.np_target())
        return np.stack(features, axis=0), np.stack(targets, axis=0)

    def reset(self) -> None:
        """Needs to be called between looping batches"""
        print("RESET")
        self._read_train_pages = set()
        self._read_test_pages = set()
        self._on_page = -1
        self._read_cursor = 0

    def generate_batches(
        self, batch_size: int, data_split: TrainTest
    ) -> Iterator[Batch]:
        while True:
            ...  # the excerpt is cut off here; the loop body is not shown
project_screen.py
Source:project_screen.py
...100 self._previous.setDisabled(self._on_first_page())101 self._next.setDisabled(self._on_last_page())102 self._pages_navigation.setCurrentIndex(index)103 def _on_last_page(self):104 return self._on_page(self._page_count - 1)105 def _on_first_page(self):106 return self._on_page(0)107 def _on_page(self, index):108 return self.current_page == index109 def _insert_page(self, widget, position):110 self._pages.insertWidget(position, widget)111 def _remove_page(self, number):112 page = self._pages.widget(number)113 self._pages.removeWidget(page)114 page.setParent(None)115 page.close()116 @property117 def current_page(self):118 return self._pages.currentIndex()119 @property120 def _page_count(self):121 return self._pages.count()...
dailypuzzle.py
Source:dailypuzzle.py
...19 def run(self):20 d = self._account.get('community/index.phtml')21 d.addCallback(self._on_page)22 return d23 def _on_page(self, page):24 form = page.find('form', attrs=self._POLL_FORM_ATTRS)25 if not form:26 self._logger.info('Puzzle is not available')27 return28 d = self._outside_browser.get(29 'http://www.jellyneo.net/?go=dailypuzzle')30 d.addCallback(self._on_answers_page)31 return d32 def _on_answers_page(self, page):33 page = BeautifulSoup(page)34 daily_answer = page.find(text=self._ANSWER_DATE_RE)35 if not daily_answer:36 raise PageParseError(page)37 answer_date = datetime.datetime.strptime(...
LambdaTest’s Playwright tutorial gives you a broad overview of the Playwright automation framework, its unique features, and its use cases, with examples to deepen your understanding of Playwright testing. The tutorial provides A-to-Z guidance, from installing the Playwright framework to best practices and advanced concepts.
Get 100 minutes of automation testing FREE!!