Best Python code snippet using robotframework-ioslibrary_python
CoreApiHelper.py
Source:CoreApiHelper.py
...36 node_host = API.get_host_info()37 self.system_config = core_api.Configuration(node_host, verify_ssl=verify_ssl)38 def network_configuration(self, print_response=False):39 with core_api.ApiClient(self.system_config) as api_client:40 api_client = self.set_basic_auth(api_client, "admin", "admin")41 try:42 api = network_api.NetworkApi(api_client)43 response: NetworkConfigurationResponse = api.network_configuration_post(dict())44 if print_response:45 print(response)46 return response47 except ApiException as e:48 Helpers.handleApiException(e)49 def network_status(self, print_response=False):50 with core_api.ApiClient(self.system_config) as api_client:51 api_client = self.set_basic_auth(api_client, "admin", "admin")52 try:53 api = network_api.NetworkApi(api_client)54 response = api.network_status_post(55 NetworkStatusRequest(self.network_configuration().network_identifier))56 return self.handle_response(response, print_response)57 except ApiException as e:58 Helpers.handleApiException(e)59 60 def engine_configuration(self, print_response=False):61 with core_api.ApiClient(self.system_config) as api_client:62 api_client = self.set_basic_auth(api_client, "admin", "admin")63 try:64 api = engine_api.EngineApi(api_client)65 response: EngineConfigurationResponse = api.engine_configuration_post(66 EngineConfigurationRequest(self.network_configuration().network_identifier))67 return self.handle_response(response, print_response)68 except ApiException as e:69 Helpers.handleApiException(e)70 71 def vote(self, print_response=False):72 with core_api.ApiClient(self.system_config) as api_client:73 api_client = self.set_basic_auth(api_client, "superadmin", "superadmin")74 try:75 api = key_api.KeyApi(api_client)76 request = UpdateVoteRequest(network_identifier=self.network_configuration().network_identifier)77 Helpers.print_request_body(request, "/key/vote")78 response: UpdateVoteResponse = api.key_vote_post(request)79 return self.handle_response(response, print_response)80 except ApiException as e:81 Helpers.handleApiException(e)82 83 def withdraw_vote(self, print_response=False):84 with core_api.ApiClient(self.system_config) as api_client:85 api_client = self.set_basic_auth(api_client, "superadmin", "superadmin")86 try:87 api = key_api.KeyApi(api_client)88 request = UpdateVoteRequest(network_identifier=self.network_configuration().network_identifier)89 Helpers.print_request_body(request, "/key/withdraw_vote")90 response: UpdateVoteResponse = api.key_withdraw_vote_post(request)91 return self.handle_response(response, print_response)92 except ApiException as e:93 Helpers.handleApiException(e)94 def key_list(self, print_response=False):95 with core_api.ApiClient(self.system_config) as api_client:96 api_client = self.set_basic_auth(api_client, "superadmin", "superadmin")97 try:98 api = key_api.KeyApi(api_client)99 request = KeyListRequest(network_identifier=self.network_configuration().network_identifier)100 Helpers.print_request_body(request, "/key/list")101 response: KeyListResponse = api.key_list_post(102 request)103 return self.handle_response(response, print_response)104 except ApiException as e:105 Helpers.handleApiException(e)106 def mempool(self, print_response=False):107 with core_api.ApiClient(self.system_config) as api_client:108 api_client = self.set_basic_auth(api_client, "admin", "admin")109 try:110 api = mempool_api.MempoolApi(api_client)111 response: MempoolResponse = api.mempool_post(112 MempoolRequest(network_identifier=self.network_configuration().network_identifier))113 return self.handle_response(response, print_response)114 except ApiException as e:115 Helpers.handleApiException(e)116 def mempool_transaction(self, transactionId: str, print_response=False):117 with core_api.ApiClient(self.system_config) as api_client:118 api_client = self.set_basic_auth(api_client, "admin", "admin")119 try:120 api = mempool_api.MempoolApi(api_client)121 response: MempoolResponse = api.mempool_transaction_post(122 MempoolTransactionRequest(123 network_identifier=self.network_configuration().network_identifier,124 transaction_identifier=TransactionIdentifier(transactionId)125 )126 )127 return self.handle_response(response, print_response)128 except ApiException as e:129 Helpers.handleApiException(e)130 def entity(self, entity_identifier, print_response=False):131 with core_api.ApiClient(self.system_config) as api_client:132 api_client = self.set_basic_auth(api_client, "admin", "admin")133 try:134 api = entity_api.EntityApi(api_client)135 entityRequest = EntityRequest(136 network_identifier=self.network_configuration().network_identifier,137 entity_identifier=entity_identifier138 )139 Helpers.print_request_body(entityRequest, "/entity")140 response: EntityResponse = api.entity_post(entityRequest)141 return self.handle_response(response, print_response)142 except ApiException as e:143 Helpers.handleApiException(e)144 def construction_build(self, actions, print_response=False, ask_user=False):145 with core_api.ApiClient(self.system_config) as api_client:146 api_client = self.set_basic_auth(api_client, "admin", "admin")147 try:148 network_configuration: NetworkConfigurationResponse = self.network_configuration()149 key_list: KeyListResponse = self.key_list()150 operation_groups = ValidatorConfig.build_operations(actions, key_list, ask_user=ask_user)151 if len(operation_groups) == 0: 152 return153 api = construction_api.ConstructionApi(api_client)154 build_request = ConstructionBuildRequest(155 network_identifier=network_configuration.network_identifier,156 fee_payer=key_list.public_keys[0].identifiers.account_entity_identifier,157 operation_groups=operation_groups)158 Helpers.print_request_body(build_request, "/construction/build")159 build: ConstructionBuildResponse = api.construction_build_post(build_request)160 return self.handle_response(build, print_response)161 except ApiException as e:162 Helpers.handleApiException(e)163 def key_sign(self, unsigned_transaction, print_response=False):164 with core_api.ApiClient(self.system_config) as api_client:165 api_client = self.set_basic_auth(api_client, "superadmin", "superadmin")166 try:167 network_configuration: NetworkConfigurationResponse = self.network_configuration()168 key_list: KeyListResponse = self.key_list()169 api = key_api.KeyApi(api_client)170 request = KeySignRequest(network_identifier=network_configuration.network_identifier,171 public_key=key_list.public_keys[0].public_key,172 unsigned_transaction=unsigned_transaction)173 Helpers.print_request_body(request, "/key/sign")174 response = api.key_sign_post(request)175 return self.handle_response(response, print_response)176 except ApiException as e:177 Helpers.handleApiException(e)178 def construction_submit(self, signed_transaction, print_response=False):179 with core_api.ApiClient(self.system_config) as api_client:180 api_client = self.set_basic_auth(api_client, "admin", "admin")181 try:182 api = construction_api.ConstructionApi(api_client)183 network_configuration: NetworkConfigurationResponse = self.network_configuration()184 request = ConstructionSubmitRequest(network_identifier=network_configuration.network_identifier,185 signed_transaction=signed_transaction)186 Helpers.print_request_body(request, "/construction/submit")187 response = api.construction_submit_post(request)188 return self.handle_response(response, print_response)189 except ApiException as e:190 Helpers.handleApiException(e)191 @staticmethod192 def set_validator_metadata(name, url):193 return lambda node_identifiers: [194 OperationGroup([...
server_tests.py
Source:server_tests.py
1#!/usr/bin/env python2# -*- coding: utf-8 -*-3# Licensed to Cloudera, Inc. under one4# or more contributor license agreements. See the NOTICE file5# distributed with this work for additional information6# regarding copyright ownership. Cloudera, Inc. licenses this file7# to you under the Apache License, Version 2.0 (the8# "License"); you may not use this file except in compliance9# with the License. You may obtain a copy of the License at10#11# http://www.apache.org/licenses/LICENSE-2.012#13# Unless required by applicable law or agreed to in writing, software14# distributed under the License is distributed on an "AS IS" BASIS,15# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.16# See the License for the specific language governing permissions and17# limitations under the License.18import logging19import sys20from nose.tools import assert_equal, assert_not_equal, assert_true, assert_false, assert_raises21from desktop.lib.exceptions_renderable import PopupException22from desktop.lib.django_test_util import make_logged_in_client23from useradmin.models import User24from impala.server import ImpalaDaemonApi, _get_impala_server_url25if sys.version_info[0] > 2:26 from unittest.mock import patch, Mock, MagicMock27else:28 from mock import patch, Mock, MagicMock29LOG = logging.getLogger(__name__)30class TestImpalaDaemonApi():31 def setUp(self):32 self.client = make_logged_in_client(username="test", groupname="default", recreate=True, is_superuser=False)33 self.user = User.objects.get(username="test")34 def test_get_impala_server_url_when_no_session(self):35 assert_raises(PopupException, _get_impala_server_url, session=None)36 def test_digest_auth(self):37 with patch('impala.server.DAEMON_API_USERNAME.get') as DAEMON_API_USERNAME_get:38 with patch('impala.server.DAEMON_API_PASSWORD.get') as DAEMON_API_PASSWORD_get:39 with patch('impala.server.HttpClient') as HttpClient:40 DAEMON_API_USERNAME_get.return_value = 'impala'41 DAEMON_API_PASSWORD_get.return_value = 'impala'42 server = ImpalaDaemonApi('localhost')43 server._client.set_digest_auth.assert_called()44 server._client.set_kerberos_auth.assert_not_called()45 server._client.set_basic_auth.assert_not_called()46 with patch('impala.server.DAEMON_API_USERNAME.get') as DAEMON_API_USERNAME_get:47 with patch('impala.server.DAEMON_API_PASSWORD.get') as DAEMON_API_PASSWORD_get:48 with patch('impala.server.HttpClient') as HttpClient:49 DAEMON_API_USERNAME_get.return_value = None50 DAEMON_API_PASSWORD_get.return_value = 'impala'51 server = ImpalaDaemonApi('localhost')52 server._client.set_digest_auth.assert_not_called()53 server._client.set_kerberos_auth.assert_not_called()54 server._client.set_basic_auth.assert_not_called()55 def test_basic_auth(self):56 with patch('impala.server.DAEMON_API_USERNAME.get') as DAEMON_API_USERNAME_get:57 with patch('impala.server.DAEMON_API_PASSWORD.get') as DAEMON_API_PASSWORD_get:58 with patch('impala.server.HttpClient') as HttpClient:59 with patch('impala.server.DAEMON_API_AUTH_SCHEME.get') as DAEMON_API_AUTH_SCHEME_get:60 DAEMON_API_USERNAME_get.return_value = 'impala'61 DAEMON_API_PASSWORD_get.return_value = 'impala'62 DAEMON_API_AUTH_SCHEME_get.return_value = 'basic'63 server = ImpalaDaemonApi('localhost')64 server._client.set_basic_auth.assert_called()65 server._client.set_digest_auth.assert_not_called()66 server._client.set_kerberos_auth.assert_not_called()67 with patch('impala.server.DAEMON_API_USERNAME.get') as DAEMON_API_USERNAME_get:68 with patch('impala.server.DAEMON_API_PASSWORD.get') as DAEMON_API_PASSWORD_get:69 with patch('impala.server.HttpClient') as HttpClient:70 with patch('impala.server.DAEMON_API_AUTH_SCHEME.get') as DAEMON_API_AUTH_SCHEME_get:71 DAEMON_API_USERNAME_get.return_value = 'impala'72 DAEMON_API_PASSWORD_get.return_value = None73 DAEMON_API_AUTH_SCHEME_get.return_value = 'basic'74 server = ImpalaDaemonApi('localhost')75 server._client.set_basic_auth.assert_not_called()76 server._client.set_digest_auth.assert_not_called()77 server._client.set_kerberos_auth.assert_not_called()78 def test_kerberos_auth(self):79 with patch('impala.server.DAEMON_API_USERNAME.get') as DAEMON_API_USERNAME_get:80 with patch('impala.server.DAEMON_API_PASSWORD.get') as DAEMON_API_PASSWORD_get:81 with patch('impala.server.HttpClient') as HttpClient:82 with patch('impala.server.is_webserver_spnego_enabled') as is_webserver_spnego_enabled:83 DAEMON_API_USERNAME_get.return_value = 'impala'84 DAEMON_API_PASSWORD_get.return_value = 'impala'85 is_webserver_spnego_enabled.return_value = True86 server = ImpalaDaemonApi('localhost')87 server._client.set_digest_auth.assert_called()88 server._client.set_kerberos_auth.assert_not_called()89 server._client.set_basic_auth.assert_not_called()90 with patch('impala.server.DAEMON_API_USERNAME.get') as DAEMON_API_USERNAME_get:91 with patch('impala.server.DAEMON_API_PASSWORD.get') as DAEMON_API_PASSWORD_get:92 with patch('impala.server.HttpClient') as HttpClient:93 with patch('impala.server.is_webserver_spnego_enabled') as is_webserver_spnego_enabled:94 DAEMON_API_USERNAME_get.return_value = None95 DAEMON_API_PASSWORD_get.return_value = 'impala'96 is_webserver_spnego_enabled.return_value = False97 server = ImpalaDaemonApi('localhost')98 server._client.set_digest_auth.assert_not_called()99 server._client.set_kerberos_auth.assert_not_called()100 server._client.set_basic_auth.assert_not_called()101 with patch('impala.server.DAEMON_API_USERNAME.get') as DAEMON_API_USERNAME_get:102 with patch('impala.server.DAEMON_API_PASSWORD.get') as DAEMON_API_PASSWORD_get:103 with patch('impala.server.HttpClient') as HttpClient:104 with patch('impala.server.is_kerberos_enabled') as is_kerberos_enabled:105 DAEMON_API_USERNAME_get.return_value = None106 DAEMON_API_PASSWORD_get.return_value = None107 is_kerberos_enabled.return_value = True108 server = ImpalaDaemonApi('localhost')109 server._client.set_digest_auth.assert_not_called()110 server._client.set_kerberos_auth.assert_called()111 server._client.set_basic_auth.assert_not_called()112 with patch('impala.server.DAEMON_API_USERNAME.get') as DAEMON_API_USERNAME_get:113 with patch('impala.server.DAEMON_API_PASSWORD.get') as DAEMON_API_PASSWORD_get:114 with patch('impala.server.HttpClient') as HttpClient:115 with patch('impala.server.is_webserver_spnego_enabled') as is_webserver_spnego_enabled:116 DAEMON_API_USERNAME_get.return_value = None117 DAEMON_API_PASSWORD_get.return_value = None118 is_webserver_spnego_enabled.return_value = True119 server = ImpalaDaemonApi('localhost')120 server._client.set_digest_auth.assert_not_called()121 server._client.set_kerberos_auth.assert_called()122 server._client.set_basic_auth.assert_not_called()123 with patch('impala.server.DAEMON_API_USERNAME.get') as DAEMON_API_USERNAME_get:124 with patch('impala.server.DAEMON_API_PASSWORD.get') as DAEMON_API_PASSWORD_get:125 with patch('impala.server.HttpClient') as HttpClient:126 with patch('impala.server.is_kerberos_enabled') as is_kerberos_enabled:127 with patch('impala.server.is_webserver_spnego_enabled') as is_webserver_spnego_enabled:128 DAEMON_API_USERNAME_get.return_value = None129 DAEMON_API_PASSWORD_get.return_value = None130 is_kerberos_enabled.return_value = False131 is_webserver_spnego_enabled.return_value = False132 server = ImpalaDaemonApi('localhost')133 server._client.set_digest_auth.assert_not_called()134 server._client.set_kerberos_auth.assert_not_called()135 server._client.set_basic_auth.assert_not_called()136 with patch('impala.server.DAEMON_API_USERNAME.get') as DAEMON_API_USERNAME_get:137 with patch('impala.server.DAEMON_API_PASSWORD.get') as DAEMON_API_PASSWORD_get:138 with patch('impala.server.HttpClient') as HttpClient:139 with patch('impala.server.is_kerberos_enabled') as is_kerberos_enabled:140 with patch('impala.server.is_webserver_spnego_enabled') as is_webserver_spnego_enabled:141 DAEMON_API_USERNAME_get.return_value = None142 DAEMON_API_PASSWORD_get.return_value = None143 is_kerberos_enabled.return_value = True144 is_webserver_spnego_enabled.return_value = True145 server = ImpalaDaemonApi('localhost')146 server._client.set_digest_auth.assert_not_called()147 server._client.set_kerberos_auth.assert_called()...
Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!