Best Python code snippet using tempest_python
resources.py
Source:resources.py
...42 def __init__(self):43 pass44 #45 @staticmethod46 def _prepare_params(params):47 _tmp_params = {}48 for pair in additional_params:49 _tmp_params[pair[0]] = pair[1]50 return dict(_tmp_params, **params)51 # extra_params = es_additional_params52 # interfaces about domain and token53 @staticmethod54 def create_domain(param, data):55 global URL56 param = BackendRequest._prepare_params(param)57 params = dict({"act": "create_domain"}, **param)58 try:59 if data:60 p = urllib.urlencode(params)61 url = URL + "?" + p62 body = json.dumps(data)63 res = requests.post(url, data=body, timeout=10.000)64 else:65 # print "create_domain:", params66 res = requests.get(URL, params=params, timeout=3.000)67 log.info("create_domain: %s", res.url)68 except Exception, e:69 log.error("create_domain error: %s", e)70 return {'result': False}71 result = res.json()72 log.info("create_domain result: %s", result)73 return result74 @staticmethod75 def get_domain(param):76 global URL77 param = BackendRequest._prepare_params(param)78 params = dict({"act": "get_domain"}, **param)79 # print "get_domain:", params80 try:81 res = requests.get(URL, params=params, timeout=30.000)82 log.info("get_domain: %s", res.url)83 except Exception, e:84 print e85 log.error("get_domain error: %s", e)86 return {'result': False}87 result = res.json()88 log.info("get_domain result: %s", result)89 return result90 @staticmethod91 def activate_domain(param):92 global URL93 params = dict({"act": "activate_domain"}, **param)94 # print "get_domain:", params95 try:96 res = requests.get(URL, params=params, timeout=30.000)97 log.info("activate_domain: %s", res.url)98 except Exception, e:99 print e100 log.error("activate_domain error: %s", e)101 return {'result': False}102 result = res.json()103 log.info("activate_domain result: %s", result)104 return result105 @staticmethod106 def update_domain(param):107 global URL108 params = dict({"act": "update_domain"}, **param)109 try:110 res = requests.get(URL, params=params, 
timeout=30.000)111 log.info("update_domain: %s", res.url)112 result = res.json()113 log.info("update_domain result: %s", result)114 return result115 except Exception, e:116 print e117 log.error("update_domain error: %s", e)118 return {'result': False}119 @staticmethod120 def get_func_auth(param):121 global URL122 params = dict({"act": "get_func_auth"}, **param)123 # print "get_domain:", params124 try:125 res = requests.get(URL, params=params, timeout=30.000)126 log.info("get_func_auth: %s", res.url)127 except Exception, e:128 print e129 log.error("get_func_auth error: %s", e)130 return {'result': False}131 result = res.json()132 log.info("get_func_auth result: %s", result)133 return result134 @staticmethod135 def get_upload_bytes_stat(param):136 global URL137 param = BackendRequest._prepare_params(param)138 params = dict({"act": "get_upload_bytes_stat"}, **param)139 # print "get_domain:", params140 try:141 res = requests.get(URL, params=params, timeout=3.000)142 log.info("get_upload_bytes_stat: %s", res.url)143 except Exception, e:144 print e145 return {'result': False}146 result = res.json()147 log.info("get_upload_bytes_stat result: %s", result)148 return result149 # interfaces about account150 @staticmethod151 def login(param):152 global URL153 param = BackendRequest._prepare_params(param)154 params = dict({"act": "login"}, **param)155 # print "login:", params156 try:157 res = requests.get(URL, params=params, timeout=30.000)158 log.info("login: %s", res.url)159 except Exception, e:160 if len(es_url_list) > 1:161 count = len(es_url_list)162 rst = {}163 i = 0164 while i < count-1:165 new_url = switch_frontend(URL)166 URL = new_url167 i=i+1168 try:169 res = requests.get(URL, params=params, timeout=30.000)170 log.info("login: %s", res.url)171 rst = res172 break173 except Exception, e:174 log.info("login error: %s", e)175 if not rst:176 return {'result': False, 'error': "server error"}177 else:178 result = rst.json()179 log.info("frontend_url: %s", URL)180 
log.info("login result: %s", result)181 return result182 else:183 log.info("login error: %s", e)184 return {'result': False, 'error': "server error"}185 result = res.json()186 log.info("login result: %s", result)187 return result188 @staticmethod189 def try_frontend(param):190 global URL191 param = BackendRequest._prepare_params(param)192 params = dict({"act": "login"}, **param)193 # print "login:", params194 try:195 res = requests.get(URL, params=params, timeout=30.000)196 log.info("try_frontend url: %s", res.url)197 except Exception, e:198 if len(es_url_list) > 1:199 count = len(es_url_list)200 rst = {}201 i = 0202 while i < count-1:203 new_url = switch_frontend(URL)204 URL = new_url205 i += 1206 try:207 res = requests.get(URL, params=params, timeout=30.000)208 log.info("try_frontend url: %s", res.url)209 break210 except Exception, e:211 log.info("try_frontend error: %s", e)212 else:213 log.error("try_frontend error: %s", e)214 log.info("try_frontend frontend_url is: %s", URL)215 return URL216 @staticmethod217 def send_reset_passwd_email(param, datas):218 global URL219 param = BackendRequest._prepare_params(param)220 params = dict({"act": "send_reset_passwd_email"}, **param)221 p = urllib.urlencode(params)222 url = URL + "?" 
+ p223 body = json.dumps(datas)224 try:225 res = requests.post(url, data=body, timeout=10.000)226 log.info("send_reset_passwd_email: %s", res.url)227 except Exception, e:228 print e229 return {'result': False, 'error': "server error"}230 result = res.json()231 log.info("send_reset_passwd_email result: %s", result)232 return result233 @staticmethod234 def reset_passwd(param):235 global URL236 param = BackendRequest._prepare_params(param)237 params = dict({"act": "reset_passwd"}, **param)238 try:239 res = requests.get(URL, params=params, timeout=10.000)240 log.info("reset_passwd: %s", res.url)241 except Exception, e:242 print e243 return {'result': False, 'error': "server error"}244 result = res.json()245 log.info("reset_passwd result: %s", result)246 return result247 @staticmethod248 def get_account_list(param):249 global URL250 param = BackendRequest._prepare_params(param)251 params = dict({"act": "get_account_list"}, **param)252 # print "get_account_list:", params253 res = requests.get(URL, params=params, timeout=10.000)254 log.info("get_account_list: %s", res.url)255 result = res.json()256 log.info("get_account_list result: %s", result)257 return result258 @staticmethod259 def create_account(param):260 global URL261 param = BackendRequest._prepare_params(param)262 params = dict({"act": "create_account"}, **param)263 # print "create_account:", params264 res = requests.get(URL, params=params, timeout=3.000)265 log.info("create_account: %s", res.url)266 result = res.json()267 log.info("create_account result: %s", result)268 return result269 @staticmethod270 def update_account(param):271 global URL272 param = BackendRequest._prepare_params(param)273 params = dict({"act": "update_account"}, **param)274 # print "update_account:", params275 try:276 res = requests.get(URL, params=params, timeout=3.000)277 log.info("update_account: %s", res.url)278 except Exception, e:279 print e280 return {'result': False}281 result = res.json()282 log.info("update_account result: %s", 
result)283 return result284 @staticmethod285 def delete_account(param):286 global URL287 param = BackendRequest._prepare_params(param)288 params = dict({"act": "delete_account"}, **param)289 # print "delete_account:", params290 try:291 res = requests.get(URL, params=params, timeout=3.000)292 log.info("delete_account: %s", res.url)293 except Exception, e:294 print e295 return {'result': False}296 result = res.json()297 log.info("delete_account result: %s", result)298 return result299 @staticmethod300 def list_urls(param):301 global URL302 param = BackendRequest._prepare_params(param)303 params = dict({"act": "list_urls"}, **param)304 res = requests.get(URL, params=params, timeout=10.000)305 log.info("list_urls: %s", res.url)306 result = res.json()307 log.info("list_urls result: %s", result)308 return result309 @staticmethod310 def get_role_list(param):311 global URL312 param = BackendRequest._prepare_params(param)313 params = dict({"act": "list_role"}, **param)314 res = requests.get(URL, params=params, timeout=10.000)315 log.info("list_role: %s", res.url)316 result = res.json()317 log.info("list_role result: %s", result)318 return result319 @staticmethod320 def create_role(param, data):321 global URL322 param = BackendRequest._prepare_params(param)323 params = dict({"act": "create_role"}, **param)324 p = urllib.urlencode(params)325 url = URL + "?" 
+ p326 body = json.dumps(data)327 res = requests.post(url, data=body, timeout=3.000)328 log.info("create_role: %s", res.url)329 result = res.json()330 log.info("create_role result: %s", result)331 return result332 @staticmethod333 def get_role(param):334 global URL335 param = BackendRequest._prepare_params(param)336 params = dict({"act": "get_role"}, **param)337 res = requests.get(URL, params=params, timeout=3.000)338 log.info("get_role: %s", res.url)339 result = res.json()340 log.info("get_role result: %s", result)341 return result342 @staticmethod343 def update_role(param, data):344 global URL345 param = BackendRequest._prepare_params(param)346 params = dict({"act": "update_role"}, **param)347 try:348 p = urllib.urlencode(params)349 url = URL + "?" + p350 body = json.dumps(data)351 res = requests.post(url, data=body, timeout=3.000)352 log.info("update_role: %s", res.url)353 except Exception, e:354 log.error("update_role error: %s", e)355 return {'result': False}356 result = res.json()357 log.info("update_role result: %s", result)358 return result359 @staticmethod360 def delete_role(param):361 global URL362 param = BackendRequest._prepare_params(param)363 params = dict({"act": "delete_role"}, **param)364 try:365 res = requests.get(URL, params=params, timeout=3.000)366 log.info("delete_role: %s", res.url)367 except Exception, e:368 print e369 return {'result': False}370 result = res.json()371 log.info("delete_role result: %s", result)372 return result373 @staticmethod374 def get_permissionmeta_list(param):375 global URL376 param = BackendRequest._prepare_params(param)377 params = dict({"act": "list_permission_meta"}, **param)378 res = requests.get(URL, params=params, timeout=10.000)379 log.info("list_permission_meta: %s", res.url)380 result = res.json()381 log.info("list_permission_meta result: %s", result)382 return result383 @staticmethod384 def privilege_list_resource_group(param):385 global URL386 param = BackendRequest._prepare_params(param)387 params = 
dict({"act": "privilege_list_resource_group"}, **param)388 res = requests.get(URL, params=params, timeout=10.000)389 log.info("privilege_list_resource_group: %s", res.url)390 result = res.json()391 log.info("privilege_list_resource_group result: %s", result)392 return result393 @staticmethod394 def get_permission_meta(param):395 global URL396 param = BackendRequest._prepare_params(param)397 params = dict({"act": "get_permission_meta"}, **param)398 res = requests.get(URL, params=params, timeout=10.000)399 log.info("get_permission_meta: %s", res.url)400 result = res.json()401 log.info("get_permission_meta result: %s", result)402 return result403 @staticmethod404 def get_resource_group(param):405 global URL406 param = BackendRequest._prepare_params(param)407 params = dict({"act": "get_resource_group"}, **param)408 res = requests.get(URL, params=params, timeout=3.000)409 log.info("get_resource_group: %s", res.url)410 result = res.json()411 log.info("get_resource_group result: %s", result)412 return result413 @staticmethod414 def update_resource_group(param):415 global URL416 param = BackendRequest._prepare_params(param)417 params = dict({"act": "update_resource_group"}, **param)418 try:419 res = requests.get(URL, params=params, timeout=3.000)420 log.info("update_resource_group: %s", res.url)421 except Exception, e:422 print e423 return {'result': False}424 result = res.json()425 log.info("update_resource_group result: %s", result)426 return result427 @staticmethod428 def create_resource_group(param):429 global URL430 param = BackendRequest._prepare_params(param)431 params = dict({"act": "create_resource_group"}, **param)432 res = requests.get(URL, params=params, timeout=3.000)433 log.info("create_resource_group: %s", res.url)434 result = res.json()435 log.info("create_resource_group result: %s", result)436 return result437 @staticmethod438 def delete_resource_group(param):439 global URL440 param = BackendRequest._prepare_params(param)441 params = dict({"act": 
"delete_resource_group"}, **param)442 try:443 res = requests.get(URL, params=params, timeout=3.000)444 log.info("delete_resource_group: %s", res.url)445 except Exception, e:446 print e447 return {'result': False}448 result = res.json()449 log.info("delete_resource_group result: %s", result)450 return result451 @staticmethod452 def get_batch_account(param):453 global URL454 param = BackendRequest._prepare_params(param)455 if "ids" in param and param['ids'] == "":456 return {u'accounts': [], u'result': True}457 params = dict({"act": "get_batch_account"}, **param)458 res = requests.get(URL, params=params, timeout=10.000)459 log.info("get_batch_account: %s", res.url)460 result = res.json()461 log.info("get_batch_account result: %s", result)462 return result463 @staticmethod464 def get_batch_saved_search(param):465 global URL466 param = BackendRequest._prepare_params(param)467 if "ids" in param and param['ids'] == "":468 return {u'saved_searchs': [], u'result': True}469 params = dict({"act": "get_batch_saved_search"}, **param)470 res = requests.get(URL, params=params, timeout=10.000)471 log.info("get_batch_saved_search: %s", res.url)472 result = res.json()473 log.info("get_batch_saved_search result: %s", result)474 return result475 @staticmethod476 def get_batch_dashboard_group(param):477 global URL478 param = BackendRequest._prepare_params(param)479 if "ids" in param and param['ids'] == "":480 return {u'dashboard_groups': [], u'result': True}481 params = dict({"act": "get_batch_dashboard_group"}, **param)482 res = requests.get(URL, params=params, timeout=10.000)483 log.info("get_batch_dashboard_group: %s", res.url)484 result = res.json()485 log.info("get_batch_dashboard_group result: %s", result)486 return result487 @staticmethod488 def get_batch_trend(param):489 global URL490 param = BackendRequest._prepare_params(param)491 if "ids" in param and param['ids'] == "":492 return {u'trends': [], u'result': True}493 params = dict({"act": "get_batch_trend"}, **param)494 res 
= requests.get(URL, params=params, timeout=10.000)495 log.info("get_batch_trend: %s", res.url)496 result = res.json()497 log.info("get_batch_trend result: %s", result)498 return result499 @staticmethod500 def get_batch_alert(param):501 global URL502 param = BackendRequest._prepare_params(param)503 if "ids" in param and param['ids'] == "":504 return {u'alerts': [], u'result': True}505 params = dict({"act": "get_batch_alert"}, **param)506 res = requests.get(URL, params=params, timeout=10.000)507 log.info("get_batch_alert: %s", res.url)508 result = res.json()509 log.info("get_batch_alert result: %s", result)510 return result511 @staticmethod512 def get_batch_sourcegroup(param):513 param = BackendRequest._prepare_params(param)514 if "ids" in param and param['ids'] == "":515 return {u'source_groups': [], u'result': True}516 params = dict({"act": "get_batch_source_group"}, **param)517 res = requests.get(URL, params=params, timeout=3.000)518 log.info("get_batch_sourcegroup: %s", res.url)519 result = res.json()520 log.info("get_batch_sourcegroup result: %s", result)521 return result522 @staticmethod523 def get_batch_saved_schedule(param):524 param = BackendRequest._prepare_params(param)525 if "ids" in param and param['ids'] == "":526 return {u'saved_schedules': [], u'result': True}527 params = dict({"act": "get_batch_saved_schedule"}, **param)528 res = requests.get(URL, params=params, timeout=3.000)529 log.info("get_batch_saved_schedule: %s", res.url)530 result = res.json()531 log.info("get_batch_saved_schedule result: %s", result)532 return result533 @staticmethod534 def get_batch_dictionary(param):535 param = BackendRequest._prepare_params(param)536 if "ids" in param and param['ids'] == "":537 return {u'list': [], u'result': True}538 # format incompatible539 params = dict({"act": "dict_batch"}, **param)540 res = requests.get(URL, params=params, timeout=10.000)541 log.info("get_batch_dictionary: %s", res.url)542 result = res.json()543 log.info("get_batch_dictionary 
result: %s", result)544 return result545 @staticmethod546 def get_batch_report(param):547 param = BackendRequest._prepare_params(param)548 if "ids" in param and param['ids'] == "":549 return {u'list': [], u'result': True}550 # format incompatible551 params = dict({"act": "report_batch"}, **param)552 res = requests.get(URL, params=params, timeout=10.000)553 log.info("get_batch_report: %s", res.url)554 result = res.json()555 log.info("get_batch_report result: %s", result)556 return result557 @staticmethod558 def get_batch_config(param):559 param = BackendRequest._prepare_params(param)560 if "ids" in param and param['ids'] == "":561 return {u'list': [], u'result': True}562 # format incompatible563 params = dict({"act": "config_batch"}, **param)564 res = requests.get(URL, params=params, timeout=10.000)565 log.info("get_batch_config: %s", res.url)566 result = res.json()567 log.info("get_batch_config result: %s", result)568 return result569 @staticmethod570 def list_assigned_resource_group(param):571 global URL572 param = BackendRequest._prepare_params(param)573 params = dict({"act": "list_assigned_resource_group"}, **param)574 res = requests.get(URL, params=params, timeout=3.000)575 log.info("list_assigned_resource_group: %s", res.url)576 result = res.json()577 log.info("list_assigned_resource_group result: %s", result)578 return result579 @staticmethod580 def list_derelict_resource_ids(param):581 global URL582 param = BackendRequest._prepare_params(param)583 params = dict({"act": "list_derelict_resource_ids"}, **param)584 res = requests.get(URL, params=params, timeout=3.000)585 log.info("list_derelict_resource_ids: %s", res.url)586 result = res.json()587 log.info("list_derelict_resource_ids result: %s", result)588 return result589 @staticmethod590 def permit_list_resource_group(param):591 global URL592 param = BackendRequest._prepare_params(param)593 params = dict({"act": "permit_list_resource_group"}, **param)594 res = requests.get(URL, params=params, 
timeout=10.000)595 log.info("permit_list_resource_group: %s", res.url)596 result = res.json()597 log.info("permit_list_resource_group result: %s", result)598 return result599 @staticmethod600 def get_all_user_group(param):601 global URL602 param = BackendRequest._prepare_params(param)603 params = dict({"act": "get_all_user_group"}, **param)604 res = requests.get(URL, params=params, timeout=3.000)605 log.info("get_all_user_group: %s", res.url)606 result = res.json()607 log.info("get_all_user_group result: %s", result)608 return result609 @staticmethod610 def get_user_group(param):611 global URL612 param = BackendRequest._prepare_params(param)613 params = dict({"act": "get_user_group"}, **param)614 res = requests.get(URL, params=params, timeout=3.000)615 log.info("get_user_group: %s", res.url)616 result = res.json()617 log.info("get_user_group result: %s", result)618 return result619 @staticmethod620 def create_user_group(param):621 global URL622 param = BackendRequest._prepare_params(param)623 params = dict({"act": "create_user_group"}, **param)624 res = requests.get(URL, params=params, timeout=3.000)625 log.info("create_user_group: %s", res.url)626 result = res.json()627 log.info("create_user_group result: %s", result)628 return result629 @staticmethod630 def update_user_group(param):631 global URL632 param = BackendRequest._prepare_params(param)633 params = dict({"act": "update_user_group"}, **param)634 try:635 res = requests.get(URL, params=params, timeout=3.000)636 log.info("update_user_group: %s", res.url)637 except Exception, e:638 print e639 return {'result': False}640 result = res.json()641 log.info("update_user_group result: %s", result)642 return result643 @staticmethod644 def can_visit(param):645 global URL646 param = BackendRequest._prepare_params(param)647 params = dict({"act": "can_visit"}, **param)648 res = requests.get(URL, params=params, timeout=3.000)649 log.info("can_visit: %s", res.url)650 result = res.json()651 log.info("can_visit result: %s", 
result)652 return result653 @staticmethod654 def permit_can(param):655 global URL656 param = BackendRequest._prepare_params(param)657 params = dict({"act": "permit_can"}, **param)658 res = requests.get(URL, params=params, timeout=3.000)659 log.info("permit_can: %s", res.url)660 result = res.json()661 log.info("permit_can result: %s", result)662 return result663 @staticmethod664 def batch_permit_can(param, data):665 global URL666 param = BackendRequest._prepare_params(param)667 params = dict({"act": "batch_permit_can"}, **param)668 try:669 p = urllib.urlencode(params)670 url = URL + "?" + p671 body = json.dumps(data)672 res = requests.post(url, data=body, timeout=3.000)673 log.info("batch_permit_can: %s", res.url)674 except Exception, e:675 log.error("batch_permit_can error: %s", e)676 return {'result': False}677 result = res.json()678 log.info("batch_permit_can result: %s", result)679 return result680 @staticmethod681 def assign_manager_role(param):682 global URL683 param = BackendRequest._prepare_params(param)684 params = dict({"act": "assign_manager_role"}, **param)685 res = requests.get(URL, params=params, timeout=3.000)686 log.info("assign_manager_role: %s", res.url)687 result = res.json()688 log.info("assign_manager_role result: %s", result)689 return result690 @staticmethod691 def assign_role(param):692 global URL693 param = BackendRequest._prepare_params(param)694 params = dict({"act": "assign_role"}, **param)695 res = requests.get(URL, params=params, timeout=3.000)696 log.info("assign_role: %s", res.url)697 result = res.json()698 log.info("assign_role result: %s", result)699 return result700 @staticmethod701 def assign_user(param):702 global URL703 param = BackendRequest._prepare_params(param)704 params = dict({"act": "assign_user"}, **param)705 res = requests.get(URL, params=params, timeout=3.000)706 log.info("assign_user: %s", res.url)707 result = res.json()708 log.info("assign_user result: %s", result)709 return result710 @staticmethod711 def 
delete_user_group(param):712 global URL713 param = BackendRequest._prepare_params(param)714 params = dict({"act": "delete_user_group"}, **param)715 # print "delete_account:", params716 try:717 res = requests.get(URL, params=params, timeout=3.000)718 log.info("delete_user_group: %s", res.url)719 except Exception, e:720 print e721 return {'result': False}722 result = res.json()723 log.info("delete_user_group result: %s", result)724 return result725 @staticmethod726 def get_account(param):727 global URL728 param = BackendRequest._prepare_params(param)729 params = dict({"act": "get_account"}, **param)730 # print "get_account:", params731 try:732 res = requests.get(URL, params=params, timeout=3.000)733 log.info("get_account: %s", res.url)734 except Exception, e:735 print e736 return {'result': False}737 result = res.json()738 log.info("get_account result: %s", result)739 return result740 @staticmethod741 def delete_account_action(param):742 global URL743 param = BackendRequest._prepare_params(param)744 params = dict({"act": "delete_account_action"}, **param)745 # print "add_account_action:", params746 try:747 res = requests.post(URL, params=params, timeout=3.000)748 log.info("delete_account_action: %s", res.url)749 except Exception, e:750 print e751 return {'result': False}752 result = res.json()753 log.info("delete_account_action result: %s", result)754 return result755 @staticmethod756 def add_account_action(param, datas):757 global URL758 param = BackendRequest._prepare_params(param)759 params = dict({"act": "add_account_action"}, **param)760 # print "add_account_action:", params761 p = urllib.urlencode(params)762 url = URL + "?" 
+ p763 body = json.dumps(datas)764 # print body765 try:766 res = requests.post(url, data=body, timeout=10.000)767 log.info("add_account_action: %s", res.url)768 # print res.url769 except Exception, e:770 print e771 return {'result': False}772 result = res.json()773 log.info("add_account_action result: %s", result)774 return result775 @staticmethod776 def get_account_action(param):777 global URL778 param = BackendRequest._prepare_params(param)779 params = dict({"act": "get_account_action"}, **param)780 # print "get_account_action:", params781 try:782 res = requests.get(URL, params=params, timeout=3.000)783 log.info("get_account_action: %s", res.url)784 except Exception, e:785 print e786 return {'result': False}787 result = res.json()788 log.info("get_account_action result: %s", result)789 return result790 # interfaces about source_group791 @staticmethod792 def get_source_group(param):793 global URL794 param = BackendRequest._prepare_params(param)795 params = dict({"act": "get_source_group"}, **param)796 # print "get_source_group:", params797 try:798 res = requests.get(URL, params=params, timeout=3.000)799 log.info("get_source_group: %s", res.url)800 except Exception, e:801 print e802 return {'result': False}803 result = res.json()804 log.info("get_source_group result: %s", result)805 return result806 @staticmethod807 def create_source_group(param):808 global URL809 param = BackendRequest._prepare_params(param)810 params = dict({"act": "create_source_group"}, **param)811 # print "create_source_group:", params812 try:813 res = requests.get(URL, params=params, timeout=3.000)814 log.info("create_source_group: %s", res.url)815 except Exception, e:816 print e817 return {'result': False}818 result = res.json()819 log.info("create_source_group result: %s", result)820 return result821 @staticmethod822 def get_source_group_related_infos(param):823 global URL824 param = BackendRequest._prepare_params(param)825 params = dict({"act": "get_source_group_related_infos"}, 
**param)826 # print "update_source_group:", params827 try:828 res = requests.get(URL, params=params, timeout=3.000)829 log.info("get_source_group_related_infos: %s", res.url)830 except Exception, e:831 print e832 return {'result': False}833 result = res.json()834 log.info("get_source_group_related_infos result: %s", result)835 return result836 @staticmethod837 def update_source_group(param):838 global URL839 param = BackendRequest._prepare_params(param)840 params = dict({"act": "update_source_group"}, **param)841 # print "update_source_group:", params842 try:843 res = requests.get(URL, params=params, timeout=3.000)844 log.info("update_source_group: %s", res.url)845 except Exception, e:846 print e847 return {'result': False}848 result = res.json()849 log.info("update_source_group result: %s", result)850 return result851 @staticmethod852 def delete_source_group(param):853 global URL854 param = BackendRequest._prepare_params(param)855 params = dict({"act": "delete_source_group"}, **param)856 # print "delete_source_group:", params857 try:858 res = requests.get(URL, params=params, timeout=3.000)859 log.info("delete_source_group: %s", res.url)860 except Exception, e:861 print e862 return {'result': False}863 result = res.json()864 log.info("delete_source_group result: %s", result)865 return result866 @staticmethod867 def assign_source_group(param):868 global URL869 param = BackendRequest._prepare_params(param)870 params = dict({"act": "assign_source_group"}, **param)871 # print "delete_source_group:", params872 try:873 res = requests.get(URL, params=params, timeout=3.000)874 log.info("assign_source_group: %s", res.url)875 except Exception, e:876 print e877 return {'result': False}878 result = res.json()879 log.info("assign_source_group result: %s", result)880 return result881 @staticmethod882 def cancel_source_group(param):883 global URL884 param = BackendRequest._prepare_params(param)885 params = dict({"act": "cancel_source_group"}, **param)886 # print 
"delete_source_group:", params887 try:888 res = requests.get(URL, params=params, timeout=3.000)889 log.info("cancel_source_group: %s", res.url)890 except Exception, e:891 print e892 return {'result': False}893 result = res.json()894 log.info("cancel_source_group result: %s", result)895 return result896 @staticmethod897 def get_upload_status(param):898 global URL899 param = BackendRequest._prepare_params(param)900 params = dict({"act": "get_domain"}, **param)901 # print "get_source_group:", params902 try:903 res = requests.get(URL, params=params, timeout=3.000)904 log.info("get_source_group: %s", res.url)905 except Exception, e:906 print e907 return {'result': False}908 result = res.json()909 log.info("get_source_group result: %s", result)910 return result911 # interfaces about alert912 @staticmethod913 def get_all_alert(param):914 global URL915 param = BackendRequest._prepare_params(param)916 params = dict({"act": "get_all_alert"}, **param)917 # print "get_all_alert:", params918 try:919 res = requests.get(URL, params=params, timeout=30.000)920 log.info("get_all_alert: %s", res.url)921 except Exception, e:922 print e923 return {'result': False}924 result = res.json()925 log.info("get_all_alert result: %s", result)926 return result927 @staticmethod928 def get_alert(param):929 global URL930 param = BackendRequest._prepare_params(param)931 params = dict({"act": "get_alert"}, **param)932 # print "get_alert:", params933 try:934 res = requests.get(URL, params=params, timeout=10.000)935 log.info("get_alert: %s", res.url)936 except Exception, e:937 print e938 return {'result': False}939 result = res.json()940 log.info("get_alert result: %s", result)941 return result942 @staticmethod943 def preview_alert(param):944 param = BackendRequest._prepare_params(param)945 params = dict({"act": "preview_alert"}, **param)946 body = json.dumps(params.pop('alert_meta'))947 p = urllib.urlencode(params)948 url = URL + "?" 
+ p949 try:950 res = requests.post(url, data=body, timeout=600.000)951 log.info("preview_alert: %s", res.url)952 except Exception, e:953 print e954 return {'result': False}955 result = res.json()956 log.info("preview_alert result: %s", result)957 return result958 @staticmethod959 def create_alert(param):960 global URL961 param = BackendRequest._prepare_params(param)962 params = dict({"act": "create_alert"}, **param)963 # print "create_alert:", params964 body = json.dumps(params.pop('alert_meta'))965 p = urllib.urlencode(params)966 url = URL + "?" + p967 try:968 res = requests.post(url, data=body, timeout=10.000)969 log.info("create_alert: %s", res.url)970 except Exception, e:971 print e972 return {'result': False}973 result = res.json()974 log.info("create_alert result: %s", result)975 return result976 @staticmethod977 def update_alert(param):978 global URL979 param = BackendRequest._prepare_params(param)980 params = dict({"act": "update_alert"}, **param)981 if 'alert_meta' in params:982 body = json.dumps(params.pop('alert_meta'))983 else:984 body = json.dumps("")985 p = urllib.urlencode(params)986 url = URL + "?" 
+ p987 try:988 res = requests.post(url, data=body, timeout=10.000)989 log.info("update_alert: %s", res.url)990 except Exception, e:991 print e992 return {'result': False}993 log.info("update_alert frontend res: %s", res)994 result = res.json()995 log.info("update_alert result: %s", result)996 return result997 @staticmethod998 def delete_alert(param):999 global URL1000 param = BackendRequest._prepare_params(param)1001 params = dict({"act": "delete_alert"}, **param)1002 # print "delete_alert:", params1003 try:1004 res = requests.get(URL, params=params, timeout=3.000)1005 log.info("delete_alert: %s", res.url)1006 except Exception, e:1007 print e1008 return {'result': False}1009 result = res.json()1010 log.info("delete_alert result: %s", result)1011 return result1012 @staticmethod1013 def attempt_run_alert(param):1014 global URL1015 param = BackendRequest._prepare_params(param)1016 params = dict({"act": "attempt_run_alert"}, **param)1017 body = json.dumps(params.pop('alert_meta'))1018 p = urllib.urlencode(params)1019 url = URL + "?" 
+ p1020 try:1021 res = requests.post(url, data=body, timeout=600.000)1022 log.info("attempt_run_alert: %s", res.url)1023 except Exception, e:1024 print e1025 return {'result': False}1026 result = res.json()1027 log.info("attempt_run_alert result: %s", result)1028 return result1029 # interfaces about source_group1030 @staticmethod1031 def get_all_saved_schedule(param):1032 global URL1033 param = BackendRequest._prepare_params(param)1034 params = dict({"act": "get_all_saved_schedule"}, **param)1035 # print "get_all_saved_search:", params1036 try:1037 res = requests.get(URL, params=params, timeout=3.000)1038 log.info("get_all_saved_schedule: %s", res.url)1039 except Exception, e:1040 print e1041 return {'result': False}1042 result = res.json()1043 log.info("get_all_saved_schedule result: %s", result)1044 return result1045 @staticmethod1046 def create_schedule(param):1047 global URL1048 param = BackendRequest._prepare_params(param)1049 params = dict({"act": "create_saved_schedule"}, **param)1050 # print "create_alert:", params1051 try:1052 res = requests.get(URL, params=params, timeout=3.000)1053 log.info("create_saved_schedule : %s", res.url)1054 except Exception, e:1055 print e1056 return {'result': False}1057 result = res.json()1058 log.info("create_saved_schedule result: %s", result)1059 return result1060 @staticmethod1061 def update_schedule(param):1062 global URL1063 param = BackendRequest._prepare_params(param)1064 params = dict({"act": "update_schedule"}, **param)1065 # print "update_schedule:", params1066 try:1067 res = requests.get(URL, params=params, timeout=3.000)1068 log.info("update_schedule: %s", res.url)1069 except Exception, e:1070 print e1071 return {'result': False}1072 result = res.json()1073 log.info("update_schedule result: %s", result)1074 return result1075 @staticmethod1076 def delete_saved_schedule(param):1077 global URL1078 param = BackendRequest._prepare_params(param)1079 params = dict({"act": "delete_saved_schedule"}, **param)1080 # print 
"create_alert:", params1081 try:1082 res = requests.get(URL, params=params, timeout=3.000)1083 log.info("delete_saved_schedule : %s", res.url)1084 except Exception, e:1085 print e1086 return {'result': False}1087 result = res.json()1088 log.info("delete_saved_schedule result: %s", result)1089 return result1090 @staticmethod1091 def enable_schedule(param):1092 global URL1093 param = BackendRequest._prepare_params(param)1094 params = dict({"act": "enable_saved_schedule"}, **param)1095 try:1096 res = requests.get(URL, params=params, timeout=3.000)1097 log.info("enable_saved_schedule : %s", res.url)1098 except Exception, e:1099 print e1100 return {'result': False}1101 result = res.json()1102 log.info("enable_saved_schedule result: %s", result)1103 return result1104 @staticmethod1105 def disable_schedule(param):1106 global URL1107 param = BackendRequest._prepare_params(param)1108 params = dict({"act": "disable_saved_schedule"}, **param)1109 try:1110 res = requests.get(URL, params=params, timeout=3.000)1111 log.info("disable_saved_schedule : %s", res.url)1112 except Exception, e:1113 print e1114 return {'result': False}1115 result = res.json()1116 log.info("disable_saved_schedule result: %s", result)1117 return result1118 # interfaces about source_group1119 @staticmethod1120 def get_saved_schedule(param):1121 global URL1122 param = BackendRequest._prepare_params(param)1123 params = dict({"act": "get_saved_schedule"}, **param)1124 # print "get_all_saved_search:", params1125 try:1126 res = requests.get(URL, params=params, timeout=3.000)1127 log.info("get_saved_schedule: %s", res.url)1128 except Exception, e:1129 print e1130 return {'result': False}1131 result = res.json()1132 log.info("get_saved_schedule result: %s", result)1133 return result1134 @staticmethod1135 def get_schedule_result(param):1136 global URL1137 param = BackendRequest._prepare_params(param)1138 params = dict({"act": "get_schedule_result"}, **param)1139 # print "get_all_saved_search:", params1140 try:1141 
res = requests.get(URL, params=params, timeout=3.000)1142 log.info("get_schedule_result: %s", res.url)1143 except Exception, e:1144 print e1145 return {'result': False}1146 result = res.json()1147 log.info("get_schedule_result result: %s", result)1148 return result1149 # interfaces about source_group1150 @staticmethod1151 def get_all_saved_search(param):1152 global URL1153 param = BackendRequest._prepare_params(param)1154 params = dict({"act": "get_all_saved_search"}, **param)1155 # print "get_all_saved_search:", params1156 try:1157 res = requests.get(URL, params=params, timeout=20.000)1158 log.info("get_all_saved_search: %s", res.url)1159 except Exception, e:1160 print e1161 return {'result': False}1162 result = res.json()1163 if result["result"]:1164 log.info("get_all_saved_search result: %s", result.get("result",""))1165 else:1166 log.info("get_all_saved_search result: %s", result.get("error",""))1167 return result1168 @staticmethod1169 def get_saved_search(param):1170 global URL1171 param = BackendRequest._prepare_params(param)1172 params = dict({"act": "get_saved_search"}, **param)1173 try:1174 res = requests.get(URL, params=params, timeout=3.000)1175 log.info("get_saved_search: %s", res.url)1176 except Exception, e:1177 print e1178 return {'result': False}1179 result = res.json()1180 log.info("get_saved_search result: %s", result)1181 return result1182 @staticmethod1183 def create_saved_search(param):1184 global URL1185 param = BackendRequest._prepare_params(param)1186 params = dict({"act": "create_saved_search"}, **param)1187 # print "create_saved_search:", params1188 try:1189 res = requests.get(URL, params=params, timeout=3.000)1190 log.info("create_saved_search: %s", res.url)1191 except Exception, e:1192 print e1193 return {'result': False}1194 result = res.json()1195 log.info("create_saved_search result: %s", result)1196 return result1197 @staticmethod1198 def update_saved_search(param):1199 global URL1200 param = BackendRequest._prepare_params(param)1201 
params = dict({"act": "update_saved_search"}, **param)1202 try:1203 res = requests.get(URL, params=params, timeout=3.000)1204 log.info("update_saved_search: %s", res.url)1205 except Exception, e:1206 print e1207 return {'result': False}1208 result = res.json()1209 log.info("update_saved_search result: %s", result)1210 return result1211 @staticmethod1212 def delete_saved_search(param):1213 global URL1214 param = BackendRequest._prepare_params(param)1215 params = dict({"act": "delete_saved_search"}, **param)1216 try:1217 res = requests.get(URL, params=params, timeout=3.000)1218 log.info("delete_saved_search: %s", res.url)1219 except Exception, e:1220 print e1221 return {'result': False}1222 result = res.json()1223 log.info("delete_saved_search result: %s", result)1224 return result1225 @staticmethod1226 def search_logtail(param):1227 global URL1228 param = BackendRequest._prepare_params(param)1229 params = dict({"act": "logtail"}, **param)1230 try:1231 log.info("logtail start")1232 res = requests.get(URL, params=params, timeout=30.000)1233 log.info("logtail: %s", res.url)1234 except Exception, e:1235 log.error("logtail error: %s", e)1236 return {'result': False}1237 result = res.json()1238 log.info("logtail result: %s", result)1239 return result1240 @staticmethod1241 def search_submit(param):1242 global URL1243 param = BackendRequest._prepare_params(param)1244 params = dict({"act": "submit"}, **param)1245 try:1246 res = requests.get(URL, params=params, timeout=30.000)1247 log.info("submit: %s", res.url)1248 except Exception, e:1249 log.error("submit error: %s", e)1250 return {'result': False}1251 result = res.json()1252 log.info("submit result: %s", result)1253 return result1254 @staticmethod1255 def search_download_submit(param):1256 global URL1257 param = BackendRequest._prepare_params(param)1258 params = dict({"act": "download"}, **param)1259 try:1260 res = requests.get(URL, params=params, timeout=30.000)1261 log.info("download submit: %s", res.url)1262 except 
Exception, e:1263 log.error("download submit error: %s", e)1264 return {'result': False}1265 result = res.json()1266 log.info("download submit result: %s", result)1267 return result1268 @staticmethod1269 def search_offlinetask_submit(param):1270 global URL1271 param = BackendRequest._prepare_params(param)1272 params = dict({"act": "background"}, **param)1273 try:1274 res = requests.get(URL, params=params, timeout=30.000)1275 log.info("offlinetask submit: %s", res.url)1276 except Exception, e:1277 log.error("offlinetask submit error: %s", e)1278 return {'result': False}1279 result = res.json()1280 log.info("offlinetask submit result: %s", result)1281 return result1282 @staticmethod1283 def search_preview(param):1284 global URL1285 param = BackendRequest._prepare_params(param)1286 params = dict({"act": "preview"}, **param)1287 sid = params.get("sid", "default")1288 try:1289 res = requests.get(URL, params=params, timeout=30.000)1290 log.info("[%s] preview: %s" % (sid, res.url))1291 except Exception, e:1292 log.error("[%s] preview error: %s" % (sid, e))1293 return {'result': False}1294 result = res.json()1295 log.info("[%s] preview result: %s" % (sid, result))1296 # log.info("[%s] preview result rc: %s" % (sid, result["rc"]))1297 return result1298 @staticmethod1299 def search_stats_events(param):1300 global URL1301 param = BackendRequest._prepare_params(param)1302 params = dict({"act": "statevents"}, **param)1303 sid = params.get("sid", "default")1304 try:1305 res = requests.get(URL, params=params, timeout=30.000)1306 log.info("[%s] statevents: %s" % (sid, res.url))1307 except Exception, e:1308 log.error("[%s] statevents error: %s" % (sid, e))1309 return {'result': False}1310 result = res.json()1311 log.info("[%s] statevents result: %s" % (sid, result))1312 return result1313 @staticmethod1314 def search_kill(param):1315 global URL1316 param = BackendRequest._prepare_params(param)1317 params = dict({"act": "kill"}, **param)1318 sid = params.get("sid", "default")1319 
try:1320 res = requests.get(URL, params=params, timeout=30.000)1321 log.info("[%s] kill: %s" % (sid, res.url))1322 except Exception, e:1323 log.error("[%s] kill error: %s" % (sid, e))1324 return {'result': False}1325 result = res.json()1326 log.info("[%s] kill result: %s" % (sid, result))1327 return result1328 @staticmethod1329 def search_pause(param):1330 global URL1331 param = BackendRequest._prepare_params(param)1332 params = dict({"act": "pause"}, **param)1333 sid = params.get("sid", "default")1334 try:1335 res = requests.get(URL, params=params, timeout=30.000)1336 log.info("[%s] pause: %s" % (sid, res.url))1337 except Exception, e:1338 log.error("[%s] pause error: %s" % (sid, e))1339 return {'result': False}1340 result = res.json()1341 log.info("[%s] pause result: %s" % (sid, result))1342 return result1343 @staticmethod1344 def search_recover(param):1345 global URL1346 param = BackendRequest._prepare_params(param)1347 params = dict({"act": "recover"}, **param)1348 sid = params.get("sid", "default")1349 try:1350 res = requests.get(URL, params=params, timeout=30.000)1351 log.info("[%s] recover: %s" % (sid, res.url))1352 except Exception, e:1353 log.error("[%s] recover error: %s" % (sid, e))1354 return {'result': False}1355 result = res.json()1356 log.info("[%s] recover result: %s" % (sid, result))1357 return result1358 @staticmethod1359 def search_heartbeat(param):1360 global URL1361 param = BackendRequest._prepare_params(param)1362 params = dict({"act": "heartbeat"}, **param)1363 sid = params.get("sid", "default")1364 try:1365 res = requests.get(URL, params=params, timeout=30.000)1366 log.info("[%s] heartbeat: %s" % (sid, res.url))1367 except Exception, e:1368 log.error("[%s] heartbeat error: %s" % (sid, e))1369 return {'result': False}1370 result = res.json()1371 log.info("[%s] heartbeat result: %s" % (sid, result))1372 return result1373 @staticmethod1374 def search_topfields(param):1375 global URL1376 param = BackendRequest._prepare_params(param)1377 params = 
dict({"act": "topfields"}, **param)1378 sid = params.get("sid", "default")1379 try:1380 res = requests.get(URL, params=params, timeout=30.000)1381 log.info("[%s] topfields: %s" % (sid, res.url))1382 except Exception, e:1383 log.error("[%s] topfields error: %s" % (sid, e))1384 return {'result': False}1385 result = res.json()1386 log.info("[%s] topfields result: %s" % (sid, result))1387 return result1388 @staticmethod1389 def context_search(param):1390 global URL1391 _id = str(int(time.time()*1000))+"_"+str(random.randint(1000,9999))+"_"+string.join(random.sample('abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ', 3)).replace(" ","")1392 param = BackendRequest._prepare_params(param)1393 params = dict({"act": "context_search"}, **param)1394 timeout = True1395 try:1396 start = int(time.time()*1000)1397 log.info('context_search [%s] start: %d'%(_id, start))1398 res = requests.get(URL + "es_status/", params=params, timeout=120.000)1399 log.info('context_search [%s]: %s'%(_id, res.url))1400 # print '#####################url: ', res.url1401 timeout = False1402 result = res.json()1403 except Exception, e:1404 log.info('context_search [%s] error: %s' % (_id, e))1405 end = int(time.time()*1000)1406 log.info('context_search [%s] cost: %d' % (_id, end-start))1407 error = 'system' if not timeout else 'timeout'1408 return {'result': False, 'error': error}1409 # print "search result:", result1410 end = int(time.time()*1000)1411 if not result["result"]:1412 log.error("context_search [%s] result: %s and cost: %d ms" % (_id, result['error'], end-start))1413 else:1414 log.info("context_search [%s] result: %s and cost: %d ms" % (_id, result['result'], end-start))1415 return result1416 # search interface1417 @staticmethod1418 def search(param):1419 global URL1420 _id = str(int(time.time()*1000))+"_"+str(random.randint(1000,9999))+"_"+string.join(random.sample('abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ', 3)).replace(" ","")1421 param = 
BackendRequest._prepare_params(param)1422 params = dict({"act": "search"}, **param)1423 timeout = True1424 try:1425 start = int(time.time()*1000)1426 log.info('search [%s] start: %d'%(_id, start))1427 res = requests.get(URL + "es_status/", params=params, timeout=120.000)1428 log.info('search [%s]: %s'%(_id, res.url))1429 # print '#####################url: ', res.url1430 timeout = False1431 result = res.json()1432 except Exception, e:1433 log.info('search [%s] error: %s' % (_id, e))1434 end = int(time.time()*1000)1435 log.info('search [%s] cost: %d' % (_id, end-start))1436 error = 'system' if not timeout else 'timeout'1437 return {'result': False, 'error': error}1438 # print "search result:", result1439 end = int(time.time()*1000)1440 if not result.get("result"):1441 log.error("search [%s] result: %s and cost: %d ms" % (_id, result['error'], end-start))1442 else:1443 log.info("search [%s] result: %s and cost: %d ms" % (_id, result['result'], end-start))1444 return result1445 @staticmethod1446 def multi_search(param, post_data):1447 global URL1448 param = BackendRequest._prepare_params(param)1449 params = dict({"act": "search"}, **param)1450 timeout = True1451 p = urllib.urlencode(params)1452 url = URL + "?" 
+ p1453 body = json.dumps(post_data)1454 try:1455 res = requests.post(url, data=body, timeout=120.000)1456 log.info('search: %s', res.url)1457 # print '#####################url: ', res.url1458 timeout = False1459 result = res.json()1460 except Exception, e:1461 log.info('search-error: %s', e)1462 error = 'system' if not timeout else 'timeout'1463 return {'result': False, 'error': error}1464 # print "search result:", result1465 log.info("search result: %s", result['result'])1466 return result1467 @staticmethod1468 def field_search(param):1469 global URL1470 param = BackendRequest._prepare_params(param)1471 params = dict({"act": "field_search"}, **param)1472 try:1473 res = requests.post(URL, params=params, timeout=120.000)1474 log.info('field_search: %s', res.url)1475 # print '#####################url: ', res.url1476 result = res.json()1477 except Exception, e:1478 log.info('field_search-error: %s', e)1479 return {'result': False, 'error': error}1480 # print "search result:", result1481 log.info("field_search result: %s", result['result'])1482 return result1483 @staticmethod1484 def update_dashboard(param):1485 global URL1486 param = BackendRequest._prepare_params(param)1487 params = dict({}, **param)1488 try:1489 res = requests.post(URL, params=params, timeout=3.000)1490 log.info("update_dashboard contents params: %s", res.url)1491 except Exception, e:1492 print e1493 return {'result': False}1494 result = res.json()1495 log.info("update_dashboard contents result: %s", result)1496 return result1497 @staticmethod1498 def get_dashboard_group(param):1499 global URL1500 param = BackendRequest._prepare_params(param)1501 params = dict({"act": "get_dashboard_group"}, **param)1502 try:1503 res = requests.post(URL, params=params, timeout=30.000)1504 log.info("get_dashboard_group contents params: %s", res.url)1505 except Exception, e:1506 print e1507 return {'result': False}1508 result = res.json()1509 log.info("get_dashboard_group contents result: %s", result)1510 return 
result1511 @staticmethod1512 def get_all_dashboard_group(param):1513 global URL1514 param = BackendRequest._prepare_params(param)1515 params = dict({"act": "get_all_dashboard_group"}, **param)1516 try:1517 res = requests.post(URL, params=params, timeout=30.000)1518 log.info("get_all_dashboard_group contents params: %s", res.url)1519 except Exception, e:1520 print e1521 return {'result': False}1522 result = res.json()1523 log.info("get_all_dashboard_group contents result: %s", result)1524 return result1525 @staticmethod1526 def create_dashboard_group(param):1527 global URL1528 param = BackendRequest._prepare_params(param)1529 params = dict({"act": "create_dashboard_group"}, **param)1530 try:1531 res = requests.post(URL, params=params, timeout=30.000)1532 log.info("create_dashboard_group contents params: %s", res.url)1533 except Exception, e:1534 print e1535 return {'result': False}1536 result = res.json()1537 log.info("create_dashboard_group contents result: %s", result)1538 return result1539 @staticmethod1540 def delete_dashboard_group(param):1541 global URL1542 param = BackendRequest._prepare_params(param)1543 params = dict({"act": "delete_dashboard_group"}, **param)1544 try:1545 res = requests.post(URL, params=params, timeout=30.000)1546 log.info("delete_dashboard_group contents params: %s", res.url)1547 except Exception, e:1548 print e1549 return {'result': False}1550 result = res.json()1551 log.info("delete_dashboard_group contents result: %s", result)1552 return result1553 @staticmethod1554 def update_dashboard_group(param):1555 global URL1556 param = BackendRequest._prepare_params(param)1557 params = dict({"act": "update_dashboard_group"}, **param)1558 try:1559 res = requests.post(URL, params=params, timeout=30.000)1560 log.info("update_dashboard_group contents params: %s", res.url)1561 except Exception, e:1562 print e1563 return {'result': False}1564 result = res.json()1565 log.info("update_dashboard_group contents result: %s", result)1566 return result1567 
@staticmethod1568 def create_dashboard_tab(param):1569 global URL1570 param = BackendRequest._prepare_params(param)1571 params = dict({"act": "create_dashboard"}, **param)1572 try:1573 res = requests.post(URL, params=params, timeout=30.000)1574 log.info("create_dashboard_tab contents params: %s", res.url)1575 except Exception, e:1576 print e1577 return {'result': False}1578 result = res.json()1579 log.info("create_dashboard_tab contents result: %s", result)1580 return result1581 @staticmethod1582 def update_dashboard_tab(param):1583 global URL1584 param = BackendRequest._prepare_params(param)1585 params = dict({"act": "update_dashboard"}, **param)1586 body = json.dumps(params.pop('content'))1587 p = urllib.urlencode(params)1588 url = URL + "?" + p1589 try:1590 res = requests.post(url, data=body, timeout=30.000)1591 log.info("update_dashboard_tab contents params: %s", res.url)1592 except Exception, e:1593 print e1594 return {'result': False}1595 result = res.json()1596 log.info("update_dashboard_tab contents result: %s", result)1597 return result1598 @staticmethod1599 def get_all_trends(param):1600 global URL1601 param = BackendRequest._prepare_params(param)1602 params = dict({"act": "get_all_trends"}, **param)1603 try:1604 res = requests.get(URL, params=params, timeout=3.000)1605 log.info('get_all_trends: %s', res.url)1606 except Exception, e:1607 print e1608 log.info('get_all_trends error: %s', str(e))1609 return {'result': False}1610 result = res.json()1611 log.info('get_all_trends result:%s', result)1612 return result1613 @staticmethod1614 def convert_id(param):1615 global URL1616 param = BackendRequest._prepare_params(param)1617 params = dict({"act": "convert_id"}, **param)1618 try:1619 res = requests.get(URL, params=params, timeout=3.000)1620 log.info('convert_id: %s', res.url)1621 except Exception, e:1622 print e1623 log.info('convert_id error: %s', str(e))1624 return {'result': False}1625 result = res.json()1626 log.info('convert_id result:%s', result)1627 
return result1628 @staticmethod1629 def get_trend(param):1630 global URL1631 param = BackendRequest._prepare_params(param)1632 params = dict({"act": "get_trend"}, **param)1633 try:1634 res = requests.get(URL, params=params, timeout=3.000)1635 log.info('get_trend: %s', res.url)1636 except Exception, e:1637 print e1638 log.info('get_trend error: %s', str(e))1639 return {'result': False}1640 result = res.json()1641 log.info('get_trend result:%s', result)1642 return result1643 @staticmethod1644 def create_trend(param, data):1645 global URL1646 param = BackendRequest._prepare_params(param)1647 params = dict({"act": "create_trend"}, **param)1648 p = urllib.urlencode(params)1649 url = URL + "?" + p1650 body = json.dumps(data)1651 try:1652 res = requests.post(url, data=body, timeout=10.000)1653 log.info('create_trend: %s', res.url)1654 except Exception, e:1655 print e1656 log.info('create_trend error: %s', str(e))1657 return {'result': False}1658 result = res.json()1659 log.info('create_trend result:%s', result)1660 return result1661 @staticmethod1662 def delete_trend(param):1663 global URL1664 param = BackendRequest._prepare_params(param)1665 params = dict({"act": "delete_trend"}, **param)1666 try:1667 res = requests.get(URL, params=params, timeout=3.000)1668 log.info('delete_trend: %s', res.url)1669 except Exception, e:1670 print e1671 log.info('delete_trend error: %s', str(e))1672 return {'result': False}1673 result = res.json()1674 log.info('delete_trend result:%s', result)1675 return result1676 @staticmethod1677 def update_trend(param, data):1678 global URL1679 # param = BackendRequest._prepare_params(param)1680 # params = dict({"act": "update_trend"}, **param)1681 # try:1682 # res = requests.get(URL, params=params, timeout=3.000)1683 # log.info('update_trend: %s', res.url)1684 # except Exception, e:1685 # print e1686 # log.info('update_trend error: %s', str(e))1687 # return {'result': False}1688 # result = res.json()1689 # log.info('update_trend result:%s', 
result)1690 # return result1691 param = BackendRequest._prepare_params(param)1692 params = dict({"act": "update_trend"}, **param)1693 p = urllib.urlencode(params)1694 url = URL + "?" + p1695 body = json.dumps(data)1696 try:1697 if body:1698 res = requests.post(url, data=body, timeout=10.000)1699 else:1700 res = requests.post(url, timeout=10.000)1701 log.info('update_trend: %s', res.url)1702 except Exception, e:1703 print e1704 log.info('update_trend error: %s', str(e))1705 return {'result': False}1706 result = res.json()1707 log.info('update_trend result:%s', result)1708 return result1709 @staticmethod1710 def add_account_trend(param, datas):1711 global URL1712 param = BackendRequest._prepare_params(param)1713 params = dict({"act": "add_account_trend"}, **param)1714 p = urllib.urlencode(params)1715 url = URL + "?" + p1716 body = json.dumps(datas)1717 # print body1718 try:1719 res = requests.post(url, data=body, timeout=10.000)1720 log.info("add_account_trend: %s", res.url)1721 except Exception, e:1722 print e1723 return {'result': False}1724 result = res.json()1725 log.info("add_account_trend result:%s", result)1726 return result1727 @staticmethod1728 def call_account_trend(param):1729 global URL1730 param = BackendRequest._prepare_params(param)1731 params = dict({}, **param)1732 try:1733 res = requests.get(URL, params=params, timeout=3.000)1734 log.info('call_account_trend: %s', res.url)1735 except Exception, e:1736 print e1737 return {'result': False}1738 result = res.json()1739 log.info('call_account_trend result:%s', result)1740 return result1741 @staticmethod1742 def update_account_trend_name(param):1743 global URL1744 param = BackendRequest._prepare_params(param)1745 params = dict({}, **param)1746 try:1747 res = requests.get(URL, params=params, timeout=10.000)1748 log.info('update_account_trend_name: %s', res.url)1749 except Exception, e:1750 print e1751 return {'result': False}1752 result = res.json()1753 log.info('update_account_trend_name result:%s', 
result)1754 return result1755 @staticmethod1756 def get_logtype(param):1757 global URL1758 param = BackendRequest._prepare_params(param)1759 params = dict({"act": "get_logtype"}, **param)1760 try:1761 res = requests.get(URL, params=params, timeout=10.000)1762 log.info('get_logtype: %s', res.url)1763 except Exception, e:1764 print e1765 return {'result': False}1766 result = res.json()1767 log.info('get_logtype result:%s', result)1768 return result1769 @staticmethod1770 def get_logtype_profiles(param):1771 global URL1772 param = BackendRequest._prepare_params(param)1773 params = dict({"act": "get_logtype_profiles"}, **param)1774 try:1775 res = requests.get(URL, params=params, timeout=10.000)1776 log.info('get_logtype_profiles: %s', res.url)1777 except Exception, e:1778 print e1779 return {'result': False}1780 result = res.json()1781 log.info('get_logtype_profiles result:%s', result)1782 return result1783 @staticmethod1784 def toggle_logtype(param):1785 global URL1786 param = BackendRequest._prepare_params(param)1787 params = dict({"act": "toggle_logtype"}, **param)1788 try:1789 res = requests.get(URL, params=params, timeout=10.000)1790 log.info('toggle_logtype: %s', res.url)1791 except Exception, e:1792 print e1793 return {'result': False}1794 result = res.json()1795 log.info('toggle_logtype result:%s', result)1796 return result1797 @staticmethod1798 def toggle_security(param):1799 global URL1800 param = BackendRequest._prepare_params(param)1801 params = dict({"act": "toggle_security"}, **param)1802 try:1803 res = requests.get(URL, params=params, timeout=10.000)1804 log.info('toggle_logtype: %s', res.url)1805 except Exception, e:1806 print e1807 return {'result': False}1808 result = res.json()1809 log.info('toggle_logtype result:%s', result)1810 return result1811 @staticmethod1812 def delete_logtype(param):1813 global URL1814 param = BackendRequest._prepare_params(param)1815 params = dict({"act": "delete_logtype"}, **param)1816 try:1817 res = requests.get(URL, 
params=params, timeout=10.000)1818 log.info('delete_logtype: %s', res.url)1819 except Exception, e:1820 print e1821 return {'result': False}1822 result = res.json()1823 log.info('delete_logtype result:%s', result)1824 return result1825 @staticmethod1826 def verify_logtype(param, datas):1827 global URL1828 # URL = "http://192.168.1.70:8181/"1829 param = BackendRequest._prepare_params(param)1830 params = dict({"act": "verify_logtype"}, **param)1831 p = urllib.urlencode(params)1832 url = URL + "?" + p1833 body = json.dumps(datas)1834 # print body1835 try:1836 res = requests.post(url, data=body, timeout=10.000)1837 log.info("verify_logtype: %s", res.url)1838 except Exception, e:1839 print e1840 return {'result': False}1841 result = res.json()1842 log.info("verify_logtype result:%s", result)1843 return result1844 @staticmethod1845 def create_logtype(param, datas):1846 global URL1847 param = BackendRequest._prepare_params(param)1848 params = dict({"act": "create_logtype"}, **param)1849 p = urllib.urlencode(params)1850 url = URL + "?" + p1851 body = json.dumps(datas)1852 # print body1853 try:1854 res = requests.post(url, data=body, timeout=10.000)1855 log.info("create_logtype: %s", res.url)1856 except Exception, e:1857 print e1858 return {'result': False}1859 result = res.json()1860 log.info("create_logtype result:%s", result)1861 return result1862 @staticmethod1863 def update_logtype(param, datas):1864 global URL1865 param = BackendRequest._prepare_params(param)1866 params = dict({"act": "update_logtype"}, **param)1867 p = urllib.urlencode(params)1868 url = URL + "?" 
+ p1869 body = json.dumps(datas)1870 # print body1871 try:1872 res = requests.post(url, data=body, timeout=10.000)1873 log.info("update_logtype: %s", res.url)1874 except Exception, e:1875 print e1876 return {'result': False}1877 result = res.json()1878 log.info("update_logtype result:%s", result)1879 return result1880 @staticmethod1881 def get_agent_status(param):1882 global URL1883 # URL = 'http://192.168.1.31:8080/'1884 param = BackendRequest._prepare_params(param)1885 params = dict({"act": "get_agent_status"}, **param)1886 try:1887 res = requests.get(URL, params=params, timeout=10.000)1888 #print URL, params1889 log.info('get_agent_status: %s', res.url)1890 except Exception, e:1891 print e1892 return {'result': False}1893 result = res.json()1894 log.info('get_agent_status result:%s', result)1895 return result1896 @staticmethod1897 def add_resource_to_group(param):1898 global URL1899 # URL = 'http://192.168.1.31:8080/'1900 param = BackendRequest._prepare_params(param)1901 params = dict({"act": "add_resource_to_group"}, **param)1902 try:1903 res = requests.post(URL, params=params, timeout=10.000)1904 log.info('add_resource_to_group: %s', res.url)1905 except Exception, e:1906 print e1907 return {'result': False}1908 result = res.json()1909 log.info('add_resource_to_group result:%s', result)1910 return result1911 @staticmethod1912 def remove_resource_from_group(param):1913 global URL1914 # URL = 'http://192.168.1.31:8080/'1915 param = BackendRequest._prepare_params(param)1916 params = dict({"act": "remove_resource_from_group"}, **param)1917 try:1918 res = requests.get(URL, params=params, timeout=10.000)1919 log.info('remove_resource_from_group: %s', res.url)1920 except Exception, e:1921 print e1922 return {'result': False}1923 result = res.json()1924 log.info('remove_resource_from_group result: %s', result)1925 return result1926 @staticmethod1927 def delete_agent(param):1928 global URL1929 # URL = 'http://192.168.1.31:8080/'1930 param = 
BackendRequest._prepare_params(param)1931 params = dict({"act": "delete_agent"}, **param)1932 try:1933 res = requests.get(URL, params=params, timeout=10.000)1934 log.info('delete_agent: %s', res.url)1935 except Exception, e:1936 print e1937 return {'result': False}1938 result = res.json()1939 log.info('delete_agent result:%s', result)1940 return result1941 @staticmethod1942 def get_search_stats(param):1943 global URL1944 # URL = 'http://192.168.1.71:8080/'1945 param = BackendRequest._prepare_params(param)1946 params = dict({"act": "get_search_stats"}, **param)1947 try:1948 res = requests.get(URL, params=params, timeout=20.000)1949 log.info('get_search_stats: %s', res.url)1950 except Exception, e:1951 print e1952 return {'result': False}1953 result = res.json()1954 log.info('get_search_stats result:%s', result)1955 return result1956 # 2016-01-04 by huang.huajun1957 # for server heka config1958 @staticmethod1959 def get_agent_config(param):1960 global URL1961 # URL = 'http://192.168.1.31:8080/'1962 param = BackendRequest._prepare_params(param)1963 params = dict({"act": "get_agent_config"}, **param)1964 try:1965 res = requests.get(URL, params=params, timeout=10.000)1966 #print URL, params1967 log.info('get_agent_config: %s', res.url)1968 except Exception, e:1969 print e1970 return {'result': False}1971 result = res.json()1972 log.info('get_agent_config result:%s', result)1973 return result1974 @staticmethod1975 def add_agent_config(param):1976 global URL1977 # URL = 'http://192.168.1.31:8080/'1978 param = BackendRequest._prepare_params(param)1979 params = dict({"act": "add_agent_config"}, **param)1980 try:1981 res = requests.get(URL, params=params, timeout=10.000)1982 #print URL, params1983 log.info('add_agent_config: %s', res.url)1984 except Exception, e:1985 print e1986 return {'result': False}1987 result = res.json()1988 log.info('add_agent_config result:%s', result)1989 return result1990 @staticmethod1991 def add_agent_package(param):1992 global URL1993 # URL = 
'http://192.168.1.31:8080/'1994 param = BackendRequest._prepare_params(param)1995 params = dict({"act": "add_agent_package"}, **param)1996 try:1997 res = requests.get(URL, params=params, timeout=10.000)1998 #print URL, params1999 log.info('add_agent_package: %s', res.url)2000 except Exception, e:2001 print e2002 return {'result': False}2003 result = res.json()2004 log.info('add_agent_package result:%s', result)2005 return result2006 @staticmethod2007 def update_agent(param):2008 global URL2009 # URL = 'http://192.168.1.31:8080/'2010 param = BackendRequest._prepare_params(param)2011 params = dict({"act": "update_agent"}, **param)2012 try:2013 res = requests.get(URL, params=params, timeout=10.000)2014 #print URL, params2015 log.info('update_agent: %s', res.url)2016 except Exception, e:2017 print e2018 return {'result': False}2019 result = res.json()2020 log.info('update_agent result:%s', result)2021 return result2022 @staticmethod2023 def get_agent_package(param):2024 global URL2025 # URL = 'http://192.168.1.31:8080/'2026 param = BackendRequest._prepare_params(param)2027 params = dict({"act": "get_agent_package"}, **param)2028 try:2029 res = requests.get(URL, params=params, timeout=10.000)2030 #print URL, params2031 log.info('get_agent_package: %s', res.url)2032 except Exception, e:2033 print e2034 return {'result': False}2035 result = res.json()2036 log.info('get_agent_package result:%s', result)2037 return result2038 @staticmethod2039 def delete_agent_package(param):2040 global URL2041 # URL = 'http://192.168.1.31:8080/'2042 print "PPPPPPPPPPPPPPPPPPPPPPP %s" % param2043 param = BackendRequest._prepare_params(param)2044 params = dict({"act": "delete_agent_package"}, **param)2045 try:2046 print "PPPPPPPPPP %s" % params2047 res = requests.get(URL, params=params, timeout=10.000)2048 #print URL, params2049 print "DDDDDDDDDDD %s" % res2050 log.info('delete_agent_package: %s', res.url)2051 except Exception, e:2052 print e2053 return {'result': False}2054 result = 
res.json()2055 log.info('delete_agent_package result:%s', result)2056 return result2057 @staticmethod2058 def modify_agent_config(param):2059 global URL2060 # URL = 'http://192.168.1.31:8080/'2061 param = BackendRequest._prepare_params(param)2062 params = dict({"act": "modify_agent_config"}, **param)2063 try:2064 res = requests.get(URL, params=params, timeout=10.000)2065 #print URL, params2066 log.info('modify_agent_config: %s', res.url)2067 except Exception, e:2068 print e2069 return {'result': False}2070 result = res.json()2071 log.info('modify_agent_config result:%s', result)2072 return result2073 @staticmethod2074 def delete_agent_config(param):2075 global URL2076 # URL = 'http://192.168.1.31:8080/'2077 param = BackendRequest._prepare_params(param)2078 params = dict({"act": "delete_agent_config"}, **param)2079 try:2080 res = requests.get(URL, params=params, timeout=10.000)2081 #print URL, params2082 log.info('del_agent_config: %s', res.url)2083 except Exception, e:2084 print e2085 return {'result': False}2086 result = res.json()2087 log.info('del_agent_config result:%s', result)2088 return result2089 @staticmethod2090 def get_licence_info(param):2091 global URL2092 # URL = 'http://192.168.1.71:8080/'2093 param = BackendRequest._prepare_params(param)2094 params = dict({"act": "get_license_info"}, **param)2095 try:2096 res = requests.get(URL, params=params, timeout=10.000)2097 log.info('get_license_info: %s', res.url)2098 except Exception, e:2099 print e2100 return {'result': False}2101 result = res.json()2102 log.info('get_search_stats result:%s', result)2103 return result2104 @staticmethod2105 def get_records(param):2106 global URL2107 # URL = 'http://192.168.1.71:8080/'2108 param = BackendRequest._prepare_params(param)2109 params = dict({}, **param)2110 try:2111 res = requests.get(URL, params=params, timeout=10.000)2112 log.info('get_records: %s', res.url)2113 except Exception, e:2114 print e2115 return {'result': False}2116 result = res.json()2117 
log.info('get_records result:%s', result)2118 return result2119 @staticmethod2120 def get_notice(param):2121 global URL2122 # URL = 'http://192.168.1.71:8080/'2123 param = BackendRequest._prepare_params(param)2124 params = dict({}, **param)2125 try:2126 res = requests.get(URL, params=params, timeout=10.000)2127 log.info('get_notice: %s', res.url)2128 except Exception, e:2129 print e2130 return {'result': False, 'error': 'Get notice error!'}2131 result = res.json()2132 if result["result"]:2133 log.info('get_notice result:%s', result.get("result",""))2134 else:2135 log.info('get_notice result:%s', result.get("error",""))2136 return result2137 @staticmethod2138 def get_appname_ttl(param):2139 global URL2140 # _URL = 'http://192.168.1.117:8080/'2141 _URL = URL2142 param = BackendRequest._prepare_params(param)2143 params = dict({"act": "get_appname_ttl"}, **param)2144 try:2145 res = requests.post(_URL, params=params, timeout=30.000)2146 log.info("get_appname_ttl contents params: %s", res.url)2147 except Exception, e:2148 print e2149 return {'result': False}2150 result = res.json()2151 log.info("get_appname_ttl contents result: %s", result)2152 return result2153 @staticmethod2154 def get_ttl_bk_times(param):2155 global URL2156 # _URL = 'http://192.168.1.117:8080/'2157 _URL = URL2158 param = BackendRequest._prepare_params(param)2159 params = dict({"act": "get_valid_appname_ttl"}, **param)2160 try:2161 res = requests.post(_URL, params=params, timeout=30.000)2162 log.info("get_valid_appname_ttl contents params: %s", res.url)2163 except Exception, e:2164 print e2165 return {'result': False}2166 result = res.json()2167 log.info("get_valid_appname_ttl contents result: %s", result)2168 return result2169 @staticmethod2170 def update_ttls(param):2171 global URL2172 # _URL = 'http://192.168.1.117:8080/'2173 _URL = URL2174 param = BackendRequest._prepare_params(param)2175 params = dict({"act": "create_appname_ttl"}, **param)2176 try:2177 res = requests.post(_URL, params=params, 
timeout=30.000)2178 print "###################update ttls url : ",res.url2179 log.info("create_appname_ttl contents params: %s", res.url)2180 except Exception, e:2181 print e2182 return {'result': False}2183 result = res.json()2184 log.info("create_appname_ttl contents result: %s", result)2185 return result2186 ############################################################################2187 # author: "Junwei Zhao"2188 # date: 10/17/20162189 # Dictionary section2190 # frontend dictionary api wrappers2191 ############################################################################2192 @staticmethod2193 def get_dict_list(param):2194 # construct url and dictionary for HTTP "GET"'s query string2195 # global URL2196 _URL = URL2197 params = BackendRequest.form_params(param, "dict_lists")2198 try:2199 res = requests.get(_URL, params=params, timeout=30.000)2200 log.info("get_dict_list: %s", res.url)2201 result = res.json()2202 except Exception as e:2203 return BackendRequest.print_and_rtn_exception_wrapper(e)2204 log.info("get_dict_list result: %s", result)2205 return result2206 @staticmethod2207 def get_dict_list_title(param):2208 _URL = URL2209 params = BackendRequest.form_params(param, "dict_lists_title")2210 try:2211 res = requests.get(_URL, params=params, timeout=30.000)2212 log.info("get_dict_list_title: %s", res.url)2213 result = res.json()2214 except Exception as e:2215 return BackendRequest.print_and_rtn_exception_wrapper(e)2216 log.info("get_dict_list_title result: %s", result)2217 return result2218 @staticmethod2219 def get_dict_detail(param):2220 _URL = URL2221 params = BackendRequest.form_params(param, "dict_detail")2222 try:2223 res = requests.get(_URL, params=params, timeout=30.000)2224 log.info("get_dict_detail: %s", res.url)2225 result = res.json()2226 except Exception as e:2227 return BackendRequest.print_and_rtn_exception_wrapper(e)2228 log.info("get_dict_detail result: %s", result)2229 return result2230 @staticmethod2231 def delete_dict(param):2232 # 
global URL2233 _URL = URL2234 params = BackendRequest.form_params(param, "dict_delete")2235 try:2236 res = requests.get(_URL, params=params, timeout=30.000)2237 log.info("delete_dict: %s", res.url)2238 result = res.json()2239 except Exception as e:2240 return BackendRequest.print_and_rtn_exception_wrapper(e)2241 log.info("delete_dict result: %s", result)2242 return result2243 @staticmethod2244 def update_dict(param, file=""):2245 # global URL2246 _URL = URL2247 params = BackendRequest.form_params(param, "dict_update")2248 body = file2249 # p = urllib.urlencode(params)2250 # url = URL + "?" + p2251 try:2252 res = requests.post(_URL, params=params, data=body, timeout=30.000)2253 log.info("update_dict: %s", res.url)2254 result = res.json()2255 except Exception as e:2256 return BackendRequest.print_and_rtn_exception_wrapper(e)2257 log.info("update_dict result: %s", result)2258 return result2259 @staticmethod2260 def upload_dict(param, file=""):2261 # global URL2262 _URL = URL2263 params = BackendRequest.form_params(param, "dict_upload")2264 body = file2265 # p = urllib.urlencode(params)2266 # url = URL + "?" 
+ p2267 try:2268 res = requests.post(_URL, params=params, data=body, timeout=30.000)2269 log.info("upload_dict: %s", res.url)2270 result = res.json()2271 except Exception as e:2272 return BackendRequest.print_and_rtn_exception_wrapper(e)2273 log.info("upload_dict result: %s", result)2274 return result2275 @staticmethod2276 def dict_exist(param):2277 # get existing csv files list2278 file_name = str(param.get('name', ""))2279 rtn = BackendRequest.get_dict_list(param)2280 # check if uploading/updating file has already been stored in the database2281 if rtn["result"]:2282 dict_list = rtn.get('list', [])2283 if len(dict_list):2284 for item in dict_list:2285 if str(item.get('name', "")) == file_name:2286 return True2287 return False2288 ############################################################################2289 # author: "Junwei Zhao"2290 # date: 10/18/20162291 # Config section2292 # frontend config api wrappers2293 ############################################################################2294 @staticmethod2295 def get_config_list(param):2296 _URL = URL2297 params = BackendRequest.form_params(param, "config_list")2298 try:2299 res = requests.get(_URL, params=params, timeout=30.000)2300 log.info("get_config_list: %s", res.url)2301 if int(res.status_code) == 500:2302 return BackendRequest.print_and_rtn_exception_wrapper()2303 result = res.json()2304 except Exception as e:2305 return BackendRequest.print_and_rtn_exception_wrapper(e)2306 log.info("get_config_list result: %s", result)2307 BackendRequest.reformat_config_category_id(result.get('list', []))2308 return result2309 @staticmethod2310 def get_config_detail(param):2311 _URL = URL2312 params = BackendRequest.form_params(param, "config_detail")2313 try:2314 res = requests.get(_URL, params=params, timeout=30.000)2315 log.info("get_config_detail: %s", res.url)2316 print res.url2317 if int(res.status_code) == 500:2318 return BackendRequest.print_and_rtn_exception_wrapper()2319 result = res.json()2320 except 
Exception as e:2321 return BackendRequest.print_and_rtn_exception_wrapper(e)2322 log.info("get_config_detail result: %s", result)2323 return result2324 @staticmethod2325 def delete_config(param):2326 _URL = URL2327 params = BackendRequest.form_params(param, "config_delete")2328 try:2329 res = requests.get(_URL, params=params, timeout=30.000)2330 log.info("delete_config: %s", res.url)2331 if int(res.status_code) == 500:2332 return BackendRequest.print_and_rtn_exception_wrapper()2333 result = res.json()2334 except Exception as e:2335 return BackendRequest.print_and_rtn_exception_wrapper(e)2336 log.info("delete_config result: %s", result)2337 BackendRequest.reformat_config_category_id(result.get('list', []))2338 return result2339 @staticmethod2340 def get_category_list(param):2341 _URL = URL2342 params = BackendRequest.form_params(param, "category_list")2343 try:2344 res = requests.get(_URL, params=params, timeout=30.000)2345 log.info("get_category_list: %s", res.url)2346 if int(res.status_code) == 500:2347 return BackendRequest.print_and_rtn_exception_wrapper()2348 result = res.json()2349 except Exception as e:2350 return BackendRequest.print_and_rtn_exception_wrapper(e)2351 log.info("get_category_list result", result)2352 return result2353 @staticmethod2354 def create_config(param, data={}):2355 _URL = URL2356 params = BackendRequest.form_params(param, "config_new")2357 # pass data as string format instead of json2358 data = json.dumps(data)2359 try:2360 res = requests.post(_URL, params=params, data=data, timeout=30.000)2361 log.info("create_config: %s", res.url)2362 result = res.json()2363 except Exception as e:2364 return BackendRequest.print_and_rtn_exception_wrapper(e)2365 log.info("create_config result: %s", result)2366 return result2367 @staticmethod2368 def update_config(param, data={}):2369 _URL = URL2370 params = BackendRequest.form_params(param, "config_update")2371 data = json.dumps(data)2372 try:2373 res = requests.post(_URL, params=params, data=data, 
timeout=30.000)2374 log.info("update_config: %s", res.url)2375 result = res.json()2376 except Exception as e:2377 return BackendRequest.print_and_rtn_exception_wrapper(e)2378 log.info("update_config result: %s", result)2379 # result['category_id'] = result['categoryId']2380 # del result['categoryId']2381 return result2382 # reformat key "categoryId" to "category_id"2383 @staticmethod2384 def reformat_config_category_id(result):2385 if result and len(result):2386 for item in result:2387 item['category_id'] = item['categoryId']2388 del item['categoryId']2389 ############################################################################2390 # author: "Junwei Zhao"2391 # date: 10/21/20162392 # Report section2393 # frontend report api wrappers2394 ############################################################################2395 @staticmethod2396 def get_report_list(param):2397 _URL = URL2398 params = BackendRequest.form_params(param, "report_list")2399 try:2400 res = requests.get(_URL, params=params, timeout=30.000)2401 log.info("get_report_list: %s", res.url)2402 result = res.json()2403 except Exception as e:2404 return BackendRequest.print_and_rtn_exception_wrapper(e)2405 log.info("get_report_list result: %s", result)2406 return result2407 @staticmethod2408 def get_report_detail(param):2409 _URL = URL2410 params = BackendRequest.form_params(param, "report_detail")2411 try:2412 res = requests.get(_URL, params=params, timeout=30.000)2413 log.info("get_report_detail: %s", res.url)2414 result = res.json()2415 except Exception as e:2416 return BackendRequest.print_and_rtn_exception_wrapper(e)2417 log.info("get_report_detail result: %s", result)2418 return result2419 @staticmethod2420 def create_report(param, data={}):2421 _URL = URL2422 params = BackendRequest.form_params(param, "report_new")2423 data = json.dumps(data)2424 try:2425 res = requests.post(_URL, params=params, data=data, timeout=30.000)2426 log.info("create_report: %s", res.url)2427 result = res.json()2428 except 
Exception as e:2429 return BackendRequest.print_and_rtn_exception_wrapper(e)2430 log.info("create_report result: %s", result)2431 return result2432 @staticmethod2433 def update_report(param, data={}):2434 _URL = URL2435 params = BackendRequest.form_params(param, "report_update")2436 data = json.dumps(data)2437 log.info("update_report: %s", data)2438 try:2439 res = requests.post(_URL, params=params, data=data, timeout=30.000)2440 log.info("update_report: %s", res.url)2441 result = res.json()2442 except Exception as e:2443 return BackendRequest.print_and_rtn_exception_wrapper(e)2444 log.info("update_report result: %s", result)2445 return result2446 @staticmethod2447 def enable_report(param):2448 global URL2449 param = BackendRequest._prepare_params(param)2450 params = dict({"act": "report_enable"}, **param)2451 try:2452 res = requests.get(URL, params=params, timeout=3.000)2453 log.info("report_enable : %s", res.url)2454 except Exception, e:2455 print e2456 return {'result': False}2457 result = res.json()2458 log.info("report_enable result: %s", result)2459 return result2460 @staticmethod2461 def disable_report(param):2462 global URL2463 param = BackendRequest._prepare_params(param)2464 params = dict({"act": "report_disable"}, **param)2465 try:2466 res = requests.get(URL, params=params, timeout=3.000)2467 log.info("report_disable : %s", res.url)2468 except Exception, e:2469 print e2470 return {'result': False}2471 result = res.json()2472 log.info("report_disable result: %s", result)2473 return result2474 @staticmethod2475 def delete_report(param):2476 _URL = URL2477 params = BackendRequest.form_params(param, "report_delete")2478 try:2479 res = requests.post(_URL, params=params, timeout=30.000)2480 log.info("delete_report: %s", res.url)2481 result = res.json()2482 except Exception as e:2483 return BackendRequest.print_and_rtn_exception_wrapper(e)2484 log.info("delete_report result: %s", result)2485 return result2486 @staticmethod2487 def delete_download(param):2488 
global URL2489 param = BackendRequest._prepare_params(param)2490 params = dict({"act": "delete_file"}, **param)2491 try:2492 res = requests.get(URL, params=params, timeout=3.000)2493 log.info('delete_download: %s', res.url)2494 except Exception, e:2495 print e2496 log.info('delete_download error: %s', str(e))2497 return {'result': False}2498 result = res.json()2499 log.info('delete_download result:%s', result)2500 return result2501 @staticmethod2502 def get_job_list(param):2503 _URL = URL2504 params = BackendRequest.form_params(param, "joblist")2505 try:2506 res = requests.get(_URL, params=params, timeout=30.000)2507 log.info("get_job_list: %s", res.url)2508 result = res.json()2509 except Exception as e:2510 return BackendRequest.print_and_rtn_exception_wrapper(e)2511 log.info("get_job_list result: %s", result)2512 return result2513 @staticmethod2514 def drill_down(param):2515 global URL2516 param = BackendRequest._prepare_params(param)2517 params = dict({"act": "drill_down"}, **param)2518 try:2519 res = requests.post(URL, params=params, timeout=30.000)2520 log.info("drill_down contents params: %s", res.url)2521 except Exception, e:2522 print e2523 return {'rc': 1}2524 result = res.json()2525 log.info("drill_down contents result: %s", result)2526 return result2527 @staticmethod2528 def drill_down_filter(param):2529 global URL2530 param = BackendRequest._prepare_params(param)2531 params = dict({"act": "filter"}, **param)2532 try:2533 res = requests.post(URL, params=params, timeout=30.000)2534 log.info("filter contents params: %s", res.url)2535 except Exception, e:2536 print e2537 return {'rc': 1}2538 result = res.json()2539 log.info("filter contents result: %s", result)2540 return result2541 @staticmethod2542 def delete_offlinetask(param):2543 global URL2544 param = BackendRequest._prepare_params(param)2545 params = dict({"act": "delete_file"}, **param)2546 try:2547 res = requests.get(URL, params=params, timeout=3.000)2548 log.info('delete_offlinetask: %s', 
res.url)2549 except Exception, e:2550 print e2551 log.info('delete_offlinetask error: %s', str(e))2552 return {'result': False}2553 result = res.json()2554 log.info('delete_offlinetask result:%s', result)2555 return result2556 ############################################################################2557 # author: "Junwei Zhao"2558 # date: 10/18/20162559 # Utility Functions2560 ############################################################################2561 # return given parameters grouped with proper act name2562 @staticmethod2563 def form_params(param, act):2564 param = BackendRequest._prepare_params(param)2565 return dict({'act': act}, **param)2566 # if any exception related to http transmission, json parsing and database operation2567 # print exception and return a mocking response back to its previous caller with error info2568 @staticmethod2569 def print_and_rtn_exception_wrapper(error="Server Error"):2570 print error2571 return {2572 'result': False,2573 'errorCode': 500,2574 'error': error2575 }2576 @staticmethod2577 def get_index_info_list(param):2578 _URL = URL2579 params = BackendRequest.form_params(param, "get_index_info_list")2580 try:2581 res = requests.get(_URL, params=params, timeout=30.000)2582 log.info("get_index_info_list: %s", res.url)2583 result = res.json()2584 except Exception as e:2585 return BackendRequest.print_and_rtn_exception_wrapper(e)2586 log.info("get_index_info_list result: %s", result)2587 return result2588 @staticmethod2589 def get_index_info(param):2590 _URL = URL2591 params = BackendRequest.form_params(param, "get_index_info")2592 try:2593 res = requests.get(_URL, params=params, timeout=30.000)2594 log.info("get_index_info: %s", res.url)2595 result = res.json()2596 except Exception as e:2597 return BackendRequest.print_and_rtn_exception_wrapper(e)2598 log.info("get_index_info result: %s", result)2599 return result2600 @staticmethod2601 def create_index_info(param):2602 _URL = URL2603 params = BackendRequest.form_params(param, 
"create_index_info")2604 try:2605 res = requests.get(_URL, params=params, timeout=30.000)2606 log.info("create_index_info: %s", res.url)2607 result = res.json()2608 except Exception as e:2609 return BackendRequest.print_and_rtn_exception_wrapper(e)2610 log.info("create_index_info result: %s", result)2611 return result2612 @staticmethod2613 def update_index_info(param):2614 _URL = URL2615 params = BackendRequest.form_params(param, "update_index_info")2616 try:2617 res = requests.get(_URL, params=params, timeout=30.000)2618 log.info("update_index_info: %s", res.url)2619 result = res.json()2620 except Exception as e:2621 return BackendRequest.print_and_rtn_exception_wrapper(e)2622 log.info("update_index_info result: %s", result)2623 return result2624 @staticmethod2625 def delete_index_info(param):2626 _URL = URL2627 params = BackendRequest.form_params(param, "delete_index_info")2628 try:2629 res = requests.get(_URL, params=params, timeout=30.000)2630 log.info("delete_index_info: %s", res.url)2631 result = res.json()2632 except Exception as e:2633 return BackendRequest.print_and_rtn_exception_wrapper(e)2634 log.info("delete_index_info result: %s", result)2635 return result2636 @staticmethod2637 def get_index_match_rule_list(param):2638 _URL = URL2639 params = BackendRequest.form_params(param, "get_index_match_rule_list")2640 try:2641 res = requests.get(_URL, params=params, timeout=30.000)2642 log.info("get_index_match_rule_list: %s", res.url)2643 result = res.json()2644 except Exception as e:2645 return BackendRequest.print_and_rtn_exception_wrapper(e)2646 log.info("get_index_match_rule_list result: %s", result)2647 return result2648 @staticmethod2649 def get_index_match_rule(param):2650 _URL = URL2651 params = BackendRequest.form_params(param, "get_index_match_rule")2652 try:2653 res = requests.get(_URL, params=params, timeout=30.000)2654 log.info("get_index_match_rule: %s", res.url)2655 result = res.json()2656 except Exception as e:2657 return 
BackendRequest.print_and_rtn_exception_wrapper(e)2658 log.info("get_index_match_rule result: %s", result)2659 return result2660 @staticmethod2661 def create_index_match_rule(param):2662 _URL = URL2663 params = BackendRequest.form_params(param, "create_index_match_rule")2664 try:2665 res = requests.get(_URL, params=params, timeout=30.000)2666 log.info("create_index_match_rule: %s", res.url)2667 result = res.json()2668 except Exception as e:2669 return BackendRequest.print_and_rtn_exception_wrapper(e)2670 log.info("create_index_match_rule result: %s", result)2671 return result2672 @staticmethod2673 def update_index_match_rule(param):2674 _URL = URL2675 params = BackendRequest.form_params(param, "update_index_match_rule")2676 try:2677 res = requests.get(_URL, params=params, timeout=30.000)2678 log.info("update_index_match_rule: %s", res.url)2679 result = res.json()2680 except Exception as e:2681 return BackendRequest.print_and_rtn_exception_wrapper(e)2682 log.info("update_index_match_rule result: %s", result)2683 return result2684 @staticmethod2685 def delete_index_match_rule(param):2686 _URL = URL2687 params = BackendRequest.form_params(param, "delete_index_match_rule")2688 try:2689 res = requests.get(_URL, params=params, timeout=30.000)2690 log.info("delete_index_match_rule: %s", res.url)2691 result = res.json()2692 except Exception as e:2693 return BackendRequest.print_and_rtn_exception_wrapper(e)2694 log.info("delete_index_match_rule result: %s", result)2695 return result2696 @staticmethod2697 def get_beneficiary_list(param):2698 global URL2699 param = BackendRequest._prepare_params(param)2700 params = dict({"act": "get_beneficiary_list"}, **param)2701 res = requests.get(URL, params=params, timeout=3.000)2702 log.info("get_beneficiary_list: %s", res.url)2703 result = res.json()2704 log.info("get_beneficiary_list result: %s", result)2705 return result2706 2707 @staticmethod2708 def get_beneficiary(param):2709 global URL2710 param = 
BackendRequest._prepare_params(param)2711 params = dict({"act": "get_beneficiary"}, **param)2712 res = requests.get(URL, params=params, timeout=3.000)2713 log.info("get_beneficiary: %s", res.url)2714 result = res.json()2715 log.info("get_beneficiary result: %s", result)2716 return result2717 @staticmethod2718 def create_beneficiary(param):2719 global URL2720 param = BackendRequest._prepare_params(param)2721 params = dict({"act": "create_beneficiary"}, **param)2722 res = requests.get(URL, params=params, timeout=3.000)2723 log.info("create_beneficiary: %s", res.url)2724 result = res.json()2725 log.info("create_beneficiary result: %s", result)2726 return result2727 2728 @staticmethod2729 def update_beneficiary(param):2730 global URL2731 param = BackendRequest._prepare_params(param)2732 params = dict({"act": "update_beneficiary"}, **param)2733 res = requests.get(URL, params=params, timeout=3.000)2734 log.info("update_beneficiary: %s", res.url)2735 result = res.json()2736 log.info("update_beneficiary result: %s", result)2737 return result2738 2739 @staticmethod2740 def delete_beneficiary(param):2741 global URL2742 param = BackendRequest._prepare_params(param)2743 params = dict({"act": "delete_beneficiary"}, **param)2744 res = requests.get(URL, params=params, timeout=3.000)2745 log.info("delete_beneficiary: %s", res.url)2746 result = res.json()2747 log.info("delete_beneficiary result: %s", result)2748 return result2749 2750 @staticmethod2751 def get_beneficiary_usages(param):2752 global URL2753 param = BackendRequest._prepare_params(param)2754 params = dict({"act": "get_beneficiary_usages"}, **param)2755 res = requests.get(URL, params=params, timeout=3.000)2756 log.info("get_beneficiary_usages: %s", res.url)2757 result = res.json()2758 log.info("get_beneficiary_usages result: %s", result)2759 return result2760 @staticmethod2761 def get_appname_list(param):2762 global URL2763 param = BackendRequest._prepare_params(param)2764 params = dict({"act": "get_appname_list"}, 
**param)2765 res = requests.get(URL, params=params, timeout=3.000)2766 log.info("get_appname_list: %s", res.url)2767 result = res.json()2768 log.info("get_appname_list result: %s", result)2769 return result2770 2771 @staticmethod2772 def get_appname(param):2773 global URL2774 param = BackendRequest._prepare_params(param)2775 params = dict({"act": "get_appname"}, **param)2776 res = requests.get(URL, params=params, timeout=3.000)2777 log.info("get_appname: %s", res.url)2778 result = res.json()2779 log.info("get_appname result: %s", result)2780 return result2781 @staticmethod2782 def get_assigned_appname_list(param):2783 global URL2784 param = BackendRequest._prepare_params(param)2785 params = dict({"act": "get_assigned_appname_list"}, **param)2786 res = requests.get(URL, params=params, timeout=3.000)2787 log.info("get_assigned_appname_list: %s", res.url)2788 result = res.json()2789 log.info("get_assigned_appname_list result: %s", result)2790 return result2791 @staticmethod2792 def get_unassigned_appname_list(param):2793 global URL2794 param = BackendRequest._prepare_params(param)2795 params = dict({"act": "get_unassigned_appname_list"}, **param)2796 res = requests.get(URL, params=params, timeout=3.000)2797 log.info("get_unassigned_appname_list: %s", res.url)2798 result = res.json()2799 log.info("get_unassigned_appname_list result: %s", result)2800 return result2801 2802 @staticmethod2803 def assign_appname(param):2804 global URL2805 param = BackendRequest._prepare_params(param)2806 params = dict({"act": "assign_appname"}, **param)2807 res = requests.get(URL, params=params, timeout=10.000)2808 log.info("assign_appname: %s", res.url)2809 result = res.json()2810 log.info("assign_appname result: %s", result)2811 return result2812 @staticmethod2813 def reassign_appnames(param):2814 global URL2815 param = BackendRequest._prepare_params(param)2816 params = dict({"act": "reassign_appnames"}, **param)2817 res = requests.get(URL, params=params, timeout=10.000)2818 
log.info("reassign_appnames: %s", res.url)2819 result = res.json()2820 log.info("reassign_appnames result: %s", result)2821 return result2822 @staticmethod2823 def get_appname_list_of_beneficiary(param):2824 global URL2825 param = BackendRequest._prepare_params(param)2826 params = dict({"act": "get_appname_list_of_beneficiary"}, **param)2827 res = requests.get(URL, params=params, timeout=3.000)2828 log.info("get_appname_list_of_beneficiary: %s", res.url)2829 result = res.json()2830 log.info("get_appname_list_of_beneficiary result: %s", result)2831 return result2832 2833 @staticmethod2834 def create_appname(param):2835 global URL2836 param = BackendRequest._prepare_params(param)2837 params = dict({"act": "create_appname"}, **param)2838 res = requests.get(URL, params=params, timeout=3.000)2839 log.info("create_appname: %s", res.url)2840 result = res.json()2841 log.info("create_appname result: %s", result)2842 return result2843 @staticmethod2844 def get_beneficiary_usages_distribution(param):2845 global URL2846 param = BackendRequest._prepare_params(param)2847 params = dict({"act": "get_beneficiary_usages_distribution"}, **param)2848 res = requests.get(URL, params=params, timeout=3.000)2849 log.info("get_beneficiary_usages_distribution: %s", res.url)2850 result = res.json()2851 log.info("get_beneficiary_usages_distribution result: %s", result)2852 return result2853 2854 @staticmethod2855 def verify_resource_package(param, data):2856 global URL2857 param = BackendRequest._prepare_params(param)2858 try:2859 p = urllib.urlencode(param)2860 url = URL + "?" 
+ p2861 body = data2862 res = requests.post(url, data=body, timeout=3.000)2863 log.info("verify_resource_package: %s", res.url)2864 except Exception, e:2865 log.error("verify_resource_package error: %s", e)2866 return {'result': False}2867 result = res.json()2868 log.info("verify_resource_package result: %s", result)2869 return result2870 @staticmethod2871 def import_resource_package(param, data):2872 global URL2873 param = BackendRequest._prepare_params(param)2874 try:2875 p = urllib.urlencode(param)2876 url = URL + "?" + p2877 body = data2878 res = requests.post(url, data=body, timeout=60.000)2879 log.info("import_resource_package: %s", res.url)2880 except Exception, e:2881 log.error("import_resource_package error: %s", e)2882 return {'result': False}2883 result = res.json()2884 log.info("import_resource_package result: %s", result)2885 return result2886 @staticmethod2887 def export_resource_package(param):2888 global URL2889 param = BackendRequest._prepare_params(param)2890 params = dict({"act": "export_resource_package"}, **param)2891 res = requests.get(URL, params=params, timeout=10.000)2892 log.info("export_resource_package: %s", res.url)2893 result = res.json()2894 log.info("export_resource_package result: %s", result)...
test_services.py
Source:test_services.py
...93 self.assertEqual(94 Url(response['next']),95 Url(self._prepare_url(96 IMAGE_PATH,97 self._prepare_params(params, marker=SMALLEST_IMAGE)98 ))99 )100 def test_sort_images_date_limit_ascending(self):101 """Sort images by last update, ascending, limit to 2."""102 params = {103 'sort': 'updated_at:asc',104 'limit': 2105 }106 response = json.loads(services.aggregate(IMAGES, 'images',107 params, IMAGE_PATH))108 # Check the first and second are the correct ids.109 self.assertEqual(response['images'][0]['id'], EARLIEST_IMAGE)110 self.assertEqual(response['images'][1]['id'], SECOND_EARLIEST_IMAGE)111 self.assertEqual(2, len(response['images']))112 # Check the next link113 self.assertEqual(114 Url(response['next']),115 Url(self._prepare_url(116 IMAGE_PATH,117 self._prepare_params(params, marker=SECOND_EARLIEST_IMAGE)118 ))119 )120 def test_sort_images_date_limit_descending(self):121 """Sort images by last update, descending, limit 1."""122 params = {123 'sort': 'updated_at:desc',124 'limit': 1125 }126 response = json.loads(services.aggregate(IMAGES, 'images',127 params, IMAGE_PATH))128 # Check the id and size129 self.assertEqual(response['images'][0]['id'], LATEST_IMAGE)130 self.assertEqual(1, len(response['images']))131 # Check the next link132 self.assertEqual(133 Url(response['next']),134 Url(self._prepare_url(135 IMAGE_PATH,136 self._prepare_params(params, marker=LATEST_IMAGE)137 ))138 )139 def test_sort_images_date_ascending_pagination(self):140 """Sort images by last update, ascending, skip the first one."""141 params = {142 'sort': 'updated_at:asc',143 'limit': 1,144 'marker': EARLIEST_IMAGE145 }146 response = json.loads(services.aggregate(IMAGES, 'images',147 params, IMAGE_PATH))148 # Ensure we skipped the first one149 self.assertEqual(response['images'][0]['id'], SECOND_EARLIEST_IMAGE)150 self.assertEqual(1, len(response['images']))151 # Next link152 self.assertEqual(153 Url(response['next']),154 Url(self._prepare_url(155 IMAGE_PATH,156 
self._prepare_params(params, marker=SECOND_EARLIEST_IMAGE)157 ))158 )159 # Start link160 self.assertEqual(161 Url(response['start']),162 Url(self._prepare_url(163 IMAGE_PATH,164 self._prepare_params(params)165 ))166 )167 def test_marker_without_limit(self):168 """Test marker without limit."""169 params = {170 'sort': 'updated_at:asc',171 'marker': EARLIEST_IMAGE172 }173 response = json.loads(services.aggregate(IMAGES, 'images',174 params, IMAGE_PATH))175 # Ensure we skipped the first one176 self.assertEqual(response['images'][0]['id'], SECOND_EARLIEST_IMAGE)177 self.assertEqual(IMAGES_IN_SAMPLE - 1, len(response['images']))178 # Start link179 self.assertEqual(180 Url(response['start']),181 Url(self._prepare_url(182 IMAGE_PATH,183 self._prepare_params(params)184 ))185 )186 def test_marker_last(self):187 """Test marker without limit, nothing to return."""188 params = {189 'sort': 'updated_at:asc',190 'marker': LATEST_IMAGE191 }192 response = json.loads(services.aggregate(IMAGES, 'images',193 params, IMAGE_PATH))194 # Ensure we skipped the first one195 self.assertEqual(0, len(response['images']))196 # Start link197 self.assertEqual(198 Url(response['start']),199 Url(self._prepare_url(200 IMAGE_PATH,201 self._prepare_params(params)202 ))203 )204 def test_list_api_versions(self):205 self.config_fixture.load_raw_values(group='proxy',206 image_api_versions=API_VERSIONS,207 volume_api_versions=API_VERSIONS)208 # List image api209 response = json.loads(services.list_api_versions('image',210 IMAGE_UNVERSIONED))211 current_version = response['versions'][0]['id']212 current_version_status = response['versions'][0]['status']213 current_version_url = response['versions'][0]['links'][0]['href']214 self.assertEqual(NUM_OF_VERSIONS, len(response['versions']))215 self.assertEqual(current_version, 'v3.2')216 self.assertEqual(current_version_status, 'CURRENT')217 self.assertEqual(218 Url(current_version_url),219 Url(IMAGE_VERSIONED))220 @staticmethod221 def 
_prepare_params(user_params, marker=None):222 params = user_params.copy()223 if marker:224 params['marker'] = marker225 else:226 params.pop('marker', None)227 return params228 @staticmethod229 def _prepare_url(url, params):...
test_bitrix24.py
Source:test_bitrix24.py
...20 def setUp(self):21 self.b24 = Bitrix24('https://example.bitrix24.com/rest/1/123456789')22 def test_one_level(self):23 params = {"fruit": "apple"}24 param_string = self.b24._prepare_params(params)25 self.assertEqual(param_string, "fruit=apple&")26 def test_one_level_several_items(self):27 params = {"fruit": "apple", "vegetable": "broccoli"}28 param_string = self.b24._prepare_params(params)29 self.assertEqual(param_string, "fruit=apple&vegetable=broccoli&")30 def test_multi_level(self):31 params = {"fruit": {"citrus": "lemon"}}32 param_string = self.b24._prepare_params(params)33 self.assertEqual(param_string, "fruit[citrus]=lemon&")34 def test_multi_level_deep(self):35 params = {"root": {"level 1": {"level 2": {"level 3": "value"}}}}36 param_string = self.b24._prepare_params(params)37 self.assertEqual(38 param_string, "root[level 1][level 2][level 3]=value&")39 def test_list_dict_mixed(self):40 params = {"root": {"level 1": [41 {"list_dict 1": "value 1"}, {"list_dict 2": "value 2"}]}}42 param_string = self.b24._prepare_params(params)43 self.assertEqual(44 param_string, "root[level 1][0][list_dict 1]=value 1&root[level 1][1][list_dict 2]=value 2&")45 def test_multi_level_several_items(self):46 params = {"fruit": {"citrus": "lemon", "sweet": "apple"}}47 param_string = self.b24._prepare_params(params)48 self.assertEqual(49 param_string, "fruit[citrus]=lemon&fruit[sweet]=apple&")50 def test_list(self):51 params = {"fruit": ["lemon", "apple"]}52 param_string = self.b24._prepare_params(params)53 self.assertEqual(param_string, "fruit[0]=lemon&fruit[1]=apple&")54 def test_tuple(self):55 params = {"fruit": ("lemon", "apple")}56 param_string = self.b24._prepare_params(params)57 self.assertEqual(param_string, "fruit[0]=lemon&fruit[1]=apple&")58 def test_string(self):59 param_string = self.b24._prepare_params('')60 self.assertEqual(param_string, "")61if __name__ == '__main__':...
Learn to run automation tests from scratch with the LambdaTest Learning Hub, from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. The LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, and TestNG.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 minutes of automation testing FREE!!