Best Python code snippet using gabbi_python
gabbits.py
Source:gabbits.py
...40 global CONF41 return deploy.loadapp(CONF)42class APIFixture(fixture.GabbiFixture):43 """Setup the required backend fixtures for a basic placement service."""44 def start_fixture(self):45 global CONF46 # Set up stderr and stdout captures by directly driving the47 # existing nova fixtures that do that. This captures the48 # output that happens outside individual tests (for49 # example database migrations).50 self.standard_logging_fixture = capture.Logging()51 self.standard_logging_fixture.setUp()52 self.output_stream_fixture = output.CaptureOutput()53 self.output_stream_fixture.setUp()54 self.logging_error_fixture = (55 logging_error.get_logging_handle_error_fixture())56 self.logging_error_fixture.setUp()57 # Filter ignorable warnings during test runs.58 self.warnings_fixture = capture.WarningsFixture()59 self.warnings_fixture.setUp()60 # Do not use global CONF61 self.conf_fixture = config_fixture.Config(cfg.ConfigOpts())62 self.conf_fixture.setUp()63 conf.register_opts(self.conf_fixture.conf)64 self.conf_fixture.config(group='api', auth_strategy='noauth2')65 self.placement_db_fixture = fixtures.Database(66 self.conf_fixture, set_config=True)67 self.placement_db_fixture.setUp()68 self.context = context.RequestContext()69 # Register CORS opts, but do not set config. This has the70 # effect of exercising the "don't use cors" path in71 # deploy.py. Without setting some config the group will not72 # be present.73 self.conf_fixture.register_opts(cors.CORS_OPTS, 'cors')74 # Set default policy opts, otherwise the deploy module can75 # NoSuchOptError.76 policy_opts.set_defaults(self.conf_fixture.conf)77 # Make sure default_config_files is an empty list, not None.78 # If None /etc/placement/placement.conf is read and confuses results.79 self.conf_fixture.conf([], default_config_files=[])80 # Turn on a policy fixture.81 self.policy_fixture = policy_fixture.PolicyFixture(82 self.conf_fixture)83 self.policy_fixture.setUp()84 os.environ['RP_UUID'] = uuidutils.generate_uuid()85 os.environ['RP_NAME'] = uuidutils.generate_uuid()86 os.environ['CUSTOM_RES_CLASS'] = 'CUSTOM_IRON_NFV'87 os.environ['PROJECT_ID'] = uuidutils.generate_uuid()88 os.environ['USER_ID'] = uuidutils.generate_uuid()89 os.environ['PROJECT_ID_ALT'] = uuidutils.generate_uuid()90 os.environ['USER_ID_ALT'] = uuidutils.generate_uuid()91 os.environ['INSTANCE_UUID'] = uuidutils.generate_uuid()92 os.environ['MIGRATION_UUID'] = uuidutils.generate_uuid()93 os.environ['CONSUMER_UUID'] = uuidutils.generate_uuid()94 os.environ['PARENT_PROVIDER_UUID'] = uuidutils.generate_uuid()95 os.environ['ALT_PARENT_PROVIDER_UUID'] = uuidutils.generate_uuid()96 CONF = self.conf_fixture.conf97 def stop_fixture(self):98 global CONF99 self.placement_db_fixture.cleanUp()100 self.warnings_fixture.cleanUp()101 self.output_stream_fixture.cleanUp()102 self.standard_logging_fixture.cleanUp()103 self.logging_error_fixture.cleanUp()104 self.policy_fixture.cleanUp()105 self.conf_fixture.cleanUp()106 CONF = None107class AllocationFixture(APIFixture):108 """An APIFixture that has some pre-made Allocations.109 +----- same user----+ alt_user110 | | |111 +----+----------+ +------+-----+ +-----+---------+112 | consumer1 | | consumer2 | | alt_consumer |113 | DISK_GB:1000 | | VCPU: 6 | | VCPU: 1 |114 | | | | | DISK_GB:20 |115 +-------------+-+ +------+-----+ +-+-------------+116 | | |117 +-+----------+---------+-+118 | rp |119 | VCPU: 10 |120 | DISK_GB:2048 |121 +------------------------+122 """123 def start_fixture(self):124 super(AllocationFixture, self).start_fixture()125 # For use creating and querying allocations/usages126 os.environ['ALT_USER_ID'] = uuidutils.generate_uuid()127 project_id = os.environ['PROJECT_ID']128 user_id = os.environ['USER_ID']129 alt_user_id = os.environ['ALT_USER_ID']130 user = user_obj.User(self.context, external_id=user_id)131 user.create()132 alt_user = user_obj.User(self.context, external_id=alt_user_id)133 alt_user.create()134 project = project_obj.Project(self.context, external_id=project_id)135 project.create()136 # Stealing from the super137 rp_name = os.environ['RP_NAME']138 rp_uuid = os.environ['RP_UUID']139 # Create the rp with VCPU and DISK_GB inventory140 rp = tb.create_provider(self.context, rp_name, uuid=rp_uuid)141 tb.add_inventory(rp, 'DISK_GB', 2048,142 step_size=10, min_unit=10, max_unit=1000)143 tb.add_inventory(rp, 'VCPU', 10, max_unit=10)144 # Create a first consumer for the DISK_GB allocations145 consumer1 = tb.ensure_consumer(self.context, user, project)146 tb.set_allocation(self.context, rp, consumer1, {'DISK_GB': 1000})147 os.environ['CONSUMER_0'] = consumer1.uuid148 # Create a second consumer for the VCPU allocations149 consumer2 = tb.ensure_consumer(self.context, user, project)150 tb.set_allocation(self.context, rp, consumer2, {'VCPU': 6})151 os.environ['CONSUMER_ID'] = consumer2.uuid152 # Create a consumer object for a different user153 alt_consumer = tb.ensure_consumer(self.context, alt_user, project)154 os.environ['ALT_CONSUMER_ID'] = alt_consumer.uuid155 # Create a couple of allocations for a different user.156 tb.set_allocation(self.context, rp, alt_consumer,157 {'DISK_GB': 20, 'VCPU': 1})158 # The ALT_RP_XXX variables are for a resource provider that has159 # not been created in the Allocation fixture160 os.environ['ALT_RP_UUID'] = uuidutils.generate_uuid()161 os.environ['ALT_RP_NAME'] = uuidutils.generate_uuid()162class SharedStorageFixture(APIFixture):163 """An APIFixture that has two compute nodes, one with local storage and one164 without, both associated by aggregate to two providers of shared storage.165 Both compute nodes have respectively two numa node resource providers, each166 of which has a pf resource provider.167 +-------------------------+ +-------------------------+168 | sharing storage (ss) | | sharing storage (ss2) |169 | DISK_GB:2000 |----+---| DISK_GB:2000 |170 | traits: MISC_SHARES... | | | traits: MISC_SHARES... |171 +-------------------------+ | +-------------------------+172 | aggregate173 +--------------------------+ | +------------------------+174 | compute node (cn1) |---+---| compute node (cn2) |175 | CPU: 24 | | CPU: 24 |176 | MEMORY_MB: 128*1024 | | MEMORY_MB: 128*1024 |177 | traits: HW_CPU_X86_SSE, | | DISK_GB: 2000 |178 | HW_CPU_X86_SSE2 | | |179 +--------------------------+ +------------------------+180 | | | |181 +---------+ +---------+ +---------+ +---------+182 | numa1_1 | | numa1_2 | | numa2_1 | | numa2_2 |183 +---------+ +---------+ +---------+ +---------+184 | | | |185 +---------------++---------------++---------------++----------------+186 | pf1_1 || pf1_2 || pf2_1 || pf2_2 |187 | SRIOV_NET_VF:8|| SRIOV_NET_VF:8|| SRIOV_NET_VF:8|| SRIOV_NET_VF:8 |188 +---------------++---------------++---------------++----------------+189 """190 def start_fixture(self):191 super(SharedStorageFixture, self).start_fixture()192 agg_uuid = uuidutils.generate_uuid()193 cn1 = tb.create_provider(self.context, 'cn1', agg_uuid)194 cn2 = tb.create_provider(self.context, 'cn2', agg_uuid)195 ss = tb.create_provider(self.context, 'ss', agg_uuid)196 ss2 = tb.create_provider(self.context, 'ss2', agg_uuid)197 numa1_1 = tb.create_provider(self.context, 'numa1_1', parent=cn1.uuid)198 numa1_2 = tb.create_provider(self.context, 'numa1_2', parent=cn1.uuid)199 numa2_1 = tb.create_provider(self.context, 'numa2_1', parent=cn2.uuid)200 numa2_2 = tb.create_provider(self.context, 'numa2_2', parent=cn2.uuid)201 pf1_1 = tb.create_provider(self.context, 'pf1_1', parent=numa1_1.uuid)202 pf1_2 = tb.create_provider(self.context, 'pf1_2', parent=numa1_2.uuid)203 pf2_1 = tb.create_provider(self.context, 'pf2_1', parent=numa2_1.uuid)204 pf2_2 = tb.create_provider(self.context, 'pf2_2', parent=numa2_2.uuid)205 os.environ['AGG_UUID'] = agg_uuid206 os.environ['CN1_UUID'] = cn1.uuid207 os.environ['CN2_UUID'] = cn2.uuid208 os.environ['SS_UUID'] = ss.uuid209 os.environ['SS2_UUID'] = ss2.uuid210 os.environ['NUMA1_1_UUID'] = numa1_1.uuid211 os.environ['NUMA1_2_UUID'] = numa1_2.uuid212 os.environ['NUMA2_1_UUID'] = numa2_1.uuid213 os.environ['NUMA2_2_UUID'] = numa2_2.uuid214 os.environ['PF1_1_UUID'] = pf1_1.uuid215 os.environ['PF1_2_UUID'] = pf1_2.uuid216 os.environ['PF2_1_UUID'] = pf2_1.uuid217 os.environ['PF2_2_UUID'] = pf2_2.uuid218 # Populate compute node inventory for VCPU and RAM219 for cn in (cn1, cn2):220 tb.add_inventory(cn, orc.VCPU, 24,221 allocation_ratio=16.0)222 tb.add_inventory(cn, orc.MEMORY_MB, 128 * 1024,223 allocation_ratio=1.5)224 tb.set_traits(cn1, 'HW_CPU_X86_SSE', 'HW_CPU_X86_SSE2')225 tb.add_inventory(cn2, orc.DISK_GB, 2000,226 reserved=100, allocation_ratio=1.0)227 for shared in (ss, ss2):228 # Populate shared storage provider with DISK_GB inventory and229 # mark it shared among any provider associated via aggregate230 tb.add_inventory(shared, orc.DISK_GB, 2000,231 reserved=100, allocation_ratio=1.0)232 tb.set_traits(shared, 'MISC_SHARES_VIA_AGGREGATE')233 # Populate PF inventory for VF234 for pf in (pf1_1, pf1_2, pf2_1, pf2_2):235 tb.add_inventory(pf, orc.SRIOV_NET_VF,236 8, allocation_ratio=1.0)237class NUMAAggregateFixture(APIFixture):238 """An APIFixture that has two compute nodes without a resource themselves.239 They are associated by aggregate to a provider of shared storage and both240 compute nodes have two numa node resource providers with CPUs. One of the241 numa node is associated to another sharing storage by a different242 aggregate.243 +-----------------------+244 | sharing storage (ss1) |245 | DISK_GB:2000 |246 | agg: [aggA] |247 +-----------+-----------+248 |249 +---------------+----------------+250 +---------------|--------------+ +--------------|--------------+251 | +-------------+------------+ | | +------------+------------+ |252 | | compute node (cn1) | | | |compute node (cn2) | |253 | | agg: [aggA] | | | | agg: [aggA, aggB] | |254 | +-----+-------------+------+ | | +----+-------------+------+ |255 | | nested | nested | | | nested | nested |256 | +-----+------+ +----+------+ | | +----+------+ +----+------+ |257 | | numa1_1 | | numa1_2 | | | | numa2_1 | | numa2_2 | |258 | | CPU: 24 | | CPU: 24 | | | | CPU: 24 | | CPU: 24 | |259 | | agg:[aggC]| | | | | | | | | |260 | +-----+------+ +-----------+ | | +-----------+ +-----------+ |261 +-------|----------------------+ +-----------------------------+262 | aggC263 +-----+-----------------+264 | sharing storage (ss2) |265 | DISK_GB:2000 |266 | agg: [aggC] |267 +-----------------------+268 """269 def start_fixture(self):270 super(NUMAAggregateFixture, self).start_fixture()271 aggA_uuid = uuidutils.generate_uuid()272 aggB_uuid = uuidutils.generate_uuid()273 aggC_uuid = uuidutils.generate_uuid()274 cn1 = tb.create_provider(self.context, 'cn1', aggA_uuid)275 cn2 = tb.create_provider(self.context, 'cn2', aggA_uuid, aggB_uuid)276 ss1 = tb.create_provider(self.context, 'ss1', aggA_uuid)277 ss2 = tb.create_provider(self.context, 'ss2', aggC_uuid)278 numa1_1 = tb.create_provider(279 self.context, 'numa1_1', aggC_uuid, parent=cn1.uuid)280 numa1_2 = tb.create_provider(self.context, 'numa1_2', parent=cn1.uuid)281 numa2_1 = tb.create_provider(self.context, 'numa2_1', parent=cn2.uuid)282 numa2_2 = tb.create_provider(self.context, 'numa2_2', parent=cn2.uuid)283 os.environ['AGGA_UUID'] = aggA_uuid284 os.environ['AGGB_UUID'] = aggB_uuid285 os.environ['AGGC_UUID'] = aggC_uuid286 os.environ['CN1_UUID'] = cn1.uuid287 os.environ['CN2_UUID'] = cn2.uuid288 os.environ['SS1_UUID'] = ss1.uuid289 os.environ['SS2_UUID'] = ss2.uuid290 os.environ['NUMA1_1_UUID'] = numa1_1.uuid291 os.environ['NUMA1_2_UUID'] = numa1_2.uuid292 os.environ['NUMA2_1_UUID'] = numa2_1.uuid293 os.environ['NUMA2_2_UUID'] = numa2_2.uuid294 # Populate compute node inventory for VCPU and RAM295 for numa in (numa1_1, numa1_2, numa2_1, numa2_2):296 tb.add_inventory(numa, orc.VCPU, 24,297 allocation_ratio=16.0)298 # Populate shared storage provider with DISK_GB inventory and299 # mark it shared among any provider associated via aggregate300 for ss in (ss1, ss2):301 tb.add_inventory(ss, orc.DISK_GB, 2000,302 reserved=100, allocation_ratio=1.0)303 tb.set_traits(ss, 'MISC_SHARES_VIA_AGGREGATE')304class NonSharedStorageFixture(APIFixture):305 """An APIFixture that has two compute nodes with local storage that do not306 use shared storage.307 """308 def start_fixture(self):309 super(NonSharedStorageFixture, self).start_fixture()310 aggA_uuid = uuidutils.generate_uuid()311 aggB_uuid = uuidutils.generate_uuid()312 aggC_uuid = uuidutils.generate_uuid()313 os.environ['AGGA_UUID'] = aggA_uuid314 os.environ['AGGB_UUID'] = aggB_uuid315 os.environ['AGGC_UUID'] = aggC_uuid316 cn1 = tb.create_provider(self.context, 'cn1')317 cn2 = tb.create_provider(self.context, 'cn2')318 os.environ['CN1_UUID'] = cn1.uuid319 os.environ['CN2_UUID'] = cn2.uuid320 # Populate compute node inventory for VCPU, RAM and DISK321 for cn in (cn1, cn2):322 tb.add_inventory(cn, 'VCPU', 24)323 tb.add_inventory(cn, 'MEMORY_MB', 128 * 1024)324 tb.add_inventory(cn, 'DISK_GB', 2000)325class CORSFixture(APIFixture):326 """An APIFixture that turns on CORS."""327 def start_fixture(self):328 super(CORSFixture, self).start_fixture()329 # NOTE(cdent): If we remove this override, then the cors330 # group ends up not existing in the conf, so when deploy.py331 # wants to load the CORS middleware, it will not.332 self.conf_fixture.config(333 group='cors',334 allowed_origin='http://valid.example.com')335class GranularFixture(APIFixture):336 """An APIFixture that sets up the following provider environment for337 testing granular resource requests.338+========================++========================++========================+339|cn_left ||cn_middle ||cn_right |340|VCPU: 8 ||VCPU: 8 ||VCPU: 8 |341|MEMORY_MB: 4096 ||MEMORY_MB: 4096 ||MEMORY_MB: 4096 |342|DISK_GB: 500 ||SRIOV_NET_VF: 8 ||DISK_GB: 500 |343|VGPU: 8 ||CUSTOM_NET_MBPS: 4000 ||VGPU: 8 |344|SRIOV_NET_VF: 8 ||traits: HW_CPU_X86_AVX, || - max_unit: 2 |345|CUSTOM_NET_MBPS: 4000 || HW_CPU_X86_AVX2,||traits: HW_CPU_X86_MMX, |346|traits: HW_CPU_X86_AVX, || HW_CPU_X86_SSE, || HW_GPU_API_DXVA,|347| HW_CPU_X86_AVX2,|| HW_NIC_ACCEL_TLS|| CUSTOM_DISK_SSD,|348| HW_GPU_API_DXVA,|+=+=====+================++==+========+============+349| HW_NIC_DCB_PFC, | : : : : a350| CUSTOM_FOO +..+ +--------------------+ : g351+========================+ : a : : g352 : g : : C353+========================+ : g : +===============+======+354|shr_disk_1 | : A : |shr_net |355|DISK_GB: 1000 +..+ : |SRIOV_NET_VF: 16 |356|traits: CUSTOM_DISK_SSD,| : : a |CUSTOM_NET_MBPS: 40000|357| MISC_SHARES_VIA_AGG...| : : g |traits: MISC_SHARES...|358+========================+ : : g +======================+359+=======================+ : : B360|shr_disk_2 +...+ :361|DISK_GB: 1000 | :362|traits: MISC_SHARES... +.........+363+=======================+364 """365 def start_fixture(self):366 super(GranularFixture, self).start_fixture()367 rc_obj.ResourceClass(368 context=self.context, name='CUSTOM_NET_MBPS').create()369 os.environ['AGGA'] = uuids.aggA370 os.environ['AGGB'] = uuids.aggB371 os.environ['AGGC'] = uuids.aggC372 cn_left = tb.create_provider(self.context, 'cn_left', uuids.aggA)373 os.environ['CN_LEFT'] = cn_left.uuid374 tb.add_inventory(cn_left, 'VCPU', 8)375 tb.add_inventory(cn_left, 'MEMORY_MB', 4096)376 tb.add_inventory(cn_left, 'DISK_GB', 500)377 tb.add_inventory(cn_left, 'VGPU', 8)378 tb.add_inventory(cn_left, 'SRIOV_NET_VF', 8)379 tb.add_inventory(cn_left, 'CUSTOM_NET_MBPS', 4000)380 tb.set_traits(cn_left, 'HW_CPU_X86_AVX', 'HW_CPU_X86_AVX2',381 'HW_GPU_API_DXVA', 'HW_NIC_DCB_PFC', 'CUSTOM_FOO')382 cn_middle = tb.create_provider(383 self.context, 'cn_middle', uuids.aggA, uuids.aggB)384 os.environ['CN_MIDDLE'] = cn_middle.uuid385 tb.add_inventory(cn_middle, 'VCPU', 8)386 tb.add_inventory(cn_middle, 'MEMORY_MB', 4096)387 tb.add_inventory(cn_middle, 'SRIOV_NET_VF', 8)388 tb.add_inventory(cn_middle, 'CUSTOM_NET_MBPS', 4000)389 tb.set_traits(cn_middle, 'HW_CPU_X86_AVX', 'HW_CPU_X86_AVX2',390 'HW_CPU_X86_SSE', 'HW_NIC_ACCEL_TLS')391 cn_right = tb.create_provider(392 self.context, 'cn_right', uuids.aggB, uuids.aggC)393 os.environ['CN_RIGHT'] = cn_right.uuid394 tb.add_inventory(cn_right, 'VCPU', 8)395 tb.add_inventory(cn_right, 'MEMORY_MB', 4096)396 tb.add_inventory(cn_right, 'DISK_GB', 500)397 tb.add_inventory(cn_right, 'VGPU', 8, max_unit=2)398 tb.set_traits(cn_right, 'HW_CPU_X86_MMX', 'HW_GPU_API_DXVA',399 'CUSTOM_DISK_SSD')400 shr_disk_1 = tb.create_provider(self.context, 'shr_disk_1', uuids.aggA)401 os.environ['SHR_DISK_1'] = shr_disk_1.uuid402 tb.add_inventory(shr_disk_1, 'DISK_GB', 1000)403 tb.set_traits(shr_disk_1, 'MISC_SHARES_VIA_AGGREGATE',404 'CUSTOM_DISK_SSD')405 shr_disk_2 = tb.create_provider(406 self.context, 'shr_disk_2', uuids.aggA, uuids.aggB)407 os.environ['SHR_DISK_2'] = shr_disk_2.uuid408 tb.add_inventory(shr_disk_2, 'DISK_GB', 1000)409 tb.set_traits(shr_disk_2, 'MISC_SHARES_VIA_AGGREGATE')410 shr_net = tb.create_provider(self.context, 'shr_net', uuids.aggC)411 os.environ['SHR_NET'] = shr_net.uuid412 tb.add_inventory(shr_net, 'SRIOV_NET_VF', 16)413 tb.add_inventory(shr_net, 'CUSTOM_NET_MBPS', 40000)414 tb.set_traits(shr_net, 'MISC_SHARES_VIA_AGGREGATE')415class OpenPolicyFixture(APIFixture):416 """An APIFixture that changes all policy rules to allow non-admins."""417 def start_fixture(self):418 super(OpenPolicyFixture, self).start_fixture()419 # Get all of the registered rules and set them to '@' to allow any420 # user to have access. The nova policy "admin_or_owner" concept does421 # not really apply to most of placement resources since they do not422 # have a user_id/project_id attribute.423 rules = {}424 for rule in policies.list_rules():425 name = rule.name426 # Ignore "base" rules for role:admin.427 if name in ['placement', 'admin_api']:428 continue429 rules[name] = '@'430 self.policy_fixture.set_rules(rules)431 def stop_fixture(self):432 super(OpenPolicyFixture, self).stop_fixture()
fixtures.py
Source:fixtures.py
...25class APIFixture(fixture.GabbiFixture):26 """Setup the required backend fixtures for a basic placement service."""27 def __init__(self):28 self.conf = None29 def start_fixture(self):30 # Set up stderr and stdout captures by directly driving the31 # existing nova fixtures that do that. This captures the32 # output that happens outside individual tests (for33 # example database migrations).34 self.standard_logging_fixture = fixtures.StandardLogging()35 self.standard_logging_fixture.setUp()36 self.output_stream_fixture = fixtures.OutputStreamCapture()37 self.output_stream_fixture.setUp()38 self.conf = CONF39 self.conf.set_override('auth_strategy', 'noauth2', group='api')40 # Be explicit about all three database connections to avoid41 # potential conflicts with config on disk.42 self.conf.set_override('connection', "sqlite://", group='database')43 self.conf.set_override('connection', "sqlite://",44 group='api_database')45 self.conf.set_override('connection', "sqlite://",46 group='placement_database')47 # Register CORS opts, but do not set config. This has the48 # effect of exercising the "don't use cors" path in49 # deploy.py. Without setting some config the group will not50 # be present.51 self.conf.register_opts(cors.CORS_OPTS, 'cors')52 # Make sure default_config_files is an empty list, not None.53 # If None /etc/nova/nova.conf is read and confuses results.54 config.parse_args([], default_config_files=[], configure_db=False,55 init_rpc=False)56 # NOTE(cdent): All three database fixtures need to be57 # managed for database handling to work and not cause58 # conflicts with other tests in the same process.59 self._reset_db_flags()60 self.placement_db_fixture = fixtures.Database('placement')61 self.api_db_fixture = fixtures.Database('api')62 self.main_db_fixture = fixtures.Database('main')63 self.placement_db_fixture.reset()64 self.api_db_fixture.reset()65 self.main_db_fixture.reset()66 # Do this now instead of waiting for the WSGI app to start so that67 # fixtures can have traits.68 deploy.update_database()69 os.environ['RP_UUID'] = uuidutils.generate_uuid()70 os.environ['RP_NAME'] = uuidutils.generate_uuid()71 os.environ['CUSTOM_RES_CLASS'] = 'CUSTOM_IRON_NFV'72 os.environ['PROJECT_ID'] = uuidutils.generate_uuid()73 os.environ['USER_ID'] = uuidutils.generate_uuid()74 os.environ['PROJECT_ID_ALT'] = uuidutils.generate_uuid()75 os.environ['USER_ID_ALT'] = uuidutils.generate_uuid()76 os.environ['INSTANCE_UUID'] = uuidutils.generate_uuid()77 os.environ['MIGRATION_UUID'] = uuidutils.generate_uuid()78 os.environ['CONSUMER_UUID'] = uuidutils.generate_uuid()79 os.environ['PARENT_PROVIDER_UUID'] = uuidutils.generate_uuid()80 os.environ['ALT_PARENT_PROVIDER_UUID'] = uuidutils.generate_uuid()81 def stop_fixture(self):82 self.placement_db_fixture.cleanup()83 self.api_db_fixture.cleanup()84 self.main_db_fixture.cleanup()85 # Since we clean up the DB, we need to reset the traits sync86 # flag to make sure the next run will recreate the traits and87 # reset the _RC_CACHE so that any cached resource classes88 # are flushed.89 self._reset_db_flags()90 self.output_stream_fixture.cleanUp()91 self.standard_logging_fixture.cleanUp()92 if self.conf:93 self.conf.reset()94 @staticmethod95 def _reset_db_flags():96 rp_obj._TRAITS_SYNCED = False97 rp_obj._RC_CACHE = None98class AllocationFixture(APIFixture):99 """An APIFixture that has some pre-made Allocations."""100 def start_fixture(self):101 super(AllocationFixture, self).start_fixture()102 self.context = context.get_admin_context()103 # For use creating and querying allocations/usages104 os.environ['ALT_USER_ID'] = uuidutils.generate_uuid()105 project_id = os.environ['PROJECT_ID']106 user_id = os.environ['USER_ID']107 alt_user_id = os.environ['ALT_USER_ID']108 # Stealing from the super109 rp_name = os.environ['RP_NAME']110 rp_uuid = os.environ['RP_UUID']111 rp = rp_obj.ResourceProvider(112 self.context, name=rp_name, uuid=rp_uuid)113 rp.create()114 # Create some DISK_GB inventory and allocations.115 consumer_id = uuidutils.generate_uuid()116 inventory = rp_obj.Inventory(117 self.context, resource_provider=rp,118 resource_class='DISK_GB', total=2048,119 step_size=10, min_unit=10, max_unit=600)120 inventory.obj_set_defaults()121 rp.add_inventory(inventory)122 alloc1 = rp_obj.Allocation(123 self.context, resource_provider=rp,124 resource_class='DISK_GB',125 consumer_id=consumer_id,126 project_id=project_id,127 user_id=user_id,128 used=500)129 alloc2 = rp_obj.Allocation(130 self.context, resource_provider=rp,131 resource_class='DISK_GB',132 consumer_id=consumer_id,133 project_id=project_id,134 user_id=user_id,135 used=500)136 alloc_list = rp_obj.AllocationList(137 self.context,138 objects=[alloc1, alloc2]139 )140 alloc_list.create_all()141 # Create some VCPU inventory and allocations.142 consumer_id = uuidutils.generate_uuid()143 os.environ['CONSUMER_ID'] = consumer_id144 inventory = rp_obj.Inventory(145 self.context, resource_provider=rp,146 resource_class='VCPU', total=10,147 max_unit=4)148 inventory.obj_set_defaults()149 rp.add_inventory(inventory)150 alloc1 = rp_obj.Allocation(151 self.context, resource_provider=rp,152 resource_class='VCPU',153 consumer_id=consumer_id,154 project_id=project_id,155 user_id=user_id,156 used=2)157 alloc2 = rp_obj.Allocation(158 self.context, resource_provider=rp,159 resource_class='VCPU',160 consumer_id=consumer_id,161 project_id=project_id,162 user_id=user_id,163 used=4)164 alloc_list = rp_obj.AllocationList(165 self.context,166 objects=[alloc1, alloc2])167 alloc_list.create_all()168 # Create a couple of allocations for a different user.169 consumer_id = uuidutils.generate_uuid()170 alloc1 = rp_obj.Allocation(171 self.context, resource_provider=rp,172 resource_class='DISK_GB',173 consumer_id=consumer_id,174 project_id=project_id,175 user_id=alt_user_id,176 used=20)177 alloc2 = rp_obj.Allocation(178 self.context, resource_provider=rp,179 resource_class='VCPU',180 consumer_id=consumer_id,181 project_id=project_id,182 user_id=alt_user_id,183 used=1)184 alloc_list = rp_obj.AllocationList(185 self.context,186 objects=[alloc1, alloc2])187 alloc_list.create_all()188 # The ALT_RP_XXX variables are for a resource provider that has189 # not been created in the Allocation fixture190 os.environ['ALT_RP_UUID'] = uuidutils.generate_uuid()191 os.environ['ALT_RP_NAME'] = uuidutils.generate_uuid()192class SharedStorageFixture(APIFixture):193 """An APIFixture that has some two compute nodes without local storage194 associated by aggregate to a provider of shared storage.195 """196 def start_fixture(self):197 super(SharedStorageFixture, self).start_fixture()198 self.context = context.get_admin_context()199 cn1_uuid = uuidutils.generate_uuid()200 cn2_uuid = uuidutils.generate_uuid()201 ss_uuid = uuidutils.generate_uuid()202 agg_uuid = uuidutils.generate_uuid()203 os.environ['CN1_UUID'] = cn1_uuid204 os.environ['CN2_UUID'] = cn2_uuid205 os.environ['SS_UUID'] = ss_uuid206 os.environ['AGG_UUID'] = agg_uuid207 cn1 = rp_obj.ResourceProvider(208 self.context,209 name='cn1',210 uuid=cn1_uuid)211 cn1.create()212 cn2 = rp_obj.ResourceProvider(213 self.context,214 name='cn2',215 uuid=cn2_uuid)216 cn2.create()217 ss = rp_obj.ResourceProvider(218 self.context,219 name='ss',220 uuid=ss_uuid)221 ss.create()222 # Populate compute node inventory for VCPU and RAM223 for cn in (cn1, cn2):224 vcpu_inv = rp_obj.Inventory(225 self.context,226 resource_provider=cn,227 resource_class='VCPU',228 total=24,229 reserved=0,230 max_unit=24,231 min_unit=1,232 step_size=1,233 allocation_ratio=16.0)234 vcpu_inv.obj_set_defaults()235 ram_inv = rp_obj.Inventory(236 self.context,237 resource_provider=cn,238 resource_class='MEMORY_MB',239 total=128 * 1024,240 reserved=0,241 max_unit=128 * 1024,242 min_unit=256,243 step_size=256,244 allocation_ratio=1.5)245 ram_inv.obj_set_defaults()246 inv_list = rp_obj.InventoryList(objects=[vcpu_inv, ram_inv])247 cn.set_inventory(inv_list)248 t_avx_sse = rp_obj.Trait.get_by_name(self.context, "HW_CPU_X86_SSE")249 t_avx_sse2 = rp_obj.Trait.get_by_name(self.context, "HW_CPU_X86_SSE2")250 cn1.set_traits(rp_obj.TraitList(objects=[t_avx_sse, t_avx_sse2]))251 # Populate shared storage provider with DISK_GB inventory252 disk_inv = rp_obj.Inventory(253 self.context,254 resource_provider=ss,255 resource_class='DISK_GB',256 total=2000,257 reserved=100,258 max_unit=2000,259 min_unit=10,260 step_size=10,261 allocation_ratio=1.0)262 disk_inv.obj_set_defaults()263 inv_list = rp_obj.InventoryList(objects=[disk_inv])264 ss.set_inventory(inv_list)265 # Mark the shared storage pool as having inventory shared among any266 # provider associated via aggregate267 t = rp_obj.Trait.get_by_name(268 self.context,269 "MISC_SHARES_VIA_AGGREGATE",270 )271 ss.set_traits(rp_obj.TraitList(objects=[t]))272 # Now associate the shared storage pool and both compute nodes with the273 # same aggregate274 cn1.set_aggregates([agg_uuid])275 cn2.set_aggregates([agg_uuid])276 ss.set_aggregates([agg_uuid])277class NonSharedStorageFixture(APIFixture):278 """An APIFixture that has two compute nodes with local storage that do not279 use shared storage.280 """281 def start_fixture(self):282 super(NonSharedStorageFixture, self).start_fixture()283 self.context = context.get_admin_context()284 cn1_uuid = uuidutils.generate_uuid()285 cn2_uuid = uuidutils.generate_uuid()286 aggA_uuid = uuidutils.generate_uuid()287 aggB_uuid = uuidutils.generate_uuid()288 aggC_uuid = uuidutils.generate_uuid()289 os.environ['CN1_UUID'] = cn1_uuid290 os.environ['CN2_UUID'] = cn2_uuid291 os.environ['AGGA_UUID'] = aggA_uuid292 os.environ['AGGB_UUID'] = aggB_uuid293 os.environ['AGGC_UUID'] = aggC_uuid294 cn1 = rp_obj.ResourceProvider(295 self.context,296 name='cn1',297 uuid=cn1_uuid)298 cn1.create()299 cn2 = rp_obj.ResourceProvider(300 self.context,301 name='cn2',302 uuid=cn2_uuid)303 cn2.create()304 # Populate compute node inventory for VCPU and RAM305 for cn in (cn1, cn2):306 vcpu_inv = rp_obj.Inventory(307 self.context,308 resource_provider=cn,309 resource_class='VCPU',310 total=24,311 reserved=0,312 max_unit=24,313 min_unit=1,314 step_size=1,315 allocation_ratio=16.0)316 vcpu_inv.obj_set_defaults()317 ram_inv = rp_obj.Inventory(318 self.context,319 resource_provider=cn,320 resource_class='MEMORY_MB',321 total=128 * 1024,322 reserved=0,323 max_unit=128 * 1024,324 min_unit=256,325 step_size=256,326 allocation_ratio=1.5)327 ram_inv.obj_set_defaults()328 disk_inv = rp_obj.Inventory(329 self.context,330 resource_provider=cn,331 resource_class='DISK_GB',332 total=2000,333 reserved=100,334 max_unit=2000,335 min_unit=10,336 step_size=10,337 allocation_ratio=1.0)338 disk_inv.obj_set_defaults()339 inv_list = rp_obj.InventoryList(objects=[vcpu_inv, ram_inv,340 disk_inv])341 cn.set_inventory(inv_list)342class CORSFixture(APIFixture):343 """An APIFixture that turns on CORS."""344 def start_fixture(self):345 super(CORSFixture, self).start_fixture()346 # NOTE(cdent): If we remove this override, then the cors347 # group ends up not existing in the conf, so when deploy.py348 # wants to load the CORS middleware, it will not.349 self.conf.set_override('allowed_origin', 'http://valid.example.com',...
fixture.py
Source:fixture.py
...32 happen in ``start_fixture`` and ``stop_fixture`` and not in ``__init__``.33 Otherwise exception handling will not work properly.34 """35 def __enter__(self):36 self.start_fixture()37 def __exit__(self, exc_type, value, traceback):38 self.stop_fixture()39 def start_fixture(self):40 """Implement the actual workings of starting the fixture here."""41 pass42 def stop_fixture(self):43 """Implement the actual workings of stopping the fixture here."""44 pass45class InterceptFixture(GabbiFixture):46 """Start up the wsgi intercept. This should not be called directly."""47 httplib2_intercept.install()48 def __init__(self, host, port, app):49 self.host = host50 self.port = port51 self.app = app52 def start_fixture(self):53 wsgi_intercept.add_wsgi_intercept(self.host, self.port, self.app)54 def stop_fixture(self):55 wsgi_intercept.remove_wsgi_intercept(self.host, self.port)56class SkipAllFixture(GabbiFixture):57 """A fixture that skips all the tests in the current suite."""58 def start_fixture(self):59 raise case.SkipTest('entire suite skipped')60@contextlib.contextmanager61def nest(fixtures):62 """Nest a series of fixtures.63 This is duplicated from ``nested`` in the stdlib, which has been64 deprecated because of issues with how exceptions are difficult to65 handle during ``__init__``. Gabbi needs to nest an unknown number66 of fixtures dynamically, so the ``with`` syntax that replaces67 ``nested`` will not work.68 """69 contexts = []70 exits = []71 exc = (None, None, None)72 try:...
Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!