Best Python code snippet using slash
viewset.py
Source:viewset.py
from typing import Callable, List, Optional, Sequence, Union

from fastapi import APIRouter, Header
from fastapi.params import Depends
from pydantic import BaseModel

from .crudset import BaseCrudSet

__all__ = ['ViewSet', 'CrudViewSet']

supported_methods_names: List[str] = [
    'list', 'retrieve', 'create', 'update', 'partial_update', 'destroy']


class ViewSet:
    """Class-based view that registers its action methods on an ``APIRouter``.

    A subclass defines any of the methods named in ``supported_methods_names``.
    Instantiating the class registers one route per defined method, plus one
    route per method decorated with ``extra_method``.

    Class attributes (all may be overridden by subclasses):
        router: router to register on; a new ``APIRouter`` is created if None.
        base_path: path prefix; defaults to ``/<lower-cased class name>``.
        class_tag: tag given to every route; defaults to the class name.
        path_key: name of the path parameter used by the item routes.
        response_model: response model passed to every route.
        dependencies: dependencies passed to every route.
        marked_functions: registry filled by ``extra_method``. It is a single
            list shared by every subclass unless a subclass overrides it.
    """
    router: APIRouter = None
    base_path: str = None
    class_tag: str = None
    path_key: str = "id"
    response_model: BaseModel = None
    dependencies: Sequence[Depends] = None
    marked_functions: List = []

    # action name -> (HTTP method, whether the route addresses a single item)
    _route_specs = {
        'list': ('GET', False),
        'retrieve': ('GET', True),
        'create': ('POST', False),
        'update': ('PUT', True),
        'partial_update': ('PATCH', True),
        'destroy': ('DELETE', True),
    }

    def __init__(self) -> None:
        self.functions: List[Callable] = []
        self.extra_functions: List[List] = []
        self.execute()

    def get_response_model(self, action: str) -> Union[BaseModel, None]:
        """Return the response model for ``action``.

        Override to return a different response model per action; the default
        returns the class-level ``response_model`` (possibly None).
        """
        return self.response_model

    def get_dependencies(self, action: str) -> Sequence[Depends]:
        """Return the dependencies for ``action``.

        Override to return different dependencies per action; the default
        returns the class-level ``dependencies`` (possibly None).
        """
        return self.dependencies

    def execute(self) -> APIRouter:
        """Fill in defaults, register every route and return the router."""
        if self.router is None:
            self.router = APIRouter()
        if self.base_path is None:
            self.base_path = '/' + self.__class__.__name__.lower()
        if self.class_tag is None:
            self.class_tag = self.__class__.__name__
        for name in supported_methods_names:
            if hasattr(self, name):
                self.functions.append(getattr(self, name))
        for func in self.functions:
            self._register_route(func)
        for func, methods, path in self.find_marked_functions():
            self._register_extra_route(func, methods=methods, path=path)
        return self.router

    def _register_route(self, func: Callable, hidden_params: Optional[List[str]] = None):
        """Register one of the standard actions, chosen by ``func.__name__``.

        ``hidden_params`` is accepted but not used yet.
        """
        # hidden_params TODO: add support for hidden params
        spec = self._route_specs.get(func.__name__)
        if spec is None:
            print(f"Method {func.__name__} is not supported")
            return
        http_method, on_item = spec
        if on_item:
            # Doubled braces emit literal "{" / "}" around the parameter name.
            path = f"{self.base_path}/{{{self.path_key}}}"
        else:
            path = self.base_path
        self.router.add_api_route(
            path, func, tags=[self.class_tag], methods=[http_method],
            response_model=self.get_response_model(func.__name__),
            dependencies=self.get_dependencies(func.__name__))

    def _register_extra_route(self, func: Callable, methods: Optional[List[str]] = None,
                              path: str = None):
        """Register a route for a method marked with ``extra_method``.

        ``methods`` defaults to ``["GET"]``. When ``path`` is None the route is
        ``<base_path>/<function name>``; an explicit ``path`` is appended to
        ``base_path`` verbatim.
        """
        if methods is None:
            methods = ["GET"]
        if path is None:
            # Add the separator; otherwise the route would be "<base_path><name>".
            path = '/' + func.__name__
        self.router.add_api_route(
            f"{self.base_path}{path}", func, tags=[self.class_tag], methods=methods,
            response_model=self.get_response_model(func.__name__),
            dependencies=self.get_dependencies(func.__name__))

    @classmethod
    def extra_method(cls, methods: Optional[List[str]] = None, path_key: str = None):
        """Decorator that marks a method as an extra route of the viewset.

        ``methods`` defaults to ``["GET"]``. Despite its name, ``path_key`` is
        used as the route path handed to ``_register_extra_route``.
        """
        if methods is None:
            methods = ["GET"]

        def decorator(func):
            cls.marked_functions.append([func, methods, path_key])
            return func
        return decorator

    def find_marked_functions(self):
        """Collect the marked functions that belong to this instance's class.

        A function belongs to the class when that exact function object sits
        in the ``__dict__`` of a class in the MRO. The bound attribute is
        returned so ``self`` does not leak into the endpoint signature. The
        shared registry is left untouched, so further instances still find
        their extra routes.
        """
        mro = type(self).__mro__
        found = []
        for func, methods, path in self.marked_functions:
            name = func.__name__
            if any(vars(klass).get(name) is func for klass in mro):
                found.append([getattr(self, name), methods, path])
        self.extra_functions = found
        return self.extra_functions


class CrudViewSet(ViewSet):
    """
    This is the base viewset for CRUD operations.
    """
    crud: BaseCrudSet = None
    model: BaseModel = None
    async_db = False

    def __init__(self):
        assert self.crud is not None, "You must define crud model"
        assert self.model is not None, "You must define model"
        self._crud = self.crud()
        super().__init__()

    async def list(self) -> List[model]:
        if self.async_db:
            return await self._crud.list()
        return self._crud.list()

    async def retrieve(self, id: int) -> model:
        if self.async_db:
            return await self._crud.retrieve(id)
        return self._crud.retrieve(id)

    async def create(self, data: model) -> model:
        if self.async_db:
            return await self._crud.create(data)
        return self._crud.create(data)

    async def update(self, id: int, data: model) -> model:
        if self.async_db:
            return await self._crud.update(id, data)
        return self._crud.update(id, data)

    async def partial_update(self, id: int, data: model) -> model:
        if self.async_db:
            return await self._crud.partial_update(id, data)
        return self._crud.partial_update(id, data)

    async def destroy(self, id: int) -> model:
        if self.async_db:
            return await self._crud.destroy(id)
        # ... (the snippet is truncated at this point in the source)
findCallChains.py
Source:findCallChains.py
import idaapi

# NOTE(review): Name, FuncItems, XrefsFrom/XrefsTo, CodeRefsFrom/CodeRefsTo,
# fl_CN, fl_CF, AskFile and the ``idc`` module are used without being imported
# here; presumably the IDA script environment provides them -- confirm for the
# IDA version in use.

# tcp_v4_rcv
#START_EA = 0xC1780B10
START_EA = 0x000000000002A5E0

# mark drop functions
#TARGET_EA = 0xC16FAB00
#TARGET_EA = 0x08073204
#TARGET_EA = 0x08017320
#TARGET_EA = 0x0801FF67
#TARGET_EA = 0x0801033F
#TARGET_EA = 0x0801AE28 # set state TCP_NEW_SYN_RECV
TARGET_EA = 0x0000000000017240  # tcp_drop

entry_func = idaapi.get_func(START_EA)
marked_func = idaapi.get_func(TARGET_EA)
# get_func returns None when the address is not inside a function; fail with a
# clear message instead of an AttributeError on ``.startEA`` below.
if entry_func is None:
    raise ValueError("START_EA 0x%x is not inside a function" % START_EA)
if marked_func is None:
    raise ValueError("TARGET_EA 0x%x is not inside a function" % TARGET_EA)
entry_func_name = Name(entry_func.startEA) or "Entry"
marked_func_name = Name(marked_func.startEA) or "MarkedFunc"

# debug output
print("Entry Function Address: %x" % START_EA)
print("Entry Function Name: '%s'" % entry_func_name)
print("Marked Instruction Address: %x" % TARGET_EA)
print("Marked Instruction belongs to function '%s'" % marked_func_name)

# There are 2 steps:
# 1. Find all possible call chains (no recursive, i.e. recursive level = 1)
# 2. Find all possible paths within each function in the call chain

# 1. Find all possible call chains
# backwards or forwards? or both at the same time?

# Every chain found by the search functions below is appended here.
all_call_chains = []


def convert_call_stack_to_call_chain(call_stack, reverse=False):
    """Turn a call-stack dict into a list of [name, start_ea, call_site, level].

    ``call_stack`` maps a function name to a dict with the keys 'level',
    'start_ea' and 'call_site'. Entries are sorted by 'level', descending when
    ``reverse`` is true.

    Fix-ups applied after sorting:
    - reverse: each level is replaced by ``len(chain) - 1 - level``.
    - forward: each entry takes the call site of the entry after it, and the
      last entry's call site is set to 0.
    """
    ordered = sorted(call_stack.items(),
                     key=lambda item: item[1]['level'], reverse=reverse)
    call_chain = [[name, info['start_ea'], info['call_site'], info['level']]
                  for name, info in ordered]

    # quickfix
    if reverse:
        last_index = len(call_chain) - 1
        for entry in call_chain:
            entry[3] = last_index - entry[3]
    elif call_chain:  # an empty stack has no last entry to patch
        for i in range(len(call_chain) - 1):
            call_chain[i][2] = call_chain[i + 1][2]
        call_chain[-1][2] = 0

    return call_chain


# ~75s
def find_call_chain_forward(curr_func, target_func, call_stack, curr_level):
    """Depth-first search from ``curr_func`` to ``target_func`` via XrefsFrom.

    Only cross-references of type fl_CN / fl_CF are followed. A function whose
    name is already a key of ``call_stack`` is not entered again. Each chain
    that reaches the target is appended to ``all_call_chains``.
    """
    #fd.write("Current Func: %s\n" % (Name(curr_func.startEA) or "N/A"))
    if curr_func == target_func:
        all_call_chains.append(convert_call_stack_to_call_chain(call_stack))
        return

    for i in FuncItems(curr_func.startEA):
        for xref in XrefsFrom(i, 0):
            #fd.write("%s calls %s from 0x%x. Type: %d" % (Name(curr_func.startEA), Name(xref.to), i, xref.type))
            #fd.write("\n")
            if xref.type == fl_CN or xref.type == fl_CF:
                callee = idaapi.get_func(xref.to)
                if not callee:
                    # Target is not inside a known function; nothing to follow.
                    continue
                callee_name = Name(callee.startEA) or "N/A"
                if callee_name not in call_stack:
                    call_stack[callee_name] = {'level': curr_level + 1,
                                               'start_ea': callee.startEA,
                                               'call_site': i}
                    find_call_chain_forward(callee, target_func, call_stack,
                                            curr_level + 1)
                    del call_stack[callee_name]


# ~25s
def find_call_chain_forward2(curr_func, target_func, call_stack, curr_level):
    """Same search as ``find_call_chain_forward`` but using CodeRefsFrom."""
    #fd.write("Current Func: %s\n" % (Name(curr_func.startEA) or "N/A"))
    if curr_func == target_func:
        all_call_chains.append(convert_call_stack_to_call_chain(call_stack))
        return

    for i in FuncItems(curr_func.startEA):
        for ref in CodeRefsFrom(i, 0):
            callee = idaapi.get_func(ref)
            if not callee:
                # Target is not inside a known function; nothing to follow.
                continue
            callee_name = Name(callee.startEA) or "N/A"
            if callee_name not in call_stack:
                call_stack[callee_name] = {'level': curr_level + 1,
                                           'start_ea': callee.startEA,
                                           'call_site': i}
                find_call_chain_forward2(callee, target_func, call_stack,
                                         curr_level + 1)
                del call_stack[callee_name]


# ~1s
def find_call_chain_backward(curr_func, target_func, call_stack, curr_level):
    """Depth-first search from ``curr_func`` up its callers to ``target_func``.

    Follows XrefsTo of type fl_CN / fl_CF. Chains are converted with
    ``reverse=True`` before being appended to ``all_call_chains``.
    """
    if curr_func == target_func:
        all_call_chains.append(convert_call_stack_to_call_chain(call_stack, True))
        return

    for xref in XrefsTo(curr_func.startEA, 0):
        if xref.type == fl_CN or xref.type == fl_CF:
            caller = idaapi.get_func(idc.GetFunctionAttr(xref.frm, idc.FUNCATTR_START))
            if not caller:
                # The referencing address does not belong to a function.
                continue
            caller_name = Name(caller.startEA) or "N/A"
            if caller_name not in call_stack:
                call_stack[caller_name] = {'level': curr_level + 1,
                                           'start_ea': caller.startEA,
                                           'call_site': xref.frm}
                find_call_chain_backward(caller, target_func, call_stack,
                                         curr_level + 1)
                del call_stack[caller_name]


# ~1s
def find_call_chain_backward2(curr_func, target_func, call_stack, curr_level):
    """Same search as ``find_call_chain_backward`` but using CodeRefsTo."""
    if curr_func == target_func:
        all_call_chains.append(convert_call_stack_to_call_chain(call_stack, True))
        return

    for ref in CodeRefsTo(curr_func.startEA, 0):
        caller = idaapi.get_func(idc.GetFunctionAttr(ref, idc.FUNCATTR_START))
        if not caller:
            continue
        caller_name = Name(caller.startEA) or "N/A"
        if caller_name not in call_stack:
            call_stack[caller_name] = {'level': curr_level + 1,
                                       'start_ea': caller.startEA,
                                       'call_site': ref}
            find_call_chain_backward2(caller, target_func, call_stack,
                                      curr_level + 1)
            del call_stack[caller_name]


out_path = AskFile(1, "*.txt", "Select output file")
if not out_path:
    # No file was chosen, so there is nowhere to write the result.
    print("No output file selected; nothing written")
else:
    fd = open(out_path, 'w')
    try:
        # forwards
        #find_call_chain_forward2(entry_func, marked_func, {entry_func_name: {'level': 0, 'start_ea': START_EA, 'call_site': 0}}, 0)
        # backwards
        find_call_chain_backward2(
            marked_func, entry_func,
            {marked_func_name: {'level': 0, 'start_ea': marked_func.startEA,
                                'call_site': TARGET_EA}},
            0)

        # One chain per line: "name,start_ea,call_site" records joined by "|".
        for cc in all_call_chains:
            fd.write("|".join("%s,%x,%x" % (func_name, start_ea, call_site)
                              for func_name, start_ea, call_site, lvl in cc))
            fd.write("\n")

        #f = idaapi.FlowChart(get_func(idc.ScreenEA()))
    finally:
        # Close the file even if the search or the formatting raises.
        fd.close()
...
test_function_marker.py
Source:test_function_marker.py
...21 assert function_marker('bla').get_value(func, 1) == 122def test_marker_on_methods(marker):23 class Obj(object):24 @marker25 def marked_func(self):26 pass27 def unmarked_func(self):28 pass29 @classmethod30 def unmarked_classmethod(cls):31 pass32 @staticmethod33 def unmarked_staticmethod(param):34 pass35 @marker36 @classmethod37 def marked_classmethod(cls):38 pass39 @marker40 @staticmethod41 def marked_staticmethod():...
Learn to execute automation testing from scratch with the LambdaTest Learning Hub, from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. The LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, TestNG, etc.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 minutes of automation testing FREE!!