Best Python code snippet using localstack_python
repository.py
Source:repository.py
# ... excerpt of repository.py; it begins mid-method. The page scrape flattened
# the code onto one line, so the layout below is reconstructed. These are
# methods of a class whose header lies outside the excerpt, shown here without
# the class-level indent.
# Visible tail of the preceding, truncated method:
#         fields.update(defaults)
#         return fields


def build(self, item):
    """Return an unsaved ``self.Model`` built from ``self.to_fields(item)``."""
    return self.Model(**self.to_fields(item))


def _pre_create(self, item):
    """Hook run before a single-record create; a no-op in this class."""
    pass


@retry(2, OperationalError, close_connection)
def _try_create(self, item, fields):
    """Insert a row from ``fields``, falling back to ``update_or_create``.

    :param item: source item, used only for the fallback path.
    :param fields: keyword arguments passed to ``Model.objects.create``.
    :return: the created object, or the result of ``update_or_create(item)``
        when the insert raises ``IntegrityError``.

    NOTE(review): presumably ``retry`` re-invokes this on ``OperationalError``
    and calls ``close_connection`` between attempts -- confirm in ``retry``.
    """
    try:
        return self.Model.objects.create(**fields)
    except IntegrityError:
        return self.update_or_create(item)


def create(self, item):
    """Run the pre-create hook, then create a record for ``item``."""
    self._pre_create(item)
    return self._try_create(item, self.to_fields(item))


def _pre_bulk_create(self, items):
    """Hook run before a bulk create; a no-op in this class."""
    pass


@retry(DEFAULT_RETRIES, IntegrityError)
def _try_bulk_create(self, items_dict):
    """Bulk-insert the entries of ``items_dict`` whose ids are not stored yet.

    The original docstring reads "if the primary key is duplicated, retry".

    :param items_dict: mapping of primary-key value -> item.
    :return: whatever ``Model.objects.bulk_create`` returns for the new objects.

    NOTE(review): presumably ``retry`` re-runs this on ``IntegrityError`` up
    to ``DEFAULT_RETRIES`` times -- confirm in ``retry``.
    """
    all_ids = items_dict.keys()
    exist_ids = self.get_exist_ids(all_ids)
    new_ids = set(all_ids) - set(exist_ids)
    new_objects = [self.build(items_dict[k]) for k in new_ids]
    return self.Model.objects.bulk_create(new_objects)


def bulk_create(self, items):
    """Bulk-create records, skipping existing ones; return only the new ones."""
    self._pre_bulk_create(items)
    items_dict = self.distinct(items)
    return self._try_bulk_create(items_dict)


def _get_pk(self, obj):
    """Return the attribute of ``obj`` named by ``self.PK``."""
    return getattr(obj, self.PK)


def _get_item_pk(self, item):
    """Return ``item[self.ITEM_PK]``, or ``None`` when the key is absent."""
    return item.get(self.ITEM_PK)


def is_updatable(self, obj, item):
    """Decide whether ``obj`` should be updated from ``item``.

    Returns ``True`` as soon as one of ``self.UPDATE_CHECK_FIELDS`` differs
    between ``obj`` and ``item``; otherwise returns whether ``obj.updated_at``
    is older than ``DEFAULT_UPDATE_CYCLE``.
    """
    if self.UPDATE_CHECK_FIELDS:
        for field in self.UPDATE_CHECK_FIELDS:
            if getattr(obj, field) != item.get(field):
                return True
    return timezone.now() - obj.updated_at > DEFAULT_UPDATE_CYCLE


def update(self, obj, item, is_check_updatable=True):
    """Copy the default fields of ``item`` onto ``obj`` and save it.

    :param obj: model instance to update.
    :param item: source item; ``self.get_defaults_fields(item)`` supplies
        the values that are written.
    :param is_check_updatable: when true (the default), the write happens
        only if ``self.is_updatable`` agrees; when false it always happens.
    """
    defaults = self.get_defaults_fields(item)
    if not is_check_updatable or self.is_updatable(obj, defaults):
        # ``items()`` instead of ``iteritems()``: the latter exists only on
        # Python 2 dicts, while ``items()`` works on Python 2 and 3 alike.
        for key, value in defaults.items():
            setattr(obj, key, value)
        obj.save()


def bulk_update_or_create(self, items):
    """Bulk-create new records and refresh existing ones; return all records.

    Existing records go through ``self.update``, so they are only rewritten
    when ``is_updatable`` allows it (the original docstring describes these
    as records that already exist and are past the update cycle).

    :param items: iterable of items; duplicates are collapsed by
        ``self.distinct``.
    :return: the newly created objects followed by the pre-existing ones.
    """
    self._pre_bulk_create(items)
    items_dict = self.distinct(items)
    new_objects = self._try_bulk_create(items_dict)
    # Compare against the de-duplicated mapping, not the raw ``items``:
    # duplicates in the input would otherwise always look like "some records
    # already existed", and ``items`` may not support ``len()`` at all.
    if len(items_dict) != len(new_objects):
        new_ids = [self._get_pk(obj) for obj in new_objects]
        old_ids = set(items_dict) - set(new_ids)
        old_objects = self.get_exist_objects(old_ids)
        for obj in old_objects:
            item = items_dict[self._get_pk(obj)]
            self.update(obj, item)
        new_objects.extend(old_objects)
    return new_objects


def get_or_create(self, item):
    """Run the pre-create hook, then delegate to ``Model.objects.get_or_create``.

    The lookup uses ``self.get_pk_fields(item)``; the remaining fields are
    passed as ``defaults``.
    """
    self._pre_create(item)
    pk = self.get_pk_fields(item)
    defaults = self.get_defaults_fields(item)
    pk.update(defaults=defaults)
    return self.Model.objects.get_or_create(**pk)


def update_or_create(self, item):
    """Update the record matching ``item``'s key fields, or create it.

    :return: the updated or newly saved model instance.
    """
    self._pre_create(item)
    pk = self.get_pk_fields(item)
    # Only the lookup is guarded, so a ``DoesNotExist`` raised while updating
    # cannot be mistaken for "record missing" and trigger a second insert.
    try:
        obj = self.Model.objects.get(**pk)
    except self.Model.DoesNotExist:
        fields = self.get_defaults_fields(item)
        fields.update(pk)
        obj = self.Model(**fields)
        obj.save()
    else:
        self.update(obj, item)
    return obj


def delete(self, item):
    pk = self.get_pk_fields(item)
    ...  # excerpt is truncated here; the rest of this method is not shown
command.py
Source:command.py
# ... excerpt of command.py; it begins inside a method's docstring. The page
# scrape flattened the code onto one line, so the layout below is
# reconstructed. These are methods of a class whose header lies outside the
# excerpt, shown here without the class-level indent.
# Visible tail of the preceding, truncated method:
#         :param rule: query statement
#         """
#         self._pre_analyze(database, rule)
#         return self._run_cmd(self._CODEQL_CMD+self._CODEQL_ANALYSE_COMMAND)


def _pre_create(self, database_name, project):
    """Remember the database name and fill in the create-command placeholders.

    Every element of ``self._CODEQL_CREATE_COMMAND`` has the database-name
    flag replaced by ``database_name`` and the project-path flag replaced by
    ``project``; the result is stored back on ``self._CODEQL_CREATE_COMMAND``.

    NOTE(review): because the substituted list overwrites the attribute it
    was read from, a later call on the same instance no longer sees the
    placeholders -- confirm whether instances are meant to be reused.
    """
    self.database_name = database_name
    self._CODEQL_CREATE_COMMAND = [
        part.replace(COMMAND_CREATE_DATABASE_NAME_REPLACE_FLAG, database_name)
            .replace(COMMAND_CREATE_PROJECT_PATH_REPLACE_FLAG, project)
        for part in self._CODEQL_CREATE_COMMAND
    ]


def create(self, database_name, project):
    """
    Create a CodeQL database.

    :param database_name: name of the database to generate
    :param project: project path
    """
    self._pre_create(database_name, project)
    full_command = self._CODEQL_CMD + self._CODEQL_CREATE_COMMAND
    return self._run_cmd(full_command)


def _run_cmd(self, cmd):
    print(cmd)
    # NOTE(review): the original comment is garbled; it appears to say that
    # only Python 3.7+ lets subprocess.run take a list command -- confirm.
    if isinstance(cmd, list):
        cmd = " ".join(cmd)
    result = subprocess.run(cmd, shell=True,
                            stdout=subprocess.PIPE)  # a failing command yields a non-zero status code
    # print("result: %s" % ret2.stdout.decode("gbk"))
    ...  # excerpt is truncated here; the rest of this method is not shown
manager.py
Source:manager.py
from __future__ import unicode_literals

from django.db import models

from account.models.core.serializer import UserSerializer


class PrototypeManager(models.Manager):
    """Manager that wraps ``create``/``get_or_create`` with pre/post hooks."""

    def _pre_create(self, *args, **kwargs):
        """Add a ``designer`` entry to ``kwargs`` before creation.

        ``designer`` is the ``.data`` of ``UserSerializer`` applied to
        ``kwargs['designed_by']`` (a missing key raises ``KeyError``).

        :return: ``(args, kwargs, derivatives)`` where ``derivatives`` is an
            empty dict that is later forwarded to ``_post_create``.
        """
        kwargs['designer'] = UserSerializer(kwargs['designed_by']).data
        return args, kwargs, {}

    def _post_create(self, instance, **derivatives):
        """Hook run after creation; a no-op in this class."""

    def create(self, *args, **kwargs):
        """Create an instance via the parent manager, running both hooks."""
        call_args, call_kwargs, extras = self._pre_create(*args, **kwargs)
        instance = super(PrototypeManager, self).create(*call_args, **call_kwargs)
        self._post_create(instance, **extras)
        return instance

    def get_or_create(self, *args, **kwargs):
        call_args, call_kwargs, extras = self._pre_create(*args, **kwargs)
        instance, created = super(PrototypeManager, self).get_or_create(*call_args, **call_kwargs)
        self._post_create(instance, **extras)
        ...  # excerpt is truncated here; the rest of this method is not shown
Learn to execute automation testing from scratch with LambdaTest Learning Hub, from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g., Selenium, Cypress, and TestNG.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 minutes of automation testing FREE!!