Best Python code snippet using localstack_python
utils_factory.py
Source:utils_factory.py
...21 :returns: A specialization wrapper.22 :rtype: TopicSpecialization23 """24 # Create root topic.25 root = _create_topic(modules[0], None)26 # Create sub-topics.27 for module in modules[1:]:28 _create_topic(module, root)29 return root30create_specializations = get_specialization31def _create_topic(spec, parent, key=None):32 """Creates & returns a topic specialization.33 """34 if spec is None:35 return None36 # Hydrate either from a dictionary or a module.37 topic = TopicSpecialization(spec, parent)38 if isinstance(spec, dict):39 _set_topic_from_dict(topic, parent, key)40 else:41 _set_topic_from_module(topic, parent)42 # Set injected properties.43 _set_topic_injected_properties(topic)44 # Cache.45 cache_specialization(topic)46 return topic47def _set_topic_injected_properties(topic):48 """Injects a set of properties into the set of specializations.49 """50 # Model properties.51 if len(topic.path) == 3 and topic.path[1] == 'toplevel' and topic.path[2] == 'key_properties':52 if not topic.has_property('overview'):53 description = 'Top level overview of coupled model'54 _set_injected_property('overview', 'l-str', '1.1', description, topic)55 if not topic.has_property('keywords'):56 description = 'Keywords associated with coupled model'57 _set_injected_property('keywords', 'cs-str', '1.1', description, topic)58 if not topic.has_property('name'):59 description = 'Name of coupled model'60 _set_injected_property('name', 'str', '1.1', description, topic)61 # Topic key properties.62 elif len(topic.path) == 3 and topic.path[2] == 'key_properties':63 if not topic.has_property('overview'):64 description = 'Overview of {} model.'.format(topic.root.name)65 _set_injected_property('overview', 'l-str', '1.1', description, topic)66 if not topic.has_property('keywords'):67 description = 'Keywords associated with {} model code'.format(topic.root.name)68 _set_injected_property('keywords', 'cs-str', '1.1', description, topic)69 if not topic.has_property('name'):70 description = 'Name of {} model code'.format(topic.root.name)71 _set_injected_property('name', 'str', '1.1', description, topic)72 # Topic grid properties.73 elif len(topic.path) == 3 and topic.path[-1] == 'grid':74 if not topic.has_property('overview'):75 description = 'Overview of grid in {} model.'.format(topic.root.name)76 _set_injected_property('overview', 'l-str', '0.1', description, topic)77 if not topic.has_property('name'):78 description = 'Name of grid in {} model.'.format(topic.root.name)79 _set_injected_property('name', 'str', '0.1', description, topic)80 # Topic process properties.81 elif len(topic.path) == 3:82 if not topic.has_property('overview'):83 description = 'Overview of {} in {} model.'.format(topic.description.lower(), topic.root.name)84 _set_injected_property('overview', 'l-str', '0.1', description, topic)85 if not topic.has_property('name'):86 description = 'Commonly used name for the {} in {} model.'.format(topic.name_camel_case_spaced.lower(), topic.root.name)87 _set_injected_property('name', 'str', '0.1', description, topic)88 # Topic sub-process properties.89 elif len(topic.path) == 4:90 pass91 # if not topic.has_property('overview'):92 # description = 'Overview of {} in {} model.'.format(topic.description.lower(), topic.root.name)93 # _set_injected_property('overview', 'str', '0.1', description, topic)94def _set_injected_property(name, typeof, cardinality, description, topic):95 """Injects a property into the set of specializations.96 """97 try:98 enums = topic.spec.ENUMERATIONS99 except AttributeError:100 enums = topic.parent.spec.ENUMERATIONS101 _set_property(name, typeof, cardinality, description, topic, enums, False, True)102def _set_topic_from_module(topic, parent):103 """Set topic specialization attributes from a module.104 """105 try:106 topic.authors = topic.spec.AUTHORS107 except AttributeError:108 topic.authors = topic.root.authors109 try:110 topic.contact = topic.spec.CONTACT111 except AttributeError:112 topic.contact = topic.root.contact113 topic.description = topic.spec.DESCRIPTION114 if parent:115 topic.change_history = parent.change_history116 topic.contributors = parent.contributors117 topic.name = "_".join(topic.spec.__name__.split(".")[-1].split("_")[1:])118 topic.id = "{}.{}".format(parent.id, topic.name)119 else:120 topic.change_history = topic.spec.CHANGE_HISTORY121 topic.contributors = topic.spec.CONTRIBUTORS122 topic.name = topic.spec.__name__123 topic.id = "{}.{}".format(topic.spec.__file__.split("/")[-2], topic.name)124 # Assign properties / property sets.125 if hasattr(topic.spec, "DETAILS") and hasattr(topic.spec, "ENUMERATIONS"):126 for key, obj in topic.spec.DETAILS.items():127 # ... toplevel properties128 if key == "toplevel":129 _set_property_collection(topic, obj, topic.spec.ENUMERATIONS)130 # ... toplevel property sets131 elif key.startswith("toplevel"):132 _set_property_set(topic, key, obj, topic.spec.ENUMERATIONS)133 # ... sub-topic properties134 elif len(key.split(":")) == 1:135 # Create sub-sub-processes.136 _create_topic(obj, topic, key)137 _set_property_collection(topic.sub_topics[-1], obj, topic.spec.ENUMERATIONS)138 # ... sub-topic property sets139 elif len(key.split(":")) == 2:140 for st in topic.sub_topics:141 if st.name == key.split(":")[0]:142 _set_property_set(st, key, obj, topic.spec.ENUMERATIONS)143def _set_topic_from_dict(topic, parent, name):144 """Set topic specialization attributes from a dictionary.145 """146 topic.authors = parent.authors147 topic.contact = parent.contact148 topic.change_history = parent.change_history149 topic.contributors = parent.contributors150 topic.description = topic.spec['description']...
topic.py
Source:topic.py
...19 topic,20 num_partitions=num_partitions or self.num_partitions,21 replication_factor=replication_factor or self.replication_factor,22 )23 self._create_topic(new_topic)24 return self25 def _create_topic(self, topic: NewTopic):26 fs = self.admin.create_topics([topic])27 # Wait for each operation to finish.28 for item, f in fs.items():29 try:30 f.result() # The result itself is None31 print("topic {} created".format(item))32 except Exception as e:33 print("Failed to create topic {}: {}".format(item, e))34 def delete(self, topics: Union[str, List[str]]):35 if isinstance(topics, str):36 topics = [topics]...
test_admin.py
Source:test_admin.py
...9 return admin10@pytest.fixture()11def create_topic(admin):12 """Factory fixture to create new topics."""13 def _create_topic(topic_names: Union[str, List[str]]) -> None:14 if not isinstance(topic_names, list):15 topic_names = [topic_names]16 new_topics = [NewTopic(name, 1, 1) for name in topic_names]17 admin.admin.create_topics(new_topics)18 return _create_topic19def test_get_topics_by_prefix(admin, create_topic):20 create_topic(["foo-1", "foo-2", "bar-1"])21 topics = admin.get_topics_by_prefix("foo")...
Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!