Best Python code snippet using Testify_python
test_network.py
Source:test_network.py
1# Licensed under the Apache License, Version 2.0 (the "License"); you may2# not use this file except in compliance with the License. You may obtain3# a copy of the License at4#5# http://www.apache.org/licenses/LICENSE-2.06#7# Unless required by applicable law or agreed to in writing, software8# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT9# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the10# License for the specific language governing permissions and limitations11# under the License.12import json13import uuid14from openstackclient.tests.functional import base15class NetworkTests(base.TestCase):16 """Functional tests for network"""17 def test_network_create(self):18 """Test create options, delete"""19 # Get project IDs20 cmd_output = json.loads(self.openstack('token issue -f json '))21 auth_project_id = cmd_output['project_id']22 cmd_output = json.loads(self.openstack('project list -f json '))23 admin_project_id = None24 demo_project_id = None25 for p in cmd_output:26 if p['Name'] == 'admin':27 admin_project_id = p['ID']28 if p['Name'] == 'demo':29 demo_project_id = p['ID']30 # Verify assumptions:31 # * admin and demo projects are present32 # * demo and admin are distinct projects33 # * tests run as admin34 self.assertIsNotNone(admin_project_id)35 self.assertIsNotNone(demo_project_id)36 self.assertNotEqual(admin_project_id, demo_project_id)37 self.assertEqual(admin_project_id, auth_project_id)38 # network create with no options39 name1 = uuid.uuid4().hex40 cmd_output = json.loads(self.openstack(41 'network create -f json ' +42 name143 ))44 self.addCleanup(self.openstack, 'network delete ' + name1)45 self.assertIsNotNone(cmd_output["id"])46 # Check the default values47 self.assertEqual(48 admin_project_id,49 cmd_output["project_id"],50 )51 self.assertEqual(52 '',53 cmd_output["description"],54 )55 self.assertEqual(56 'UP',57 cmd_output["admin_state_up"],58 )59 self.assertEqual(60 False,61 cmd_output["shared"],62 
)63 self.assertEqual(64 'Internal',65 cmd_output["router:external"],66 )67 name2 = uuid.uuid4().hex68 cmd_output = json.loads(self.openstack(69 'network create -f json ' +70 '--project demo ' +71 name272 ))73 self.addCleanup(self.openstack, 'network delete ' + name2)74 self.assertIsNotNone(cmd_output["id"])75 self.assertEqual(76 demo_project_id,77 cmd_output["project_id"],78 )79 self.assertEqual(80 '',81 cmd_output["description"],82 )83 def test_network_delete(self):84 """Test create, delete multiple"""85 name1 = uuid.uuid4().hex86 cmd_output = json.loads(self.openstack(87 'network create -f json ' +88 '--description aaaa ' +89 name190 ))91 self.assertIsNotNone(cmd_output["id"])92 self.assertEqual(93 'aaaa',94 cmd_output["description"],95 )96 name2 = uuid.uuid4().hex97 cmd_output = json.loads(self.openstack(98 'network create -f json ' +99 '--description bbbb ' +100 name2101 ))102 self.assertIsNotNone(cmd_output["id"])103 self.assertEqual(104 'bbbb',105 cmd_output["description"],106 )107 del_output = self.openstack('network delete ' + name1 + ' ' + name2)108 self.assertOutput('', del_output)109 def test_network_list(self):110 """Test create defaults, list filters, delete"""111 name1 = uuid.uuid4().hex112 cmd_output = json.loads(self.openstack(113 'network create -f json ' +114 '--description aaaa ' +115 name1116 ))117 self.addCleanup(self.openstack, 'network delete ' + name1)118 self.assertIsNotNone(cmd_output["id"])119 self.assertEqual(120 'aaaa',121 cmd_output["description"],122 )123 # Check the default values124 self.assertEqual(125 'UP',126 cmd_output["admin_state_up"],127 )128 self.assertEqual(129 False,130 cmd_output["shared"],131 )132 self.assertEqual(133 'Internal',134 cmd_output["router:external"],135 )136 # NOTE(dtroyer): is_default is not present in the create output137 # so make sure it stays that way.138 # NOTE(stevemar): is_default *is* present in SDK 0.9.11 and newer,139 # but the value seems to always be None, regardless140 # of the --default or 
--no-default value.141 # self.assertEqual('x', cmd_output)142 if ('is_default' in cmd_output):143 self.assertEqual(144 None,145 cmd_output["is_default"],146 )147 self.assertEqual(148 True,149 cmd_output["port_security_enabled"],150 )151 name2 = uuid.uuid4().hex152 cmd_output = json.loads(self.openstack(153 'network create -f json ' +154 '--description bbbb ' +155 '--disable ' +156 '--share ' +157 name2158 ))159 self.addCleanup(self.openstack, 'network delete ' + name2)160 self.assertIsNotNone(cmd_output["id"])161 self.assertEqual(162 'bbbb',163 cmd_output["description"],164 )165 self.assertEqual(166 'DOWN',167 cmd_output["admin_state_up"],168 )169 self.assertEqual(170 True,171 cmd_output["shared"],172 )173 if ('is_default' in cmd_output):174 self.assertEqual(175 None,176 cmd_output["is_default"],177 )178 self.assertEqual(179 True,180 cmd_output["port_security_enabled"],181 )182 # Test list --long183 cmd_output = json.loads(self.openstack(184 "network list -f json " +185 "--long"186 ))187 col_name = [x["Name"] for x in cmd_output]188 self.assertIn(name1, col_name)189 self.assertIn(name2, col_name)190 # Test list --long --enable191 cmd_output = json.loads(self.openstack(192 "network list -f json " +193 "--enable " +194 "--long"195 ))196 col_name = [x["Name"] for x in cmd_output]197 self.assertIn(name1, col_name)198 self.assertNotIn(name2, col_name)199 # Test list --long --disable200 cmd_output = json.loads(self.openstack(201 "network list -f json " +202 "--disable " +203 "--long"204 ))205 col_name = [x["Name"] for x in cmd_output]206 self.assertNotIn(name1, col_name)207 self.assertIn(name2, col_name)208 # Test list --long --share209 cmd_output = json.loads(self.openstack(210 "network list -f json " +211 "--share " +212 "--long"213 ))214 col_name = [x["Name"] for x in cmd_output]215 self.assertNotIn(name1, col_name)216 self.assertIn(name2, col_name)217 # Test list --long --no-share218 cmd_output = json.loads(self.openstack(219 "network list -f json " +220 "--no-share 
" +221 "--long"222 ))223 col_name = [x["Name"] for x in cmd_output]224 self.assertIn(name1, col_name)225 self.assertNotIn(name2, col_name)226 def test_network_set(self):227 """Tests create options, set, show, delete"""228 name = uuid.uuid4().hex229 cmd_output = json.loads(self.openstack(230 'network create -f json ' +231 '--description aaaa ' +232 '--enable ' +233 '--no-share ' +234 '--internal ' +235 '--no-default ' +236 '--enable-port-security ' +237 name238 ))239 self.addCleanup(self.openstack, 'network delete ' + name)240 self.assertIsNotNone(cmd_output["id"])241 self.assertEqual(242 'aaaa',243 cmd_output["description"],244 )245 self.assertEqual(246 'UP',247 cmd_output["admin_state_up"],248 )249 self.assertEqual(250 False,251 cmd_output["shared"],252 )253 self.assertEqual(254 'Internal',255 cmd_output["router:external"],256 )257 # NOTE(dtroyer): is_default is not present in the create output258 # so make sure it stays that way.259 # NOTE(stevemar): is_default *is* present in SDK 0.9.11 and newer,260 # but the value seems to always be None, regardless261 # of the --default or --no-default value.262 if ('is_default' in cmd_output):263 self.assertEqual(264 None,265 cmd_output["is_default"],266 )267 self.assertEqual(268 True,269 cmd_output["port_security_enabled"],270 )271 raw_output = self.openstack(272 'network set ' +273 '--description cccc ' +274 '--disable ' +275 '--share ' +276 '--external ' +277 '--disable-port-security ' +278 name279 )280 self.assertOutput('', raw_output)281 cmd_output = json.loads(self.openstack(282 'network show -f json ' + name283 ))284 self.assertEqual(285 'cccc',286 cmd_output["description"],287 )288 self.assertEqual(289 'DOWN',290 cmd_output["admin_state_up"],291 )292 self.assertEqual(293 True,294 cmd_output["shared"],295 )296 self.assertEqual(297 'External',298 cmd_output["router:external"],299 )300 # why not 'None' like above??301 self.assertEqual(302 False,303 cmd_output["is_default"],304 )305 self.assertEqual(306 False,307 
cmd_output["port_security_enabled"],308 )309 # NOTE(dtroyer): There is ambiguity around is_default in that310 # it is not in the API docs and apparently can311 # not be set when the network is --external,312 # although the option handling code only looks at313 # the value of is_default when external is True.314 raw_output = self.openstack(315 'network set ' +316 '--default ' +317 name318 )319 self.assertOutput('', raw_output)320 cmd_output = json.loads(self.openstack(321 'network show -f json ' + name322 ))323 self.assertEqual(324 'cccc',325 cmd_output["description"],326 )327 # NOTE(dtroyer): This should be 'True'328 self.assertEqual(329 False,330 cmd_output["port_security_enabled"],...
smbtouch.py
Source:smbtouch.py
12import ops.cmd3import util.mac4import dsz5from scanengine2 import scan6import os.path7import re8from xml.etree.ElementTree import ElementTree9from xml.etree.ElementTree import Element1011def _whats_your_job():12 return 'smbtouch\\|*'1314def _whats_your_name():15 return 'smbtouch'1617def _support_ipv6():18 return False1920class smbtouch(scan, ):2122 def __init__(self, job, timeout=60):23 scan.__init__(self, job)24 if (len(job) > 1):25 self.port = job[0].split('|')[1]26 self.scan_type = _whats_your_name()27 self.timeout = timeout2829 def execute_scan(self, verbose):30 redir_cmd = scan.gettunnel(self, self.target, 'tcp', self.port)31 PATH_TO_SMBTOUCH = scan.find_newest_touch(self, 'Smbtouch', 'exe')32 PATH_TO_SMBXML = scan.find_newest_touch(self, 'Smbtouch', 'xml')33 smbcmd = ops.cmd.getDszCommand('run', dszquiet=(not verbose))34 smb_cmd_list = []35 smb_cmd_list.append(('--InConfig %s' % PATH_TO_SMBXML))36 smb_cmd_list.append(('--TargetIp %s' % '127.0.0.1'))37 smb_cmd_list.append(('--TargetPort %s' % redir_cmd.lplisten))38 smb_cmd_list.append(('--NetworkTimeout %s' % self.timeout))39 if (int(self.port) == 445):40 smb_cmd_list.append(('--Protocol %s' % 'SMB'))41 elif (int(self.port) == 139):42 smb_cmd_list.append(('--Protocol %s' % 'NBT'))43 smb_cmd_list.append(('--Credentials %s' % 'Anonymous'))44 outconfig = os.path.join(ops.LOGDIR, 'Logs', ('%s_%s_%s.xml' % (os.path.basename(PATH_TO_SMBTOUCH), self.target, dsz.Timestamp())))45 smb_cmd_list.append(('--OutConfig %s' % outconfig))46 smb_cmd_string = ((PATH_TO_SMBTOUCH + ' ') + ' '.join(smb_cmd_list))47 smbcmd.command = ('cmd /C %s' % smb_cmd_string)48 smbcmd.arglist.append('-redirect')49 smbcmd.arglist.append(('-directory %s' % os.path.join(ops.DSZDISKSDIR, 'lib', 'x86-Windows')))50 smbcmd.prefixes.append('local')51 smbcmd.prefixes.append('log')52 smbobject = smbcmd.execute()53 ops.networking.redirect.stop_tunnel(dsz_cmd=redir_cmd)54 cmd_output = {}55 cmd_output['error'] = None56 screenlog = 
os.path.join(ops.PROJECTLOGDIR, smbobject.commandmetadata.screenlog)57 f = open(screenlog, 'r')58 screenlog_lines = f.readlines()59 f.close()60 vulnerable = []61 not_vulnerable = []62 not_supported = []63 for line in screenlog_lines:64 re_out = re.search('Error 0x', line)65 if (re_out is not None):66 cmd_output['error'] = line.split('-')[(-1)].strip()67 re_out = re.search('ETERNAL', line)68 if (re_out is not None):69 line_split = line.split('-')70 exploit = line_split[0].strip()71 if (exploit == 'ETERNALBLUE'):72 exploit = 'ETEB'73 elif (exploit == 'ETERNALROMANCE'):74 exploit = 'ETRO'75 elif (exploit == 'ETERNALCHAMPION'):76 exploit = 'ETCH'77 elif (exploit == 'ETERNALSYNERGY'):78 exploit = 'ETSY'79 if ((re.search('FB', line) is not None) or (re.search('DANE', line) is not None)):80 vulnerable.append(exploit)81 elif (re.search('not supported', line) is not None):82 not_supported.append(exploit)83 else:84 not_vulnerable.append(exploit)85 self.vulnerable = ','.join(vulnerable)86 self.not_vulnerable = ','.join(not_vulnerable)87 self.not_supported = ','.join(not_supported)88 if (cmd_output['error'] is None):89 tree = ElementTree()90 tree.parse(outconfig)91 root = tree.getroot()92 outparams = root.find('{urn:trch}outputparameters').getchildren()93 for ele in outparams:94 try:95 cmd_output[ele.get('name')] = ele.find('{urn:trch}value').text96 except:97 continue98 if ('Target' in cmd_output.keys()):99 self.os = cmd_output['Target']100 if ('TargetOsArchitecture' in cmd_output.keys()):101 self.arch = cmd_output['TargetOsArchitecture']102 if ('PipeName' in cmd_output.keys()):103 self.pipe = cmd_output['PipeName']104 if ('ShareName' in cmd_output.keys()):105 self.share = cmd_output['ShareName']106 if ('Credentials' in cmd_output.keys()):107 self.credentials = cmd_output['Credentials']108 self.error = cmd_output['error']109 self.timestamp = dsz.Timestamp()110 if ((cmd_output['error'] is None) or (not (cmd_output['error'] == 'ErrorConnectionTimedOut'))):111 self.success = 
True112113 def return_success_message(self):114 return ('SMBtouch response for %s' % self.target)115116 def verify_escalation(self, escalation_rule):117 smbtouch = self118 try:119 eval_res = eval(escalation_rule)120 if ((eval_res == True) or (eval_res == False)):121 return True122 else:123 return False124 except:125 return False126127 def check_escalation(self, escalation_rule):128 smbtouch = self129 try:130 if eval(escalation_rule):131 return True132 else:133 return False134 except:135 return False136137 def return_data(self):138 return scan.return_data(self)139140 def get_display_headers(self):141 return ['Targeted Address', 'Port', 'OS', 'Arch', 'Creds', 'Pipe', 'Error', 'Vulnerable', 'Not Vulnerable', 'Not Supported', 'Time Stamp']142143 def get_data_fields(self):144 return ['target', 'port', 'os', 'arch', 'credentials', 'pipe', 'error', 'vulnerable', 'not_vulnerable', 'not_supported', 'timestamp']145146 def get_raw_fields(self):147 return (self.get_data_fields() + ['success'])148149 def verify_job(self, job):150 if ((not (len(job) == 2)) or (not (int(job[1]) in [139, 445]))):151 return False152 return True153154 def min_time(self):155 return 30156157 def min_range(self):
...
smart_cli.py
Source:smart_cli.py
1#!/usr/bin/env python2# -*- coding: utf-8 -*-3# Copyright (c) 2016, Intel Corporation. All rights reserved.4# This file is licensed under the GPLv2 license.5# For the full content of this license, see the LICENSE.txt6# file at the top level of this source tree.7import subprocess8import os9from .. import exception10from tools import logging_helper, shell_ops11import manage_config12class REPO_HANDLER(object):13 def __init__(self):14 self.__error = None15 self.__repo_number = 016 self.__result = dict()17 self.__response = dict()18 self.__log_helper = logging_helper.logging_helper.Logger()19 def add(self, repo_urls):20 self.__response['status'] = "failure"21 for url in repo_urls:22 self.__repo_number += 123 repo_name = manage_config.flex_repo_name + str(self.__repo_number)24 self.__log_helper.logger.debug(str(repo_name) + " " + str(url))25 command = "smart channel --add '" + repo_name + "' type=rpm-md baseurl=" + url + " -y"26 result = shell_ops.run_cmd_chk(command)27 if result['returncode']:28 if "error" in result['cmd_output']:29 self.action(repo_name, 'remove', 'WR')30 self.__error = "Failed to add repository: " + result['cmd_output'][result['cmd_output'].index("error:") + 7:].replace("\n", "")31 else:32 self.__error = "Error adding update repository"33 self.__response['message'] = self.__error34 self.__log_helper.logger.error("Failed to add repository. Error output: '%s'" % self.__response['message'])35 return self.__response36 else:37 command = "smart update '" + repo_name + "'"38 result = shell_ops.run_cmd_chk(command) # Attempt to connect to new repo39 if result['returncode']: # If attempt fails determine error and remove repo40 # Sometimes, some special repo URL will not be removed from the cache file when the repo is removed. (repo add and then repo remove)41 # If this happens, then "smart update this repo" will fail. 
So we want to manually delete the cache file.42 if "OSError: [Errno 2] No such file or directory" in result['cmd_output']:43 self.__log_helper.logger.debug("trying to remove cache file and retry...")44 if os.path.isfile(manage_config.smart_cache_file):45 self.__log_helper.logger.debug(manage_config.smart_cache_file + ' exists, so we will try to remove it.')46 try:47 os.remove(manage_config.smart_cache_file)48 self.__log_helper.logger.debug("Try updating again............")49 result = shell_ops.run_cmd_chk(command) # Attempt to connect to new repo50 if not result['returncode']: # If it works51 continue # go to the next loop52 except Exception as e:53 self.__log_helper.logger.error(manage_config.smart_cache_file + ' cannot be removed. ' + str(e))54 if "error" in result['cmd_output']:55 if "Invalid URL" in result['cmd_output']:56 self.__response['message'] = "Failed to add repository: Invalid URL."57 elif "Failed to connect" in result['cmd_output'] or 'URL returned error: 503' in result['cmd_output']:58 self.__response['message'] = "Failed to add repository: Unable to connect to repository."59 elif "Invalid XML" in result['cmd_output']:60 self.__response['message'] = "Failed to add repository: Repository XML file invalid. "61 else:62 self.__response['message'] = result['cmd_output'][result['cmd_output'].index("error:") + 7:].replace("\n", "")63 else:64 self.__response['message'] = 'Error adding repository: ' + str(result['cmd_output'])65 self.action(repo_name, 'remove', 'WR')66 self.__log_helper.logger.error("Failed to add repository. 
Error output: '%s'" % self.__response['message'])67 return self.__response68 self.__response['status'] = "success"69 self.__log_helper.logger.debug(str(self.__response))70 return self.__response71 def action(self, m_repo, action, type):72 cmd_output = dict()73 for repo in m_repo:74 if type == 'not_WR' and manage_config.flex_repo_name not in repo:75 self.__log_helper.logger.debug(str(action) + " " + str(repo))76 cmd_output = shell_ops.run_cmd_chk("smart channel --" + action + " " + repo + " -y")77 if cmd_output['returncode']:78 return cmd_output79 if type == 'WR' and manage_config.flex_repo_name in repo:80 self.__log_helper.logger.debug(str(action) + " " + str(repo))81 cmd_output = shell_ops.run_cmd_chk("smart channel --" + action + " " + repo + " -y")82 if cmd_output['returncode']:83 return cmd_output84 if action == 'remove':85 cmd_output = shell_ops.run_cmd_chk("smart update")86 cmd_output['returncode'] = 0...
Learn to execute automation testing from scratch with the LambdaTest Learning Hub, from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. The LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, and TestNG.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 minutes of automation testing FREE!!