Best Python code snippet using localstack_python
tag_model.py
Source:tag_model.py
...130 def get_ops(self):131 return [t for t in self._tags.values() if isinstance(t, Op)]132 def get_invisible_tags(self):133 return [t for t in self._tags.values() if isinstance(t, InvisibleTag)]134 def get_tag_keys(self):135 return self._tags.keys()136 def get_members(self):137 return self.get_attrs() + self.get_ops()138 def is_empty(self):139 return not self.get_members()140 def is_global_scope(self):141 return self.name == ""142 def touches(self, range):143 if self.range and LineRange.touching(range, self.range):144 return True145 for tag in self.get_members():146 if LineRange.touching(range, tag.range):147 return True148 return False149class RelType(IntEnum):150 DERIV = 1151 REAL = 2152 ASSOC = 3153class Rel:154 def __init__(self, other_qn, type, diffsym=""):155 self.other_qn = other_qn156 self.type = type157 self.diffsym = diffsym158@dataclass159class Tag():160 fname: str161 kind: str162 roles: str163 name: str164 typeref: str165 scope: str166 visibility: str167 diffsym: str168 range169 def __init__(self):170 pass171@dataclass172class Attr(Tag):173 def __init__(self):174 super().__init__()175@dataclass176class Op(Tag):177 def __init__(self):178 super().__init__()179@dataclass180class InvisibleTag(Tag):181 def __init__(self):182 super().__init__()183class ModelFactory:184 def fromfile(self, tagpath):185 model = Model()186 if os.path.isdir(tagpath):187 tagpath = os.path.join(tagpath, "tags")188 if not os.path.isfile(tagpath):189 print("error:file not found: " + tagpath, file=sys.stderr)190 exit(1)191 with open(tagpath) as tagfile:192 for line in tagfile:193 if line.startswith("!_"):194 continue195 cols = line.rstrip("\n").split("\t")196 fname = cols[0]197 roles = cols[1]198 kind = cols[2]199 access = cols[6]200 implementation = cols[7]201 language = TagLanguage(cols[9])202 name = language.name(cols[3])203 scope = language.scope(cols[5])204 if language.is_blacklisted():205 continue206 if fname not in model:207 model[fname] = File(fname)208 if language.is_scope(kind, roles, access, name):209 stereotype = language.get_scope_stereotype(kind,210 implementation)211 t = Scope(name, fname, stereotype=stereotype)212 inherits = cols[8]213 if inherits != "" and inherits != "-":214 for i in language.split_inherits(inherits):215 t.add_inherit(i)216 if not Config.show_nested_classes \217 and language.is_nested(scope) \218 and not language.only_works_with_nesting():219 continue220 elif language.is_attr(kind, roles, access, name):221 t = Attr()222 elif language.is_op(kind, roles, access, name):223 t = Op()224 elif language.is_inv_tag(kind, roles, access, name):225 t = InvisibleTag()226 else:227 continue228 t.fname = fname229 t.roles = roles230 t.kind = kind231 t.name = name232 t.typeref = language.typeref(cols[4])233 t.scope = scope234 t.visibility = language.visibility(access)235 t.language = language236 t.is_static = language.is_static_member(kind, implementation)237 start_line = int(cols[10]) if cols[10].isdigit() else None238 end_line = int(cols[11]) if cols[11].isdigit() else None239 t.range = LineRange(start_line, end_line)240 if isinstance(t, Scope):241 model[fname].add_scope(t)242 else:243 model[fname].add_tag(t)244 self.complete_relationships_inheritance(model)245 self.complete_relationships_assoc(model)246 return model247 def complete_relationships_inheritance(self, model):248 for fname in model:249 for scope in model[fname].get_scopes():250 for inherit in scope.tagval_inherits:251 other_scope = model.find_scope(inherit, fname)252 if not other_scope:253 # Ignore scopes we don't know254 continue255 if other_scope.is_interface():256 reltype = RelType.REAL257 else:258 reltype = RelType.DERIV259 scope.add_rel(other_scope.qualified_name, reltype)260 def complete_relationships_assoc(self, model):261 for fname in model:262 file = model[fname]263 for scope in file.get_scopes():264 for key in scope.get_tags_dict():265 tag = scope.get_tag(key)266 if tag.roles == "def":267 continue268 found_file = model.find_file(tag.name, tag.language)269 if found_file:270 other_scope = found_file.get_global_scope()271 else:272 # Also look for references to scopes273 local_name = tag.language.local_name(tag.name)274 other_scope = model.find_scope(local_name, fname)275 if other_scope:276 other_file = model.get_file_by_scope(277 other_scope.qualified_name)278 start_scope = self.pick_rel_scope(file, scope,279 tag.range)280 other_scope = self.pick_rel_scope(other_file,281 other_scope)282 if other_scope == start_scope:283 # Ignore self-reference284 continue285 if (start_scope.is_global_scope() and286 file.find_scope(other_scope.qualified_name)):287 # Ignore reference from a file to its own scopes288 continue289 start_scope.add_rel(other_scope.qualified_name,290 RelType.ASSOC)291 def pick_rel_scope(self, file, scope, line_range=None):292 if (scope.is_global_scope() and scope.is_empty()293 and file.get_scope_count() == 2):294 return file.get_real_scopes()[0]295 if (Config.find_rel_scope_using_line_numbers296 and scope.is_global_scope()297 and line_range):298 for s in file.get_scopes():299 if s.touches(line_range):300 return s301 return scope302 def diff(self, model_a, model_b):303 model = Model()304 fnames = self._merge_in_order(list(model_a.keys()),305 list(model_b.keys()))306 for fname in fnames:307 model[fname] = File(fname)308 file_a = model_a[fname] if fname in model_a else File()309 file_b = model_b[fname] if fname in model_b else File()310 snames = self._merge_in_order(list(file_a.get_scope_keys()),311 list(file_b.get_scope_keys()))312 for sname in snames:313 scope_a = (file_a.get_scope(sname) if file_a.has(sname) else314 Scope())315 scope_b = (file_b.get_scope(sname) if file_b.has(sname) else316 Scope())317 tnames = self._merge_in_order(list(scope_a.get_tag_keys()),318 list(scope_b.get_tag_keys()))319 sdiffsym = (" " if file_a.has(sname) and file_b.has(sname) else320 "-" if file_a.has(sname) else321 "+")322 stereotype = scope_b.stereotype or scope_a.stereotype323 scope = Scope(sname, fname, sdiffsym, stereotype)324 other_qns = self._merge_in_order(325 list(scope_a.get_rel_other_qns()),326 list(scope_b.get_rel_other_qns()))327 for other_qn in other_qns:328 rel_a = scope_a.get_rel(other_qn)329 rel_b = scope_b.get_rel(other_qn)330 rdiffsym = (" " if (rel_a and rel_b) else331 "-" if rel_a else332 "+")333 reltype = rel_b and rel_b.type or rel_a.type334 scope.add_rel(other_qn, reltype, rdiffsym)335 model[fname].add_scope(scope)336 for tname in tnames:337 if (tname in scope_a.get_tag_keys()338 and tname in scope_b.get_tag_keys()339 and (scope_a.get_tag(tname) ==340 scope_b.get_tag(tname))):341 scope.add_tag(copy.copy(scope_b.get_tag(tname)), " ")342 else:343 if tname in scope_a.get_tag_keys():344 scope.add_tag(copy.copy(scope_a.get_tag(tname)),345 "-")346 if tname in scope_b.get_tag_keys():347 scope.add_tag(copy.copy(scope_b.get_tag(tname)),348 "+")349 return model350 def _merge_in_order(self, a, b):351 a2 = list(a)352 b2 = list(b)353 result = list()354 while len(a2) > 0:355 v1 = a2[0]356 if v1 in b2:357 while b2[0] != v1:358 v2 = b2[0]359 if v2 in a2:360 a2.remove(v2)...
test_tags.py
Source:test_tags.py
...113 reader.add_feed('1')114 reader.add_feed('2')115 reader.update_feeds()116 with subtests.test("no tags"):117 assert list(reader.get_tag_keys(one)) == []118 assert list(reader.get_tag_keys()) == []119 assert list(reader.get_tag_keys(None)) == []120 assert list(reader.get_tag_keys(wildcard)) == []121 reader.set_tag(one, 'tag-1')122 with subtests.test("one tag"):123 assert list(reader.get_tag_keys(one)) == ['tag-1']124 assert list(reader.get_tag_keys()) == ['tag-1']125 with subtests.test("adding tag twice does not raise"):126 reader.set_tag(one, 'tag-1')127 reader.set_tag(two, 'tag-2-2')128 reader.set_tag(two, 'tag-2-1')129 with subtests.test("many tags"):130 assert list(reader.get_tag_keys(one)) == ['tag-1']131 assert list(reader.get_tag_keys(two)) == ['tag-2-1', 'tag-2-2']132 assert list(reader.get_tag_keys()) == ['tag-1', 'tag-2-1', 'tag-2-2']133 assert list(reader.get_tag_keys(None)) == ['tag-1', 'tag-2-1', 'tag-2-2']134 assert list(reader.get_tag_keys(wildcard)) == ['tag-1', 'tag-2-1', 'tag-2-2']135 reader.delete_tag(one, 'tag-2-1', missing_ok=True)136 with subtests.test("after delete"):137 assert list(reader.get_tag_keys(one)) == ['tag-1']138 assert list(reader.get_tag_keys(two)) == ['tag-2-1', 'tag-2-2']139 assert list(reader.get_tag_keys()) == ['tag-1', 'tag-2-1', 'tag-2-2']140 reader.delete_tag(two, 'tag-2-1', missing_ok=True)141 with subtests.test("after another delete"):142 assert list(reader.get_tag_keys(one)) == ['tag-1']143 assert list(reader.get_tag_keys(two)) == ['tag-2-2']144 assert list(reader.get_tag_keys()) == ['tag-1', 'tag-2-2']145 reader.set_tag(two, 'tag-2-3')146 reader.set_tag(two, 'tag-2-0')147 reader.set_tag(two, 'tag-2-1')148 reader.set_tag(one, 'tag-common')149 reader.set_tag(two, 'tag-common')150 with subtests.test("ordering and uninon"):151 assert list(reader.get_tag_keys(one)) == ['tag-1', 'tag-common']152 assert list(reader.get_tag_keys(two)) == [153 'tag-2-0',154 'tag-2-1',155 'tag-2-2',156 'tag-2-3',157 'tag-common',158 ]159 assert list(reader.get_tag_keys()) == [160 'tag-1',161 'tag-2-0',162 'tag-2-1',163 'tag-2-2',164 'tag-2-3',165 'tag-common',166 ]167 assert list(reader.get_tag_keys(wildcard)) == [168 'tag-1',169 'tag-2-0',170 'tag-2-1',171 'tag-2-2',172 'tag-2-3',173 'tag-common',174 ]175 reader.delete_feed('2')176 with subtests.test("after delete resource"):177 assert list(reader.get_tag_keys(one)) == ['tag-1', 'tag-common']178 assert list(reader.get_tag_keys(two)) == []179 assert list(reader.get_tag_keys()) == ['tag-1', 'tag-common']180def test_as_tags_global(reader, subtests, chunk_size):181 """Subset of test_as_tags() that works for global tags."""182 reader._storage.chunk_size = chunk_size183 with subtests.test("no tags"):184 assert list(reader.get_tag_keys(())) == []185 assert list(reader.get_tag_keys()) == []186 assert list(reader.get_tag_keys(None)) == []187 reader.set_tag((), 'tag-1')188 with subtests.test("one tag"):189 assert list(reader.get_tag_keys(())) == ['tag-1']190 assert list(reader.get_tag_keys()) == ['tag-1']191 with subtests.test("adding tag twice does not raise"):192 reader.set_tag((), 'tag-1')193 reader.set_tag((), 'tag-2')194 with subtests.test("many tags"):195 assert list(reader.get_tag_keys(())) == ['tag-1', 'tag-2']196 assert list(reader.get_tag_keys()) == ['tag-1', 'tag-2']197 assert list(reader.get_tag_keys(None)) == ['tag-1', 'tag-2']198 reader.delete_tag((), 'tag-2', missing_ok=True)199 with subtests.test("after delete"):200 assert list(reader.get_tag_keys(())) == ['tag-1']201 assert list(reader.get_tag_keys()) == ['tag-1']202 reader.set_tag((), 'tag-3')203 reader.set_tag((), 'tag-0')204 with subtests.test("ordering"):205 assert list(reader.get_tag_keys(())) == ['tag-0', 'tag-1', 'tag-3']206 assert list(reader.get_tag_keys()) == ['tag-0', 'tag-1', 'tag-3']207def test_wildcard_interaction(reader, chunk_size):208 reader._storage.chunk_size = chunk_size209 reader._parser = parser = Parser()210 parser.feed(1)211 parser.entry(1, 1)212 parser.feed(2)213 parser.entry(2, 1)214 reader.add_feed('1')215 reader.add_feed('2')216 reader.update_feeds()217 reader.set_tag(('2', '2, 1'), '6-entry')218 reader.set_tag('1', '4-feed')219 reader.set_tag(('1', '1, 1'), '2-entry')220 reader.set_tag((), '5-global')221 reader.set_tag((), '3-global')222 reader.set_tag('2', '1-feed')223 assert list(reader.get_tag_keys((None,))) == ['1-feed', '4-feed']224 assert list(reader.get_tag_keys((None, None))) == ['2-entry', '6-entry']225 assert list(reader.get_tag_keys(())) == ['3-global', '5-global']226 assert list(reader.get_tag_keys()) == [227 '1-feed',228 '2-entry',229 '3-global',230 '4-feed',231 '5-global',232 '6-entry',233 ]234 assert list(reader.get_tag_keys(None)) == [235 '1-feed',236 '2-entry',237 '3-global',238 '4-feed',239 '5-global',240 '6-entry',241 ]242@parametrize_dict(243 'resource',244 {245 'feed': '1',246 'entry': ('1', '1, 1'),247 'global': (),248 },249)250@pytest.mark.parametrize('value', [0, 1, 'value', {}, False, None, {'complex': [1]}])251def test_set_no_value(reader, resource, value):252 reader._parser = parser = Parser()253 feed = parser.feed(1)254 entry = parser.entry(1, 1)255 reader.add_feed(feed)256 reader.update_feeds()257 reader.set_tag(resource, 'one', value)258 reader.set_tag(resource, 'two')259 reader.set_tag(resource, 'one')260 assert dict(reader.get_tags(resource)) == {'one': value, 'two': None}261 assert set(reader.get_tag_keys(resource)) == {'one', 'two'}262@parametrize_dict(263 'make_resource_arg',264 {265 'global': lambda *_: (),266 'feed': lambda f, _: f,267 'feed_id': lambda f, _: f.resource_id,268 'feed_tuple': lambda f, _: (f.url,),269 'entry': lambda _, e: e,270 'entry_id': lambda _, e: e.resource_id,271 },272)273def test_resource_argument(reader, make_resource_arg):274 reader._parser = parser = Parser()275 feed = parser.feed(1)276 entry = parser.entry(1, 1)277 reader.add_feed(feed)278 reader.update_feeds()279 resource = make_resource_arg(feed, entry)280 reader.set_tag(resource, 'one', 'value')281 assert list(reader.get_tags(resource)) == [('one', 'value')]282 assert list(reader.get_tag_keys(resource)) == ['one']283 assert reader.get_tag(resource, 'one') == 'value'284 reader.delete_tag(resource, 'one')285@pytest.mark.parametrize(286 'resource',287 # a small subset of the _resource_argument() bad arguments288 [1, ('a', 2), Entry('entry', feed=Feed(None))],289)290def test_resource_argument_valueerror(reader, resource):291 with pytest.raises(ValueError):292 reader.set_tag(resource, 'one', 'value')293 with pytest.raises(ValueError):294 list(reader.get_tags(resource))295 with pytest.raises(ValueError):296 list(reader.get_tag_keys(resource))297 with pytest.raises(ValueError):298 reader.get_tag(resource, 'one')299 with pytest.raises(ValueError):300 reader.delete_tag(resource, 'one')301@pytest.mark.parametrize(302 'resource',303 [304 None,305 (None,),306 (None, None),307 ('feed', None),308 (None, 'entry'),309 ],310)311def test_get_tags_wildcard_valueerror(reader, resource):312 with pytest.raises(ValueError):313 list(reader.get_tags(resource))314@pytest.mark.parametrize('resource', [('feed', None), (None, 'entry')])315def test_get_tag_keys_wildcard_valueerror(reader, resource):316 with pytest.raises(ValueError):...
identify_clusters.py
Source:identify_clusters.py
...13print("Default region selected: %s" % args.Region[0])14os.environ["AWS_DEFAULT_REGION"] = args.Region[0]15client = boto3.client('resourcegroupstaggingapi')16ec2 = boto3.client('ec2')17def get_tag_keys():18 tag_keys = []19 response = client.get_tag_keys()20 tag_keys.extend(response['TagKeys'])21 while 'PaginationToken' in response and response['PaginationToken'] != "":22 current_token = response['PaginationToken']23 response = client.get_tag_keys(PaginationToken=current_token)24 tag_keys.extend(response['TagKeys'])25 return tag_keys26def get_only_clusters(tags):27 return [tag for tag in tags if "kubernetes.io/cluster" in tag]28def get_resources_for_cluster(cluster_tag):29 resources = []30 response = client.get_resources(TagFilters=[cluster_tag])31 resources.extend([resource['ResourceARN']32 for resource in response['ResourceTagMappingList']])33 while 'PaginationToken' in response and response['PaginationToken'] != "":34 current_token = response['PaginationToken']35 response = client.get_resources(36 PaginationToken=current_token, TagFilters=[cluster_tag])37 resources.extend([resource['ResourceARN']38 for resource in response['ResourceTagMappingList']])39 return resources40def filter_list(key, full_list):41 return [resource.split(f"arn:aws:ec2:{args.Region[0]}:{args.acc_id[0]}:{key}")[-1] for resource in full_list if key in resource]42def remove_whitelisted_tags(tags, whitelist):43 return [tag for tag in tags if all(whitelist_tag not in tag for whitelist_tag in whitelist)]44def main():45 print("Searching for clusters, Please wait this may take a while!")46 with open("./whitelist.json") as json_file:47 whitelist = json.load(json_file)48 tag_keys = get_tag_keys()49 only_clusters = get_only_clusters(tag_keys)50 clusters_to_delete = remove_whitelisted_tags(only_clusters, whitelist['tags'])51 clusters_with_instances = []52 total_instances_counted = 053 for cluster in clusters_to_delete:54 resources = get_resources_for_cluster({"Key": cluster, "Values": ["owned"]})55 instances = filter_list('instance/', resources)56 if len(instances) > 0:57 clusters_with_instances.append({58 "cluster": cluster,59 "instance_count": len(instances)60 })61 total_instances_counted += len(instances)62 print("Total instances count=",total_instances_counted)...
Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!