Best Python code snippet using slash
__init__.py
Source:__init__.py
...21 'See https://pypi.org/project/SQLAlchemy/')22logger = logging.getLogger(__name__)23__all__ = ('DatabaseBackend',)24@contextmanager25def session_cleanup(session):26 try:27 yield28 except Exception:29 session.rollback()30 raise31 finally:32 session.close()33def retry(fun):34 @wraps(fun)35 def _inner(*args, **kwargs):36 max_retries = kwargs.pop('max_retries', 3)37 for retries in range(max_retries):38 try:39 return fun(*args, **kwargs)40 except (DatabaseError, InvalidRequestError, StaleDataError):41 logger.warning(42 'Failed operation %s. Retrying %s more times.',43 fun.__name__, max_retries - retries - 1,44 exc_info=True)45 if retries + 1 >= max_retries:46 raise47 return _inner48class DatabaseBackend(BaseBackend):49 """The database result backend."""50 # ResultSet.iterate should sleep this much between each pool,51 # to not bombard the database with queries.52 subpolling_interval = 0.553 def __init__(self, dburi=None, engine_options=None, url=None, **kwargs):54 # The `url` argument was added later and is used by55 # the app to set backend by url (celery.app.backends.by_url)56 super(DatabaseBackend, self).__init__(57 expires_type=maybe_timedelta, url=url, **kwargs)58 conf = self.app.conf59 self.url = url or dburi or conf.database_url60 self.engine_options = dict(61 engine_options or {},62 **conf.database_engine_options or {})63 self.short_lived_sessions = kwargs.get(64 'short_lived_sessions',65 conf.database_short_lived_sessions)66 tablenames = conf.database_table_names or {}67 Task.__table__.name = tablenames.get('task', 'celery_taskmeta')68 TaskSet.__table__.name = tablenames.get('group', 'celery_tasksetmeta')69 if not self.url:70 raise ImproperlyConfigured(71 'Missing connection string! Do you have the'72 ' database_url setting set to a real value?')73 def ResultSession(self, session_manager=SessionManager()):74 return session_manager.session_factory(75 dburi=self.url,76 short_lived_sessions=self.short_lived_sessions,77 **self.engine_options)78 @retry79 def _store_result(self, task_id, result, state,80 traceback=None, max_retries=3, **kwargs):81 """Store return value and state of an executed task."""82 session = self.ResultSession()83 with session_cleanup(session):84 task = list(session.query(Task).filter(Task.task_id == task_id))85 task = task and task[0]86 if not task:87 task = Task(task_id)88 session.add(task)89 session.flush()90 task.result = result91 task.status = state92 task.traceback = traceback93 session.commit()94 return result95 @retry96 def _get_task_meta_for(self, task_id):97 """Get task meta-data for a task by id."""98 session = self.ResultSession()99 with session_cleanup(session):100 task = list(session.query(Task).filter(Task.task_id == task_id))101 task = task and task[0]102 if not task:103 task = Task(task_id)104 task.status = states.PENDING105 task.result = None106 return self.meta_from_decoded(task.to_dict())107 @retry108 def _save_group(self, group_id, result):109 """Store the result of an executed group."""110 session = self.ResultSession()111 with session_cleanup(session):112 group = TaskSet(group_id, result)113 session.add(group)114 session.flush()115 session.commit()116 return result117 @retry118 def _restore_group(self, group_id):119 """Get meta-data for group by id."""120 session = self.ResultSession()121 with session_cleanup(session):122 group = session.query(TaskSet).filter(123 TaskSet.taskset_id == group_id).first()124 if group:125 return group.to_dict()126 @retry127 def _delete_group(self, group_id):128 """Delete meta-data for group by id."""129 session = self.ResultSession()130 with session_cleanup(session):131 session.query(TaskSet).filter(132 TaskSet.taskset_id == group_id).delete()133 session.flush()134 session.commit()135 @retry136 def _forget(self, task_id):137 """Forget about result."""138 session = self.ResultSession()139 with session_cleanup(session):140 session.query(Task).filter(Task.task_id == task_id).delete()141 session.commit()142 def cleanup(self):143 """Delete expired meta-data."""144 session = self.ResultSession()145 expires = self.expires146 now = self.app.now()147 with session_cleanup(session):148 session.query(Task).filter(149 Task.date_done < (now - expires)).delete()150 session.query(TaskSet).filter(151 TaskSet.date_done < (now - expires)).delete()152 session.commit()153 def __reduce__(self, args=(), kwargs={}):154 kwargs.update(155 {'dburi': self.url,156 'expires': self.expires,157 'engine_options': self.engine_options})...
Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!