Best Python code snippet using localstack_python
kinesis_listener.py
Source:kinesis_listener.py
...35 consumer = json_safe(consumer)36 STREAM_CONSUMERS.append(consumer)37 return {'Consumer': consumer}38 elif action == 'DeregisterStreamConsumer':39 def consumer_matches(c):40 stream_arn = data.get('StreamARN')41 cons_name = data.get('ConsumerName')42 cons_arn = data.get('ConsumerARN')43 return (c.get('ConsumerARN') == cons_arn or44 (c.get('StreamARN') == stream_arn and c.get('ConsumerName') == cons_name))45 STREAM_CONSUMERS = [c for c in STREAM_CONSUMERS if not consumer_matches(c)]46 return {}47 elif action == 'ListStreamConsumers':48 result = {49 'Consumers': [c for c in STREAM_CONSUMERS if c.get('StreamARN') == data.get('StreamARN')]50 }51 return result52 elif action == 'DescribeStreamConsumer':53 consumer_arn = data.get('ConsumerARN') or data['ConsumerName']54 consumer_name = data.get('ConsumerName') or data['ConsumerARN']55 creation_timestamp = data.get('ConsumerCreationTimestamp')56 result = {57 'ConsumerDescription': {58 'ConsumerARN': consumer_arn,59 'ConsumerCreationTimestamp': creation_timestamp,...
loaders.py
Source:loaders.py
1import os, re2from collections import OrderedDict, Counter, defaultdict3from mmap import mmap4import logging5logger = logging.getLogger( __name__ )6class Archive( object ):7 def __init__( self ):8 pass9 def close( self ):10 pass11 def get_meta( self ):12 pass13 def list_files( self, path=None, recurse=True ):14 pass15 def list_paths( self, path=None, recurse=True ):16 pass17 def get_file_meta( self, path ):18 pass19 def get_file( self, path ):20 pass21 def get_path_meta( self, path ):22 pass23class FileSystem( Archive ):24 def __init__( self, base_path ):25 self.base_path = os.path.abspath( base_path )26 def _to_internal( self, path ):27 assert path.startswith( self.base_path )28 return "." + path[len( self.base_path ) :] + os.path.sep29 def _from_internal( self, path ):30 assert path.startswith( "." + os.path.sep )31 return self.base_path + path[1:]32 def list_paths( self, path=None, recurse=True ):33 results = []34 base = self.base_path if path is None else self._from_internal( path )35 if not recurse:36 _, sub_folders, _ = next( os.walk( base ) )37 results = [self._to_internal( root ) for root in sub_folders]38 else:39 for root, sub_folders, files in os.walk( base ):40 results.append( self._to_internal( root ) )41 return results42 def list_files( self, path=None, recurse=True ):43 results = []44 base = self.base_path if path is None else self._from_internal( path )45 if not recurse:46 _, _, files = next( os.walk( base ) )47 results = [self._to_internal( f ) for f in files]48 else:49 for root, sub_folders, files in os.walk( base ):50 for f in files:51 results.append( self._to_internal( root ) + f )52 return results53 def get_file( self, path ):54 # TODO: something nicer involving mmap?55 return open( self._from_internal( path ), "r+b" )56class Loader( object ):57 _SEP = re.escape( os.path.sep )58 def __init__(59 self,60 file_class_map,61 dependency_list=None,62 case_sensitive=False,63 unique_matches=True,64 ):65 self.file_class_map = file_class_map66 self.dependency_list = dependency_list67 self.case_sensitive = case_sensitive68 self.unique_matches = unique_matches69 self.re_flags = re.IGNORECASE if not case_sensitive else 070 self.file_re_map = {71 key: re.compile( key, flags=self.re_flags )72 for key, klass in file_class_map.items()73 if klass74 }75 self._files = OrderedDict()76 def load( self, target_path ):77 # target_path = os.path.abspath( target_path )78 self.fs = FileSystem( target_path )79 for f in self.fs.list_files():80 for key, regex in self.file_re_map.items():81 match = regex.search( f )82 if match:83 self._files[f] = {84 "klass": self.file_class_map[key],85 "re": key,86 "match": match.groups(),87 }88 if not self.case_sensitive:89 self._files[f]["match"] = tuple(90 [x.upper() for x in self._files[f]["match"]]91 )92 if self.unique_matches:93 unique_check = {94 k: v95 for k, v in Counter(96 [x["match"] for x in self._files.values()]97 ).items()98 if v > 199 }100 if unique_check:101 extras = []102 for name, file in self._files.items():103 if file["match"] in unique_check:104 extras.append( name )105 self._files = {}106 raise Exception(107 f"Multiple filename matches found for the same source: {', '.join( extras )}"108 )109 dependencies = []110 if self.dependency_list:111 for i, (consumer, dependency, format, attr) in enumerate(112 self.dependency_list113 ):114 consumer_re = re.compile( consumer, flags=self.re_flags )115 dependency_re = re.compile( dependency, flags=self.re_flags )116 consumer_matches = []117 dependency_matches = []118 if not self.case_sensitive:119 format = tuple( [x.upper() for x in format] )120 for path in self._files:121 consumer_match = consumer_re.search( path )122 dependency_match = dependency_re.search( path )123 if consumer_match and dependency_match:124 self._files = {}125 raise Exception(126 f"Problem parsing dependencies: path {path} matches for both consumer ({consumer}) and dependency ({dependency})"127 )128 elif consumer_match:129 groups = consumer_match.groups()130 if not self.case_sensitive:131 groups = tuple( [x.upper() for x in groups] )132 consumer_matches.append( (path, groups) )133 elif dependency_match:134 groups = dependency_match.groups()135 if not self.case_sensitive:136 groups = tuple( [x.upper() for x in groups] )137 dependency_matches.append( (path, groups) )138 for path, groups in consumer_matches:139 target_groups = tuple( [x.format( *groups ) for x in format] )140 if not self.case_sensitive:141 target_groups = tuple( [x.upper() for x in target_groups] )142 targets = [143 x[0] for x in dependency_matches if x[1] == target_groups144 ]145 if len( targets ) > 1:146 self._files = {}147 raise Exception(148 f"Problem parsing dependencies: path {path} has multiple matches for dependency {attr} ({', '.join( targets )})"149 )150 elif len( targets ) == 1:151 dependencies.append( (i, path, targets[0]) )152 # make dependency lookup table153 dependency_map = defaultdict( list )154 for index, source, dest in dependencies:155 dependency_map[source].append( (dest, self.dependency_list[index][3]) )156 # model the dependency tree157 head_count = defaultdict( int )158 tails = defaultdict( list )159 heads = []160 for index, tail, head in dependencies:161 head_count[tail] += 1162 if head in tails:163 tails[head].append( tail )164 else:165 tails[head] = [tail]166 heads.append( head )167 load_order = [h for h in heads if h not in head_count]168 for head in load_order:169 for tail in tails[head]:170 head_count[tail] -= 1171 if not head_count[tail]:172 load_order.append( tail )173 loop = [n for n, heads in head_count.items() if heads]174 if loop:175 self._files = {}176 raise Exception( "Problem parsing dependencies: loop detected" )177 load_order += [x for x in self._files.keys() if x not in load_order]178 # load files in based on dependency sorted list order179 logger.info( f"{self}: loading files" )180 for path in load_order:181 info = self._files[path]182 with self.fs.get_file( path ) as f:183 data = mmap( f.fileno(), 0 )184 logger.info( f'{path} => {info["klass"]}' )185 deps = {186 attr: self._files[dest]["obj"]187 for dest, attr in dependency_map[path]188 }189 info["obj"] = info["klass"]( data, preload_attrs=deps )190 data.close()191 self.post_load()192 return193 def post_load( self ):194 pass195 def save_file( self, target ):196 assert target in self._files197 export = self._files[target]["obj"].export_data()198 with open( target, "wb" ) as out:199 out.write( export )200 return201 def keys( self ):202 return self._files.keys()203 def __len__( self ):204 return len( self._files )205 def __getitem__( self, key ):206 return self._files[key]["obj"]207 def __contains__( self, key ):...
Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!