Best Python code snippet using tempest_python
test_alarm.py
Source:test_alarm.py
...58 self.details_multiple(result)[0]['name'])59 params = "update --state ok"60 result = self.aodh('alarm', params=params,61 fail_ok=True, merge_stderr=True)62 self.assertFirstLineStartsWith(63 result.splitlines(),64 'You need to specify one of alarm ID and alarm name(--name) '65 'to update an alarm.')66 params = "delete %s" % name67 result = self.aodh('alarm', params=params)68 self.assertEqual("", result)69 params = "create --type event --name %s" % name70 result = self.aodh('alarm', params=params)71 alarm_id = self.details_multiple(result)[0]['alarm_id']72 params = "delete %s" % alarm_id73 result = self.aodh('alarm', params=params)74 self.assertEqual("", result)75 _test(uuidutils.generate_uuid())76 _test('normal-alarm-name')77 def test_event_scenario(self):78 PROJECT_ID = uuidutils.generate_uuid()79 # CREATE80 result = self.aodh(u'alarm',81 params=(u"create --type event --name ev_alarm1 "82 "--project-id %s" % PROJECT_ID))83 alarm = self.details_multiple(result)[0]84 ALARM_ID = alarm['alarm_id']85 self.assertEqual('ev_alarm1', alarm['name'])86 self.assertEqual('*', alarm['event_type'])87 # UPDATE IGNORE INVALID88 result = self.aodh(89 'alarm', params=("update %s --severity critical --threshold 10"90 % ALARM_ID))91 alarm_updated = self.details_multiple(result)[0]92 self.assertEqual(ALARM_ID, alarm_updated["alarm_id"])93 self.assertEqual('critical', alarm_updated['severity'])94 # UPDATE IGNORE INVALID95 result = self.aodh(96 'alarm', params=("update %s --event-type dummy" % ALARM_ID))97 alarm_updated = self.details_multiple(result)[0]98 self.assertEqual(ALARM_ID, alarm_updated["alarm_id"])99 self.assertEqual('dummy', alarm_updated['event_type'])100 # GET101 result = self.aodh(102 'alarm', params="show %s" % ALARM_ID)103 alarm_show = self.details_multiple(result)[0]104 self.assertEqual(ALARM_ID, alarm_show["alarm_id"])105 self.assertEqual(PROJECT_ID, alarm_show["project_id"])106 self.assertEqual('ev_alarm1', alarm_show['name'])107 self.assertEqual('dummy', alarm_show['event_type'])108 # GET BY NAME109 result = self.aodh(110 'alarm', params="show --name ev_alarm1")111 alarm_show = self.details_multiple(result)[0]112 self.assertEqual(ALARM_ID, alarm_show["alarm_id"])113 self.assertEqual(PROJECT_ID, alarm_show["project_id"])114 self.assertEqual('ev_alarm1', alarm_show['name'])115 self.assertEqual('dummy', alarm_show['event_type'])116 # GET BY NAME AND ID ERROR117 self.assertRaises(exceptions.CommandFailed,118 self.aodh, u'alarm',119 params=(u"show %s --name ev_alarm1" %120 ALARM_ID))121 # LIST122 result = self.aodh('alarm', params="list")123 self.assertIn(ALARM_ID,124 [r['alarm_id'] for r in self.parser.listing(result)])125 output_colums = ['alarm_id', 'type', 'name', 'state', 'severity',126 'enabled']127 for alarm_list in self.parser.listing(result):128 self.assertEqual(sorted(output_colums), sorted(alarm_list.keys()))129 if alarm_list["alarm_id"] == ALARM_ID:130 self.assertEqual('ev_alarm1', alarm_list['name'])131 # LIST WITH QUERY132 result = self.aodh('alarm',133 params=("list --query project_id=%s" % PROJECT_ID))134 alarm_list = self.parser.listing(result)[0]135 self.assertEqual(ALARM_ID, alarm_list["alarm_id"])136 self.assertEqual('ev_alarm1', alarm_list['name'])137 # DELETE138 result = self.aodh('alarm', params="delete %s" % ALARM_ID)139 self.assertEqual("", result)140 # GET FAIL141 result = self.aodh('alarm', params="show %s" % ALARM_ID,142 fail_ok=True, merge_stderr=True)143 expected = "Alarm %s not found (HTTP 404)" % ALARM_ID144 self.assertFirstLineStartsWith(result.splitlines(), expected)145 # DELETE FAIL146 result = self.aodh('alarm', params="delete %s" % ALARM_ID,147 fail_ok=True, merge_stderr=True)148 self.assertFirstLineStartsWith(result.splitlines(), expected)149 # LIST DOES NOT HAVE ALARM150 result = self.aodh('alarm', params="list")151 self.assertNotIn(ALARM_ID,152 [r['alarm_id'] for r in self.parser.listing(result)])153 def test_threshold_scenario(self):154 PROJECT_ID = uuidutils.generate_uuid()155 # CREATE156 result = self.aodh(u'alarm',157 params=(u"create --type threshold --name alarm_th "158 "-m meter_name --threshold 5 "159 "--project-id %s" % PROJECT_ID))160 alarm = self.details_multiple(result)[0]161 ALARM_ID = alarm['alarm_id']162 self.assertEqual('alarm_th', alarm['name'])163 self.assertEqual('meter_name', alarm['meter_name'])164 self.assertEqual('5.0', alarm['threshold'])165 # CREATE WITH --TIME-CONSTRAINT166 result = self.aodh(167 u'alarm',168 params=(u"create --type threshold --name alarm_tc "169 "-m meter_name --threshold 5 "170 "--time-constraint "171 "name=cons1;start='0 11 * * *';duration=300 "172 "--time-constraint "173 "name=cons2;start='0 23 * * *';duration=600 "174 "--project-id %s" % PROJECT_ID))175 alarm = self.details_multiple(result)[0]176 self.assertEqual('alarm_tc', alarm['name'])177 self.assertEqual('meter_name', alarm['meter_name'])178 self.assertEqual('5.0', alarm['threshold'])179 self.assertIsNotNone(alarm['time_constraints'])180 # CREATE FAIL MISSING PARAM181 self.assertRaises(exceptions.CommandFailed,182 self.aodh, u'alarm',183 params=(u"create --type threshold --name alarm_th "184 "--project-id %s" % PROJECT_ID))185 # UPDATE186 result = self.aodh(187 'alarm', params=("update %s --severity critical --threshold 10"188 % ALARM_ID))189 alarm_updated = self.details_multiple(result)[0]190 self.assertEqual(ALARM_ID, alarm_updated["alarm_id"])191 self.assertEqual('critical', alarm_updated['severity'])192 self.assertEqual('10.0', alarm_updated["threshold"])193 # GET194 result = self.aodh(195 'alarm', params="show %s" % ALARM_ID)196 alarm_show = self.details_multiple(result)[0]197 self.assertEqual(ALARM_ID, alarm_show["alarm_id"])198 self.assertEqual(PROJECT_ID, alarm_show["project_id"])199 self.assertEqual('alarm_th', alarm_show['name'])200 self.assertEqual('meter_name', alarm_show['meter_name'])201 self.assertEqual('10.0', alarm_show['threshold'])202 # GET BY NAME203 result = self.aodh(204 'alarm', params="show --name alarm_th")205 alarm_show = self.details_multiple(result)[0]206 self.assertEqual(ALARM_ID, alarm_show["alarm_id"])207 self.assertEqual(PROJECT_ID, alarm_show["project_id"])208 self.assertEqual('alarm_th', alarm_show['name'])209 self.assertEqual('meter_name', alarm_show['meter_name'])210 self.assertEqual('10.0', alarm_show['threshold'])211 # GET BY NAME AND ID ERROR212 self.assertRaises(exceptions.CommandFailed,213 self.aodh, u'alarm',214 params=(u"show %s --name alarm_th" %215 ALARM_ID))216 # LIST217 result = self.aodh('alarm', params="list")218 self.assertIn(ALARM_ID,219 [r['alarm_id'] for r in self.parser.listing(result)])220 output_colums = ['alarm_id', 'type', 'name', 'state', 'severity',221 'enabled']222 for alarm_list in self.parser.listing(result):223 self.assertEqual(sorted(output_colums), sorted(alarm_list.keys()))224 if alarm_list["alarm_id"] == ALARM_ID:225 self.assertEqual('alarm_th', alarm_list['name'])226 # LIST WITH PAGINATION227 # list with limit228 result = self.aodh('alarm',229 params="list --limit 1")230 alarm_list = self.parser.listing(result)231 self.assertEqual(1, len(alarm_list))232 # list with sort with key=name dir=asc233 result = self.aodh('alarm',234 params="list --sort name:asc")235 names = [r['name'] for r in self.parser.listing(result)]236 sorted_name = sorted(names)237 self.assertEqual(sorted_name, names)238 # list with sort with key=name dir=asc and key=alarm_id dir=asc239 result = self.aodh(u'alarm',240 params=(u"create --type threshold --name alarm_th "241 "-m meter_name --threshold 5 "242 "--project-id %s" % PROJECT_ID))243 created_alarm_id = self.details_multiple(result)[0]['alarm_id']244 result = self.aodh('alarm',245 params="list --sort name:asc --sort alarm_id:asc")246 alarm_list = self.parser.listing(result)247 ids_with_same_name = []248 names = []249 for alarm in alarm_list:250 names.append(['alarm_name'])251 if alarm['name'] == 'alarm_th':252 ids_with_same_name.append(alarm['alarm_id'])253 sorted_ids = sorted(ids_with_same_name)254 sorted_names = sorted(names)255 self.assertEqual(sorted_names, names)256 self.assertEqual(sorted_ids, ids_with_same_name)257 # list with sort with key=name dir=desc and with the marker equal to258 # the alarm_id of the alarm_th we created for this test.259 result = self.aodh('alarm',260 params="list --sort name:desc "261 "--marker %s" % created_alarm_id)262 self.assertIn('alarm_tc',263 [r['name'] for r in self.parser.listing(result)])264 self.aodh('alarm', params="delete %s" % created_alarm_id)265 # LIST WITH QUERY266 result = self.aodh('alarm',267 params=("list --query project_id=%s" % PROJECT_ID))268 alarm_list = self.parser.listing(result)[0]269 self.assertEqual(ALARM_ID, alarm_list["alarm_id"])270 self.assertEqual('alarm_th', alarm_list['name'])271 # DELETE272 result = self.aodh('alarm', params="delete %s" % ALARM_ID)273 self.assertEqual("", result)274 # GET FAIL275 result = self.aodh('alarm', params="show %s" % ALARM_ID,276 fail_ok=True, merge_stderr=True)277 expected = "Alarm %s not found (HTTP 404)" % ALARM_ID278 self.assertFirstLineStartsWith(result.splitlines(), expected)279 # DELETE FAIL280 result = self.aodh('alarm', params="delete %s" % ALARM_ID,281 fail_ok=True, merge_stderr=True)282 self.assertFirstLineStartsWith(result.splitlines(), expected)283 # LIST DOES NOT HAVE ALARM284 result = self.aodh('alarm', params="list")285 self.assertNotIn(ALARM_ID,286 [r['alarm_id'] for r in self.parser.listing(result)])287 def test_composite_scenario(self):288 project_id = uuidutils.generate_uuid()289 # CREATE290 result = self.aodh(u'alarm',291 params=(u'create --type composite --name calarm1 '292 ' --composite-rule \'{"or":[{"threshold"'293 ': 0.8,"meter_name": "cpu_util",'294 '"type": "threshold"},{"and": ['295 '{"threshold": 200, "meter_name": '296 '"disk.iops", "type": "threshold"},'297 '{"threshold": 1000,"meter_name":'298 '"network.incoming.packets.rate",'299 '"type": "threshold"}]}]}\' '300 '--project-id %s' % project_id))301 alarm = self.details_multiple(result)[0]302 alarm_id = alarm['alarm_id']303 self.assertEqual('calarm1', alarm['name'])304 self.assertEqual('composite', alarm['type'])305 self.assertIn('composite_rule', alarm)306 # CREATE FAIL MISSING PARAM307 self.assertRaises(exceptions.CommandFailed,308 self.aodh, u'alarm',309 params=(u"create --type composite --name calarm1 "310 "--project-id %s" % project_id))311 # UPDATE312 result = self.aodh(313 'alarm', params=("update %s --severity critical" % alarm_id))314 alarm_updated = self.details_multiple(result)[0]315 self.assertEqual(alarm_id, alarm_updated["alarm_id"])316 self.assertEqual('critical', alarm_updated['severity'])317 # GET318 result = self.aodh(319 'alarm', params="show %s" % alarm_id)320 alarm_show = self.details_multiple(result)[0]321 self.assertEqual(alarm_id, alarm_show["alarm_id"])322 self.assertEqual(project_id, alarm_show["project_id"])323 self.assertEqual('calarm1', alarm_show['name'])324 # GET BY NAME325 result = self.aodh(326 'alarm', params="show --name calarm1")327 alarm_show = self.details_multiple(result)[0]328 self.assertEqual(alarm_id, alarm_show["alarm_id"])329 self.assertEqual(project_id, alarm_show["project_id"])330 self.assertEqual('calarm1', alarm_show['name'])331 # GET BY NAME AND ID ERROR332 self.assertRaises(exceptions.CommandFailed,333 self.aodh, u'alarm',334 params=(u"show %s --name calarm1" %335 alarm_id))336 # LIST337 result = self.aodh('alarm', params="list")338 self.assertIn(alarm_id,339 [r['alarm_id'] for r in self.parser.listing(result)])340 output_colums = ['alarm_id', 'type', 'name', 'state', 'severity',341 'enabled']342 for alarm_list in self.parser.listing(result):343 self.assertEqual(sorted(output_colums), sorted(alarm_list.keys()))344 if alarm_list["alarm_id"] == alarm_id:345 self.assertEqual('calarm1', alarm_list['name'])346 # LIST WITH QUERY347 result = self.aodh('alarm',348 params=("list --query project_id=%s" % project_id))349 alarm_list = self.parser.listing(result)[0]350 self.assertEqual(alarm_id, alarm_list["alarm_id"])351 self.assertEqual('calarm1', alarm_list['name'])352 # DELETE353 result = self.aodh('alarm', params="delete %s" % alarm_id)354 self.assertEqual("", result)355 # GET FAIL356 result = self.aodh('alarm', params="show %s" % alarm_id,357 fail_ok=True, merge_stderr=True)358 expected = "Alarm %s not found (HTTP 404)" % alarm_id359 self.assertFirstLineStartsWith(result.splitlines(), expected)360 # DELETE FAIL361 result = self.aodh('alarm', params="delete %s" % alarm_id,362 fail_ok=True, merge_stderr=True)363 self.assertFirstLineStartsWith(result.splitlines(), expected)364 # LIST DOES NOT HAVE ALARM365 result = self.aodh('alarm', params="list")366 self.assertNotIn(alarm_id,367 [r['alarm_id'] for r in self.parser.listing(result)])368 def _test_alarm_create_show_query(self, create_params, expected_lines):369 def test(params):370 result = self.aodh('alarm', params=params)371 alarm = self.details_multiple(result)[0]372 for key, value in six.iteritems(expected_lines):373 self.assertEqual(value, alarm[key])374 return alarm375 alarm = test(create_params)376 params = 'show %s' % alarm['alarm_id']377 test(params)378 self.aodh('alarm', params='delete %s' % alarm['alarm_id'])379 def test_threshold_alarm_create_show_query(self):380 params = ('create --type threshold --name alarm-multiple-query '381 '-m cpu_util --threshold 90 --query "project_id=123;'382 'resource_id=456"')383 expected_lines = {384 'query': 'project_id = 123 AND',385 '': 'resource_id = 456'386 }387 self._test_alarm_create_show_query(params, expected_lines)388 params = ('create --type threshold --name alarm-single-query '389 '-m cpu_util --threshold 90 --query project_id=123')390 expected_lines = {'query': 'project_id = 123'}391 self._test_alarm_create_show_query(params, expected_lines)392 params = ('create --type threshold --name alarm-no-query '393 '-m cpu_util --threshold 90')394 self._test_alarm_create_show_query(params, {'query': ''})395 def test_event_alarm_create_show_query(self):396 params = ('create --type event --name alarm-multiple-query '397 '--query "traits.project_id=789;traits.resource_id=012"')398 expected_lines = {399 'query': 'traits.project_id = 789 AND',400 '': 'traits.resource_id = 012',401 }402 self._test_alarm_create_show_query(params, expected_lines)403 params = ('create --type event --name alarm-single-query '404 '--query "traits.project_id=789"')405 expected_lines = {'query': 'traits.project_id = 789'}406 self._test_alarm_create_show_query(params, expected_lines)407 params = 'create --type event --name alarm-no-query'408 self._test_alarm_create_show_query(params, {'query': ''})409 def test_set_get_alarm_state(self):410 result = self.aodh(411 'alarm',412 params=("create --type threshold --name alarm_state_test "413 "-m meter_name --threshold 5"))414 alarm = self.details_multiple(result)[0]415 alarm_id = alarm['alarm_id']416 result = self.aodh(417 'alarm', params="show %s" % alarm_id)418 alarm_show = self.details_multiple(result)[0]419 self.assertEqual('insufficient data', alarm_show['state'])420 result = self.aodh('alarm', params="state get %s" % alarm_id)421 state_get = self.details_multiple(result)[0]422 self.assertEqual('insufficient data', state_get['state'])423 self.aodh('alarm',424 params="state set --state ok %s" % alarm_id)425 result = self.aodh('alarm', params="state get %s" % alarm_id)426 state_get = self.details_multiple(result)[0]427 self.assertEqual('ok', state_get['state'])428 self.aodh('alarm', params='delete %s' % alarm_id)429class AodhClientGnocchiRulesTest(base.ClientTestBase):430 def test_gnocchi_resources_threshold_scenario(self):431 PROJECT_ID = uuidutils.generate_uuid()432 RESOURCE_ID = uuidutils.generate_uuid()433 req = requests.post(434 os.environ.get("GNOCCHI_ENDPOINT") + "/v1/resource/generic",435 auth=requests.auth.HTTPBasicAuth('admin', ''),436 json={437 "id": RESOURCE_ID,438 })439 self.assertEqual(201, req.status_code)440 # CREATE441 result = self.aodh(u'alarm',442 params=(u"create "443 "--type gnocchi_resources_threshold "444 "--name alarm_gn1 --metric cpu_util "445 "--threshold 80 "446 "--resource-id %s --resource-type generic "447 "--aggregation-method last "448 "--project-id %s"449 % (RESOURCE_ID, PROJECT_ID)))450 alarm = self.details_multiple(result)[0]451 ALARM_ID = alarm['alarm_id']452 self.assertEqual('alarm_gn1', alarm['name'])453 self.assertEqual('cpu_util', alarm['metric'])454 self.assertEqual('80.0', alarm['threshold'])455 self.assertEqual('last', alarm['aggregation_method'])456 self.assertEqual(RESOURCE_ID,457 alarm['resource_id'])458 self.assertEqual('generic', alarm['resource_type'])459 # CREATE FAIL MISSING PARAM460 self.assertRaises(exceptions.CommandFailed,461 self.aodh, u'alarm',462 params=(u"create "463 "--type gnocchi_resources_threshold "464 "--name alarm1 --metric cpu_util "465 "--resource-id %s --resource-type generic "466 "--aggregation-method last "467 "--project-id %s"468 % (RESOURCE_ID, PROJECT_ID)))469 # UPDATE470 result = self.aodh(471 'alarm', params=("update %s --severity critical --threshold 90"472 % ALARM_ID))473 alarm_updated = self.details_multiple(result)[0]474 self.assertEqual(ALARM_ID, alarm_updated["alarm_id"])475 self.assertEqual('critical', alarm_updated['severity'])476 self.assertEqual('90.0', alarm_updated["threshold"])477 # GET478 result = self.aodh(479 'alarm', params="show %s" % ALARM_ID)480 alarm_show = self.details_multiple(result)[0]481 self.assertEqual(ALARM_ID, alarm_show["alarm_id"])482 self.assertEqual(PROJECT_ID, alarm_show["project_id"])483 self.assertEqual('alarm_gn1', alarm_show['name'])484 self.assertEqual('cpu_util', alarm_show['metric'])485 self.assertEqual('90.0', alarm_show['threshold'])486 self.assertEqual('critical', alarm_show['severity'])487 self.assertEqual('last', alarm_show['aggregation_method'])488 self.assertEqual('generic', alarm_show['resource_type'])489 # GET BY NAME490 result = self.aodh(491 'alarm', params="show --name alarm_gn1")492 self.assertEqual(ALARM_ID, alarm_show["alarm_id"])493 self.assertEqual(PROJECT_ID, alarm_show["project_id"])494 self.assertEqual('alarm_gn1', alarm_show['name'])495 self.assertEqual('cpu_util', alarm_show['metric'])496 self.assertEqual('90.0', alarm_show['threshold'])497 self.assertEqual('critical', alarm_show['severity'])498 self.assertEqual('last', alarm_show['aggregation_method'])499 self.assertEqual('generic', alarm_show['resource_type'])500 # GET BY NAME AND ID ERROR501 self.assertRaises(exceptions.CommandFailed,502 self.aodh, u'alarm',503 params=(u"show %s --name alarm_gn1" %504 ALARM_ID))505 # LIST506 result = self.aodh('alarm', params="list")507 self.assertIn(ALARM_ID,508 [r['alarm_id'] for r in self.parser.listing(result)])509 output_colums = ['alarm_id', 'type', 'name', 'state', 'severity',510 'enabled']511 for alarm_list in self.parser.listing(result):512 self.assertEqual(sorted(output_colums), sorted(alarm_list.keys()))513 if alarm_list["alarm_id"] == ALARM_ID:514 self.assertEqual('alarm_gn1', alarm_list['name'])515 # LIST WITH QUERY516 result = self.aodh('alarm',517 params=("list --query project_id=%s" % PROJECT_ID))518 alarm_list = self.parser.listing(result)[0]519 self.assertEqual(ALARM_ID, alarm_list["alarm_id"])520 self.assertEqual('alarm_gn1', alarm_list['name'])521 # DELETE522 result = self.aodh('alarm', params="delete %s" % ALARM_ID)523 self.assertEqual("", result)524 # GET FAIL525 result = self.aodh('alarm', params="show %s" % ALARM_ID,526 fail_ok=True, merge_stderr=True)527 expected = "Alarm %s not found (HTTP 404)" % ALARM_ID528 self.assertFirstLineStartsWith(result.splitlines(), expected)529 # DELETE FAIL530 result = self.aodh('alarm', params="delete %s" % ALARM_ID,531 fail_ok=True, merge_stderr=True)532 self.assertFirstLineStartsWith(result.splitlines(), expected)533 # LIST DOES NOT HAVE ALARM534 result = self.aodh('alarm', params="list")535 self.assertNotIn(ALARM_ID,536 [r['alarm_id'] for r in self.parser.listing(result)])537 def test_gnocchi_aggr_by_resources_scenario(self):538 PROJECT_ID = uuidutils.generate_uuid()539 # CREATE540 result = self.aodh(541 u'alarm',542 params=(u"create "543 "--type "544 "gnocchi_aggregation_by_resources_threshold "545 "--name alarm1 --metric cpu --threshold 80 "546 "--query "547 '\'{"=": {"creator": "cr3at0r"}}\' '548 "--resource-type generic "549 "--aggregation-method last "550 "--project-id %s" % PROJECT_ID))551 alarm = self.details_multiple(result)[0]552 ALARM_ID = alarm['alarm_id']553 self.assertEqual('alarm1', alarm['name'])554 self.assertEqual('cpu', alarm['metric'])555 self.assertEqual('80.0', alarm['threshold'])556 self.assertEqual('last', alarm['aggregation_method'])557 self.assertEqual('generic', alarm['resource_type'])558 self.assertEqual('{"=": {"creator": "cr3at0r"}}',559 alarm['query'])560 # CREATE FAIL MISSING PARAM561 self.assertRaises(562 exceptions.CommandFailed,563 self.aodh, u'alarm',564 params=(u"create "565 "--type "566 "gnocchi_aggregation_by_resources_threshold "567 "--name alarm1 --metric cpu "568 "--query "569 '\'{"=": {"creator": "cr3at0r"}}\' '570 "--resource-type generic "571 "--aggregation-method last "572 "--project-id %s" % PROJECT_ID))573 # UPDATE574 result = self.aodh(575 'alarm', params=("update %s --severity critical --threshold 90"576 % ALARM_ID))577 alarm_updated = self.details_multiple(result)[0]578 self.assertEqual(ALARM_ID, alarm_updated["alarm_id"])579 self.assertEqual('critical', alarm_updated['severity'])580 self.assertEqual('90.0', alarm_updated["threshold"])581 # GET582 result = self.aodh(583 'alarm', params="show %s" % ALARM_ID)584 alarm_show = self.details_multiple(result)[0]585 self.assertEqual(ALARM_ID, alarm_show["alarm_id"])586 self.assertEqual(PROJECT_ID, alarm_show["project_id"])587 self.assertEqual('alarm1', alarm_show['name'])588 self.assertEqual('cpu', alarm_show['metric'])589 self.assertEqual('90.0', alarm_show['threshold'])590 self.assertEqual('critical', alarm_show['severity'])591 self.assertEqual('last', alarm_show['aggregation_method'])592 self.assertEqual('generic', alarm_show['resource_type'])593 # LIST594 result = self.aodh('alarm', params="list")595 self.assertIn(ALARM_ID,596 [r['alarm_id'] for r in self.parser.listing(result)])597 output_colums = ['alarm_id', 'type', 'name', 'state', 'severity',598 'enabled']599 for alarm_list in self.parser.listing(result):600 self.assertEqual(sorted(output_colums), sorted(alarm_list.keys()))601 if alarm_list["alarm_id"] == ALARM_ID:602 self.assertEqual('alarm1', alarm_list['name'])603 # DELETE604 result = self.aodh('alarm', params="delete %s" % ALARM_ID)605 self.assertEqual("", result)606 # GET FAIL607 result = self.aodh('alarm', params="show %s" % ALARM_ID,608 fail_ok=True, merge_stderr=True)609 expected = "Alarm %s not found (HTTP 404)" % ALARM_ID610 self.assertFirstLineStartsWith(result.splitlines(), expected)611 # DELETE FAIL612 result = self.aodh('alarm', params="delete %s" % ALARM_ID,613 fail_ok=True, merge_stderr=True)614 self.assertFirstLineStartsWith(result.splitlines(), expected)615 # LIST DOES NOT HAVE ALARM616 result = self.aodh('alarm', params="list")617 self.assertNotIn(ALARM_ID,618 [r['alarm_id'] for r in self.parser.listing(result)])619 def test_gnocchi_aggr_by_metrics_scenario(self):620 PROJECT_ID = uuidutils.generate_uuid()621 METRIC1 = 'cpu'622 METRIC2 = 'cpu_util'623 # CREATE624 result = self.aodh(625 u'alarm',626 params=(u"create "627 "--type gnocchi_aggregation_by_metrics_threshold "628 "--name alarm1 "629 "--metrics %s "630 "--metrics %s "631 "--threshold 80 "632 "--aggregation-method last "633 "--project-id %s"634 % (METRIC1, METRIC2, PROJECT_ID)))635 alarm = self.details_multiple(result)[0]636 ALARM_ID = alarm['alarm_id']637 self.assertEqual('alarm1', alarm['name'])638 metrics = "[u'cpu', u'cpu_util']" if six.PY2 else "['cpu', 'cpu_util']"639 self.assertEqual(metrics, alarm['metrics'])640 self.assertEqual('80.0', alarm['threshold'])641 self.assertEqual('last', alarm['aggregation_method'])642 # CREATE FAIL MISSING PARAM643 self.assertRaises(644 exceptions.CommandFailed,645 self.aodh, u'alarm',646 params=(u"create "647 "--type gnocchi_aggregation_by_metrics_threshold "648 "--name alarm1 "649 "--metrics %s "650 "--metrics %s "651 "--aggregation-method last "652 "--project-id %s"653 % (METRIC1, METRIC2, PROJECT_ID)))654 # UPDATE655 result = self.aodh(656 'alarm', params=("update %s --severity critical --threshold 90"657 % ALARM_ID))658 alarm_updated = self.details_multiple(result)[0]659 self.assertEqual(ALARM_ID, alarm_updated["alarm_id"])660 self.assertEqual('critical', alarm_updated['severity'])661 self.assertEqual('90.0', alarm_updated["threshold"])662 # GET663 result = self.aodh(664 'alarm', params="show %s" % ALARM_ID)665 alarm_show = self.details_multiple(result)[0]666 self.assertEqual(ALARM_ID, alarm_show["alarm_id"])667 self.assertEqual(PROJECT_ID, alarm_show["project_id"])668 self.assertEqual('alarm1', alarm_show['name'])669 self.assertEqual(metrics, alarm_show['metrics'])670 self.assertEqual('90.0', alarm_show['threshold'])671 self.assertEqual('critical', alarm_show['severity'])672 self.assertEqual('last', alarm_show['aggregation_method'])673 # LIST674 result = self.aodh('alarm', params="list")675 self.assertIn(ALARM_ID,676 [r['alarm_id'] for r in self.parser.listing(result)])677 for alarm_list in self.parser.listing(result):678 if alarm_list["alarm_id"] == ALARM_ID:679 self.assertEqual('alarm1', alarm_list['name'])680 # LIST WITH QUERY681 result = self.aodh('alarm',682 params=("list --query project_id=%s" % PROJECT_ID))683 alarm_list = self.parser.listing(result)[0]684 self.assertEqual(ALARM_ID, alarm_list["alarm_id"])685 self.assertEqual('alarm1', alarm_list['name'])686 # DELETE687 result = self.aodh('alarm', params="delete %s" % ALARM_ID)688 self.assertEqual("", result)689 # GET FAIL690 result = self.aodh('alarm', params="show %s" % ALARM_ID,691 fail_ok=True, merge_stderr=True)692 expected = "Alarm %s not found (HTTP 404)" % ALARM_ID693 self.assertFirstLineStartsWith(result.splitlines(), expected)694 # DELETE FAIL695 result = self.aodh('alarm', params="delete %s" % ALARM_ID,696 fail_ok=True, merge_stderr=True)697 self.assertFirstLineStartsWith(result.splitlines(), expected)698 # LIST DOES NOT HAVE ALARM699 result = self.aodh('alarm', params="list")700 output_colums = ['alarm_id', 'type', 'name', 'state', 'severity',701 'enabled']702 for alarm_list in self.parser.listing(result):703 self.assertEqual(sorted(output_colums), sorted(alarm_list.keys()))704 self.assertNotIn(ALARM_ID,...
client_testcase.py
Source:client_testcase.py
...157 def setUp(self):158 super(CLIClientTestCase, self).setUp()159 self.clients = self._get_clients()160 self.parser = cli_output_parser161 def assertFirstLineStartsWith(self, lines, beginning):162 """assertFirstLineStartsWith163 Verify that the first line starts with a string164 :param lines: strings for each line of output165 :type lines: list166 :param beginning: verify this is at the beginning of the first line167 :type beginning: string168 """169 self.assertTrue(lines[0].startswith(beginning),170 msg=('Beginning of first line has invalid content: %s'171 % lines[:3]))172 def assertCommandFailed(self, message, fun, *args, **kwds):173 self.assertRaisesRegex(exceptions.CommandFailed, message,174 fun, *args, **kwds)175 def _get_clients(self):176 if self.me == Role.admin:177 self.cli = self.admin_cli178 elif self.me == Role.tenant:179 self.cli = self.tenant_cli180 else:181 self.cli = self.nonadmin_cli182 return self.cli183 def _as_admin(self):184 self.me = Role.admin185 self._get_clients()186 return self187 def _as_tenant(self):188 self.me = Role.tenant189 self._get_clients()190 def create_network_with_args(self, *args):191 """Wrapper utility that returns a test network."""192 the_params = ''193 for arg in args:194 the_params += ' '195 the_params += arg196 response = self.cli.neutron('net-create', params=the_params)197 self.assertFirstLineStartsWith(response.split('\n'),198 'Created a new network:')199 network = self.parser.details(response)200 self.networks.append(network)201 return network202 def create_network(self, network_name=None):203 """Wrapper utility that returns a test network."""204 network_name = network_name or data_utils.rand_name('test-network')205 return self.create_network_with_args(network_name)206 def delete_network(self, network_id):207 response = self.cli.neutron('net-delete', params=network_id)208 self.assertFirstLineStartsWith(response.split('\n'),209 'Deleted network')210 self.assertIn(network_id, response)211 def show_network(self, network_id):212 response = self.cli.neutron('net-show', params=network_id)213 network = self.parser.details(response)214 self.assertEqual(network['id'], network_id)215 return network216 def list_networks(self):217 response = self.cli.neutron('net-list')218 return response219 def show_subnet(self, subnet_id):220 response = self.cli.neutron('subnet-show', params=subnet_id)221 subnet = self.parser.details(response)222 self.assertEqual(subnet['id'], subnet_id)223 return subnet224 def delete_subnet(self, subnet_id):225 response = self.cli.neutron('subnet-delete', params=subnet_id)226 self.assertFirstLineStartsWith(response.split('\n'),227 'Deleted subnet')228 self.assertIn(subnet_id, response)229 def list_subnets(self):230 response = self.cli.neutron('subnet-list')231 return response232 def create_subnet_with_args(self, *args):233 """Wrapper utility that returns a test subnet."""234 the_params = ''235 for arg in args:236 the_params += ' '237 the_params += arg238 response = self.cli.neutron('subnet-create', params=the_params)239 self.assertFirstLineStartsWith(response.split('\n'),240 'Created a new subnet:')241 subnet = self.parser.details(response)242 self.subnets.append(subnet)243 return subnet244 def update_subnet_with_args(self, *args):245 """Wrapper utility that updates returns a test subnet."""246 the_params = ''247 for arg in args:248 the_params += ' '249 the_params += arg250 response = self.cli.neutron('subnet-update', params=the_params)251 self.assertFirstLineStartsWith(response.split('\n'),252 'Updated subnet:')253 def create_router_with_args(self, *args):254 """Wrapper utility that returns a test router."""255 the_params = ''256 for arg in args:257 the_params += ' '258 the_params += arg259 response = self.cli.neutron('router-create', params=the_params)260 self.assertFirstLineStartsWith(response.split('\n'),261 'Created a new router:')262 router = self.parser.details(response)263 self.routers.append(router)264 return router265 def create_router(self, router_name=None):266 """Wrapper utility that returns a test router."""267 router_name = router_name or data_utils.rand_name('test-router')268 return self.create_router_with_args(router_name)269 def update_router_with_args(self, *args):270 """Wrapper utility that returns a test router."""271 the_params = ''272 for arg in args:273 the_params += ' '274 the_params += arg275 response = self.cli.neutron('router-update', params=the_params)276 self.assertFirstLineStartsWith(response.split('\n'),277 'Updated router:')278 # router = self.parser.details(response)279 # return router280 def show_router(self, router_id):281 response = self.cli.neutron('router-show', params=router_id)282 router = self.parser.details(response)283 self.assertEqual(router['id'], router_id)284 return router285 def list_routers(self):286 response = self.cli.neutron('router-list')287 return response288 def set_router_gateway_with_args(self, *args):289 """Wrapper utility that sets the router gateway."""290 the_params = ''291 for arg in args:292 the_params += ' '293 the_params += arg294 response = self.cli.neutron('router-gateway-set', params=the_params)295 self.assertFirstLineStartsWith(response.split('\n'),296 'Set gateway for router')297 def add_router_interface_with_args(self, *args):298 """Wrapper utility that sets the router gateway."""299 the_params = ''300 for arg in args:301 the_params += ' '302 the_params += arg303 response = self.cli.neutron('router-interface-add', params=the_params)304 self.assertFirstLineStartsWith(response.split('\n'),305 'Added interface')306 def create_port_with_args(self, *args):307 """Wrapper utility that returns a test port."""308 the_params = ''309 for arg in args:310 the_params += ' '311 the_params += arg312 if (CONF.network.port_vnic_type == 'direct' and313 'switchdev' in CONF.network.port_profile.get('capabilities',314 [])):315 the_params += ' '316 the_params += '--vnic-type direct'317 the_params += ' '318 the_params += ('--binding:profile type=dict'319 ' capabilities=[switchdev]')320 response = self.cli.neutron('port-create', params=the_params)321 self.assertFirstLineStartsWith(response.split('\n'),322 'Created a new port:')323 port = self.parser.details(response)324 self.ports.append(port)325 return port326 def create_port(self, network, port_name=None):327 """Wrapper utility that returns a test port."""328 port_name = port_name or data_utils.rand_name('cli-test-port-')329 response = self.create_port_with_args("--name ", port_name,330 network['id'])331 return response332 def update_port_with_args(self, port_id, *args):333 the_params = ''334 for arg in args:335 the_params += ' '336 the_params += arg337 response = self.cli.neutron('port-update ',338 params=port_id + ' ' + the_params)339 self.assertFirstLineStartsWith(response.split('\n'), 'Updated port:')340 def show_port(self, port_id):341 response = self.cli.neutron('port-show', params=port_id)342 port = self.parser.details(response)343 self.assertEqual(port['id'], port_id)344 return port345 def create_floating_ip_with_args(self, *args):346 """Wrapper utility that returns a test floating_ip."""347 the_params = ''348 for arg in args:349 the_params += ' '350 the_params += arg351 response = self.cli.neutron('floatingip-create',352 params=the_params)353 self.assertFirstLineStartsWith(response.split('\n'),354 'Created a new floatingip:')355 floating_ip = self.parser.details(response)356 self.floating_ips.append(floating_ip)357 return floating_ip358 def update_floating_ip_with_args(self, *args):359 """Wrapper utility that returns a test floating_ip."""360 the_params = ''361 for arg in args:362 the_params += ' '363 the_params += arg364 response = self.cli.neutron('floatingip-update', params=the_params)365 self.assertFirstLineStartsWith(response.split('\n'),366 'Updated floatingip:')367 def create_floating_ip(self, floating_ip_name=None):368 """Wrapper utility that returns a test floating_ip."""369 floating_ip_name = floating_ip_name or data_utils.rand_name(370 'test-floating_ip')371 return self.create_floating_ip_with_args(floating_ip_name)372 def show_floating_ip(self, floating_ip_id):373 response = self.cli.neutron('floatingip-show', params=floating_ip_id)374 floating_ip = self.parser.details(response)375 return floating_ip376 def list_nuage_floating_ip_all(self):377 response = self.cli.neutron('nuage-floatingip-list')378 nuage_floating_ip_list = self.parser.details(response)379 return nuage_floating_ip_list380 def list_nuage_floating_ip_for_subnet(self, subnet_id):381 response = self.cli.neutron('nuage-floatingip-show --subnet ',382 params=subnet_id)383 nuage_floating_ip_list = self.parser.details(response)384 return nuage_floating_ip_list385 def list_nuage_floating_ip_for_port(self, port_id):386 response = self.cli.neutron('nuage-floatingip-show --subnet ',387 params=port_id)388 nuage_floating_ip_list = self.parser.details(response)389 return nuage_floating_ip_list390 def show_nuage_floating_ip(self, floating_ip_id):391 response = self.cli.neutron('floatingip-show', params=floating_ip_id)392 floating_ip = self.parser.details(response)393 return floating_ip394 def _kwargs_to_cli(self, **kwargs):395 params_str = ''396 if kwargs is not None:397 for key, value in iteritems(kwargs):398 print("%s == %s", (key, value))399 params_str += " --%s %s" % (key, value)400 params_str = params_str.replace("_", "-")401 return params_str402 def associate_floating_ip(self, floating_ip_id, port_id, **kwargs):403 the_params = self._kwargs_to_cli(**kwargs)404 return self.cli.neutron('floatingip-associate',405 params=floating_ip_id + ' ' +406 port_id + the_params)407 def disassociate_floating_ip(self, floating_ip_id):408 return self.cli.neutron('floatingip-disassociate',409 params=floating_ip_id)410 def create_nuage_l2bridge_cli(self, *args):411 the_params = ''412 for arg in args:413 the_params += ' '414 the_params += arg415 return self.cli.neutron('nuage-l2bridge-create',416 params=the_params)417 def show_nuage_l2bridge_cli(self, *args):418 the_params = ''419 for arg in args:420 the_params += ' '421 the_params += arg422 return self.cli.neutron('nuage-l2bridge-show',423 params=the_params)424 def delete_nuage_l2bridge_cli(self, id, name=None):425 if name:426 return self.cli.neutron('nuage-l2bridge-delete',427 params=' ' + name)428 else:429 return self.cli.neutron('nuage-l2bridge-delete',430 params=' ' + id)431 def update_nuage_l2bridge_cli(self, *args):432 the_params = ''433 for arg in args:434 the_params += ' '435 the_params += arg436 return self.cli.neutron('nuage-l2bridge-update',437 params=the_params)438 def create_security_group_with_args(self, *args):439 """Wrapper utility that returns a test security group."""440 the_params = ''441 for arg in args:442 the_params += ' '443 the_params += arg444 response = self.cli.neutron('security-group-create',445 params=the_params)446 self.assertFirstLineStartsWith(response.split('\n'),447 'Created a new security_group:')448 security_group = self.parser.details(response)449 self.security_groups.append(security_group)450 return security_group451 def update_security_group_with_args(self, *args):452 """Wrapper utility that returns a test security group."""453 the_params = ''454 for arg in args:455 the_params += ' '456 the_params += arg457 response = self.cli.neutron('security-group-update',458 params=the_params)459 self.assertFirstLineStartsWith(response.split('\n'),460 'Updated security_group:')461 security_group = self.parser.details(response)462 return security_group463 def show_security_group(self, sg_id):464 response = self.cli.neutron('security-group-show', params=sg_id)465 security_group = self.parser.details(response)466 self.assertEqual(security_group['id'], sg_id)467 return security_group468 def delete_security_group(self, sg_id):469 response = self.cli.neutron('security-group-delete', params=sg_id)470 self.assertFirstLineStartsWith(response.split('\n'),471 'Deleted security_group(s)')472 self.assertIn(sg_id, response)473 def create_security_group_rule_with_args(self, *args):474 """Wrapper utility that returns a test security group rule."""475 the_params = ''476 for arg in args:477 the_params += ' '478 the_params += arg479 response = self.cli.neutron('security-group-rule-create',480 params=the_params)481 self.assertFirstLineStartsWith(response.split('\n'),482 'Created a new security_group_rule:')483 security_group_rule = self.parser.details(response)484 self.security_group_rules.append(security_group_rule)485 return security_group_rule486 def create_vm_with_args(self, *args):487 """Wrapper utility that returns a test VM."""488 the_params = ''489 for arg in args:490 the_params += ' '491 the_params += arg492 response = self.cli.nova('boot', params=the_params)493 # self.assertFirstLineStartsWith(response.split('\n'),494 # 'Created a new VM:')495 vm = self.parser.details(response)496 self.vms.append(vm)497 return vm498 @classmethod499 def delete_router(cls, router):500 cls._clear_router_gateway(router['id'])501 interfaces = cls._list_router_ports(router['id'])502 for i in interfaces:503 fixed_ips = i['fixed_ips']504 fixed_ips_dict = json.loads(fixed_ips)505 subnet_id = fixed_ips_dict['subnet_id']506 cls._remove_router_interface_with_subnet_id(router['id'],507 subnet_id)508 cls._delete_router(router['id'])509 @classmethod510 def _delete_network(cls, network_id):511 cls.cli.neutron('net-delete', params=network_id)512 @classmethod513 def _delete_subnet(cls, subnet_id):514 cls.cli.neutron('subnet-delete', params=subnet_id)515 @classmethod516 def _delete_port(cls, port_id):517 cls.cli.neutron('port-delete', params=port_id)518 @classmethod519 def _delete_router(cls, router_id):520 cls.cli.neutron('router-delete', params=router_id)521 @classmethod522 def _delete_floating_ip(cls, floating_ip_id):523 cls.cli.neutron('floatingip-delete', params=floating_ip_id)524 @classmethod525 def _clear_router_gateway(cls, router_id):526 cls.cli.neutron('router-gateway-clear', params=router_id)527 @classmethod528 def _list_router_ports(cls, router_id):529 response = cls.cli.neutron('router-port-list', params=router_id)530 ports = cls.parser.listing(response)531 return ports532 @classmethod533 def _remove_router_interface_with_subnet_id(cls, router_id, subnet_id):534 response = cls.cli.neutron('router-interface-delete',535 params=router_id + ' ' + subnet_id)536 return response537 # @classmethod538 def _cli_create_redirect_target_with_args(self, *args):539 the_params = ''540 for arg in args:541 the_params += ' '542 the_params += arg543 response = self.cli.neutron('nuage-redirect-target-create',544 params=the_params)545 self.assertFirstLineStartsWith(response.split('\n'),546 'Created a new nuage_redirect_target:')547 redirect_target = self.parser.details(response)548 # self.nuage_redirect_targets.append(redirect_target)549 return redirect_target550 def _cli_create_nuage_redirect_target_in_l2_subnet(self, l2subnet,551 name=None):552 if name is None:553 name = data_utils.rand_name('cli-os-l2-rt')554 # parameters for nuage redirection target555 response = self.cli.neutron(556 'nuage-redirect-target-create --insertion-mode VIRTUAL_WIRE '557 '--redundancy-enabled false --subnet',558 params=l2subnet['name'] + ' ' + name)559 self.assertFirstLineStartsWith(response.split('\n'),560 'Created a new nuage_redirect_target:')561 redirect_target = self.parser.details(response)562 # self.nuage_redirect_targets.append(redirect_target)563 return redirect_target564 def _cli_create_nuage_redirect_target_in_l3_subnet(self, l3subnet,565 name=None):566 if name is None:567 name = data_utils.rand_name('cli-os-l3-rt')568 response = self.cli.neutron(569 'nuage-redirect-target-create --insertion-mode L3 '570 '--redundancy-enabled false --subnet',571 params=l3subnet['name'] + ' ' + name)572 self.assertFirstLineStartsWith(response.split('\n'),573 'Created a new nuage_redirect_target:')574 redirect_target = self.parser.details(response)575 # self.nuage_redirect_targets.append(redirect_target)576 return redirect_target577 def delete_redirect_target(self, redirect_target_id):578 self.cli.neutron('nuage-redirect-target-delete',579 params=redirect_target_id)580 def list_nuage_redirect_target_for_l2_subnet(self, l2subnet):581 response = self.cli.neutron('nuage-redirect-target-list --subnet ',582 params=l2subnet['id'])583 rt_list = self.parser.listing(response)584 return rt_list585 def list_nuage_redirect_target_for_port(self, port):586 response = self.cli.neutron('nuage-redirect-target-list --for-port ',...
test_archive_policy.py
Source:test_archive_policy.py
...26 u'archive-policy', params=u"create %s"27 u" --back-window 0"28 u" -d granularity:1s,points:86400" % apname,29 fail_ok=True, merge_stderr=True)30 self.assertFirstLineStartsWith(31 result.split('\n'),32 "Archive policy %s already exists (HTTP 409)" % apname)33 # GET34 result = self.gnocchi(35 'archive-policy', params="show %s" % apname)36 policy = self.details_multiple(result)[0]37 self.assertEqual(apname, policy["name"])38 # LIST39 result = self.gnocchi(40 'archive-policy', params="list")41 policies = self.parser.listing(result)42 policy_from_list = [p for p in policies43 if p['name'] == apname][0]44 for field in ["back_window", "definition", "aggregation_methods"]:45 self.assertEqual(policy[field], policy_from_list[field])46 # UPDATE47 result = self.gnocchi(48 'archive-policy', params='update %s'49 ' -d granularity:1s,points:60' % apname)50 policy = self.details_multiple(result)[0]51 self.assertEqual(apname, policy["name"])52 # UPDATE FAIL53 result = self.gnocchi(54 'archive-policy', params='update %s'55 ' -d granularity:5s,points:86400' % apname,56 fail_ok=True, merge_stderr=True)57 self.assertFirstLineStartsWith(58 result.split('\n'),59 "Archive policy %s does not support change: 1.0 granularity "60 "interval was changed (HTTP 400)" % apname)61 # DELETE62 result = self.gnocchi('archive-policy',63 params="delete %s" % apname)64 self.assertEqual("", result)65 # GET FAIL66 result = self.gnocchi('archive-policy',67 params="show %s" % apname,68 fail_ok=True, merge_stderr=True)69 self.assertFirstLineStartsWith(70 result.split('\n'),71 "Archive policy %s does not exist (HTTP 404)" % apname)72 # DELETE FAIL73 result = self.gnocchi('archive-policy',74 params="delete %s" % apname,75 fail_ok=True, merge_stderr=True)76 self.assertFirstLineStartsWith(77 result.split('\n'),...
Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!