Best Python code snippet using yandex-tank
plugin.py
Source:plugin.py
# NOTE: excerpt of plugin.py. The definitions below take `self` / use
# @staticmethod, so they appear to be methods of a class whose header is
# outside this excerpt.
#
# ... the excerpt begins inside a method whose `def` line is not shown:
#         self.lock_target_duration = expand_to_seconds(self.get_option(
#             "target_lock_duration", "30m"))
#         self.send_status_period = expand_to_seconds(
#             self.get_option('send_status_period', '10'))


def check_task_is_open(self):
    """Verify that ``self.task`` is reported with status ``'Open'``.

    Does nothing when ``self.backend_type`` is ``BackendTypes.OVERLOAD``.
    Otherwise the first element returned by
    ``self.lp_job.get_task_data(self.task)`` is inspected; on success the
    task name is stored in ``self.task_name``.

    Raises:
        RuntimeError: the task is not open, the task data carries an
            ``'error'`` entry, the task data has neither the expected keys
            nor an ``'error'`` entry, or the request failed with HTTP
            400, 404 or 500.
        requests.exceptions.HTTPError: re-raised unchanged for any other
            HTTP status code.
    """
    if self.backend_type == BackendTypes.OVERLOAD:
        return
    TASK_TIP = 'The task should be connected to Lunapark. Open startrek task page, click "actions" -> "load testing".'
    logger.debug("Check if task %s is open", self.task)
    try:
        task_data = self.lp_job.get_task_data(self.task)[0]
        try:
            task_status = task_data['status']
            if task_status == 'Open':
                logger.info("Task %s is ok", self.task)
                self.task_name = str(task_data['name'])
            else:
                logger.info("Task %s:", self.task)
                logger.info(task_data)
                raise RuntimeError("Task is not open")
        except KeyError:
            # 'status' (or 'name') is missing: the payload is either an
            # error report or something this code does not recognise.
            try:
                error = task_data['error']
            except KeyError:
                raise RuntimeError(
                    'Unknown task data format:\n{}'.format(task_data))
            raise RuntimeError(
                "Task %s error: %s\n%s" % (self.task, error, TASK_TIP))
    except requests.exceptions.HTTPError as ex:
        logger.error("Failed to check task status for '%s': %s", self.task, ex)
        status_code = ex.response.status_code
        if status_code == 404:
            raise RuntimeError("Task not found: %s\n%s" % (self.task, TASK_TIP))
        if status_code in (400, 500):
            raise RuntimeError(
                "Unable to check task status, id: %s, error code: %s" %
                (self.task, status_code))
        # Any other status: propagate the original error with its traceback.
        raise


@staticmethod
def search_task_from_cwd(cwd):
    """Return the issue key found in ``cwd`` or its closest matching ancestor.

    Walks from ``cwd`` up to the filesystem root and returns the upper-cased
    ``LETTERS-DIGITS`` prefix of the first directory name that starts with
    such a prefix (e.g. ``loadtest-42-nightly`` gives ``LOADTEST-42``).

    Raises:
        RuntimeError: no directory on the way up is named like an issue;
            the message names the directory the search started from.
    """
    issue = re.compile(r"^([A-Za-z]+-[0-9]+)(-.*)?")
    start_dir = cwd  # remembered for the error message; `cwd` is rebound below
    while cwd:
        logger.debug("Checking if dir is named like JIRA issue: %s", cwd)
        found = issue.match(os.path.basename(cwd))
        if found:
            return found.group(1).upper()
        newdir = os.path.abspath(os.path.join(cwd, os.path.pardir))
        if newdir == cwd:
            # The parent of the root is the root itself: nothing left to check.
            break
        cwd = newdir
    raise RuntimeError(
        "task=dir requested, but no JIRA issue name in cwd: %s" % start_dir)


# NOTE: `prepare_test` is cut off at the end of this excerpt. The visible
# statements are reproduced unchanged; block structure was reconstructed from
# a whitespace-collapsed listing and should be checked against the real file.
def prepare_test(self):
    info = self.generator_info
    port = info.port
    instances = info.instances
    if info.ammo_file.startswith(
            "http://") or info.ammo_file.startswith("https://"):
        ammo_path = info.ammo_file
    else:
        ammo_path = os.path.realpath(info.ammo_file)
    duration = int(info.duration)
    if duration:
        self.lock_target_duration = duration
    loop_count = info.loop_count
    lp_job = self.lp_job
    self.locked_targets = self.check_and_lock_targets(strict=bool(
        int(self.get_option('strict_lock', '0'))), ignore=self.ignore_target_lock)
    try:
        if lp_job._number:
            self.make_symlink(lp_job._number)
            self.check_task_is_open()
        else:
            self.check_task_is_open()
            lp_job.create()
            self.make_symlink(lp_job.number)
        # NOTE(review): assumed to run after both branches - confirm nesting.
        self.core.publish(self.SECTION, 'jobno', lp_job.number)
    except (APIClient.JobNotCreated, APIClient.NotAvailable, APIClient.NetworkError) as e:
        # NOTE(review): built-in exceptions have no `.message` on Python 3;
        # confirm the APIClient exception classes define it.
        logger.error(e.message)
        logger.error(
            'Failed to connect to Lunapark, disabling DataUploader')
        # Replace the remaining hooks with no-ops.
        self.start_test = lambda *a, **kw: None
        self.post_process = lambda *a, **kw: None
        self.on_aggregated_data = lambda *a, **kw: None
        self.monitoring_data = lambda *a, **kw: None
        return
    cmdline = ' '.join(sys.argv)
    # lp_job.edit_metainfo(...    <- the excerpt is truncated here
cardcreator.py
Source:cardcreator.py
# ... excerpt of cardcreator.py; it begins inside a function whose `def` line
# is not shown (called as `get_redmine_task_info(...)` further down):
#     """Get the Redmine Task information."""
#     url_get_children = URL_API_REMINE + '&include=children'
#     resp = requests.get(url_get_children.format(task_id, REDMINE_API_KEY))
#     return resp.json()['issue'] if resp.ok else resp.raise_for_status()


def check_task_is_open(task: dict) -> bool:
    """Check if the Redmine task is open.

    A task counts as open when it has no ``closed_on`` key or when that
    key holds a falsy value.
    """
    return not task.get('closed_on')


def parse_task(redmine_task: dict) -> "RedmineTask":
    """Create a new RedmineTask from a json.

    Reads ``id`` and ``subject`` (both required) and ``due_date`` (optional,
    ``None`` when absent) from ``redmine_task``.
    """
    task = RedmineTask()
    task.id = redmine_task['id']
    task.name = redmine_task['subject']
    task.endDate = redmine_task.get('due_date')
    return task


def get_children_tasks(parent_task: dict) -> list:
    """Collect the open leaf tasks below ``parent_task``.

    Every entry of ``parent_task['children']`` is fetched with
    ``get_redmine_task_info``. Closed children are skipped, children without
    a ``children`` key are parsed and returned, and children that have their
    own ``children`` are descended into recursively. A parent without a
    ``children`` key yields an empty list.
    """
    children_list = []
    for child in parent_task.get('children', ()):
        child_task = get_redmine_task_info(child['id'])
        if not check_task_is_open(child_task):
            continue
        if 'children' in child_task:
            # The task has children of its own, so descend into them.
            children_list.extend(get_children_tasks(child_task))
        else:
            # The task has no children, so it is added to the list.
            children_list.append(parse_task(child_task))
    return children_list


# The excerpt is truncated inside `main`; the visible part is kept verbatim:
# def main():
#     """Main method, will be responsible for creating the task cards."""
#     try:
#         task_id = input('Indicate the task you want to migrate: ')
#         main_task = get_redmine_task_info(task_id)
#         if 'children' in main_task:...
Learn to execute automation testing from scratch with the LambdaTest Learning Hub, from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. The LambdaTest Learning Hub compiles step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, and TestNG.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 automation test minutes FREE!!