Best Python code snippet using autotest_python
action_executor.py
Source:action_executor.py
1import os2import platform3import typing4import shutil5from photomanager.db.models import ImageMeta6from sqlalchemy import and_7from sqlalchemy.orm import Session8from photomanager.lib.errors import MultiFileError9from photomanager.lib.pmconst import PATH_SEP10from photomanager.db.config import Config, FieldBackupDir11from photomanager.utils.pathutils import is_abspath12class ActionExecutor(object):13 def __init__(self, base_folder: str, db_session: Session, action_item: typing.Dict[str, typing.List]):14 self.base_folder = base_folder15 self.db_session = db_session16 self.action_item = action_item17 self.config = Config(db_session)18 def do(self):19 raise NotImplementedError20class ActionRemoveFile(ActionExecutor):21 def do(self):22 if self.action_item["action"] != "remove_file":23 raise ValueError24 for relative_file in self.action_item["files"]:25 self.do_remove_file(relative_file)26 def do_remove_file(self, relative_filename):27 self._do_backup_file(relative_filename)28 self._do_remove_file_from_disk(relative_filename)29 self._do_remove_file_from_db(relative_filename)30 def _do_backup_file(self, relative_filename):31 full_name = self._get_fullname(relative_filename)32 if not os.path.exists(full_name):33 return34 backup_dir = self.config.get_value(FieldBackupDir)35 if not is_abspath(backup_dir):36 backup_dir = self.base_folder + PATH_SEP + backup_dir37 # remove driver letter38 if platform.system() == "Windows" and not self.base_folder[1] == ":":39 relative_filename = relative_filename[2:]40 # remove first path separator41 if relative_filename[0] == PATH_SEP:42 relative_filename = relative_filename[1:]43 backup_filename = f"{backup_dir}{PATH_SEP}{relative_filename}"44 backup_file_path, _ = os.path.split(backup_filename)45 os.makedirs(backup_file_path, exist_ok=True)46 if os.path.exists(backup_filename):47 ori_backup_name = backup_filename48 for i in range(10000):49 if not os.path.exists(f"{ori_backup_name}.{i}"):50 backup_filename = f"{ori_backup_name}.{i}"51 break52 shutil.copyfile(full_name, backup_filename)53 def _get_fullname(self, relative_filename: str):54 full_base_folder = self.base_folder55 if platform.system() == "Linux" and not self.base_folder.startswith(PATH_SEP): # relative path:56 full_base_folder = os.getcwd() + PATH_SEP + self.base_folder57 elif platform.system() == "Windows" and not self.base_folder[1] == ":":58 full_base_folder = os.getcwd() + PATH_SEP + self.base_folder59 return f"{full_base_folder}/{relative_filename}"60 def _do_remove_file_from_disk(self, relative_filename: str):61 fullname = self._get_fullname(relative_filename)62 if os.path.exists(fullname):63 os.remove(fullname)64 def _do_remove_file_from_db(self, relative_filename):65 relative_dir = os.path.dirname(relative_filename)66 basename = os.path.basename(relative_filename)67 image_metas = self.db_session.query(ImageMeta).filter(68 and_(ImageMeta.folder == relative_dir, ImageMeta.filename == basename)).all()69 if len(image_metas) > 1:70 raise MultiFileError()71 image_meta = image_metas[0]72 self.db_session.delete(image_meta)73class ActionExecutorList(object):74 def __init__(self, base_folder: str, db_session, actions: list):75 """76 :param base_folder:77 :param db_session: a sqlalchemy session78 :param action_contents: a action list79 example:80 [{"action": "remove_file", "files": ["file2:, "files4"]}81 allow action:82 {"action": "remove_file", "files": ["file2:, "files4"]}83 {"action": "move_file", "from": "file1", "to": "file2"}84 """85 self.base_folder = base_folder86 self.db_session = db_session87 self.actions = actions88 def do(self):89 for action_item in self.actions:90 executor = self._create_executor(action_item)91 executor.do()92 def _create_executor(self, action_item) -> ActionExecutor:93 if action_item["action"] == "remove_file":94 return ActionRemoveFile(self.base_folder, self.db_session, action_item)95 else:...
api.py
Source:api.py
1from flask import request, Blueprint, jsonify, redirect, url_for2from flask_login import login_required, current_user3from cromwell_frontend.db_models import WorkflowDefinition, db4from cromwell_frontend.exceptions import NotFoundException, AuthException5from cromwell_frontend.login_manager import authenticate_user, get_jwt6from cromwell_frontend.util import handle_exception7from cromwell_frontend.workflow import Workflow8api = Blueprint('api', __name__, url_prefix='/api')9@api.route('/unauthorized')10def unauthorized():11 return jsonify({'message': 'Not authenticated.'}), 40112@api.route('/authenticate', methods=['POST'])13def jwt_authenticate():14 try:15 credentials = request.get_json(force=True)16 authenticate_user(request)17 token = get_jwt(credentials['username'], credentials['password'])18 return jsonify({'token': str(token)}), 20019 except Exception as e:20 return jsonify({'message': f'authentication failed: {e}'}), 40321@api.route('/workflow_definitions', methods=['GET', 'POST'])22@login_required23def list_workflow_definitions():24 try:25 if request.method == 'POST':26 data = request.get_json(force=True)27 workflow_definition = WorkflowDefinition(relative_filename=data['relative_filename'],28 name=data['name'],29 description=data['description'],30 owner=current_user)31 db.session.add(workflow_definition)32 db.session.commit()33 return redirect(url_for('api.get_workflow_definition',34 relative_filename=workflow_definition.relative_filename))35 else:36 return jsonify([definition.to_dict for definition in WorkflowDefinition.query.all()])37 except Exception as e:38 return handle_exception(e)39@api.route('/workflow_definitions/<relative_filename>', methods=['GET', 'POST', 'DELETE'])40@login_required41def get_workflow_definition(relative_filename):42 try:43 workflow_definition = WorkflowDefinition.query.filter_by(relative_filename=relative_filename).first()44 if workflow_definition is None:45 raise NotFoundException(f'No workflow with relative filename {relative_filename} exists!')46 if request.method == 'POST':47 data = request.get_json(force=True)48 workflow_definition.update(data, current_user)49 db.session.commit()50 if request.method == 'DELETE':51 if current_user is workflow_definition.owner or current_user.admin:52 db.session.delete(workflow_definition)53 db.session.commit()54 return jsonify({'message': f'Workflow definition {workflow_definition.relative_filename} deleted'})55 else:56 raise AuthException(f'User {current_user.username} cannot delete {workflow_definition.relative_filename}')57 return jsonify(workflow_definition.to_dict())58 except Exception as e:59 return handle_exception(e)60@api.route('/workflow_chart/<workflow_id>', methods=['GET'])61@login_required62def get_workflow_chart_data(workflow_id):63 try:64 workflow = Workflow(workflow_id)65 return jsonify(workflow.get_chart_metadata())66 except Exception as e:67 return handle_exception(e)68@api.route('/workflows/<workflow_id>/release_hold', methods=['POST'])69@login_required70def release_workflow_hold(workflow_id):71 try:72 workflow = Workflow(workflow_id)73 if current_user == workflow.owner or current_user.admin:74 return jsonify(workflow.resume())75 else:76 raise AuthException(f'User {current_user.id} cannot release hold on {workflow_id}')77 except Exception as e:78 return handle_exception(e)79@api.route('/workflows/<workflow_id>/cancel', methods=['POST'])80@login_required81def cancel_workflow(workflow_id):82 try:83 workflow = Workflow(workflow_id)84 if current_user == workflow.owner or current_user.admin:85 return jsonify(workflow.cancel())86 else:87 raise AuthException(f'User {current_user.id} cannot release hold on {workflow_id}')88 except Exception as e:...
workflow_definitions.py
Source:workflow_definitions.py
1import os2from flask import Blueprint, render_template, request, url_for, redirect3from flask_login import login_required4from cromwell_frontend import config5from cromwell_frontend.db_models import WorkflowDefinition, db6from cromwell_frontend.page_data import WorkflowDefinitionEntry, WorkflowDefinitionList7from cromwell_frontend.util import handle_exception_browser8from cromwell_frontend.exceptions import NotFoundException9workflow_definitions = Blueprint('workflow_definitions', __name__, url_prefix='/workflow_definitions')10@workflow_definitions.route('/')11@login_required12def list_workflow_definitions():13 return render_template('workflow_definition_list.html', page_data=WorkflowDefinitionList('Workflow Definitions'))14@workflow_definitions.route('/create', methods=['GET', 'POST'])15@login_required16def create_workflow_definition():17 try:18 if request.method == 'POST':19 workflow_definition = WorkflowDefinition(20 name=request.form.get('name'),21 description=request.form.get('description'),22 relative_filename=request.form.get('relative_path'),23 file_contents=request.form.get('definition'))24 db.session.add(workflow_definition)25 db.session.commit()26 return redirect(url_for('workflow_definitions.view_workflow_definition',27 relative_path=workflow_definition.relative_filename))28 else:29 count = WorkflowDefinition.query.count()30 filename = f'workflow{count}.wdl'31 while config.WORKFLOW_DEFINITION_DIR.joinpath(filename).exists():32 count += 133 filename = f'workflow{count}.wdl'34 workflow_definition = WorkflowDefinition(relative_filename=str(filename), name=str(filename))35 return render_template('workflow_definition_entry.html', page_data=WorkflowDefinitionEntry('Create workflow definition', workflow_definition))36 except Exception as e:37 return handle_exception_browser(e)38@workflow_definitions.route('/<relative_path>', methods=['GET', 'POST', 'DELETE'])39@login_required40def view_workflow_definition(relative_path):41 try:42 workflow_definition = WorkflowDefinition.query.filter_by(relative_filename=relative_path).first()43 if workflow_definition is None:44 raise NotFoundException(f'No workflow with path {relative_path} exists!')45 if request.method == 'POST':46 name = request.form.get('name')47 description = request.form.get('description')48 relative_filename = request.form.get('relative_path')49 file_contents = request.form.get('definition')50 if workflow_definition is None:51 workflow_definition = WorkflowDefinition(52 relative_filename=relative_filename,53 name=name,54 description=description55 )56 workflow_definition.file_contents = file_contents57 db.session.add(workflow_definition)58 else:59 workflow_definition.name = name60 workflow_definition.description = description61 workflow_definition.relative_filename = relative_filename62 workflow_definition.file_contents = file_contents63 db.session.commit()64 elif request.method == 'DELETE':65 db.session.delete(workflow_definition)66 db.session.commit()67 else:68 if workflow_definition is None:69 count = WorkflowDefinition.query.count()70 filename = f'workflow{count}.wdl'71 while os.path.exists(os.path.join(config.WORKFLOW_DEFINITION_DIR, filename)):72 count += 173 filename = f'workflow{count}.wdl'74 workflow_definition = WorkflowDefinition(relative_filename=filename)75 return render_template('workflow_definition_entry.html',76 page_data=WorkflowDefinitionEntry(workflow_definition.name, workflow_definition))77 except Exception as e:...
Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!