Best Python code snippet using localstack_python
provider.py
Source:provider.py
...55 for target in targets:56 arn = target.get("Arn")57 event_str = target.get("Input") or "{}"58 event = json.loads(event_str)59 attr = aws_stack.get_events_target_attributes(target)60 try:61 send_event_to_target(arn, event, target_attributes=attr)62 except Exception as e:63 LOG.info(64 f"Unable to send event notification {truncate(event)} to target {target}: {e}"65 )66 return func67 @staticmethod68 def convert_schedule_to_cron(schedule):69 """Convert Events schedule like "cron(0 20 * * ? *)" or "rate(5 minutes)" """70 cron_regex = r"\s*cron\s*\(([^\)]*)\)\s*"71 if re.match(cron_regex, schedule):72 cron = re.sub(cron_regex, r"\1", schedule)73 return cron74 rate_regex = r"\s*rate\s*\(([^\)]*)\)\s*"75 if re.match(rate_regex, schedule):76 rate = re.sub(rate_regex, r"\1", schedule)77 value, unit = re.split(r"\s+", rate.strip())78 if "minute" in unit:79 return "*/%s * * * *" % value80 if "hour" in unit:81 return "* */%s * * *" % value82 if "day" in unit:83 return "* * */%s * *" % value84 raise Exception("Unable to parse events schedule expression: %s" % schedule)85 return schedule86 @staticmethod87 def put_rule_job_scheduler(88 name: Optional[RuleName],89 state: Optional[RuleState],90 schedule_expression: Optional[ScheduleExpression],91 ):92 enabled = state != "DISABLED"93 if schedule_expression:94 job_func = EventsProvider.get_scheduled_rule_func(name)95 cron = EventsProvider.convert_schedule_to_cron(schedule_expression)96 LOG.debug("Adding new scheduled Events rule with cron schedule %s", cron)97 job_id = JobScheduler.instance().add_job(job_func, cron, enabled)98 rule_scheduled_jobs = EventsBackend.get().rule_scheduled_jobs99 rule_scheduled_jobs[name] = job_id100 def put_rule(101 self,102 context: RequestContext,103 name: RuleName,104 schedule_expression: ScheduleExpression = None,105 event_pattern: EventPattern = None,106 state: RuleState = None,107 description: RuleDescription = None,108 role_arn: RoleArn = None,109 tags: TagList = None,110 event_bus_name: 
EventBusNameOrArn = None,111 ) -> PutRuleResponse:112 self.put_rule_job_scheduler(name, state, schedule_expression)113 return call_moto(context)114 def delete_rule(115 self,116 context: RequestContext,117 name: RuleName,118 event_bus_name: EventBusNameOrArn = None,119 force: Boolean = None,120 ) -> None:121 rule_scheduled_jobs = EventsBackend.get().rule_scheduled_jobs122 job_id = rule_scheduled_jobs.get(name)123 if job_id:124 LOG.debug("Removing scheduled Events: {} | job_id: {}".format(name, job_id))125 JobScheduler.instance().cancel_job(job_id=job_id)126 call_moto(context)127 def disable_rule(128 self, context: RequestContext, name: RuleName, event_bus_name: EventBusNameOrArn = None129 ) -> None:130 rule_scheduled_jobs = EventsBackend.get().rule_scheduled_jobs131 job_id = rule_scheduled_jobs.get(name)132 if job_id:133 LOG.debug("Disabling Rule: {} | job_id: {}".format(name, job_id))134 JobScheduler.instance().disable_job(job_id=job_id)135 call_moto(context)136class EventsBackend(RegionBackend):137 # maps rule name to job_id138 rule_scheduled_jobs: Dict[str, str]139 def __init__(self):140 self.rule_scheduled_jobs = {}141def _get_events_tmp_dir():142 return os.path.join(config.dirs.tmp, EVENTS_TMP_DIR)143def _create_and_register_temp_dir():144 tmp_dir = _get_events_tmp_dir()145 if not os.path.exists(tmp_dir):146 mkdir(tmp_dir)147 TMP_FILES.append(tmp_dir)148 return tmp_dir149def _dump_events_to_files(events_with_added_uuid):150 try:151 _create_and_register_temp_dir()152 current_time_millis = int(round(time.time() * 1000))153 for event in events_with_added_uuid:154 target = os.path.join(155 _get_events_tmp_dir(),156 "%s_%s" % (current_time_millis, event["uuid"]),157 )158 save_file(target, json.dumps(event["event"]))159 except Exception as e:160 LOG.info("Unable to dump events to tmp dir %s: %s", _get_events_tmp_dir(), e)161def handle_numeric_conditions(conditions: List[Any], value: float):162 for i in range(0, len(conditions), 2):163 if conditions[i] == "<" and not 
(value < conditions[i + 1]):164 return False165 if conditions[i] == ">" and not (value > conditions[i + 1]):166 return False167 if conditions[i] == "<=" and not (value <= conditions[i + 1]):168 return False169 if conditions[i] == ">=" and not (value >= conditions[i + 1]):170 return False171 return True172def check_valid_numeric_content_base_rule(list_of_operators):173 if len(list_of_operators) > 4:174 return False175 if "=" in list_of_operators:176 return False177 if len(list_of_operators) > 2:178 upper_limit = None179 lower_limit = None180 for index in range(len(list_of_operators)):181 if not isinstance(list_of_operators[index], int) and "<" in list_of_operators[index]:182 upper_limit = list_of_operators[index + 1]183 if not isinstance(list_of_operators[index], int) and ">" in list_of_operators[index]:184 lower_limit = list_of_operators[index + 1]185 if upper_limit and lower_limit and upper_limit < lower_limit:186 return False187 index = index + 1188 return True189def filter_event_with_content_base_parameter(pattern_value, event_value):190 for element in pattern_value:191 if (isinstance(element, (str, int))) and (event_value == element or element in event_value):192 return True193 elif isinstance(element, dict):194 element_key = list(element.keys())[0]195 element_value = element.get(element_key)196 if element_key.lower() == "prefix":197 if isinstance(event_value, str) and event_value.startswith(element_value):198 return True199 elif element_key.lower() == "exists":200 if element_value and event_value:201 return True202 elif not element_value and not event_value:203 return True204 elif element_key.lower() == "cidr":205 ips = [str(ip) for ip in ipaddress.IPv4Network(element_value)]206 if event_value in ips:207 return True208 elif element_key.lower() == "numeric":209 if check_valid_numeric_content_base_rule(element_value):210 for index in range(len(element_value)):211 if isinstance(element_value[index], int):212 continue213 if (214 element_value[index] == ">"215 and 
isinstance(element_value[index + 1], int)216 and event_value <= element_value[index + 1]217 ):218 break219 elif (220 element_value[index] == ">="221 and isinstance(element_value[index + 1], int)222 and event_value < element_value[index + 1]223 ):224 break225 elif (226 element_value[index] == "<"227 and isinstance(element_value[index + 1], int)228 and event_value >= element_value[index + 1]229 ):230 break231 elif (232 element_value[index] == "<="233 and isinstance(element_value[index + 1], int)234 and event_value > element_value[index + 1]235 ):236 break237 else:238 return True239 elif element_key.lower() == "anything-but":240 if isinstance(element_value, list) and event_value not in element_value:241 return True242 elif (isinstance(element_value, (str, int))) and event_value != element_value:243 return True244 elif isinstance(element_value, dict):245 nested_key = list(element_value)[0]246 if nested_key == "prefix" and not re.match(247 r"^{}".format(element_value.get(nested_key)), event_value248 ):249 return True250 return False251# TODO: unclear shared responsibility for filtering with filter_event_with_content_base_parameter252def handle_prefix_filtering(event_pattern, value):253 for element in event_pattern:254 if isinstance(element, (int, str)):255 if str(element) == str(value):256 return True257 elif isinstance(element, dict) and "prefix" in element:258 if value.startswith(element.get("prefix")):259 return True260 elif isinstance(element, dict) and "anything-but" in element:261 if element.get("anything-but") != value:262 return True263 elif "numeric" in element:264 return handle_numeric_conditions(element.get("numeric"), value)265 elif isinstance(element, list):266 if value in list:267 return True268 return False269def identify_content_base_parameter_in_pattern(parameters):270 if any(271 list(param.keys())[0] in CONTENT_BASE_FILTER_KEYWORDS272 for param in parameters273 if isinstance(param, dict)274 ):275 return True276def get_two_lists_intersection(lst1, 
lst2):277 lst3 = [value for value in lst1 if value in lst2]278 return lst3279# TODO: refactor/simplify280def filter_event_based_on_event_format(self, rule_name: str, event: Dict[str, Any]):281 def filter_event(event_pattern_filter: Dict[str, Any], event: Dict[str, Any]):282 for key, value in event_pattern_filter.items():283 event_value = event.get(key.lower())284 if event_value is None:285 return False286 if event_value and isinstance(event_value, dict):287 for key_a, value_a in event_value.items():288 if key_a == "ip":289 # TODO add IP-Address check here290 continue291 if isinstance(value.get(key_a), (int, str)):292 if value_a != value.get(key_a):293 return False294 if isinstance(value.get(key_a), list) and value_a not in value.get(key_a):295 if not handle_prefix_filtering(value.get(key_a), value_a):296 return False297 elif isinstance(value, list) and not identify_content_base_parameter_in_pattern(value):298 if (299 isinstance(event_value, list)300 and get_two_lists_intersection(value, event_value) == []301 ):302 return False303 elif (304 not isinstance(event_value, list)305 and isinstance(event_value, (str, int))306 and event_value not in value307 ):308 return False309 elif isinstance(value, list) and identify_content_base_parameter_in_pattern(value):310 if not filter_event_with_content_base_parameter(value, event_value):311 return False312 elif isinstance(value, (str, dict)):313 try:314 value = json.loads(value) if isinstance(value, str) else value315 if isinstance(value, dict) and not filter_event(value, event_value):316 return False317 except json.decoder.JSONDecodeError:318 return False319 return True320 rule_information = self.events_backend.describe_rule(rule_name)321 if not rule_information:322 LOG.info('Unable to find rule "%s" in backend: %s', rule_name, rule_information)323 return False324 if rule_information.event_pattern._pattern:325 event_pattern = rule_information.event_pattern._pattern326 if not filter_event(event_pattern, event):327 return 
False328 return True329def filter_event_with_target_input_path(target: Dict, event: Dict) -> Dict:330 input_path = target.get("InputPath")331 if input_path:332 event = extract_jsonpath(event, input_path)333 return event334def process_events(event: Dict, targets: List[Dict]):335 for target in targets:336 arn = target["Arn"]337 changed_event = filter_event_with_target_input_path(target, event)338 try:339 send_event_to_target(arn, changed_event, aws_stack.get_events_target_attributes(target))340 except Exception as e:341 LOG.info(f"Unable to send event notification {truncate(event)} to target {target}: {e}")342# TODO: this migration. can I tidy up these dictionaries in this migration??343@patch(MotoEventsHandler.put_events)344def events_handler_put_events(_, self):345 entries = self._get_param("Entries")346 # keep track of events for local integration testing347 if config.is_local_test_mode():348 TEST_EVENTS_CACHE.extend(entries or [])349 events = list(map(lambda event: {"event": event, "uuid": long_uid()}, entries))350 _dump_events_to_files(events)351 event_rules = self.events_backend.rules352 for event_envelope in events:353 event = event_envelope["event"]...
Learn to execute automation testing from scratch with LambdaTest Learning Hub: from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g., Selenium, Cypress, and TestNG.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 automation test minutes FREE!!