Best Python code snippet using localstack_python
scheduler.py
Source:scheduler.py
...33 return self.period is not None34 @property35 def is_cancelled(self) -> bool:36 return self._cancelled37 def set_next_deadline(self):38 """39 Internal method to update the next deadline of this task based on the period and the current time.40 """41 if not self.deadline:42 raise ValueError("Deadline was not initialized")43 if self.fixed_rate:44 self.deadline = self.deadline + self.period45 else:46 self.deadline = time.time() + self.period47 def cancel(self):48 self._cancelled = True49 def run(self):50 """51 Executes the task function. If the function raises and Exception, ``on_error`` is called (if set).52 """53 try:54 self.task(*self.args, **self.kwargs)55 except Exception as e:56 if self.on_error:57 self.on_error(e)58class Scheduler:59 """60 An event-loop based task scheduler that can manage multiple scheduled tasks with different periods,61 can be parallelized with an executor.62 """63 POISON = (-1, "__POISON__")64 def __init__(self, executor: Optional[Executor] = None) -> None:65 """66 Creates a new Scheduler. If an executor is passed, then that executor will be used to run the scheduled tasks67 asynchronously, otherwise they will be executed synchronously inside the event loop. Running tasks68 asynchronously in an executor means that they will be effectively executed at a fixed rate (scheduling with69 ``fixed_rate = False``, will have no effect).70 :param executor: an optional executor that tasks will be submitted to.71 """72 super().__init__()73 self.executor = executor74 self._queue = queue.PriorityQueue()75 self._condition = threading.Condition()76 def schedule(77 self,78 func: Callable,79 period: Optional[float] = None,80 fixed_rate: bool = True,81 start: Optional[float] = None,82 on_error: Callable[[Exception], None] = None,83 args: Optional[Union[Tuple, List[Any]]] = None,84 kwargs: Optional[Mapping[str, Any]] = None,85 ) -> ScheduledTask:86 """87 Schedules a given task (function call).88 :param func: the task to schedule89 :param period: the period in which to run the task (in seconds). if not set, task will run once90 :param fixed_rate: whether the to run at a fixed rate (neglecting execution duration of the task)91 :param start: start time92 :param on_error: error callback93 :param args: additional positional arguments to pass to the function94 :param kwargs: additional keyword arguments to pass to the function95 :return: a ScheduledTask instance96 """97 st = ScheduledTask(98 func,99 period=period,100 fixed_rate=fixed_rate,101 start=start,102 on_error=on_error,103 args=args,104 kwargs=kwargs,105 )106 self.schedule_task(st)107 return st108 def schedule_task(self, task: ScheduledTask) -> None:109 """110 Schedules the given task and sets the deadline of the task to either ``task.start`` or the current time.111 :param task: the task to schedule112 """113 task.deadline = max(task.start or 0, time.time())114 self.add(task)115 def add(self, task: ScheduledTask) -> None:116 """117 Schedules the given task. Requires that the task has a deadline set. It's better to use ``schedule_task``.118 :param task: the task to schedule.119 """120 if task.deadline is None:121 raise ValueError122 task._cancelled = False123 with self._condition:124 self._queue.put((task.deadline, task))125 self._condition.notify()126 def close(self) -> None:127 """128 Terminates the run loop.129 """130 with self._condition:131 self._queue.put(self.POISON)132 self._condition.notify()133 def run(self):134 q = self._queue135 cond = self._condition136 executor = self.executor137 poison = self.POISON138 task: ScheduledTask139 while True:140 deadline, task = q.get()141 if (deadline, task) == poison:142 break143 if task.is_cancelled:144 continue145 # wait until the task should be executed146 wait = max(0, deadline - time.time())147 if wait > 0:148 with cond:149 interrupted = cond.wait(timeout=wait)150 if interrupted:151 # something with a potentially earlier deadline has arrived while waiting, so we re-queue and152 # continue. this could be optimized by checking the deadline of the added element(s) first,153 # but that would be fairly involved. the assumption is that `schedule` is not invoked frequently154 q.put((task.deadline, task))155 continue156 # run or submit the task157 if not task.is_cancelled:158 if executor:159 executor.submit(task.run)160 else:161 task.run()162 if task.is_periodic:163 try:164 task.set_next_deadline()165 except ValueError:166 # task deadline couldn't be set because it was cancelled167 continue...
m6_demo_plans.py
Source:m6_demo_plans.py
...17from bluesky import preprocessors as bpp18import datetime19import time20import tqdm21def set_next_deadline(deadline, interval):22 while deadline <= time.time(): # until time is in the future23 deadline += interval24 return deadline25def push_images(num_images=4, frame_rate=10, run_time=300, md={}):26 _md = dict(27 purpose="push TIFF files to PVaccess PV",28 num_images=num_images,29 frame_rate=frame_rate,30 run_time=run_time,31 datetime=str(datetime.datetime.now()),32 )33 _md.update(md)34 # Problems priming, cannot force a new cam image. Set Capture directly.35 # if not AD_plugin_primed(adpvadet.tiff1):36 # print("Priming 'adpvadet.tiff1' plugin.")37 # AD_prime_plugin2(adpvadet.tiff1)38 if "capture" in adpvadet.tiff1.stage_sigs:39 adpvadet.tiff1.stage_sigs.pop("capture")40 print(f"adpvadet.tiff1.stage_sigs={adpvadet.tiff1.stage_sigs}")41 adpvadet.cam.stage_sigs["num_images"] = 1_000_000 # num_images42 adpvadet.tiff1.stage_sigs["num_capture"] = 1_000_000 # num_images43 frame_interval = 1.0 / frame_rate44 # setup custom file names in TIFF plugin45 yield from bps.mv(46 adpvadet.tiff1.file_name, iconfig["BDP_DATA_FILE_NAME"],47 adpvadet.tiff1.file_path, iconfig["BDP_DATA_DIR"],48 adpvadet.tiff1.file_template, iconfig["BDP_DATA_FILE_TEMPLATE"],49 )50 @bpp.stage_decorator([adpvadet])51 @bpp.run_decorator(md=_md)52 def inner_plan():53 yield from bps.mv(adpvadet.tiff1.capture, 1)54 yield from bps.mv(adpvadet.cam.acquire, 1)55 t0 = time.time()56 frame_deadline = t057 run_deadline = t0 + max(0, run_time)58 def detector_stopped():59 result = adpvadet.cam.acquire.get() not in (1, "Acquire")60 if result:61 logger.info(62 "Stopping 'acquisition' early:"63 f" {adpvadet.cam.acquire.pvname} stopped."64 )65 return result66 def has_runtime_expired():67 result = time.time() >= run_deadline68 if result:69 logger.info("Run time time complete.")70 return result71 def publish_single_frame(frame):72 yield from bps.null()73 # next call is not a bluesky plan74 img2pva.publish_frame_as_pva(frame) # runs in thread75 progress_bar = tqdm.tqdm(desc=f"run time: {run_time} seconds.")76 while True:77 "Repeat until run time expires."78 if has_runtime_expired() or detector_stopped():79 break80 for frame in gallery.image_file_list(num_images):81 progress_bar.update()82 if has_runtime_expired() or detector_stopped():83 break84 yield from img2pva.wait_server(frame_deadline)85 # yield from bps.mv(img2pva, item)86 yield from publish_single_frame(frame)87 yield from img2pva.wait_server()88 yield from bps.create()89 yield from bps.read(adpvadet.cam.array_counter)90 yield from bps.read(adpvadet.cam.array_rate)91 yield from bps.read(adpvadet.pva1.execution_time)92 yield from bps.read(adpvadet.tiff1.execution_time)93 yield from bps.save()94 frame_deadline = set_next_deadline(frame_deadline, frame_interval)95 yield from img2pva.wait_server()96 progress_bar.close()97 yield from bps.mv(adpvadet.tiff1.capture, 0)98 yield from bps.mv(adpvadet.cam.acquire, 0)...
Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!