Best Python code snippet using toolium_python
ds_databreach_connector.py
Source:ds_databreach_connector.py
...23 self._connector.add_action_result(action_result)24 try:25 breach_service = DataBreachService(self._ds_api_key, self._ds_api_secret_key)26 except Exception as e:27 error_message = self._handle_exception_object.get_error_message_from_exception(e)28 return action_result.set_status(phantom.APP_ERROR, "{0} {1}".format(SERVICE_ERR_MSG, error_message))29 breach_id = param['breach_id']30 # validate 'breach_id' action parameter31 ret_val, breach_id = self._handle_exception_object.validate_integer(action_result, breach_id, BREACH_ID_KEY)32 if phantom.is_fail(ret_val):33 return action_result.get_status()34 try:35 breach = breach_service.find_data_breach_by_id(breach_id)36 except Exception as e:37 error_message = self._handle_exception_object.get_error_message_from_exception(e)38 return action_result.set_status(phantom.APP_ERROR, "Error Connecting to server. {0}".format(error_message))39 if 'id' in breach:40 summary = {41 'data_breach_count': 1,42 'data_breach_found': True43 }44 action_result.update_summary(summary)45 action_result.add_data(breach)46 action_result.set_status(phantom.APP_SUCCESS, DS_GET_BREACH_SUCCESS)47 else:48 summary = {49 'data_breach_count': 0,50 'data_breach_found': False51 }52 action_result.update_summary(summary)53 action_result.set_status(phantom.APP_SUCCESS, DS_GET_BREACH_NOT_FOUND)54 return action_result.get_status()55 def get_data_breach(self, param):56 action_result = ActionResult(dict(param))57 self._connector.add_action_result(action_result)58 date_range = param['date_range']59 param_reposted_credentials = None if 'reposted_credentials' not in param else param['reposted_credentials'].split(',')60 param_severities = None if 'severities' not in param else param.get('severities').split(',')61 param_statuses = None if 'statuses' not in param else param.get('statuses').split(',')62 param_user_name = None if 'user_name' not in param else param.get('user_name').split(',')63 try:64 breach_service = DataBreachService(self._ds_api_key, 
self._ds_api_secret_key)65 breach_view = DataBreachService.data_breach_view(published=date_range, reposted_credentials=param_reposted_credentials,66 severities=param_severities, statuses=param_statuses, username=param_user_name)67 except Exception as e:68 error_message = self._handle_exception_object.get_error_message_from_exception(e)69 return action_result.set_status(phantom.APP_ERROR, "{0} {1}".format(SERVICE_ERR_MSG, error_message))70 try:71 breach_pages = breach_service.find_all_pages(view=breach_view)72 breach_total = len(breach_pages)73 except StopIteration:74 error_message = 'No DataBreach objects retrieved from the Digital Shadows API in page groups'75 return action_result.set_status(phantom.APP_ERROR, "Error Details: {0}".format(error_message))76 except Exception as e:77 error_message = self._handle_exception_object.get_error_message_from_exception(e)78 return action_result.set_status(phantom.APP_ERROR, "Error Connecting to server. {}".format(error_message))79 if breach_total > 0:80 summary = {81 'data_breach_count': breach_total,82 'data_breach_found': True83 }84 action_result.update_summary(summary)85 for breach_page in breach_pages:86 for breach in breach_page:87 action_result.add_data(breach.payload)88 action_result.set_status(phantom.APP_SUCCESS, DS_GET_BREACH_SUCCESS)89 else:90 summary = {91 'data_breach_count': 0,92 'data_breach_found': False93 }94 action_result.update_summary(summary)95 action_result.set_status(phantom.APP_SUCCESS, DS_GET_BREACH_NOT_FOUND)96 return action_result.get_status()97 def get_data_breach_record(self, param):98 action_result = ActionResult(dict(param))99 self._connector.add_action_result(action_result)100 date_range = param['date_range']101 if 'domain_names' in param:102 param_domain_names = param['domain_names'].split(',')103 else:104 param_domain_names = None105 if 'review_statuses' in param:106 param_review_statuses = param['review_statuses'].split(',')107 else:108 param_review_statuses = None109 param_distinction = 
param.get('distinction')110 param_user_name = param.get('user_name')111 param_password = param.get('password')112 try:113 breach_record_service = DataBreachRecordService(self._ds_api_key, self._ds_api_secret_key)114 breach_record_view = DataBreachRecordService.data_breach_records_view(published=date_range, domain_names=param_domain_names, username=param_user_name,115 password=param_password, review_statuses=param_review_statuses, distinction=param_distinction)116 except Exception as e:117 error_message = self._handle_exception_object.get_error_message_from_exception(e)118 return action_result.set_status(phantom.APP_ERROR, "{0} {1}".format(SERVICE_ERR_MSG, error_message))119 self._connector.save_progress(str(breach_record_view))120 try:121 breach_record_pages = breach_record_service.read_all_records(view=breach_record_view)122 breach_record_total = len(breach_record_pages)123 except StopIteration:124 error_message = 'No DataBreach objects retrieved from the Digital Shadows API in page groups'125 return action_result.set_status(phantom.APP_ERROR, "Error Details: {0}".format(error_message))126 except Exception as e:127 error_message = self._handle_exception_object.get_error_message_from_exception(e)128 return action_result.set_status(phantom.APP_ERROR, "Error Connecting to server. 
{}".format(error_message))129 if breach_record_total > 0:130 summary = {131 'data_breach_record_count': breach_record_total,132 'data_breach_record_found': True133 }134 action_result.update_summary(summary)135 for breach_record_page in breach_record_pages:136 for breach_record in breach_record_page:137 action_result.add_data(breach_record.payload)138 action_result.set_status(phantom.APP_SUCCESS, "Digital Shadows data breach records fetched")139 else:140 summary = {141 'data_breach_record_count': 0,142 'data_breach_record_found': False143 }144 action_result.update_summary(summary)145 action_result.set_status(phantom.APP_SUCCESS, "Data breach record not found in Digital Shadows")146 return action_result.get_status()147 def get_data_breach_record_by_id(self, param):148 action_result = ActionResult(dict(param))149 self._connector.add_action_result(action_result)150 try:151 breach_record_service = DataBreachRecordService(self._ds_api_key, self._ds_api_secret_key)152 except Exception as e:153 error_message = self._handle_exception_object.get_error_message_from_exception(e)154 return action_result.set_status(phantom.APP_ERROR, "{0} {1}".format(SERVICE_ERR_MSG, error_message))155 breach_id = param['breach_id']156 # validate 'breach_id' action parameter157 ret_val, breach_id = self._handle_exception_object.validate_integer(action_result, breach_id, BREACH_ID_KEY)158 if phantom.is_fail(ret_val):159 return action_result.get_status()160 try:161 breach_record_pages = breach_record_service.find_all_pages(breach_id)162 breach_record_total = len(breach_record_pages)163 except StopIteration:164 error_message = 'No data breach record retrieved from the Digital Shadows API in page groups'165 return action_result.set_status(phantom.APP_ERROR, "Error Details: {0}".format(error_message))166 except Exception as e:167 error_message = self._handle_exception_object.get_error_message_from_exception(e)168 return action_result.set_status(phantom.APP_ERROR, "Error Connecting to server. 
{}".format(error_message))169 if breach_record_total > 0:170 summary = {171 'data_breach_record_count': breach_record_total,172 'data_breach_record_found': True173 }174 action_result.update_summary(summary)175 for breach_record_page in breach_record_pages:176 for breach_record in breach_record_page:177 action_result.add_data(breach_record.payload)178 action_result.set_status(phantom.APP_SUCCESS, "Digital Shadows data breach records fetched")179 else:180 summary = {181 'data_breach_record_count': 0,182 'data_breach_record_found': False183 }184 action_result.update_summary(summary)185 action_result.set_status(phantom.APP_SUCCESS, "Data breach record not found in Digital Shadows")186 return action_result.get_status()187 def get_data_breach_record_by_username(self, param):188 action_result = ActionResult(dict(param))189 self._connector.add_action_result(action_result)190 try:191 breach_record_service = DataBreachRecordService(self._ds_api_key, self._ds_api_secret_key)192 except Exception as e:193 error_message = self._handle_exception_object.get_error_message_from_exception(e)194 return action_result.set_status(phantom.APP_ERROR, "{0} {1}".format(SERVICE_ERR_MSG, error_message))195 user_name = param['user_name']196 domain_names_param = None if 'domain_names' not in param else param['domain_names'].split(',')197 review_statuses_param = None if 'review_statuses' not in param else param['review_statuses'].split(',')198 published_date_range = param.get('published_date_range', 'ALL')199 try:200 breach_record_view = DataBreachRecordService.data_breach_records_view(username=user_name, published=published_date_range,201 domain_names=domain_names_param, review_statuses=review_statuses_param)202 self._connector.save_progress("Breach record View: {}".format(breach_record_view))203 breach_record_pages = breach_record_service.read_all_records(view=breach_record_view)204 except StopIteration:205 error_message = 'No DataBreach objects retrieved from the Digital Shadows API in page 
groups'206 return action_result.set_status(phantom.APP_ERROR, "Error Details: {0}".format(error_message))207 except Exception as e:208 error_message = self._handle_exception_object.get_error_message_from_exception(e)209 return action_result.set_status(phantom.APP_ERROR, "Error Connecting to server. {}".format(error_message))210 breach_record_total = len(breach_record_pages)211 if breach_record_total > 0:212 summary = {213 'data_breach_record_count': breach_record_total,214 'data_breach_record_found': True215 }216 action_result.update_summary(summary)217 for breach_record_page in breach_record_pages:218 for breach_record in breach_record_page:219 action_result.add_data(breach_record.payload)220 action_result.set_status(phantom.APP_SUCCESS, "Digital Shadows data breach records fetched")221 else:222 summary = {223 'data_breach_record_count': 0,224 'data_breach_record_found': False225 }226 action_result.update_summary(summary)227 action_result.set_status(phantom.APP_SUCCESS, "Data breach record not found in Digital Shadows")228 return action_result.get_status()229 def get_data_breach_record_reviews(self, param):230 action_result = ActionResult(dict(param))231 self._connector.add_action_result(action_result)232 try:233 breach_record_service = DataBreachRecordService(self._ds_api_key, self._ds_api_secret_key)234 except Exception as e:235 error_message = self._handle_exception_object.get_error_message_from_exception(e)236 return action_result.set_status(phantom.APP_ERROR, "{0} {1}".format(SERVICE_ERR_MSG, error_message))237 breach_record_id = param['breach_record_id']238 # validate 'breach_record_id' action parameter239 ret_val, breach_record_id = self._handle_exception_object.validate_integer(action_result, breach_record_id, BREACH_RECORD_ID_KEY)240 if phantom.is_fail(ret_val):241 return action_result.get_status()242 try:243 breach_record_reviews = breach_record_service.find_data_breach_record_reviews(breach_record_id)244 breach_record_reviews_total = 
len(breach_record_reviews)245 except Exception as e:246 error_message = self._handle_exception_object.get_error_message_from_exception(e)247 return action_result.set_status(phantom.APP_ERROR, "Error Connecting to server. {0}".format(error_message))248 if breach_record_reviews_total > 0:249 summary = {250 'breach_record_reviews_count': breach_record_reviews_total,251 'breach_record_reviews_found': True252 }253 action_result.update_summary(summary)254 for breach_record_review in breach_record_reviews:255 action_result.add_data(breach_record_review)256 action_result.set_status(phantom.APP_SUCCESS, "Digital Shadows breach record reviews fetched for the Breach Record ID: {}".format(breach_record_id))257 return action_result.get_status()258 def post_breach_record_review(self, param):259 action_result = ActionResult(dict(param))260 self._connector.add_action_result(action_result)261 try:262 breach_record_service = DataBreachRecordService(self._ds_api_key, self._ds_api_secret_key)263 except Exception as e:264 error_message = self._handle_exception_object.get_error_message_from_exception(e)265 return action_result.set_status(phantom.APP_ERROR, "{0} {1}".format(SERVICE_ERR_MSG, error_message))266 post_data = {267 'note': param.get('review_note'),268 'status': param.get('review_status')269 }270 breach_record_id = param.get('breach_record_id')271 # validate 'breach_record_id' action parameter272 ret_val, breach_record_id = self._handle_exception_object.validate_integer(action_result, breach_record_id, BREACH_RECORD_ID_KEY)273 if phantom.is_fail(ret_val):274 return action_result.get_status()275 try:276 data = breach_record_service.find_data_breach_record_reviews(breach_record_id)277 post_data['version'] = max(v['version'] for v in data)278 response = breach_record_service.post_data_breach_record_review(post_data, breach_record_id=breach_record_id)279 except Exception as e:280 error_message = self._handle_exception_object.get_error_message_from_exception(e)281 return 
action_result.set_status(phantom.APP_ERROR, "Error Connecting to server. {0}".format(error_message))282 summary = {283 'breach_record_reviews_status_code': response['status'],284 'breach_record_reviews_message': response['message']285 }286 action_result.update_summary(summary)287 if response['message'] == "SUCCESS":288 action_result.set_status(phantom.APP_SUCCESS, "Digital Shadows breach record review posted successfully")289 else:...
ds_incidents_connector.py
Source:ds_incidents_connector.py
...24 self._connector.add_action_result(action_result)25 try:26 incident_service = IncidentService(self._ds_api_key, self._ds_api_secret_key)27 except Exception as e:28 error_message = self._handle_exception_object.get_error_message_from_exception(e)29 return action_result.set_status(phantom.APP_ERROR, "{0} {1}".format(SERVICE_ERR_MSG, error_message))30 incident_id = param['incident_id']31 # validate 'incident_id' action parameter32 ret_val, incident_id = self._handle_exception_object.validate_integer(action_result, incident_id, INCIDENT_ID_KEY)33 if phantom.is_fail(ret_val):34 return action_result.get_status()35 try:36 incident = incident_service.find_incident_by_id(incident_id)37 except Exception as e:38 error_message = self._handle_exception_object.get_error_message_from_exception(e)39 return action_result.set_status(phantom.APP_ERROR, "Error Connecting to server. {0}".format(error_message))40 if 'id' in incident:41 summary = {42 'incident_found': True43 }44 action_result.update_summary(summary)45 action_result.add_data(incident)46 action_result.set_status(phantom.APP_SUCCESS, DS_GET_INCIDENT_SUCCESS)47 return action_result.get_status()48 def get_incident_review_by_id(self, param):49 action_result = ActionResult(dict(param))50 self._connector.add_action_result(action_result)51 incident_id = param['incident_id']52 # validate 'incident_id' action parameter53 ret_val, incident_id = self._handle_exception_object.validate_integer(action_result, incident_id, INCIDENT_ID_KEY)54 if phantom.is_fail(ret_val):55 return action_result.get_status()56 try:57 incident_service = IncidentService(self._ds_api_key, self._ds_api_secret_key)58 except Exception as e:59 error_message = self._handle_exception_object.get_error_message_from_exception(e)60 return action_result.set_status(phantom.APP_ERROR, "{0} {1}".format(SERVICE_ERR_MSG, error_message))61 try:62 incident_reviews = incident_service.find_all_reviews(incident_id)63 incident_reviews_total = len(incident_reviews)64 except 
Exception as e:65 error_message = self._handle_exception_object.get_error_message_from_exception(e)66 return action_result.set_status(phantom.APP_ERROR, "Error Connecting to server. {0}".format(error_message))67 if incident_reviews_total > 0:68 summary = {69 'incident_reviews_count': incident_reviews_total,70 'incident_reviews_found': True71 }72 action_result.update_summary(summary)73 for incident_review in incident_reviews:74 action_result.add_data(incident_review)75 action_result.set_status(phantom.APP_SUCCESS, "Digital Shadows incident reviews fetched for the Incident ID: {}".format(incident_id))76 return action_result.get_status()77 def get_incident_list(self, param):78 action_result = ActionResult(dict(param))79 self._connector.add_action_result(action_result)80 # interval_startdate = date.today() - timedelta(int(param['date_range']))81 incident_types = []82 if param.get('incident_types') is not None:83 param_incident_types = str(param['incident_types']).split(',')84 for inc_type in param_incident_types:85 if inc_type == "DATA_LEAKAGE":86 incident_types.append({'type': 'DATA_LEAKAGE', 'subTypes': DS_DL_SUBTYPE })87 if inc_type == "BRAND_PROTECTION":88 incident_types.append({'type': 'BRAND_PROTECTION', 'subTypes': DS_BP_SUBTYPE })89 if inc_type == "INFRASTRUCTURE":90 incident_types.append({'type': 'INFRASTRUCTURE', 'subTypes': DS_INFR_SUBTYPE })91 if inc_type == "PHYSICAL_SECURITY":92 incident_types.append({'type': 'PHYSICAL_SECURITY', 'subTypes': DS_PS_SUBTYPE })93 if inc_type == "SOCIAL_MEDIA_COMPLIANCE":94 incident_types.append({'type': 'SOCIAL_MEDIA_COMPLIANCE', 'subTypes': DS_SMC_SUBTYPE })95 if inc_type == "CYBER_THREAT":96 incident_types.append({'type': 'CYBER_THREAT'})97 else:98 param_incident_types = None99 try:100 incident_service = IncidentService(self._ds_api_key, self._ds_api_secret_key)101 incident_view = IncidentService.incidents_view(102 date_range=param.get('date_range'),103 date_range_field='published', types=incident_types)104 except 
Exception as e:105 error_message = self._handle_exception_object.get_error_message_from_exception(e)106 return action_result.set_status(phantom.APP_ERROR, "{0} {1}".format(SERVICE_ERR_MSG, error_message))107 self._connector.save_progress("incident view: {}".format(incident_view))108 try:109 incident_pages = incident_service.find_all_pages(view=incident_view)110 self._connector.save_progress("incident_pages next: {}".format(incident_pages))111 incident_total = len(incident_pages)112 except StopIteration:113 error_message = 'No Incident objects retrieved from the Digital Shadows API in page groups'114 return action_result.set_status(phantom.APP_ERROR, "Error Details: {0}".format(error_message))115 except Exception as e:116 error_message = self._handle_exception_object.get_error_message_from_exception(e)117 return action_result.set_status(phantom.APP_ERROR, "Error Connecting to server. {0}".format(error_message))118 if incident_total > 0:119 summary = {120 'incident_count': incident_total,121 'incident_found': True122 }123 action_result.update_summary(summary)124 self._connector.save_progress("incident_pages: {}".format(incident_pages))125 for incident_page in incident_pages:126 for incident in incident_page:127 self._connector.save_progress('incident: {}'.format(incident.payload))128 action_result.add_data(incident.payload)129 action_result.set_status(phantom.APP_SUCCESS, DS_GET_INCIDENT_SUCCESS)130 return action_result.get_status()131 def post_incident_review(self, param):132 action_result = ActionResult(dict(param))133 self._connector.add_action_result(action_result)134 try:135 incident_service = IncidentService(self._ds_api_key, self._ds_api_secret_key)136 except Exception as e:137 error_message = self._handle_exception_object.get_error_message_from_exception(e)138 return action_result.set_status(phantom.APP_ERROR, "{0} {1}".format(SERVICE_ERR_MSG, error_message))139 incident_id = param.get('incident_id')140 # validate 'incident_id' action parameter141 ret_val, 
incident_id = self._handle_exception_object.validate_integer(action_result, incident_id, INCIDENT_ID_KEY)142 if phantom.is_fail(ret_val):143 return action_result.get_status()144 post_data = {145 'note': param.get('review_note'),146 'status': param.get('review_status')147 }148 self._connector.save_progress("post_data: {}".format(post_data))149 try:150 response = incident_service.post_incident_review(post_data, incident_id=incident_id)151 except Exception as e:152 error_message = self._handle_exception_object.get_error_message_from_exception(e)153 return action_result.set_status(phantom.APP_ERROR, "Error Connecting to server. {0}".format(error_message))154 self._connector.save_progress("response: {}".format(response))155 try:156 summary = {157 'incident_reviews_status_code': response['status'],158 'incident_reviews_message': response['message']159 }160 action_result.update_summary(summary)161 action_result.add_data(response['content'][0])162 if response['message'] == "SUCCESS":163 action_result.set_status(phantom.APP_SUCCESS, "Digital Shadows Incident review posted successfully")164 else:165 action_result.set_status(phantom.APP_SUCCESS, "Error in incident review post request")166 except Exception as e:167 error_message = self._handle_exception_object.get_error_message_from_exception(e)168 return action_result.set_status(phantom.APP_ERROR, "Error occurred while fetching data from response. {}".format(error_message))...
Learn to execute automation testing from scratch with the LambdaTest Learning Hub: from setting up the prerequisites and running your first automation test, to following best practices and diving deeper into advanced test scenarios. The LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g., Selenium, Cypress, and TestNG.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 automation test minutes FREE!!