Best Python code snippet using avocado_python
views.py
Source:views.py
"""Views for the service tree: nodes, node permissions, linked servers and tags."""
from django.db.models import Q
from django.shortcuts import render
from rest_framework.decorators import action

# NOTE(review): the module-level imports of ``apps.cmdb.models.CMDBBase`` and
# ``apps.cmdb.serializers.CMDBBaseModelSerializer`` were commented out in the
# original file while the names were still used below (NameError at call
# time). They are now imported inside the functions that need them; the reason
# they were disabled at module level is not visible here -- TODO confirm.
from base.mixins import BulkCreateModelMixin
from base.response import json_api_response
from base.views import BaseApiView
from base.views import BaseModelViewSet
from .filters import ServiceTreeFilter
from .models import NodeJoinTag
from .models import NodeLinkOperaPermission
from .models import NodeLinkServer
from .models import ServiceTree
from .serializers import NodeJoinTagSerializer
from .serializers import NodeLinkOperaPermissionModelSerializer
from .serializers import NodeLinkServerSerializer
from .serializers import ServiceTreeListSerializer


def _member_info(member):
    """Return the id/username/email/name dict exposed for a permission member."""
    return {
        'id': member.pk,
        'username': member.username,
        'email': member.email,
        'name': member.name,
    }


def show_genres(request):
    """Render ``service_tree/genres.html`` with every ``ServiceTree`` row."""
    return render(
        request,
        "service_tree/genres.html",
        {'genres': ServiceTree.objects.all()},
    )


class ServiceTreeModelViewSet(BulkCreateModelMixin, BaseModelViewSet):
    """Service tree."""

    queryset = ServiceTree.objects.all()
    serializer_class = ServiceTreeListSerializer
    pagination_class = None
    filterset_class = ServiceTreeFilter

    def get_queryset(self):
        """Return the nodes visible to the requesting user.

        ``admin`` and ``root`` see everything. Any other user sees the nodes
        they are a read or write member of, plus every ancestor of those nodes.
        """
        queryset = super().get_queryset()
        user = self.request.user
        if user.username in ["admin", "root"]:
            return queryset

        node_pks = set()
        # Write members first, then read members; both grant visibility.
        for members in (user.write_member, user.read_member):
            for link in members.all():
                up_nodes = link.node.get_ancestors(ascending=False, include_self=True)
                node_pks.update(n.pk for n in up_nodes)
        return queryset.filter(pk__in=list(node_pks))

    def list(self, request, *args, **kwargs):
        """Return the visible nodes as a nested tree, starting at the roots."""
        queryset = self.filter_queryset(self.get_queryset())
        data = self.get_serializer(queryset.filter(parent=None), many=True).data
        for instance in data:
            instance['children'] = self.set_children_nodes(instance["pk"], queryset)
        return json_api_response(code=0, data=data, message="")

    def set_children_nodes(self, pk, qs):
        """Serialize the children of node ``pk`` found in ``qs``, recursively.

        A ``children`` key is only attached to a child that itself has
        children inside ``qs``.
        """
        children_set = qs.filter(parent=pk)  # level 2, e.g. ops, ter, prt
        children_data = self.get_serializer(children_set, many=True).data
        for child in children_data:
            # Level 3, e.g. monkey, api-gw. The recursive call yields an empty
            # list when there is nothing below, so no separate exists() query
            # is needed.
            grandchildren = self.set_children_nodes(child['pk'], qs)
            if grandchildren:
                child['children'] = grandchildren
        return children_data

    @action(methods=["get"], detail=True)
    def opera_permission_member(self, request, pk=None):
        """Return the read/write members of a node and of its ancestors.

        ``read_member`` / ``write_member`` hold the members set on the node
        itself; ``*_ref`` hold the members inherited from ancestor nodes.
        """
        ret = {
            "read_member": [],
            "write_member": [],
            "read_member_ref": [],  # inherited read
            "write_member_ref": [],  # inherited write
        }
        # Current node.
        node = self.get_object()
        # The node itself first (ascending, include_self), then its ancestors.
        _nodes = node.get_ancestors(ascending=True, include_self=True)
        # OneToOneField: a node without a permission record raises on access.
        for idx, n in enumerate(_nodes):
            try:
                permission = n.nodelinkoperapermission
            except ServiceTree.nodelinkoperapermission.RelatedObjectDoesNotExist:
                continue
            read_member = [_member_info(o) for o in permission.read_member.all()]
            write_member = [_member_info(o) for o in permission.write_member.all()]
            if idx == 0:
                ret['read_member'] = read_member
                ret['write_member'] = write_member
            else:
                ret['read_member_ref'].extend(read_member)
                ret['write_member_ref'].extend(write_member)
        return json_api_response(code=0, data=ret, message=None)

    @action(methods=["get"], detail=True)
    def server(self, request, pk=None, *args, **kwargs):
        """
        Query the machines linked to every leaf node under this node.

        /api/v1/service_tree/<pk>/server/?page=1&page_size=100
        /api/v1/service_tree/<pk>/server/?page=1&page_size=100&hostname=<>&private_ip=<>

        @param request:
        @param pk:
        @return: ``{"count": <total>, "results": [<serialized servers>]}``
        """
        # Imported here because the module-level imports were disabled.
        from apps.cmdb.models import CMDBBase
        from apps.cmdb.serializers import CMDBBaseModelSerializer

        page = request.query_params.get('page', 1)
        page_size = request.query_params.get('page_size', 15)
        # Only an explicitly empty ``page=`` turns pagination off.
        is_pagination = bool(page)

        # Current node.
        node = self.get_object()
        # All leaf nodes under the current node (the node itself included).
        _nodes = node.get_descendants(include_self=True)
        _leaf_nodes = [n for n in _nodes if n.is_leaf_node()]

        # OneToOneField: leaves without a linked-server record are skipped.
        cmdb_pks = []
        for leaf_node in _leaf_nodes:
            try:
                link_server = leaf_node.nodelinkserver
            except NodeLinkServer.DoesNotExist:
                continue
            cmdb_pks.extend(s.pk for s in link_server.cmdbs.all())

        qs = CMDBBase.objects.filter(pk__in=cmdb_pks, is_deleted=False)
        qs = self.server_filter(qs, request.query_params)
        count = qs.count()
        if is_pagination:
            start = (int(page) - 1) * int(page_size)
            stop = start + int(page_size)
            qs = qs[start:stop]
        response = {
            "count": count,
            "results": CMDBBaseModelSerializer(qs, many=True).data,
        }
        return json_api_response(code=0, data=response, message=None)

    @action(methods=["get"], detail=True)
    def tag(self, request, pk=None, *args, **kwargs):
        """
        Query the tags linked to a node.

        /api/v1/service_tree/<pk>/tag/
        - no pagination
        {
            "tag": []
            "tag_ref": [[],[]]
        }
        """
        response = {"tag": [], "tag_ref": []}
        # Current node.
        node = self.get_object()
        # The node's own tags plus the tags inherited from its ancestors.
        _nodes = node.get_ancestors(ascending=True, include_self=True)
        for idx, n in enumerate(_nodes):
            kvs = [
                {"key": info.key, "value": info.value, "id": info.pk}
                for info in n.nodejointag_set.all()
            ]
            if idx == 0:
                response['tag'] = kvs
                continue
            if not kvs:
                continue
            response['tag_ref'].append(kvs)
        return json_api_response(code=0, data=response, message=None)

    @action(methods=["get"], detail=True)
    def tag_v2(self, request, pk=None, *args, **kwargs):
        """
        Query the tags linked to a node, as key/value mappings.

        /api/v1/service_tree/<pk>/tag_v2/
        - no pagination

        @param request:
        @param pk:
        @return:
        {
            "tag": {}
            "tag_ref": [{},{}]
        }
        """
        response = {"tag": {}, "tag_ref": []}
        # Current node.
        node = self.get_object()
        # The node's own tags plus the tags inherited from its ancestors.
        _nodes = node.get_ancestors(ascending=True, include_self=True)
        for idx, n in enumerate(_nodes):
            kvs = {info.key: info.value for info in n.nodejointag_set.all()}
            if idx == 0:
                response['tag'] = kvs
                continue
            if not kvs:
                continue
            response['tag_ref'].append(kvs)
        return json_api_response(code=0, data=response, message=None)

    def server_filter(self, qs, query):
        """Filter ``qs`` by the ``default`` / ``hostname`` / ``private_ip`` params.

        ``default`` is a substring search over hostname and private IP and takes
        precedence; otherwise ``hostname`` and ``private_ip`` are exact matches.
        """
        # Global fuzzy search.
        dft_v = query.get('default')
        if dft_v:
            return qs.filter(
                Q(hostname__contains=dft_v) |
                Q(private_ip__contains=dft_v)
            )

        # Exact search.
        hostname_v = query.get('hostname')
        if hostname_v:
            qs = qs.filter(hostname=hostname_v)
        private_ip_v = query.get('private_ip')
        if private_ip_v:
            qs = qs.filter(private_ip=private_ip_v)
        return qs

    def destroy(self, request, *args, **kwargs):
        """Delete a node; only leaf nodes may be deleted."""
        ins = self.get_object()
        # Deleting across levels is not allowed.
        if not ins.is_leaf_node():
            return json_api_response(code=-1, data=None, message="不允许跨层级删除.")
        return super().destroy(request, *args, **kwargs)


class NodeOperaPermissionModelViewSet(BaseModelViewSet):
    """Operation permissions linked to service-tree nodes."""

    queryset = NodeLinkOperaPermission.objects.all()
    serializer_class = NodeLinkOperaPermissionModelSerializer
    pagination_class = None

    def create(self, request, *args, **kwargs):
        """
        Create the permission record for a node, or add members to an existing one.

        When the payload validates as a new record it is created; otherwise
        the existing record for the node is looked up and the submitted
        members are added to it.
        """
        serializer = self.get_serializer(data=request.data)
        if serializer.is_valid():
            self.perform_create(serializer)
            return json_api_response(code=0, data=serializer.data, message=None)

        try:
            instance = self.queryset.get(node_id=serializer.data['node'])
        except NodeLinkOperaPermission.DoesNotExist:
            return json_api_response(code=-1, data=None, message="not found.")

        partial = kwargs.pop('partial', True)
        s = self.get_serializer(instance, data=request.data, partial=partial)
        s.is_valid(raise_exception=True)
        try:
            instance.read_member.add(*serializer.data['read_member'])
            instance.write_member.add(*serializer.data['write_member'])
        except Exception as e:
            return json_api_response(code=-1, data=e.args, message=None)
        return json_api_response(code=0, data=s.data, message=None)

    def destroy(self, request, *args, **kwargs):
        """
        Remove members from a node's permission record.

        /api/v1/service_tree/opera_permission/<node_id>/
        {
            read_member: []
            write_member: []
        }
        """
        data = request.data
        try:
            node = ServiceTree.objects.get(pk=kwargs['pk'])
        except ServiceTree.DoesNotExist:
            return json_api_response(code=-1, data=None, message=f"node {kwargs['pk']} not found")

        # A node may have no permission record yet; report it instead of crashing.
        try:
            permission = node.nodelinkoperapermission
        except NodeLinkOperaPermission.DoesNotExist:
            return json_api_response(
                code=-1, data=None, message=f"node {kwargs['pk']} has no opera permission"
            )

        permission.read_member.remove(*data.get('read_member', []))
        permission.write_member.remove(*data.get('write_member', []))
        return json_api_response(code=0, data=None, message="删除成功.")


class NodeLinkServerModelViewSet(BaseModelViewSet):
    """
    Servers linked to service-tree nodes.
    """

    queryset = NodeLinkServer.objects.all()
    serializer_class = NodeLinkServerSerializer
    pagination_class = None

    def create(self, request, *args, **kwargs):
        """
        Create the server link for a node, or add servers to an existing link.

        The node may be given directly (``node``) or through ``app_key``.
        """
        if 'app_key' in request.data:
            try:
                node = self.queryset.get(node__appkey=request.data['app_key'])
            except NodeLinkServer.DoesNotExist:
                return json_api_response(code=-1, data=None, message="app_key not found.")
            request_data = {
                "node": node.node_id,
                "cmdbs": request.data['cmdbs'],
            }
        else:
            request_data = request.data

        serializer = self.get_serializer(data=request_data)
        if serializer.is_valid():
            self.perform_create(serializer)
            return json_api_response(code=0, data=serializer.data, message=None)

        try:
            instance = self.queryset.get(node_id=serializer.data['node'])
        except NodeLinkServer.DoesNotExist:
            return json_api_response(code=-1, data=None, message="not found.")

        partial = kwargs.pop('partial', True)
        s = self.get_serializer(instance, data=request_data, partial=partial)
        s.is_valid(raise_exception=True)
        instance.cmdbs.add(*serializer.data['cmdbs'])
        return json_api_response(code=0, data=serializer.data, message=None)


class NodeLinkTagModelViewSet(BaseModelViewSet):
    """
    Tags linked to service-tree nodes.

    https://help.aliyun.com/document_detail/171446.html?spm=5176.2020520101manager.0.0.12914a9cStpRuR
    """

    queryset = NodeJoinTag.objects.all()
    serializer_class = NodeJoinTagSerializer
    pagination_class = None


class UnlinkNodeServerApiView(BaseApiView):
    """
    Unlink servers from a service-tree node.

    delete /api/v1/service_tree/unlink/<node_id>/
    {
        "server_ids": [1, 2, 3]  # cmdb ids
    }
    """

    def delete(self, request, *args, **kwargs):
        """Remove the given cmdb ids from the node's linked servers."""
        # Imported here because the module-level import was disabled.
        from apps.cmdb.models import CMDBBase

        data = request.data
        node_id = kwargs['pk']
        server_ids = data['server_ids']
        try:
            node_link_server = NodeLinkServer.objects.get(node_id=node_id)
        except NodeLinkServer.DoesNotExist:
            return json_api_response(code=-1, data=None, message=f"node_id {node_id} not found.")

        instances = CMDBBase.objects.filter(pk__in=server_ids)
        node_link_server.cmdbs.remove(*[x.pk for x in instances])
        return json_api_response(code=0, data=None, message=None)


class ParentNodeInfoApiView(BaseApiView):
    """
    Query all parent nodes of a node.

    /api/v1/service_tree/parents/100/
    """

    # Authentication and permission checks are disabled for this endpoint.
    authentication_classes = []
    permission_classes = []

    def get(self, request, pk, *args, **kwargs):
        try:
            node = ServiceTree.objects.get(pk=pk)
        except ServiceTree.DoesNotExist:
            return json_api_response(code=-1, data=None, message=f"pk {pk} not found.")
        s = ServiceTreeListSerializer(node.get_ancestors(ascending=True, include_self=True), many=True)
        # NOTE(review): the source excerpt is cut off here ("..."); the rest of
        # this method is not visible and is left out rather than guessed.
Learn to execute automation testing from scratch with LambdaTest Learning Hub. From setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios, the LambdaTest Learning Hub compiles step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, TestNG, etc.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 minutes of automation testing FREE!!