Best Python code snippet using autotest_python
where.py
Source:where.py
...17 the correct SQL).18 The children in this tree are usually either Q-like objects or lists of19 [table_alias, field_name, db_type, lookup_type, value_annotation,20 params]. However, a child could also be any class with as_sql() and21 relabel_aliases() methods.22 """23 default = AND24 def add(self, data, connector):25 """26 Add a node to the where-tree. If the data is a list or tuple, it is27 expected to be of the form (alias, col_name, field_obj, lookup_type,28 value), which is then slightly munged before being stored (to avoid29 storing any reference to field objects). Otherwise, the 'data' is30 stored unchanged and can be anything with an 'as_sql()' method.31 """32 # Because of circular imports, we need to import this here.33 from django.db.models.base import ObjectDoesNotExist34 if not isinstance(data, (list, tuple)):35 super(WhereNode, self).add(data, connector)36 return37 alias, col, field, lookup_type, value = data38 try:39 if field:40 params = field.get_db_prep_lookup(lookup_type, value)41 db_type = field.db_type()42 else:43 # This is possible when we add a comparison to NULL sometimes44 # (we don't really need to waste time looking up the associated45 # field object).46 params = Field().get_db_prep_lookup(lookup_type, value)47 db_type = None48 except ObjectDoesNotExist:49 # This can happen when trying to insert a reference to a null pk.50 # We break out of the normal path and indicate there's nothing to51 # match.52 super(WhereNode, self).add(NothingNode(), connector)53 return54 if isinstance(value, datetime.datetime):55 annotation = datetime.datetime56 else:57 annotation = bool(value)58 super(WhereNode, self).add((alias, col, db_type, lookup_type,59 annotation, params), connector)60 def as_sql(self, qn=None):61 """62 Returns the SQL version of the where clause and the value to be63 substituted in. Returns None, None if this node is empty.64 If 'node' is provided, that is the root of the SQL generation65 (generally not needed except by the internal implementation for66 recursion).67 """68 if not qn:69 qn = connection.ops.quote_name70 if not self.children:71 return None, []72 result = []73 result_params = []74 empty = True75 for child in self.children:76 try:77 if hasattr(child, 'as_sql'):78 sql, params = child.as_sql(qn=qn)79 else:80 # A leaf node in the tree.81 sql, params = self.make_atom(child, qn)82 except EmptyResultSet:83 if self.connector == AND and not self.negated:84 # We can bail out early in this particular case (only).85 raise86 elif self.negated:87 empty = False88 continue89 except FullResultSet:90 if self.connector == OR:91 if self.negated:92 empty = True93 break94 # We match everything. No need for any constraints.95 return '', []96 if self.negated:97 empty = True98 continue99 empty = False100 if sql:101 result.append(sql)102 result_params.extend(params)103 if empty:104 raise EmptyResultSet105 conn = ' %s ' % self.connector106 sql_string = conn.join(result)107 if sql_string:108 if self.negated:109 sql_string = 'NOT (%s)' % sql_string110 elif len(self.children) != 1:111 sql_string = '(%s)' % sql_string112 return sql_string, result_params113 def make_atom(self, child, qn):114 """115 Turn a tuple (table_alias, column_name, db_type, lookup_type,116 value_annot, params) into valid SQL.117 Returns the string for the SQL fragment and the parameters to use for118 it.119 """120 table_alias, name, db_type, lookup_type, value_annot, params = child121 if table_alias:122 lhs = '%s.%s' % (qn(table_alias), qn(name))123 else:124 lhs = qn(name)125 field_sql = connection.ops.field_cast_sql(db_type) % lhs126 if value_annot is datetime.datetime:127 cast_sql = connection.ops.datetime_cast_sql()128 else:129 cast_sql = '%s'130 if isinstance(params, QueryWrapper):131 extra, params = params.data132 else:133 extra = ''134 if lookup_type in connection.operators:135 format = "%s %%s %s" % (connection.ops.lookup_cast(lookup_type),136 extra)137 return (format % (field_sql,138 connection.operators[lookup_type] % cast_sql), params)139 if lookup_type == 'in':140 if not value_annot:141 raise EmptyResultSet142 if extra:143 return ('%s IN %s' % (field_sql, extra), params)144 return ('%s IN (%s)' % (field_sql, ', '.join(['%s'] * len(params))),145 params)146 elif lookup_type in ('range', 'year'):147 return ('%s BETWEEN %%s and %%s' % field_sql, params)148 elif lookup_type in ('month', 'day'):149 return ('%s = %%s' % connection.ops.date_extract_sql(lookup_type,150 field_sql), params)151 elif lookup_type == 'isnull':152 return ('%s IS %sNULL' % (field_sql,153 (not value_annot and 'NOT ' or '')), ())154 elif lookup_type == 'search':155 return (connection.ops.fulltext_search_sql(field_sql), params)156 elif lookup_type in ('regex', 'iregex'):157 return connection.ops.regex_lookup(lookup_type) % (field_sql, cast_sql), params158 raise TypeError('Invalid lookup_type: %r' % lookup_type)159 def relabel_aliases(self, change_map, node=None):160 """161 Relabels the alias values of any children. 'change_map' is a dictionary162 mapping old (current) alias values to the new values.163 """164 if not node:165 node = self166 for pos, child in enumerate(node.children):167 if hasattr(child, 'relabel_aliases'):168 child.relabel_aliases(change_map)169 elif isinstance(child, tree.Node):170 self.relabel_aliases(change_map, child)171 else:172 if child[0] in change_map:173 node.children[pos] = (change_map[child[0]],) + child[1:]174class EverythingNode(object):175 """176 A node that matches everything.177 """178 def as_sql(self, qn=None):179 raise FullResultSet180 def relabel_aliases(self, change_map, node=None):181 return182class NothingNode(object):183 """184 A node that matches nothing.185 """186 def as_sql(self, qn=None):187 raise EmptyResultSet188 def relabel_aliases(self, change_map, node=None):...
custom-expression-as-sql.py
Source:custom-expression-as-sql.py
...8 conditional_template = '%(function)s(CASE WHEN %(condition)s THEN %(field)s ELSE null END)'9 def __init__(self, col, source=None, is_summary=False, condition=None, **extra):10 super(SqlAggregate, self).__init__(col, source, is_summary, **extra)11 self.condition = condition12 def relabel_aliases(self, change_map):13 if VERSION < (1, 7):14 super(SqlAggregate, self).relabel_aliases(change_map)15 if self.has_condition:16 condition_change_map = dict((k, v) for k, v in \17 change_map.items() if k in self.condition.query.alias_map18 )19 self.condition.query.change_aliases(condition_change_map)20 def relabeled_clone(self, change_map):21 self.relabel_aliases(change_map)22 return super(SqlAggregate, self).relabeled_clone(change_map)23 def as_sql(self, qn, connection):24 if self.has_condition:25 self.sql_template = self.conditional_template26 self.extra['condition'] = self._condition_as_sql(qn, connection)27 # ruleid: custom-expression-as-sql28 return super(SqlAggregate, self).as_sql(qn, connection)29 @property30 def has_condition(self):31 # Warning: bool(QuerySet) will hit the database32 return self.condition is not None33 def _condition_as_sql(self, qn, connection):34 '''35 Return sql for condition....
Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!