Best Python code snippet using localstack_python
test_pxf.py
Source:test_pxf.py
1#!/usr/bin/env python2'''3Licensed to the Apache Software Foundation (ASF) under one4or more contributor license agreements. See the NOTICE file5distributed with this work for additional information6regarding copyright ownership. The ASF licenses this file7to you under the Apache License, Version 2.0 (the8"License"); you may not use this file except in compliance9with the License. You may obtain a copy of the License at10 http://www.apache.org/licenses/LICENSE-2.011Unless required by applicable law or agreed to in writing, software12distributed under the License is distributed on an "AS IS" BASIS,13WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.14See the License for the specific language governing permissions and15limitations under the License.16'''17import copy18import json19import os20from mock.mock import patch21from stacks.utils.RMFTestCase import Template, RMFTestCase22class TestPxf(RMFTestCase):23 PXF_PACKAGE_DIR = "PXF/3.0.0/package"24 STACK_VERSION = "2.3"25 PXF_USER = 'pxf'26 PXF_GROUP = 'pxf'27 TOMCAT_GROUP = 'tomcat'28 BASH_SHELL = '/bin/bash'29 DEFAULT_TIMEOUT = 60030 CONFIG_FILE = os.path.join(os.path.dirname(__file__), '../configs/pxf_default.json')31 with open(CONFIG_FILE, "r") as f:32 pxf_config = json.load(f)33 def setUp(self):34 self.config_dict = copy.deepcopy(self.pxf_config)35 def assert_configure_default(self):36 self.assertResourceCalled('User', self.PXF_USER,37 groups=[self.getConfig()['configurations']['hdfs-site']['dfs.permissions.superusergroup'],38 self.getConfig()['configurations']['cluster-env']['user_group'],39 self.TOMCAT_GROUP],40 shell=self.BASH_SHELL)41 self.assertResourceCalled('File', '/etc/pxf/conf/pxf-env.sh',42 content=Template('pxf-env.j2'))43 self.assertResourceCalled('File', '/etc/pxf/conf/pxf-public.classpath',44 content = self.getConfig()['configurations']['pxf-public-classpath']['content'].lstrip())45 self.assertResourceCalled('File', '/etc/pxf/conf/pxf-profiles.xml',46 content = self.getConfig()['configurations']['pxf-profiles']['content'].lstrip())47 self.assertResourceCalled('XmlConfig', 'pxf-site.xml',48 conf_dir='/etc/pxf/conf',49 configurations=self.getConfig()['configurations']['pxf-site'],50 configuration_attributes=self.getConfig()['configuration_attributes']['pxf-site'])51 self.assertResourceCalled('Execute', 'service pxf-service init',52 timeout=self.DEFAULT_TIMEOUT,53 logoutput=True)54 @patch('shutil.copy2')55 def test_install_default(self, shutil_copy2_mock):56 self.executeScript(self.PXF_PACKAGE_DIR + "/scripts/pxf.py",57 classname="Pxf",58 command="install",59 config_dict=self.config_dict,60 stack_version=self.STACK_VERSION,61 target=RMFTestCase.TARGET_COMMON_SERVICES,62 try_install=True)63 self.assertResourceCalled('Package', 'pxf-service',64 retry_count=5,65 retry_on_repo_unavailability=False)66 self.assertResourceCalled('Package', 'apache-tomcat',67 retry_count=5,68 retry_on_repo_unavailability=False)69 self.assertResourceCalled('Package', 'pxf-hive',70 retry_count=5,71 retry_on_repo_unavailability=False)72 self.assertResourceCalled('Package', 'pxf-hdfs',73 retry_count=5,74 retry_on_repo_unavailability=False)75 self.assertResourceCalled('Package', 'pxf-hbase',76 retry_count=5,77 retry_on_repo_unavailability=False)78 self.assertResourceCalled('Package', 'pxf-json',79 retry_count=5,80 retry_on_repo_unavailability=False)81 self.assert_configure_default()82 @patch('shutil.copy2')83 def test_configure_default(self, shutil_copy2_mock):84 self.executeScript(self.PXF_PACKAGE_DIR + "/scripts/pxf.py",85 classname="Pxf",86 command="configure",87 config_dict=self.config_dict,88 stack_version=self.STACK_VERSION,89 target=RMFTestCase.TARGET_COMMON_SERVICES,90 try_install=True)91 self.assert_configure_default()92 @patch('shutil.copy2')93 def test_start_default(self, shutil_copy2_mock):94 self.executeScript(self.PXF_PACKAGE_DIR + "/scripts/pxf.py",95 classname="Pxf",96 command="start",97 config_dict=self.config_dict,98 stack_version=self.STACK_VERSION,99 target=RMFTestCase.TARGET_COMMON_SERVICES,100 try_install=True)101 self.assert_configure_default()102 self.assertResourceCalled('Directory', '/var/pxf',103 owner=self.PXF_USER,104 group=self.PXF_GROUP,105 create_parents = True)106 self.assertResourceCalled('Execute', 'service pxf-service restart',107 timeout=self.DEFAULT_TIMEOUT,108 logoutput=True)109 def test_stop_default(self):110 self.executeScript(self.PXF_PACKAGE_DIR + "/scripts/pxf.py",111 classname="Pxf",112 command="stop",113 config_dict=self.config_dict,114 stack_version=self.STACK_VERSION,115 target=RMFTestCase.TARGET_COMMON_SERVICES,116 try_install=True)117 self.assertResourceCalled('Execute', 'service pxf-service stop',118 timeout=self.DEFAULT_TIMEOUT,119 logoutput=True)120 def test_status_default(self):121 self.executeScript(self.PXF_PACKAGE_DIR + "/scripts/pxf.py",122 classname="Pxf",123 command="status",124 config_dict=self.config_dict,125 stack_version=self.STACK_VERSION,126 target=RMFTestCase.TARGET_COMMON_SERVICES,127 try_install=True)128 self.assertResourceCalled('Execute', 'service pxf-service status',129 timeout=self.DEFAULT_TIMEOUT,130 logoutput=True)131 def tearDown(self):...
test_basic.py
Source:test_basic.py
...8from bottle import Bottle9from bottle_argsmap import ArgsMapPlugin, try_install10def test_try_install_new():11 app = Bottle()12 plugin = try_install(app)13 assert isinstance(plugin, ArgsMapPlugin)14 assert app.plugins[-1] is plugin15def test_try_install_exists():16 app = Bottle()17 plugin = try_install(app)18 assert plugin is try_install(app)19 assert plugin is try_install(app)20 assert plugin is try_install(app)21 assert len([p for p in app.plugins if isinstance(p, ArgsMapPlugin)]) == 122def test_without_inject():23 plugin = ArgsMapPlugin()24 app = Bottle()25 app.install(plugin)26 @app.get('/entires/<name>/<value>')27 def entires(name: str, value: str):28 return dict(name=name, value=value)29 tapp = webtest.TestApp(app)30 resp = tapp.get('/entires/test123/test4566')31 assert resp.status_code == 20032 assert resp.json == dict(name='test123', value='test4566')33def test_with_inject_value():34 plugin = ArgsMapPlugin()...
__main__.py
Source:__main__.py
2import os3import socket4from libinstall import FileInstaller, PackageInstaller5dir = os.path.dirname(__file__)6PackageInstaller.try_install('i3-gaps')7PackageInstaller.try_install('i3lock')8PackageInstaller.try_install('i3status')9PackageInstaller.try_install('feh')10PackageInstaller.try_install('rxvt-unicode')11sources = [os.path.join(dir, 'i3-config')]12if socket.gethostname() == 'sunjammer':13 FileInstaller.create_symlink(os.path.join(dir, 'a73', 'wrapper.py'), '~/.config/i3status/wrapper.py')14 sources.append(os.path.join(dir, 'a73/i3-config'))15else:16 sources.append(os.path.join(dir, 'pc/i3-config'))17FileInstaller.merge_files(sources, '~/.config/i3/config', '#')...
Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!