Best Python code snippet using localstack_python
test_rules_api.py
Source:test_rules_api.py
...87 body.update({'active': active})88 if priority is not None:89 body.update({'priority': priority})90 return body91 def _create_rule(self, name, org_id=None, pool_id="", owner_id="",92 conditions=None, active=None, priority=None,93 set_allowed=True):94 if not org_id:95 org_id = self.org_id96 if not conditions:97 conditions = [98 {"type": "name_starts_with", "meta_info": "name_starts_with"}99 ]100 if pool_id == "":101 pool_id = self.org_pool_id102 if owner_id == "":103 owner_id = self.employee['id']104 if set_allowed:105 _, employee = self.client.employee_get(owner_id)106 self.set_allowed_pair(employee['auth_user_id'], pool_id)107 rule_body = self._prepare_rule_body(108 name=name,109 pool_id=pool_id,110 owner_id=owner_id,111 conditions=conditions,112 active=active,113 priority=priority,114 )115 code, rule = self.client.rules_create(org_id, rule_body)116 self.assertEqual(code, 201, rule)117 return rule118 def _test_params(self, call, params, *args, required_parameters=None,119 entities_should_exist=None):120 data = params.copy()121 unexpected_param_name = "unexpected_param"122 data[unexpected_param_name] = self.gen_id()123 code, response = call(*args, data)124 self.assertEqual(code, 400)125 self.assertTrue('Unexpected parameters' in response['error']['reason'])126 self.assertTrue(unexpected_param_name in response['error']['reason'])127 if required_parameters:128 for param in required_parameters:129 data = params.copy()130 data.pop(param)131 code, response = call(*args, data)132 self.assertEqual(code, 400)133 self.assertEqual(response['error']['reason'],134 '%s is not provided' % param)135 if entities_should_exist:136 for param in entities_should_exist:137 data = params.copy()138 id = str(self.gen_id())139 data[param] = id140 code, response = call(*args, data)141 self.assertEqual(code, 400)142 self.assertTrue('%s %s doesn\'t exist' % (param, id)143 in response['error']['reason'])144 self.assertTrue(response['error']['error_code'], 'OE0212')145class TestRuleApi(TestRulesApiBase):146 @patch(AUTHORIZE_ACTION_METHOD)147 def test_create_rule(self, p_authorize):148 """149 Basic create rule request flow.150 Steps:151 - 1. Check initial count of rules. Should be 0.152 - 2. Create FullPair rule.153 - 3. Verify response code is 201.154 - 4. Verify count rules is 1.155 - 12. Create rule in inactive state.156 - verify state157 """158 p_authorize.return_value = True159 rules = self._get_rules()160 self.assertEqual(len(rules), 0)161 full_rule = self._create_rule('Test rule')162 self.assertEqual(self.p_activities_publish.call_count, 1)163 self.assertEqual(full_rule['pool_name'], self.organization['name'])164 self.assertEqual(full_rule['pool_purpose'], 'business_unit')165 self.assertEqual(full_rule['owner_name'], self.employee['name'])166 self.assertEqual(full_rule['creator_name'], self.employee['name'])167 self.assertEqual(full_rule['priority'], 1)168 rules = self._get_rules()169 self.assertEqual(len(rules), 1)170 self.assertEqual(rules[0]['id'], full_rule['id'])171 inactive = self._create_rule("Inactive rule",172 active=False)173 self.assertEqual(self.p_activities_publish.call_count, 2)174 code, rule = self.client.rule_get(inactive['id'])175 self.assertEqual(code, 200)176 self.assertEqual(rule['active'], False)177 rules = self._get_rules()178 self.assertEqual(len(rules), 2)179 code, rules = self.client.rules_list(self.org_id,180 valid_rules_only=True)181 self.assertEqual(code, 200)182 all_rules = rules['rules']183 for rule in all_rules:184 self.assertTrue(rule['id'] != inactive['id'])185 self.assertIn('priority', rule)186 @patch(AUTHORIZE_ACTION_METHOD)187 def test_create_rule_with_priority(self, p_authorize):188 """189 - 1. Check initial count of rules. Should be 0.190 - 2. Create 2 rules191 - 3. Create rules with priority 2192 - 4. Get list of rules and check priorities193 """194 p_authorize.return_value = True195 rules = self._get_rules()196 self.assertEqual(len(rules), 0)197 rule1 = self._create_rule('rule1')198 rule3 = self._create_rule('rule3')199 self.assertEqual(self.p_activities_publish.call_count, 2)200 rules = self._get_rules()201 self.assertEqual(len(rules), 2)202 rule2 = self._create_rule('rule2', priority=2)203 self.assertEqual(self.p_activities_publish.call_count, 3)204 self.assertEqual(rule2['priority'], 2)205 rule_body = self._prepare_rule_body(206 name='rule4',207 pool_id=self.org_pool_id,208 owner_id=self.employee['id'],209 conditions=[{"type": "name_starts_with", "meta_info": "name_starts_with"}],210 active=None,211 priority=5,212 )213 code, resp = self.client.rules_create(self.org_id, rule_body)214 self.assertEqual(code, 400)215 self.verify_error_code(resp, 'OE0217')216 rule_body.update({'priority': 0})217 code, resp = self.client.rules_create(self.org_id, rule_body)218 self.assertEqual(code, 400)219 self.verify_error_code(resp, 'OE0224')220 rules = self._get_rules()221 self.assertEqual(len(rules), 3)222 for priority, rule in enumerate(rules, start=1):223 self.assertEqual(rule['priority'], priority)224 def test_params_create_rule(self):225 """226 Test required parameters for creation rule.227 Steps:228 - 1. Verify requests without param:229 - name230 - conditions231 - 2. Verify requests with param, but entity doesn't exist:232 - owner_id233 - pool_id234 - 3. Verify request with conditions as empty array235 - err_code should be 400236 - localized code should be E0216237 - 3. Verify request without conditions in body238 - err_code should be 400239 - localized code should be E0216240 - 4. Try to create rule without owner or pool.241 - localized code should be E0216242 - 5. Try to create rule with unsupported param in condition243 - localized code should be E0212244 - 6. Try to create rule with unsupported condition type245 - localized code should be E0430246 - 7. Try to create rule with absent meta in condition247 - localized code should be E0430248 - 8. Try to create rule with absent type in condition249 - localized code should be E0430250 - 9. Try to create rule with pool in another org251 - localized code should be E0002252 - 10. Try to create rule with owner in another org253 - localized code should be E0002254 - 11. Try to create rule with invalid condition format255 - localized code should be E0344256 """257 rule_body = self._prepare_rule_body(258 name="Test rule",259 pool_id=self.org_pool_id,260 owner_id=self.employee['id'],261 conditions=[262 {"type": "name_starts_with", "meta_info": "test_"}263 ]264 )265 self._test_params(266 self.client.rules_create, rule_body, self.org_id,267 required_parameters=['name', 'conditions'],268 entities_should_exist=['owner_id', 'pool_id']269 )270 rule_body_copy = rule_body.copy()271 rule_body_copy['name'] = None272 code, resp = self.client.rules_create(self.org_id, rule_body_copy)273 self.assertEqual(code, 400)274 self.verify_error_code(resp, 'OE0216')275 rule_body_copy = rule_body.copy()276 rule_body_copy['conditions'] = []277 code, resp = self.client.rules_create(self.org_id, rule_body_copy)278 self.assertEqual(code, 400)279 self.verify_error_code(resp, 'OE0216')280 rule_body_copy = rule_body.copy()281 rule_body_copy.pop('conditions')282 code, resp = self.client.rules_create(self.org_id, rule_body_copy)283 self.assertEqual(code, 400)284 self.verify_error_code(resp, 'OE0216')285 rule_body_copy = rule_body.copy()286 rule_body_copy['conditions'] = None287 code, resp = self.client.rules_create(self.org_id, rule_body_copy)288 self.assertEqual(code, 400)289 self.verify_error_code(resp, 'OE0216')290 rule_body_copy = rule_body.copy()291 rule_body_copy['pool_id'] = None292 rule_body_copy['owner_id'] = None293 code, resp = self.client.rules_create(self.org_id, rule_body_copy)294 self.assertEqual(code, 400)295 self.verify_error_code(resp, 'OE0216')296 rule_body_copy = rule_body.copy()297 rule_body_copy.pop('pool_id')298 rule_body_copy.pop('owner_id')299 code, resp = self.client.rules_create(self.org_id, rule_body_copy)300 self.assertEqual(code, 400)301 self.verify_error_code(resp, 'OE0216')302 rule_body_copy = copy.deepcopy(rule_body)303 rule_body_copy['conditions'][0]['unsupported_param'] = 'test_val'304 code, resp = self.client.rules_create(self.org_id, rule_body_copy)305 self.assertEqual(code, 400)306 self.verify_error_code(resp, 'OE0212')307 rule_body_copy = copy.deepcopy(rule_body)308 rule_body_copy['conditions'][0]['type'] = 'UNSUPPORTED_TYPE'309 code, resp = self.client.rules_create(self.org_id, rule_body_copy)310 self.assertEqual(code, 400)311 self.verify_error_code(resp, 'OE0430')312 required_condition_fields = ['type', 'meta_info']313 for param in required_condition_fields:314 rule_body_copy = copy.deepcopy(rule_body)315 rule_body_copy['conditions'][0].pop(param)316 code, resp = self.client.rules_create(self.org_id,317 rule_body_copy)318 self.assertEqual(code, 400)319 self.verify_error_code(resp, 'OE0216')320 rule_body_copy = copy.deepcopy(rule_body)321 rule_body_copy['conditions'][0]['unsupported_key'] = None322 code, resp = self.client.rules_create(self.org_id,323 rule_body_copy)324 self.assertEqual(code, 400)325 self.verify_error_code(resp, 'OE0212')326 rule_body_copy = rule_body.copy()327 rule_body_copy['owner_id'] = self.org2_employee['id']328 code, resp = self.client.rules_create(self.org_id, rule_body_copy)329 self.assertEqual(code, 400)330 self.verify_error_code(resp, 'OE0005')331 rule_body_copy = rule_body.copy()332 rule_body_copy['pool_id'] = self.org2_pool_id333 code, resp = self.client.rules_create(self.org_id, rule_body_copy)334 self.assertEqual(code, 400)335 self.verify_error_code(resp, 'OE0005')336 rule_body_copy = rule_body.copy()337 rule_body_copy['conditions'] = ["123", "123"]338 code, resp = self.client.rules_create(self.org_id, rule_body_copy)339 self.assertEqual(code, 400)340 self.verify_error_code(resp, 'OE0344')341 @patch(AUTHORIZE_ACTION_METHOD)342 def test_list_rules(self, p_authorize):343 """344 Test list rules API with query params345 Steps:346 - 1. Create rules:347 - rule1: full pair rule for pool_org and user1 as owner348 - rule2: full pair rule for pool_org and user2 as owner349 - rule3: full pair rule for sub_pool1 and user2 as owner350 - rule4: full pair rule for sub_pool1 and user1 as owner351 - 2. Request list rules. Should return all 4 rules352 - 3. Request list rules with owner as user1. Should return:353 - rule1, rule4354 - 4. Request list rules with pool as org_pool. Should return:355 - rule1, rule2356 - 5. Request list rules with org_pool and user1 as owner.357 Should return:358 - rule1359 -9. Request list rules with invalid owner_id.360 Should return:361 - empty array.362 """363 def check_list_rules(expected_rules, **kwargs):364 expected_ids = set([r['id'] for r in expected_rules])365 rules = self._get_rules(**kwargs)366 self.assertEqual(len(rules), len(expected_rules))367 for rule in rules:368 self.assertTrue(rule['id'] in expected_ids)369 p_authorize.return_value = True370 rules = self._get_rules()371 self.assertEqual(len(rules), 0)372 rule1 = self._create_rule("rule1")373 rule2 = self._create_rule("rule2", owner_id=self.employee2['id'])374 rule3 = self._create_rule(375 "rule3", pool_id=self.sub_pools[0]['id'],376 owner_id=self.employee2['id'])377 rule4 = self._create_rule(378 "rule4", pool_id=self.sub_pools[0]['id'],379 owner_id=self.employee['id'])380 rules = self._get_rules()381 self.assertEqual(len(rules), 4)382 check_list_rules([rule1, rule4], owner_id=self.employee['id'])383 check_list_rules([rule1, rule2], pool_id=self.org_pool_id)384 check_list_rules([rule1], owner_id=self.employee['id'],385 pool_id=self.org_pool_id)386 check_list_rules([], owner_id=self.gen_id())387 @patch(AUTHORIZE_ACTION_METHOD)388 def test_create_rule_with_existing_name(self, p_authorize):389 """390 Test for trying crete rule with name that is already exist391 Steps:392 - 1. Check initial count of rules. Should be 0.393 - 2. Create FullPair rule with name TestRule.394 - 3. Verify response code is 201.395 - 4. Verify count rules is 1.396 - 5. Create FullPair rule with the same name.397 - verify localized code is E0431398 - 6. Verify count rules is still 1.399 - 5. Create rules with the same name.400 - verify localized code is E0431401 - 6. Verify count rules is still 1.402 """403 p_authorize.return_value = True404 rules = self._get_rules()405 self.assertEqual(len(rules), 0)406 rule_body = self._prepare_rule_body(407 name="TestRule",408 pool_id=self.org_pool_id,409 owner_id=self.employee['id'],410 conditions=[411 {"type": "name_starts_with", "meta_info": "test_"}412 ]413 )414 code, full_rule = self.client.rules_create(self.org_id, rule_body)415 self.assertEqual(code, 201)416 rules = self._get_rules()417 self.assertEqual(len(rules), 1)418 code, resp = self.client.rules_create(self.org_id, rule_body)419 self.assertEqual(code, 409)420 self.verify_error_code(resp, 'OE0149')421 rules = self._get_rules()422 self.assertEqual(len(rules), 1)423 p_authorize.return_value = False424 rule_body_copy = rule_body.copy()425 rule_body_copy.pop('pool_id')426 code, resp = self.client.rules_create(self.org_id, rule_body_copy)427 self.assertEqual(code, 409)428 self.verify_error_code(resp, 'OE0149')429 rules = self._get_rules()430 self.assertEqual(len(rules), 1)431 rule_body_copy = rule_body.copy()432 rule_body_copy.pop('owner_id')433 code, resp = self.client.rules_create(self.org_id, rule_body_copy)434 self.assertEqual(code, 409)435 self.verify_error_code(resp, 'OE0149')436 rules = self._get_rules()437 self.assertEqual(len(rules), 1)438 @patch(AUTHORIZE_ACTION_METHOD)439 def test_create_full_rule_with_invalid_target_pair(self, p_authorize):440 """441 Test creation rule with invalid target pair.442 Steps:443 - 1. Check initial count of rules. Should be 0.444 - 2. Create FullPair rule with invalid target pair.445 - 3. Verify response code is 403.446 - 4. Verify localized code is E0379447 - 4. Verify count rules is 0.448 """449 p_authorize.return_value = False450 rules = self._get_rules()451 self.assertEqual(len(rules), 0)452 rule_body = self._prepare_rule_body(453 name="TestRule",454 pool_id=self.org_pool_id,455 owner_id=self.employee['id'],456 conditions=[457 {"type": "name_starts_with", "meta_info": "test_"}458 ]459 )460 code, resp = self.client.rules_create(self.org_id, rule_body)461 self.assertEqual(code, 403)462 self.verify_error_code(resp, "OE0379")463 rules = self._get_rules()464 self.assertEqual(len(rules), 0)465 @patch(AUTHORIZE_ACTION_METHOD)466 def test_create_rule_with_invalid_tag_is_meta(self, p_authorize):467 """468 Test creation rule with invalid tag is meta condition469 Steps:470 - 1. Try to create rule with tags_is conditions with all possible471 invalid meta_info parameter472 - 2. Verify all tries are failed473 - 3. Verify no new rule created474 """475 p_authorize.return_value = True476 rules = self._get_rules()477 self.assertEqual(len(rules), 0)478 valid_rule_body = self._prepare_rule_body(479 name="TestRule",480 pool_id=self.org_pool_id,481 owner_id=self.employee['id'],482 conditions=[483 {"type": "name_starts_with", "meta_info": "test_"}484 ]485 )486 def check_response(meta_info, error_code):487 conditions = [488 {"type": "tag_is", "meta_info": meta_info}489 ]490 valid_rule_body['conditions'] = conditions491 code, resp = self.client.rules_create(self.org_id,492 valid_rule_body)493 self.assertEqual(code, 400)494 self.verify_error_code(resp, error_code)495 check_response(None, "OE0216")496 check_response(123, "OE0214")497 check_response({}, "OE0214")498 check_response("{}", "OE0216")499 check_response("123", "OE0344")500 check_response("\"123\"", "OE0344")501 check_response("{\"123\"}", "OE0219")502 check_response("{\"key\": \"key\"}", "OE0216")503 check_response("{\"key\": 123}", "OE0214")504 check_response("{\"value\": \"value\"}", "OE0216")505 check_response("{\"key\": \"key\", \"value\": 123}", "OE0214")506 check_response("{\"key\": \"k\", \"value\": \"v\", \"new\": \"value\"}",507 "OE0212")508 check_response("{\"key\": \"key\", \"unexpected\": \"value\"}", "OE0216")509 rules = self._get_rules()510 self.assertEqual(len(rules), 0)511 @patch(AUTHORIZE_ACTION_METHOD)512 def test_update_rule(self, p_authorize):513 """514 Basic test for rule update515 Steps:516 - 1. Check initial count of rules. Should be 0.517 - 2. Create FullPair rule with 2 conditions.518 - 3. Update rule:519 - update name520 - update active521 - update pool_id522 - update owner_id523 - delete one condition524 - update one existing condition525 - add new condition526 - 4. Verify all changes with GET rule API527 - 5. Verify extra fields in PATCH response528 """529 final_conditions_after_update = set()530 p_authorize.return_value = True531 rules = self._get_rules()532 self.assertEqual(len(rules), 0)533 conditions = [534 {"type": "name_starts_with", "meta_info": "NAME_STARTS_WITH"},535 {"type": "name_contains", "meta_info": "NAME_CONTAINS"},536 ]537 full_rule = self._create_rule("TestRule", conditions=conditions)538 rules = self._get_rules()539 self.assertEqual(len(rules), 1)540 code, original_rule = self.client.rule_get(full_rule['id'])541 self.assertEqual(code, 200)542 modified_rule_body = copy.deepcopy(original_rule)543 modified_rule_body['name'] = 'new_name'544 modified_rule_body['active'] = False545 modified_rule_body['owner_id'] = self.employee2['id']546 modified_rule_body['pool_id'] = self.sub_pools[0]['id']547 modified_rule_body['conditions'][0]['type'] = 'name_ends_with'548 final_conditions_after_update.add('name_ends_with')549 modified_rule_body['conditions'][0]['meta_info'] = 'NAME_ENDS_WITH'550 modified_rule_body['conditions'] = modified_rule_body['conditions'][:-1]551 modified_rule_body['conditions'].append(552 {"type": "name_is", "meta_info": "NAME_IS"})553 final_conditions_after_update.add('name_is')554 code, patched_response = self.client.rule_update(original_rule['id'],555 modified_rule_body)556 self.assertEqual(code, 200)557 self.assertEqual(self.p_activities_publish.call_count, 2)558 code, modified_rule = self.client.rule_get(full_rule['id'])559 self.assertEqual(code, 200)560 self.assertEqual(modified_rule['name'], 'new_name')561 self.assertEqual(modified_rule['active'], False)562 self.assertEqual(modified_rule['pool_id'],563 self.sub_pools[0]['id'])564 self.assertEqual(modified_rule['owner_id'], self.employee2['id'])565 self.assertEqual(len(modified_rule['conditions']), 2)566 for condition in modified_rule['conditions']:567 self.assertTrue(condition['type'] in final_conditions_after_update)568 self.assertTrue(569 condition['meta_info'].lower() in final_conditions_after_update)570 self.assertEqual(patched_response['creator_name'],571 self.employee['name'])572 self.assertEqual(patched_response['owner_name'], self.employee2['name'])573 self.assertEqual(patched_response['pool_purpose'], "budget")574 self.assertEqual(patched_response['pool_name'],575 self.sub_pools[0]['name'])576 for field in ["creator_name", "pool_name",577 "owner_name", "pool_purpose"]:578 self.assertTrue(field in patched_response.keys())579 @patch(AUTHORIZE_ACTION_METHOD)580 def test_update_rule_with_existing_name(self, p_authorize):581 """582 Basic test for rule update583 Steps:584 - 1. Check initial count of rules. Should be 0.585 - 2. Create two rules with names rule0 and rule1586 - 3. Create rule with name rule2587 - 4. Create rule with name rule3588 - 5. Try to update rule0 with names rule1, rule2, rule3.589 - all updates should be failed.590 - verify localized code is E0431591 - 6. Check that rule0 has the same name592 - 7. Create rule for org2 with name rule_new_org.593 - 8. Update rule0 with name rule_new_org. Should be succeeded.594 """595 p_authorize.return_value = True596 rules = self._get_rules()597 self.assertEqual(len(rules), 0)598 rule0 = self._create_rule("rule0")599 rule1 = self._create_rule("rule1")600 rule2 = self._create_rule("rule2")601 rule3 = self._create_rule("rule3")602 for name in [rule1['name'], rule2['name'], rule3['name']]:603 rule0['name'] = name604 code, resp = self.client.rule_update(rule0['id'], rule0)605 self.assertEqual(code, 409)606 self.verify_error_code(resp, 'OE0149')607 code, rule = self.client.rule_get(rule0['id'])608 self.assertEqual(code, 200)609 self.assertEqual(rule['name'], "rule0")610 new_rule_name = "rule_new_org"611 with self.switch_user(self.org2_user_id):612 self._create_rule(new_rule_name, org_id=self.org2_id,613 owner_id=self.org2_employee['id'],614 pool_id=self.org2_pool_id)615 rule0['name'] = new_rule_name616 code, resp = self.client.rule_update(rule0['id'],617 rule0)618 # 5 rules created and one updated619 self.assertEqual(self.p_activities_publish.call_count, 6)620 self.assertEqual(code, 200)621 code, rule = self.client.rule_get(rule0['id'])622 self.assertEqual(code, 200)623 self.assertEqual(rule['name'], new_rule_name)624 @patch(AUTHORIZE_ACTION_METHOD)625 def test_update_rule_with_invalid_tag_is_meta(self, p_authorize):626 """627 Test update rule with invalid tag is meta condition628 Steps:629 - 1. Create valid rule with tag_is condition.630 - 2. Try to update rule with tags_is conditions with all possible631 invalid meta_info parameter632 - 3. Verify all tries are failed633 - 4. Verify no new rule created634 """635 p_authorize.return_value = True636 rules = self._get_rules()637 self.assertEqual(len(rules), 0)638 conditions = [639 {640 "type": "tag_is",641 "meta_info": "{\"key\":\"key\",\"value\":\"value\"}"642 }643 ]644 rule1 = self._create_rule('rule1', conditions=conditions)645 rules = self._get_rules()646 self.assertEqual(len(rules), 1)647 valid_rule_body = self._prepare_rule_body(648 name="TestRule",649 pool_id=self.org_pool_id,650 owner_id=self.employee['id'],651 conditions=[652 {"type": "name_starts_with", "meta_info": "test_"}653 ]654 )655 def check_response(meta_info, error_code):656 conditions = [657 {"type": "tag_is", "meta_info": meta_info}658 ]659 valid_rule_body['conditions'] = conditions660 code, resp = self.client.rule_update(rule1['id'],661 valid_rule_body)662 self.assertEqual(code, 400)663 self.verify_error_code(resp, error_code)664 check_response(None, "OE0216")665 check_response(123, "OE0214")666 check_response({}, "OE0214")667 check_response("{}", "OE0216")668 check_response("123", "OE0344")669 check_response("\"123\"", "OE0344")670 check_response("{\"123\"}", "OE0219")671 check_response("{\"key\": \"key\"}", "OE0216")672 check_response("{\"key\": 123}", "OE0214")673 check_response("{\"value\": \"value\"}", "OE0216")674 check_response("{\"key\": \"key\", \"value\": 123}", "OE0214")675 check_response("{\"key\": \"k\", \"value\": \"v\", \"new\": \"value\"}",676 "OE0212")677 check_response("{\"key\": \"key\", \"unexpected\": \"value\"}", "OE0216")678 rules = self._get_rules()679 self.assertEqual(len(rules), 1)680 @patch(AUTHORIZE_ACTION_METHOD)681 def test_update_rule_name_is_null(self, p_authorize):682 """683 Testing update rule name with None value684 Steps:685 - 1. Check initial count of rules. Should be 0.686 - 2. Create two FullPair rules with name rule0.687 - 3. Try to update rule0 with name is None688 - update should be failed.689 - verify localized code is E0216690 - 4. Try to update rule0 with name is integer value691 - update should be failed.692 - verify localized code is E0214693 """694 p_authorize.return_value = True695 rules = self._get_rules()696 self.assertEqual(len(rules), 0)697 full_rule = self._create_rule("rule0")698 full_rule['name'] = None699 code, resp = self.client.rule_update(full_rule['id'],700 full_rule)701 self.assertEqual(code, 400)702 self.verify_error_code(resp, 'OE0216')703 full_rule['name'] = 0704 code, resp = self.client.rule_update(full_rule['id'],705 full_rule)706 self.assertEqual(code, 400)707 self.verify_error_code(resp, 'OE0214')708 @patch(AUTHORIZE_ACTION_METHOD)709 def test_get_update_rule_invalid_id(self, p_authorize):710 """711 Test get_update_rule for invalid rule id712 Steps:713 - 1. Get rule by invalid id. Verify714 - code is 404715 - localized code is E0002716 - 2. Update rule by invalid id. Verify717 - code is 404718 - localized code is E0002719 - 3. Create rule for org2720 - 4. Try to update rule in org1 for rule id from org 2721 """722 p_authorize.return_value = True723 code, resp = self.client.rule_get(self.gen_id())724 self.assertEqual(code, 404)725 self.verify_error_code(resp, "OE0002")726 code, resp = self.client.rule_update(self.gen_id(), {})727 self.assertEqual(code, 404)728 self.verify_error_code(resp, "OE0002")729 # TODO uncomment after OSB-412 is done730 # with self.switch_user(self.org2_user_id):731 # org2_rule = self._create_rule(732 # 'new_rule_name',733 # org_id=self.org2_id,734 # owner_id=self.org2_employee['id'],735 # budget_id=self.org2_budget_id)736 # org2_rule_id = org2_rule['id']737 # code, resp = self.client.rule_get(org2_rule_id)738 # self.assertEqual(code, 404)739 # self.verify_error_code(resp, "OE0002")740 # code, resp = self.client.rule_update(org2_rule_id, {})741 # self.assertEqual(code, 404)742 # self.verify_error_code(resp, "OE0002")743 @patch(AUTHORIZE_ACTION_METHOD)744 def test_update_with_change_rule_priority(self, p_authorize):745 """746 Test update rule with changing rule priority747 Steps:748 - 2. Create 3 rules749 - 3. Update rule3 and set priority 1750 - 5. Verify new rule order751 """752 p_authorize.return_value = True753 rules = self._get_rules()754 self.assertEqual(len(rules), 0)755 rule1 = self._create_rule('rule1')756 rule2 = self._create_rule('rule2')757 rule3 = self._create_rule('rule3')758 rules = self._get_rules()759 self.assertEqual(len(rules), 3)760 original_order = [r['id'] for r in rules]761 rule2['priority'] = 0762 code, resp = self.client.rule_update(rule2['id'], rule2)763 self.assertEqual(code, 400)764 self.verify_error_code(resp, 'OE0224')765 rule2['priority'] = 5766 code, resp = self.client.rule_update(rule2['id'], rule2)767 self.assertEqual(code, 400)768 self.verify_error_code(resp, 'OE0217')769 rule1['priority'] = 1770 code, _ = self.client.rule_update(rule1['id'], rule1)771 self.assertEqual(code, 200)772 rules = self._get_rules()773 self.assertEqual(len(rules), 3)774 expected_new_order = [original_order[2], original_order[0],775 original_order[1]]776 new_order = [r['id'] for r in rules]777 self.assertEqual(expected_new_order, new_order)778 @patch(AUTHORIZE_ACTION_METHOD)779 def test_update_without_pool_and_owner(self, p_authorize):780 """781 Test update rule without owner and pool specified782 Steps:783 - 1. Create full pair rule784 - 2. Update rule remove owner and pool785 - verify it's failed786 - verify localized code787 - 3. Update rule (remove owner)788 - 4. Update rule (remove pool)789 - verify it's failed790 - verify localized code791 """792 p_authorize.return_value = True793 rule = self._create_rule('test rule')794 rule_tmp = copy.deepcopy(rule)795 rule_tmp['pool_id'] = None796 rule_tmp['owner_id'] = None797 code, resp = self.client.rule_update(rule['id'], rule_tmp)798 self.assertEqual(code, 400)799 self.verify_error_code(resp, "OE0216")800 rule_tmp = copy.deepcopy(rule)801 rule_tmp['owner_id'] = None802 code, resp = self.client.rule_update(rule['id'], rule_tmp)803 self.assertEqual(code, 400)804 self.verify_error_code(resp, "OE0216")805 rule_tmp['pool_id'] = None806 code, resp = self.client.rule_update(rule['id'], rule_tmp)807 self.assertEqual(code, 400)808 self.verify_error_code(resp, "OE0216")809 @patch(AUTHORIZE_ACTION_METHOD)810 def test_update_remove_all_conditions(self, p_authorize):811 """812 Test update rule without conditions813 Steps:814 - 1. Create full pair rule815 - 2. Update rule without conditions(empty array)816 - verify it's failed817 - verify localized code is E0216818 """819 p_authorize.return_value = True820 rule = self._create_rule('test rule')821 rule['conditions'] = []822 code, resp = self.client.rule_update(rule['id'], rule)823 self.assertEqual(code, 400)824 self.verify_error_code(resp, "OE0216")825 @patch(AUTHORIZE_ACTION_METHOD)826 def test_update_invalid_conditions(self, p_authorize):827 """828 Test update rule with invalid conditions829 Steps:830 - 1. Create full pair rule831 - 2. Update rule with832 - not supported type in condition(E0430)833 - not supported key in condition(E0212)834 - with condition type is None(E0430)835 - with condition meta is None(E0216)836 - verify all requests are failed837 """838 p_authorize.return_value = True839 rule = self._create_rule('test rule')840 tmp_rule = copy.deepcopy(rule)841 tmp_rule['conditions'][0]['type'] = "unsupported_type"842 code, resp = self.client.rule_update(rule['id'], tmp_rule)843 self.assertEqual(code, 400)844 self.verify_error_code(resp, "OE0430")845 tmp_rule = copy.deepcopy(rule)846 tmp_rule['conditions'][0]['unsupported_key'] = "dummy"847 code, resp = self.client.rule_update(rule['id'], tmp_rule)848 self.assertEqual(code, 400)849 self.verify_error_code(resp, "OE0212")850 tmp_rule = copy.deepcopy(rule)851 tmp_rule['conditions'][0]["type"] = None852 code, resp = self.client.rule_update(rule['id'], tmp_rule)853 self.assertEqual(code, 400)854 self.verify_error_code(resp, "OE0430")855 tmp_rule = copy.deepcopy(rule)856 tmp_rule['conditions'][0]["meta_info"] = None857 code, resp = self.client.rule_update(rule['id'], tmp_rule)858 self.assertEqual(code, 400)859 self.verify_error_code(resp, "OE0216")860 @patch(AUTHORIZE_ACTION_METHOD)861 def test_update_invalid_target_pair(self, p_authorize):862 """863 Test update rule without owner and pool specified864 Steps:865 - 1. Create full pair rule866 - 2. Update rule with invalid target pair867 - verify it's failed868 - verify localized code is E0379869 """870 p_authorize.return_value = True871 rule = self._create_rule('test rule')872 p_authorize.return_value = False873 code, resp = self.client.rule_update(rule['id'], rule)874 self.assertEqual(code, 403)875 self.verify_error_code(resp, "OE0379")876 @patch(AUTHORIZE_ACTION_METHOD)877 def test_update_active_is_none(self, p_authorize):878 """879 Test update rule with invalid active field880 Steps:881 - 1. Create full pair rule882 - 2. Update rule with active is None883 - verify it's failed884 - verify localized code is E0212885 """886 p_authorize.return_value = True887 rule = self._create_rule('test rule')888 rule['active'] = None889 code, resp = self.client.rule_update(rule['id'], rule)890 self.assertEqual(code, 400)891 self.verify_error_code(resp, "OE0216")892 @patch(AUTHORIZE_ACTION_METHOD)893 def test_update_invalid_pool_or_owner_id(self, p_authorize):894 """895 Test update rule with specifying invalid owner id.896 Steps:897 - 1. Create full pair rule898 - 2. Update rule with random UUID for pool_id and owner_id899 - verify it's failed900 - verify localized code is E0002901 -3. Update rule with pool_id and owner_id from another org.902 """903 p_authorize.return_value = True904 rule = self._create_rule('test rule')905 for param in ['pool_id', 'owner_id']:906 rule_tmp = copy.deepcopy(rule)907 rule_tmp[param] = self.gen_id()908 code, resp = self.client.rule_update(rule['id'], rule_tmp)909 self.assertEqual(code, 400)910 self.verify_error_code(resp, "OE0005")911 rule_tmp = copy.deepcopy(rule)912 rule_tmp['pool_id'] = self.org2_pool_id913 code, resp = self.client.rule_update(rule['id'], rule_tmp)914 self.assertEqual(code, 400)915 self.verify_error_code(resp, "OE0005")916 rule_tmp = copy.deepcopy(rule)917 rule_tmp['owner_id'] = self.org2_employee['id']918 code, resp = self.client.rule_update(rule['id'], rule_tmp)919 self.assertEqual(code, 400)920 self.verify_error_code(resp, "OE0005")921 @patch(AUTHORIZE_ACTION_METHOD)922 def test_test_patch_rule_fields(self, p_authorize):923 """924 Test update rule with field by filed925 Steps:926 - 1. Create full pair rule927 - 2. Update all fields in rule one by one using only this key in body928 - 3. Verify all fields are updated929 """930 p_authorize.return_value = True931 rule = self._create_rule('test rule')932 new_rule = {933 'name': 'new_name',934 'pool_id': self.sub_pools[0]['id'],935 'owner_id': self.employee2['id'],936 'conditions':937 [{"type": "name_ends_with", "meta_info": "name_ends_with"}]938 }939 for key in new_rule.keys():940 body = {key: new_rule[key]}941 code, resp = self.client.rule_update(rule['id'], body)942 self.assertEqual(code, 200)943 code, rule = self.client.rule_get(rule['id'])944 self.assertEqual(code, 200)945 if key == 'conditions':946 for cond in rule[key]:947 cond.pop('id')948 self.assertEqual(rule[key], new_rule[key])949 @patch(AUTHORIZE_ACTION_METHOD)950 def test_delete_rule(self, p_authorize):951 """952 Test delete rule953 Steps:954 - 1. Create rule rule1955 - 2. Create rule rule2956 - 3. Create rule rule3957 - 4. Verify count of rules, should be 3958 - 5. Delete rules one by one and check count -1959 - 6. Try to delete rule that is doesn't exist960 """961 p_authorize.return_value = True962 rules = self._get_rules()963 self.assertEqual(len(rules), 0)964 rule1 = self._create_rule('rule1')965 rule2 = self._create_rule('rule2')966 rule3 = self._create_rule('rule3')967 rules = self._get_rules()968 self.assertEqual(len(rules), 3)969 code, _ = self.client.rule_delete(rule1['id'])970 self.assertEqual(code, 204)971 self.assertEqual(self.p_activities_publish.call_count, 4)972 rules = self._get_rules()973 self.assertEqual(len(rules), 2)974 code, _ = self.client.rule_delete(rule2['id'])975 self.assertEqual(code, 204)976 self.assertEqual(self.p_activities_publish.call_count, 5)977 rules = self._get_rules()978 self.assertEqual(len(rules), 1)979 code, _ = self.client.rule_delete(rule3['id'])980 self.assertEqual(code, 204)981 self.assertEqual(self.p_activities_publish.call_count, 6)982 rules = self._get_rules()983 self.assertEqual(len(rules), 0)984 code, resp = self.client.rule_delete(self.gen_id())985 self.assertEqual(code, 404)986 self.verify_error_code(resp, 'OE0002')987 @patch(AUTHORIZE_ACTION_METHOD)988 def test_delete_rule_priority_change(self, p_authorize):989 """990 Test delete rule991 Steps:992 - 1. Create 3 rules993 - 2. Verify count of rules, should be 3994 - 3. Delete middle value995 - 4. Check priority order updated and have no gaps996 """997 p_authorize.return_value = True998 rules = self._get_rules()999 self.assertEqual(len(rules), 0)1000 rule1 = self._create_rule('rule1')1001 rule2 = self._create_rule('rule2')1002 rule3 = self._create_rule('rule3')1003 rules = self._get_rules()1004 self.assertEqual(len(rules), 3)1005 code, _ = self.client.rule_delete(rule2['id'])1006 self.assertEqual(code, 204)1007 rules = self._get_rules()1008 self.assertEqual(len(rules), 2)1009 for i, rule in enumerate(rules, start=1):1010 self.assertEqual(rule['priority'], i)1011class TestApplyRuleApi(TestRulesApiBase):1012 def setUp(self, version='v2'):1013 super().setUp(version)1014 patch('rest_api_server.controllers.cloud_account.'1015 'CloudAccountController._configure_report').start()1016 patch('rest_api_server.handlers.v1.base.BaseAuthHandler.'1017 '_get_user_info', return_value=self.user).start()1018 aws_cloud_acc = {1019 'name': 'my cloud_acc',1020 'type': 'aws_cnr',1021 'config': {1022 'access_key_id': 'key',1023 'secret_access_key': 'secret',1024 'config_scheme': 'create_report'1025 }1026 }1027 _, self.cloud_acc = self.create_cloud_account(1028 self.org_id, aws_cloud_acc, auth_user_id=self.user_id)1029 _, cloud_rules = self.client.rules_list(self.org_id)1030 self.set_allowed_pair(self.user_id,1031 cloud_rules['rules'][0]['pool_id'])1032 self.cloud_acc_id = self.cloud_acc['id']1033 def _create_resources(self, names, cloud_resource_ids=[]):1034 valid_body = {'resources': []}1035 for i, name in enumerate(names):1036 cloud_resource_id = 'res_id_%s' % self.gen_id()1037 if i < len(cloud_resource_ids):1038 cloud_resource_id = cloud_resource_ids[i]1039 valid_body['resources'].append(1040 {1041 'cloud_resource_id': cloud_resource_id,1042 'name': name,1043 'resource_type': 'VM',1044 }1045 )1046 code, result = self.cloud_resource_create_bulk(1047 self.cloud_acc_id, valid_body, return_resources=True)1048 self.assertEqual(code, 200)1049 return result['resources']1050 def _verify_assignments(self, resources, expected_map):1051 self.assertEqual(len(resources), len(expected_map.keys()))1052 for resource in resources:1053 self.assertEqual(1054 resource.get('employee_id'),1055 expected_map[resource['name']]['employee_id'])1056 self.assertEqual(1057 resource.get('pool_id'),1058 expected_map[resource['name']]['pool_id'])1059 @patch(AUTHORIZE_ACTION_METHOD)1060 @patch(RULE_APPLY_CONDITION_CLOUD_IS)1061 def test_apply_rules_all_supported_conditions(self, p_rule_apply,1062 p_authorize):1063 """1064 Test checks all supported conditions in one rule1065 Steps:1066 - 1. Create full pair rule rule1 with:1067 - name_is1068 - name_starts_with1069 - name_ends_with1070 - name_contains1071 - 2. Create resources:1072 - one matched all conditions1073 - one doesn't match name_is1074 - 3. Verify the first resource is assigned, the second one is not.1075 """1076 p_authorize.return_value = True1077 p_rule_apply.return_value = False1078 conditions = [1079 {"type": "name_starts_with", "meta_info": "starts_"},1080 {"type": "name_ends_with", "meta_info": "_ends"},1081 {"type": "name_contains", "meta_info": "contains"},1082 {"type": "name_is", "meta_info": "starts_contains_ends"},1083 ]1084 self._create_rule('test_rule', conditions=conditions,1085 pool_id=self.sub_pools_ids[0],1086 owner_id=self.employee2['id'])1087 resources = self._create_resources(1088 ['starts_contains_ends', 'starts_contains_new_ends']1089 )1090 expected_map = {1091 'starts_contains_ends': {1092 'pool_id': self.sub_pools_ids[0],1093 'employee_id': self.employee2['id']1094 },1095 'starts_contains_new_ends': {1096 'pool_id': self.org_pool_id,1097 'employee_id': self.employee['id']1098 }1099 }1100 self._verify_assignments(resources, expected_map)1101 @patch(AUTHORIZE_ACTION_METHOD)1102 @patch(RULE_APPLY_CONDITION_CLOUD_IS)1103 def test_apply_rules_multiple_rules_multiple_resources(self, p_rule_apply,1104 p_authorize):1105 """1106 Test checks applying rules. Multiple rules, multiple resources1107 Steps:1108 - 1. Create multiple rules rule1, rule2, rule31109 - 2. Create resources:1110 - two resources matched rule11111 - two resources matched rule21112 - two resources matched rule31113 - two doesn't match any rules1114 - 3. Verify the resource assignments.1115 """1116 p_authorize.return_value = True1117 p_rule_apply.return_value = False1118 conditions = [1119 {"type": "name_starts_with", "meta_info": "starts_"},1120 ]1121 rule1 = self._create_rule('rule1', conditions=conditions,1122 pool_id=self.sub_pools_ids[0],1123 owner_id=self.employee['id'])1124 conditions = [1125 {"type": "name_ends_with", "meta_info": "_ends"},1126 ]1127 rule2 = self._create_rule('rule2', conditions=conditions,1128 pool_id=self.sub_pools_ids[1],1129 owner_id=self.employee2['id'])1130 conditions = [1131 {"type": "name_contains", "meta_info": "contains"},1132 ]1133 rule3 = self._create_rule('rule3', conditions=conditions,1134 pool_id=self.sub_pools_ids[2],1135 owner_id=self.employee3['id'])1136 res_map = {1137 'starts_name_1': 'cloud_resource_id_1',1138 'name_2': 'starts_/res_2/id',1139 'name_3_ends': 'cloud_resource_id_3',1140 'name_4': 'res_4/_ends',1141 'name_contains_5': 'cloud_resource_id_5',1142 'name_6': 'res_id/contains_6',1143 'name_7': 'res_7_id',1144 'name_8': 'res_8_id'1145 }1146 resources = self._create_resources(1147 names=list(res_map.keys()),1148 cloud_resource_ids=list(res_map.values())1149 )1150 expected_map = {1151 'starts_name_1': {1152 'pool_id': self.sub_pools_ids[0],1153 'employee_id': self.employee['id']1154 },1155 'name_2': {1156 'pool_id': self.sub_pools_ids[0],1157 'employee_id': self.employee['id']1158 },1159 'name_3_ends': {1160 'pool_id': self.sub_pools_ids[1],1161 'employee_id': self.employee2['id']1162 },1163 'name_4': {1164 'pool_id': self.sub_pools_ids[1],1165 'employee_id': self.employee2['id']1166 },1167 'name_contains_5': {1168 'pool_id': self.sub_pools_ids[2],1169 'employee_id': self.employee3['id']1170 },1171 'name_6': {1172 'pool_id': self.sub_pools_ids[2],1173 'employee_id': self.employee3['id']1174 },1175 'name_7': {1176 'pool_id': self.org_pool_id,1177 'employee_id': self.employee['id']1178 },1179 'name_8': {1180 'pool_id': self.org_pool_id,1181 'employee_id': self.employee['id']1182 },1183 }1184 self._verify_assignments(resources, expected_map)1185 applied_rules_map = {1186 'starts_name_1': [1187 {'id': rule1['id'], 'name': rule1['name'],1188 'pool_id': rule1['pool_id']}1189 ],1190 'name_2': [1191 {'id': rule1['id'], 'name': rule1['name'],1192 'pool_id': rule1['pool_id']}1193 ],1194 'name_3_ends': [1195 {'id': rule2['id'], 'name': rule2['name'],1196 'pool_id': rule2['pool_id']}1197 ],1198 'name_4': [1199 {'id': rule2['id'], 'name': rule2['name'],1200 'pool_id': rule2['pool_id']}1201 ],1202 'name_contains_5': [1203 {'id': rule3['id'], 'name': rule3['name'],1204 'pool_id': rule3['pool_id']}1205 ],1206 'name_6': [1207 {'id': rule3['id'], 'name': rule3['name'],1208 'pool_id': rule3['pool_id']}1209 ],1210 'name_7': None,1211 'name_8': None1212 }1213 for r in resources:1214 name = r.get('name')1215 applied_rules = applied_rules_map[name]1216 self.assertEqual(applied_rules, r.get('applied_rules'))1217 if applied_rules:1218 self.assertEqual(1219 r.get('pool_id'), applied_rules[0]['pool_id'])1220 @patch(AUTHORIZE_ACTION_METHOD)1221 def test_apply_rules_rules_priority(self, p_authorize):1222 """1223 Test checks applying rules by priority.1224 Steps:1225 - 1. Create rule1 and rule2 with conflicted conditions1226 - 2. Create 4 resources matched both rules1227 - 3. Verify the resource assignments.1228 """1229 p_authorize.return_value = True1230 conditions = [1231 {"type": "name_starts_with", "meta_info": "rule"},1232 ]1233 self._create_rule('rule1', conditions=conditions,1234 pool_id=self.sub_pools_ids[0],1235 owner_id=self.employee2['id'])1236 self._create_rule('rule2', conditions=conditions,1237 pool_id=self.sub_pools_ids[1],1238 owner_id=self.employee3['id'])1239 resources = self._create_resources(1240 [1241 'rule1',1242 'rule2',1243 'rule3',1244 'rule4',1245 ]1246 )1247 expected_map = {1248 'rule1': {1249 'pool_id': self.sub_pools_ids[1],1250 'employee_id': self.employee3['id']1251 },1252 'rule2': {1253 'pool_id': self.sub_pools_ids[1],1254 'employee_id': self.employee3['id']1255 },1256 'rule3': {1257 'pool_id': self.sub_pools_ids[1],1258 'employee_id': self.employee3['id']1259 },1260 'rule4': {1261 'pool_id': self.sub_pools_ids[1],1262 'employee_id': self.employee3['id']1263 }1264 }1265 self._verify_assignments(resources, expected_map)1266 @patch(AUTHORIZE_ACTION_METHOD)1267 @patch(RULE_APPLY_CONDITION_CLOUD_IS)1268 def test_apply_rules_no_rules(self, p_rule_apply, p_authorize):1269 """1270 Test checks creation resources in case of no rules are available in org1271 Steps:1272 - 1. Create 4 resources.1273 - 2. Verify root assignments.1274 - 3. Create 4 rules1275 - 4. Create resource that doesn't match any rules.1276 - 5. Verify root assignments.1277 """1278 p_authorize.return_value = True1279 p_rule_apply.return_value = False1280 resources = self._create_resources(1281 [1282 'rule1',1283 'rule2',1284 'rule3',1285 'rule4',1286 ]1287 )1288 expected_map = {1289 'rule1': {1290 'pool_id': self.org_pool_id,1291 'employee_id': self.employee['id']1292 },1293 'rule2': {1294 'pool_id': self.org_pool_id,1295 'employee_id': self.employee['id']1296 },1297 'rule3': {1298 'pool_id': self.org_pool_id,1299 'employee_id': self.employee['id']1300 },1301 'rule4': {1302 'pool_id': self.org_pool_id,1303 'employee_id': self.employee['id']1304 }1305 }1306 self._verify_assignments(resources, expected_map)1307 conditions = [1308 {"type": "name_starts_with", "meta_info": "rule"},1309 ]1310 self._create_rule('rule1', conditions=conditions,1311 pool_id=self.sub_pools_ids[0],1312 owner_id=self.employee2['id'])1313 self._create_rule('rule2', conditions=conditions,1314 pool_id=self.sub_pools_ids[1],1315 owner_id=self.employee3['id'])1316 resources = self._create_resources(1317 [1318 '_rule1',1319 '_rule2',1320 '_rule3',1321 '_rule4',1322 ]1323 )1324 expected_map = {1325 '_rule1': {1326 'pool_id': self.org_pool_id,1327 'employee_id': self.employee['id']1328 },1329 '_rule2': {1330 'pool_id': self.org_pool_id,1331 'employee_id': self.employee['id']1332 },1333 '_rule3': {1334 'pool_id': self.org_pool_id,1335 'employee_id': self.employee['id']1336 },1337 '_rule4': {1338 'pool_id': self.org_pool_id,1339 'employee_id': self.employee['id']1340 }1341 }1342 self._verify_assignments(resources, expected_map)1343 @patch(AUTHORIZE_ACTION_METHOD)1344 @patch(RULE_APPLY_CONDITION_CLOUD_IS)1345 def test_apply_rules_disabled_rules(self, p_rule_apply, p_authorize):1346 """1347 Test checks creation resources in case disabled rules1348 Steps:1349 - 1. Create 2 rules rule1 and rule2.1350 - 2. Create resource matched both rules1351 - 3. Verify rule1 is applied1352 - 4. Disable rule11353 - 5. Create resource matched both rules1354 - 3. Verify rule2 is applied1355 - 4. Disable rule21356 - 5. Create resource matched both rules1357 - 3. Verify no rules are applied1358 """1359 p_authorize.return_value = True1360 p_rule_apply.return_value = False1361 conditions = [1362 {"type": "name_starts_with", "meta_info": "rule"},1363 ]1364 rule1 = self._create_rule('rule1', conditions=conditions,1365 pool_id=self.sub_pools_ids[0],1366 owner_id=self.employee2['id'])1367 rule2 = self._create_rule('rule2', conditions=conditions,1368 pool_id=self.sub_pools_ids[1],1369 owner_id=self.employee3['id'])1370 resources = self._create_resources(1371 [1372 'rule2'1373 ]1374 )1375 expected_map = {1376 'rule2': {1377 'pool_id': self.sub_pools_ids[1],1378 'employee_id': self.employee3['id']1379 }1380 }1381 self._verify_assignments(resources, expected_map)1382 code, resp = self.client.rule_update(rule2['id'], {'active': False})1383 self.assertEqual(code, 200)1384 resources = self._create_resources(1385 [1386 'rule1'1387 ]1388 )1389 expected_map = {1390 'rule1': {1391 'pool_id': self.sub_pools_ids[0],1392 'employee_id': self.employee2['id']1393 }1394 }1395 self._verify_assignments(resources, expected_map)1396 code, resp = self.client.rule_update(rule1['id'], {'active': False})1397 self.assertEqual(code, 200)1398 resources = self._create_resources(1399 [1400 'rule3'1401 ]1402 )1403 expected_map = {1404 'rule3': {1405 'pool_id': self.org_pool_id,1406 'employee_id': self.employee['id']1407 }1408 }1409 self._verify_assignments(resources, expected_map)1410 @patch(AUTHORIZE_ACTION_METHOD)1411 def test_apply_rules_complex(self, p_authorize):1412 """1413 Test checks creation resources in case complex rules1414 Steps:1415 - 1. Create multiple rules with multiple condition types.1416 - 2. Create resource one resource match only one rule.1417 - 3. Verify assignments.1418 """1419 p_authorize.return_value = True1420 conditions = [1421 {"type": "name_starts_with", "meta_info": "qa"},1422 {"type": "name_ends_with", "meta_info": "prod"},1423 {"type": "name_contains", "meta_info": "resource"},1424 ]1425 self._create_rule('full_pair_rule1', conditions=conditions,1426 pool_id=self.sub_pools_ids[0],1427 owner_id=self.employee2['id'])1428 conditions = [1429 {"type": "name_is", "meta_info": "special_name"},1430 ]1431 self._create_rule('full_pair_rule2', conditions=conditions,1432 pool_id=self.sub_pools_ids[1],)1433 resources = self._create_resources(1434 [1435 'qa_resource_prod',1436 'qa_resource1_prod',1437 'special_name',1438 ]1439 )1440 expected_map = {1441 'qa_resource_prod': {1442 'pool_id': self.sub_pools_ids[0],1443 'employee_id': self.employee2['id']1444 },1445 'qa_resource1_prod': {1446 'pool_id': self.sub_pools_ids[0],1447 'employee_id': self.employee2['id']1448 },1449 'special_name': {1450 'pool_id': self.sub_pools_ids[1],1451 'employee_id': self.employee['id']1452 },1453 }1454 self._verify_assignments(resources, expected_map)1455 @patch(AUTHORIZE_ACTION_METHOD)1456 def test_apply_rules_tag_condition(self, p_authorize):1457 """1458 Test checks creation resources in case of tag condition in rule1459 Steps:1460 - 1. Create rule with tag_is condition rule11461 - 2. Create resources with tags matched rule11462 - 3. Verify assignment.1463 """1464 p_authorize.return_value = True1465 conditions = [1466 {1467 "type": "tag_is",1468 "meta_info": "{\"key\":\"key\",\"value\":\"value\"}"1469 }1470 ]1471 self._create_rule('rule1', conditions=conditions)1472 valid_body = {'resources': []}1473 valid_body['resources'].append(1474 {1475 'cloud_resource_id': 'res_id_%s' % self.gen_id(),1476 'name': 'resource',1477 'resource_type': 'VM',1478 'tags': {1479 "key": "value",1480 "key1": "value1",1481 "key2": "value2",1482 }1483 }1484 )1485 valid_body['resources'].append(1486 {1487 'cloud_resource_id': 'res_id_%s' % self.gen_id(),1488 'name': 'resource1',1489 'resource_type': 'VM',1490 'tags': {1491 "key1": "value1",1492 "key": "value",1493 "key2": "value2",1494 }1495 }1496 )1497 valid_body['resources'].append(1498 {1499 'cloud_resource_id': 'res_id_%s' % self.gen_id(),1500 'name': 'resource2',1501 'resource_type': 'VM',1502 'tags': {1503 "key1": "value1",1504 "key2": "value2",1505 "key": "value"1506 }1507 }1508 )1509 code, result = self.cloud_resource_create_bulk(1510 self.cloud_acc_id, valid_body, return_resources=True)1511 self.assertEqual(code, 200)1512 expected_map = {1513 'resource': {1514 'pool_id': self.org_pool_id,1515 'employee_id': self.employee['id']1516 },1517 'resource1': {1518 'pool_id': self.org_pool_id,1519 'employee_id': self.employee['id']1520 },1521 'resource2': {1522 'pool_id': self.org_pool_id,1523 'employee_id': self.employee['id']1524 },1525 }1526 self._verify_assignments(result['resources'], expected_map)1527 @patch(AUTHORIZE_ACTION_METHOD)1528 def test_apply_rules_tag_exists_condition(self, p_authorize):1529 """1530 Test checks creation resources in case of tag exists condition in rule1531 Steps:1532 - 1. Create rule with tag_exists condition rule11533 - 2. Create resources with tags matched rule11534 - 3. Verify assignment.1535 """1536 p_authorize.return_value = True1537 conditions = [1538 {"type": "tag_exists", "meta_info": "key"}1539 ]1540 self._create_rule('rule1', conditions=conditions)1541 valid_body = {'resources': []}1542 valid_body['resources'].append(1543 {1544 'cloud_resource_id': 'res_id_%s' % self.gen_id(),1545 'name': 'resource',1546 'resource_type': 'VM',1547 'tags': {1548 "key": "value",1549 "key1": "value1",1550 "key2": "value2",1551 }1552 }1553 )1554 valid_body['resources'].append(1555 {1556 'cloud_resource_id': 'res_id_%s' % self.gen_id(),1557 'name': 'resource1',1558 'resource_type': 'VM',1559 'tags': {1560 "key1": "value1",1561 "key": "value",1562 "key2": "value2",1563 }1564 }1565 )1566 valid_body['resources'].append(1567 {1568 'cloud_resource_id': 'res_id_%s' % self.gen_id(),1569 'name': 'resource2',1570 'resource_type': 'VM',1571 'tags': {1572 "key1": "value1",1573 "key2": "value2",1574 "key": "value"1575 }1576 }1577 )1578 valid_body['resources'].append(1579 {1580 'cloud_resource_id': 'res_id_%s' % self.gen_id(),1581 'name': 'resource3',1582 'resource_type': 'VM',1583 'tags': {1584 "key1": "value1",1585 "key2": "value2",1586 "key": "value3"1587 }1588 }1589 )1590 code, result = self.cloud_resource_create_bulk(1591 self.cloud_acc_id, valid_body, return_resources=True)1592 self.assertEqual(code, 200)1593 expected_map = {1594 'resource': {1595 'pool_id': self.org_pool_id,1596 'employee_id': self.employee['id']1597 },1598 'resource1': {1599 'pool_id': self.org_pool_id,1600 'employee_id': self.employee['id']1601 },1602 'resource2': {1603 'pool_id': self.org_pool_id,1604 'employee_id': self.employee['id']1605 },1606 'resource3': {1607 'pool_id': self.org_pool_id,1608 'employee_id': self.employee['id']1609 },1610 }1611 self._verify_assignments(result['resources'], expected_map)1612 @patch(AUTHORIZE_ACTION_METHOD)1613 def test_apply_rules_environment_resources(self, p_authorize):1614 p_authorize.return_value = True1615 name = 'some_name'1616 conditions = [1617 {"type": "name_is", "meta_info": name}1618 ]1619 self._create_rule('rule1', conditions=conditions)1620 code, res = self.environment_resource_create(1621 self.org_id, {'name': name, 'resource_type': 'type'})1622 self.assertEqual(code, 201)1623 expected_map = {1624 name: {1625 'pool_id': self.org_pool_id,1626 'employee_id': self.employee['id']1627 }1628 }1629 self._verify_assignments([res], expected_map)1630 @patch(AUTHORIZE_ACTION_METHOD)1631 def test_apply_rules_cluster(self, p_authorize):1632 p_authorize.return_value = True1633 conditions = [1634 {"type": "tag_exists", "meta_info": "key"}1635 ]1636 self._create_rule('rule1', conditions=conditions)1637 valid_body = {'resources': []}1638 valid_body['resources'].extend([1639 {1640 'cloud_resource_id': 'res_id_%s' % self.gen_id(),1641 'name': 'resource',1642 'resource_type': 'VM',1643 'tags': {1644 "key": "value",1645 }1646 },1647 {1648 'cloud_resource_id': 'res_id_%s' % self.gen_id(),1649 'name': 'resource1',1650 'resource_type': 'VM',1651 'tags': {1652 "key": "value",1653 }1654 }])1655 code, ct = self.client.cluster_type_create(1656 self.org_id, {'name': 'some', 'tag_key': 'key'})1657 self.assertEqual(code, 201)1658 code, result = self.cloud_resource_create_bulk(1659 self.cloud_acc_id, valid_body, return_resources=True,1660 behavior='update_existing')1661 self.assertEqual(code, 200)1662 expected_map = {1663 'resource': {1664 'pool_id': self.org_pool_id,1665 'employee_id': self.employee['id']1666 },1667 'resource1': {1668 'pool_id': self.org_pool_id,1669 'employee_id': self.employee['id']1670 }1671 }1672 cluster = next(self.resources_collection.find(1673 {'cluster_type_id': ct['id']}))1674 self._verify_assignments(result['resources'], expected_map)1675 self.assertEqual(cluster.get('pool_id'), self.org_pool_id)1676 @patch(AUTHORIZE_ACTION_METHOD)1677 def test_apply_rules_tag_value_starts_condition(self, p_authorize):1678 """1679 Test checks creation resources in case of tag value starts with1680 condition in rule1681 Steps:1682 - 1. Create rule with tag_value_starts_with condition rule11683 - 2. Create resources with tags matched rule11684 - 3. Verify assignment.1685 """1686 p_authorize.return_value = True1687 conditions = [1688 {1689 "type": "tag_value_starts_with",1690 "meta_info": "{\"key\":\"key\",\"value\":\"value\"}"1691 }1692 ]1693 self._create_rule('rule1', conditions=conditions)1694 valid_body = {'resources': []}1695 valid_body['resources'].append(1696 {1697 'cloud_resource_id': 'res_id_%s' % self.gen_id(),1698 'name': 'resource',1699 'resource_type': 'VM',1700 'tags': {1701 "key": "value",1702 "key1": "value1",1703 "key2": "value2",1704 }1705 }1706 )1707 valid_body['resources'].append(1708 {1709 'cloud_resource_id': 'res_id_%s' % self.gen_id(),1710 'name': 'resource1',1711 'resource_type': 'VM',1712 'tags': {1713 "key1": "value1",1714 "key": "value value",1715 "key2": "value2",1716 }1717 }1718 )1719 valid_body['resources'].append(1720 {1721 'cloud_resource_id': 'res_id_%s' % self.gen_id(),1722 'name': 'resource2',1723 'resource_type': 'VM',1724 'tags': {1725 "key1": "value1",1726 "key2": "value2",1727 "key": "value vbivalue"1728 }1729 }1730 )1731 valid_body['resources'].append(1732 {1733 'cloud_resource_id': 'res_id_%s' % self.gen_id(),1734 'name': 'resource3',1735 'resource_type': 'VM',1736 'tags': {1737 "key": "valueue",1738 "key1": "value1",1739 "key2": "value2",1740 }1741 }1742 )1743 code, result = self.cloud_resource_create_bulk(1744 self.cloud_acc_id, valid_body, return_resources=True)1745 self.assertEqual(code, 200)1746 expected_map = {1747 'resource': {1748 'pool_id': self.org_pool_id,1749 'employee_id': self.employee['id']1750 },1751 'resource1': {1752 'pool_id': self.org_pool_id,1753 'employee_id': self.employee['id']1754 },1755 'resource2': {1756 'pool_id': self.org_pool_id,1757 'employee_id': self.employee['id']1758 },1759 'resource3': {1760 'pool_id': self.org_pool_id,1761 'employee_id': self.employee['id']1762 },1763 }1764 self._verify_assignments(result['resources'], expected_map)1765 @patch(AUTHORIZE_ACTION_METHOD)1766 @patch(RULE_APPLY_CONDITION_CLOUD_IS)1767 def test_apply_rules_resource_type(self, p_rule_apply, p_authorize):1768 """1769 Test checks creation resources in case of res type condition in rule1770 Steps:1771 - 1. Create rule with resource_type_is condition rule11772 - 2. Create resource with type matched rule1 and two more resource1773 which are not1774 - 3. Verify assignment.1775 """1776 p_authorize.return_value = True1777 p_rule_apply.return_value = False1778 conditions = [1779 {"type": "resource_type_is", "meta_info": "VM"}1780 ]1781 self._create_rule('rule1', conditions=conditions)1782 valid_body = {'resources': []}1783 valid_body['resources'].append(1784 {1785 'cloud_resource_id': 'res_id_%s' % self.gen_id(),1786 'name': 'resource',1787 'resource_type': 'VM'1788 }1789 )1790 valid_body['resources'].append(1791 {1792 'cloud_resource_id': 'res_id_%s' % self.gen_id(),1793 'name': 'resource1',1794 'resource_type': 'VM1'1795 }1796 )1797 valid_body['resources'].append(1798 {1799 'cloud_resource_id': 'res_id_%s' % self.gen_id(),1800 'name': 'resource2',1801 'resource_type': 'vm'1802 }1803 )1804 code, result = self.cloud_resource_create_bulk(1805 self.cloud_acc_id, valid_body, return_resources=True)1806 self.assertEqual(code, 200)1807 expected_map = {1808 'resource': {1809 'pool_id': self.org_pool_id,1810 'employee_id': self.employee['id']1811 },1812 'resource1': {1813 'pool_id': self.org_pool_id,1814 'employee_id': self.employee['id']1815 },1816 'resource2': {1817 'pool_id': self.org_pool_id,1818 'employee_id': self.employee['id']1819 },1820 }1821 self._verify_assignments(result['resources'], expected_map)1822 @patch(AUTHORIZE_ACTION_METHOD)1823 @patch(RULE_APPLY_CONDITION_CLOUD_IS)1824 def test_apply_rules_region(self, p_rule_apply, p_authorize):1825 """1826 Test checks creation resources in case of res region condition in rule1827 Steps:1828 - 1. Create rule with region_is condition rule11829 - 2. Create resource with type matched rule1 and two more resource1830 which are not1831 - 3. Verify assignment.1832 """1833 p_authorize.return_value = True1834 p_rule_apply.return_value = False1835 conditions = [1836 {"type": "region_is", "meta_info": "test_region"}1837 ]1838 self._create_rule('rule1', conditions=conditions,1839 pool_id=self.sub_pools_ids[0],1840 owner_id=self.employee2['id'])1841 valid_body = {'resources': []}1842 valid_body['resources'].append(1843 {1844 'cloud_resource_id': 'res_id_%s' % self.gen_id(),1845 'name': 'resource',1846 'region': 'test_region1',1847 'resource_type': 'VM'1848 }1849 )1850 # matched resource1851 valid_body['resources'].append(1852 {1853 'cloud_resource_id': 'res_id_%s' % self.gen_id(),1854 'name': 'resource1',1855 'region': 'test_region',1856 'resource_type': 'VM1'1857 }1858 )1859 valid_body['resources'].append(1860 {1861 'cloud_resource_id': 'res_id_%s' % self.gen_id(),1862 'name': 'resource2',1863 'region': 'test_region3',1864 'resource_type': 'vm'1865 }1866 )1867 code, result = self.cloud_resource_create_bulk(1868 self.cloud_acc_id, valid_body, return_resources=True)1869 self.assertEqual(code, 200)1870 expected_map = {1871 'resource': {1872 'pool_id': self.org_pool_id,1873 'employee_id': self.employee['id']1874 },1875 'resource1': {1876 'pool_id': self.sub_pools_ids[0],1877 'employee_id': self.employee2['id']1878 },1879 'resource2': {1880 'pool_id': self.org_pool_id,1881 'employee_id': self.employee['id']1882 },1883 }1884 self._verify_assignments(result['resources'], expected_map)1885 @patch(AUTHORIZE_ACTION_METHOD)1886 def test_apply_rules_cloud_is(self, p_authorize):1887 """1888 Test checks creation resources in case of cloud condition in rule1889 Steps:1890 - 1. Create rule with cloud_is condition rule11891 - 2. Create resources for right cloud1892 - 3. Verify assignment.1893 - 4. Create resources for wrong cloud1894 - 5. Verify assignment.1895 - 6. Verify that get rules returns related cloud acc entities1896 """1897 p_authorize.return_value = True1898 auth_user_id_1 = self.gen_id()1899 _, employee1 = self.client.employee_create(1900 self.org_id, {'name': 'name1', 'auth_user_id': auth_user_id_1})1901 conditions = [1902 {"type": "cloud_is", "meta_info": self.cloud_acc_id}1903 ]1904 self._create_rule('rule1', conditions=conditions,1905 pool_id=self.sub_pools_ids[0],1906 owner_id=employee1['id'])1907 valid_body = {'resources': []}1908 valid_body['resources'].append(1909 {1910 'cloud_resource_id': 'res_id_%s' % self.gen_id(),1911 'name': 'resource',1912 'resource_type': 'VM'1913 }1914 )1915 valid_body['resources'].append(1916 {1917 'cloud_resource_id': 'res_id_%s' % self.gen_id(),1918 'name': 'resource1',1919 'resource_type': 'VM1'1920 }1921 )1922 valid_body['resources'].append(1923 {1924 'cloud_resource_id': 'res_id_%s' % self.gen_id(),1925 'name': 'resource2',1926 'resource_type': 'vm'1927 }1928 )1929 code, result = self.cloud_resource_create_bulk(1930 self.cloud_acc_id, valid_body, return_resources=True)1931 self.assertEqual(code, 200)1932 expected_map = {1933 'resource': {1934 'pool_id': self.sub_pools_ids[0],1935 'employee_id': employee1['id']1936 },1937 'resource1': {1938 'pool_id': self.sub_pools_ids[0],1939 'employee_id': employee1['id']1940 },1941 'resource2': {1942 'pool_id': self.sub_pools_ids[0],1943 'employee_id': employee1['id']1944 },1945 }1946 self._verify_assignments(result['resources'], expected_map)1947 aws_cloud_acc1 = {1948 'name': 'my cloud_acc1',1949 'type': 'aws_cnr',1950 'config': {1951 'access_key_id': 'key',1952 'secret_access_key': 'secret',1953 'config_scheme': 'create_report'1954 }1955 }1956 _, cloud_acc1 = self.create_cloud_account(1957 self.org_id, aws_cloud_acc1, auth_user_id=auth_user_id_1)1958 cloud_acc_id = cloud_acc1['id']1959 valid_body = {'resources': []}1960 valid_body['resources'].append(1961 {1962 'cloud_resource_id': 'res_id_%s' % self.gen_id(),1963 'name': 'resource',1964 'resource_type': 'VM'1965 }1966 )1967 valid_body['resources'].append(1968 {1969 'cloud_resource_id': 'res_id_%s' % self.gen_id(),1970 'name': 'resource1',1971 'resource_type': 'VM1'1972 }1973 )1974 valid_body['resources'].append(1975 {1976 'cloud_resource_id': 'res_id_%s' % self.gen_id(),1977 'name': 'resource2',1978 'resource_type': 'vm'1979 }1980 )1981 code, result = self.cloud_resource_create_bulk(1982 cloud_acc_id, valid_body, return_resources=True)1983 self.assertEqual(code, 200)1984 expected_map = {1985 'resource': {1986 'pool_id': self.org_pool_id,1987 'employee_id': self.employee['id']1988 },1989 'resource1': {1990 'pool_id': self.org_pool_id,1991 'employee_id': self.employee['id']1992 },1993 'resource2': {1994 'pool_id': self.org_pool_id,1995 'employee_id': self.employee['id']1996 },1997 }1998 self._verify_assignments(result['resources'], expected_map)1999 code, rules = self.client.rules_list(2000 organization_id=self.org_id)2001 self.assertEqual(code, 200)2002 self.assertIn(self.cloud_acc_id, rules['entities'])2003 @patch(AUTHORIZE_ACTION_METHOD)2004 def test_apply_rules_with_deleted_condition(self, p_authorize):2005 """2006 Test checks that deleted conditions don't apply on filtering.2007 Steps:2008 - 1. Create rule with one condition.2009 - 2. Modify rule. Add one new condition, remove existing one.2010 - 3. Create resource matches to the new condition,2011 but doesn't match to the deleted one.2012 - 3. Verify assignments.2013 """2014 p_authorize.return_value = True2015 conditions = [2016 {"type": "name_starts_with", "meta_info": "qa"}2017 ]2018 rule = self._create_rule('full_pair_rule1', conditions=conditions)2019 conditions = [2020 {"type": "name_starts_with", "meta_info": "prod"}2021 ]2022 code, patched_response = self.client.rule_update(2023 rule['id'],2024 {'conditions': conditions}2025 )2026 resources = self._create_resources(2027 [2028 'prod_resource_prod',2029 ]2030 )2031 expected_map = {2032 'prod_resource_prod': {2033 'pool_id': self.org_pool_id,2034 'employee_id': self.employee['id']2035 }2036 }2037 self._verify_assignments(resources, expected_map)2038 @patch(AUTHORIZE_ACTION_METHOD)2039 def test_rule_prioritize(self, p_authorize):2040 p_authorize.return_value = True2041 for rule_name in ['rule1', 'rule2', 'rule3', 'rule4']:2042 conditions = [{"type": "name_starts_with", "meta_info": rule_name}]2043 self._create_rule(rule_name, conditions=conditions)2044 code, rules = self.client.rules_list(self.org_id)2045 self.assertEqual(code, 200)2046 initial_rule_order = [r['id'] for r in rules['rules']]2047 # prioritize third rule2048 expected_new_order = [initial_rule_order[2], initial_rule_order[0],2049 initial_rule_order[1], initial_rule_order[3],2050 initial_rule_order[4]]2051 code, response = self.client.rule_prioritize(initial_rule_order[2])2052 self.assertEqual(code, 200)2053 new_order = [r['id'] for r in response['rules']]2054 self.assertEqual(expected_new_order, new_order)2055 code, list_response = self.client.rules_list(self.org_id)2056 self.assertEqual(code, 200)2057 self.assertEqual(list_response, response)2058 @patch(AUTHORIZE_ACTION_METHOD)2059 def test_rule_prioritize_first(self, p_authorize):2060 p_authorize.return_value = True2061 for rule_name in ['rule1', 'rule2', 'rule3', 'rule4']:2062 conditions = [{"type": "name_starts_with", "meta_info": rule_name}]2063 self._create_rule(rule_name, conditions=conditions)2064 code, rules = self.client.rules_list(self.org_id)2065 self.assertEqual(code, 200)2066 initial_rule_order = [r['id'] for r in rules['rules']]2067 code, response = self.client.rule_prioritize(initial_rule_order[0])2068 self.assertEqual(code, 200)2069 new_order = [r['id'] for r in response['rules']]2070 self.assertEqual(initial_rule_order, new_order)2071 @patch(AUTHORIZE_ACTION_METHOD)2072 def test_rule_promote(self, p_authorize):2073 p_authorize.return_value = True2074 for rule_name in ['rule1', 'rule2', 'rule3', 'rule4']:2075 conditions = [{"type": "name_starts_with", "meta_info": rule_name}]2076 self._create_rule(rule_name, conditions=conditions)2077 code, rules = self.client.rules_list(self.org_id)2078 self.assertEqual(code, 200)2079 initial_rule_order = [r['id'] for r in rules['rules']]2080 # promote fourth rule2081 expected_new_order = [initial_rule_order[0], initial_rule_order[1],2082 initial_rule_order[3], initial_rule_order[2], initial_rule_order[4]]2083 code, resp = self.client.rule_promote(initial_rule_order[0])2084 self.assertEqual(code, 200)2085 code, response = self.client.rule_promote(initial_rule_order[3])2086 self.assertEqual(code, 200)2087 new_order = [r['id'] for r in response['rules']]2088 self.assertEqual(expected_new_order, new_order)2089 @patch(AUTHORIZE_ACTION_METHOD)2090 def test_rule_demote(self, p_authorize):2091 p_authorize.return_value = True2092 for rule_name in ['rule1', 'rule2', 'rule3', 'rule4']:2093 conditions = [{"type": "name_starts_with", "meta_info": rule_name}]2094 self._create_rule(rule_name, conditions=conditions)2095 code, rules = self.client.rules_list(self.org_id)2096 self.assertEqual(code, 200)2097 initial_rule_order = [r['id'] for r in rules['rules']]2098 # demote first rule2099 expected_new_order = [initial_rule_order[1], initial_rule_order[0],2100 initial_rule_order[2], initial_rule_order[4],2101 initial_rule_order[3]]2102 code, resp = self.client.rule_demote(initial_rule_order[3])2103 self.assertEqual(code, 200)2104 code, response = self.client.rule_demote(initial_rule_order[0])2105 self.assertEqual(code, 200)2106 new_order = [r['id'] for r in response['rules']]2107 self.assertEqual(expected_new_order, new_order)2108 @patch(AUTHORIZE_ACTION_METHOD)2109 def test_rule_deprioritize(self, p_authorize):2110 p_authorize.return_value = True2111 for rule_name in ['rule1', 'rule2', 'rule3', 'rule4']:2112 conditions = [{"type": "name_starts_with", "meta_info": rule_name}]2113 self._create_rule(rule_name, conditions=conditions)2114 code, rules = self.client.rules_list(self.org_id)2115 self.assertEqual(code, 200)2116 initial_rule_order = [r['id'] for r in rules['rules']]2117 # deprioritize second rule2118 expected_new_order = [initial_rule_order[0], initial_rule_order[2],2119 initial_rule_order[3], initial_rule_order[4],2120 initial_rule_order[1]]2121 code, response = self.client.rule_deprioritize(initial_rule_order[1])2122 self.assertEqual(code, 200)2123 new_order = [r['id'] for r in response['rules']]2124 self.assertEqual(expected_new_order, new_order)2125 @patch(AUTHORIZE_ACTION_METHOD)2126 @patch(RULE_APPLY_CONDITION_CLOUD_IS)2127 def test_invalid_rule_deactivated(self, p_rule_apply, p_authorize):2128 p_authorize.return_value = True2129 p_rule_apply.return_value = False2130 m_activities_publish = patch(2131 'rest_api_server.controllers.base.'2132 'BaseController.publish_activities_task').start()2133 conditions = [2134 {"type": "name_starts_with", "meta_info": "prod"}2135 ]2136 rule = self._create_rule('rule1', conditions=conditions,2137 pool_id=self.sub_pools_ids[2],2138 owner_id=self.employee3['id'],2139 set_allowed=False)2140 resources = self._create_resources(['prod_resource_prod'])2141 code, rule = self.client.rule_get(rule['id'])2142 self.assertEqual(code, 200)2143 self.assertFalse(rule['active'])2144 m_activities_publish.assert_has_calls([2145 call(rule['organization_id'], rule['id'], 'rule',2146 'rule_deactivated', ANY, 'rule.rule_deactivated',2147 add_token=True)])2148class TestRulesApplyApi(TestRulesApiBase):2149 def setUp(self, version='v2'):2150 super().setUp()2151 self._mock_auth_user(self.user_id)2152 patch('rest_api_server.handlers.v1.base.BaseAuthHandler._get_user_info',2153 return_value=self.user).start()2154 patch(AUTHORIZE_ACTION_METHOD, return_value=True).start()2155 patch('rest_api_server.controllers.cloud_account.'2156 'CloudAccountController._configure_report').start()2157 aws_cloud_acc = {2158 'name': 'my cloud_acc',2159 'type': 'aws_cnr',2160 'config': {2161 'access_key_id': 'key',2162 'secret_access_key': 'secret',2163 'config_scheme': 'create_report'2164 }2165 }2166 _, self.cloud_acc = self.create_cloud_account(2167 self.org_id, aws_cloud_acc, auth_user_id=self.user_id)2168 _, cloud_acc_rule = self.client.rules_list(self.org_id)2169 self.cloud_acc_rule = cloud_acc_rule['rules'][0]2170 self.cloud_acc_id = self.cloud_acc['id']2171 def _create_resources(self, names, pool_id=None, employee_id=None,2172 tags=None):2173 valid_body = {'resources': []}2174 for name in names:2175 resource_dict = {2176 'cloud_resource_id': 'res_id_%s' % self.gen_id(),2177 'name': name,2178 'resource_type': 'VM',2179 }2180 if not pool_id:2181 pool_id = self.org_pool_id2182 if not employee_id:2183 employee_id = self.employee['id']2184 resource_dict['pool_id'] = pool_id2185 resource_dict['employee_id'] = employee_id2186 if tags:2187 resource_dict['tags'] = tags2188 valid_body['resources'].append(resource_dict)2189 code, result = self.cloud_resource_create_bulk(2190 self.cloud_acc_id, valid_body, return_resources=True)2191 self.assertEqual(code, 200)2192 return result['resources']2193 def test_rules_apply_api_invalid_params(self):2194 code, resp = self.client.rules_apply(self.org_id, pool_id=1)2195 self.assertEqual(code, 400)2196 self.verify_error_code(resp, 'OE0214')2197 code, resp = self.client.rules_apply(self.org_id, pool_id=' ')2198 self.assertEqual(code, 400)2199 self.verify_error_code(resp, 'OE0416')2200 code, resp = self.client.rules_apply(self.org_id, pool_id='')2201 self.assertEqual(code, 400)2202 self.verify_error_code(resp, 'OE0215')2203 code, resp = self.client.rules_apply(self.org_id, pool_id='a' * 500)2204 self.assertEqual(code, 400)2205 self.verify_error_code(resp, 'OE0215')2206 code, resp = self.client.rules_apply(2207 self.org_id, pool_id=self.org_pool_id, include_children='a')2208 self.assertEqual(code, 400)2209 self.verify_error_code(resp, 'OE0226')2210 body_unexpected = {'pool_id': self.org_pool_id, 'aa': 'bb'}2211 code, resp = self.client.post(self.client.rules_apply_url(self.org_id),2212 body_unexpected)2213 self.assertEqual(code, 400)2214 self.verify_error_code(resp, 'OE0212')2215 def verify_assignment(self, resource_id, pool_id, employee_id):2216 code, resource = self.client.cloud_resource_get(resource_id)2217 self.assertEqual(code, 200)2218 self.assertEqual(resource['pool_id'], pool_id)2219 self.assertEqual(resource['employee_id'], employee_id)2220 def test_rules_apply_api_include_children(self):2221 sub_pool_id = self.sub_pools[0]['id']2222 code, sub_sub_pool = self.client.pool_create(2223 self.org_id,2224 {'name': "sub_sub_pool",2225 'parent_id': sub_pool_id})2226 self.assertEqual(code, 201)2227 sub_sub_pool_id = sub_sub_pool['id']2228 # root pool2229 resources2 = self._create_resources(2230 ['my_2', 'm3'], pool_id=self.org_pool_id,2231 employee_id=self.employee['id'])2232 # sub pool2233 resources3 = self._create_resources(2234 ['my_3', 'm4'], pool_id=sub_pool_id,2235 employee_id=self.employee['id'])2236 # sub sub pool2237 resources4 = self._create_resources(2238 ['my_4', 'm5'], pool_id=sub_sub_pool_id,2239 employee_id=self.employee['id'])2240 conditions = [2241 {"type": "name_starts_with", "meta_info": "my_"}2242 ]2243 self._create_rule('rule1', conditions=conditions, set_allowed=True)2244 # only processing resources from root2245 code, resp = self.client.rules_apply(self.org_id,2246 pool_id=self.org_pool_id)2247 self.assertEqual(code, 201)2248 self.assertEqual(resp['processed'], 2)2249 self.assertEqual(resp['updated_assignments'], 1)2250 self.verify_assignment(resources2[0]['id'], self.org_pool_id,2251 self.employee['id'])2252 self.verify_assignment(2253 resources2[1]['id'], self.cloud_acc_rule['pool_id'],2254 self.employee['id'])2255 # processing resources from sub and sub sub2256 code, resp = self.client.rules_apply(2257 self.org_id, pool_id=sub_pool_id, include_children=True)2258 self.assertEqual(code, 201)2259 self.assertEqual(resp['processed'], 4)2260 self.assertEqual(resp['updated_assignments'], 4)2261 self.verify_assignment(resources3[0]['id'], self.org_pool_id,2262 self.employee['id'])2263 self.verify_assignment(2264 resources3[1]['id'], self.cloud_acc_rule['pool_id'],2265 self.employee['id'])2266 self.verify_assignment(resources4[0]['id'], self.org_pool_id,2267 self.employee['id'])2268 self.verify_assignment(2269 resources4[1]['id'], self.cloud_acc_rule['pool_id'],2270 self.employee['id'])2271 def test_reapply_deleted_rule(self):2272 self._create_resources(['my_1'])2273 rule = self._create_rule('rule1', conditions=[2274 {"type": "name_starts_with", "meta_info": "m"}2275 ], set_allowed=True)2276 code, _ = self.client.rules_apply(self.org_id,2277 pool_id=self.org_pool_id)2278 self.assertEqual(code, 201)2279 self.client.rule_delete(rule['id'])2280 rule2 = self._create_rule('rule2', conditions=[2281 {"type": "name_starts_with", "meta_info": "my_"}2282 ], pool_id=self.sub_pools_ids[0], set_allowed=True)2283 m_activities_publish = patch(2284 'rest_api_server.controllers.base.'2285 'BaseController.publish_activities_task').start()2286 code, _ = self.client.rules_apply(2287 self.org_id, pool_id=self.org_pool_id)2288 self.assertEqual(code, 201)2289 m_activities_publish.assert_has_calls([2290 call(self.org_id, self.org_id, 'organization',2291 'rules_processing_started', ANY,2292 'organization.rules_processing_started', add_token=True),2293 call(self.org_id, rule2['id'], 'rule',2294 'rule_applied', ANY, 'rule.rule_applied', add_token=True),2295 call(self.org_id, self.org_id, 'organization',2296 'rules_processing_completed', ANY,2297 'organization.rules_processing_completed', add_token=True),2298 ])2299 def test_rules_apply_api_clusters(self):2300 m_activities_publish = patch(2301 'rest_api_server.controllers.base.'2302 'BaseController.publish_activities_task').start()2303 code, c_type = self.client.cluster_type_create(2304 self.org_id, {'name': 'c_type', 'tag_key': 'tn'})2305 self.assertEqual(code, 201)2306 resources = self._create_resources(['my_1', 'm2'], tags={'tn': 'tv'})2307 conditions = [2308 {"type": "tag_exists", "meta_info": "tn"}2309 ]2310 rule = self._create_rule('rule1', conditions=conditions,2311 set_allowed=True)2312 code, resp = self.client.rules_apply(self.org_id, self.org_pool_id)2313 m_activities_publish.assert_has_calls([2314 call(self.org_id, self.org_id, 'organization',2315 'rules_processing_started', ANY,2316 'organization.rules_processing_started', add_token=True),2317 call(self.org_id, rule['id'], 'rule',2318 'rule_applied', ANY, 'rule.rule_applied', add_token=True),2319 call(self.org_id, self.org_id, 'organization',2320 'rules_processing_completed', ANY,2321 'organization.rules_processing_completed', add_token=True),2322 ])2323 self.assertEqual(code, 201)2324 self.assertEqual(resp['processed'], 1)...
fuzzy_object.py
Source:fuzzy_object.py
...159 self.sim = None160 return161 if fuzzy_type == FuzzyType.BOTH:162 rules = []163 rules.append( self._create_rule("N", "N", "N") )164 rules.append( self._create_rule("N", "Z", "N") )165 rules.append( self._create_rule("N", "P", "Z") )166 rules.append( self._create_rule("Z", "N", "N") )167 rules.append( self._create_rule("Z", "Z", "Z") )168 rules.append( self._create_rule("Z", "P", "P") )169 rules.append( self._create_rule("P", "N", "Z") )170 rules.append( self._create_rule("P", "Z", "P") )171 rules.append( self._create_rule("P", "P", "P") )172# rules.append( self._create_rule("NB", "NB", "NB") )173# rules.append( self._create_rule("NB", "NS", "NB") )174# rules.append( self._create_rule("NB", "Z", "NS") )175# rules.append( self._create_rule("NB", "PS", "NS") )176# rules.append( self._create_rule("NB", "PB", "Z") )177#178# rules.append( self._create_rule("NS", "NB", "NB") )179# rules.append( self._create_rule("NS", "NS", "NS") )180# rules.append( self._create_rule("NS", "Z", "NS") )181# rules.append( self._create_rule("NS", "PS", "Z") )182# rules.append( self._create_rule("NS", "PB", "PS") )183#184# rules.append( self._create_rule("Z", "NB", "NS") )185# rules.append( self._create_rule("Z", "NS", "NS") )186# rules.append( self._create_rule("Z", "Z", "Z") )187# rules.append( self._create_rule("Z", "PS", "PS") )188# rules.append( self._create_rule("Z", "PB", "PS") )189#190# rules.append( self._create_rule("PS", "NB", "NS") )191# rules.append( self._create_rule("PS", "NS", "Z") )192# rules.append( self._create_rule("PS", "Z", "PS") )193# rules.append( self._create_rule("PS", "PS", "PS") )194# rules.append( self._create_rule("PS", "PB", "PB") )195#196# rules.append( self._create_rule("PB", "NB", "Z") )197# rules.append( self._create_rule("PB", "NS", "PS") )198# rules.append( self._create_rule("PB", "Z", "PS") )199# rules.append( self._create_rule("PB", "PS", "PB") )200# rules.append( self._create_rule("PB", "PB", "PB") )201 self.output_ctrl = ctrl.ControlSystem( rules )202 self.sim = ctrl.ControlSystemSimulation(self.output_ctrl)203# self._plots.request_plot( self.output_ctrl, "rules" )204 return205 if fuzzy_type == FuzzyType.ERR:206 rules = []207 rules.append( ctrl.Rule(self.ant_err["N"], self.con_output["N"]) )208 rules.append( ctrl.Rule(self.ant_err["Z"], self.con_output["Z"]) )209 rules.append( ctrl.Rule(self.ant_err["P"], self.con_output["P"]) )210# rules.append( ctrl.Rule(self.ant_err["NB"], self.con_output["NB"]) )211# rules.append( ctrl.Rule(self.ant_err["NS"], self.con_output["NS"]) )212# rules.append( ctrl.Rule(self.ant_err[ "Z"], self.con_output[ "Z"]) )213# rules.append( ctrl.Rule(self.ant_err["PS"], self.con_output["PS"]) )214# rules.append( ctrl.Rule(self.ant_err["PB"], self.con_output["PB"]) )215 self.output_ctrl = ctrl.ControlSystem( rules )216 self.sim = ctrl.ControlSystemSimulation(self.output_ctrl)217# self._plots.request_plot( self.output_ctrl, "rules" )218 return219 if fuzzy_type == FuzzyType.DERR:220 rules = []221 rules.append( ctrl.Rule(self.ant_derr["N"], self.con_output["N"]) )222 rules.append( ctrl.Rule(self.ant_derr["Z"], self.con_output["Z"]) )223 rules.append( ctrl.Rule(self.ant_derr["P"], self.con_output["P"]) )224# rules.append( ctrl.Rule(self.ant_derr["NB"], self.con_output["NB"]) )225# rules.append( ctrl.Rule(self.ant_derr["NS"], self.con_output["NS"]) )226# rules.append( ctrl.Rule(self.ant_derr[ "Z"], self.con_output[ "Z"]) )227# rules.append( ctrl.Rule(self.ant_derr["PS"], self.con_output["PS"]) )228# rules.append( ctrl.Rule(self.ant_derr["PB"], self.con_output["PB"]) )229 self.output_ctrl = ctrl.ControlSystem( rules )230 self.sim = ctrl.ControlSystemSimulation(self.output_ctrl)231# self._plots.request_plot( self.output_ctrl, "rules" )232 return233 self.sim = None234 raise ValueError('Invalid state: ' + fuzzy_type)235 def reset_state(self):236 self.error = Error()237 @synchronized238 def set_err_range(self, value):239 absval = abs(value)240 if absval < 0.1:241 self.ant_err = None242 return243 invert = False244 if value < 0.0:245 invert = True246 self.ant_err = ctrl.Antecedent(np.arange(-absval, absval, 1), 'err')247 self.ant_err.automf(names=["N", "Z", "P"], invert=invert)248 ##self.ant_err.automf(names=["NB", "NS", "Z", "PS", "PB"], invert=invert)249 self._plots.request_plot( self.ant_err, "err" )250 @synchronized251 def set_derr_range(self, value):252 absval = abs(value)253 if absval < 0.1:254 self.ant_derr = None255 return256 invert = False257 if value < 0.0:258 invert = True259 self.ant_derr = ctrl.Antecedent(np.arange(-absval, absval, 1), 'derr')260 self.ant_derr.automf(names=["N", "Z", "P"], invert=invert)261 ##self.ant_derr.automf(names=["NB", "NS", "Z", "PS", "PB"], invert=invert)262 self._plots.request_plot( self.ant_derr, "derr" )263 @synchronized264 def set_output_range(self, value):265 absval = abs(value)266 if absval < 0.1:267 self.con_output = None268 return269 invert = False270 if value < 0.0:271 invert = True272 self.con_output = ctrl.Consequent(np.arange(-absval, absval, 1), 'output')273 self.con_output.automf(names=["N", "Z", "P"], invert=invert)274 ##self.con_output.automf(names=["NB", "NS", "Z", "PS", "PB"], invert=invert)275 self._plots.request_plot( self.con_output, "output" )276 @synchronized277 def compute(self, valueInput):278 fuzzy_type = self.type()279 if fuzzy_type == FuzzyType.INVALID:280 self.error.calculate(valueInput)281 return 0282 err = self.error.calculate(valueInput)283 if fuzzy_type == FuzzyType.BOTH or fuzzy_type == FuzzyType.ERR:284 self.sim.input['err'] = err[0]285 if fuzzy_type == FuzzyType.BOTH or fuzzy_type == FuzzyType.DERR:286 self.sim.input['derr'] = err[1]287 self.sim.compute()288 self._plots.process_requests()289 output = self.sim.output['output']290 return output291 def close(self):292 self._plots.close_all()293 def _create_rule(self, err, derr, out):294 rule = ctrl.Rule(self.ant_err[err] | self.ant_derr[derr], self.con_output[out])295 return rule296class FuzzyObject():297 def __init__(self, controller_name, err_param=90, derr_param=30, output_param=20):298 self.controller_name = controller_name299 self.fuzzy = Fuzzy(err_param, derr_param, output_param)300 rospy.Subscriber("/self_balancer/" + self.controller_name + "/err_max", Float64, self._err_callback)301 rospy.Subscriber("/self_balancer/" + self.controller_name + "/derr_max", Float64, self._derr_callback)302 rospy.Subscriber("/self_balancer/" + self.controller_name + "/output_max", Float64, self._output_callback)303 self.output_pub = rospy.Publisher("/self_balancer/" + self.controller_name + "/output", Float64, queue_size=10)304 self.err_pub = rospy.Publisher("/self_balancer/" + self.controller_name + "/err", Float64, queue_size=10)305 self.derr_pub = rospy.Publisher("/self_balancer/" + self.controller_name + "/derr", Float64, queue_size=10)306 def reset_state(self):307 rospy.loginfo("resetting Fuzzy")...
test_rules.py
Source:test_rules.py
2from tracpro.test.cases import TracProTest3from .. import rules4from . import factories5class TestGetCategory(TracProTest):6 def _create_rule(self, category):7 return {8 'category': category,9 'test': {},10 }11 def test_get_category_direct(self):12 """Return rule['category'] if it is not a dictionary."""13 rule = self._create_rule('cats')14 self.assertEqual(rules.get_category(rule), "cats")15 def test_get_category_base(self):16 """get_category returns the base category name, if available."""17 rule = self._create_rule({'base': 'base', 'eng': 'eng'})18 self.assertEqual(rules.get_category(rule), "base")19 def test_get_category_other(self):20 """get_category returns the first available translation."""21 rule = self._create_rule({'eng': 'eng'})22 self.assertEqual(rules.get_category(rule), "eng")23class TestGetAllCategories(TracProTest):24 def _create_rule(self, category):25 return {26 'category': {'base': category},27 'test': {},28 }29 def test_get_all_no_rules(self):30 question = factories.Question(rules=[])31 self.assertEqual(rules.get_all_categories(question), ["Other"])32 def test_get_all(self):33 question = factories.Question(rules=[34 self._create_rule('a'),35 self._create_rule('b'),36 ])37 self.assertEqual(38 rules.get_all_categories(question),39 ['a', 'b', 'Other'])40 def test_get_all__include_other(self):41 """Ensure that "Other" is at the end of the list."""42 question = factories.Question(rules=[43 self._create_rule('a'),44 self._create_rule('Other'),45 self._create_rule('b'),46 ])47 self.assertEqual(48 rules.get_all_categories(question),49 ['a', 'b', 'Other'])50 def test_get_all_with_answers(self):51 """Include Answer categories that aren't in the rules."""52 question = factories.Question(rules=[53 self._create_rule('a'),54 self._create_rule('b'),55 ])56 factories.Answer(question=question, category='apple')57 factories.Answer(question=question, category='a')58 self.assertEqual(59 rules.get_all_categories(question, answers=question.answers.all()),60 ['a', 'b', 'apple', 'Other'])61 def test_duplicates(self):62 question = factories.Question(rules=[63 self._create_rule('b'),64 self._create_rule('a'),65 self._create_rule('a'),66 self._create_rule('b'),67 ])68 self.assertEqual(69 rules.get_all_categories(question),70 ['b', 'a', 'Other'])71class CheckRuleTestBase(object):72 def call(self, test):73 val, kwargs = test74 return self.rule(val, **kwargs)75 def test_false(self):76 unexpected_successes = []77 for test in self.false_tests:78 if self.call(test) is not False:79 unexpected_successes.append(test)80 if unexpected_successes:...
Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!