Best Python code snippet using selene_python
executormarionette.py
Source:executormarionette.py
1import json2import os3import socket4import threading5import traceback6import urlparse7import uuid8errors = None9marionette = None10pytestrunner = None11here = os.path.join(os.path.split(__file__)[0])12from .base import (CallbackHandler,13 RefTestExecutor,14 RefTestImplementation,15 TestharnessExecutor,16 WdspecExecutor,17 WebDriverProtocol,18 extra_timeout,19 strip_server)20from .protocol import (AssertsProtocolPart,21 BaseProtocolPart,22 TestharnessProtocolPart,23 PrefsProtocolPart,24 Protocol,25 StorageProtocolPart,26 SelectorProtocolPart,27 ClickProtocolPart,28 SendKeysProtocolPart,29 TestDriverProtocolPart,30 CoverageProtocolPart)31from ..testrunner import Stop32from ..webdriver_server import GeckoDriverServer33def do_delayed_imports():34 global errors, marionette35 # Marionette client used to be called marionette, recently it changed36 # to marionette_driver for unfathomable reasons37 try:38 import marionette39 from marionette import errors40 except ImportError:41 from marionette_driver import marionette, errors42class MarionetteBaseProtocolPart(BaseProtocolPart):43 def __init__(self, parent):44 super(MarionetteBaseProtocolPart, self).__init__(parent)45 self.timeout = None46 def setup(self):47 self.marionette = self.parent.marionette48 def execute_script(self, script, async=False):49 method = self.marionette.execute_async_script if async else self.marionette.execute_script50 return method(script, new_sandbox=False, sandbox=None)51 def set_timeout(self, timeout):52 """Set the Marionette script timeout.53 :param timeout: Script timeout in seconds54 """55 if timeout != self.timeout:56 self.marionette.timeout.script = timeout57 self.timeout = timeout58 @property59 def current_window(self):60 return self.marionette.current_window_handle61 def set_window(self, handle):62 self.marionette.switch_to_window(handle)63 def wait(self):64 try:65 socket_timeout = self.marionette.client.socket_timeout66 except AttributeError:67 # This can happen if there was a crash68 
return69 if socket_timeout:70 try:71 self.marionette.timeout.script = socket_timeout / 272 except (socket.error, IOError):73 self.logger.debug("Socket closed")74 return75 while True:76 try:77 self.marionette.execute_async_script("")78 except errors.NoSuchWindowException:79 # The window closed80 break81 except errors.ScriptTimeoutException:82 self.logger.debug("Script timed out")83 pass84 except (socket.timeout, IOError):85 self.logger.debug("Socket closed")86 break87 except Exception as e:88 self.logger.warning(traceback.format_exc(e))89 break90class MarionetteTestharnessProtocolPart(TestharnessProtocolPart):91 def __init__(self, parent):92 super(MarionetteTestharnessProtocolPart, self).__init__(parent)93 self.runner_handle = None94 def setup(self):95 self.marionette = self.parent.marionette96 def load_runner(self, url_protocol):97 # Check if we previously had a test window open, and if we did make sure it's closed98 self.parent.base.execute_script("if (window.win) {window.win.close()}")99 url = urlparse.urljoin(self.parent.executor.server_url(url_protocol),100 "/testharness_runner.html")101 self.logger.debug("Loading %s" % url)102 self.runner_handle = self.marionette.current_window_handle103 try:104 self.dismiss_alert(lambda: self.marionette.navigate(url))105 except Exception as e:106 self.logger.critical(107 "Loading initial page %s failed. 
Ensure that the "108 "there are no other programs bound to this port and "109 "that your firewall rules or network setup does not "110 "prevent access.\e%s" % (url, traceback.format_exc(e)))111 raise112 self.parent.base.execute_script(113 "document.title = '%s'" % threading.current_thread().name.replace("'", '"'))114 def close_old_windows(self, url_protocol):115 handles = self.marionette.window_handles116 runner_handle = None117 try:118 handles.remove(self.runner_handle)119 runner_handle = self.runner_handle120 except ValueError:121 # The runner window probably changed id but we can restore it122 # This isn't supposed to happen, but marionette ids are not yet stable123 # We assume that the first handle returned corresponds to the runner,124 # but it hopefully doesn't matter too much if that assumption is125 # wrong since we reload the runner in that tab anyway.126 runner_handle = handles.pop(0)127 for handle in handles:128 try:129 self.dismiss_alert(lambda: self.marionette.switch_to_window(handle))130 self.marionette.switch_to_window(handle)131 self.marionette.close()132 except errors.NoSuchWindowException:133 # We might have raced with the previous test to close this134 # window, skip it.135 pass136 self.marionette.switch_to_window(runner_handle)137 if runner_handle != self.runner_handle:138 self.load_runner(url_protocol)139 return self.runner_handle140 def dismiss_alert(self, f):141 while True:142 try:143 f()144 except errors.UnexpectedAlertOpen:145 alert = self.marionette.switch_to_alert()146 try:147 alert.dismiss()148 except errors.NoAlertPresentException:149 pass150 else:151 break152 def get_test_window(self, window_id, parent):153 test_window = None154 if window_id:155 try:156 # Try this, it's in Level 1 but nothing supports it yet157 win_s = self.parent.base.execute_script("return window['%s'];" % self.window_id)158 win_obj = json.loads(win_s)159 test_window = win_obj["window-fcc6-11e5-b4f8-330a88ab9d7f"]160 except Exception:161 pass162 if test_window is 
None:163 after = self.marionette.window_handles164 if len(after) == 2:165 test_window = next(iter(set(after) - set([parent])))166 elif after[0] == parent and len(after) > 2:167 # Hope the first one here is the test window168 test_window = after[1]169 else:170 raise Exception("unable to find test window")171 assert test_window != parent172 return test_window173class MarionettePrefsProtocolPart(PrefsProtocolPart):174 def setup(self):175 self.marionette = self.parent.marionette176 def set(self, name, value):177 if value.lower() not in ("true", "false"):178 try:179 int(value)180 except ValueError:181 value = "'%s'" % value182 else:183 value = value.lower()184 self.logger.info("Setting pref %s (%s)" % (name, value))185 script = """186 let prefInterface = Components.classes["@mozilla.org/preferences-service;1"]187 .getService(Components.interfaces.nsIPrefBranch);188 let pref = '%s';189 let type = prefInterface.getPrefType(pref);190 let value = %s;191 switch(type) {192 case prefInterface.PREF_STRING:193 prefInterface.setCharPref(pref, value);194 break;195 case prefInterface.PREF_BOOL:196 prefInterface.setBoolPref(pref, value);197 break;198 case prefInterface.PREF_INT:199 prefInterface.setIntPref(pref, value);200 break;201 }202 """ % (name, value)203 with self.marionette.using_context(self.marionette.CONTEXT_CHROME):204 self.marionette.execute_script(script)205 def clear(self, name):206 self.logger.info("Clearing pref %s" % (name))207 script = """208 let prefInterface = Components.classes["@mozilla.org/preferences-service;1"]209 .getService(Components.interfaces.nsIPrefBranch);210 let pref = '%s';211 prefInterface.clearUserPref(pref);212 """ % name213 with self.marionette.using_context(self.marionette.CONTEXT_CHROME):214 self.marionette.execute_script(script)215 def get(self, name):216 script = """217 let prefInterface = Components.classes["@mozilla.org/preferences-service;1"]218 .getService(Components.interfaces.nsIPrefBranch);219 let pref = '%s';220 let type = 
prefInterface.getPrefType(pref);221 switch(type) {222 case prefInterface.PREF_STRING:223 return prefInterface.getCharPref(pref);224 case prefInterface.PREF_BOOL:225 return prefInterface.getBoolPref(pref);226 case prefInterface.PREF_INT:227 return prefInterface.getIntPref(pref);228 case prefInterface.PREF_INVALID:229 return null;230 }231 """ % name232 with self.marionette.using_context(self.marionette.CONTEXT_CHROME):233 self.marionette.execute_script(script)234class MarionetteStorageProtocolPart(StorageProtocolPart):235 def setup(self):236 self.marionette = self.parent.marionette237 def clear_origin(self, url):238 self.logger.info("Clearing origin %s" % (url))239 script = """240 let url = '%s';241 let uri = Components.classes["@mozilla.org/network/io-service;1"]242 .getService(Ci.nsIIOService)243 .newURI(url);244 let ssm = Components.classes["@mozilla.org/scriptsecuritymanager;1"]245 .getService(Ci.nsIScriptSecurityManager);246 let principal = ssm.createCodebasePrincipal(uri, {});247 let qms = Components.classes["@mozilla.org/dom/quota-manager-service;1"]248 .getService(Components.interfaces.nsIQuotaManagerService);249 qms.clearStoragesForPrincipal(principal, "default", true);250 """ % url251 with self.marionette.using_context(self.marionette.CONTEXT_CHROME):252 self.marionette.execute_script(script)253class MarionetteAssertsProtocolPart(AssertsProtocolPart):254 def setup(self):255 self.assert_count = {"chrome": 0, "content": 0}256 self.chrome_assert_count = 0257 self.marionette = self.parent.marionette258 def get(self):259 script = """260 debug = Cc["@mozilla.org/xpcom/debug;1"].getService(Ci.nsIDebug2);261 if (debug.isDebugBuild) {262 return debug.assertionCount;263 }264 return 0;265 """266 def get_count(context, **kwargs):267 try:268 context_count = self.marionette.execute_script(script, **kwargs)269 if context_count:270 self.parent.logger.info("Got %s assert count %s" % (context, context_count))271 test_count = context_count - self.assert_count[context]272 
self.assert_count[context] = context_count273 return test_count274 except errors.NoSuchWindowException:275 # If the window was already closed276 self.parent.logger.warning("Failed to get assertion count; window was closed")277 except (errors.MarionetteException, socket.error):278 # This usually happens if the process crashed279 pass280 counts = []281 with self.marionette.using_context(self.marionette.CONTEXT_CHROME):282 counts.append(get_count("chrome"))283 if self.parent.e10s:284 counts.append(get_count("content", sandbox="system"))285 counts = [item for item in counts if item is not None]286 if not counts:287 return None288 return sum(counts)289class MarionetteSelectorProtocolPart(SelectorProtocolPart):290 def setup(self):291 self.marionette = self.parent.marionette292 def elements_by_selector(self, selector):293 return self.marionette.find_elements("css selector", selector)294class MarionetteClickProtocolPart(ClickProtocolPart):295 def setup(self):296 self.marionette = self.parent.marionette297 def element(self, element):298 return element.click()299class MarionetteSendKeysProtocolPart(SendKeysProtocolPart):300 def setup(self):301 self.marionette = self.parent.marionette302 def send_keys(self, element, keys):303 return element.send_keys(keys)304class MarionetteTestDriverProtocolPart(TestDriverProtocolPart):305 def setup(self):306 self.marionette = self.parent.marionette307 def send_message(self, message_type, status, message=None):308 obj = {309 "type": "testdriver-%s" % str(message_type),310 "status": str(status)311 }312 if message:313 obj["message"] = str(message)314 self.parent.base.execute_script("window.postMessage(%s, '*')" % json.dumps(obj))315class MarionetteCoverageProtocolPart(CoverageProtocolPart):316 def setup(self):317 self.marionette = self.parent.marionette318 if not self.parent.ccov:319 self.is_enabled = False320 return321 script = """322 ChromeUtils.import("chrome://marionette/content/PerTestCoverageUtils.jsm");323 return 
PerTestCoverageUtils.enabled;324 """325 with self.marionette.using_context(self.marionette.CONTEXT_CHROME):326 self.is_enabled = self.marionette.execute_script(script)327 def reset(self):328 script = """329 var callback = arguments[arguments.length - 1];330 ChromeUtils.import("chrome://marionette/content/PerTestCoverageUtils.jsm");331 PerTestCoverageUtils.beforeTest().then(callback, callback);332 """333 with self.marionette.using_context(self.marionette.CONTEXT_CHROME):334 try:335 error = self.marionette.execute_async_script(script)336 if error is not None:337 raise Exception('Failure while resetting counters: %s' % json.dumps(error))338 except (errors.MarionetteException, socket.error):339 # This usually happens if the process crashed340 pass341 def dump(self):342 if len(self.marionette.window_handles):343 handle = self.marionette.window_handles[0]344 self.marionette.switch_to_window(handle)345 script = """346 var callback = arguments[arguments.length - 1];347 ChromeUtils.import("chrome://marionette/content/PerTestCoverageUtils.jsm");348 PerTestCoverageUtils.afterTest().then(callback, callback);349 """350 with self.marionette.using_context(self.marionette.CONTEXT_CHROME):351 try:352 error = self.marionette.execute_async_script(script)353 if error is not None:354 raise Exception('Failure while dumping counters: %s' % json.dumps(error))355 except (errors.MarionetteException, socket.error):356 # This usually happens if the process crashed357 pass358class MarionetteProtocol(Protocol):359 implements = [MarionetteBaseProtocolPart,360 MarionetteTestharnessProtocolPart,361 MarionettePrefsProtocolPart,362 MarionetteStorageProtocolPart,363 MarionetteSelectorProtocolPart,364 MarionetteClickProtocolPart,365 MarionetteSendKeysProtocolPart,366 MarionetteTestDriverProtocolPart,367 MarionetteAssertsProtocolPart,368 MarionetteCoverageProtocolPart]369 def __init__(self, executor, browser, capabilities=None, timeout_multiplier=1, e10s=True, ccov=False):370 do_delayed_imports()371 
super(MarionetteProtocol, self).__init__(executor, browser)372 self.marionette = None373 self.marionette_port = browser.marionette_port374 self.capabilities = capabilities375 self.timeout_multiplier = timeout_multiplier376 self.runner_handle = None377 self.e10s = e10s378 self.ccov = ccov379 def connect(self):380 self.logger.debug("Connecting to Marionette on port %i" % self.marionette_port)381 startup_timeout = marionette.Marionette.DEFAULT_STARTUP_TIMEOUT * self.timeout_multiplier382 self.marionette = marionette.Marionette(host='127.0.0.1',383 port=self.marionette_port,384 socket_timeout=None,385 startup_timeout=startup_timeout)386 self.logger.debug("Waiting for Marionette connection")387 while True:388 try:389 self.marionette.raise_for_port()390 break391 except IOError:392 # When running in a debugger wait indefinitely for Firefox to start393 if self.executor.debug_info is None:394 raise395 self.logger.debug("Starting Marionette session")396 self.marionette.start_session(self.capabilities)397 self.logger.debug("Marionette session started")398 def after_connect(self):399 self.testharness.load_runner(self.executor.last_environment["protocol"])400 def teardown(self):401 try:402 self.marionette._request_in_app_shutdown()403 self.marionette.delete_session(send_request=False)404 except Exception:405 # This is typically because the session never started406 pass407 if self.marionette is not None:408 del self.marionette409 super(MarionetteProtocol, self).teardown()410 @property411 def is_alive(self):412 try:413 self.marionette.current_window_handle414 except Exception:415 return False416 return True417 def on_environment_change(self, old_environment, new_environment):418 #Unset all the old prefs419 for name in old_environment.get("prefs", {}).iterkeys():420 value = self.executor.original_pref_values[name]421 if value is None:422 self.prefs.clear(name)423 else:424 self.prefs.set(name, value)425 for name, value in new_environment.get("prefs", {}).iteritems():426 
self.executor.original_pref_values[name] = self.prefs.get(name)427 self.prefs.set(name, value)428class ExecuteAsyncScriptRun(object):429 def __init__(self, logger, func, protocol, url, timeout):430 self.logger = logger431 self.result = (None, None)432 self.protocol = protocol433 self.func = func434 self.url = url435 self.timeout = timeout436 self.result_flag = threading.Event()437 def run(self):438 index = self.url.rfind("/storage/")439 if index != -1:440 # Clear storage441 self.protocol.storage.clear_origin(self.url)442 timeout = self.timeout443 try:444 if timeout is not None:445 self.protocol.base.set_timeout(timeout + extra_timeout)446 else:447 # We just want it to never time out, really, but marionette doesn't448 # make that possible. It also seems to time out immediately if the449 # timeout is set too high. This works at least.450 self.protocol.base.set_timeout(2**28 - 1)451 except IOError:452 self.logger.error("Lost marionette connection before starting test")453 return Stop454 if timeout is not None:455 wait_timeout = timeout + 2 * extra_timeout456 else:457 wait_timeout = None458 timer = threading.Timer(wait_timeout, self._timeout)459 timer.start()460 self._run()461 self.result_flag.wait()462 timer.cancel()463 if self.result == (None, None):464 self.logger.debug("Timed out waiting for a result")465 self.result = False, ("EXTERNAL-TIMEOUT", None)466 elif self.result[1] is None:467 # We didn't get any data back from the test, so check if the468 # browser is still responsive469 if self.protocol.is_alive:470 self.result = False, ("INTERNAL-ERROR", None)471 else:472 self.result = False, ("CRASH", None)473 return self.result474 def _run(self):475 try:476 self.result = True, self.func(self.protocol, self.url, self.timeout)477 except errors.ScriptTimeoutException:478 self.logger.debug("Got a marionette timeout")479 self.result = False, ("EXTERNAL-TIMEOUT", None)480 except (socket.timeout, IOError):481 # This can happen on a crash482 # Also, should check after the 
test if the firefox process is still running483 # and otherwise ignore any other result and set it to crash484 self.result = False, ("CRASH", None)485 except Exception as e:486 message = getattr(e, "message", "")487 if message:488 message += "\n"489 message += traceback.format_exc(e)490 self.logger.warning(message)491 self.result = False, ("INTERNAL-ERROR", None)492 finally:493 self.result_flag.set()494 def _timeout(self):495 self.result = False, ("EXTERNAL-TIMEOUT", None)496 self.result_flag.set()497class MarionetteTestharnessExecutor(TestharnessExecutor):498 supports_testdriver = True499 def __init__(self, browser, server_config, timeout_multiplier=1,500 close_after_done=True, debug_info=None, capabilities=None,501 debug=False, ccov=False, **kwargs):502 """Marionette-based executor for testharness.js tests"""503 TestharnessExecutor.__init__(self, browser, server_config,504 timeout_multiplier=timeout_multiplier,505 debug_info=debug_info)506 self.protocol = MarionetteProtocol(self, browser, capabilities, timeout_multiplier, kwargs["e10s"], ccov)507 self.script = open(os.path.join(here, "testharness_webdriver.js")).read()508 self.script_resume = open(os.path.join(here, "testharness_webdriver_resume.js")).read()509 self.close_after_done = close_after_done510 self.window_id = str(uuid.uuid4())511 self.debug = debug512 self.original_pref_values = {}513 if marionette is None:514 do_delayed_imports()515 def is_alive(self):516 return self.protocol.is_alive517 def on_environment_change(self, new_environment):518 self.protocol.on_environment_change(self.last_environment, new_environment)519 if new_environment["protocol"] != self.last_environment["protocol"]:520 self.protocol.testharness.load_runner(new_environment["protocol"])521 def do_test(self, test):522 timeout = (test.timeout * self.timeout_multiplier if self.debug_info is None523 else None)524 success, data = ExecuteAsyncScriptRun(self.logger,525 self.do_testharness,526 self.protocol,527 self.test_url(test),528 
timeout).run()529 # The format of data depends on whether the test ran to completion or not530 # For asserts we only care about the fact that if it didn't complete, the531 # status is in the first field.532 status = None533 if not success:534 status = data[0]535 extra = None536 if self.debug and (success or status not in ("CRASH", "INTERNAL-ERROR")):537 assertion_count = self.protocol.asserts.get()538 if assertion_count is not None:539 extra = {"assertion_count": assertion_count}540 if success:541 return self.convert_result(test, data, extra=extra)542 return (test.result_cls(extra=extra, *data), [])543 def do_testharness(self, protocol, url, timeout):544 protocol.base.execute_script("if (window.win) {window.win.close()}")545 parent_window = protocol.testharness.close_old_windows(protocol)546 if timeout is not None:547 timeout_ms = str(timeout * 1000)548 else:549 timeout_ms = "null"550 if self.protocol.coverage.is_enabled:551 self.protocol.coverage.reset()552 format_map = {"abs_url": url,553 "url": strip_server(url),554 "window_id": self.window_id,555 "timeout_multiplier": self.timeout_multiplier,556 "timeout": timeout_ms,557 "explicit_timeout": timeout is None}558 script = self.script % format_map559 rv = protocol.base.execute_script(script)560 test_window = protocol.testharness.get_test_window(self.window_id, parent_window)561 handler = CallbackHandler(self.logger, protocol, test_window)562 while True:563 result = protocol.base.execute_script(564 self.script_resume % format_map, async=True)565 if result is None:566 # This can happen if we get an content process crash567 return None568 done, rv = handler(result)569 if done:570 break571 if self.protocol.coverage.is_enabled:572 self.protocol.coverage.dump()573 return rv574class MarionetteRefTestExecutor(RefTestExecutor):575 def __init__(self, browser, server_config, timeout_multiplier=1,576 screenshot_cache=None, close_after_done=True,577 debug_info=None, reftest_internal=False,578 reftest_screenshot="unexpected", 
ccov=False,579 group_metadata=None, capabilities=None, debug=False, **kwargs):580 """Marionette-based executor for reftests"""581 RefTestExecutor.__init__(self,582 browser,583 server_config,584 screenshot_cache=screenshot_cache,585 timeout_multiplier=timeout_multiplier,586 debug_info=debug_info)587 self.protocol = MarionetteProtocol(self, browser, capabilities,588 timeout_multiplier, kwargs["e10s"],589 ccov)590 self.implementation = (InternalRefTestImplementation591 if reftest_internal592 else RefTestImplementation)(self)593 self.implementation_kwargs = ({"screenshot": reftest_screenshot} if594 reftest_internal else {})595 self.close_after_done = close_after_done596 self.has_window = False597 self.original_pref_values = {}598 self.group_metadata = group_metadata599 self.debug = debug600 with open(os.path.join(here, "reftest.js")) as f:601 self.script = f.read()602 with open(os.path.join(here, "reftest-wait_marionette.js")) as f:603 self.wait_script = f.read()604 def setup(self, runner):605 super(self.__class__, self).setup(runner)606 self.implementation.setup(**self.implementation_kwargs)607 def teardown(self):608 try:609 self.implementation.teardown()610 handle = self.protocol.marionette.window_handles[0]611 self.protocol.marionette.switch_to_window(handle)612 super(self.__class__, self).teardown()613 except Exception as e:614 # Ignore errors during teardown615 self.logger.warning(traceback.format_exc(e))616 def is_alive(self):617 return self.protocol.is_alive618 def on_environment_change(self, new_environment):619 self.protocol.on_environment_change(self.last_environment, new_environment)620 def do_test(self, test):621 if not isinstance(self.implementation, InternalRefTestImplementation):622 if self.close_after_done and self.has_window:623 self.protocol.marionette.close()624 self.protocol.marionette.switch_to_window(625 self.protocol.marionette.window_handles[-1])626 self.has_window = False627 if not self.has_window:628 
self.protocol.base.execute_script(self.script)629 self.protocol.base.set_window(self.protocol.marionette.window_handles[-1])630 self.has_window = True631 if self.protocol.coverage.is_enabled:632 self.protocol.coverage.reset()633 result = self.implementation.run_test(test)634 if self.protocol.coverage.is_enabled:635 self.protocol.coverage.dump()636 if self.debug:637 assertion_count = self.protocol.asserts.get()638 if "extra" not in result:639 result["extra"] = {}640 result["extra"]["assertion_count"] = assertion_count641 return self.convert_result(test, result)642 def screenshot(self, test, viewport_size, dpi):643 # https://github.com/w3c/wptrunner/issues/166644 assert viewport_size is None645 assert dpi is None646 timeout = self.timeout_multiplier * test.timeout if self.debug_info is None else None647 test_url = self.test_url(test)648 return ExecuteAsyncScriptRun(self.logger,649 self._screenshot,650 self.protocol,651 test_url,652 timeout).run()653 def _screenshot(self, protocol, url, timeout):654 protocol.marionette.navigate(url)655 protocol.base.execute_script(self.wait_script, async=True)656 screenshot = protocol.marionette.screenshot(full=False)657 # strip off the data:img/png, part of the url658 if screenshot.startswith("data:image/png;base64,"):659 screenshot = screenshot.split(",", 1)[1]660 return screenshot661class InternalRefTestImplementation(object):662 def __init__(self, executor):663 self.timeout_multiplier = executor.timeout_multiplier664 self.executor = executor665 @property666 def logger(self):667 return self.executor.logger668 def setup(self, screenshot="unexpected"):669 data = {"screenshot": screenshot}670 if self.executor.group_metadata is not None:671 data["urlCount"] = {urlparse.urljoin(self.executor.server_url(key[0]), key[1]):value672 for key, value in self.executor.group_metadata.get("url_count", {}).iteritems()673 if value > 1}674 self.executor.protocol.marionette.set_context(self.executor.protocol.marionette.CONTEXT_CHROME)675 
self.executor.protocol.marionette._send_message("reftest:setup", data)676 def run_test(self, test):677 references = self.get_references(test)678 rv = self.executor.protocol.marionette._send_message("reftest:run",679 {"test": self.executor.test_url(test),680 "references": references,681 "expected": test.expected(),682 "timeout": test.timeout * 1000})["value"]683 return rv684 def get_references(self, node):685 rv = []686 for item, relation in node.references:687 rv.append([self.executor.test_url(item), self.get_references(item), relation])688 return rv689 def teardown(self):690 try:691 self.executor.protocol.marionette._send_message("reftest:teardown", {})692 self.executor.protocol.marionette.set_context(self.executor.protocol.marionette.CONTEXT_CONTENT)693 except Exception as e:694 # Ignore errors during teardown695 self.logger.warning(traceback.format_exc(e))696class GeckoDriverProtocol(WebDriverProtocol):697 server_cls = GeckoDriverServer698class MarionetteWdspecExecutor(WdspecExecutor):...
test_navigation.py
Source:test_navigation.py
# ***** BEGIN LICENSE BLOCK *****
# Version: MPL 1.1/GPL 2.0/LGPL 2.1
#
# The contents of this file are subject to the Mozilla Public License Version
# 1.1 (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
# http://www.mozilla.org/MPL/
#
# Software distributed under the License is distributed on an "AS IS" basis,
# WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License
# for the specific language governing rights and limitations under the
# License.
#
# The Original Code is Marionette Client.
#
# The Initial Developer of the Original Code is
# Mozilla Foundation.
# Portions created by the Initial Developer are Copyright (C) 2011
# the Initial Developer. All Rights Reserved.
#
# Contributor(s):
#
# Alternatively, the contents of this file may be used under the terms of
# either the GNU General Public License Version 2 or later (the "GPL"), or
# the GNU Lesser General Public License Version 2.1 or later (the "LGPL"),
# in which case the provisions of the GPL or the LGPL are applicable instead
# of those above. If you wish to allow use of your version of this file only
# under the terms of either the GPL or the LGPL, and not to allow others to
# use your version of this file under the terms of the MPL, indicate your
# decision by deleting the provisions above and replace them with the notice
# and other provisions required by the GPL or the LGPL. If you do not delete
# the provisions above, a recipient may use your version of this file under
# the terms of any one of the MPL, the GPL or the LGPL.
#
# ***** END LICENSE BLOCK *****

import os

from marionette_test import MarionetteTestCase


class TestNavigate(MarionetteTestCase):
    """Navigation tests: navigate, get_url, go_back, go_forward, refresh."""

    def _current_href(self):
        # Location of the current window as reported by the page itself.
        return self.marionette.execute_script("return window.location.href;")

    def _current_title(self):
        return self.marionette.execute_script("return window.document.title;")

    def _assert_at_blank(self):
        self.assertEqual("about:blank", self._current_href())

    def _assert_at_test_page(self):
        self.assertNotEqual("about:blank", self._current_href())
        self.assertEqual("Marionette Test", self._current_title())

    def _blank_then_test_page(self):
        # Start from about:blank (set from script), then navigate to
        # test.html and check that the page actually changed.
        went_blank = self.marionette.execute_script(
            "window.location.href = 'about:blank'; return true;")
        self.assertTrue(went_blank)
        self._assert_at_blank()
        page_url = self.marionette.absolute_url("test.html")
        self.marionette.navigate(page_url)
        self._assert_at_test_page()

    def test_navigate(self):
        self._blank_then_test_page()

    def test_getUrl(self):
        page_url = self.marionette.absolute_url("test.html")
        self.marionette.navigate(page_url)
        self.assertTrue(page_url in self.marionette.get_url())
        self.marionette.navigate("about:blank")
        self.assertEqual("about:blank", self.marionette.get_url())

    def test_goBack(self):
        self._blank_then_test_page()
        self.marionette.navigate("about:blank")
        self._assert_at_blank()
        self.marionette.go_back()
        self._assert_at_test_page()

    def test_goForward(self):
        self._blank_then_test_page()
        self.marionette.navigate("about:blank")
        self._assert_at_blank()
        self.marionette.go_back()
        self._assert_at_test_page()
        self.marionette.go_forward()
        self._assert_at_blank()

    def test_refresh(self):
        page_url = self.marionette.absolute_url("test.html")
        self.marionette.navigate(page_url)
        self.assertEqual("Marionette Test", self._current_title())
        # Add a marker element so its presence can be checked around refresh.
        appended = self.marionette.execute_script(
            "var elem = window.document.createElement('div'); elem.id = 'someDiv';" +
            "window.document.body.appendChild(elem); return true;")
        self.assertTrue(appended)
        marker_missing = self.marionette.execute_script(
            "return window.document.getElementById('someDiv') == undefined;")
        self.assertFalse(marker_missing)
        self.marionette.refresh()
        self.assertEqual("Marionette Test", self._current_title())
        # NOTE(review): the source listing is truncated here ("..."); any
        # statements after this point are not visible.
Learn to execute automation testing from scratch with LambdaTest Learning Hub, from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. The LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, and TestNG.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 minutes of automation testing FREE!!