Best Python code snippet using playwright-python
in_memory_document_session_operations.py
Source:in_memory_document_session_operations.py
...1529 self, timeout: datetime.timedelta1530 ) -> InMemoryDocumentSessionOperations.ReplicationWaitOptsBuilder:1531 self.get_options().replication_options.wait_for_indexes_timeout = timeout1532 return self1533 def throw_on_timeout(self, should_throw: bool) -> InMemoryDocumentSessionOperations.ReplicationWaitOptsBuilder:1534 self.get_options().replication_options.throw_on_timeout_in_wait_for_replicas = should_throw1535 return self1536 def number_of_replicas(self, replicas: int) -> InMemoryDocumentSessionOperations.ReplicationWaitOptsBuilder:1537 self.get_options().replication_options.number_of_replicas_to_wait_for = replicas1538 return self1539 def majority(self, wait_for_majority: bool) -> InMemoryDocumentSessionOperations.ReplicationWaitOptsBuilder:1540 self.get_options().replication_options.majority = wait_for_majority1541 return self1542 class IndexesWaitOptsBuilder:1543 def __init__(self, session: InMemoryDocumentSessionOperations):1544 self.__session = session1545 def get_options(self) -> BatchOptions:1546 if self.__session._save_changes_options is None:1547 self.__session._save_changes_options = BatchOptions()1548 if self.__session._save_changes_options.index_options is None:1549 self.__session._save_changes_options.index_options = IndexBatchOptions()1550 return self.__session._save_changes_options1551 def with_timeout(self, timeout: datetime.timedelta) -> InMemoryDocumentSessionOperations.IndexesWaitOptsBuilder:1552 self.get_options().index_options.wait_for_indexes_timeout = timeout1553 return self1554 def throw_on_timeout(self, should_throw: bool) -> InMemoryDocumentSessionOperations.IndexesWaitOptsBuilder:1555 self.get_options().index_options.throw_on_timeout_in_wait_for_replicas = should_throw1556 return self1557 def wait_for_indexes(self, *indexes: str) -> InMemoryDocumentSessionOperations.IndexesWaitOptsBuilder:1558 self.get_options().index_options.wait_for_indexes = indexes...
webdriver.py
Source:webdriver.py
1from __future__ import absolute_import2import base643import contextlib4import time5import typing as tp6from PIL import Image7from selenium.common.exceptions import WebDriverException8from selenium.webdriver.common.by import By9from selenium.webdriver.remote.switch_to import SwitchTo10from selenium.webdriver.remote.webdriver import WebDriver11from selenium.webdriver.support.wait import WebDriverWait12from applitools.core import logger13from applitools.core.errors import EyesError14from applitools.core.geometry import Point, Region15from applitools.utils import cached_property, image_utils, general_utils16from . import eyes_selenium_utils, StitchMode17from .positioning import ElementPositionProvider, build_position_provider_for, ScrollPositionProvider18from .webelement import EyesWebElement19from .frames import Frame, FrameChain20if tp.TYPE_CHECKING:21 from applitools.core.scaling import ScaleProvider22 from applitools.utils.custom_types import Num, ViewPort, FrameReference, AnyWebDriver, AnyWebElement23 from .eyes import Eyes24class FrameResolver(object):25 def __init__(self, frame_ref, driver):26 # Find the frame's location and add it to the current driver offset27 if isinstance(frame_ref, str):28 frame_eyes_webelement = driver.find_element_by_name(frame_ref)29 elif isinstance(frame_ref, int):30 frame_elements_list = driver.find_elements_by_css_selector('frame, iframe')31 frame_eyes_webelement = frame_elements_list[frame_ref]32 elif isinstance(frame_ref, EyesWebElement):33 frame_eyes_webelement = frame_ref34 else:35 # It must be a WebElement36 frame_eyes_webelement = EyesWebElement(frame_ref, driver)37 self.eyes_webelement = frame_eyes_webelement38 self.webelement = frame_eyes_webelement.element39class _EyesSwitchTo(object):40 """41 Wraps a selenium "SwitchTo" object, so we can keep track of switching between frames.42 It has name EyesTargetLocator in other SDK's43 """44 # TODO: Make more similar to EyesTargetLocator45 _READONLY_PROPERTIES = ['alert', 'active_element']46 PARENT_FRAME = 147 def __init__(self, driver, switch_to):48 # type: (EyesWebDriver, SwitchTo) -> None49 """50 Ctor.51 :param driver: EyesWebDriver instance.52 :param switch_to: Selenium switchTo object.53 """54 self._switch_to = switch_to55 self._driver = driver56 self._scroll_position = ScrollPositionProvider(driver)57 general_utils.create_proxy_interface(self, switch_to, self._READONLY_PROPERTIES)58 @contextlib.contextmanager59 def frame_and_back(self, frame_reference):60 # type: (FrameReference) -> tp.Generator61 self.frame(frame_reference)62 yield63 self.parent_frame()64 def frame(self, frame_reference):65 # type: (FrameReference) -> None66 """67 Switch to a given frame.68 :param frame_reference: The reference to the frame.69 """70 frame = FrameResolver(frame_reference, self._driver)71 self.will_switch_to_frame(frame.eyes_webelement)72 self._switch_to.frame(frame.webelement)73 def frames(self, frame_chain):74 # type: (FrameChain) -> None75 """76 Switches to the frames one after the other.77 :param frame_chain: A list of frames.78 """79 self._switch_to.default_content()80 for frame in frame_chain:81 self.frame(frame.reference)82 def default_content(self):83 # type: () -> None84 """85 Switch to default content.86 """87 self._driver.frame_chain.clear()88 self._switch_to.default_content()89 def parent_frame(self):90 """91 Switch to parent frame.92 """93 frames = self._driver.frame_chain94 if frames:95 frames.pop()96 try:97 self._switch_to.parent_frame()98 except WebDriverException as e:99 self._switch_to.default_content()100 for frame in frames:101 self._switch_to.frame(frame.reference)102 def window(self, window_name):103 # type: (tp.Text) -> None104 """105 Switch to window.106 """107 self._driver.frame_chain.clear()108 self._switch_to.window(window_name)109 def will_switch_to_frame(self, target_frame):110 # type: (EyesWebElement) -> None111 """112 Will be called before switching into a frame.113 :param target_frame: The element about to be switched to.114 """115 assert target_frame is not None116 pl = target_frame.location117 size_and_borders = target_frame.size_and_borders118 borders = size_and_borders.borders119 frame_inner_size = size_and_borders.size120 content_location = Point(pl['x'] + borders['left'], pl['y'] + borders['top'])121 original_location = self._scroll_position.get_current_position()122 self._driver.scroll_to(content_location)123 frame = Frame(target_frame, content_location, target_frame.size, frame_inner_size,124 parent_scroll_position=original_location)125 self._driver.frame_chain.push(frame)126class EyesWebDriver(object):127 """128 A wrapper for selenium web driver which creates wrapped elements, and notifies us about129 events / actions.130 """131 # Properties require special handling since even testing if they're callable "activates"132 # them, which makes copying them automatically a problem.133 _READONLY_PROPERTIES = ['application_cache', 'current_url', 'current_window_handle',134 'desired_capabilities', 'log_types', 'name', 'page_source', 'title',135 'window_handles', 'switch_to', 'mobile', 'current_context', 'context',136 'current_activity', 'network_connection', 'available_ime_engines',137 'active_ime_engine', 'device_time', 'w3c', 'contexts', 'current_package',138 # Appium specific139 'battery_info', 'location']140 _SETTABLE_PROPERTIES = ['orientation', 'file_detector']141 # This should pretty much cover all scroll bars (and some fixed position footer elements :) ).142 _MAX_SCROLL_BAR_SIZE = 50143 _MIN_SCREENSHOT_PART_HEIGHT = 10144 def __init__(self, driver, eyes, stitch_mode=StitchMode.Scroll):145 # type: (WebDriver, Eyes, tp.Text) -> None146 """147 Ctor.148 :param driver: remote WebDriver instance.149 :param eyes: A Eyes sdk instance.150 :param stitch_mode: How to stitch a page (default is with scrolling).151 """152 self.driver = driver153 self._eyes = eyes154 self._origin_position_provider = build_position_provider_for(StitchMode.Scroll, driver)155 self._position_provider = build_position_provider_for(stitch_mode, driver)156 # tp.List of frames the user switched to, and the current offset, so we can properly157 # calculate elements' coordinates158 self._frame_chain = FrameChain()159 self._default_content_viewport_size = None # type: tp.Optional[ViewPort]160 self.driver_takes_screenshot = driver.capabilities.get('takesScreenshot', False)161 # Creating the rest of the driver interface by simply forwarding it to the underlying162 # driver.163 general_utils.create_proxy_interface(self, driver,164 self._READONLY_PROPERTIES + self._SETTABLE_PROPERTIES)165 for attr in self._READONLY_PROPERTIES:166 if not hasattr(self.__class__, attr):167 setattr(self.__class__, attr, general_utils.create_proxy_property(attr, 'driver'))168 for attr in self._SETTABLE_PROPERTIES:169 if not hasattr(self.__class__, attr):170 setattr(self.__class__, attr, general_utils.create_proxy_property(attr, 'driver', True))171 def get_display_rotation(self):172 # type: () -> int173 """174 Get the rotation of the screenshot.175 :return: The rotation of the screenshot we get from the webdriver in (degrees).176 """177 if self.platform_name == 'Android' and self.driver.orientation == "LANDSCAPE":178 return -90179 return 0180 def get_platform_name(self):181 return self.platform_name182 def get_platform_version(self):183 return self.platform_version184 @cached_property185 def platform_name(self):186 # type: () -> tp.Optional[tp.Text]187 return self.driver.desired_capabilities.get('platformName', None)188 @cached_property189 def platform_version(self):190 # type: () -> tp.Optional[tp.Text]191 return self.driver.desired_capabilities.get('platformVersion', None)192 @cached_property193 def browser_version(self):194 # type: () -> tp.Optional[float]195 caps = self.driver.capabilities196 version = caps.get('browserVersion', caps.get('version', None))197 if version:198 # convert version that has few dots in to float number (e.g. Edge 1.23.45)199 if version.find('.') != -1:200 version = float(version[:version.index('.') + 2])201 else:202 version = float(version)203 return version204 @cached_property205 def browser_name(self):206 # type: () -> tp.Optional[tp.Text]207 caps = self.driver.capabilities208 return caps.get('browserName', caps.get('browser', None))209 @cached_property210 def user_agent(self):211 try:212 user_agent = self.driver.execute_script("return navigator.userAgent")213 logger.info("user agent: {}".format(user_agent))214 except Exception as e:215 logger.info("Failed to obtain user-agent string")216 user_agent = None217 return user_agent218 def is_mobile_device(self):219 # type: () -> bool220 """221 Returns whether the platform running is a mobile device or not.222 :return: True if the platform running the test is a mobile platform. False otherwise.223 """224 return eyes_selenium_utils.is_mobile_device(self.driver)225 def get(self, url):226 # type: (tp.Text) -> tp.Optional[tp.Any]227 """228 Navigates the driver to the given url.229 :param url: The url to navigate to.230 :return: A driver that navigated to the given url.231 """232 # We're loading a new page, so the frame location resets233 self._frame_chain.clear()234 return self.driver.get(url)235 def find_element(self, by=By.ID, value=None):236 # type: (tp.Text, tp.Text) -> EyesWebElement237 """238 Returns a WebElement denoted by "By".239 :param by: By which option to search for (default is by ID).240 :param value: The value to search for.241 :return: A element denoted by "By".242 """243 # Get result from the original implementation of the underlying driver.244 result = self.driver.find_element(by, value)245 # Wrap the element.246 if result:247 result = EyesWebElement(result, self)248 return result249 def find_elements(self, by=By.ID, value=None):250 # type: (tp.Text, tp.Text) -> tp.List[EyesWebElement]251 """252 Returns a list of web elements denoted by "By".253 :param by: By which option to search for (default is by ID).254 :param value: The value to search for.255 :return: List of elements denoted by "By".256 """257 # Get result from the original implementation of the underlying driver.258 results = self.driver.find_elements(by, value)259 # Wrap all returned elements.260 if results:261 updated_results = []262 for element in results:263 updated_results.append(EyesWebElement(element, self))264 results = updated_results265 return results266 def find_element_by_id(self, id_):267 # type: (tp.Text) -> EyesWebElement268 """269 Finds an element by id.270 :params id_: The id of the element to be found.271 """272 return self.find_element(by=By.ID, value=id_)273 def find_elements_by_id(self, id_):274 # type: (tp.Text) -> tp.List[EyesWebElement]275 """276 Finds multiple elements by id.277 :param id_: The id of the elements to be found.278 """279 return self.find_elements(by=By.ID, value=id_)280 def find_element_by_xpath(self, xpath):281 # type: (tp.Text) -> EyesWebElement282 """283 Finds an element by xpath.284 :param xpath: The xpath locator of the element to find.285 """286 return self.find_element(by=By.XPATH, value=xpath)287 def find_elements_by_xpath(self, xpath):288 # type: (tp.Text) -> tp.List[EyesWebElement]289 """290 Finds multiple elements by xpath.291 :param xpath: The xpath locator of the elements to be found.292 """293 return self.find_elements(by=By.XPATH, value=xpath)294 def find_element_by_link_text(self, link_text):295 # type: (tp.Text) -> EyesWebElement296 """297 Finds an element by link text.298 :param link_text: The text of the element to be found.299 """300 return self.find_element(by=By.LINK_TEXT, value=link_text)301 def find_elements_by_link_text(self, text):302 # type: (tp.Text) -> tp.List[EyesWebElement]303 """304 Finds elements by link text.305 :param text: The text of the elements to be found.306 """307 return self.find_elements(by=By.LINK_TEXT, value=text)308 def find_element_by_partial_link_text(self, link_text):309 # type: (tp.Text) -> EyesWebElement310 """311 Finds an element by a partial match of its link text.312 :param link_text: The text of the element to partially match on.313 """314 return self.find_element(by=By.PARTIAL_LINK_TEXT, value=link_text)315 def find_elements_by_partial_link_text(self, link_text):316 # type: (tp.Text) -> tp.List[EyesWebElement]317 """318 Finds elements by a partial match of their link text.319 :param link_text: The text of the element to partial match on.320 """321 return self.find_elements(by=By.PARTIAL_LINK_TEXT, value=link_text)322 def find_element_by_name(self, name):323 # type: (tp.Text) -> EyesWebElement324 """325 Finds an element by name.326 :param name: The name of the element to find.327 """328 return self.find_element(by=By.NAME, value=name)329 def find_elements_by_name(self, name):330 # type: (tp.Text) -> tp.List[EyesWebElement]331 """332 Finds elements by name.333 :param name: The name of the elements to find.334 """335 return self.find_elements(by=By.NAME, value=name)336 def find_element_by_tag_name(self, name):337 # type: (tp.Text) -> EyesWebElement338 """339 Finds an element by tag name.340 :param name: The tag name of the element to find.341 """342 return self.find_element(by=By.TAG_NAME, value=name)343 def find_elements_by_tag_name(self, name):344 # type: (tp.Text) -> tp.List[EyesWebElement]345 """346 Finds elements by tag name.347 :param name: The tag name to use when finding elements.348 """349 return self.find_elements(by=By.TAG_NAME, value=name)350 def find_element_by_class_name(self, name):351 # type: (tp.Text) -> EyesWebElement352 """353 Finds an element by class name.354 :param name: The class name of the element to find.355 """356 return self.find_element(by=By.CLASS_NAME, value=name)357 def find_elements_by_class_name(self, name):358 # type: (tp.Text) -> tp.List[EyesWebElement]359 """360 Finds elements by class name.361 :param name: The class name of the elements to find.362 """363 return self.find_elements(by=By.CLASS_NAME, value=name)364 def find_element_by_css_selector(self, css_selector):365 # type: (tp.Text) -> EyesWebElement366 """367 Finds an element by css selector.368 :param css_selector: The css selector to use when finding elements.369 """370 return self.find_element(by=By.CSS_SELECTOR, value=css_selector)371 def find_elements_by_css_selector(self, css_selector):372 # type: (tp.Text) -> tp.List[EyesWebElement]373 """374 Finds elements by css selector.375 :param css_selector: The css selector to use when finding elements.376 """377 return self.find_elements(by=By.CSS_SELECTOR, value=css_selector)378 def get_screenshot_as_base64(self):379 # type: () -> tp.Text380 """381 Gets the screenshot of the current window as a base64 encoded string382 which is useful in embedded images in HTML.383 """384 screenshot64 = self.driver.get_screenshot_as_base64()385 display_rotation = self.get_display_rotation()386 if display_rotation != 0:387 logger.info('Rotation required.')388 # num_quadrants = int(-(display_rotation / 90))389 logger.debug('Done! Creating image object...')390 screenshot = image_utils.image_from_base64(screenshot64)391 # rotating392 if display_rotation == -90:393 screenshot64 = image_utils.get_base64(screenshot.rotate(90))394 logger.debug('Done! Rotating...')395 return screenshot64396 def get_screesnhot_as_base64_from_main_frame(self):397 # type: () -> tp.Text398 """399 Make screenshot from main frame400 """401 original_frame = self.frame_chain.clone()402 self.switch_to.default_content()403 screenshot64 = self.get_screenshot_as_base64()404 self.switch_to.frames(original_frame)405 return screenshot64406 def extract_full_page_width(self):407 # type: () -> int408 """409 Extracts the full page width.410 :return: The width of the full page.411 """412 # noinspection PyUnresolvedReferences413 default_scroll_width = int(round(self.driver.execute_script(414 "return document.documentElement.scrollWidth")))415 body_scroll_width = int(round(self.driver.execute_script("return document.body.scrollWidth")))416 return max(default_scroll_width, body_scroll_width)417 def extract_full_page_height(self):418 # type: () -> int419 """420 Extracts the full page height.421 :return: The height of the full page.422 IMPORTANT: Notice there's a major difference between scrollWidth and scrollHeight.423 While scrollWidth is the maximum between an element's width and its content width,424 scrollHeight might be smaller(!) than the clientHeight, which is why we take the425 maximum between them.426 """427 # noinspection PyUnresolvedReferences428 default_client_height = int(round(self.driver.execute_script(429 "return document.documentElement.clientHeight")))430 # noinspection PyUnresolvedReferences431 default_scroll_height = int(round(self.driver.execute_script(432 "return document.documentElement.scrollHeight")))433 # noinspection PyUnresolvedReferences434 body_client_height = int(round(self.driver.execute_script("return document.body.clientHeight")))435 # noinspection PyUnresolvedReferences436 body_scroll_height = int(round(self.driver.execute_script("return document.body.scrollHeight")))437 max_document_element_height = max(default_client_height, default_scroll_height)438 max_body_height = max(body_client_height, body_scroll_height)439 return max(max_document_element_height, max_body_height)440 def get_current_position(self):441 # type: () -> Point442 """443 Extracts the current scroll position from the browser.444 :return: The scroll position.445 """446 return self._origin_position_provider.get_current_position()447 def scroll_to(self, point):448 # type: (Point) -> None449 """450 Commands the browser to scroll to a given position.451 :param point: The point to scroll to.452 """453 self._origin_position_provider.set_position(point)454 def get_entire_page_size(self):455 # type: () -> tp.Dict[tp.Text, int]456 """457 Extracts the size of the current page from the browser using Javascript.458 :return: The page width and height.459 """460 return {'width': self.extract_full_page_width(),461 'height': self.extract_full_page_height()}462 def set_overflow(self, overflow, stabilization_time=None):463 # type: (tp.Text, tp.Optional[int]) -> tp.Text464 """465 Sets the overflow of the current context's document element.466 :param overflow: The overflow value to set. If the given value is None, then overflow will be set to467 undefined.468 :param stabilization_time: The time to wait for the page to stabilize after overflow is set. If the value is469 None, then no waiting will take place. (Milliseconds)470 :return: The previous overflow value.471 """472 logger.debug("Setting overflow: %s" % overflow)473 if overflow is None:474 script = "var origOverflow = document.documentElement.style.overflow; " \475 "document.documentElement.style.overflow = undefined; " \476 "return origOverflow;"477 else:478 script = "var origOverflow = document.documentElement.style.overflow; " \479 "document.documentElement.style.overflow = \"{0}\"; " \480 "return origOverflow;".format(overflow)481 # noinspection PyUnresolvedReferences482 original_overflow = self.driver.execute_script(script)483 logger.debug("Original overflow: %s" % original_overflow)484 if stabilization_time is not None:485 time.sleep(stabilization_time / 1000)486 eyes_selenium_utils.add_data_overflow_to_element(self.driver, None, original_overflow)487 return original_overflow488 def wait_for_page_load(self, timeout=3, throw_on_timeout=False):489 # type: (int, bool) -> None490 """491 Waits for the current document to be "loaded".492 :param timeout: The maximum time to wait, in seconds.493 :param throw_on_timeout: Whether to throw an exception when timeout is reached.494 """495 # noinspection PyBroadException496 try:497 WebDriverWait(self.driver, timeout) \498 .until(lambda driver: driver.execute_script('return document.readyState') == 'complete')499 except Exception:500 logger.debug('Page load timeout reached!')501 if throw_on_timeout:502 raise503 def hide_scrollbars(self):504 # type: () -> tp.Text505 """506 Hides the scrollbars of the current context's document element.507 :return: The previous value of the overflow property (could be None).508 """509 logger.debug('HideScrollbars() called. Waiting for page load...')510 self.wait_for_page_load()511 logger.debug('About to hide scrollbars')512 return self.set_overflow('hidden')513 @property514 def frame_chain(self):515 """516 Gets the frame chain.517 :return: A list of Frame instances which represents the path to the current frame.518 This can later be used as an argument to _EyesSwitchTo.frames().519 """520 return self._frame_chain521 def get_viewport_size(self):522 # type: () -> ViewPort523 """524 Returns:525 The viewport size of the current frame.526 """527 return eyes_selenium_utils.get_viewport_size(self)528 def get_default_content_viewport_size(self, force_query=False):529 # type: (bool) -> ViewPort530 """531 Gets the viewport size.532 :return: The viewport size of the most outer frame.533 """534 if self._default_content_viewport_size and not force_query:535 return self._default_content_viewport_size536 current_frames = self.frame_chain.clone()537 # If we're inside a frame, then we should first switch to the most outer frame.538 # Optimization539 if current_frames:540 self.switch_to.default_content()541 self._default_content_viewport_size = eyes_selenium_utils.get_viewport_size_or_display_size(self.driver)542 if current_frames:543 self.switch_to.frames(current_frames)544 return self._default_content_viewport_size545 def reset_origin(self):546 # type: () -> None547 """548 Reset the origin position to (0, 0).549 :raise EyesError: Couldn't scroll to position (0, 0).550 """551 self._origin_position_provider.push_state()552 self._origin_position_provider.set_position(Point(0, 0))553 current_scroll_position = self._origin_position_provider.get_current_position()554 if current_scroll_position.x != 0 or current_scroll_position.y != 0:555 self._origin_position_provider.pop_state()556 raise EyesError("Couldn't scroll to the top/left part of the screen!")557 def restore_origin(self):558 # type: () -> None559 """560 Restore the origin position.561 """562 self._origin_position_provider.pop_state()563 def save_position(self):564 """565 Saves the position in the _position_provider list.566 """567 self._position_provider.push_state()568 def restore_position(self):569 """570 Restore the position.571 """572 self._position_provider.pop_state()573 @staticmethod574 def _wait_before_screenshot(seconds):575 logger.debug("Waiting {} ms before taking screenshot..".format(int(seconds * 1000)))576 time.sleep(seconds)577 logger.debug("Finished waiting!")578 def get_full_page_screenshot(self, wait_before_screenshots, scale_provider):579 # type: (Num, ScaleProvider) -> Image.Image580 """581 Gets a full page screenshot.582 :param wait_before_screenshots: Seconds to wait before taking each screenshot.583 :return: The full page screenshot.584 """585 logger.info('getting full page screenshot..')586 # Saving the current frame reference and moving to the outermost frame.587 original_frame = self.frame_chain588 self.switch_to.default_content()589 self.reset_origin()590 entire_page_size = self.get_entire_page_size()591 # Starting with the screenshot at 0,0592 EyesWebDriver._wait_before_screenshot(wait_before_screenshots)593 part64 = self.get_screenshot_as_base64()594 screenshot = image_utils.image_from_bytes(base64.b64decode(part64))595 scale_provider.update_scale_ratio(screenshot.width)596 pixel_ratio = 1.0 / scale_provider.scale_ratio597 need_to_scale = True if pixel_ratio != 1.0 else False598 if need_to_scale:599 screenshot = image_utils.scale_image(screenshot, 1.0 / pixel_ratio)600 # IMPORTANT This is required! Since when calculating the screenshot parts for full size,601 # we use a screenshot size which is a bit smaller (see comment below).602 if (screenshot.width >= entire_page_size['width']) and \603 (screenshot.height >= entire_page_size['height']):604 self.restore_origin()605 self.switch_to.frames(original_frame)606 logger.debug("Entire page has size as screenshot")607 return screenshot608 # We use a smaller size than the actual screenshot size in order to eliminate duplication609 # of bottom scroll bars, as well as footer-like elements with fixed position.610 screenshot_part_size = {'width': screenshot.width,611 'height': max(screenshot.height - self._MAX_SCROLL_BAR_SIZE,612 self._MIN_SCREENSHOT_PART_HEIGHT)}613 logger.debug("Total size: {0}, Screenshot part size: {1}".format(entire_page_size,614 screenshot_part_size))615 entire_page = Region(0, 0, entire_page_size['width'], entire_page_size['height'])616 screenshot_parts = entire_page.get_sub_regions(screenshot_part_size)617 # Starting with the screenshot we already captured at (0,0).618 stitched_image = Image.new('RGBA', (entire_page.width, entire_page.height))619 stitched_image.paste(screenshot, box=(0, 0))620 self.save_position()621 for part in screenshot_parts:622 # Since we already took the screenshot for 0,0623 if part.left == 0 and part.top == 0:624 logger.debug('Skipping screenshot for 0,0 (already taken)')625 continue626 logger.debug("Taking screenshot for {0}".format(part))627 # Scroll to the part's top/left and give it time to stabilize.628 self._position_provider.set_position(Point(part.left, part.top))629 # self.scroll_to(Point(part.left, part.top))630 EyesWebDriver._wait_before_screenshot(wait_before_screenshots)631 # Since screen size might cause the scroll to reach only part of the way632 current_scroll_position = self._position_provider.get_current_position()633 logger.debug("Scrolled To ({0},{1})".format(current_scroll_position.x,634 current_scroll_position.y))635 part64 = self.get_screenshot_as_base64()636 part_image = image_utils.image_from_bytes(base64.b64decode(part64))637 if need_to_scale:638 part_image = image_utils.scale_image(part_image, 1.0 / pixel_ratio)639 stitched_image.paste(part_image, box=(current_scroll_position.x, current_scroll_position.y))640 self.restore_position()641 self.restore_origin()642 self.switch_to.frames(original_frame)643 return stitched_image644 def get_stitched_screenshot(self, element_region, wait_before_screenshots, scale_provider):645 # type: (Region, int, ScaleProvider) -> Image.Image646 """647 Gets a stitched screenshot for specific element648 :param element_region: Region of required screenshot649 :param wait_before_screenshots: Seconds to wait before taking each screenshot.650 :param scale_provider: Scale image if needed.651 :return: The full element screenshot.652 """653 logger.info('getting stitched element screenshot..')654 self._position_provider = self._eyes._element_position_provider655 entire_size = self._position_provider.get_entire_size()656 logger.debug("Element region: {}".format(element_region))657 # Firefox 60 and above make a screenshot of the current frame when other browsers658 # make a screenshot of the viewport. So we scroll down to frame at _will_switch_to method659 # and add a left margin here.660 # TODO: Refactor code. Use EyesScreenshot661 if self._frame_chain:662 if ((self.browser_name == 'firefox' and self.browser_version < 60.0)663 or self.browser_name in ('chrome', 'MicrosoftEdge', 'internet explorer', 'safari')):664 element_region.left += int(self._frame_chain.peek.location.x)665 screenshot_part_size = {'width': element_region.width,666 'height': max(element_region.height - self._MAX_SCROLL_BAR_SIZE,667 self._MIN_SCREENSHOT_PART_HEIGHT)}668 entire_element = Region(0, 0, entire_size['width'], entire_size['height'])669 screenshot_parts = entire_element.get_sub_regions(screenshot_part_size)670 viewport = self.get_viewport_size()671 screenshot = image_utils.image_from_bytes(base64.b64decode(self.get_screenshot_as_base64()))672 scale_provider.update_scale_ratio(screenshot.width)673 pixel_ratio = 1 / scale_provider.scale_ratio674 need_to_scale = True if pixel_ratio != 1.0 else False675 if need_to_scale:676 element_region = element_region.scale(scale_provider.device_pixel_ratio)677 # Starting with element region size part of the screenshot. Use it as a size template.678 stitched_image = Image.new('RGBA', (entire_element.width, entire_element.height))679 for part in screenshot_parts:680 logger.debug("Taking screenshot for {0}".format(part))681 # Scroll to the part's top/left and give it time to stabilize.682 self._position_provider.set_position(Point(part.left, part.top))683 EyesWebDriver._wait_before_screenshot(wait_before_screenshots)684 # Since screen size might cause the scroll to reach only part of the way685 current_scroll_position = self._position_provider.get_current_position()686 logger.debug("Scrolled To ({0},{1})".format(current_scroll_position.x,687 current_scroll_position.y))688 part64 = self.get_screenshot_as_base64()689 part_image = image_utils.image_from_bytes(base64.b64decode(part64))690 # Cut to viewport size the full page screenshot of main frame for some browsers691 if self._frame_chain:692 if (self.browser_name == 'firefox' and self.browser_version < 60.0693 or self.browser_name in ('internet explorer', 'safari')):694 # TODO: Refactor this to make main screenshot only once695 frame_scroll_position = int(self._frame_chain.peek.location.y)696 part_image = image_utils.get_image_part(part_image, Region(top=frame_scroll_position,697 height=viewport['height'],698 width=viewport['width']))699 # We cut original image before scaling to prevent appearing of artifacts700 part_image = image_utils.get_image_part(part_image, element_region)701 if need_to_scale:702 part_image = image_utils.scale_image(part_image, 1.0 / pixel_ratio)703 # first iteration704 if stitched_image is None:705 stitched_image = part_image706 continue707 stitched_image.paste(part_image, box=(current_scroll_position.x, current_scroll_position.y))708 self._position_provider = self._origin_position_provider709 return stitched_image710 @property711 def switch_to(self):712 return _EyesSwitchTo(self, self.driver.switch_to)713 @property714 def current_offset(self):715 # type: () -> Point716 """717 Return the current offset of the context we're in (e.g., due to switching into frames)718 """719 x, y = 0, 0720 for frame in self._frame_chain:721 x += frame.location['x']722 y += frame.location['y']723 return Point(x, y)724 def execute_script(self, script, *args):725 return self.driver.execute_script(script, *args)726 def get_window_size(self, windowHandle='current'):727 return self.driver.get_window_size(windowHandle)728 def set_window_size(self, width, height, windowHandle='current'):729 self.driver.set_window_size(width, height, windowHandle)730 def set_window_position(self, x, y, windowHandle='current'):...
test_deployment.py
Source:test_deployment.py
1# -*- coding: utf-8 -*-2# Licensed to the Apache Software Foundation (ASF) under one or more§3# contributor license agreements. See the NOTICE file distributed with4# this work for additional information regarding copyright ownership.5# The ASF licenses this file to You under the Apache License, Version 2.06# (the "License"); you may not use this file except in compliance with7# the License. You may obtain a copy of the License at8#9# http://www.apache.org/licenses/LICENSE-2.010#11# Unless required by applicable law or agreed to in writing, software12# distributed under the License is distributed on an "AS IS" BASIS,13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.14# See the License for the specific language governing permissions and15# limitations under the License.16from __future__ import with_statement17import os18import sys19import time20import unittest21from libcloud.utils.py3 import httplib22from libcloud.utils.py3 import u23from libcloud.utils.py3 import PY324from libcloud.utils.py3 import assertRaisesRegex25from libcloud.compute.deployment import MultiStepDeployment, Deployment26from libcloud.compute.deployment import SSHKeyDeployment, ScriptDeployment27from libcloud.compute.deployment import ScriptFileDeployment, FileDeployment28from libcloud.compute.base import Node29from libcloud.compute.base import NodeAuthPassword30from libcloud.compute.types import NodeState, DeploymentError, LibcloudError31from libcloud.compute.ssh import BaseSSHClient32from libcloud.compute.ssh import have_paramiko33from libcloud.compute.ssh import SSHCommandTimeoutError34from libcloud.compute.drivers.rackspace import RackspaceFirstGenNodeDriver as Rackspace35from libcloud.test import MockHttp, XML_HEADERS36from libcloud.test.file_fixtures import ComputeFileFixtures37from mock import Mock, patch38from libcloud.test.secrets import RACKSPACE_PARAMS39# Keyword arguments which are specific to deploy_node() method, but not40# create_node()41DEPLOY_NODE_KWARGS = ['deploy', 'ssh_username', 'ssh_alternate_usernames',42 'ssh_port', 'ssh_timeout', 'ssh_key', 'timeout',43 'max_tries', 'ssh_interface']44FILE_PATH = '{0}home{0}ubuntu{0}relative.sh'.format(os.path.sep)45class MockDeployment(Deployment):46 def run(self, node, client):47 return node48class MockClient(BaseSSHClient):49 def __init__(self, throw_on_timeout=False, *args, **kwargs):50 self.stdout = ''51 self.stderr = ''52 self.exit_status = 053 self.throw_on_timeout = throw_on_timeout54 def put(self, path, contents, chmod=755, mode='w'):55 return contents56 def putfo(self, path, fo, chmod=755):57 return fo.read()58 def run(self, cmd, timeout=None):59 if self.throw_on_timeout and timeout is not None:60 raise ValueError("timeout")61 return self.stdout, self.stderr, self.exit_status62 def delete(self, name):63 return True64class DeploymentTests(unittest.TestCase):65 def setUp(self):66 Rackspace.connectionCls.conn_class = RackspaceMockHttp67 RackspaceMockHttp.type = None68 self.driver = Rackspace(*RACKSPACE_PARAMS)69 # normally authentication happens lazily, but we force it here70 self.driver.connection._populate_hosts_and_request_paths()71 self.driver.features = {'create_node': ['generates_password']}72 self.node = Node(id=12345, name='test', state=NodeState.RUNNING,73 public_ips=['1.2.3.4'], private_ips=['1.2.3.5'],74 driver=Rackspace)75 self.node2 = Node(id=123456, name='test', state=NodeState.RUNNING,76 public_ips=['1.2.3.4'], private_ips=['1.2.3.5'],77 driver=Rackspace)78 def test_multi_step_deployment(self):79 msd = MultiStepDeployment()80 self.assertEqual(len(msd.steps), 0)81 msd.add(MockDeployment())82 self.assertEqual(len(msd.steps), 1)83 self.assertEqual(self.node, msd.run(node=self.node, client=None))84 def test_ssh_key_deployment(self):85 sshd = SSHKeyDeployment(key='1234')86 self.assertEqual(self.node, sshd.run(node=self.node,87 client=MockClient(hostname='localhost')))88 def test_file_deployment(self):89 # use this file (__file__) for obtaining permissions90 target = os.path.join('/tmp', os.path.basename(__file__))91 fd = FileDeployment(__file__, target)92 self.assertEqual(target, fd.target)93 self.assertEqual(__file__, fd.source)94 self.assertEqual(self.node, fd.run(95 node=self.node, client=MockClient(hostname='localhost')))96 def test_script_deployment(self):97 sd1 = ScriptDeployment(script='foobar', delete=True)98 sd2 = ScriptDeployment(script='foobar', delete=False)99 sd3 = ScriptDeployment(100 script='foobar', delete=False, name='foobarname')101 sd4 = ScriptDeployment(102 script='foobar', delete=False, name='foobarname', timeout=10)103 self.assertTrue(sd1.name.find('deployment') != '1')104 self.assertEqual(sd3.name, 'foobarname')105 self.assertEqual(sd3.timeout, None)106 self.assertEqual(sd4.timeout, 10)107 self.assertEqual(self.node, sd1.run(node=self.node,108 client=MockClient(hostname='localhost')))109 self.assertEqual(self.node, sd2.run(node=self.node,110 client=MockClient(hostname='localhost')))111 self.assertEqual(self.node, sd3.run(node=self.node,112 client=MockClient(hostname='localhost')))113 assertRaisesRegex(self, ValueError, 'timeout', sd4.run,114 node=self.node,115 client=MockClient(hostname='localhost',116 throw_on_timeout=True))117 def test_script_file_deployment(self):118 file_path = os.path.abspath(__file__)119 with open(file_path, 'rb') as fp:120 content = fp.read()121 if PY3:122 content = content.decode('utf-8')123 sfd1 = ScriptFileDeployment(script_file=file_path)124 self.assertEqual(sfd1.script, content)125 self.assertEqual(sfd1.timeout, None)126 sfd2 = ScriptFileDeployment(script_file=file_path, timeout=20)127 self.assertEqual(sfd2.timeout, 20)128 def test_script_deployment_relative_path(self):129 client = Mock()130 client.put.return_value = FILE_PATH131 client.run.return_value = ('', '', 0)132 sd = ScriptDeployment(script='echo "foo"', name='relative.sh')133 sd.run(self.node, client)134 client.run.assert_called_once_with(FILE_PATH, timeout=None)135 def test_script_deployment_absolute_path(self):136 file_path = '{0}root{0}relative.sh'.format(os.path.sep)137 client = Mock()138 client.put.return_value = file_path139 client.run.return_value = ('', '', 0)140 sd = ScriptDeployment(script='echo "foo"', name=file_path)141 sd.run(self.node, client)142 client.run.assert_called_once_with(file_path, timeout=None)143 def test_script_deployment_with_arguments(self):144 file_path = '{0}root{0}relative.sh'.format(os.path.sep)145 client = Mock()146 client.put.return_value = file_path147 client.run.return_value = ('', '', 0)148 args = ['arg1', 'arg2', '--option1=test']149 sd = ScriptDeployment(script='echo "foo"', args=args,150 name=file_path)151 sd.run(self.node, client)152 expected = '%s arg1 arg2 --option1=test' % (file_path)153 client.run.assert_called_once_with(expected, timeout=None)154 client.reset_mock()155 args = []156 sd = ScriptDeployment(script='echo "foo"', args=args,157 name=file_path)158 sd.run(self.node, client)159 expected = file_path160 client.run.assert_called_once_with(expected, timeout=None)161 def test_script_file_deployment_with_arguments(self):162 file_path = os.path.abspath(__file__)163 client = Mock()164 client.put.return_value = FILE_PATH165 client.run.return_value = ('', '', 0)166 args = ['arg1', 'arg2', '--option1=test', 'option2']167 sfd = ScriptFileDeployment(script_file=file_path, args=args,168 name=file_path)169 sfd.run(self.node, client)170 expected = '%s arg1 arg2 --option1=test option2' % (file_path)171 client.run.assert_called_once_with(expected, timeout=None)172 def test_script_deployment_and_sshkey_deployment_argument_types(self):173 class FileObject(object):174 def __init__(self, name):175 self.name = name176 def read(self):177 return 'bar'178 ScriptDeployment(script='foobar')179 ScriptDeployment(script=u('foobar'))180 ScriptDeployment(script=FileObject('test'))181 SSHKeyDeployment(key='foobar')182 SSHKeyDeployment(key=u('foobar'))183 SSHKeyDeployment(key=FileObject('test'))184 try:185 ScriptDeployment(script=[])186 except TypeError:187 pass188 else:189 self.fail('TypeError was not thrown')190 try:191 SSHKeyDeployment(key={})192 except TypeError:193 pass194 else:195 self.fail('TypeError was not thrown')196 def test_wait_until_running_running_instantly(self):197 node2, ips = self.driver.wait_until_running(198 nodes=[self.node], wait_period=0.1,199 timeout=0.5)[0]200 self.assertEqual(self.node.uuid, node2.uuid)201 self.assertEqual(['67.23.21.33'], ips)202 def test_wait_until_running_without_ip(self):203 RackspaceMockHttp.type = 'NO_IP'204 try:205 node2, ips = self.driver.wait_until_running(206 nodes=[self.node], wait_period=0.1,207 timeout=0.2)[0]208 except LibcloudError as e:209 self.assertTrue(e.value.find('Timed out after 0.2 second') != -1)210 else:211 self.fail('Exception was not thrown')212 def test_wait_until_running_with_only_ipv6(self):213 RackspaceMockHttp.type = 'IPV6'214 try:215 node2, ips = self.driver.wait_until_running(216 nodes=[self.node], wait_period=0.1,217 timeout=0.2)[0]218 except LibcloudError as e:219 self.assertTrue(e.value.find('Timed out after 0.2 second') != -1)220 else:221 self.fail('Exception was not thrown')222 def test_wait_until_running_with_ipv6_ok(self):223 RackspaceMockHttp.type = 'IPV6'224 node2, ips = self.driver.wait_until_running(225 nodes=[self.node], wait_period=1, force_ipv4=False,226 timeout=0.5)[0]227 self.assertEqual(self.node.uuid, node2.uuid)228 self.assertEqual(['2001:DB8::1'], ips)229 def test_wait_until_running_running_after_0_2_second(self):230 RackspaceMockHttp.type = '05_SECOND_DELAY'231 node2, ips = self.driver.wait_until_running(232 nodes=[self.node], wait_period=0.2,233 timeout=0.1)[0]234 self.assertEqual(self.node.uuid, node2.uuid)235 self.assertEqual(['67.23.21.33'], ips)236 def test_wait_until_running_running_after_0_2_second_private_ips(self):237 RackspaceMockHttp.type = '05_SECOND_DELAY'238 node2, ips = self.driver.wait_until_running(239 nodes=[self.node], wait_period=0.2,240 timeout=0.1, ssh_interface='private_ips')[0]241 self.assertEqual(self.node.uuid, node2.uuid)242 self.assertEqual(['10.176.168.218'], ips)243 def test_wait_until_running_invalid_ssh_interface_argument(self):244 try:245 self.driver.wait_until_running(nodes=[self.node], wait_period=1,246 ssh_interface='invalid')247 except ValueError:248 pass249 else:250 self.fail('Exception was not thrown')251 def test_wait_until_running_timeout(self):252 RackspaceMockHttp.type = 'TIMEOUT'253 try:254 self.driver.wait_until_running(nodes=[self.node], wait_period=0.1,255 timeout=0.2)256 except LibcloudError as e:257 self.assertTrue(e.value.find('Timed out') != -1)258 else:259 self.fail('Exception was not thrown')260 def test_wait_until_running_running_node_missing_from_list_nodes(self):261 RackspaceMockHttp.type = 'MISSING'262 try:263 self.driver.wait_until_running(nodes=[self.node], wait_period=0.1,264 timeout=0.2)265 except LibcloudError as e:266 self.assertTrue(e.value.find('Timed out after 0.2 second') != -1)267 else:268 self.fail('Exception was not thrown')269 def test_wait_until_running_running_multiple_nodes_have_same_uuid(self):270 RackspaceMockHttp.type = 'SAME_UUID'271 try:272 self.driver.wait_until_running(nodes=[self.node], wait_period=0.1,273 timeout=0.2)274 except LibcloudError as e:275 self.assertTrue(276 e.value.find('Unable to match specified uuids') != -1)277 else:278 self.fail('Exception was not thrown')279 def test_wait_until_running_running_wait_for_multiple_nodes(self):280 RackspaceMockHttp.type = 'MULTIPLE_NODES'281 nodes = self.driver.wait_until_running(282 nodes=[self.node, self.node2], wait_period=0.1,283 timeout=0.5)284 self.assertEqual(self.node.uuid, nodes[0][0].uuid)285 self.assertEqual(self.node2.uuid, nodes[1][0].uuid)286 self.assertEqual(['67.23.21.33'], nodes[0][1])287 self.assertEqual(['67.23.21.34'], nodes[1][1])288 def test_ssh_client_connect_success(self):289 mock_ssh_client = Mock()290 mock_ssh_client.return_value = None291 ssh_client = self.driver._ssh_client_connect(292 ssh_client=mock_ssh_client,293 timeout=0.5)294 self.assertEqual(mock_ssh_client, ssh_client)295 def test_ssh_client_connect_timeout(self):296 mock_ssh_client = Mock()297 mock_ssh_client.connect = Mock()298 mock_ssh_client.connect.side_effect = IOError('bam')299 try:300 self.driver._ssh_client_connect(ssh_client=mock_ssh_client,301 wait_period=0.1,302 timeout=0.2)303 except LibcloudError as e:304 self.assertTrue(e.value.find('Giving up') != -1)305 else:306 self.fail('Exception was not thrown')307 @unittest.skipIf(not have_paramiko, 'Skipping because paramiko is not available')308 def test_ssh_client_connect_immediately_throws_on_fatal_execption(self):309 # Verify that fatal exceptions are immediately propagated and ensure310 # we don't try to retry on them311 from paramiko.ssh_exception import SSHException312 from paramiko.ssh_exception import PasswordRequiredException313 mock_ssh_client = Mock()314 mock_ssh_client.connect = Mock()315 mock_ssh_client.connect.side_effect = IOError('bam')316 mock_exceptions = [317 SSHException('Invalid or unsupported key type'),318 PasswordRequiredException('private key file is encrypted'),319 SSHException('OpenSSH private key file checkints do not match')320 ]321 for mock_exception in mock_exceptions:322 mock_ssh_client.connect = Mock(side_effect=mock_exception)323 assertRaisesRegex(self, mock_exception.__class__, str(mock_exception),324 self.driver._ssh_client_connect,325 ssh_client=mock_ssh_client,326 wait_period=0.1,327 timeout=0.2)328 def test_run_deployment_script_success(self):329 task = Mock()330 ssh_client = Mock()331 ssh_client2 = self.driver._run_deployment_script(task=task,332 node=self.node,333 ssh_client=ssh_client,334 max_tries=2)335 self.assertTrue(isinstance(ssh_client2, Mock))336 def test_run_deployment_script_exception(self):337 task = Mock()338 task.run = Mock()339 task.run.side_effect = Exception('bar')340 ssh_client = Mock()341 try:342 self.driver._run_deployment_script(task=task,343 node=self.node,344 ssh_client=ssh_client,345 max_tries=2)346 except LibcloudError as e:347 self.assertTrue(e.value.find('Failed after 2 tries') != -1)348 else:349 self.fail('Exception was not thrown')350 def test_run_deployment_script_ssh_command_timeout_fatal_exception(self):351 # We shouldn't retry on SSHCommandTimeoutError error since it's fatal352 task = Mock()353 task.run = Mock()354 task.run.side_effect = SSHCommandTimeoutError('ls -la', 10)355 ssh_client = Mock()356 try:357 self.driver._run_deployment_script(task=task,358 node=self.node,359 ssh_client=ssh_client,360 max_tries=5)361 except SSHCommandTimeoutError as e:362 self.assertTrue(e.message.find('Command didn\'t finish') != -1)363 else:364 self.fail('Exception was not thrown')365 def test_run_deployment_script_reconnect_on_ssh_session_not_active(self):366 # Verify that we try to reconnect if task.run() throws exception with367 # "SSH client not active" message368 global exception_counter369 exception_counter = 0370 def mock_run(*args, **kwargs):371 # Mock run() method which throws "SSH session not active" exception372 # during first two calls and on third one it returns None.373 global exception_counter374 exception_counter += 1375 if exception_counter > 2:376 return None377 raise Exception("SSH session not active")378 task = Mock()379 task.run = Mock()380 task.run = mock_run381 ssh_client = Mock()382 ssh_client.timeout = 20383 self.assertEqual(ssh_client.connect.call_count, 0)384 self.assertEqual(ssh_client.close.call_count, 0)385 self.driver._run_deployment_script(task=task,386 node=self.node,387 ssh_client=ssh_client,388 max_tries=5)389 self.assertEqual(ssh_client.connect.call_count, 2)390 self.assertEqual(ssh_client.close.call_count, 2 + 1)391 @patch('libcloud.compute.base.SSHClient')392 @patch('libcloud.compute.ssh')393 def test_deploy_node_success(self, mock_ssh_module, _):394 self.driver.create_node = Mock()395 self.driver.create_node.return_value = self.node396 mock_ssh_module.have_paramiko = True397 deploy = Mock()398 node = self.driver.deploy_node(deploy=deploy)399 self.assertEqual(self.node.id, node.id)400 @patch('libcloud.compute.base.atexit')401 @patch('libcloud.compute.base.SSHClient')402 @patch('libcloud.compute.ssh')403 def test_deploy_node_at_exit_func_functionality(self, mock_ssh_module, _, mock_at_exit):404 self.driver.create_node = Mock()405 self.driver.create_node.return_value = self.node406 mock_ssh_module.have_paramiko = True407 deploy = Mock()408 def mock_at_exit_func(driver, node):409 pass410 # On success, at exit handler should be unregistered411 self.assertEqual(mock_at_exit.register.call_count, 0)412 self.assertEqual(mock_at_exit.unregister.call_count, 0)413 node = self.driver.deploy_node(deploy=deploy, at_exit_func=mock_at_exit_func)414 self.assertEqual(mock_at_exit.register.call_count, 1)415 self.assertEqual(mock_at_exit.unregister.call_count, 1)416 self.assertEqual(self.node.id, node.id)417 # On deploy failure, at exit handler should also be unregistered418 mock_at_exit.reset_mock()419 deploy.run.side_effect = Exception('foo')420 self.assertEqual(mock_at_exit.register.call_count, 0)421 self.assertEqual(mock_at_exit.unregister.call_count, 0)422 try:423 self.driver.deploy_node(deploy=deploy, at_exit_func=mock_at_exit_func)424 except DeploymentError as e:425 self.assertTrue(e.node.id, self.node.id)426 else:427 self.fail('Exception was not thrown')428 self.assertEqual(mock_at_exit.register.call_count, 1)429 self.assertEqual(mock_at_exit.unregister.call_count, 1)430 # But it should not be registered on create_node exception431 mock_at_exit.reset_mock()432 self.driver.create_node = Mock(side_effect=Exception('Failure'))433 self.assertEqual(mock_at_exit.register.call_count, 0)434 self.assertEqual(mock_at_exit.unregister.call_count, 0)435 try:436 self.driver.deploy_node(deploy=deploy, at_exit_func=mock_at_exit_func)437 except Exception as e:438 self.assertTrue('Failure' in str(e))439 else:440 self.fail('Exception was not thrown')441 self.assertEqual(mock_at_exit.register.call_count, 0)442 self.assertEqual(mock_at_exit.unregister.call_count, 0)443 @patch('libcloud.compute.base.SSHClient')444 @patch('libcloud.compute.ssh')445 def test_deploy_node_deploy_node_kwargs_except_auth_are_not_propagated_on(self, mock_ssh_module, _):446 # Verify that keyword arguments which are specific to deploy_node()447 # are not propagated to create_node()448 mock_ssh_module.have_paramiko = True449 self.driver.create_node = Mock()450 self.driver.create_node.return_value = self.node451 self.driver._connect_and_run_deployment_script = Mock()452 self.driver._wait_until_running = Mock()453 kwargs = {}454 for key in DEPLOY_NODE_KWARGS:455 kwargs[key] = key456 kwargs['ssh_interface'] = 'public_ips'457 kwargs['ssh_alternate_usernames'] = ['foo', 'bar']458 kwargs['timeout'] = 10459 auth = NodeAuthPassword('P@$$w0rd')460 node = self.driver.deploy_node(name='name', image='image', size='size',461 auth=auth, ex_foo='ex_foo', **kwargs)462 self.assertEqual(self.node.id, node.id)463 self.assertEqual(self.driver.create_node.call_count, 1)464 call_kwargs = self.driver.create_node.call_args_list[0][1]465 expected_call_kwargs = {466 'name': 'name',467 'image': 'image',468 'size': 'size',469 'auth': auth,470 'ex_foo': 'ex_foo'471 }472 self.assertEqual(expected_call_kwargs, call_kwargs)473 # If driver throws an exception it should fall back to passing in all474 # the arguments475 global call_count476 call_count = 0477 def create_node(name, image, size, ex_custom_arg_1, ex_custom_arg_2,478 ex_foo=None, auth=None, **kwargs):479 global call_count480 call_count += 1481 if call_count == 1:482 msg = 'create_node() takes at least 5 arguments (7 given)'483 raise TypeError(msg)484 return self.node485 self.driver.create_node = create_node486 node = self.driver.deploy_node(name='name', image='image', size='size',487 auth=auth, ex_foo='ex_foo', ex_custom_arg_1='a',488 ex_custom_arg_2='b', **kwargs)489 self.assertEqual(self.node.id, node.id)490 self.assertEqual(call_count, 2)491 call_count = 0492 def create_node(name, image, size, ex_custom_arg_1, ex_custom_arg_2,493 ex_foo=None, auth=None, **kwargs):494 global call_count495 call_count += 1496 if call_count == 1:497 msg = 'create_node() missing 3 required positional arguments'498 raise TypeError(msg)499 return self.node500 self.driver.create_node = create_node501 node = self.driver.deploy_node(name='name', image='image', size='size',502 auth=auth, ex_foo='ex_foo', ex_custom_arg_1='a',503 ex_custom_arg_2='b', **kwargs)504 self.assertEqual(self.node.id, node.id)505 self.assertEqual(call_count, 2)506 @patch('libcloud.compute.base.SSHClient')507 @patch('libcloud.compute.ssh')508 def test_deploy_node_exception_run_deployment_script(self, mock_ssh_module,509 _):510 self.driver.create_node = Mock()511 self.driver.create_node.return_value = self.node512 mock_ssh_module.have_paramiko = True513 deploy = Mock()514 deploy.run = Mock()515 deploy.run.side_effect = Exception('foo')516 try:517 self.driver.deploy_node(deploy=deploy)518 except DeploymentError as e:519 self.assertTrue(e.node.id, self.node.id)520 else:521 self.fail('Exception was not thrown')522 @patch('libcloud.compute.base.SSHClient')523 @patch('libcloud.compute.ssh')524 def test_deploy_node_exception_ssh_client_connect(self, mock_ssh_module,525 ssh_client):526 self.driver.create_node = Mock()527 self.driver.create_node.return_value = self.node528 mock_ssh_module.have_paramiko = True529 deploy = Mock()530 ssh_client.side_effect = IOError('bar')531 try:532 self.driver.deploy_node(deploy=deploy)533 except DeploymentError as e:534 self.assertTrue(e.node.id, self.node.id)535 else:536 self.fail('Exception was not thrown')537 @patch('libcloud.compute.ssh')538 def test_deploy_node_depoy_node_not_implemented(self, mock_ssh_module):539 self.driver.features = {'create_node': []}540 mock_ssh_module.have_paramiko = True541 try:542 self.driver.deploy_node(deploy=Mock())543 except NotImplementedError:544 pass545 else:546 self.fail('Exception was not thrown')547 self.driver.features = {}548 try:549 self.driver.deploy_node(deploy=Mock())550 except NotImplementedError:551 pass552 else:553 self.fail('Exception was not thrown')554 @patch('libcloud.compute.base.SSHClient')555 @patch('libcloud.compute.ssh')556 def test_deploy_node_password_auth(self, mock_ssh_module, _):557 self.driver.features = {'create_node': ['password']}558 mock_ssh_module.have_paramiko = True559 self.driver.create_node = Mock()560 self.driver.create_node.return_value = self.node561 node = self.driver.deploy_node(deploy=Mock())562 self.assertEqual(self.node.id, node.id)563 @patch('libcloud.compute.base.SSHClient')564 @patch('libcloud.compute.ssh')565 def test_exception_is_thrown_is_paramiko_is_not_available(self,566 mock_ssh_module,567 _):568 self.driver.features = {'create_node': ['password']}569 self.driver.create_node = Mock()570 self.driver.create_node.return_value = self.node571 mock_ssh_module.have_paramiko = False572 try:573 self.driver.deploy_node(deploy=Mock())574 except RuntimeError as e:575 self.assertTrue(str(e).find('paramiko is not installed') != -1)576 else:577 self.fail('Exception was not thrown')578 mock_ssh_module.have_paramiko = True579 node = self.driver.deploy_node(deploy=Mock())580 self.assertEqual(self.node.id, node.id)581class RackspaceMockHttp(MockHttp):582 fixtures = ComputeFileFixtures('openstack')583 def _v2_0_tokens(self, method, url, body, headers):584 body = self.fixtures.load('_v2_0__auth_deployment.json')585 headers = {586 'content-type': 'application/json'587 }588 return (httplib.OK, body, headers,589 httplib.responses[httplib.OK])590 def _v1_0_slug_servers_detail(self, method, url, body, headers):591 body = self.fixtures.load(592 'v1_slug_servers_detail_deployment_success.xml')593 return (httplib.OK, body, XML_HEADERS, httplib.responses[httplib.OK])594 def _v1_0_slug_servers_detail_05_SECOND_DELAY(self, method, url, body, headers):595 time.sleep(0.5)596 body = self.fixtures.load(597 'v1_slug_servers_detail_deployment_success.xml')598 return (httplib.OK, body, XML_HEADERS, httplib.responses[httplib.OK])599 def _v1_0_slug_servers_detail_TIMEOUT(self, method, url, body, headers):600 body = self.fixtures.load(601 'v1_slug_servers_detail_deployment_pending.xml')602 return (httplib.OK, body, XML_HEADERS, httplib.responses[httplib.OK])603 def _v1_0_slug_servers_detail_MISSING(self, method, url, body, headers):604 body = self.fixtures.load(605 'v1_slug_servers_detail_deployment_missing.xml')606 return (httplib.OK, body, XML_HEADERS, httplib.responses[httplib.OK])607 def _v1_0_slug_servers_detail_SAME_UUID(self, method, url, body, headers):608 body = self.fixtures.load(609 'v1_slug_servers_detail_deployment_same_uuid.xml')610 return (httplib.OK, body, XML_HEADERS, httplib.responses[httplib.OK])611 def _v1_0_slug_servers_detail_MULTIPLE_NODES(self, method, url, body, headers):612 body = self.fixtures.load(613 'v1_slug_servers_detail_deployment_multiple_nodes.xml')614 return (httplib.OK, body, XML_HEADERS, httplib.responses[httplib.OK])615 def _v1_0_slug_servers_detail_IPV6(self, method, url, body, headers):616 body = self.fixtures.load(617 'v1_slug_servers_detail_deployment_ipv6.xml')618 return (httplib.OK, body, XML_HEADERS, httplib.responses[httplib.OK])619 def _v1_0_slug_servers_detail_NO_IP(self, method, url, body, headers):620 body = self.fixtures.load(621 'v1_slug_servers_detail_deployment_no_ip.xml')622 return (httplib.OK, body, XML_HEADERS, httplib.responses[httplib.OK])623if __name__ == '__main__':...
optimize.py
Source:optimize.py
1"""Optimization, fitting and analysis"""2import numpy as np3import multiprocessing4import pyswarms as ps5from os import cpu_count6from PIL import Image7from matplotlib import pyplot as plt8import sme9def _hessian_f(f, x0, i, j, rel_eps):10 x = np.array(x0, dtype=np.float64)11 if i is None:12 return f(x)13 if j is None:14 x[i] = (1.0 + rel_eps) * x[i]15 return f(x)16 x[i] = (1.0 + rel_eps) * x[i]17 x[j] = (1.0 + rel_eps) * x[j]18 return f(x)19def hessian(f, x0, rel_eps=1e-2, processes=None):20 """Approximate Hessian of function ``f`` at point ``x0``21 Uses a `finite difference`_ approximation where the step size used22 for each element ``i`` of ``x0`` is ``rel_eps * x[i]``.23 Requires :math:`N^2 + N + 1` evalulations of ``f``, where :math:`N`24 is the number of elements of ``x0``25 The evaluations of ``f`` are done in parallel, so ``f`` must be a26 thread-safe function that can safely be called from multiple threads27 at the same time.28 Note:29 This choice of step size allows the different elements of x0 to have30 vastly different scales without causing numerical instabilities,31 but it will fail if an element of ``x0`` is equal to 0.32 Args:33 f: The function to evaluate, it should be callable as f(x0) and return a scalar34 x0: The point at which to evaluate the function, a flot or list of floats.35 rel_eps: The relative step size to use36 processes: The number of processes to use (the default ``None`` means use all available cpu cores)37 Returns:38 np.array: The Hessian as a 2d numpy array of floats39 .. _finite difference:40 https://en.wikipedia.org/wiki/Finite_difference#Multivariate_finite_differences41 """42 if processes is None:43 processes = cpu_count()44 n = len(x0)45 # make list of arguments for each f call required46 # f(x)47 args = [(f, x0, None, None, rel_eps)]48 # f([.., x_i+dx_i, ..])49 for i in range(n):50 args.append((f, x0, i, None, +rel_eps))51 # f([.., x_i-dx_i, ..])52 for i in range(n):53 args.append((f, x0, i, None, -rel_eps))54 # f([.., x_j+dx_j, .., x_i+dx_i, ..])55 for i in range(n):56 for j in range(0, i):57 args.append((f, x0, i, j, +rel_eps))58 # f([.., x_j-dx_j, .., x_i-dx_i, ..])59 for i in range(n):60 for j in range(0, i):61 args.append((f, x0, i, j, -rel_eps))62 # call f with each set of args63 pool = multiprocessing.Pool(processes)64 ff = pool.starmap(_hessian_f, args)65 pool.close()66 pool.join()67 # construct hessian elements from these values68 h = np.zeros((n, n))69 # diagonal elements70 for i in range(n):71 h[(i, i)] = ff[i + 1] - ff[0] + ff[n + i + 1] - ff[0]72 h[(i, i)] /= rel_eps * rel_eps * x0[i] * x0[i]73 # off-diagonal elements74 offset_ij = 2 * n + 175 n_ij = (n * (n - 1)) // 276 index_ij = 077 for i in range(n):78 for j in range(0, i):79 h[(i, j)] = (80 ff[0]81 - ff[1 + i]82 + ff[0]83 - ff[1 + j]84 + ff[offset_ij + index_ij]85 - ff[1 + n + i]86 + ff[offset_ij + n_ij + index_ij]87 - ff[1 + n + j]88 )89 h[(i, j)] /= 2.0 * rel_eps * rel_eps * x0[i] * x0[j]90 h[(j, i)] = h[(i, j)]91 index_ij += 192 return h93# calculate function f for each x in xs94def _minimize_f(xs, f, processes):95 pool = multiprocessing.Pool(processes=processes)96 norms = pool.map(f, xs)97 pool.close()98 pool.join()99 return np.array(norms)100# minimize objective function using particle swarm101def minimize(102 f,103 lowerbounds,104 upperbounds,105 particles=20,106 iterations=20,107 processes=None,108 ps_options=None,109):110 """Minimize function ``f`` using particle swarm111 The function ``f`` should take an array or list of parameters ``x``, and return a112 value: parameters will be found using particle swarm that minimize this value.113 Each parameter should have a specified lower and upper bound.114 The evaluations of ``f`` are done in parallel, so ``f`` must be a115 thread-safe function that can safely be called from multiple threads116 at the same time. The evaluations are parallelized over the particles117 for each iteration, so for good performance the number of particles should118 be larger than the number of processes.119 Args:120 f: The function to evaluate, it should be callable as f(x) and return a scalar121 lowerbounds: The lower bound for each element of x.122 upperbounds: The upper bound for each element of x.123 particles: The number of particles to use in the swarm124 iterations: The number of iterations to do125 processes: The number of processes to use (the default ``None`` means use all available cpu cores)126 ps_options: A map of the particle swarm hyper parameters to use127 Returns:128 ps_cost: The lowest cost129 ps_res: The parameters that gave this lowest cost130 optimizer: The PySwarms optimizer object131 .. _finite difference:132 https://en.wikipedia.org/wiki/Finite_difference#Multivariate_finite_differences133 """134 if processes is None:135 processes = cpu_count()136 if ps_options is None:137 ps_options = {"c1": 0.5, "c2": 0.3, "w": 0.9}138 optimizer = ps.single.GlobalBestPSO(139 particles,140 dimensions=len(lowerbounds),141 options=ps_options,142 bounds=(np.array(lowerbounds), np.array(upperbounds)),143 )144 ps_cost, ps_res = optimizer.optimize(145 _minimize_f, iters=iterations, f=f, processes=processes146 )147 return ps_cost, ps_res, optimizer148def rescale(x, new_max_element):149 """Rescale an array150 Args:151 x(numpy.array): The array to rescale152 new_max_element(float): The desired new maximum element value153 Returns:154 np.array: The rescaled array155 """156 old_max_element = np.amax(x)157 scale_factor = new_max_element / old_max_element158 return np.multiply(x, scale_factor)159def abs_diff(x, y):160 """Absolute difference between two arrays161 :math:`\\frac{1}{2} \\sum_i (x_i - y_i)^2`162 Args:163 x(numpy.array): The first array164 y(numpy.array): The second array165 Returns:166 float: absolute difference between the two arrays167 """168 return 0.5 * np.sum(np.power(x - y, 2))169# plot 2d array as heat-map image170def _ss_plot_image(conc, title, ax=None, cmap=None):171 if ax is None:172 ax = plt.gca()173 ax.imshow(conc, cmap=cmap)174 ax.set_title(title)175 return ax176# plot 1d array as line177def _ss_plot_line(x, y, title, ax=None):178 if ax is None:179 ax = plt.gca()180 ax.set_title(title)181 ax.plot(x, y)182 return ax183def _img_as_normalized_nparray(imagefile, mask):184 # todo: check image dimensions match mask dimensions185 img = Image.open(imagefile)186 if len(img.getbands()) > 1:187 # convert RGB or RGBA image to 8-bit grayscale188 img = img.convert("L")189 a = np.asarray(img, dtype=np.float64)190 a[~mask] = 0.0191 a_max = np.amax(a)192 return a / a_max193def _get_geometry_mask(model):194 mask = np.array(model.compartments[0].geometry_mask)195 for compartment in model.compartments:196 mask = mask | np.array(compartment.geometry_mask)197 return mask198class SteadyState:199 """Steady state parameter fitting200 Given a model and an image of the target steady state distribution of201 a species (or the sum of multiple species), this class tries to find202 a set of parameters where the simulated model has a steady state solution203 that is as close as possible to the target image.204 Args:205 modelfile(str): The sbml file containing the model206 imagefile(str): The image file containing the target concentration207 Optionally this can instead be a dict of geometryimagefilename:targetconcentrationfilename,208 in which case the model is simultaneously fitted to the target concentration image209 steady state for each geometry image.210 species(List of str): The species to compare to the target concentration211 function_to_apply_params: A function that sets the parameters in the model.212 This should be a function with signature ``f(model, params)``, and213 which sets the value of the parameters to be fitted in ``model``214 according to the values in ``params``, which will be a list of floats.215 lower_bounds(List of float): The lower bound for each parameter to be fitted216 upper_bounds(List of float): The upper bound for each parameter to be fitted217 simulation_time(float): The length of time to simulate the model218 steady_state_time(float): The length of time to multiply the final rate of change of concentration.219 The cost function that is minimized is the sum of squares over all pixels220 of the difference between the final concentration and the target concentration, plus the221 sum of squares over all pixels of the difference between ``steady_state_time`` * dc/dt and222 zero. Multiplying the rate of change by a time makes the second term have the same units as223 the first term, and the relative importance of being close to steady state versus close224 to the desired concentration in the fit can be adjusted by altering ``steady_state_time``.225 The larger it is, the closer the results will be to a steady state.226 Attributes:227 params(numpy.array): The best model parameters found228 cost_history(List of float): The history of the best cost at each iteration229 cost_history_pbest(List of float): The history of the mean particle best at each iteration230 """231 def __init__(232 self,233 modelfile,234 imagefile,235 species,236 function_to_apply_params,237 lower_bounds,238 upper_bounds,239 simulation_time=1000,240 steady_state_time=200,241 timeout_seconds=10,242 ):243 self.filename = modelfile244 self.species = species245 self.geom_img_files = []246 self.target_concs = []247 if isinstance(imagefile, str):248 # single target image, use existing model geometry249 self.geom_img_files.append("")250 m = sme.open_sbml_file(self.filename)251 mask = _get_geometry_mask(m)252 self.target_concs.append(_img_as_normalized_nparray(imagefile, mask))253 else:254 # multiple geometry images, each with target concentration image255 for geom_img_file, imgfile in imagefile.items():256 self.geom_img_files.append(geom_img_file)257 m = sme.open_sbml_file(self.filename)258 m.import_geometry_from_image(geom_img_file)259 mask = _get_geometry_mask(m)260 self.target_concs.append(_img_as_normalized_nparray(imgfile, mask))261 self.simulation_time = simulation_time262 self.steady_state_time = steady_state_time263 self.apply_params = function_to_apply_params264 self.lower_bounds = lower_bounds265 self.upper_bounds = upper_bounds266 self.timeout_seconds = timeout_seconds267 def _get_conc(self, result):268 c = np.array(result.species_concentration[self.species[0]])269 for i in range(1, len(self.species)):270 c = np.add(c, np.array(result.species_concentration[self.species[i]]))271 return c272 def _get_dcdt(self, result):273 dcdt = np.array(result.species_dcdt[self.species[0]])274 for i in range(1, len(self.species)):275 dcdt = np.add(dcdt, np.array(result.species_dcdt[self.species[i]]))276 return dcdt277 def _rescale(self, result):278 c = self._get_conc(result)279 dcdt = self._get_dcdt(result)280 scale_factor = 1.0 / np.clip(np.amax(c), 1e-100, 1e100)281 return scale_factor * c, scale_factor * dcdt282 def _obj_func(self, params, verbose=False):283 m = sme.open_sbml_file(self.filename)284 self.apply_params(m, params)285 obj_sum = 0286 model_concs = []287 conc_norm = 0288 dcdt_norm = 0289 for geom_img_file, target_conc in zip(self.geom_img_files, self.target_concs):290 if geom_img_file:291 m.import_geometry_from_image(geom_img_file)292 results = m.simulate(293 simulation_time=self.simulation_time,294 image_interval=self.simulation_time,295 timeout_seconds=self.timeout_seconds,296 throw_on_timeout=False,297 )298 if len(results) == 1:299 # simulation fail or timeout: no result300 print(301 f"simulation timeout with timeout {self.timeout_seconds}s, params: {params}"302 )303 conc_norm = conc_norm + abs_diff(0, target_conc)304 dcdt_norm = dcdt_norm + abs_diff(0, target_conc)305 c, dcdt = self._rescale(results[-1])306 conc_norm = conc_norm + abs_diff(c, target_conc)307 dcdt_norm = dcdt_norm + abs_diff(self.steady_state_time * dcdt, 0)308 model_concs.append(c)309 obj_sum = obj_sum + conc_norm + dcdt_norm310 if verbose:311 return conc_norm, dcdt_norm, model_concs312 return obj_sum313 def find(self, particles=20, iterations=20, processes=None):314 """Find parameters that result in a steady state concentration close to the target image315 Uses particle swarm to minimize the difference between the rescaled concentration316 and the target image, as well as the distance from a steady state solution.317 Args:318 particles(int): The number of particles in the particle swarm319 iterations(int): The number of particle swarm iterations320 processes: The number of processes to use (the default ``None`` means use all available cpu cores)321 Returns:322 List of float: the best parameters found323 Note:324 On Windows, calling this function from a jupyter notebook can result in an error325 message of the form `Can't get attribute 'apply_params' on <module '__main__'`,326 where ``apply_params`` is the function you have defined to apply the parameters to the model.327 This is a known `issue <https://docs.python.org/3/library/multiprocessing.html#using-a-pool-of-workers>`_328 with Python multiprocessing, and a workaround is to define the ``apply_params`` function329 in a separate `.py` file and import it into the notebook.330 """331 cost, params, optimizer = minimize(332 self._obj_func,333 self.lower_bounds,334 self.upper_bounds,335 particles=particles,336 iterations=iterations,337 processes=processes,338 )339 self.cost_history = optimizer.cost_history340 self.cost_history_pbest = optimizer.mean_pbest_history341 self.conc_norm, self.dcdt_norm, self.model_concs = self._obj_func(342 params, verbose=True343 )344 self.params = params345 return params346 def hessian(self, rel_eps=0.1, processes=None):347 return hessian(self._obj_func, self.params, rel_eps, processes)348 def plot_target_concentration(self, index=0, ax=None, cmap=None):349 """Plot the target concentration as a 2d heat map350 Args:351 index(int): Optionally specify index of concentration352 ax(matplotlib.axes._subplots.AxesSubplot): Optionally specify the axes to draw the plot on353 cmap(matplotlib.Colormap): Optionally specify the colormap to use354 Returns:355 matplotlib.axes._subplots.AxesSubplot: The axes the plot was drawn on356 """357 return _ss_plot_image(358 self.target_concs[index], "Target Concentration", ax, cmap359 )360 def plot_model_concentration(self, index=0, ax=None, cmap=None):361 """Plot the model concentration as a 2d heat map362 The model concentration is normalized such that the maximum pixel intensity363 matches the maximum pixel intensity of the target concentration image364 Args:365 index(int): Optionally specify index of concentration366 ax(matplotlib.axes._subplots.AxesSubplot): Optionally specify the axes to draw the plot on367 cmap(matplotlib.Colormap): Optionally specify the colormap to use368 Returns:369 matplotlib.axes._subplots.AxesSubplot: The axes the plot was drawn on370 """371 return _ss_plot_image(self.model_concs[index], "Model Concentration", ax, cmap)372 def plot_cost_history(self, ax=None):373 """Plot the cost history374 The cost of the best set of parameters at each iteration of particle swarm.375 Args:376 ax(matplotlib.axes._subplots.AxesSubplot): Optionally specify the axes to draw the plot on377 Returns:378 matplotlib.axes._subplots.AxesSubplot: The axes the plot was drawn on379 """380 return _ss_plot_line(381 [*range(len(self.cost_history))], self.cost_history, "Best cost history", ax382 )383 def plot_cost_history_pbest(self, ax=None):384 """Plot the mean particle best cost history385 The mean of the best cost for each particle in the swarm, at each iteration of particle swarm.386 Args:387 ax(matplotlib.axes._subplots.AxesSubplot): Optionally specify the axes to draw the plot on388 Returns:389 matplotlib.axes._subplots.AxesSubplot: The axes the plot was drawn on390 """391 return _ss_plot_line(392 [*range(len(self.cost_history_pbest))],393 self.cost_history_pbest,394 "Mean particle best cost history",395 ax,396 )397 def plot_timeseries(self, simulation_time, image_interval_time, ax=None):398 """Plot a timeseries of the sum of concentrations399 The sum of all species concentrations summed over all pixels,400 as a function of the simulation time. This is a convenience plot401 just to see by eye how close the simulation is to a steady state.402 Args:403 simulation_time(float): The simulation time to simulate404 image_interval_time(float): The interval in between images405 ax(matplotlib.axes._subplots.AxesSubplot): Optionally specify the axes to draw the plot on406 Returns:407 matplotlib.axes._subplots.AxesSubplot: The axes the plot was drawn on408 """409 m = sme.open_sbml_file(self.filename)410 self.apply_params(m, self.params)411 for geom_img_file in self.geom_img_files:412 if geom_img_file:413 m.import_geometry_from_image(geom_img_file)414 results = m.simulate(415 simulation_time=simulation_time, image_interval=image_interval_time416 )417 concs = []418 times = []419 for result in results:420 concs.append(np.sum(self._get_conc(result)))421 times.append(result.time_point)422 ax = _ss_plot_line(times, concs, "Concentration time series", ax)423 return ax424 def plot_all(self, cmap=None):425 """Generate all plots426 Helper function for interactive use in a jupyter notebook.427 Generates and shows all plots for user to see at a glance the results of the fit.428 Args:429 cmap(matplotlib.Colormap): Optionally specify the colormap to use for heatmap plots430 """431 fig, (ax1, ax2, ax3) = plt.subplots(nrows=1, ncols=3, figsize=(20, 12))432 self.plot_cost_history(ax1)433 self.plot_cost_history_pbest(ax2)434 self.plot_timeseries(self.simulation_time, self.simulation_time / 100.0, ax3)435 plt.show()436 for index, geom_img_file in enumerate(self.geom_img_files):437 fig, (ax1, ax2) = plt.subplots(nrows=1, ncols=2, figsize=(18, 12))438 self.plot_target_concentration(index, ax1, cmap)439 self.plot_model_concentration(index, ax2, cmap)440 plt.show()441 def get_model(self):442 """Returns the model with best parameters applied443 Returns:444 sme.Model: The model with the best parameters applied445 """446 m = sme.open_sbml_file(self.filename)447 self.apply_params(m, self.params)...
LambdaTest’s Playwright tutorial will give you a broader idea about the Playwright automation framework, its unique features, and use cases with examples to exceed your understanding of Playwright testing. This tutorial will give A to Z guidance, from installing the Playwright framework to some best practices and advanced concepts.
Get 100 minutes of automation test minutes FREE!!