How to use test_list_cache method in tempest

Best Python code snippet using tempest_python

api.py

Source:api.py Github

copy

Full Screen

1# -*- coding: utf-8 -*-2import re3from django.utils import timezone4from rest_framework import exceptions5from rest_framework.decorators import detail_route6from rest_framework.response import Response7from common_framework.utils.cache import CacheProduct8from event import models as event_models9from event.utils import common as event_common10from event.web import api as base_api, error11from event_exam.utils.task import TaskHandler12from event_exam.utils.exam_auth import api_auth_permission13from event_exam.models import SolvedRecord, SubmitRecord, SubmitFlags14from event.models import EventTask15from practice import api as practice_api16from practice_capability.models import TestPaperTask17from practice_theory.models import ChoiceTask18from practice_real_vuln.models import RealVulnTask19from practice_exercise.models import PracticeExerciseTask20from practice.private_api import _commit_task_answer21from x_note.models import Note22from . import serializers as mserializers23from event_exam import models as exam_models24from django.contrib import messages25from django.utils.translation import ugettext_lazy as _26import random27import collections28import json29DELIMITER = '|'30multiple = 131judgment = 232radio = 033operation = 434is_choice = 135RealVulnType = 136PracticeExerciseType = 237DEFAULT_CACHE_TIME = 60 * 538class Serializer:39 def __init__(self, raw, et):40 self.data = {41 'title': raw.title,42 'id': str(raw.id),43 'hash': str(raw.hash),44 'score': float(et.task_score),45 'content': raw.content46 }47 p_type = int(raw.hash.split('.')[-1])48 if p_type == 0:49 self.data['options'] = collections.OrderedDict(sorted(json.loads(raw.option).items(), key=lambda t: t[0]))50 # self.data['options'] = raw.option51 self.data['is_choice_question'] = 152 self.data['is_multiple_choice'] = 1 if raw.multiple else 053 self.data['question_type'] = raw.multiple54 else:55 if raw.file and raw.file.url:56 file_attach = {57 'name': raw.file.name,58 'url': raw.file.url,59 }60 self.data['file_url'] = file_attach61 self.data['url'] = raw.url62 self.data['is_dynamic_env'] = raw.is_dynamic_env63 self.data['question_type'] = operation64 self.data['solving_mode'] = raw.solving_mode65 self.data['is_choice_question'] = 066 self.data['score_mutiple'] = raw.score_multiple67class EventViewSet(base_api.EventViewSet):68 serializer_class = mserializers.EventSerializer69 @detail_route(methods=['get'], )70 @api_auth_permission71 def exam_task(self, request, pk):72 try:73 event = event_models.Event.objects.get(pk=pk)74 except Exception, e:75 raise exceptions.PermissionDenied(error.EVENT_NOT_START)76 taskArrary = event_models.EventTask.objects.filter(event=pk)77 rows = []78 done = False79 score = 080 all_score = 081 myAnswer = []82 eus = event_models.EventUserSubmitLog.objects.filter(event_task__event=pk, user=request.user)83 if eus.exists():84 done = True85 try:86 x_note = Note.objects.get(resource=event.hash, user=request.user)87 writeup_score = x_note.score88 except:89 writeup_score = 090 score = score + writeup_score91 for e in eus:92 right_answer = practice_api.get_task_answer(e.event_task.task_hash, request.user)93 tmp = {94 'taskId': e.event_task.task_hash,95 'answer': e.answer,96 }97 if event.extendevent.ans_display_method == 0:98 tmp['rightAnswer'] = right_answer99 if event.extendevent.ans_display_method == 1:100 if event.end_time < timezone.now():101 tmp['rightAnswer'] = right_answer102 score = score + e.score103 myAnswer.append(tmp)104 for t in taskArrary:105 task = practice_api.get_task_object(t.task_hash)106 rows.append(Serializer(task, t).data)107 all_score = all_score + t.task_score108 '''109 获取试卷状态:1.已经结束;2.正在进行;3.尚未开始110 '''111 now = timezone.now().strftime('%Y-%m-%d %H:%M:%S')112 if (event.start_time.strftime('%Y-%m-%d %H:%M:%S') <= now) and (113 event.end_time.strftime('%Y-%m-%d %H:%M:%S') > now):114 process = 0115 elif (event.start_time.strftime('%Y-%m-%d %H:%M:%S') > now):116 process = 1117 else:118 process = 2119 return Response(120 {121 'response_data':122 {123 'tasks': rows, 'done': done,124 'myAnswer': myAnswer, 'score': score,125 'all_score': all_score,126 'score_status': event.extendevent.score_status,127 'rank_status': event.extendevent.rank_status128 },129 'error_code': 0,130 'process': process131 })132 @detail_route(methods=['GET'], )133 @api_auth_permission134 def exam_user_ranking(self, request, pk):135 """136 获取当前用户考试排名137 """138 try:139 event = event_models.Event.objects.get(pk=pk)140 except Exception, e:141 raise exceptions.PermissionDenied(error.EVENT_NOT_START)142 from django.db.models import Sum143 eus = event_models.EventUserSubmitLog.objects.filter(event_task__event=pk).values('user').annotate(144 score=Sum('score'))145 x_note = Note.objects.filter(resource=event.hash).values('user').annotate(score=Sum('score'))146 # 添加writeup_score147 for data in eus:148 for note in x_note:149 if note['user'] == data['user']:150 data['score'] += 2151 sorted_scores = sorted(list(eus), key=lambda a: -a.get('score'))152 ranking = ''153 for k, value in enumerate(sorted_scores):154 if value['user'] == request.user.id:155 ranking = k + 1156 return Response(data={'ranking': ranking})157 @detail_route(methods=['get'], )158 @api_auth_permission159 def get_tasks(self, request, pk):160 event = event_common.get_event_by_id(pk)161 if event.start_time > timezone.now():162 return Response({'response': {} , 'error_code': 1})163 if not event.public:164 return Response({'response': {}, 'error_code': 1})165 basic_information = {}166 grade = exam_models.SubmitRecord.objects.filter(event_id=event.id).filter(user_id=request.user).first()167 if grade is not None:168 basic_information["is_over"] = "true"169 basic_information["show_score"] = event.extendevent.score_status170 basic_information["answer"] = grade.answer171 if event.extendevent.score_status:172 basic_information["get_score"] = grade.score173 answer_dict, score_dict = self.get_correct_answer(event, pk)174 basic_information["correct_answer"] = answer_dict175 else:176 count_time = self.count_time(event)177 if count_time == 0 :178 basic_information["is_over"] = "true"179 else:180 basic_information["is_over"] = 'false'181 basic_information["count_time"] = count_time182 answer_record = SolvedRecord.objects.filter(user=request.user).filter(event=event).first()183 if answer_record is None:184 exam_models.SolvedRecord.objects.create(185 event=event,186 user=request.user,187 )188 return Response({'response': basic_information, 'error_code': 0})189 @detail_route(methods=['get'], )190 @api_auth_permission191 def review(self, request, pk):192 event = event_common.get_event_by_id(pk)193 if event.start_time > timezone.now():194 return Response({'response': {} , 'error_code': 1})195 if not event.public:196 return Response({'response': {}, 'error_code': 1})197 basic_information = {}198 grade = exam_models.SubmitRecord.objects.filter(event_id=event.id).filter(user_id=request.user).first()199 if grade is not None:200 basic_information["is_over"] = True201 basic_information["show_score"] = event.extendevent.score_status202 basic_information["rank_status"] = event.extendevent.rank_status203 basic_information["answer"] = grade.answer204 if event.extendevent.score_status:205 basic_information["get_score"] = grade.score206 if event.extendevent.rank_status:207 answer_dict, score_dict = self.get_correct_answer(event, pk)208 basic_information["correct_answer"] = answer_dict209 submit_reslut = SubmitFlags.objects.filter(event=event, user=request.user)210 is_right = {}211 if submit_reslut.exists():212 for flag_result in submit_reslut:213 is_right[flag_result.topic_hash] = True if flag_result.score != 0 else False214 basic_information["flag_is_right"] = is_right215 else:216 basic_information["is_over"] = False217 basic_information["answer"] = ''218 return Response({'response': basic_information, 'error_code': 0})219 def get_correct_answer(self, event, pk):220 test_list_cache = CacheProduct("test_list_cache")221 test_score_cache = CacheProduct("test_score_cache")222 key = "%d_%d" % (int(pk), event.id)223 test_list_dateil = test_list_cache.get(key, None)224 test_score_dateil = test_score_cache.get(key, None)225 if test_list_dateil is None or test_score_dateil is None:226 task_arrary = self.get_event_task(pk)227 answer_dict = {}228 score_dict = {}229 for t in task_arrary:230 task = practice_api.get_task_object(t.task_hash)231 score_dict[int(task.id)] = t.task_score232 if task.answer is None:233 is_dynamic_env = hasattr(task, "is_dynamic_env") if hasattr(task, "is_dynamic_env") else False234 if is_dynamic_env:235 task_answer = _("x_dynamic_answer")236 else:237 task_answer = _("x_answer_not_generated")238 else:239 task_answer = task.answer240 answer_dict[int(task.id)] = str(task_answer)241 test_list_cache.set(key, answer_dict, DEFAULT_CACHE_TIME)242 test_score_cache.set(key, score_dict, DEFAULT_CACHE_TIME)243 else:244 answer_dict = test_list_dateil245 score_dict = test_score_dateil246 return answer_dict, score_dict247 # 时间事件248 def count_time(self, event):249 count_time = int((event.end_time - timezone.now()).total_seconds())250 if count_time < 0:251 return 0252 else:253 return count_time254 @detail_route(methods=['POST'], )255 @api_auth_permission256 def submit_testpaper(self, request, pk):257 event = event_common.get_event_by_id(pk)258 if not event:259 return Response({'error_code': 1})260 taskhash = str(request.data.get('taskhash'))261 tasktype = int(taskhash.split(".")[-1])262 task_id = request.data.get("topic_id") #该题目的id263 answers = str(request.data.get('answers').encode('utf-8')) #答案264 user = request.user265 # 没有答案266 if not answers:267 return Response({'error_code': 2})268 test_information = None269 if tasktype == RealVulnType:270 test_information = RealVulnTask.objects.filter(hash=taskhash).first()271 elif tasktype == PracticeExerciseType:272 test_information = PracticeExerciseTask.objects.filter(hash=taskhash).first()273 if not event or not test_information:274 return Response({'error_code': 1})275 _ret = _commit_task_answer(tasktype, test_information, user, answers)276 if _ret['is_solved'] and (not _ret.has_key("is_over")):277 # correct_answer, score_dict = self.get_correct_answer(event, pk)278 # topic_score = score_dict[int(task_id)] # 获取该题目的分数279 task = EventTask.objects.filter(task_hash=taskhash).first()280 topic_score = task.task_score281 get_score = float(format(topic_score * float(_ret["score"]) / float(_ret['all_score']), '.2f'))282 submit_record = SubmitFlags.objects.filter(event=event, user=request.user, topic_hash=taskhash)283 submit_record_ob = submit_record.first()284 if submit_record_ob is not None:285 submit_record.update(286 score = get_score287 )288 else:289 SubmitFlags.objects.create(290 event=event,291 user=request.user,292 score=get_score,293 topic_hash=taskhash294 )295 else:296 pass297 _ret['score_per'] = float(format(float(_ret["score"]) / float(_ret['all_score']), '.2f'))298 _ret["num_task"] = len(event_models.EventTask.objects.filter(event=event))299 _ret["num_done_task"] = request.data.get('topic_num')300 _ret["solving_mode"] = test_information.solving_mode301 _ret["question_type"] = test_information.multiple if (test_information.hash.split(".")[-1] == 0) else operation302 return Response({'response': _ret, 'error_code': 0})303 @detail_route(methods=['POST'], )304 @api_auth_permission305 def finish_up_job(self, request, pk):306 # 交卷307 event = event_common.get_event_by_id(pk)308 user = request.user309 count_time = self.count_time(event)310 if count_time <= 0:311 return Response({'error_code': 1})312 answers = request.data.get('answer')313 get_score = 0314 if answers != "" and answers is not None:315 answers_dict = json.loads(answers)316 correct_answer, score_dict = self.get_correct_answer(event, pk)317 for id, answer in answers_dict.items():318 topic_correct_answer = correct_answer[int(id)].split("|")319 topic_score = score_dict[int(id)]320 if re.match(r'^[A-G]*$', answer) is None:321 pass322 else:323 topic_user_answer = list(str(answer))324 standard_list1 = set(topic_user_answer)325 standard_list2 = set(topic_correct_answer)326 if len(standard_list1.difference(standard_list2)) == 0 and len(327 standard_list2.difference(standard_list1)) == 0:328 get_score = get_score + topic_score329 if SubmitFlags.objects.filter(event=event, user=user).exists():330 for user_topic_score in SubmitFlags.objects.filter(event=event, user=user):331 get_score = get_score + float(user_topic_score.score)332 grade = exam_models.SubmitRecord.objects.filter(event_id=event.id).filter(user_id=request.user)333 if grade.exists():334 grade.update(335 answer=answers,336 score=get_score337 )338 else:339 exam_models.SubmitRecord.objects.create(340 event=event,341 user=user,342 answer=answers,343 score=get_score344 )345 t = SolvedRecord.objects.filter(event=event).filter(user=user)346 t.delete()347 return Response({'error_code': 0})348 def get_event_task(self, pk):349 task_cache = CacheProduct("get_event_task")350 key = pk351 event_task = task_cache.get(key, None)352 if event_task:353 return event_task354 event_task = event_models.EventTask.objects.filter(event=pk)355 if event_task:356 task_cache.set(key, event_task, DEFAULT_CACHE_TIME)357 return event_task358class EventSignupUserViewSet(base_api.EventSignupUserViewSet):359 serializer_class = mserializers.EventSignupUserSerializer360class EventSignupTeamViewSet(base_api.EventSignupTeamViewSet):361 serializer_class = mserializers.EventSignupTeamSerializer362class EventTaskViewSet(base_api.EventTaskViewSet):363 serializer_class = mserializers.EventTaskSerializer364class EventUserSubmitLogViewSet(base_api.EventUserSubmitLogViewSet):365 serializer_class = mserializers.EventUserSubmitLogSerializer366 task_handler_class = TaskHandler367class EventUserAnswerViewSet(base_api.EventUserAnswerViewSet):368 serializer_class = mserializers.EventUserAnswerSerializer369class EventNoticeViewSet(base_api.EventNoticeViewSet):370 serializer_class = mserializers.EventNoticeSerializer371class EventTaskNoticeViewSet(base_api.EventTaskNoticeViewSet):...

Full Screen

Full Screen

test_cache.py

Source:test_cache.py Github

copy

Full Screen

1from lyrebird.mock.cache import ListCache2def test_list_cache():3 cache = ListCache()4 cache.add('1')5 assert cache.items() == ['1']6 cache.clear()7 assert cache.items() == []8def test_list_cache_max():9 cache = ListCache(maxlen=10)10 for i in range(20):11 cache.add(i)...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.

LambdaTest Learning Hubs:

YouTube

You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.

Run tempest automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 minutes of automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

NotHelpful