Best Python code snippet using Kiwi_python
api.py
Source:api.py
1from __future__ import annotations2import datetime3import logging4from functools import lru_cache5from pathlib import Path6from typing import List7import packaging.version8from fastapi import APIRouter, Request9from fastapi.responses import HTMLResponse10from ispyb.sqlalchemy import BLSession, Proposal11from pydantic import BaseSettings12import murfey.server.bootstrap13import murfey.server.ispyb14import murfey.server.websocket as ws15from murfey.server import _transport_object, get_hostname, get_microscope16from murfey.server import shutdown as _shutdown17from murfey.server import templates18from murfey.server.config import from_file19from murfey.util.models import (20 ContextInfo,21 DCGroupParameters,22 DCParameters,23 File,24 ProcessFile,25 ProcessingJobParameters,26 RegistrationMessage,27 SuggestedPathParameters,28 TiltSeries,29 Visit,30)31log = logging.getLogger("murfey.server.api")32class Settings(BaseSettings):33 murfey_machine_configuration: str = ""34settings = Settings()35machine_config: dict = {}36if settings.murfey_machine_configuration:37 microscope = get_microscope()38 machine_config = from_file(Path(settings.murfey_machine_configuration), microscope)39router = APIRouter()40# This will be the homepage for a given microscope.41@router.get("/", response_class=HTMLResponse)42async def root(request: Request):43 return templates.TemplateResponse(44 "home.html",45 {46 "request": request,47 "hostname": get_hostname(),48 "microscope": get_microscope(),49 "version": murfey.__version__,50 },51 )52@lru_cache(maxsize=1)53@router.get("/machine/")54def machine_info():55 if settings.murfey_machine_configuration:56 microscope = get_microscope()57 return from_file(settings.murfey_machine_configuration, microscope)58 return {}59@router.get("/microscope/")60def get_mic():61 microscope = get_microscope()62 return {"microscope": microscope}63@router.get("/visits/")64def all_visit_info(request: Request, db=murfey.server.ispyb.DB):65 microscope = get_microscope()66 visits = murfey.server.ispyb.get_all_ongoing_visits(microscope, db)67 if visits:68 return_query = [69 {70 "Start date": visit.start,71 "End date": visit.end,72 "Visit name": visit.name,73 "Time remaining": str(visit.end - datetime.datetime.now()),74 }75 for visit in visits76 ] # "Proposal title": visit.proposal_title77 log.debug(78 f"{len(visits)} visits active for {microscope=}: {', '.join(v.name for v in visits)}"79 )80 return templates.TemplateResponse(81 "activevisits.html",82 {"request": request, "info": return_query, "microscope": microscope},83 )84 else:85 log.debug(f"No visits identified for {microscope=}")86 return templates.TemplateResponse(87 "activevisits.html",88 {"request": request, "info": [], "microscope": microscope},89 )90@router.get("/demo/visits_raw", response_model=List[Visit])91def get_current_visits_demo(db=murfey.server.ispyb.DB):92 microscope = "m12"93 return murfey.server.ispyb.get_all_ongoing_visits(microscope, db)94@router.get("/visits_raw", response_model=List[Visit])95def get_current_visits(db=murfey.server.ispyb.DB):96 microscope = get_microscope()97 return murfey.server.ispyb.get_all_ongoing_visits(microscope, db)98@router.get("/visits/{visit_name}")99def visit_info(request: Request, visit_name: str, db=murfey.server.ispyb.DB):100 microscope = get_microscope()101 query = (102 db.query(BLSession)103 .join(Proposal)104 .filter(105 BLSession.proposalId == Proposal.proposalId,106 BLSession.beamLineName == microscope,107 BLSession.endDate > datetime.datetime.now(),108 BLSession.startDate < datetime.datetime.now(),109 )110 .add_columns(111 BLSession.startDate,112 BLSession.endDate,113 BLSession.beamLineName,114 Proposal.proposalCode,115 Proposal.proposalNumber,116 BLSession.visit_number,117 Proposal.title,118 )119 .all()120 )121 if query:122 return_query = [123 {124 "Start date": id.startDate,125 "End date": id.endDate,126 "Beamline name": id.beamLineName,127 "Visit name": visit_name,128 "Time remaining": str(id.endDate - datetime.datetime.now()),129 }130 for id in query131 if id.proposalCode + str(id.proposalNumber) + "-" + str(id.visit_number)132 == visit_name133 ] # "Proposal title": id.title134 return templates.TemplateResponse(135 "visit.html",136 {"request": request, "visit": return_query},137 )138 else:139 return None140@router.post("/visits/{visit_name}/context")141async def register_context(context_info: ContextInfo):142 log.info(143 f"Context {context_info.experiment_type}:{context_info.acquisition_software} registered"144 )145 await ws.manager.broadcast(f"Context registered: {context_info}")146 await ws.manager.set_state("experiment_type", context_info.experiment_type)147 await ws.manager.set_state(148 "acquisition_software", context_info.acquisition_software149 )150@router.post("/visits/{visit_name}/files")151async def add_file(file: File):152 message = f"File {file} transferred"153 log.info(message)154 await ws.manager.broadcast(f"File {file} transferred")155 return file156@router.post("/feedback")157async def send_murfey_message(msg: RegistrationMessage):158 if _transport_object:159 _transport_object.transport.send(160 "murfey_feedback", {"register": msg.registration}161 )162@router.post("/visits/{visit_name}/tomography_preprocess")163async def request_tomography_preprocessing(visit_name: str, proc_file: ProcessFile):164 visit_idx = Path(proc_file.path).parts.index(visit_name)165 core = Path(*Path(proc_file.path).parts[: visit_idx + 1])166 ppath = Path(proc_file.path)167 sub_dataset = (168 ppath.relative_to(core).parts[0]169 if len(ppath.relative_to(core).parts) > 1170 else ""171 )172 mrc_out = (173 core174 / "processed"175 / sub_dataset176 / "MotionCorr"177 / str(ppath.stem + "_motion_corrected.mrc")178 )179 ctf_out = core / "processed" / sub_dataset / "CTF" / str(ppath.stem + "_ctf.mrc")180 if not mrc_out.parent.exists():181 mrc_out.parent.mkdir(parents=True, mode=1411)182 if not ctf_out.parent.exists():183 ctf_out.parent.mkdir(parents=True, mode=1411)184 zocalo_message = {185 "recipes": ["em-tomo-preprocess"],186 "parameters": {187 "dcid": proc_file.data_collection_id,188 "autoproc_program_id": proc_file.autoproc_program_id,189 "movie": proc_file.path,190 "mrc_out": str(mrc_out),191 "pix_size": (proc_file.pixel_size) * 10**10,192 "output_image": str(ctf_out),193 "image_number": proc_file.image_number,194 "microscope": get_microscope(),195 "mc_uuid": proc_file.mc_uuid,196 },197 }198 log.info(f"Sending Zocalo message {zocalo_message}")199 if _transport_object:200 _transport_object.transport.send("processing_recipe", zocalo_message)201 else:202 log.error(203 f"Pe-processing was requested for {ppath.name} but no Zocalo transport object was found"204 )205 return proc_file206 await ws.manager.broadcast(f"Pre-processing requested for {ppath.name}")207 return proc_file208@router.post("/visits/{visit_name}/align")209async def request_tilt_series_alignment(tilt_series: TiltSeries):210 stack_file = (211 Path(tilt_series.motion_corrected_path).parents[1]212 / "align_output"213 / "aligned_file.mrc"214 )215 if not stack_file.parent.exists():216 stack_file.parent.mkdir(parents=True, mode=1411)217 zocalo_message = {218 "recipes": ["em-tomo-align"],219 "parameters": {220 # "ispyb_process": tilt_series.processing_job, #221 "input_file_list": tilt_series.file_tilt_list,222 "dcid": tilt_series.dcid,223 "appid": tilt_series.autoproc_program_id,224 "stack_file": str(stack_file),225 "movie_id": tilt_series.movie_id,226 },227 }228 log.info(f"Sending Zocalo message {zocalo_message}")229 if _transport_object:230 _transport_object.transport.send("processing_recipe", zocalo_message)231 else:232 log.error(233 f"Processing was requested for tilt series {tilt_series.name} but no Zocalo transport object was found"234 )235 return tilt_series236 await ws.manager.broadcast(237 f"Processing requested for tilt series {tilt_series.name}"238 )239 return tilt_series240@router.get("/version")241def get_version(client_version: str = ""):242 result = {243 "server": murfey.__version__,244 "oldest-supported-client": murfey.__supported_client_version__,245 }246 if client_version:247 client = packaging.version.parse(client_version)248 server = packaging.version.parse(murfey.__version__)249 minimum_version = packaging.version.parse(murfey.__supported_client_version__)250 result["client-needs-update"] = minimum_version > client251 result["client-needs-downgrade"] = client > server252 return result253@router.get("/shutdown", include_in_schema=False)254def shutdown():255 """A method to stop the server. This should be removed before Murfey is256 deployed in production. To remove it we need to figure out how to control257 to process (eg. systemd) and who to run it as."""258 log.info("Server shutdown request received")259 _shutdown()260 return {"success": True}261@router.post("/visits/{visit_name}/suggested_path")262def suggest_path(visit_name, params: SuggestedPathParameters):263 count: int | None = None264 check_path = (265 machine_config["rsync_basepath"] / params.base_path266 if machine_config267 else Path(f"/dls/{get_microscope()}") / params.base_path268 )269 check_path_name = check_path.name270 while check_path.exists():271 count = count + 1 if count else 2272 check_path = check_path.parent / f"{check_path_name}{count}"273 return {"suggested_path": check_path}274@router.post("/visits/{visit_name}/register_data_collection_group")275def register_dc_group(visit_name, dcg_params: DCGroupParameters):276 ispyb_proposal_code = visit_name[:2]277 ispyb_proposal_number = visit_name.split("-")[0][2:]278 ispyb_visit_number = visit_name.split("-")[-1]279 log.info(f"Registering data collection group on microscope {get_microscope()}")280 dcg_parameters = {281 "session_id": murfey.server.ispyb.get_session_id(282 microscope="m12", # get_microscope(),283 proposal_code=ispyb_proposal_code,284 proposal_number=ispyb_proposal_number,285 visit_number=ispyb_visit_number,286 db=murfey.server.ispyb.Session(),287 ),288 "start_time": str(datetime.datetime.now()),289 "experiment_type": dcg_params.experiment_type,290 }291 if _transport_object:292 _transport_object.transport.send(293 "murfey_feedback", {"register": "data_collection_group", **dcg_parameters} # type: ignore294 )295 return dcg_parameters296@router.post("/visits/{visit_name}/start_data_collection")297def start_dc(visit_name, dc_params: DCParameters):298 ispyb_proposal_code = visit_name[:2]299 ispyb_proposal_number = visit_name.split("-")[0][2:]300 ispyb_visit_number = visit_name.split("-")[-1]301 log.info(f"Starting data collection on microscope {get_microscope()}")302 dc_parameters = {303 "visit": visit_name,304 "session_id": murfey.server.ispyb.get_session_id(305 microscope="m12", # get_microscope(),306 proposal_code=ispyb_proposal_code,307 proposal_number=ispyb_proposal_number,308 visit_number=ispyb_visit_number,309 db=murfey.server.ispyb.Session(),310 ),311 "image_directory": dc_params.image_directory,312 "start_time": str(datetime.datetime.now()),313 "voltage": dc_params.voltage,314 "pixel_size": dc_params.pixel_size_on_image,315 "image_suffix": dc_params.file_extension,316 "experiment_type": dc_params.experiment_type,317 "n_images": dc_params.tilt,318 "image_size_x": dc_params.image_size_x,319 "image_size_y": dc_params.image_size_y,320 "acquisition_software": dc_params.acquisition_software,321 "tag": dc_params.tag,322 }323 if _transport_object:324 log.debug(f"Send registration message to murfey_feedback: {dc_parameters}")325 _transport_object.transport.send(326 "murfey_feedback", {"register": "data_collection", **dc_parameters}327 )328 return dc_params329@router.post("/visits/{visit_name}/register_processing_job")330def register_proc(visit_name, proc_params: ProcessingJobParameters):331 proc_parameters = {332 "recipe": proc_params.recipe,333 "tag": proc_params.tag,334 }335 if _transport_object:336 log.info(337 f"Send processing registration message to murfey_feedback: {proc_parameters}"338 )339 _transport_object.transport.send(340 "murfey_feedback", {"register": "processing_job", **proc_parameters}341 )...
demo_api.py
Source:demo_api.py
1from __future__ import annotations2import datetime3import logging4from functools import lru_cache5from pathlib import Path6from typing import List7import packaging.version8from fastapi import APIRouter, Request9from fastapi.responses import HTMLResponse10from ispyb.sqlalchemy import BLSession11from pydantic import BaseSettings12import murfey.server.bootstrap13import murfey.server.websocket as ws14from murfey.server import feedback_callback_async, get_hostname, get_microscope15from murfey.server import shutdown as _shutdown16from murfey.server import templates17from murfey.server.config import from_file18from murfey.util.models import (19 ContextInfo,20 DCGroupParameters,21 DCParameters,22 File,23 ProcessFile,24 ProcessingJobParameters,25 RegistrationMessage,26 SuggestedPathParameters,27 TiltSeries,28 Visit,29)30from murfey.util.state import global_state31log = logging.getLogger("murfey.server.demo_api")32tags_metadata = [murfey.server.bootstrap.tag]33router = APIRouter()34class Settings(BaseSettings):35 murfey_machine_configuration: str = ""36settings = Settings()37machine_config: dict = {}38if settings.murfey_machine_configuration:39 microscope = get_microscope()40 machine_config = dict(41 from_file(Path(settings.murfey_machine_configuration), microscope)42 )43# This will be the homepage for a given microscope.44@router.get("/", response_class=HTMLResponse)45async def root(request: Request):46 return templates.TemplateResponse(47 "home.html",48 {49 "request": request,50 "hostname": get_hostname(),51 "microscope": get_microscope(),52 "version": murfey.__version__,53 },54 )55@lru_cache(maxsize=1)56@router.get("/machine/")57def machine_info():58 if settings.murfey_machine_configuration:59 microscope = get_microscope()60 return from_file(settings.murfey_machine_configuration, microscope)61 return {}62@router.get("/microscope/")63def get_mic():64 microscope = get_microscope()65 return {"microscope": microscope}66@router.get("/visits/")67def all_visit_info(request: Request):68 microscope = get_microscope()69 return_query = [70 {71 "Start date": datetime.datetime.now(),72 "End date": datetime.datetime.now(),73 "Visit name": "dummy",74 "Time remaining": 0,75 }76 ] # "Proposal title": visit.proposal_title77 return templates.TemplateResponse(78 "activevisits.html",79 {"request": request, "info": return_query, "microscope": microscope},80 )81@router.get("/visits_raw", response_model=List[Visit])82def get_current_visits():83 return [84 Visit(85 start=datetime.datetime.now(),86 end=datetime.datetime.now() + datetime.timedelta(days=1),87 session_id=1,88 name="cm31111-2",89 beamline="m12",90 proposal_title="Nothing of importance",91 )92 ]93@router.get("/visits/{visit_name}")94def visit_info(request: Request, visit_name: str):95 microscope = get_microscope()96 query = [97 BLSession(98 proposalId=1,99 beamLineName=microscope,100 endDate=datetime.datetime.now() + datetime.timedelta(days=1),101 startDate=datetime.datetime.now(),102 visitNumber=1,103 )104 ]105 return_query = [106 {107 "Start date": id.startDate,108 "End date": id.endDate,109 "Beamline name": id.beamLineName,110 "Visit name": visit_name,111 "Time remaining": str(id.endDate - datetime.datetime.now()),112 }113 for id in query114 if id.proposalCode + str(id.proposalNumber) + "-" + str(id.visit_number)115 == visit_name116 ] # "Proposal title": id.title117 return templates.TemplateResponse(118 "visit.html",119 {"request": request, "visit": return_query},120 )121@router.post("/visits/{visit_name}/context")122async def register_context(context_info: ContextInfo):123 log.info(124 f"Context {context_info.experiment_type}:{context_info.acquisition_software} registered"125 )126 await ws.manager.broadcast(f"Context registered: {context_info}")127 await ws.manager.set_state("experiment_type", context_info.experiment_type)128 await ws.manager.set_state(129 "acquisition_software", context_info.acquisition_software130 )131@router.post("/visits/{visit_name}/files")132async def add_file(file: File):133 message = f"File {file} transferred"134 log.info(message)135 await ws.manager.broadcast(f"File {file} transferred")136 return file137@router.post("/feedback")138async def send_murfey_message(msg: RegistrationMessage):139 pass140@router.post("/visits/{visit_name}/tomography_preprocess")141async def request_tomography_preprocessing(visit_name: str, proc_file: ProcessFile):142 if not Path(proc_file.path).exists():143 log.warning(f"{proc_file.path} has not been transferred before preprocessing")144 visit_idx = Path(proc_file.path).parts.index(visit_name)145 core = Path(*Path(proc_file.path).parts[: visit_idx + 1])146 ppath = Path(proc_file.path)147 sub_dataset = (148 ppath.relative_to(core).parts[0]149 if len(ppath.relative_to(core).parts) > 1150 else ""151 )152 mrc_out = (153 core154 / "processed"155 / sub_dataset156 / "MotionCorr"157 / str(ppath.stem + "_motion_corrected.mrc")158 )159 if not mrc_out.parent.exists():160 mrc_out.parent.mkdir(parents=True)161 await feedback_callback_async(162 {},163 {164 "register": "motion_corrected",165 "movie": str(proc_file.path),166 "mrc_out": str(mrc_out),167 "movie_id": proc_file.mc_uuid,168 },169 )170 await ws.manager.broadcast(f"Pre-processing requested for {ppath.name}")171 mrc_out.touch()172 return proc_file173@router.post("/visits/{visit_name}/align")174async def request_tilt_series_alignment(tilt_series: TiltSeries):175 stack_file = (176 Path(tilt_series.motion_corrected_path).parents[1]177 / "align_output"178 / f"aligned_file_{tilt_series.name}.mrc"179 )180 if not stack_file.parent.exists():181 stack_file.parent.mkdir(parents=True)182 await ws.manager.broadcast(183 f"Processing requested for tilt series {tilt_series.name}"184 )185 stack_file.touch()186 return tilt_series187@router.get("/version")188def get_version(client_version: str = ""):189 result = {190 "server": murfey.__version__,191 "oldest-supported-client": murfey.__supported_client_version__,192 }193 if client_version:194 client = packaging.version.parse(client_version)195 server = packaging.version.parse(murfey.__version__)196 minimum_version = packaging.version.parse(murfey.__supported_client_version__)197 result["client-needs-update"] = minimum_version > client198 result["client-needs-downgrade"] = client > server199 return result200@router.get("/shutdown", include_in_schema=False)201def shutdown():202 """A method to stop the server. This should be removed before Murfey is203 deployed in production. To remove it we need to figure out how to control204 to process (eg. systemd) and who to run it as."""205 log.info("Server shutdown request received")206 _shutdown()207 return {"success": True}208@router.post("/visits/{visit_name}/suggested_path")209def suggest_path(visit_name, params: SuggestedPathParameters):210 count: int | None = None211 check_path = (212 machine_config["rsync_basepath"] / params.base_path213 if machine_config214 else Path(f"/dls/{get_microscope()}") / params.base_path215 )216 check_path_name = check_path.name217 while check_path.exists():218 count = count + 1 if count else 2219 check_path = check_path.parent / f"{check_path_name}{count}"220 return {"suggested_path": check_path.relative_to(machine_config["rsync_basepath"])}221@router.post("/visits/{visit_name}/register_data_collection_group")222def register_dc_group(visit_name, dcg_params: DCGroupParameters):223 log.info(f"Registering data collection group on microscope {get_microscope()}")224 global_state["data_collection_group_id"] = 1225 return dcg_params226@router.post("/visits/{visit_name}/start_data_collection")227def start_dc(visit_name, dc_params: DCParameters):228 log.info(f"Starting data collection on microscope {get_microscope()}")229 if global_state.get("data_collection_ids") and isinstance(230 global_state["data_collection_ids"], dict231 ):232 global_state["data_collection_ids"] = {233 **global_state["data_collection_ids"],234 dc_params.tag: 1,235 }236 else:237 global_state["data_collection_ids"] = {dc_params.tag: 1}238 return dc_params239@router.post("/visits/{visit_name}/register_processing_job")240def register_proc(visit_name, proc_params: ProcessingJobParameters):241 if global_state.get("processing_job_ids"):242 assert isinstance(global_state["processing_job_ids"], dict)243 global_state["processing_job_ids"] = {244 **{245 k: v246 for k, v in global_state["processing_job_ids"].items()247 if k != proc_params.tag248 },249 proc_params.tag: {250 **global_state["processing_job_ids"].get(proc_params.tag, {}),251 proc_params.recipe: 1,252 },253 }254 else:255 global_state["processing_job_ids"] = {proc_params.tag: {proc_params.recipe: 1}}256 if global_state.get("autoproc_program_ids"):257 assert isinstance(global_state["autoproc_program_ids"], dict)258 global_state["autoproc_program_ids"] = {259 **global_state["autoproc_program_ids"],260 proc_params.tag: {261 **global_state["autoproc_program_ids"].get(proc_params.tag, {}),262 proc_params.recipe: 1,263 },264 }265 else:266 global_state["autoproc_program_ids"] = {267 proc_params.tag: {proc_params.recipe: 1}268 }...
visitor.py
Source:visitor.py
1import typing as tp2class Visited(object):3 #Defaults to the class name4 def kind(self) -> str:5 return [cls.__name__ for cls in type(self).mro()][:-1]6 def children(self):7 raise NotImplemented()8class AbstractDag:9 def __init__(self, *roots):10 self._roots = roots11 def roots(self):12 return self._roots13class VisitorMeta(type):14 def __new__(self, name, bases, dct):15 if bases and ("visit" in dct):16 raise ValueError("Cannot override visit")17 return type.__new__(self, name, bases, dct)18class Visitor(metaclass=VisitorMeta):19 def run(self, dag: AbstractDag):20 assert isinstance(dag, AbstractDag), f"{dag}"21 self._dag_cache = set()22 for root in dag.roots():23 assert root is not None24 self.visit(root)25 return self26 def visit(self, node: Visited):27 assert node is not None28 assert isinstance(node, Visited), f"{node}"29 if node in self._dag_cache:30 return31 visited = False32 for kind_str in node.kind():33 visit_name = f"visit_{kind_str}"34 if hasattr(self, visit_name):35 getattr(self, visit_name)(node)36 visited = True37 break38 if not visited:39 self.generic_visit(node)40 self._dag_cache.add(node)41 def generic_visit(self, node):42 assert node is not None43 #Do nothing for current node44 for child in node.children():45 self.visit(child)46#Semantics are if you return None, then do not change anything47#If you return something then replace current node with that thing48#TODO Does replacing even work if you are replacing a Node that has multiple parents??49class Transformer(metaclass=VisitorMeta):50 def run(self, dag: AbstractDag):51 assert isinstance(dag, AbstractDag)52 self._dag_cache = {}53 for output in dag.roots():54 self.generic_visit(output)55 def visit(self, node):56 if node in self._dag_cache:57 return self._dag_cache[node]58 visited = False59 for kind_str in node.kind():60 visit_name = f"visit_{kind_str}"61 if hasattr(self, visit_name):62 ret = getattr(self, visit_name)(node)63 visited = True64 break65 if not visited:66 ret = self.generic_visit(node)67 if ret is None:68 ret = node69 if not isinstance(ret, Visited):70 raise ValueError(f"Transformed object {ret} is not a Visited object")71 self._dag_cache[node] = ret72 #TODO inputs and outputs should be replaced appropriately ??73 return ret74 def generic_visit(self, node):75 #Modify the current node with the new children76 new_children = []77 for child in node.children():78 new_child = self.visit(child)79 assert new_child is not None80 new_children.append(new_child)81 node.set_children(*new_children)...
Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!