Best Python code snippet using yandex-tank
test_tap.py
Source:test_tap.py
1#2# Copyright (C) 2011 Loic Dachary <loic@dachary.org>3#4# This software's license gives you freedom; you can copy, convey,5# propagate, redistribute and/or modify this program under the terms of6# the GNU Affero General Public License (AGPL) as published by the Free7# Software Foundation (FSF), either version 3 of the License, or (at your8# option) any later version of the AGPL published by the FSF.9#10# This program is distributed in the hope that it will be useful, but11# WITHOUT ANY WARRANTY; without even the implied warranty of12# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero13# General Public License for more details.14#15# You should have received a copy of the GNU Affero General Public License16# along with this program in a file in the toplevel directory called17# "AGPLv3". If not, see <http://www.gnu.org/licenses/>.18#19import sys20import os21sys.path.insert(0, os.path.abspath("..")) # so that for M-x pdb works22from twisted.trial import unittest, runner, reporter23from twisted.web import client24from cardstories import tap25from cardstories.service import CardstoriesService26class CardstoriesPluginsTest(unittest.TestCase):27 def test00_start_stop(self):28 self.port = '14834'29 options = tap.Options()30 options.parseOptions(['--port', self.port, '--db', 'test.sqlite',31 '--plugins-dir', '..',32 '--plugins', 'plugin'])33 self.service = tap.makeService(options)34 for service in self.service:35 if isinstance(service, CardstoriesService):36 break37 self.service.startService()38 self.assertEqual(service.plugin_event['type'], 'start')39 d = self.service.stopService()40 self.assertEqual(service.plugin_event['type'], 'stop')41 return d42class CardstoriesServerTest(unittest.TestCase):43 def setUp(self):44 self.port = '14834'45 options = tap.Options()46 options.parseOptions(['--port', self.port, '--db', 'test.sqlite' ])47 self.service = tap.makeService(options)48 self.service.startService()49 def tearDown(self):50 return self.service.stopService()51 def test01_connect(self):52 d = client.getPage('http://127.0.0.1:%s/resource' % self.port)53 def check(result):54 self.assertEquals('true', result)55 d.addCallback(check)56 return d57 def test02_parseOptions(self):58 settings = tap.Options()59 pem = 'cert.pem'60 db = 'base.sqlite'61 ssl_port = '8080'62 internal_secret = 'My Most Secret Key'63 plugins = 'P1 P2 P3'64 plugins_dir = 'PLUGINDIR'65 plugins_libdir = 'PLUGINDIR/LIB'66 plugins_logdir = 'LOGDIR'67 plugins_confdir = 'PLUGINDIR/CONF'68 plugins_pre_process = 'P1 P2'69 plugins_post_process = 'P2 P1'70 settings.parseOptions(['--port', self.port,71 '--ssl-port', ssl_port,72 '--ssl-pem', pem,73 '--verbose',74 '--db', db,75 '--internal-secret', internal_secret,76 '--plugins', plugins,77 '--plugins-dir', plugins_dir,78 '--plugins-libdir', plugins_libdir,79 '--plugins-logdir', plugins_logdir,80 '--plugins-confdir', plugins_confdir,81 '--plugins-pre-process', plugins_pre_process,82 '--plugins-post-process', plugins_post_process83 ])84 self.assertEquals(pem, settings['ssl-pem'])85 self.assertEquals(1, settings['verbose'])86 self.assertEquals(db, settings['db'])87 self.assertEquals(internal_secret, settings['internal-secret'])88 self.assertEquals(plugins, settings['plugins'])89 self.assertEquals(plugins_dir, settings['plugins-dir'])90 self.assertEquals(plugins_libdir, settings['plugins-libdir'])91 self.assertEquals(plugins_confdir, settings['plugins-confdir'])92 self.assertEquals(plugins_logdir, settings['plugins-logdir'])93 self.assertEquals(plugins_pre_process, settings['plugins-pre-process'])94 self.assertEquals(plugins_post_process, settings['plugins-post-process'])95 self.assertEquals(int(self.port), settings['port'])96 self.assertEquals(int(ssl_port), settings['ssl-port'])97 settings.parseOptions(['-p', self.port,98 '-s', ssl_port,99 '-P', pem,100 '-v',101 ])102 self.assertEquals(pem, settings['ssl-pem'])103 self.assertEquals(1, settings['verbose'])104 self.assertEquals(db, settings['db'])105 self.assertEquals(int(self.port), settings['port'])106 self.assertEquals(int(ssl_port), settings['ssl-port'])107 108# Dummy CERT borrowed from Debian's snake-oil certificate. Including it109# here since I can't assume what distribution I am on.110snake_oil_cert = """-----BEGIN CERTIFICATE-----111MIIDKzCCApQCCQDEKuqSPjfcEDANBgkqhkiG9w0BAQUFADCB2TELMAkGA1UEBhMC112WFgxKjAoBgNVBAgTIVRoZXJlIGlzIG5vIHN1Y2ggdGhpbmcgb3V0c2lkZSBVUzET113MBEGA1UEBxMKRXZlcnl3aGVyZTEOMAwGA1UEChMFT0NPU0ExPDA6BgNVBAsTM09m114ZmljZSBmb3IgQ29tcGxpY2F0aW9uIG9mIE90aGVyd2lzZSBTaW1wbGUgQWZmYWly115czEXMBUGA1UEAxMObWFwbGUuc2ZsYy12cG4xIjAgBgkqhkiG9w0BCQEWE3Jvb3RA116bWFwbGUuc2ZsYy12cG4wHhcNMDkwMTAyMTg1NzA0WhcNMDkwMjAxMTg1NzA0WjCB1172TELMAkGA1UEBhMCWFgxKjAoBgNVBAgTIVRoZXJlIGlzIG5vIHN1Y2ggdGhpbmcg118b3V0c2lkZSBVUzETMBEGA1UEBxMKRXZlcnl3aGVyZTEOMAwGA1UEChMFT0NPU0Ex119PDA6BgNVBAsTM09mZmljZSBmb3IgQ29tcGxpY2F0aW9uIG9mIE90aGVyd2lzZSBT120aW1wbGUgQWZmYWlyczEXMBUGA1UEAxMObWFwbGUuc2ZsYy12cG4xIjAgBgkqhkiG1219w0BCQEWE3Jvb3RAbWFwbGUuc2ZsYy12cG4wgZ8wDQYJKoZIhvcNAQEBBQADgY0A122MIGJAoGBAO0t+HjxiiliSHO9kge943+cXHGCtJp4/RPpHDN7hbpblY+FYCjuCmW/123/m2G59aMMl2Uwj1BO8cDwdGDtkNV21vcIo0siSD9VREFiYcLthaOK98muqD+Tfqa124MuGzZyui1RKuirCZzqyJPS2SXOtWSXUW8YQa75y/o4vcQSWWZ3VDAgMBAAEwDQYJ125KoZIhvcNAQEFBQADgYEApx7Q+PzLgdJu7OQJ776Kr+EI+Ho03pM5Nb5e26P5ZL6h126hk+gRLfBt8q3bihx4qjBSOpx1Qxq+BAMg6SAkDzkz+tN2CSr/vv2uuc26cDaf1co127oKCay2gMThIoURl+FSPeWAraGWbrcVy9ctoCipxMza9fn42dbn9OHxP/M+0qgvY=128-----END CERTIFICATE-----129-----BEGIN RSA PRIVATE KEY-----130MIICXQIBAAKBgQDtLfh48YopYkhzvZIHveN/nFxxgrSaeP0T6Rwze4W6W5WPhWAo1317gplv/5thufWjDJdlMI9QTvHA8HRg7ZDVdtb3CKNLIkg/VURBYmHC7YWjivfJrqg132/k36mjLhs2crotUSroqwmc6siT0tklzrVkl1FvGEGu+cv6OL3EEllmd1QwIDAQAB133AoGBAL4ws+QABIOE/YZaSKSOn8Rv1S1s23hXdtGlh2i9L5It6LOrB14q7AmFuPeJ134S5We3LBwHoZSLiY7nAtvLBO44GmwpSiJuLaI0z/7YIqkS6KjiDy1GFdQ5IEaRzxK135nyDcvES4h4QdOa/UeMEWg8TmasEoG3Wm3+aZt5KRz57HQQJRAkEA/uN0aw+1jqVP136YKbG89k7DEdNOdfgLjFofXruwXPfQmEFNg3Vp5ke1SeaR0tzYDXgZ5fDlwnR0EgA137HrR0om3PKwJBAO42vxdAVjrfMt0ws0wTmKS7mLlY8p7dKVKKIwP6F2b/61QyEX7z138czjyBaegw8qbX93OD0g2TETms73Py4WFJkkCQBV97FUSsAZlHfpSVbg9+uKgKHzW139HQsIE31xHiylro+USrIyHG/TU2w5uKKGVCYqpM9XVqCnrU9Yotnz8Vm41J0CQQCf140VccjikkjP8AJ61VCgakMJt7UuwYt9Mh7CSK6ukGFB5Ek1AiX3ccoQ9o8cXAEyUCq141X/Yg2xDQ1W9Mev0q5hDhAkBKSJF0V/24bz27z1yUSzHRHO3FAKXepkR81g6IRl41142r9nOQTOBo04TLBXtyP+o7GFNzBjEm6fVaqwk5SVsdQ+t143-----END RSA PRIVATE KEY-----144"""145class CardstoriesServerTestSSL(unittest.TestCase):146 def setUp(self):147 self.port = '14834'148 self.ssl_port = '14835'149 pem = "cardstories.pem"150 fd = open(pem, "w")151 fd.write(snake_oil_cert)152 fd.close()153 options = tap.Options()154 options.parseOptions(['--port', self.port, '--ssl-port', self.ssl_port, '--ssl-pem', pem, '--db', 'test.sqlite' ])155 self.service = tap.makeService(options)156 self.service.startService()157 def tearDown(self):158 return self.service.stopService()159 def test01(self):160 d = client.getPage('https://127.0.0.1:%s/resource' % self.ssl_port)161 def check(result):162 self.assertEquals('{}', result)163 d.addCallback(check)164 return d165def Run():166 loader = runner.TestLoader()167# loader.methodPrefix = "test_trynow"168 suite = loader.suiteFactory()169 suite.addTest(loader.loadClass(CardstoriesServerTest))170 suite.addTest(loader.loadClass(CardstoriesPluginsTest))171# suite.addTest(loader.loadClass(CardstoriesServerTestSSL))172 return runner.TrialRunner(173 reporter.VerboseTextReporter,174 tracebackFormat='default',175 ).run(suite)176if __name__ == '__main__':177 if Run().wasSuccessful():178 sys.exit(0)179 else:180 sys.exit(1)181# Interpreted by emacs182# Local Variables:183# compile-command: "python-coverage -e ; python-coverage -x test_tap.py ; python-coverage -m -a -r ../cardstories/tap.py"...
apiworker.py
Source:apiworker.py
...88 raw_input("Press Enter key to start test")89 self.core.plugins_start_test()90 retcode = self.core.wait_for_finish()91 retcode = self.core.plugins_end_test(retcode)92 retcode = self.core.plugins_post_process(retcode)93 except KeyboardInterrupt as ex:94 self.log.info(95 "Do not press Ctrl+C again, the test will be broken otherwise")96 self.log.debug(97 "Caught KeyboardInterrupt: %s", traceback.format_exc(ex))98 try:99 retcode = self.__graceful_shutdown()100 except KeyboardInterrupt as ex:101 self.log.debug(102 "Caught KeyboardInterrupt again: %s",103 traceback.format_exc(ex))104 self.log.info(105 "User insists on exiting, aborting graceful shutdown...")106 retcode = 1107 except Exception as ex:108 self.log.info("Exception: %s", traceback.format_exc(ex))109 self.log.error("%s", ex)110 retcode = self.__graceful_shutdown()111 self.core.release_lock()112 self.log.info("Done performing test with code %s", retcode)113 return retcode114 def get_default_configs(self):115 """ returns default configs list, from /etc, home dir and package_data"""116 # initialize basic defaults117 configs = [resource_filename(__name__, 'config/00-base.ini')]118 try:119 conf_files = sorted(os.listdir(self.baseconfigs_location))120 for filename in conf_files:121 if fnmatch.fnmatch(filename, '*.ini'):122 configs += [123 os.path.realpath(124 self.baseconfigs_location + os.sep + filename)125 ]126 except OSError:127 self.log.warn(128 self.baseconfigs_location +129 ' is not accessible to get configs list')130 configs += [os.path.expanduser('~/.yandex-tank')]131 return configs132 def __graceful_shutdown(self):133 """ call shutdown routines """134 retcode = 1135 self.log.info("Trying to shutdown gracefully...")136 retcode = self.core.plugins_end_test(retcode)137 retcode = self.core.plugins_post_process(retcode)138 self.log.info("Done graceful shutdown")139 return retcode140class SingleLevelFilter(logging.Filter):141 """Exclude or approve one msg type at a time. """142 def __init__(self, passlevel, reject):143 logging.Filter.__init__(self)144 self.passlevel = passlevel145 self.reject = reject146 def filter(self, record):147 if self.reject:148 return record.levelno != self.passlevel149 else:...
Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!