How to use read_member method in avocado

Best Python code snippet using avocado_python

views.py

Source:views.py Github

copy

Full Screen

1from django.db.models import Q2from django.shortcuts import render3from rest_framework.decorators import action4# from apps.cmdb.models import CMDBBase5# from apps.cmdb.serializers import CMDBBaseModelSerializer6from base.mixins import BulkCreateModelMixin7from base.response import json_api_response8from base.views import BaseApiView9from base.views import BaseModelViewSet10from .filters import ServiceTreeFilter11from .models import NodeJoinTag12from .models import NodeLinkOperaPermission13from .models import NodeLinkServer14from .models import ServiceTree15from .serializers import NodeJoinTagSerializer16from .serializers import NodeLinkOperaPermissionModelSerializer17from .serializers import NodeLinkServerSerializer18from .serializers import ServiceTreeListSerializer19# Create your views here.20def show_genres(request):21 return render(22 request,23 "service_tree/genres.html",24 {'genres': ServiceTree.objects.all()}25 )26class ServiceTreeModelViewSet(BulkCreateModelMixin, BaseModelViewSet):27 """服务树"""28 queryset = ServiceTree.objects.all()29 serializer_class = ServiceTreeListSerializer30 pagination_class = None31 filterset_class = ServiceTreeFilter32 def get_queryset(self):33 queryset = super(ServiceTreeModelViewSet, self).get_queryset()34 u = self.request.user35 if u.username in ["admin", "root"]:36 return queryset37 38 node_pks = []39 # u = UserProfile.objects.get(username=self.request.user)40 # write member41 for x in u.write_member.all():42 up_nodes = x.node.get_ancestors(ascending=False, include_self=True)43 node_pks.extend([n.pk for n in up_nodes])44 # read member45 for x in u.read_member.all():46 up_nodes = x.node.get_ancestors(ascending=False, include_self=True)47 node_pks.extend([n.pk for n in up_nodes])48 return queryset.filter(pk__in=list(set(node_pks)))49 50 def list(self, request, *args, **kwargs):51 queryset = self.filter_queryset(self.get_queryset())52 data = self.get_serializer(queryset.filter(parent=None), many=True).data53 for instance in data:54 instance['children'] = self.set_children_nodes(instance["pk"], queryset)55 return json_api_response(code=0, data=data, message="")56 def set_children_nodes(self, pk, qs):57 children_set = qs.filter(parent=pk) # 第2层 ops, ter, prt58 children_data = self.get_serializer(children_set, many=True).data59 for child in children_data:60 child_qs = qs.filter(parent=child['pk']) # 第3层 monkey, api-gw61 if child_qs.exists():62 child['children'] = self.set_children_nodes(child['pk'], qs)63 return children_data64 @action(methods=["get"], detail=True)65 def opera_permission_member(self, request, pk=None):66 ret = {67 "read_member": [],68 "write_member": [],69 "read_member_ref": [], # 继承 read70 "write_member_ref": [] # 继承 write71 }72 # 当前节点73 node = self.get_object()74 # 当前节点下所有的叶子节点75 _nodes = node.get_ancestors(ascending=True, include_self=True)76 # OneToOneField77 for idx, n in enumerate(_nodes):78 try:79 read_member_set = n.nodelinkoperapermission.read_member.all()80 write_member_set = n.nodelinkoperapermission.write_member.all()81 read_member = [{'id': o.pk, 'username': o.username, 'email': o.email, 'name': o.name} for o in read_member_set]82 write_member = [{'id': o.pk, 'username': o.username, 'email': o.email, 'name': o.name} for o in write_member_set]83 except ServiceTree.nodelinkoperapermission.RelatedObjectDoesNotExist:84 continue85 if idx == 0:86 ret['read_member'] = read_member87 ret['write_member'] = write_member88 else:89 ret['read_member_ref'].extend(read_member)90 ret['write_member_ref'].extend(write_member)91 return json_api_response(code=0, data=ret, message=None)92 @action(methods=["get"], detail=True)93 def server(self, request, pk=None, *args, **kwargs):94 """95 查询节点下所有叶子节点的机器信息96 /api/v1/service_tree/<pk>/server/?page=1&page_size=10097 /api/v1/service_tree/<pk>/server/?page=1&page_size=100&hostname=<>&private_ip=<>98 @param request:99 @param pk:100 @return:101 """102 is_pagination = False103 page = request.query_params.get('page', 1)104 page_size = request.query_params.get('page_size', 15)105 if page:106 is_pagination = True107 # 当前节点108 node = self.get_object()109 # 当前节点下所有的叶子节点110 _nodes = node.get_descendants(include_self=True)111 _leaf_nodes = [n for n in _nodes if n.is_leaf_node()]112 # OneToOneField113 cmdb_pks = []114 for leaf_node in _leaf_nodes:115 try:116 link_server = leaf_node.nodelinkserver.cmdbs.all()117 except:118 pass119 else:120 cmdb_pks.extend([s.pk for s in link_server])121 # # ForeignKey122 # node_set = []123 # for leaf_node in _leaf_nodes:124 # node_set.extend(leaf_node.nodelinkserver_set.all())125 # cmdb_pks = []126 # for s in node_set:127 # cmdb_pks.extend([_s.pk for _s in s.cmdbs.all()])128 qs = CMDBBase.objects.filter(pk__in=cmdb_pks, is_deleted=False)129 qs = self.server_filter(qs, request.query_params)130 count = qs.count()131 if is_pagination:132 start = (int(page) - 1) * int(page_size)133 stop = start + int(page_size)134 qs = qs[start:stop]135 response = {136 "count": count,137 "results": CMDBBaseModelSerializer(qs, many=True).data138 }139 return json_api_response(code=0, data=response, message=None)140 @action(methods=["get"], detail=True)141 def tag(self, request, pk=None, *args, **kwargs):142 """143 查询节点关联的tag144 /api/v1/service_tree/<pk>/tag/145 - 不需要分页146 {147 "tag": []148 "tag_ref": [[],[]]149 }150 """151 response = {"tag": [], "tag_ref": []}152 # 当前节点153 node = self.get_object()154 # 当前节点含当前节点上继承的tag155 _nodes = node.get_ancestors(ascending=True, include_self=True)156 for idx, n in enumerate(_nodes):157 kvs = [{"key": info.key, "value": info.value, "id": info.pk} for info in n.nodejointag_set.all()]158 if idx == 0:159 response['tag'] = kvs160 continue161 if not kvs:162 continue163 response['tag_ref'].append(kvs)164 return json_api_response(code=0, data=response, message=None)165 @action(methods=["get"], detail=True)166 def tag_v2(self, request, pk=None, *args, **kwargs):167 """168 查询节点关联的tag169 /api/v1/service_tree/<pk>/tag/170 - 不需要分页171 @param request:172 @param pk:173 @return:174 {175 "tag": {}176 "tag_ref": [{},{}]177 }178 """179 response = {"tag": {}, "tag_ref": []}180 # 当前节点181 node = self.get_object()182 # 当前节点含当前节点上继承的tag183 _nodes = node.get_ancestors(ascending=True, include_self=True)184 for idx, n in enumerate(_nodes):185 kvs = {info.key: info.value for info in n.nodejointag_set.all()}186 if idx == 0:187 response['tag'] = kvs188 continue189 if not kvs:190 continue191 response['tag_ref'].append(kvs)192 return json_api_response(code=0, data=response, message=None)193 def server_filter(self, qs, query):194 # 全局模糊搜索195 dft_v = query.get('default')196 if dft_v:197 qs = qs.filter(198 Q(hostname__contains=dft_v) |199 Q(private_ip__contains=dft_v)200 )201 else:202 # 精准搜索203 hostname_v = query.get('hostname')204 if hostname_v:205 qs = qs.filter(hostname=hostname_v)206 private_ip_v = query.get('private_ip')207 if private_ip_v:208 qs = qs.filter(private_ip=private_ip_v)209 return qs210 def destroy(self, request, *args, **kwargs):211 ins = self.get_object()212 # 不允许跨层级删除213 if not ins.is_leaf_node():214 return json_api_response(code=-1, data=None, message="不允许跨层级删除.")215 return super(ServiceTreeModelViewSet, self).destroy(request, *args, **kwargs)216class NodeOperaPermissionModelViewSet(BaseModelViewSet):217 """服务树 关联节点操作权限"""218 queryset = NodeLinkOperaPermission.objects.all()219 serializer_class = NodeLinkOperaPermissionModelSerializer220 pagination_class = None221 def create(self, request, *args, **kwargs):222 """223 如果 node 不存在时,则添加,否则修改224 """225 serializer = self.get_serializer(data=request.data)226 if serializer.is_valid():227 self.perform_create(serializer)228 return json_api_response(code=0, data=serializer.data, message=None)229 print("data", serializer.data)230 try:231 instance = self.queryset.get(node_id=serializer.data['node'])232 except NodeLinkOperaPermission.DoesNotExist:233 return json_api_response(code=-1, data=None, message="not found.")234 partial = kwargs.pop('partial', True)235 s = self.get_serializer(instance, data=request.data, partial=partial)236 s.is_valid(raise_exception=True)237 try:238 instance.read_member.add(*serializer.data['read_member'])239 instance.write_member.add(*serializer.data['write_member'])240 except Exception as e:241 return json_api_response(code=-1, data=e.args, message=None)242 return json_api_response(code=0, data=s.data, message=None)243 def destroy(self, request, *args, **kwargs):244 """245 /api/v1/service_tree/opera_permission/<node_id>/246 {247 read_member: []248 write_member: []249 }250 """251 data = request.data252 try:253 node = ServiceTree.objects.get(pk=kwargs['pk'])254 except ServiceTree.DoesNotExist:255 return json_api_response(code=-1, data=None, message=f"node {kwargs['pk']} not found")256 node.nodelinkoperapermission.read_member.remove(*data.get('read_member', []))257 node.nodelinkoperapermission.write_member.remove(*data.get('write_member', []))258 return json_api_response(code=0, data=None, message="删除成功.")259class NodeLinkServerModelViewSet(BaseModelViewSet):260 """261 服务树 关联服务器262 """263 queryset = NodeLinkServer.objects.all()264 serializer_class = NodeLinkServerSerializer265 pagination_class = None266 def create(self, request, *args, **kwargs):267 """268 如果 node 不存在时,则添加,否则修改269 """270 if 'app_key' in request.data:271 try:272 node = self.queryset.get(node__appkey=request.data['app_key'])273 except NodeLinkServer.DoesNotExist:274 return json_api_response(code=-1, data=None, message="app_key not found.")275 request_data = {276 "node": node.node_id,277 "cmdbs": request.data['cmdbs'],278 }279 else:280 request_data = request.data281 serializer = self.get_serializer(data=request_data)282 if serializer.is_valid():283 self.perform_create(serializer)284 return json_api_response(code=0, data=serializer.data, message=None)285 else:286 try:287 instance = self.queryset.get(node_id=serializer.data['node'])288 except NodeLinkServer.DoesNotExist:289 return json_api_response(code=-1, data=None, message="not found.")290 partial = kwargs.pop('partial', True)291 s = self.get_serializer(instance, data=request_data, partial=partial)292 s.is_valid(raise_exception=True)293 instance.cmdbs.add(*serializer.data['cmdbs'])294 return json_api_response(code=0, data=serializer.data, message=None)295class NodeLinkTagModelViewSet(BaseModelViewSet):296 """297 服务树 关联Tag298 https://help.aliyun.com/document_detail/171446.html?spm=5176.2020520101manager.0.0.12914a9cStpRuR299 """300 queryset = NodeJoinTag.objects.all()301 serializer_class = NodeJoinTagSerializer302 pagination_class = None303class UnlinkNodeServerApiView(BaseApiView):304 """305 从服务树解绑Server306 delete /api/v1/service_tree/unlink/<node_id>/307 {308 "server_ids": [1, 2, 3] # cmdb的id309 }310 """311 def delete(self, request, *args, **kwargs):312 data = request.data313 node_id = kwargs['pk']314 server_ids = data['server_ids']315 try:316 node_link_server = NodeLinkServer.objects.get(node_id=node_id)317 except NodeLinkServer.DoesNotExist:318 return json_api_response(code=-1, data=None, message=f"node_id {node_id} not found.")319 instances = CMDBBase.objects.filter(pk__in=server_ids)320 node_link_server.cmdbs.remove(*[x.pk for x in instances])321 return json_api_response(code=0, data=None, message=None)322class ParentNodeInfoApiView(BaseApiView):323 """324 查询节点上所有的父节点325 /api/v1/service_tree/parents/100/326 """327 authentication_classes = []328 permission_classes = []329 def get(self, request, pk, *args, **kwargs):330 try:331 node = ServiceTree.objects.get(pk=pk)332 except ServiceTree.DoesNotExist:333 return json_api_response(code=-1, data=None, message=f"pk {pk} not found.")334 s = ServiceTreeListSerializer(node.get_ancestors(ascending=True, include_self=True), many=True)...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.

LambdaTest Learning Hubs:

YouTube

You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.

Run avocado automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 minutes of automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

NotHelpful