Best Python code snippet using slash
query.py
Source:query.py
1# -*- coding: utf-8 -*-2# æ¥è¯¢ç±»æ¨¡å3#4import copy5import json6import re7import time8from collections import OrderedDict9from django.db import models10from analysis.apps import AnalysisConfig11from framework.models import BaseModel12from framework.translation import _13from framework.utils import import_func, ObjectDict, trace_msg14from framework.utils.cache import CacheAttribute15from framework.utils.myenum import Enum16from log_def.models import DictDefine, LogDefine17class SqlMarkConfig(ObjectDict):18 mark_name: str19 name: str20 alias: str21 multiple: bool22 single: bool # æ¯å¦åé23 fixed: bool # æ¯å¦å®å¼24 template: bool # 模æ¿25 context_func: lambda x: x # 模æ¿ä¸ä¸æ26 order_num: int27 params_func: lambda x: x28class SqlMarkManager(object):29 mark_map = OrderedDict({30 # "game_alias" : {"name" : "game_alias", "multiple": False,31 # 'template' : 'sdk_center/widgets/game_alias_select2.html',32 # 'context_func': 'sdk_center.views.widgets.get_game_alias_dict'}, # 游æ代å·33 # "game_aliass" : {"name" : "game_alias", "multiple": True,34 # 'template' : 'sdk_center/widgets/game_alias_dialog.html',35 # 'context_func': 'sdk_center.views.widgets.get_game_alias_dict'}, # å¤ä¸ªæ¸¸æ代å·36 #37 "server_id" : {"name" : "server_id", "multiple": False,38 'template' : 'analysis/widgets/query_server_select.html',39 'context_func': 'analysis.views.widgets.get_query_servers'}, # æå¡å¨IDæ¿æ¢40 # 'context_func': 'game_manage.views.widgets.get_group_servers_dict'}, # æå¡å¨IDæ¿æ¢41 # "server_ids" : {"name" : "server_id", "multiple": True,42 # 'template' : 'game_manage/widgets/group_server_dialog.html',43 # 'context_func': 'game_manage.views.widgets.get_group_servers_dict'}, # å¤ä¸ªæå¡å¨IDæ¿æ¢44 "server_name" : {"name": "server_name", "multiple": False, "alias": 'æå¡å¨å'}, # æå¡å¨å45 "master_id" : {"name": "master_id", "multiple": False, "alias": 'æ¯æID'}, # æ¯æID46 "master_db" : {"name": "master_db", "multiple": False, "alias": 'æ¯ædbå'}, # æ¯ædbå47 "sdate" : {"name": "sdate", "alias": 'å¼å§æ¶é´'}, # å¼å§æ¶é´48 "edate" : {"name": "edate", "alias": 'ç»ææ¶é´'}, # ç»ææ¶é´49 # "channel" : {"name" : "channel", "multiple": False, 'type': 'str',50 # 'template' : 'sdk_center/widgets/agent_channel_alias_select.html',51 # 'context_func': 'sdk_center.views.widgets.get_agent_channels_dict'}, # æ¸ éæ è¯52 # "channels" : {"name" : "channel", "multiple": True, 'type': 'str',53 # 'template' : 'sdk_center/widgets/agent_channel_alias_dialog.html',54 # 'context_func': 'sdk_center.views.widgets.get_agent_channels_dict'}, # æ¸ éæ è¯å表55 # "first_channel" : {"name" : "channel", "multiple": True, 'type': 'str',56 # 'template' : 'sdk_center/widgets/agent_alias.html',57 # 'context_func': 'sdk_center.views.widgets.get_agent_dict'}, # æ¸ éæ è¯å表58 # "plan_channel" : {"name" : "channel", "multiple": True, 'type': 'str',59 # 'template' : 'sdk_center/widgets/plan_agent_channel.html',60 # 'context_func': 'sdk_center.views.widgets.get_agent_channels_dict'}, # æ¸ éæ è¯61 # "package": {"name": "package", "multiple": True, 'type': 'str', "alias": 'æ¸ éæ è¯'}, # æ¸ éæ è¯62 # "media": {"name": "media", "multiple": True, 'type': 'str', "alias": 'æ¸ éæ è¯'}, # æ¸ éæ è¯63 #64 # "sdk_code": {"name": "sdk_code", "alias": 'æ¸ éæ è¯'}, # æ¸ é代å·65 # "channel_id" : {"name" : "channel_id", "multiple": False,66 # 'template' : 'game_manage/widgets/agent_channel_select.html',67 # 'context_func': 'game_manage.views.widgets.get_agent_channels_dict'}, # æ¸ éid68 # "channel_ids" : {"name" : "channel_id", "multiple": True,69 # 'template' : 'game_manage/widgets/agent_channel_dialog.html',70 # 'context_func': 'game_manage.views.widgets.get_agent_channels_dict'}, # æ¸ éid71 # "kouliang_rate": {"name": "kouliang_rate"},72 # "min_kouliang_num": {"name": "min_kouliang_num"},73 # "game_server_id" : {"name" : "game_server_id", "multiple": False,74 # 'template': 'sdk_center/widgets/game_server_alias.html'}, # åºæIDæ¿æ¢75 # "game_server_ids" : {"name" : "game_server_id", "multiple": True,76 # 'template': 'sdk_center/widgets/game_server_alias.html'}, # å¤ä¸ªåºæIDæ¿æ¢77 "agent_name" : {"name": "agent_name", "alias": 'å¹³å°å'}, # å¹³å°å78 'platform_id' : {"name": "platform_id", "multiple": False, "alias": '游æå¹³id'}, # 游æå¹³å°79 'platform_ids': {"name": "platform_id", "multiple": True, "alias": 'å¹³å°IDS'},80 'user_id' : {"name" : "user_id", "multiple": False, "alias": '管çåid',81 'params_func': lambda request: [request.user.id]}, # 管çåid82 'admin_id' : {"name" : "admin_id", "multiple": False, "alias": '管çåid',83 'params_func': lambda request: [request.user.id]}, # 管çåid84 'is_root' : {"name" : "is_root", "multiple": False, "alias": 'æ¯å¦è¶
级管çå',85 'params_func': lambda request: [1 if request.user.is_root else 0]}, # æ¯å¦è¶
级管çå86 'is_manager' : {"name" : "is_manager", "multiple": False, "alias": 'æ¯å¦ç®¡çå',87 'params_func': lambda request: [1 if request.user.is_manager else 0]} # æ¯å¦ç®¡çå88 })89 @classmethod90 def register_mark(cls, sql_mark_config):91 cls.mark_map[sql_mark_config.mark_name] = sql_mark_config92class SqlBuilder(object):93 """sql94 @param TAG_FORMAT: é»è®¤çæ ç¾å¤å´95 @param params: åå
¸ ä¸é®ä¸å表 ä¾å¦reuqest.POST96 """97 TAG_FORMAT = '{{%s}}'98 def __init__(self, source_sql, params):99 """åå§å100 @param source_sql åSQL101 """102 self.sql = source_sql103 self.query_sql = source_sql.strip()104 self.params = params105 self.order_str = ''106 self.limit_str = ''107 self.mark_map = copy.copy(SqlMarkManager.mark_map)108 def query_sql_handle(self, sql=''):109 """æ¥è¯¢sql转æ¢110 """111 values_list = []112 for mark_name, config in self.mark_map.items():113 if self.has_mark(mark_name): # sqlåå¨è¿ä¸ªæ ç¾114 param_name = config['name']115 value = self.get_param_value(param_name, config)116 if value or value == 0 or config.get('single', False) or config.get('fixed',117 False): # å
æ没å¼çæ è®°æ¿æ¢,åéåå¼å
³å¯ä»¥ä¸ºç©ºå¼118 values_list.append((mark_name, value))119 else:120 self.empty_param_handle(mark_name)121 if values_list:122 for mark_name, value in values_list:123 self.replace_mark_to_value(mark_name, value)124 return self.query_sql125 def convert_datatable_name(self):126 datatable_tags = re.findall(r'${([^\s,]*)}', self.query_sql)127 if datatable_tags: # å
å«${模åå}æ ç¾128 for data_model_name in set(datatable_tags):129 model_class = __import__(data_model_name)130 self.query_sql = self.query_sql.replace('${%s}' % data_model_name, model_class.get_table_name())131 return self.query_sql132 def get_param_value(self, param_name, config):133 """è·ååæ°å¼134 """135 values = self.params.get(param_name, []) or ['']136 the_value = str(values[0])137 if the_value != '': # åæ°ä¸ä¸ºç©º138 if not config.get('multiple', False): # ä¸æ¯å¤é139 values = values[0]140 else:141 the_sp = r',|\s'142 if re.search(the_sp, the_value):143 values = re.split(the_sp, the_value)144 dict_key = config.get('dict', '') # è¾å
¥å¼è½¬æ¢,æ¯æä¸æ转æid145 if dict_key:146 if not config.get('fixed', False):147 value_def = DictDefine.get_dict_for_key(dict_key, reverse=True)148 values = self.convert_input_value(values, value_def, config.get('type'))149 else: # åºå®å¼150 values = dict_key151 else:152 values = self.convert_input_value(values, {}, config.get('type'))153 else:154 values = config.get('default_value', '')155 return values156 def convert_input_value(self, values, value_def, value_type):157 """转æ¢è¾å
¥158 """159 if isinstance(values, (list, tuple, set)):160 _r = []161 for v in values:162 value = str(value_def.get(v, value_def.get(str(v), v)))163 # é²æ¢ sql 注å
¥164 value = self.get_safe_value(value)165 if not value.isdigit() or value_type == 'str':166 value = "'%s'" % value167 _r.append(value)168 return ','.join(_r)169 else:170 return value_def.get(values, values)171 def empty_param_handle(self, mark_name, value=' 0=0 '):172 """ä¼ å
¥åæ°ä¸ºç©ºæ¶,æ SQLçæ¡ä»¶ ä¾å¦:log_user={{player_id}} æ¢æ¢æ 0=0173 """174 the_mark = self.make_mark(mark_name)175 # pattern = re.compile(r"""[\w\.`(+=*]*[\W`]?(not[\s]+)?(like|in|=|=>|<=|>|<|!=)[\W]*%s[^\s]*[\s$]*""" % the_mark,flags=re.I)176 pattern = re.compile(r"""[\S]*[\W`]?([\s]+not[\s]+)?(like|in|=|=>|<=|>|<|!=)[\W]*%s[^\s]*[\s$]*""" % the_mark,177 flags=re.I)178 _s = int(time.time())179 self.query_sql = re.sub(pattern, value, self.query_sql)180 def set_limit(self, page_size, page_num):181 """设置sql limit182 """183 page_num, page_size = int(page_num), int(page_size)184 self.limit_str = 'limit %s,%s' % ((page_num - 1) * page_size, page_size)185 def set_order(self, sort_key, sort_type='ASC'):186 if sort_key and sort_type.lower() in ('asc', 'desc'):187 if 'order' in self.query_sql.strip()[-30:].lower():188 self.order_str = ' ,%s %s' % (sort_key, sort_type)189 else:190 self.order_str = ' ORDER BY %s %s' % (sort_key, sort_type)191 safe_pattern = re.compile(r"""'|"|;|>|<|%|--""", flags=re.I)192 def get_safe_value(self, value):193 value = re.sub(self.safe_pattern, '', str(value))194 return value195 def replace_mark_to_value(self, mark_name, value):196 self.query_sql = self.query_sql.replace(self.make_mark(mark_name), value)197 def make_mark(self, mark_name):198 return self.TAG_FORMAT % mark_name199 def get_query_sql(self):200 return '%s %s %s' % (Query.filter_sql(self.query_sql), self.order_str, self.limit_str)201 def get_count_sql(self):202 return 'select count(0) from (%s) newTable' % Query.filter_sql(self.query_sql)203 def has_mark(self, mark_name):204 return self.make_mark(mark_name) in self.query_sql205 def generate_mark_conditions(self):206 mark_conditions = {}207 for k, v in self.mark_map.items():208 mark_conditions[k] = v if self.has_mark(k) else False209 return mark_conditions210 def get_context_func(self, mark_name):211 func = self.mark_map.get(mark_name,{}).get('context_func', None)212 if isinstance(func, str):213 return import_func(func)214 return func215 def get_param_func(self, mark_name):216 func = self.mark_map.get(mark_name,{}).get('params_func', None)217 if isinstance(func, str):218 return import_func(func)219 return func220 def set_mark_parmas(self, request):221 for k, item in self.mark_map.items():222 param_func = self.get_param_func(k)223 if param_func:224 self.params[k] = param_func(request)225class QueryCompiler(SqlBuilder):226 def __init__(self, query_model, params):227 self.query = query_model228 self.already_get_query_sql = False229 the_sql = self.query.sql or self.query.get_default_sql()230 super(QueryCompiler, self).__init__(the_sql, params)231 self.mark_map.update(self.query.field_config)232 def get_tfoot_sql(self):233 """è·åtfootçæ±æ»sql234 """235 assert self.already_get_query_sql, 'åªè½å
使ç¨get_query_sql è·åæ¥è¯¢SQL'236 tmp_sql = self.query_sql237 ftoot_sql = self.query.other_sql238 if ftoot_sql:239 self.query_sql = ftoot_sql240 self.query_sql_handle()241 ftoot_sql = self.query_sql242 self.query_sql = tmp_sql243 return ftoot_sql244 def get_query_sql(self):245 self.already_get_query_sql = True246 if self.query.order and not self.order_str: # å¦æ没ææåºè®¾ç½®é»è®¤æ¥è¯¢æåº247 self.set_order(self.query.order, self.query.order_type_str)248 return super(QueryCompiler, self).get_query_sql()249 def get_statistic(self):250 """è·åæéçç»è®¡251 """252 statistic_tags = re.findall(r'<<([^\s,]*)>>', self.query.sql)253 statistic_tags.extend(re.findall(r'<<([^\s,]*)>>', self.query.other_sql))254 result = []255 if statistic_tags: # å
å«<<ç»è®¡å>>æ ç¾256 statistic_names_d = DictDefine.get_dict_for_key('statistic_name', reverse=True)257 for s_name in set(statistic_tags):258 s_id = statistic_names_d.get(re.sub(r'\s', '', s_name)) # å»é¤ç©ºæ ¼çç¹æ®å符259 if s_id:260 result.append((s_id, s_name))261 return result262 def query_sql_handle(self):263 """ å¢å è·åç»è®¡id 使ç¨æ¹æ³ <<ç»è®¡å>>264 """265 record = super(QueryCompiler, self).query_sql_handle()266 if isinstance(self.query_sql, bytes):267 self.query_sql = self.query_sql.decode('utf8')268 statistic_tags = re.findall(r'<<([^\s,]*)>>', self.query_sql)269 from .statistic import Statistic270 if statistic_tags: # å
å«<<ç»è®¡å>>æ ç¾271 statistic_names_d = dict(Statistic.objects.values_list('name',272 'id')) # ,DictDefine.get_dict_for_key('statistic_name', reverse=True)273 for s_name in set(statistic_tags):274 s_id = statistic_names_d.get(re.sub(r'\s', '', s_name)) # å»é¤ç©ºæ ¼çç¹æ®å符275 if s_id:276 self.query_sql = self.query_sql.replace('<<%s>>' % s_name, '"%s"' % s_id)277 # éå¶ä½¿ç¨çç»è®¡278 if self.has_mark("statistic"):279 statistic_list = self.get_statistic()280 self.replace_mark_to_value("statistic", ",".join([s_id for s_id, _ in statistic_list]))281 return self.query_sql282 def has_mark(self, mark_name):283 return super(QueryCompiler, self).has_mark(mark_name)284 def has_conditions(self):285 for k, v in self.query.field_config.items():286 if v.get('search', ''):287 return True288 def get_conn(self, server_id=0):289 from .query_server import QueryServer290 return QueryServer.get_conn(server_id, 'read')291class Query(BaseModel):292 """æ¥è¯¢293 """294 log_key = models.CharField('å
³è表æ è¯', max_length=30, db_index=True, null=False)295 log_type = models.IntegerField(default=0)296 class OrderType(Enum):297 DESC = 1, _('ååº')298 AESC = 0, _('æ£åº')299 key = models.CharField('æ¥è¯¢æ è¯', default='', max_length=100, null=False, blank=True, db_index=True)300 name = models.CharField('æ¥è¯¢å称', default='', max_length=100, null=False, blank=False, unique=True)301 select = models.CharField(_('æ¥è¯¢å段'), max_length=1000, null=False, blank=True)302 where = models.CharField(max_length=500, null=False, blank=True)303 group = models.CharField('ç¨éåç»', max_length=50, null=False, blank=True, default='')304 order = models.CharField(max_length=20, null=False, blank=True, default='')305 order_type = models.IntegerField(default=OrderType.AESC, choices=OrderType.member_list(), null=False)306 sql = models.TextField(_('SQL'), default='', null=False, blank=True)307 other_sql = models.TextField(_('å
¶ä»SQL'), default='', null=False, blank=True)308 cache_validate = models.IntegerField(default=0, null=True)309 is_paging = models.BooleanField(_('æ¯å¦å页'), default=False, null=False)310 remark = models.CharField('å¤æ³¨', max_length=1000, blank=True)311 _field_config = models.TextField('æ¥è¯¢å段å®ä¹', default="", blank=True)312 template_name = models.CharField('模çå', max_length=32, blank=True)313 _DEFAULT_FIELD_CONFIG = {314 "æ è®°å": {"name" : "åæ°å", "dict": "", "sort": False, "order_num": 99, "multiple": False,315 "search": False, "merge_value": True}}316 __cache_config = None317 class Meta:318 app_label = AnalysisConfig.name319 ordering = ('id',)320 @CacheAttribute321 def log_def_list(self):322 return LogDefine.objects.using('read').all()323 def save(self, *args, **kwargs):324 super(Query, self).save(*args, **kwargs)325 def get_default_sql(self):326 """ é»è®¤SQL327 """328 log_def = self.log_def329 the_sql = """SELECT %s FROM %s WHERE log_time BETWEEN '{{sdate}}' AND '{{edate}}' """ % (330 self.select, log_def.table_name)331 if self.where:332 the_sql += ' AND %s' % self.where333 if self.order:334 the_sql += ' ORDER BY %s %s' % (self.order, self.order_type_str)335 for field_name, config in log_def.config.items():336 verbose_name = config['verbose_name']337 if verbose_name:338 the_sql = the_sql.replace(verbose_name, field_name)339 return the_sql340 @property341 def order_type_str(self):342 return 'DESC' if self.order_type == 1 else 'AESC'343 @CacheAttribute344 def log_def(self):345 return LogDefine.objects.filter(key=self.log_key).first()346 @property347 def is_center_query(self):348 """æ¯å¦ä¸å¤®æ¥è¯¢349 """350 try:351 return self.log_def.status == LogDefine.PositionType.CENTER352 except:353 return True354 @CacheAttribute355 def selects(self):356 return [f.strip() for f in self.select.split(',') if f]357 @property358 def field_config(self):359 """å段å®ä¹360 """361 try:362 s_d = OrderedDict()363 self.__cache_config = self.__cache_config or OrderedDict(364 sorted(json.loads(self._field_config).items(), key=lambda x: x[1]['order_num']))365 except:366 print(trace_msg())367 self.__cache_config = self._DEFAULT_FIELD_CONFIG368 return self.__cache_config369 @field_config.setter370 def field_config(self, obj_value):371 if isinstance(obj_value, dict):372 obj_value = json.dumps(obj_value, ensure_ascii=False)373 self._field_config = obj_value374 @classmethod375 def filter_sql(cls, sql):376 p = re.compile('(update|delete|modify|lock[\s]+|drop|table)', re.I)377 sql = p.sub('', sql)378 return sql379 @CacheAttribute380 def safe_sql(self):381 return self.__class__.filter_sql(self.sql)382 def __unicode__(self):383 return '%s' % self.name...
views.py
Source:views.py
1import random2from io import BytesIO3import logging4from django.contrib.auth import authenticate, login as auth_login, logout as auth_logout5from django.contrib.auth.decorators import login_required6from django.core import signing7from django.http import HttpResponse, Http404, JsonResponse8from django.shortcuts import render, redirect, get_object_or_4049from django.shortcuts import render_to_response10from django.core.urlresolvers import reverse_lazy, reverse11from django.template.response import TemplateResponse12from django.utils import timezone13from django.utils.decorators import method_decorator14from django.views.decorators.cache import never_cache15from django.views.decorators.http import last_modified16from django.views.generic import FormView, TemplateView, DetailView17from goods.models import Goods18from .models import User, RecipientsAddress19from .form import UserModelRegisterForm, UserLoginForm20from PIL import Image, ImageDraw, ImageFont, ImageFilter21def create_captcha_code(request):22 '''23 çæ 4个æ±åéªè¯ç ,éè¦ç¨æ·æ顺åºç¹å»å
¶ä¸ç 3个24 ä¿åå°ä¼è¯ä¸ 25 è¿åéªè¯ç å¾ç'''26 # éæºç4个æ±å:27 zh_words = ['ç', 'å½', 'ç', 'æ', '天', '好', 'å', '祥', 'å¦', 'æ']28 zh_words = random.sample(zh_words, 4)29 # ç»å¾30 image = Image.open(r'C:\Users\chaiming\Desktop\IMG_20170502_165107_344.jpg')31 # w, h = image.size32 # image.thumbnail((w // 2, h // 2))33 # éæºé¢è²34 def rnd_color():35 return (random.choice([0, 255]), 0, random.choice([0, 255]))36 # å建Font对象: 大å°ä¸ºé¿å®½ä¸å°çé£ä¸ªç8/137 width, height = image.size38 font_size = int(min(image.size)) // 839 font = ImageFont.truetype(r'F:\pycharm_projects\djcode\dailyfresh\user\fonts\MSYH.TTF', font_size)40 # å建Draw对象:41 draw = ImageDraw.Draw(image)42 # çæéæºåæ 43 x1 = random.randint(0, width - font_size)44 y1 = random.randint(0 + font_size, height - font_size)45 # 循ç¯3次ï¼æå©ä¸ç符åçåæ æå46 points_list = [(x1, y1)]47 for i in range(3):48 is_not_pass = True49 while is_not_pass:50 x2, y2 = random.randint(0, width - font_size), random.randint(0 + font_size, height - font_size)51 for x, y in points_list:52 if (x2 > x + font_size) or (x2 + font_size < x) or (y2 > y + font_size) or (y2 + font_size < y):53 pass54 else:55 is_not_pass = True56 break57 else:58 is_not_pass = False59 points_list.append((x2, y2))60 # è¾åºæå:61 for t in range(4):62 draw.text(points_list[t], zh_words[t], font=font,63 fill=rnd_color())64 z_p = zip(zh_words, points_list) # æåååæ ç»å65 sample = random.sample([i for i in z_p], 3)66 # 设置session å¨å端éªè¯67 captcha_dict = {68 'a1': sample[0][0], 'x1': sample[0][1][0], 'y1': sample[0][1][1],69 'a2': sample[1][0], 'x2': sample[1][1][0], 'y2': sample[1][1][1],70 'a3': sample[2][0], 'x3': sample[2][1][0], 'y3': sample[2][1][1],71 'size': font_size72 }73 # 模ç³:74 # image = image.filter(ImageFilter.BLUR)75 # del draw76 # draw = ImageDraw.Draw(image)77 draw.text(78 (0, 0),79 '"{}", "{}", "{}"'.format(captcha_dict['a1'], captcha_dict['a2'], captcha_dict['a3']),80 font=font,81 fill=(0, 0, 0)82 )83 buff = BytesIO()84 image.save(buff, 'jpeg')85 response = HttpResponse(buff.getvalue(), 'image/jpeg')86 request.session['captcha'] = captcha_dict87 return response88class UserCreate(FormView):89 form_class = UserModelRegisterForm90 template_name = 'user/register.html'91 success_url = reverse_lazy('user:user_center_info') # 跳转å°ç¨æ·ä¸å¿92 @method_decorator(never_cache)93 def dispatch(self, *args, **kwargs):94 return super(UserCreate, self).dispatch(*args, **kwargs)95 def form_valid(self, form):96 # 对éªè¯ç è¿è¡éªè¯97 code = self.request.session.get('captcha')98 if code:99 ret = form.valid_captcha(code=code)100 else:101 logger = logging.getLogger('captcha')102 logger.warning('IP: {}, 主æº: {}, æå¡å¨è®¤è¯åçç¨æ·: {} 客æ·ç«¯ç¯¡æ¹Cookies'.format(103 self.request.META.get('REMOTE_ADDR'),104 self.request.META.get('REMOTE_HOST'),105 self.request.META.get('REMOTE_USER'),106 ))107 raise Http404108 if ret is False:109 return render(self.request, self.template_name, {'form': form})110 User.objects.create_user(111 form.cleaned_data.get('username'),112 form.cleaned_data.get('email'),113 form.cleaned_data.get('password'),114 )115 return super().form_valid(form) # éªè¯æååè°ç¨, é»è®¤è¿åæåçéå®å116def username_is_repeat(request):117 '''ajaxéªè¯ååç¨æ·åè½'''118 if User.objects.filter(username=request.GET.get('username')).exists():119 return JsonResponse({'is_repeat': 1})120 return JsonResponse({'is_repeat': 0})121class UserLogin(FormView):122 form_class = UserLoginForm123 template_name = 'user/login.html'124 success_url = reverse_lazy('user:user_center_info')125 def get_context_data(self, **kwargs):126 context = super().get_context_data(**kwargs)127 context['mark_name'] = self.request.get_signed_cookie('mark_name', '')128 return context129 def form_valid(self, form):130 user = authenticate(username=form.cleaned_data.get('username'), password=form.cleaned_data.get('password'))131 if user is not None:132 if user.is_active:133 # ç¨æ·ç»é134 auth_login(self.request, user)135 # éå®å136 return super().form_valid(form)137 else:138 # ç¨æ·éªè¯ä¸æå139 form.add_error(None, 'ç¨æ·åæå¯ç ä¸æ£ç¡®')140 context = {}141 response = TemplateResponse(self.request, self.template_name, context)142 # print(response.context_data)143 # å¦æå¾éäºè®°ä½ç¨æ·åï¼åä¿åå°Cookiesä¸, key=mark_name, å¦ææªå¾éï¼è®¾ç½®mark_name 为空144 username = self.request.POST.get('username')145 if self.request.POST.get('mark_name') == '1':146 response.set_signed_cookie('mark_name', username)147 else:148 response.delete_cookie('mark_name')149 username = ''150 context = {151 'form': form,152 'mark_name': username153 }154 response.context_data = context155 return response156 # ç´æ¥è°ç¨ self.form_invalid()157 # return self.form_invalid(form)158 def form_invalid(self, form):159 context = {}160 response = TemplateResponse(self.request, self.template_name, context)161 # å¦æå¾éäºè®°ä½ç¨æ·åï¼åä¿åå°Cookiesä¸, key=mark_name162 username = self.request.POST.get('username')163 if self.request.POST.get('mark_name') == '1':164 response.set_signed_cookie('mark_name', username)165 else:166 response.delete_cookie('mark_name')167 username = ''168 context = {169 'form': form,170 'mark_name': username171 }172 response.context_data = context173 return response174def login_datetime(request):175 return timezone.datetime(2018, 5, 26, 16, 36) # ç»é页é¢æåä¿®æ¹æ¶é´176@last_modified(login_datetime)177@never_cache178def login(request):179 template_name = 'user/login.html'180 success_url = reverse_lazy('user:user_center_info') # 跳转å°ç¨æ·ä¸å¿181 if request.method == 'GET':182 return render(request, template_name, {'mark_name': request.get_signed_cookie('mark_name', '')})183 else:184 username = request.POST.get('username')185 form = UserLoginForm(request.POST)186 if form.is_valid():187 user = authenticate(username=form.cleaned_data['username'], password=form.cleaned_data['password'])188 if user is not None:189 if user.is_active:190 # ç»é191 auth_login(request, user)192 # éå®å, è·åè£
饰å¨login_required çnextåæ°ï¼å®ç°è·³è½¬å°ç»éä¹åçè·¯å¾193 success_url = request.GET.get('next', success_url)194 response = redirect(success_url)195 # è®°ä½ç¨æ·ååè½196 if request.POST.get('mark_name') == '1':197 response.set_signed_cookie('mark_name', username)198 else:199 response.delete_cookie('mark_name')200 return response201 else:202 form.add_error(None, 'ç¨æ·åæå¯ç ä¸æ£ç¡®')203 # 页é¢è®°ä½ç¨æ·ååè½ï¼æç¨æ·ååå¨cookieé204 response = TemplateResponse(request, template_name, {})205 if request.POST.get('mark_name') == '1':206 response.set_signed_cookie('mark_name', username)207 else:208 response.delete_cookie('mark_name')209 username = ''210 response.context_data = {'form': form, 'mark_name': username}211 return response212'''213头é¨Last-Modified æ ¼å¼å214Expires: Thu, 10 May 2018 10:17:09 GMT215response['Last-Modified'] = timezone.datetime(2018, 5, 8).strftime('%a, %d %b %Y %H:%M:%S GMT')216'''217def logout_view(request):218 '''ç»åºè§å¾'''219 auth_logout(request)220 # Redirect to a success page.221 next_path = request.GET.get('next')222 if next_path:223 return redirect('%s?next=%s' % (reverse('user:login'), next_path))224 else:225 return redirect(reverse('user:login'))226class UserDetail(TemplateView):227 template_name = 'user/user_center_info.html'228 @method_decorator(login_required)229 def dispatch(self, *args, **kwargs):230 return super().dispatch(*args, **kwargs)231 def get_context_data(self, **kwargs):232 context = super().get_context_data(**kwargs)233 signing_str = self.request.COOKIES.get('gid_list')234 if signing_str is not None:235 try:236 gid_list = signing.loads(signing_str)237 except signing.BadSignature as e:238 logger = logging.getLogger('django.cookies')239 logger.warning('cookiesä¸è·ågidé误ï¼{}, USER[{}], REMOTE_ADDR[{}]'.format(240 e,241 self.request.user,242 self.request.META.get('REMOTE_ADDR'),243 ))244 gid_list = []245 else:246 gid_list = []247 recently_viewed = []248 if gid_list is not []:249 for gid in gid_list:250 try:251 goods = Goods.objects.get(pk=gid)252 except Goods.DoesNotExist:253 goods = None254 if goods is not None:255 recently_viewed.append(goods)256 context['recently_viewed'] = recently_viewed257 return context258 # def get(self, request, *args, **kwargs):259 # self.cookies = request.COOKIES260 # return super().get(request, *args, **kwargs)261@login_required()262def edit_recipients_address(request):263 if request.method == 'POST':264 recipients_address = RecipientsAddress(265 user=request.user,266 recipient=request.POST.get('recipient', ''),267 shipping_address=request.POST.get('shipping_address', ''),268 postcode=request.POST.get('postcode', ''),269 mobile_number=request.POST.get('mobile_number', ''),270 )271 recipients_address.save()272 return render(request, 'user/user_center_site.html')273# å é¤ç¨æ·å
³èå°å274@login_required()275def delete_address(request, uid, recid):276 uid = int(uid)277 recid = int(recid)278 address = get_object_or_404(RecipientsAddress, pk=recid)279 if address.user_id != uid:280 logger = logging.getLogger('myproject.debug')281 logger.debug('ç¨æ·idåæè·åæ°uidä¸ä¸è´ï¼')282 logger.debug('{}[{}] != {}[{}]'.format(type(address.user_id), address.user_id, type(uid), uid))283 raise Http404284 address.delete()...
marked.py
Source:marked.py
1from typing import List, Set, Union, Callable, NamedTuple2from copy import deepcopy3from collections import namedtuple4from beancount.core.data import Transaction, Posting, new_metadata, Entries5import beancount_plugin_utils.metaset as metaset6from beancount_plugin_utils.BeancountError import BeancountError, entry_error_handler7MARK_SEPERATOR = "-"8PluginUtilsMarkedError = namedtuple("PluginUtilsMarkedError", "source message entry")9def normalize_transaction(10 tx: Transaction,11 mark_name: str,12 account_types: Union[Set[str], bool] = False,13):14 """15 Move marks in tags with name `mark_name` into meta, if any, and merge with marks in meta in a way of metaset.16 Then, if `account_types` are provided, move marks into postings with the given account types or error if there's none such posting.17 Note: in beancount `meta` is mandatory on transactions, but optional on postings.18 Example:19 try:20 tx, is_marked = marked.normalize_transaction(config.mark_name, entry, ("Income", "Expenses"))21 except BeancountError as e:22 new_entries.append(entry)23 errors.append(e.to_named_tuple())24 continue25 for posting, orig_posting in zip(tx, orig_tx):26 marks = metaset.get(posting)27 # Do your thing.28 Args:29 txs [Transaction]: transaction instances.30 mark_name [str]: the mark name.31 account_types [Set[str], False]: set of account types that must be considered, defaults to False.32 Returns:33 new Transaction instance with normalized marks.34 boolean of whenever mark was used in this transaction.35 Raises:36 BeancountError.37 """38 copy = deepcopy(tx)39 for tag in copy.tags:40 if tag == mark_name or tag[0 : len(mark_name + MARK_SEPERATOR)] == mark_name + MARK_SEPERATOR:41 copy = copy._replace(42 tags=copy.tags.difference([tag]),43 meta=metaset.add(copy.meta, mark_name, tag[len(mark_name + MARK_SEPERATOR) :] or ""),44 )45 is_used = False46 if metaset.has(copy.meta, mark_name):47 is_used = True48 for posting in copy.postings:49 if posting.meta == None:50 continue51 if not metaset.has(posting.meta, mark_name):52 continue53 is_used = True54 if not account_types:55 raise BeancountError(56 posting.meta,57 'Mark "{}" can be only applied to transactions, not postings: "{}".'.format(mark_name, posting.account),58 tx,59 PluginUtilsMarkedError,60 )61 if not (posting.account.split(":")[0] in account_types):62 raise BeancountError(63 posting.meta,64 'Mark "{}" can be only applied to posting with account types of: {}'.format(mark_name, account_types),65 tx,66 PluginUtilsMarkedError,67 )68 if not account_types:69 return copy, is_used70 if not is_used:71 return copy, False72 is_applied = False73 postings = []74 default_marks = metaset.get(copy.meta, mark_name)75 copy = copy._replace(meta=metaset.clear(copy.meta, mark_name))76 for posting in copy.postings:77 marks = metaset.get(posting.meta, mark_name)78 if len(marks) > 0:79 postings.append(posting)80 is_applied = True81 elif len(default_marks) > 0 and (posting.account.split(":")[0] in account_types):82 postings.append(posting._replace(meta=metaset.set(posting.meta, mark_name, default_marks)))83 is_applied = True84 else:85 postings.append(posting)86 if not is_applied:87 raise BeancountError(88 tx.meta,89 'Mark "{}" on a transaction has no effect because transaction does not have postings with account types of: {}'.format(90 mark_name, account_types91 ),92 tx,93 PluginUtilsMarkedError,94 )95 copy = copy._replace(postings=postings)96 return copy, True97def on_marked_transactions(98 per_marked_transaction: Callable[[Transaction, Transaction, NamedTuple], List[Transaction]],99 entries: Entries,100 config: NamedTuple,101 mark_name: str,102 account_types: Union[Set[str], bool] = False,103 error_named_tuple: NamedTuple = BeancountError,104):105 new_entries: Entries = []106 errors: List[NamedTuple] = []107 for entry in entries:108 with entry_error_handler(entry, new_entries, errors, error_named_tuple):109 if not isinstance(entry, Transaction) or entry.flag == "P": # Ignore txs generated by padding too.110 new_entries.append(entry)111 continue112 tx, is_marked = normalize_transaction(entry, mark_name, account_types)113 if is_marked:114 new_txs = per_marked_transaction(tx, entry, config)115 new_entries.extend(new_txs)116 else:117 new_entries.append(entry)...
common.py
Source:common.py
1from typing import List, Set, Union, Tuple2from copy import deepcopy3from beancount.core.data import Transaction, Posting4from beancount.core.inventory import Inventory5import beancount_plugin_utils.metaset as metaset6def read_config(config_string):7 """8 Args:9 config_string: A configuration string in JSON format given in source file.10 Returns:11 A dict of the configuration string.12 """13 if len(config_string) == 0:14 config_obj = {}15 else:16 config_obj = eval(config_string, {}, {})17 if not isinstance(config_obj, dict):18 raise RuntimeError("Invalid plugin configuration: should be a single dict.")19 return config_obj20MARK_SEPERATOR = "-"21def normalize_marked_txn(tx: Transaction, mark_name: str):22 """23 If a transaction is marked, hoist marked tags into transation meta(s).24 Args:25 txs [Transaction]: transaction instances.26 mark_name [str]: the mark.27 Return:28 Transaction with normalized marks.29 """30 copy = deepcopy(tx)31 for tag in copy.tags:32 if (33 tag == mark_name34 or tag[0 : len(mark_name + MARK_SEPERATOR)] == mark_name + MARK_SEPERATOR35 ):36 copy = copy._replace(37 tags=copy.tags.difference([tag]),38 meta=metaset.add(39 copy.meta, mark_name, tag[len(mark_name + MARK_SEPERATOR) :] or ""40 ),41 )42 return copy43DEFAULT_APPLICABLE_ACCOUNT_TYPES = set(44 ["Income", "Expenses", "Assets", "Liabilities", "Equity"]45)46def marked_postings(47 tx: Transaction,48 mark_name: str,49 applicable_account_types: Set[str] = DEFAULT_APPLICABLE_ACCOUNT_TYPES,50 allow_posting_level_mark: bool = True,51):52 """53 Iterates over postings of the transaction, returning most specific mark value for applicable account types.54 Args:55 tx [Transaction]: transaction instance.56 mark_name [str]: the mark.57 applicable_account_types [Set[str]]: set of account types that must be considered, defaults to all five.58 allow_posting_level_mark [bool]: set to False if posting-level marks should raise error instead.59 Yields:60 list of mark values or None.61 posting.62 original posting.63 original transaction.64 """65 copy = deepcopy(tx)66 default_marks = metaset.get(tx.meta, mark_name)67 copy = copy._replace(meta=metaset.clear(tx.meta, mark_name))68 for _posting in copy.postings:69 marks = metaset.get(_posting.meta, mark_name)70 posting = _posting._replace(meta=metaset.clear(_posting.meta, mark_name))71 if len(marks) > 0:72 yield marks, posting, _posting, copy73 elif len(default_marks) > 0:74 if posting.account.split(":")[0] not in applicable_account_types:75 yield None, posting, _posting, copy76 else:77 yield default_marks, posting, _posting, copy78 else:...
Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!