Best Python code snippet using testcontainers-python_python
alert.py
Source:alert.py
1import concurrent.futures2from flask import request3from assemblyline.common.isotime import now_as_iso4from assemblyline.datastore.exceptions import SearchException5from assemblyline.odm.models.user import ROLES6from assemblyline.odm.models.workflow import PRIORITIES, STATUSES7from assemblyline_ui.api.base import api_login, make_api_response, make_subapi_blueprint8from assemblyline_ui.config import STORAGE, config, CLASSIFICATION as Classification9SUB_API = 'alert'10alert_api = make_subapi_blueprint(SUB_API, api_version=4)11alert_api._doc = "Perform operations on alerts"12def get_timming_filter(tc_start, tc):13 if tc:14 if tc_start:15 return f"reporting_ts:[{tc_start}{STORAGE.ds.DATE_FORMAT['SEPARATOR']}-{tc} TO {tc_start}]"16 else:17 return f"reporting_ts:[{STORAGE.ds.now}-{tc} TO {STORAGE.ds.now}]"18 elif tc_start:19 return f"reporting_ts:[* TO {tc_start}]"20 return None21def get_stats_for_fields(fields, query, tc_start, tc, access_control):22 if not tc_start and "no_delay" not in request.args and config.core.alerter.delay != 0:23 tc_start = now_as_iso(config.core.alerter.delay * -1)24 if tc and config.ui.read_only:25 tc += config.ui.read_only_offset26 timming_filter = get_timming_filter(tc_start, tc)27 filters = [x for x in request.args.getlist("fq") if x != ""]28 if timming_filter:29 filters.append(timming_filter)30 try:31 if isinstance(fields, list):32 with concurrent.futures.ThreadPoolExecutor(len(fields)) as executor:33 res = {field: executor.submit(STORAGE.alert.facet,34 field,35 query=query,36 filters=filters,37 limit=100,38 access_control=access_control)39 for field in fields}40 return make_api_response({k: v.result() for k, v in res.items()})41 else:42 return make_api_response(STORAGE.alert.facet(fields, query=query, filters=filters,43 limit=100, access_control=access_control))44 except SearchException as e:45 return make_api_response("", f"SearchException: {e}", 400)46@alert_api.route("/<alert_id>/", methods=["GET"])47@api_login(required_priv=['R'], require_role=[ROLES.alert_view])48def get_alert(alert_id, **kwargs):49 """50 Get the alert details for a given alert key51 Variables:52 alert_id => ID of the alert to get the details for53 Arguments:54 None55 Data Block:56 None57 API call example:58 /api/v4/alert/1234567890/59 Result example:60 {61 KEY: VALUE, # All fields of an alert in key/value pair62 }63 """64 user = kwargs['user']65 data = STORAGE.alert.get(alert_id, as_obj=False)66 if not data:67 return make_api_response("", "This alert does not exists...", 404)68 if user and Classification.is_accessible(user['classification'], data['classification']):69 return make_api_response(data)70 else:71 return make_api_response("", "You are not allowed to see this alert...", 403)72@alert_api.route("/statistics/", methods=["GET"])73@api_login(require_role=[ROLES.alert_view])74def alerts_statistics(**kwargs):75 """76 Load facet statistics for the alerts matching the query.77 Variables:78 None79 Arguments:80 fq => Post filter queries (you can have multiple of those)81 q => Query to apply to the alert list82 tc_start => Time offset at which we start the time constraint83 tc => Time constraint applied to the API84 no_delay => Do not delay alerts85 Data Block:86 None87 Result example:88 """89 user = kwargs['user']90 query = request.args.get('q', "alert_id:*") or "alert_id:*"91 tc_start = request.args.get('tc_start', None)92 tc = request.args.get('tc', None)93 alert_statistics_fields = config.ui.statistics.alert94 return get_stats_for_fields(alert_statistics_fields, query, tc_start, tc, user['access_control'])95@alert_api.route("/labels/", methods=["GET"])96@api_login(require_role=[ROLES.alert_view])97def alerts_labels(**kwargs):98 """99 Run a facet search to find the different labels matching the query.100 Variables:101 None102 Arguments:103 fq => Post filter queries (you can have multiple of those)104 q => Query to apply to the alert list105 tc_start => Time offset at which we start the time constraint106 tc => Time constraint applied to the API107 no_delay => Do not delay alerts108 Data Block:109 None110 Result example:111 """112 user = kwargs['user']113 query = request.args.get('q', "alert_id:*") or "alert_id:*"114 tc_start = request.args.get('tc_start', None)115 tc = request.args.get('tc', None)116 return get_stats_for_fields("label", query, tc_start, tc, user['access_control'])117@alert_api.route("/priorities/", methods=["GET"])118@api_login(require_role=[ROLES.alert_view])119def alerts_priorities(**kwargs):120 """121 Run a facet search to find the different priorities matching the query.122 Variables:123 None124 Arguments:125 fq => Post filter queries (you can have multiple of those)126 q => Query to apply to the alert list127 tc_start => Time offset at which we start the time constraint128 tc => Time constraint applied to the API129 no_delay => Do not delay alerts130 Data Block:131 None132 Result example:133 """134 user = kwargs['user']135 query = request.args.get('q', "alert_id:*") or "alert_id:*"136 tc_start = request.args.get('tc_start', None)137 tc = request.args.get('tc', None)138 return get_stats_for_fields("priority", query, tc_start, tc, user['access_control'])139@alert_api.route("/statuses/", methods=["GET"])140@api_login(require_role=[ROLES.alert_view])141def alerts_statuses(**kwargs):142 """143 Run a facet search to find the different statuses matching the query.144 Variables:145 None146 Arguments:147 fq => Post filter queries (you can have multiple of those)148 q => Query to apply to the alert list149 tc_start => Time offset at which we start the time constraint150 tc => Time constraint applied to the API151 no_delay => Do not delay alerts152 Data Block:153 None154 Result example:155 """156 user = kwargs['user']157 query = request.args.get('q', "alert_id:*") or "alert_id:*"158 tc_start = request.args.get('tc_start', None)159 tc = request.args.get('tc', None)160 return get_stats_for_fields("status", query, tc_start, tc, user['access_control'])161@alert_api.route("/list/", methods=["GET"])162@api_login(required_priv=['R'], require_role=[ROLES.alert_view])163def list_alerts(**kwargs):164 """165 List all alert in the system (per page)166 Variables:167 None168 Arguments:169 fq => Post filter queries (you can have multiple of those)170 q => Query to apply to the alert list171 no_delay => Do not delay alerts172 offset => Offset at which we start giving alerts173 rows => Numbers of alerts to return174 tc_start => Time offset at which we start the time constraint175 tc => Time constraint applied to the API176 use_archive => List alerts from archive as well (Default: False)177 track_total_hits => Track the total number of item that match the query (Default: 10 000)178 Data Block:179 None180 API call example:181 /api/v4/alert/list/182 Result example:183 {"total": 201, # Total alerts found184 "offset": 0, # Offset in the alert list185 "count": 100, # Number of alerts returned186 "items": [] # List of alert blocks187 }188 """189 user = kwargs['user']190 use_archive = request.args.get('use_archive', 'false').lower() in ['true', '']191 offset = int(request.args.get('offset', 0))192 rows = int(request.args.get('rows', 100))193 query = request.args.get('q', "alert_id:*") or "alert_id:*"194 tc_start = request.args.get('tc_start', None)195 if not tc_start and "no_delay" not in request.args and config.core.alerter.delay != 0:196 tc_start = now_as_iso(config.core.alerter.delay * -1)197 tc = request.args.get('tc', None)198 if tc and config.ui.read_only:199 tc += config.ui.read_only_offset200 timming_filter = get_timming_filter(tc_start, tc)201 track_total_hits = request.args.get('track_total_hits', False)202 filters = [x for x in request.args.getlist("fq") if x != ""]203 if timming_filter:204 filters.append(timming_filter)205 try:206 return make_api_response(STORAGE.alert.search(207 query, offset=offset, rows=rows, fl="*", sort="reporting_ts desc",208 access_control=user['access_control'],209 filters=filters, as_obj=False, use_archive=use_archive, track_total_hits=track_total_hits))210 except SearchException as e:211 return make_api_response("", f"SearchException: {e}", 400)212@alert_api.route("/grouped/<field>/", methods=["GET"])213@api_login(required_priv=['R'], require_role=[ROLES.alert_view])214def list_grouped_alerts(field, **kwargs):215 """216 List all alert grouped by a given field217 Variables:218 None219 Arguments:220 fq => Post filter queries (you can have multiple of those)221 q => Query to apply to the alert list222 no_delay => Do not delay alerts223 offset => Offset at which we start giving alerts224 rows => Numbers of alerts to return225 tc_start => Time offset at which we start the time constraint226 tc => Time constraint applied to the API227 use_archive => List alerts from archive as well (Default: False)228 track_total_hits => Track the total number of item that match the query (Default: 10 000)229 Data Block:230 None231 API call example:232 /api/v4/alert/grouped/md5/233 Result example:234 {"total": 201, # Total alerts found235 "counted_total": 123, # Number of alert returned in the current call236 "offset": 0, # Offset in the alert list237 "rows": 100, # Number of alerts returned238 "items": [], # List of alert blocks239 "tc_start": "2015-05..." # UTC timestamp for future query (ISO Format)240 }241 """242 def get_dict_item(parent, cur_item):243 if "." in cur_item:244 key, remainder = cur_item.split(".", 1)245 return get_dict_item(parent.get(key, {}), remainder)246 else:247 return parent[cur_item]248 user = kwargs['user']249 offset = int(request.args.get('offset', 0))250 rows = int(request.args.get('rows', 100))251 query = request.args.get('q', "alert_id:*") or "alert_id:*"252 tc_start = request.args.get('tc_start', None)253 track_total_hits = request.args.get('track_total_hits', False)254 use_archive = request.args.get('use_archive', 'false').lower() in ['true', '']255 if not tc_start:256 if "no_delay" not in request.args and config.core.alerter.delay != 0:257 tc_start = now_as_iso(config.core.alerter.delay * -1)258 else:259 tc_start = now_as_iso()260 tc = request.args.get('tc', None)261 if tc and config.ui.read_only:262 tc += config.ui.read_only_offset263 timming_filter = get_timming_filter(tc_start, tc)264 filters = [x for x in request.args.getlist("fq") if x != ""]265 if timming_filter:266 filters.append(timming_filter)267 filters.append(f"{field}:*")268 try:269 res = STORAGE.alert.grouped_search(field, query=query, offset=offset, rows=rows, sort="reporting_ts desc",270 group_sort="reporting_ts desc", access_control=user['access_control'],271 filters=filters, fl="*", as_obj=False,272 use_archive=use_archive, track_total_hits=track_total_hits)273 alerts = []274 hash_list = []275 hint_list = set()276 counted_total = 0277 # Loop through grouped alerts278 for item in res['items']:279 # Update total280 counted_total += item['total']281 # Gather the first alert sample, set it's group_count and add it to the alert list282 data = item['items'][0]283 data['group_count'] = item['total']284 alerts.append(data)285 # Gather a list of possible owner hints286 if field in ['file.md5', 'file.sha1', 'file.sha256'] and not data.get('owner', None):287 hash_list.append(get_dict_item(data, field))288 if hash_list:289 # Lookup owner of previous hashes290 hint_resp = STORAGE.alert.grouped_search(field, query=" OR ".join([f"{field}:{h}" for h in hash_list]),291 fl=field, rows=rows, filters=["owner:*"],292 access_control=user['access_control'], as_obj=False)293 for hint_item in hint_resp['items']:294 hint_list.add(hint_item['value'])295 for a in alerts:296 if get_dict_item(a, field) in hint_list:297 a['hint_owner'] = True298 else:299 a['hint_owner'] = False300 res['items'] = alerts301 res['tc_start'] = tc_start302 res['counted_total'] = counted_total303 return make_api_response(res)304 except SearchException as e:305 return make_api_response("", f"SearchException: {e}", 400)306@alert_api.route("/label/<alert_id>/", methods=["POST"])307@api_login(required_priv=['W'], allow_readonly=False, require_role=[ROLES.alert_manage])308def add_labels(alert_id, **kwargs):309 """310 Add one or multiple labels to a given alert311 Variables:312 alert_id => ID of the alert to add the label to313 Arguments:314 None315 Data Block:316 ["LBL1", "LBL2"] => List of labels to add as comma separated string317 API call example:318 /api/v4/alert/label/12345...67890/319 Result example:320 {"success": true}321 """322 user = kwargs['user']323 try:324 labels = set(request.json)325 except ValueError:326 return make_api_response({"success": False}, err="Invalid list of labels received.", status_code=400)327 alert = STORAGE.alert.get(alert_id, as_obj=False)328 if not alert:329 return make_api_response({"success": False}, err="Alert ID %s not found" % alert_id, status_code=404)330 if not Classification.is_accessible(user['classification'], alert['classification']):331 return make_api_response("", "You are not allowed to see this alert...", 403)332 cur_label = set(alert.get('label', []))333 label_diff = labels.difference(labels.intersection(cur_label))334 if label_diff:335 return make_api_response({336 "success": STORAGE.alert.update(alert_id, [(STORAGE.alert.UPDATE_APPEND_IF_MISSING, 'label', lbl)337 for lbl in label_diff])})338 else:339 return make_api_response({"success": True})340@alert_api.route("/label/batch/", methods=["POST"])341@api_login(required_priv=['W'], allow_readonly=False, require_role=[ROLES.alert_manage])342def add_labels_by_batch(**kwargs):343 """344 Apply labels to all alerts matching the given filters345 Variables:346 None347 Arguments:348 q => Main query to filter the data [REQUIRED]349 tc_start => Time offset at which we start the time constraint350 tc => Time constraint applied to the API351 fq => Filter query applied to the data352 Data Block:353 ["LBL1", "LBL2"] => List of labels to add as comma separated string354 API call example:355 /api/v4/alert/label/batch/?q=protocol:SMTP356 Result example:357 { "success": true }358 """359 user = kwargs['user']360 try:361 labels = set(request.json)362 except ValueError:363 return make_api_response({"success": False}, err="Invalid list of labels received.", status_code=400)364 query = request.args.get('q', "alert_id:*") or "alert_id:*"365 tc_start = request.args.get('tc_start', None)366 tc = request.args.get('tc', None)367 if tc and config.ui.read_only:368 tc += config.ui.read_only_offset369 timming_filter = get_timming_filter(tc_start, tc)370 filters = [x for x in request.args.getlist("fq") if x != ""]371 if timming_filter:372 filters.append(timming_filter)373 return make_api_response({374 "success": STORAGE.alert.update_by_query(query, [(STORAGE.alert.UPDATE_APPEND_IF_MISSING, 'label', lbl)375 for lbl in labels],376 filters, access_control=user['access_control'])})377@alert_api.route("/priority/<alert_id>/", methods=["POST"])378@api_login(required_priv=['W'], allow_readonly=False, require_role=[ROLES.alert_manage])379def change_priority(alert_id, **kwargs):380 """381 Change the priority of a given alert382 Variables:383 alert_id => ID of the alert to change the priority384 Arguments:385 "HIGH" => New priority for the alert386 Data Block:387 None388 API call example:389 /api/v4/alert/priority/12345...67890/390 Result example:391 {"success": true}392 """393 user = kwargs['user']394 try:395 priority = request.json396 priority = priority.upper()397 if priority not in PRIORITIES:398 raise ValueError("Not in priorities")399 except ValueError:400 return make_api_response({"success": False}, err="Invalid priority received.", status_code=400)401 alert = STORAGE.alert.get(alert_id, as_obj=False)402 if not alert:403 return make_api_response({"success": False},404 err="Alert ID %s not found" % alert_id,405 status_code=404)406 if not Classification.is_accessible(user['classification'], alert['classification']):407 return make_api_response("", "You are not allowed to see this alert...", 403)408 if priority != alert.get('priority', None):409 return make_api_response({410 "success": STORAGE.alert.update(alert_id, [(STORAGE.alert.UPDATE_SET, 'priority', priority)])})411 else:412 return make_api_response({"success": True})413@alert_api.route("/priority/batch/", methods=["POST"])414@api_login(required_priv=['W'], allow_readonly=False, require_role=[ROLES.alert_manage])415def change_priority_by_batch(**kwargs):416 """417 Apply priority to all alerts matching the given filters418 Variables:419 None420 Arguments:421 q => Main query to filter the data [REQUIRED]422 tc_start => Time offset at which we start the time constraint423 tc => Time constraint applied to the API424 fq => Filter query applied to the data425 Data Block:426 "HIGH" => New priority for the alert427 API call example:428 /api/v4/alert/priority/batch/?q=al_av:*429 Result example:430 {"success": true}431 """432 user = kwargs['user']433 try:434 priority = request.json435 priority = priority.upper()436 if priority not in PRIORITIES:437 raise ValueError("Not in priorities")438 except ValueError:439 return make_api_response({"success": False}, err="Invalid priority received.", status_code=400)440 query = request.args.get('q', "alert_id:*") or "alert_id:*"441 tc_start = request.args.get('tc_start', None)442 tc = request.args.get('tc', None)443 if tc and config.ui.read_only:444 tc += config.ui.read_only_offset445 timming_filter = get_timming_filter(tc_start, tc)446 filters = [x for x in request.args.getlist("fq") if x != ""]447 if timming_filter:448 filters.append(timming_filter)449 return make_api_response({450 "success": STORAGE.alert.update_by_query(query, [(STORAGE.alert.UPDATE_SET, 'priority', priority)],451 filters, access_control=user['access_control'])})452@alert_api.route("/status/<alert_id>/", methods=["POST"])453@api_login(required_priv=['W'], allow_readonly=False, require_role=[ROLES.alert_manage])454def change_status(alert_id, **kwargs):455 """456 Change the status of a given alert457 Variables:458 alert_id => ID of the alert to change the status459 Arguments:460 None461 Data Block:462 "MALICIOUS" => New status for the alert463 API call example:464 /api/v4/alert/status/12345...67890/465 Result example:466 {"success": true}467 """468 user = kwargs['user']469 try:470 status = request.json471 status = status.upper()472 if status not in STATUSES:473 raise ValueError("Not in priorities")474 except ValueError:475 return make_api_response({"success": False}, err="Invalid status received.", status_code=400)476 alert = STORAGE.alert.get(alert_id, as_obj=False)477 if not alert:478 return make_api_response({"success": False},479 err="Alert ID %s not found" % alert_id,480 status_code=404)481 if not Classification.is_accessible(user['classification'], alert['classification']):482 return make_api_response("", "You are not allowed to see this alert...", 403)483 if status != alert.get('status', None):484 return make_api_response({485 "success": STORAGE.alert.update(alert_id, [(STORAGE.alert.UPDATE_SET, 'status', status)])})486 else:487 return make_api_response({"success": True})488@alert_api.route("/status/batch/", methods=["POST"])489@api_login(required_priv=['W'], allow_readonly=False, require_role=[ROLES.alert_manage])490def change_status_by_batch(**kwargs):491 """492 Apply status to all alerts matching the given filters493 Variables:494 status => Status to apply495 Arguments:496 q => Main query to filter the data [REQUIRED]497 tc_start => Time offset at which we start the time constraint498 tc => Time constraint applied to the API499 fq => Filter query applied to the data500 Data Block:501 "MALICIOUS" => New status for the alert502 API call example:503 /api/v4/alert/status/batch/MALICIOUS/?q=al_av:*504 Result example:505 {"success": true}506 """507 user = kwargs['user']508 try:509 status = request.json510 status = status.upper()511 if status not in STATUSES:512 raise ValueError("Not in priorities")513 except ValueError:514 return make_api_response({"success": False}, err="Invalid status received.", status_code=400)515 query = request.args.get('q', "alert_id:*") or "alert_id:*"516 tc_start = request.args.get('tc_start', None)517 tc = request.args.get('tc', None)518 if tc and config.ui.read_only:519 tc += config.ui.read_only_offset520 timming_filter = get_timming_filter(tc_start, tc)521 filters = [x for x in request.args.getlist("fq") if x != ""]522 if timming_filter:523 filters.append(timming_filter)524 return make_api_response({525 "success": STORAGE.alert.update_by_query(query, [(STORAGE.alert.UPDATE_SET, 'status', status)],526 filters, access_control=user['access_control'])})527@alert_api.route("/ownership/<alert_id>/", methods=["GET"])528@api_login(required_priv=['W'], allow_readonly=False, require_role=[ROLES.alert_manage])529def take_ownership(alert_id, **kwargs):530 """531 Take ownership of a given alert532 Variables:533 alert_id => ID of the alert to send to take ownership534 Arguments:535 None536 Data Block:537 None538 API call example:539 /api/v4/alert/ownership/12345...67890/540 Result example:541 {"success": true}542 """543 user = kwargs['user']544 alert = STORAGE.alert.get(alert_id, as_obj=False)545 if not alert:546 return make_api_response({"success": False},547 err="Alert ID %s not found" % alert_id,548 status_code=404)549 if not Classification.is_accessible(user['classification'], alert['classification']):550 return make_api_response({"success": False}, "You are not allowed to see this alert...", 403)551 current_owner = alert.get('owner', None)552 if current_owner is None:553 return make_api_response({554 "success": STORAGE.alert.update(alert_id, [(STORAGE.alert.UPDATE_SET, 'owner', user['uname'])])})555 else:556 return make_api_response({"success": False},557 err="Alert is already owned by %s" % current_owner,558 status_code=403)559@alert_api.route("/ownership/batch/", methods=["GET"])560@api_login(required_priv=['W'], allow_readonly=False, require_role=[ROLES.alert_manage])561def take_ownership_by_batch(**kwargs):562 """563 Take ownership of all alerts matching the given filters564 Variables:565 None566 Arguments:567 q => Main query to filter the data [REQUIRED]568 tc_start => Time offset at which we start the time constraint569 tc => Time constraint applied to the API570 fq => Filter query applied to the data571 Data Block:572 None573 API call example:574 /api/v4/alert/ownership/batch/?q=alert_id:7b*575 Result example:576 { "success": true }577 """578 user = kwargs['user']579 q = request.args.get('q', "alert_id:*") or "alert_id:*"580 tc_start = request.args.get('tc_start', None)581 tc = request.args.get('tc', None)582 if tc and config.ui.read_only:583 tc += config.ui.read_only_offset584 timming_filter = get_timming_filter(tc_start, tc)585 filters = [x for x in request.args.getlist("fq") if x != ""]586 if timming_filter:587 filters.append(timming_filter)588 filters.append("!owner:*")589 return make_api_response({590 "success": STORAGE.alert.update_by_query(q, [(STORAGE.alert.UPDATE_SET, 'owner', user['uname'])],591 filters, access_control=user['access_control'])})592@alert_api.route("/related/", methods=["GET"])593@api_login(require_role=[ROLES.alert_view])594def find_related_alert_ids(**kwargs):595 """596 Return the list of all IDs related to the currently selected query597 Variables:598 None599 Arguments:600 q => Main query to filter the data [REQUIRED]601 tc => Time constraint to apply to the search602 tc_start => Time at which to start the days constraint603 fq => Filter query applied to the data604 Data Block:605 None606 API call example:607 /api/v4/alert/related/?q=file.sha256:123456...ABCDEF608 Result example:609 ["1"]610 """611 user = kwargs['user']612 query = request.args.get('q', None)613 fq = request.args.getlist('fq')614 if not query and not fq:615 return make_api_response([], err="You need to at least provide a query to filter the data", status_code=400)616 if not query:617 query = fq.pop(0)618 tc = request.args.get('tc', None)619 if tc and config.ui.read_only:620 tc += config.ui.read_only_offset621 tc_start = request.args.get('tc_start', None)622 timming_filter = get_timming_filter(tc_start, tc)623 filters = [x for x in fq if x != ""]624 if timming_filter:625 filters.append(timming_filter)626 try:627 return make_api_response([x['alert_id'] for x in628 STORAGE.alert.stream_search(query, filters=filters, fl="alert_id",629 access_control=user['access_control'], as_obj=False)])630 except SearchException as e:631 return make_api_response("", f"SearchException: {e}", 400)632@alert_api.route("/verdict/<alert_id>/<verdict>/", methods=["PUT"])633@api_login(audit=False, check_xsrf_token=False, require_role=[ROLES.alert_manage])634def set_verdict(alert_id, verdict, **kwargs):635 """636 Set the verdict of an alert based on its ID.637 Variables:638 submission_id -> ID of the alert to give a verdict to639 verdict -> verdict that the user think the alert is: malicious or non_malicious640 Arguments:641 None642 Data Block:643 None644 Result example:645 {"success": True} # Has the verdict been set or not646 """647 reverse_verdict = {648 'malicious': 'non_malicious',649 'non_malicious': 'malicious'650 }651 user = kwargs['user']652 if verdict not in ['malicious', 'non_malicious']:653 return make_api_response({"success": False}, f"'{verdict}' is not a valid verdict.", 400)654 document = STORAGE.alert.get(alert_id, as_obj=False)655 if not document:656 return make_api_response({"success": False}, f"There are no alert with id: {alert_id}", 404)657 if not Classification.is_accessible(user['classification'], document['classification']):658 return make_api_response({"success": False}, "You are not allowed to give verdict on alert with "659 f"ID: {alert_id}", 403)660 resp = STORAGE.alert.update_by_query(f"sid:{document['sid']}", [661 ('REMOVE', f'verdict.{verdict}', user['uname']),662 ('APPEND', f'verdict.{verdict}', user['uname']),663 ('REMOVE', f'verdict.{reverse_verdict[verdict]}', user['uname'])664 ])665 STORAGE.submission.update(document['sid'], [666 ('REMOVE', f'verdict.{verdict}', user['uname']),667 ('APPEND', f'verdict.{verdict}', user['uname']),668 ('REMOVE', f'verdict.{reverse_verdict[verdict]}', user['uname'])669 ])...
Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!