Best Python code snippet using avocado_python
_classes.py
Source:_classes.py
1import asyncio2from abc import abstractmethod3from datetime import datetime4from app.base.status_updater.models import StatusApplication5from app.db import DBS6from app.db.queries.mysql import QUERY_UPDATE_CARGO_STATUS_BY_IDMONOPOLIA7from app.routes.pecom.utils import batch8from app.settings.consts import TK_ID_DICT9from app.settings.log import logger10class TKStatusApplicationsIterator:11 batch_size: int = 112 semaphore_size: int = 113 def __init__(self, applications: list[StatusApplication], test=False):14 self.applications = applications15 self.batch_size = self.batch_size16 self.semaphore = asyncio.Semaphore(self.semaphore_size)17 batcher = batch(self.batch_size)18 self.applications_batched = [b for b in batcher(self.applications)]19 self.test = test20 async def __aiter__(self):21 # нам возвÑаÑаеÑÑÑ Ð² Ñикле коÑÑÑинÑ, ÑезÑлÑÑаÑÑ ÐºÐ¾ÑоÑÑÑ
list22 # а Ð¼Ñ Ñ
оÑим напиÑаÑÑ Ð¸ÑеÑаÑоÑ, поÑÑÐ¾Ð¼Ñ Ð´ÐµÐ»Ð°ÐµÐ¼ yield Ð¾Ñ ÑезÑлÑÑаÑа23 for st_a in await asyncio.gather(24 *[25 self.get_applications_status_filtered(apl)26 for apl in self.applications_batched27 ]28 ):29 # logger.info(f'заÑвки, Ñ ÐºÐ¾ÑоÑÑÑ
мзменилÑÑ ÑÑаÑÑÑ: {st_a}')30 if st_a:31 for a in st_a:32 yield a33 async def get_applications_status_filtered(34 self, applications: list[StatusApplication]35 ):36 """37 возвÑаÑаем ÑолÑко Ñе заÑвки, ÑÑаÑÑÑ ÐºÐ¾ÑоÑÑÑ
изменилÑÑ38 """39 async with self.semaphore:40 sorted_appl = sorted(applications, key=lambda x: x.idmonopolia)41 newer_statuses = await self.get_applications_status(sorted_appl)42 # Ð´Ð»Ñ Ð¿Ñода43 predicate = lambda x, y: x != y44 # Ð´Ð»Ñ ÑеÑÑа45 if self.test:46 predicate = lambda x, y: True47 l = list(48 filter(49 lambda z: predicate(z[0].status, z[1].status),50 zip(sorted_appl, newer_statuses),51 )52 )53 return [t[1] for t in l]54 @abstractmethod55 async def get_applications_status(56 self, applications: list[StatusApplication]57 ) -> list[StatusApplication]:58 """59 ÐбÑабаÑÑÐ²Ð°ÐµÑ Ð·Ð°Ñвки из маÑÑива applications и оÑÐ´Ð°ÐµÑ Ð¿Ð¾ ним ÑÑаÑÑÑ60 Ð Ð°Ð·Ð¼ÐµÑ Ð¼Ð°ÑÑива applications Ñавен batch_size.61 Таким обÑазом, еÑли ÑÑаÑÑÑ Ð¼Ð¾Ð¶ÐµÑ Ð±ÑÑÑ ÑолÑко по одной заÑвке, ÑÑиÑаем ÑÑо в маÑÑиве applications62 наÑ
одиÑÑÑ ÑолÑко 1 ÑлеменÑ63 """64 pass65class TKStatusDB:66 def __init__(self, idtk: int, finished_statuses: list[str]):67 self.idtk = idtk68 self.finished_statuses = finished_statuses69 if not self.finished_statuses:70 raise NotImplementedError("Ðе Ð¼Ð¾Ð¶ÐµÑ Ð½Ðµ бÑÑÑ ÑиналÑнÑÑ
ÑÑаÑÑÑов гÑÑзов")71 mng_ep = DBS["mongo_epool_admin"]["client"]72 self.mysql_write = DBS["mysql_write"]73 self.mysql_read = DBS["mysql"]74 self.orders_coll = mng_ep.get_collection("tk_" + TK_ID_DICT[self.idtk])75 async def update_status(self, application: StatusApplication) -> None:76 mysql_write = DBS["mysql_write"]77 status_info = {"status": application.status}78 # if application.status.id:79 # status_info["status_id"] = application.status.id80 self.orders_coll.update_one(81 {"_id": application.idmonopolia}, {"$set": status_info}82 )83 dt = datetime.now()84 await mysql_write.fetch_one(85 QUERY_UPDATE_CARGO_STATUS_BY_IDMONOPOLIA,86 {87 "idmonopolia": application.idmonopolia,88 "tk_status": application.status,89 "status_changed": dt,90 },91 )92 logger.info(93 f"{TK_ID_DICT[self.idtk].upper()} STATUS UPDATER: "94 f"гÑÑз Ñ idmonopolia=%s обновлен %s Ñо status=%s",95 application.idmonopolia,96 dt,97 application.status,98 )99 def get_applications_query(self):100 finish_cond = []101 for finish_st in self.finished_statuses:102 finish_cond += [f"tk_status LIKE '{finish_st}'"]103 finish_cond = " OR ".join(finish_cond)104 return (105 "SELECT idmonopolia, tk_num, tk_status AS status "106 "FROM mircomf4_epool.ep_zakaz_mon4tk_acc "107 f"WHERE idtk={self.idtk} AND (tk_status IS NULL OR "108 f"NOT ({finish_cond}));"109 )110 async def get_applications(self) -> list[StatusApplication]:111 applications = [112 dict(t)113 for t in await self.mysql_read.fetch_all(self.get_applications_query())114 ]115 # logger.info(applications[0])116 applications = [StatusApplication(**t) for t in applications]117 logger.info(118 f"{TK_ID_DICT[self.idtk].upper()} STATUS UPDATER: "119 f"колиÑеÑÑво заÑвок Ð´Ð»Ñ Ð¾Ð±ÑабоÑки len(applications)={len(applications)}"120 )...
_status_updater.py
Source:_status_updater.py
1import datetime2from typing import Type3from app.base.sms.sender_api import SenderAPI4from app.base.sms.sms_handler import SMSHandler5from app.base.sms.sms_sender import SMSSender6from ._classes import TKStatusApplicationsIterator, TKStatusDB7from ._consts import STATUS_TIME_FROM, STATUS_TIME_TO8from ...env import SETTINGS9from ...settings.consts import TK_ID_DICT, SMS_SENDER10from ...settings.log import logger11class TKStatusUpdater:12 def __init__(13 self,14 idtk: int,15 finished_statuses: list[str],16 status_iterator_class: Type[TKStatusApplicationsIterator],17 status_db_class: Type[TKStatusDB] = TKStatusDB,18 ):19 self.idtk = idtk20 self.finished_statuses = finished_statuses21 self.status_iterator_class = status_iterator_class22 self.status_db_class = status_db_class23 self.sms_handler = SMSHandler(24 idtk=idtk, sender=SMSSender(SenderAPI(who=SMS_SENDER))25 )26 async def update_all(self, test=False):27 if not SETTINGS.STATUS_OFF:28 now_hour = datetime.datetime.now().hour29 # ÐÑÐµÐ¼Ñ ÐоÑковÑкое (ÑÑÑановлено в докеÑе)30 # ÐÐµÐ¶Ð»Ð¸Ð²Ð°Ñ Ð¾ÑпÑавка31 if STATUS_TIME_FROM <= now_hour < STATUS_TIME_TO:32 status_db = self.status_db_class(self.idtk, self.finished_statuses)33 applications = await status_db.get_applications()34 # Ð´Ð»Ñ ÑеÑÑа обÑежем ÑÑÑка35 if test:36 applications = applications[:5]37 async_gen = self.status_iterator_class(applications, test=test)38 cnt = 039 async for appl in async_gen:40 cnt += async_gen.batch_size41 if not test:42 await status_db.update_status(appl)43 await self.sms_handler.handle(appl)44 if not test:45 logger.info(46 f"СÑаÑÑÑ Ð¾Ð±Ð½Ð¾Ð²Ð»ÐµÐ½ Ñ {cnt} {TK_ID_DICT[self.idtk].upper()} заÑвок"...
handler.py
Source:handler.py
1import json2from okdata.aws.logging import log_add, logging_wrapper3from okdata.aws.status import TraceStatus, TraceEventStatus4from okdata.aws.status.sdk import Status5from okdata.aws.status.wrapper import _status_from_lambda_context6finished_statuses = {7 "ABORTED": TraceEventStatus.FAILED,8 "FAILED": TraceEventStatus.FAILED,9 "TIMED_OUT": TraceEventStatus.FAILED,10 "SUCCEEDED": TraceEventStatus.OK,11}12@logging_wrapper13def act_on_queue(event, context):14 records = event.get("Records")15 if not records:16 raise ValueError("Event does not contain Records")17 # We always get 1 event, see Reliability section of https://aws.amazon.com/sns/faqs/18 record = records[0]19 source = record["EventSource"]20 if source != "aws:sns":21 raise ValueError(22 f"Unsuported 'EventSource' {source}. Supported types: 'aws:sns'"23 )24 event = json.loads(record["Sns"]["Message"])25 try:26 event_status = event["detail"]["status"]27 except KeyError:28 return False29 trace_id = event["detail"].get("name")30 log_add(trace_id=trace_id, event_status=event_status)31 # Ignore statuses where pipeline is not finished32 if event_status not in finished_statuses:33 return False34 return _set_finished_status(event, context, trace_id, event_status)35def _set_finished_status(event, context, trace_id, event_status):36 status = Status(_status_from_lambda_context(event, context))37 trace_event_status = finished_statuses[event_status]38 log_add(trace_event_status=trace_event_status)39 status.add(40 trace_id=trace_id,41 domain="dataset",42 operation="_set_finished_status",43 trace_event_status=trace_event_status,44 trace_status=TraceStatus.FINISHED,45 )46 status_api_ok = status.done() is not None47 log_add(status_api_ok=status_api_ok)...
Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!