Best Python code snippet using avocado_python
celery.py
Source:celery.py
1import json2import subprocess3import importlib4from django.conf.urls import url5from django.core.cache import cache6from django.shortcuts import redirect7from django.urls import reverse8from django.utils.html import format_html9from django_celery_beat.admin import PeriodicTaskAdmin10from django_celery_beat.models import PeriodicTask11from django_celery_results.admin import TaskResultAdmin12from django_celery_results.backends import DatabaseBackend13from apps.general.decorators import run_on_commit14from apps.general.loggers import django_logger15from apps.general.utils import in_tests16class WMCPDatabaseBackend(DatabaseBackend):17 def mark_as_failure(self, task_id, exc, *args, **kwargs):18 """Mark task as executed with failure."""19 django_logger.exception('Celery task failed: %s' % exc, exc_info=exc)20 super().mark_as_failure(task_id, exc, *args, **kwargs)21 """Custom result backend that also stores a task name."""22 def _store_result(self, task_id, result, status,23 traceback=None, request=None):24 """Store return value and status of an executed task."""25 content_type, content_encoding, result = self.encode_content(result)26 _, _, meta = self.encode_content({27 'children': self.current_task_children(request),28 'task_name': request.task29 })30 self.TaskModel._default_manager.store_result(31 content_type, content_encoding,32 task_id, result, status,33 traceback=traceback,34 meta=meta,35 )36 return result37# Monkey-patching for displaying also the 'task_name' in the admin task results table.38# A more clean approach with unregister\register leads to errors.39TaskResultAdmin.list_display = ('task_id', 'date_done', 'status', 'task_name')40TaskResultAdmin.task_name = lambda self, obj: json.loads(obj.meta)['task_name']41TaskResultAdmin.task_name.short_description = 'Task name'42def import_task(task_id):43 try:44 task_path = PeriodicTask.objects.get(id=task_id).task45 module_path, task_name = task_path.rsplit('.', 1)46 module = importlib.import_module(module_path)47 task = getattr(module, task_name)48 return task, True49 except Exception:50 return None, False51def task_actions(self, obj):52 key = 'task_%s_importable' % obj.pk53 is_importable = cache.get(key)54 if is_importable is None:55 _, is_importable = import_task(obj.pk)56 cache.set(key, is_importable, 60 * 60)57 if is_importable:58 return format_html('<a class="button" href="%s">Run</a>' % reverse('admin:run_task', args=[obj.pk]))59 else:60 return ''61def process_task_run(request, task_id, *args, **kwargs):62 task, _ = import_task(task_id)63 run_task(task)64 return redirect(reverse('admin:django_celery_results_taskresult_changelist'))65def task_get_urls(self):66 return [url(r'^celery_task/(?P<task_id>.+)/run/$', self.admin_site.admin_view(process_task_run), name='run_task')] \67 + super(self.__class__, self).get_urls()68PeriodicTaskAdmin.task_actions = task_actions69PeriodicTaskAdmin.list_display = PeriodicTaskAdmin.list_display + ('task_actions',)70PeriodicTaskAdmin.get_urls = task_get_urls71def run_task(task, *args, **kwargs):72 """73 Runs celery task asynchronously if celery workers are available, else synchronously.74 This works only until we have workers and rabbitmq on the same instance as our app,75 rather than on remote nodes.76 """77 ps = subprocess.Popen(['ps', 'aux'], stdout=subprocess.PIPE)78 grep1 = subprocess.Popen(['grep', '-v', 'grep'], stdin=ps.stdout, stdout=subprocess.PIPE)79 grep2 = subprocess.Popen(['grep', 'celery.*worker'], stdin=grep1.stdout, stdout=subprocess.PIPE)80 if len(list(grep2.stdout)) > 0 and not in_tests():81 run_on_commit(lambda: task.delay(*args, **kwargs))82 else:...
test_util_misc.py
Source:test_util_misc.py
...38 assert s.start == 0 if i == 0 else slices[i-1].stop39 assert s.step is None40 assert s.stop == s.start + size if i < ns - 1 else n41 assert list(misc.chunk_slices(0, 10)) == []42def test_is_importable():43 """Test the is_importable() function."""44 assert misc.is_importable('urllib')45 assert misc.is_importable('urllib.request')46 assert not misc.is_importable('aklhaskhdkslkdjahkdf')47 assert not misc.is_importable('urllib.aklhaskhdkslkdjahkdf')48def test_join_list_human():49 l = ['foo', 'bar', 'baz']50 assert misc.join_list_human(l[:1]) == 'foo'51 assert misc.join_list_human(l[:2]) == 'foo and bar'52 assert misc.join_list_human(l[:3]) == 'foo, bar, and baz'53 assert misc.join_list_human(l[:1], 'or') == 'foo'54 assert misc.join_list_human(l[:2], 'or') == 'foo or bar'...
__init__.py
Source:__init__.py
...10 "requests": True,11 "json": False,12 "random": False13}14def is_importable(module):15 return module in MODULES16def import_and_parse(runner: "Runtime", lineno: int, module: str) -> objects.Module:17 if not is_importable(module):18 raise errors.ViperModuleError(runner, lineno, f"Cannot import '{module}'")19 mod = importlib.import_module(f"viper.lib.{module}")20 mod = objects.Module(module, mod, runner)...
Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!