Best Python code snippet using uiautomator
__init__.py
Source:__init__.py
1# Native imports2import time3import datetime4import json5import os6import shutil7import tempfile8import uuid9import zipfile10import requests11# Module imports12from flask import Blueprint, current_app, jsonify, request, send_file13name = 'DataPipeline'14prefix = 'datapipeline'15storage_enabled = True16global storage_path17plugin = Blueprint(name, __name__)18"""19 - DataPipelineLoadTest:20 type: radon.policies.testing.DataPipelineLoadTest21 properties:22 velocity_per_minute: "60"23 hostname: "radon.s3.eu-central-1.amazonaws.com"24 port: "8080"25 ti_blueprint: "radon.blueprints.testing.NiFiTIDocker"26 resource_location: "/tmp/resources.zip"27 test_duration_sec: "60"28 test_id: "firstdptest"29 targets: [ ConsS3Bucket_1 ]30"""31def getInfo(nifi_url):32 target = nifi_url + "/resources"33 response = requests.get(target)34 return response.json()35def getGroupInfo(nifi_url, id):36 target = nifi_url + "/process-groups/"+id+"/processors"37 response = requests.get(target)38 return response.json()39def configurePipeSchedulePeriod(nifi_url, id, name, seconds_between_gen):40 group_info = getGroupInfo(nifi_url, id)41 processor_info = get_processor_info(group_info, name)42 processor_id = processor_info['component']['id']43 revision = processor_info['revision']44 schedulingPeriod = str(seconds_between_gen) + " sec"45 configuration = {46 "revision": revision,47 "component": {48 "id": str(processor_id),49 "config": {50 "schedulingPeriod": schedulingPeriod51 }52 }53 }54 stop_uri = nifi_url + "/processors/" + processor_id55 response = requests.put(stop_uri, json = configuration)56 print(response.text)57 return58def startPipe(nifi_url, id):59 stop_uri = nifi_url + "/flow/process-groups/" + id60 response = requests.put(stop_uri, json={"id": id, 'state': "RUNNING"})61 print(response.text)62 return63def stopPipe(nifi_url, id):64 stop_uri = nifi_url + "/flow/process-groups/" + id65 response = requests.put(stop_uri, json={"id": id, 'state': "STOPPED" })66 print(response.text)67 return68def findGroupId(info, type, name):69 for item in info['resources']:70 itempath = item['identifier']71 if(itempath.startswith(type) and item['name'] == name):72 id = itempath.split('/')[-1]73 return(id)74def get_processor_info(info, name):75 for item in info['processors']:76 if(item['component']['name'] == name):77 return item78def createController(nifi_url):79 configuration = {80 "revision": {81 "version": 082 },83 "component": {84 "name": "PrometheusReportingTask",85 "type": "org.apache.nifi.reporting.prometheus.PrometheusReportingTask",86 "properties": {"prometheus-reporting-task-metrics-endpoint-port": "9093"}87 }88 }89 stop_uri = nifi_url + "/controller/reporting-tasks"90 response = requests.post(stop_uri, json=configuration)91 return response.json()['id']92def startController(nifi_url, task_id):93 stop_uri = nifi_url + "/reporting-tasks/" + task_id + "/run-status"94 body = { "revision": { "version": 1 },95 'state': "RUNNING"96 }97 response = requests.put(stop_uri, json=body )98 return99def controller_exists(info):100 for item in info['resources']:101 itempath = item['identifier']102 if (itempath.startswith("/reporting-tasks") and item['name'] == "PrometheusReportingTask"):103 return True104 return False105def register(app, plugin_storage_path=None):106 app.register_blueprint(plugin, url_prefix=f'/{prefix}')107 app.logger.info(f'{name} plugin registered.')108 global storage_path109 storage_path= plugin_storage_path110persistence = {111 "configuration": {},112 "execution": {},113}114resources_filename = "resources.zip"115# Folder in local storage where configuration artifacts are stored116# (e.g., <storage_path>/<config_uuid>/<storage_config_folder_name/)117storage_config_folder_name = 'config'118result_zip_file_name = 'results.zip'119# Names of the folders in the results zip file120result_config_folder_name = 'config'121result_execution_folder_name = 'execution'122@plugin.route('/')123def index():124 return f'This is the Radon CTT Agent Data Pipeline Plugin.', 200125############# Data Ppeline Testing Plugin #############126# Create Configuration127@plugin.route('/configuration/', methods=['POST'])128def configuration_create():129 config_instance = {}130 configuration_uuid = str(uuid.uuid4())131 config_instance['uuid'] = configuration_uuid132 config_path_relative = os.path.join(configuration_uuid, storage_config_folder_name)133 config_path = os.path.join(storage_path, config_path_relative)134 os.makedirs(config_path, exist_ok=True)135 current_app.logger.debug(json.dumps(request.form))136 # data = {'host': sut_hostname, 'test_duration_sec': test_duration_sec, 'velocity_per_minute': velocity_per_minute}137 # files = {'resources': open(resources, 'rb')}138 # Host (form)139 if 'host' in request.form:140 host = request.form.get('host', type=str)141 current_app.logger.info(f'\'host\' set to: {host}')142 config_instance['host'] = host143 # test_duration_sec (form)144 if 'test_duration_sec' in request.form:145 test_duration_sec = request.form.get('test_duration_sec', type=int)146 current_app.logger.info(f'\'test_duration_sec\' set to: {test_duration_sec}')147 config_instance['test_duration_sec'] = test_duration_sec148 # velocity_per_minute (form)149 if 'velocity_per_minute' in request.form:150 velocity_per_minute = request.form.get('velocity_per_minute', type=int)151 current_app.logger.info(f'\'velocity_per_minute\' set to: {velocity_per_minute}')152 config_instance['velocity_per_minute'] = velocity_per_minute153 #performance_metric (form)154 if 'performance_metric' in request.form:155 performance_metric = request.form.get('performance_metric', type=str)156 current_app.logger.info(f'\'performance_metric\' set to: {host}')157 config_instance['performance_metric'] = performance_metric158 # lower_bound (form)159 if 'lower_bound' in request.form:160 lower_bound = request.form.get('lower_bound', type=float)161 current_app.logger.info(f'\'lower_bound\' set to: {host}')162 config_instance['lower_bound'] = lower_bound163 # (form)164 if 'lower_bound' in request.form:165 lower_bound = request.form.get('lower_bound', type=float)166 current_app.logger.info(f'\'lower_bound\' set to: {host}')167 config_instance['lower_bound'] = lower_bound168 # resources (file)169 data_archive_path = None170 if 'resources' in request.files:171 # Get file from request172 resources_file = request.files['resources']173 resources_zip_path = os.path.join(config_path, resources_filename)174 resources_file.save(resources_zip_path)175 # Extract resources176 resources_extract_dir = os.path.join(config_path, 'resources')177 os.makedirs(resources_extract_dir)178 with zipfile.ZipFile(resources_zip_path, 'r') as res_zip:179 res_zip.extractall(resources_extract_dir)180 else:181 return 'No resources archive location provided.', 400182 persistence['configuration'][configuration_uuid] = config_instance183 return_json = {184 'configuration': {185 'uuid': configuration_uuid,186 'entry': config_instance187 }188 }189 return jsonify(return_json), 201190# Get/Delete Configuration191@plugin.route('/configuration/<string:config_uuid>/', methods=['GET', 'DELETE'])192def configuration_get_delete(config_uuid):193 if config_uuid in persistence['configuration']:194 if request.method == 'GET':195 return_json = {196 'configuration': {197 'uuid': config_uuid,198 'entry': persistence['configuration'][config_uuid]199 }200 }201 return jsonify(return_json), 200202 if request.method == 'DELETE':203 del persistence['configuration'][config_uuid]204 shutil.rmtree(os.path.join(storage_path, config_uuid))205 return 'Successfully deleted ' + config_uuid + '.', 200206 else:207 return "No configuration with that ID found", 404208# Run load test (param: configuration uuid)209@plugin.route('/execution/', methods=['POST'])210def execution():211 execution_instance = {}212 if 'config_uuid' in request.form:213 config_uuid = request.form['config_uuid']214 config_entry = persistence['configuration'][config_uuid]215 execution_instance['config'] = config_entry216 # Create UUID for execution217 execution_uuid = str(uuid.uuid4())218 execution_instance['uuid'] = execution_uuid219 # Execution folder will be below configuration folder220 execution_path = os.path.join(storage_path, config_uuid, execution_uuid)221 resources_path = os.path.join(storage_path, config_uuid, 'config/resources/')222 test_execution_cli_command = ["cp", resources_path+"/*", "/tmp/nifi_agent/"]223 if 'host' in config_entry:224 target_host = config_entry['host']225 current_app.logger.info(f'Setting host to {target_host}')226 else:227 return "Configuration does not contain a host value.", 404228 if 'test_duration_sec' in config_entry:229 test_duration_sec = config_entry['test_duration_sec']230 current_app.logger.info(f'Setting test_duration_sec to {test_duration_sec}')231 else:232 return "Configuration does not contain a test_duration_sec value.", 404233 if 'velocity_per_minute' in config_entry:234 velocity_per_minute = config_entry['velocity_per_minute']235 current_app.logger.info(f'Setting velocity_per_minute to {velocity_per_minute}')236 else:237 return "Configuration does not contain a test_duration_sec value.", 404238 if 'host' in config_entry:239 target_host = config_entry['host']240 current_app.logger.info(f'Setting host to {target_host}')241 else:242 return "Configuration does not contain a host value.", 404243 os.mkdir(execution_path)244 execution_instance['cli_call'] = test_execution_cli_command + [">",os.path.join(execution_path,"out.log")]245 current_app.logger.info(f'CLI call: {str(test_execution_cli_command)}')246 #access NiFi through docker host247 nifihost = "172.17.0.1"248 nifiport = "8080"249 nifi_url = "http://"+nifihost+":" + nifiport+"/nifi-api"250 #Data pipeline blocks inside the TI251 processor_group_name = "S3Bucket_dest_PG_LocalConn"252 processor_name = "PutS3Object"253 #Fetch current information from API about running services254 nifi_info = getInfo(nifi_url)255 # Create Prometheus reporting task service if it does not already exist256 # port 9093257 # http://ec2-3-122-159-218.eu-central-1.compute.amazonaws.com:9093/metrics258 if not controller_exists(nifi_info):259 controller_id = createController(nifi_url)260 startController(nifi_url, controller_id)261 #Get processor group id by name262 id = findGroupId(nifi_info, "/process-groups/", processor_group_name)263 #stop the pipeline264 stopPipe(nifi_url, id)265 # Copy files to input of the pipeline266 os.system(' '.join(test_execution_cli_command))267 #configure the pipeline268 seconds_between_gen = round(60 / velocity_per_minute, 3)269 configurePipeSchedulePeriod(nifi_url, id, processor_name, seconds_between_gen)270 execution_start = datetime.datetime.now()271 #start the pipeline272 startPipe(nifi_url, id)273 #wait for the test duration274 time.sleep(test_duration_sec)275 execution_end = datetime.datetime.now()276 execution_instance['execution_start'] = execution_start277 execution_instance['execution_end'] = execution_end278 with tempfile.NamedTemporaryFile() as tf:279 tmp_zip_file = shutil.make_archive(tf.name, 'zip', execution_path)280 shutil.copy2(tmp_zip_file, os.path.join(execution_path, result_zip_file_name))281 persistence['execution'][execution_uuid] = execution_instance282 return jsonify(execution_instance), 201283 else:284 return "No configuration with that ID found.", jsonify(persistence), 404285# Get load test results286@plugin.route('/execution/<string:exec_uuid>/', methods=['GET'])287def execution_results(exec_uuid):288 try:289 config_uuid = persistence.get('execution').get(exec_uuid).get('config').get('uuid')290 except AttributeError:291 return "No execution found with that ID.", 404292 results_zip_path = os.path.join(storage_path, config_uuid, exec_uuid, result_zip_file_name)293 if os.path.isfile(results_zip_path):294 return send_file(results_zip_path)295 else:...
main.py
Source:main.py
1import subprocess, logging, time, smtplib, ssl2from email.mime.text import MIMEText3from email.mime.multipart import MIMEMultipart4from tracemalloc import stop5import environ6env = environ.Env()7environ.Env.read_env()8logging.basicConfig(9 filename='logger.log',10 # filemode='w',11 format='%(asctime)s - %(message)s',12 level=logging.INFO13)14debug = env.int('DEBUG')15#https://realpython.com/python-send-email/16def mail_to():17 recipiants = env.list('RECIPIANTS')18 smtp_server = env('SMTP_SERVER')19 sender_email = env('SENDER_EMAIL')20 port = env('PORT') # For SSL21 password = env('PASSWORD')22 exo_server = env('EXO_SERVER')23 message = MIMEMultipart("alternative")24 message["Subject"] = f'Automatisk omstart av {exo_server}'25 message["From"] = sender_email26 message["To"] = ", ".join(recipiants)27 # Create the plain-text and HTML version of your message28 text = """\29 Omstart har skett av {exo_server}."""30 html = """\31 <html>32 <body>33 <p>Omstart har skett av {exo_server}.</p>34 </body>35 </html>36 """37 # Turn these into plain/html MIMEText objects38 part1 = MIMEText(text, "plain")39 part2 = MIMEText(html, "html")40 # Add HTML/plain-text parts to MIMEMultipart message41 # The email client will try to render the last part first42 message.attach(part1)43 message.attach(part2)44 # Create a secure SSL context45 context = ssl.create_default_context()46 try:47 server = smtplib.SMTP(smtp_server,port)48 server.ehlo() # Can be omitted49 server.starttls(context=context) # Secure the connection50 server.ehlo() # Can be omitted51 server.login(sender_email, password)52 server.sendmail(sender_email, recipiants, message.as_string().format(exo_server=exo_server))53 logging.info('Sent e-mails responsible staff.')54 except Exception as e:55 # Print any error messages to stdout56 logging.info('Failed to send e-mails.')57 print(e)58 finally:59 server.quit() 60def get_task_status(task_name):61 tasks = subprocess.check_output(f'tasklist /v /fi "IMAGENAME eq {task_name}"').decode('cp866', 'ignore').split("\r\n")62 for task in tasks:63 if task_name in task:64 return False if 'Not Responding' in task else True65 66 return 067def exo_handle(start=False):68 start_uri = env.path('EXO_START_URI')69 stop_uri = env.path('EXO_STOP_URI')70 name = 'Eo4Run' if start else 'EXOstop'71 uri = start_uri if start else stop_uri72 try:73 status = subprocess.Popen(uri)74 logging.info(f'Succesfully ran {name}.')75 return status76 except:77 logging.info(f'{name} failed to run.')78 return 079if __name__ == '__main__':80 logging.info('-----------------------------------------------------')81 logging.info('Checking for hung processes..')82 task_name = env.str('EXO_TASK_NAME')83 task = get_task_status(task_name)84 if task == 0:85 logging.info(f'Failed to find any running {task_name}.')86 elif task and debug == 0:87 logging.info(f'Task {task_name} is Running or Unknown, no action required.')88 else:89 logging.info(f'{task_name} is Not Responding, restarting services.')90 91 retry = 192 # Trying to run Exostop 5 times before exiting script 93 while retry <= 5:94 logging.info(f'Running EXOstop, try {retry} of 5.')95 stop_status = exo_handle()96 if stop_status == 0:97 retry += 198 else:99 break100 101 # Trying to run Eo4Run if EXOstop was successful and Exo is not still running.102 if stop_status != 0:103 retry = 1104 while retry <= 5:105 time.sleep(10)106 logging.info(f'Starting service {task_name}, try {retry} of 5.')107 start_status = get_task_status(task_name)108 if start_status == 0:109 task = exo_handle(True)110 if task != 0:111 if debug == 0:112 mail_to()113 break114 else:115 retry += 1116 else:117 retry += 1118 else:...
7dtd.py
Source:7dtd.py
1import boto.ec22import logging3import requests4import click5import yaml6import time7logging.basicConfig(level=logging.INFO)8class Server(object):9 def __init__(self):10 stream = file('config.yaml', 'r')11 self.cfg = yaml.load(stream)12 name_tag = '7dtd'13 filters = {"tag:Name": name_tag}14 if not self.cfg['aws']:15 logging.error("No AWS config found in config.yaml")16 raise SystemExit17 aws_config_options = {}18 aws_config_options.update(**self.cfg['aws'])19 try:20 self.conn = boto.ec2.connect_to_region(21 "us-west-2",22 **aws_config_options23 )24 except boto.provider.ProfileNotFoundError:25 logging.error("AWS profile not found")26 self.help()27 raise SystemExit28 try:29 reservations = self.conn.get_all_reservations(filters=filters)30 self.instance_id = reservations[0].instances[0].id31 except boto.exception.EC2ResponseError as e:32 raise SystemExit33 except Exception:34 logging.fatal(35 "no instance found in AWS EC2 with a name tag: {0}".format(name_tag))36 raise SystemExit37 def start(self):38 try:39 self.conn.start_instances(instance_ids=[self.instance_id])40 except boto.exception.EC2ResponseError as e:41 logging.warn("Server is not ready to be started. waiting 10 seconds and trying again.")42 time.sleep(10)43 self.start()44 logging.info("Starting server")45 def stop(self):46 stop_uri = 'http://{0}:5000/stop'.format(self.cfg['game_server']['host'])47 username = self.cfg['game_server']['stop_username']48 password = self.cfg['game_server']['stop_password']49 if self.is_game_running():50 try:51 r = requests.get(stop_uri, auth=(username, password))52 except requests.ConnectionError as e:53 if 'Connection refused' in e.message[1]:54 logging.error('The rest service is not running on {0}'.format(55 self.cfg['game_server']['host']))56 raise SystemExit57 self.conn.stop_instances(instance_ids=[self.instance_id])58 logging.info("Stopping server")59 def help(self):60 pass61 def is_game_running(self):62 ''' An unfortunate way to check if the game is online63 Surely there is a better way to do this64 '''65 try:66 r = requests.get('http://{0}:26900'.format(self.cfg['game_server']['host']))67 except requests.ConnectionError as e:68 if 'Connection refused' in e.message[1]:69 return False70 return True71 def status(self):72 statuses = self.conn.get_all_instance_status(instance_ids=self.instance_id)73 if len(statuses) == 1:74 status = statuses[0]75 state = status.state_name76 elif len(statuses) == 0:77 logging.info("Server is offline")78 raise SystemExit79 else:80 logging.warn("More than 1 instances found, something is wrong")81 raise SystemExit82 if state == 'running':83 if not self.is_game_running():84 logging.info("Server is running but game service is offline")85 else:86 logging.info(87 "EVERYTHING IS OKAY. Server is running and game service is online")88@click.command()89@click.argument('operation', nargs=1, type=click.Choice(90 ['start', 'stop', 'status']91 ))92def main(operation):93 ''' Start/Stop 7 Days to Die game server94 '''95 server = Server()96 op = getattr(server, operation)97 op()98if __name__ == '__main__':...
Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!