Best Python code snippet using Kiwi_python
process_dns_updates.py
Source:process_dns_updates.py
#!/usr/bin/env python
"""process_dns_updates

Gets the latest DNS record change requests from SQS and updates the Route53
records accordingly

NOTE: This is intended to be run within an AWS Lambda container! Correct
functionality outside of Lambda cannot be guaranteed.
"""
# builtin modules
import logging
import re
import json
import os
# 3rd party modules
import boto3
import botocore.exceptions

# Pull some config from env (supplied by AWS Lambda)
AWS_REGION = os.environ['AWS_REGION']  # set by Lambda automatically
SQS_QUEUE_NAME = os.environ['SQS_QUEUE_NAME']  # set in Lambda manually

# Set up some global constants
MAX_MESSAGES = 10  # AWS global limit is 10, so let's get up to that amount
WAIT_TIME_SECONDS = 1  # max seconds to wait for a non-empty SQS queue (20 max)
MAX_MESSAGE_RECEIVE_LOOPS = 30  # set a sane limit to prevent infinite loop
DELETE_BATCH_SIZE = 10  # messages per delete_messages call

# Set up the root logger with a basic StreamHandler set to log INFO and above.
# This is so we can also emit log events from imported modules.
logging.basicConfig()
logging.getLogger().setLevel(logging.INFO)
# Override default handler's format. AWS butchers it with useless info.
logging.getLogger().handlers[0].setFormatter(logging.Formatter(
    '%(levelname)s:%(name)s:%(message)s\r'))
# Set up a local logger so we can log our own messages
log = logging.getLogger(__name__)

# Set up API objects at the module level. Lambda will reuse the runtime
# environments when possible, meaning the API connections will be reused.
# https://docs.aws.amazon.com/lambda/latest/dg/running-lambda-code.html
SQS_API = boto3.resource('sqs', region_name=AWS_REGION).get_queue_by_name(
    QueueName=SQS_QUEUE_NAME)
R53_API = boto3.client('route53', region_name=AWS_REGION)


class ErrorSearchException(Exception):
    """
    Exception raised when a record name can not be found in a Route53
    InvalidChangeBatch error. If this exception is raised, you should
    check if the error message returned by AWS's API has changed since
    this script was written (see
    DnsChangeQueue.mark_failed_record_change() for how the error message
    is searched)
    """


class DnsChange(object):
    """
    One-stop shop for all your DNS-record-change-request info and actions

    The JSON in each SQS message's body should follow this format:
        {
            "Domain": str(qaolate.com),
            "Action": str(UPSERT),
            "Record": {
                "Name": str(test-record.qaolate.com),
                "Type": str(CNAME),
                "TTL": int(60),
                "Targets": [
                    str(test1.example.com),
                    str(test2.example.com)
                ]
            }
        }

    Keep in mind that the body JSON is stringified before being sent to SQS so
    all keys and values should be converted into their required data type
    before use
    """
    __hash__ = None  # Instances of this class should not be hashable

    def __init__(self, boto_message):
        """
        Args:
            boto_message (sqs.Message): boto3 object for the SQS message
        """
        # Init some useful class properties
        self._boto_message = boto_message
        self.sent_timestamp = int(boto_message.attributes['SentTimestamp'])
        self.receipt_handle = str(boto_message.receipt_handle)
        # Attributes to be set later
        self.requested = False  # Was this change added to a batch request?
        self.duplicate = False  # Is the change an unused duplicate?
        self.failed = False  # Did the change fail to apply?
        # Process and load the SQS message's body
        self.safe_load_body()

    def delete(self):
        """Delete the message from the queue"""
        self._boto_message.delete()

    def safe_load_body(self):
        """
        Attempt to load the SQS message's body, and get our expected values
        from it. If the body is not valid JSON, or any expected field is
        missing or has an unusable type, log a friendly error and mark the
        message as failed instead of raising.
        """
        # Give every body-derived attribute a default so a message that
        # fails to load can still be inspected safely later on
        self.message_body = None
        self.domain = None
        self.action = None
        self.record = None
        self.record_type = None
        self.ttl = None
        self.targets = set()

        # Attempt to parse the message body into JSON. The catch is
        # deliberately broad: one malformed message must never abort the
        # processing of the rest of the queue.
        try:
            self.message_body = json.loads(self._boto_message.body)
        except Exception as e:
            # The body is passed as a logging argument (not concatenated
            # into the format string) so a '%' in it cannot break formatting
            log.error("Could not load SQS message body as JSON (%s)! "
                      "Marking message as 'failed'. Message body is:\n%s",
                      e, self._boto_message.body)
            self.failed = True
            return

        # Attempt to load our attributes from the parsed JSON. TypeError and
        # ValueError cover bodies that are valid JSON but have the wrong
        # shape (e.g. a list, or a non-numeric TTL).
        try:
            record = self.message_body['Record']
            self.domain = str(self.message_body['Domain']).lower()
            self.action = str(self.message_body['Action']).upper()
            self.record = str(record['Name']).lower()
            self.record_type = str(record['Type']).upper()
            self.ttl = int(record['TTL'])
            self.targets = set(
                str(target).lower() for target in record['Targets'])
        except (KeyError, TypeError, ValueError) as e:
            log.error("Could not load required attribute (%s) from message "
                      "body! Marking message as 'failed'. Message body is:\n"
                      "%s", e, self.message_body)
            self.failed = True
            return


class DnsChangeQueue(object):
    """
    List of DNS changes being requested, and relevant attributes/actions
    """

    def __init__(self, sqs_queue_name):
        """
        Args:
            sqs_queue_name (str, unicode): name of DNS change SQS queue
                (currently unused; the module-level SQS_API queue object is
                used instead)
        """
        self._sqs_api = SQS_API
        self._r53_api = R53_API
        self._messages = None  # list()
        self._record_changes = None  # dict()
        self._hosted_zones = None  # dict()

    @property
    def messages(self):
        """
        Build a list of DnsChange objects, attempting to receive all
        messages in the queue

        Receiving stops at the first empty response, or after
        MAX_MESSAGE_RECEIVE_LOOPS requests, whichever comes first. The
        result is cached on first access.

        Returns:
            list of DnsChange() objects: all received messages from SQS
        """
        if self._messages is None:
            self._messages = list()
            num_received_msgs = None
            receive_loops = 0
            while (num_received_msgs != 0 and
                   receive_loops < MAX_MESSAGE_RECEIVE_LOOPS):
                receive_loops += 1
                log.info('Starting receive_messages loop #%s', receive_loops)
                num_received_msgs = 0
                messages_from_boto = self._sqs_api.receive_messages(
                    AttributeNames=['SentTimestamp'],
                    MaxNumberOfMessages=MAX_MESSAGES,
                    WaitTimeSeconds=WAIT_TIME_SECONDS)
                for boto_message in messages_from_boto:
                    self._messages.append(DnsChange(boto_message))
                    num_received_msgs += 1
                log.info('Received %s messages from SQS', num_received_msgs)
            log.info('Received a total of %s messages from SQS',
                     len(self._messages))
        return self._messages

    @property
    def record_changes(self):
        """
        Return a slightly easier-to-process data set of our DNS changes.

        When calling this for the first time, it will create a new dict in the
        following format:
            {
                'foo.tld': {
                    'record1.foo.tld': [
                        DnsChange(),
                        DnsChange()
                    ]
                }
            }
        This makes it much easier to dedupe the changes and build a Route53
        request later. Messages already marked as failed are left out.

        Returns:
            dict
        """
        if self._record_changes is None:
            self._record_changes = dict()
            # For each message, add it to the new dict, creating keys if they
            # don't exist already
            for msg in self.messages:
                if msg.failed is not True:
                    self._record_changes.setdefault(
                        msg.domain, {}).setdefault(
                            msg.record, []).append(msg)
        return self._record_changes

    def dedupe_change_requests(self):
        """
        Find and process any 'duplicate' change requests

        If any record has multiple changes (messages) in the queue, keep only
        most recent one, discarding the rest (and marking them as duplicates so
        the unused messages can be deleted later).
        """
        # Find all the records in the queue that have more than one change
        log.info('Removing any duplicate changes')
        for domain in self.record_changes:
            for record in self.record_changes[domain]:
                if len(self.record_changes[domain][record]) > 1:
                    # Sort the change list by timestamp, newest first
                    records = sorted(self.record_changes[domain][record],
                                     key=lambda r: r.sent_timestamp,
                                     reverse=True)
                    log.info(('Found %s change requests for %s. '
                              'Using only the most recent one.'),
                             len(records), record)
                    # Keep only the newest change, marking the others as
                    # duplicates and removing them from the change list
                    for stale in records[1:]:
                        stale.duplicate = True
                    # Replace the original list with our own
                    self.record_changes[domain][record] = records[:1]

    @property
    def hosted_zones(self):
        """
        Create a dict of all our hosted zones and their IDs

        Follows the IsTruncated/NextDNSName/NextHostedZoneId markers so
        accounts with more zones than fit in one response are fully listed.
        The result is cached on first access.

        Returns:
            dict: {zone_name: zone_id, ...}
        """
        if self._hosted_zones is None:
            self._hosted_zones = dict()
            request_args = {'MaxItems': '100'}
            while True:
                response = self._r53_api.list_hosted_zones_by_name(
                    **request_args)
                # For each hosted zone...
                for zone in response['HostedZones']:
                    # Cut off the trailing '.'
                    zone_name = zone['Name'].rstrip('.')
                    # Get just the ID, not the full path
                    zone_id = zone['Id'].split('/')[-1]
                    # Add to our dict
                    self._hosted_zones[zone_name] = zone_id
                if not response.get('IsTruncated'):
                    break
                # Ask for the next page, starting where this one stopped
                request_args['DNSName'] = response['NextDNSName']
                request_args['HostedZoneId'] = response['NextHostedZoneId']
        return self._hosted_zones

    def generate_r53_changebatch(self, domain):
        """
        Return a properly formed R53 changebatch request dict for a domain

        Every change included in the batch is flagged as `requested`.

        Args:
            domain (str): which domain to build the changebatch for

        Returns:
            dict: {'Changes': [change dict, ...]}
        """
        log.info('Generating Route53 ChangeBatch request for %s', domain)
        changes = list()
        for record in self.record_changes[domain]:
            change = self.record_changes[domain][record][0]
            # Ignore any messages marked as 'failed'
            if change.failed is True:
                continue
            change.requested = True
            resource_records = [
                {'Value': target} for target in change.targets]
            log.info('Adding change to request: %s -> %s',
                     change.record, change.targets)
            changes.append(
                {
                    'Action': change.action,
                    'ResourceRecordSet': {
                        'Name': change.record,
                        'Type': change.record_type,
                        'TTL': change.ttl,
                        'ResourceRecords': resource_records
                    }
                }
            )
        return {'Changes': changes}

    def mark_failed_record_change(self, domain, error_msg):
        """
        Given an Route53 InvalidChangeBatch error message, determine what
        record failed to change and mark it as failed

        Args:
            domain (str): hosted zone the record lives in
            error_msg (str, unicode): error message returned from Route53

        Raises:
            ErrorSearchException: no not-yet-failed record name could be
                found in the error message, so retrying would resend the
                exact same batch forever
        """
        newly_failed = 0
        # Iterate over _every_ requested change and use the record name to...
        for record in self.record_changes[domain]:
            # ...check if it's present within the error message string.
            # If it is, then it's very likely to be the record causing the
            # failure and it should be removed
            if record in error_msg:
                log.error("Removing '%s' from change list and retrying",
                          record)
                for msg in self.record_changes[domain][record]:
                    if msg.failed is not True:
                        msg.failed = True
                        newly_failed += 1
        if newly_failed == 0:
            raise ErrorSearchException(
                'Could not find any pending record of %s in the Route53 '
                'error message: %s' % (domain, error_msg))

    def commit_changes_to_r53(self):
        """
        Attempt to make all the changes in our queue

        Raises:
            ErrorSearchException: Route53 rejected a batch but the offending
                record could not be identified
            botocore.exceptions.ClientError: any Route53 error other than
                InvalidChangeBatch
        """
        # Before committing, we should remove any duplicate changes
        self.dedupe_change_requests()
        for domain in self.record_changes:
            # A request for a domain we do not host must not abort the
            # changes for every other domain: fail just those messages
            zone_id = self.hosted_zones.get(domain)
            if zone_id is None:
                log.error("No Route53 hosted zone found for '%s'! Marking "
                          "its changes as 'failed'", domain)
                for record in self.record_changes[domain]:
                    for msg in self.record_changes[domain][record]:
                        msg.failed = True
                continue
            # Run this in a loop so we can re-attempt if we get
            # InvalidChangeBatch
            while True:
                # Build the changebatch fresh for each loop
                changebatch = self.generate_r53_changebatch(domain)
                log.info('Sending ChangeBatch request for %s', domain)
                try:
                    self._r53_api.change_resource_record_sets(
                        HostedZoneId=zone_id,
                        ChangeBatch=changebatch)
                # If we get InvalidChangeBatch, try to remove the offending
                # change, and restart the loop
                except botocore.exceptions.ClientError as e:
                    if e.response['Error']['Code'] == 'InvalidChangeBatch':
                        # str(e) instead of e.message: the latter only
                        # exists on Python 2 exceptions
                        log.error('Route53: %s', str(e))
                        self.mark_failed_record_change(
                            domain, e.response['Error']['Message'])
                        continue
                    else:
                        raise
                # If we run out of records to send, let's fail gracefully
                except botocore.exceptions.ParamValidationError as e:
                    if re.search(r'Invalid\slength', str(e)) is not None:
                        # Messages whose body never loaded have no record
                        # name, so leave them out of the summary
                        failed = sorted(set(
                            msg.record for msg in self.messages
                            if msg.failed is True and msg.record))
                        log.critical(
                            'No more non-failing changes to submit! '
                            'For reference, the records that are unable to '
                            'updated are:\n%s'
                            '\nYou may want to process them manually.',
                            '\n'.join(failed))
                        break
                    else:
                        raise
                # If we're still here, everything went well so we should break
                # out of the loop
                else:
                    break

    def delete_processed_messages(self):
        """Delete (or not) all the messages we just processed"""
        # Build a list of messages that we can delete (i.e. successfully
        # processed)
        messages_to_delete = [
            message for message in self.messages
            if ((message.requested is True or message.duplicate is True) and
                (message.failed is False))]
        log.info('Deleting %s messages (in batches of 10)',
                 len(messages_to_delete))
        # Batch delete messages 10 at a time (max per the API)
        deleted_messages_count = 0
        for start in range(0, len(messages_to_delete), DELETE_BATCH_SIZE):
            batch = messages_to_delete[start:start + DELETE_BATCH_SIZE]
            # 'Id' only has to be unique within a single batch request
            entries = [
                {'Id': str(index), 'ReceiptHandle': message.receipt_handle}
                for index, message in enumerate(batch)]
            response = self._sqs_api.delete_messages(Entries=entries)
            # A batch delete can partially fail without raising; surface the
            # entries SQS reports as failed instead of counting them
            failures = (response or {}).get('Failed', [])
            for failure in failures:
                log.error('Could not delete message (batch entry %s): %s',
                          failure.get('Id'), failure.get('Message'))
            deleted_messages_count += len(entries) - len(failures)
        log.info('Deleted %s messages from the SQS queue',
                 deleted_messages_count)


def lambda_handler(event, context):
    """
    Lambda execution handler
    https://docs.aws.amazon.com/lambda/latest/dg/
    python-programming-model-handler-types.html
    """
    # Create our class objects
    dns_change_queue = DnsChangeQueue(SQS_QUEUE_NAME)
    # Update DNS records for each message in the queue
    dns_change_queue.commit_changes_to_r53()
    # Clean up the messages we just processed...
admin.py
Source:admin.py
...30 # Override this field to change the 'target' object used for retrieving and31 # displaying history. It should be a double__underscore__delimited32 # property related to the primary model admin object.33 papertrail_field = None34 def _record_changes(self, obj, fields=None):35 '''36 Records the state of `obj` to a JSON-serializable object, optionally37 recording only values in a list of `fields`. If `fields` is not38 specified, all fields will be recorded.39 '''40 rec = json.loads(serializers.serialize('json', [obj]))[0]41 if fields:42 rec['fields'] = {k: v for k, v in rec['fields'].items()43 if k in fields}44 return rec45 def _resolve_pt_object(self, object):46 instance = object47 if self.papertrail_field:48 for field_name in self.papertrail_field.split('__'):49 instance = getattr(instance, field_name, None)50 if not instance:51 return object52 return instance53 def log_addition(self, request, object, message=None):54 if message is not None:55 super(AdminEventLoggerMixin, self).log_addition(request, object, message)56 try:57 message = json.loads(message)58 except ValueError:59 pass60 else:61 super(AdminEventLoggerMixin, self).log_addition(request, object)62 fields = self._record_changes(object)['fields']63 return papertrail.log(64 'admin-edit',65 'Created object',66 data={67 'action': 'add',68 'fields': fields,69 'message': message,70 },71 targets={72 'acting_user': request.user,73 'instance': self._resolve_pt_object(object),74 },75 )76 def log_change(self, request, object, message):77 super(AdminEventLoggerMixin, self).log_change(request, object, message)78 # construct_change_message() creates a JSON message that we load and79 # store here. 
(In case we don't get JSON back for some reason, still80 # store the message)81 try:82 data = {'changes': json.loads(message), 'action': 'change'}83 except ValueError:84 data = {'message': message, 'action': 'change'}85 return papertrail.log(86 'admin-edit',87 'Updated object',88 data=data,89 targets={90 'acting_user': request.user,91 'instance': self._resolve_pt_object(object),92 },93 )94 def log_deletion(self, request, object, object_repr):95 super(AdminEventLoggerMixin, self).log_deletion(request, object, object_repr)96 fields = self._record_changes(object)['fields']97 return papertrail.log(98 'admin-edit',99 'Deleted object',100 data={101 'action': 'delete',102 'fields': fields,103 },104 targets={105 'acting_user': request.user,106 'instance': self._resolve_pt_object(object),107 },108 )109 def get_urls(self):110 info = self.model._meta.app_label, self.model._meta.model_name111 urlpatterns = [112 url(r'^(.+)/papertrail/', self.admin_site.admin_view(self.view_papertrail_item), name=u'{0}_{1}_papertrail'.format(*info)),113 ] + super(AdminEventLoggerMixin, self).get_urls()114 return urlpatterns115 def get_actions(self, request):116 actions = super(AdminEventLoggerMixin, self).get_actions(request)117 func, name, desc = self.get_action('view_papertrail')118 actions[name] = (func, name, desc)119 return actions120 def view_papertrail_item(self, request, object_id, extra_context=None):121 get_object_or_404(self.model, id=object_id)122 return self.view_papertrail(request, self.model.objects.filter(id=object_id))123 def view_papertrail(self, request, queryset, extra_context=None):124 '''125 Action for displaying the papertrail for one or more selected admin126 items.127 '''128 # Map to alternate queryset if specified129 if self.papertrail_field:130 queryset = self._map_to_related_queryset(queryset, self.papertrail_field)131 action_list = Entry.objects.related_to(queryset).select_related()132 opts = queryset.model._meta133 app_label = opts.app_label134 
available_type_filters = collections.OrderedDict(self.papertrail_type_filters)135 if available_type_filters:136 available_type_filters['Other'] = ('__other', )137 selected_type_filters = request.POST.getlist('papertrail-selected-type-filters')138 selected_types = list(itertools.chain.from_iterable(available_type_filters.get(k, []) for k in selected_type_filters))139 if '__other' in selected_types:140 all_types = list(itertools.chain.from_iterable(available_type_filters.values()))141 action_list = action_list.filter(Q(type__in=selected_types) | ~Q(type__in=all_types))142 elif selected_types:143 action_list = action_list.filter(type__in=selected_types)144 if queryset.count() == 1:145 obj = queryset[0]146 title = _('Paper Trail: %s') % force_text(obj)147 else:148 obj = None149 title = _('Paper Trail: %s %s') % (queryset.count(), opts.verbose_name_plural)150 context = {151 'title': title,152 'action_list': action_list,153 'module_name': capfirst(force_text(opts.verbose_name_plural)),154 'app_label': app_label,155 'opts': opts,156 'object': obj,157 'available_type_filters': available_type_filters,158 'selected_type_filters': selected_type_filters,159 'selected_actions': request.POST.getlist('_selected_action'),160 }161 context.update(extra_context or {})162 return render(request, self.papertrail_object_template or [163 "admin/%s/%s/object_papertrail.html" % (app_label, opts.object_name.lower()),164 "admin/%s/object_papertrail.html" % app_label,165 "admin/object_papertrail.html"166 ], context)167 view_papertrail.short_description = 'View Paper Trail'168 def _map_to_related_queryset(self, queryset, field):169 '''170 Given a queryset and an double__underscore__delimited field relation,171 return a list of the objects that are the target of that relation for172 all objects in the queryset.173 '''174 model = queryset.model175 # The easy part is finding the pks of the related objects176 pks = queryset.values_list('{}__pk'.format(field), flat=True)177 # The hard part is 
traversing the relation string and finding out what178 # model we're actually looking for.179 segments = field.split('__')180 for segment in segments:181 field = getattr(model, segment).field182 try:183 model = field.related.parent_model184 except AttributeError:185 try:186 model = field.rel.model187 except AttributeError:188 model = field.related_model189 # Once we have both pieces, we can just query the model for the ids190 return model.objects.filter(pk__in=pks)191 def construct_change_message(self, request, form, formsets, add=False):192 '''193 Construct a detailed change message from a changed object, including194 related objects updated via subforms. Returns a JSON string containing195 a structure detailing the fields changed and their updated values.196 '''197 def add_related_change(changes, obj, action='change', fields=None):198 rec = self._record_changes(obj, fields=fields)199 rec['action'] = action200 changes['related_changes'].append(rec)201 return rec202 changes = {203 'action': 'change',204 'fields': self._record_changes(form.instance, form.changed_data)['fields'],205 'related_changes': [],206 }207 for formset in (formsets or []):208 for obj in formset.new_objects:209 add_related_change(changes, obj, action='add')210 for obj, changed_fields in formset.changed_objects:211 add_related_change(changes, obj, action='change', fields=changed_fields)212 for obj in formset.deleted_objects:213 add_related_change(changes, obj, action='add')214 return json.dumps(changes)215class EntryRelatedObjectInline(admin.StackedInline):216 model = EntryRelatedObject217 extra = 0218 fields = ('relation_name', 'related_content_type', 'related_model', )...
record.py
Source:record.py
import logging
from datetime import datetime

from .db_dummy import AttributeValueNotSet
from .database import EntityDatabase


class Record:
    """
    Internal cache for database record and its updates to prevent loading whole record from database.
    Only record's attributes, which are really needed, will be loaded.
    """
    # TODO remove EntityDatabase
    def __init__(self, db: EntityDatabase, table_name, key):
        """
        :param db: database connection used to load and store attributes
        :param table_name: name of the table the record belongs to
        :param key: key of the record within the table
        """
        self._db_connection = db
        self._log = logging.getLogger("Record")
        self.table_name = table_name
        self.key = key
        # record structure
        self._record = {}
        # structure, which will hold all attributes, which were updated, all their updated values
        self._record_changes = {}
        self.exists_in_db = self.exists()

    def _load_from_db(self, attrib_name):
        """
        Loads attribute from database into cache.
        WARNING: If the attribute has no value set in database yet, the cache remains the same
        :param attrib_name: attribute's name
        :return: None
        """
        try:
            self._record[attrib_name] = self._db_connection.get_attrib(self.table_name, self.key, attrib_name)
        except AttributeValueNotSet:  # TODO: the EntityDatabase.get_attrib doesn't raise exception like this
            pass

    def __getitem__(self, attrib_name):
        """
        Overrides functionality, when "Record[key]" is called. Gets value from record.
        :param attrib_name: key to value, which wants to be obtained from record
        :return: value, which is saved under the key in record
        :raise KeyError when the attrib_name is neither cached nor loaded from the database
        """
        value = self._record.get(attrib_name)
        # if attribute is not loaded into cache yet, load it from database
        if value is None:
            self._load_from_db(attrib_name)
        return self._record[attrib_name]

    def __setitem__(self, attrib_name, value):
        """
        Overrides functionality, when "Record[key] = value" is called. Sets value in record.
        :param attrib_name: attribute's name in record
        :param value: new value under the key
        :return: None
        """
        self._record[attrib_name] = value
        # cache the changes
        self._record_changes[attrib_name] = value

    def __contains__(self, attrib_name):
        """
        Overrides functionality, when "key in Record" is called.
        :param attrib_name: key name
        :return: True if attribute is in record and it's value is not None, False otherwise
        """
        # if attribute not in cache, try to load it from database (if does not exist in database, cache will not be
        # updated)
        if attrib_name not in self._record:
            self._load_from_db(attrib_name)
        # decide on the value itself, so an attribute cached as None gives the same answer on every call
        return self._record.get(attrib_name) is not None

    def __delitem__(self, attrib_name):
        """
        Overrides functionality, when "del Record[key]" is called. Deletes attribute from record.
        :param attrib_name: attribute's name, which should be deleted
        :return: None
        """
        self._record.pop(attrib_name, None)
        # drop the pending update as well, otherwise the next push would write the deleted attribute back
        self._record_changes.pop(attrib_name, None)
        self._db_connection.delete_attribute(self.table_name, self.key, attrib_name)

    def exists(self):
        """
        Checks, whether record exists in the database.
        :return: True if record exists in the database, False otherwise
        """
        return self._db_connection.exists(self.table_name, self.key)

    def update(self, dict_update):
        """
        Behaves like classic dict.update(); the new values are also remembered as pending changes.
        """
        self._record.update(dict_update)
        self._record_changes.update(dict_update)

    def get(self, attrib_name, default_val=None, load_from_db=True):
        """
        Behaves same as Dict's get, but can request loading record from database with 'load_from_db' flag, if not cached
        """
        value = self._record.get(attrib_name)
        if value is None and load_from_db:
            self._load_from_db(attrib_name)
        return self._record.get(attrib_name, default_val)

    def push_changes_to_db(self):
        """
        Send all updates to database.
        :return: None
        """
        if self._record_changes:
            self['ts_last_update'] = datetime.utcnow()
            if self.exists_in_db:
                self._db_connection.update_record(self.table_name, self.key, self._record_changes)
            else:
                # if new record get created, _record and _record_changes should contain same data
                self._db_connection.create_record(self.table_name, self.key, self._record)
            # reset record changes, because they were pushed to database, but leave record cache, it may be still needed
            # (but probably won't)
...
Learn to execute automation testing from scratch with LambdaTest Learning Hub: from setting up the prerequisites and running your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, and TestNG.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 automation test minutes FREE!!