Best Python code snippet using autotest_python
models.py
Source:models.py
...5 _GROUP_COUNT_NAME = 'group_count'6 def _get_key_unless_is_function(self, field):7 if '(' in field:8 return field9 return self.get_key_on_this_table(field)10 def _get_field_names(self, fields, extra_select_fields={}):11 field_names = []12 for field in fields:13 if field in extra_select_fields:14 field_names.append(field)15 else:16 field_names.append(self._get_key_unless_is_function(field))17 return field_names18 def _get_group_query_sql(self, query, group_by, extra_select_fields):19 group_fields = self._get_field_names(group_by, extra_select_fields)20 select_fields = [field for field in group_fields21 if field not in extra_select_fields]22 for field_name, field_sql in extra_select_fields.iteritems():23 field_sql = self._get_key_unless_is_function(field_sql)24 select_fields.append(field_sql + ' AS ' + field_name)25 # add the extra fields to the query selects, so they'll be sortable26 # and Django won't mess with any of them27 query._select[field_name] = field_sql28 _, where, params = query._get_sql_clause()29 # insert GROUP BY clause into query30 group_by_clause = ' GROUP BY ' + ', '.join(group_fields)31 group_by_position = where.rfind('ORDER BY')32 if group_by_position == -1:33 group_by_position = len(where)34 where = (where[:group_by_position] +35 group_by_clause + ' ' +36 where[group_by_position:])37 return ('SELECT ' + ', '.join(select_fields) + where), params38 def _get_column_names(self, cursor):39 """\40 Gets the column names from the cursor description. This method exists41 so that it can be mocked in the unit test for sqlite3 compatibility."42 """43 return [column_info[0] for column_info in cursor.description]44 def execute_group_query(self, query, group_by, extra_select_fields=[]):45 """46 Performs the given query grouped by the fields in group_by with the47 given extra select fields added. extra_select_fields should be a dict48 mapping field alias to field SQL. Usually, the extra fields will use49 group aggregation functions. 
Returns a list of dicts, where each dict50 corresponds to single row and contains a key for each grouped field as51 well as all of the extra select fields.52 """53 sql, params = self._get_group_query_sql(query, group_by,54 extra_select_fields)55 cursor = readonly_connection.connection().cursor()56 cursor.execute(sql, params)57 field_names = self._get_column_names(cursor)58 row_dicts = [dict(zip(field_names, row)) for row in cursor.fetchall()]59 return row_dicts60 def get_count_sql(self, query):61 """62 Get the SQL to properly select a per-group count of unique matches for63 a grouped query. Returns a tuple (field alias, field SQL)64 """65 if query._distinct:66 pk_field = self.get_key_on_this_table()67 count_sql = 'COUNT(DISTINCT %s)' % pk_field68 else:69 count_sql = 'COUNT(1)'70 return self._GROUP_COUNT_NAME, count_sql71 def _get_num_groups_sql(self, query, group_by):72 group_fields = self._get_field_names(group_by)73 query._order_by = None # this can mess up the query and isn't needed74 _, where, params = query._get_sql_clause()75 return ('SELECT COUNT(DISTINCT %s) %s' % (','.join(group_fields),76 where),77 params)78 def get_num_groups(self, query, group_by):79 """80 Returns the number of distinct groups for the given query grouped by the81 fields in group_by.82 """83 sql, params = self._get_num_groups_sql(query, group_by)84 cursor = readonly_connection.connection().cursor()85 cursor.execute(sql, params)86 return cursor.fetchone()[0]87class Machine(dbmodels.Model):88 machine_idx = dbmodels.AutoField(primary_key=True)89 hostname = dbmodels.CharField(unique=True, maxlength=300)90 machine_group = dbmodels.CharField(blank=True, maxlength=240)91 owner = dbmodels.CharField(blank=True, maxlength=240)92 class Meta:93 db_table = 'machines'94class Kernel(dbmodels.Model):95 kernel_idx = dbmodels.AutoField(primary_key=True)96 kernel_hash = dbmodels.CharField(maxlength=105, editable=False)97 base = dbmodels.CharField(maxlength=90)98 printable = 
dbmodels.CharField(maxlength=300)99 class Meta:100 db_table = 'kernels'101class Patch(dbmodels.Model):102 kernel = dbmodels.ForeignKey(Kernel, db_column='kernel_idx')103 name = dbmodels.CharField(blank=True, maxlength=240)104 url = dbmodels.CharField(blank=True, maxlength=900)105 hash_ = dbmodels.CharField(blank=True, maxlength=105, db_column='hash')106 class Meta:107 db_table = 'patches'108class Status(dbmodels.Model):109 status_idx = dbmodels.AutoField(primary_key=True)110 word = dbmodels.CharField(maxlength=30)111 class Meta:112 db_table = 'status'113class Job(dbmodels.Model):114 job_idx = dbmodels.AutoField(primary_key=True)115 tag = dbmodels.CharField(unique=True, maxlength=300)116 label = dbmodels.CharField(maxlength=300)117 username = dbmodels.CharField(maxlength=240)118 machine = dbmodels.ForeignKey(Machine, db_column='machine_idx')119 queued_time = dbmodels.DateTimeField(null=True, blank=True)120 started_time = dbmodels.DateTimeField(null=True, blank=True)121 finished_time = dbmodels.DateTimeField(null=True, blank=True)122 class Meta:123 db_table = 'jobs'124class Test(dbmodels.Model, model_logic.ModelExtensions,125 model_logic.ModelWithAttributes):126 test_idx = dbmodels.AutoField(primary_key=True)127 job = dbmodels.ForeignKey(Job, db_column='job_idx')128 test = dbmodels.CharField(maxlength=90)129 subdir = dbmodels.CharField(blank=True, maxlength=180)130 kernel = dbmodels.ForeignKey(Kernel, db_column='kernel_idx')131 status = dbmodels.ForeignKey(Status, db_column='status')132 reason = dbmodels.CharField(blank=True, maxlength=3072)133 machine = dbmodels.ForeignKey(Machine, db_column='machine_idx')134 finished_time = dbmodels.DateTimeField(null=True, blank=True)135 started_time = dbmodels.DateTimeField(null=True, blank=True)136 objects = model_logic.ExtendedManager()137 def _get_attribute_model_and_args(self, attribute):138 return TestAttribute, dict(test=self, attribute=attribute,139 user_created=True)140 def set_attribute(self, attribute, value):141 # 
ensure non-user-created attributes remain immutable142 try:143 TestAttribute.objects.get(test=self, attribute=attribute,144 user_created=False)145 raise ValueError('Attribute %s already exists for test %s and is '146 'immutable' % (attribute, self.test_idx))147 except TestAttribute.DoesNotExist:148 super(Test, self).set_attribute(attribute, value)149 class Meta:150 db_table = 'tests'151class TestAttribute(dbmodels.Model, model_logic.ModelExtensions):152 test = dbmodels.ForeignKey(Test, db_column='test_idx')153 attribute = dbmodels.CharField(maxlength=90)154 value = dbmodels.CharField(blank=True, maxlength=300)155 user_created = dbmodels.BooleanField(default=False)156 objects = model_logic.ExtendedManager()157 class Meta:158 db_table = 'test_attributes'159class IterationAttribute(dbmodels.Model, model_logic.ModelExtensions):160 # this isn't really a primary key, but it's necessary to appease Django161 # and is harmless as long as we're careful162 test = dbmodels.ForeignKey(Test, db_column='test_idx', primary_key=True)163 iteration = dbmodels.IntegerField()164 attribute = dbmodels.CharField(maxlength=90)165 value = dbmodels.CharField(blank=True, maxlength=300)166 objects = model_logic.ExtendedManager()167 class Meta:168 db_table = 'iteration_attributes'169class IterationResult(dbmodels.Model, model_logic.ModelExtensions):170 # see comment on IterationAttribute regarding primary_key=True171 test = dbmodels.ForeignKey(Test, db_column='test_idx', primary_key=True)172 iteration = dbmodels.IntegerField()173 attribute = dbmodels.CharField(maxlength=90)174 value = dbmodels.FloatField(null=True, max_digits=12, decimal_places=31,175 blank=True)176 objects = model_logic.ExtendedManager()177 class Meta:178 db_table = 'iteration_result'179class TestLabel(dbmodels.Model, model_logic.ModelExtensions):180 name = dbmodels.CharField(maxlength=80, unique=True)181 description = dbmodels.TextField(blank=True)182 tests = dbmodels.ManyToManyField(Test, blank=True,183 
filter_interface=dbmodels.HORIZONTAL)184 name_field = 'name'185 objects = model_logic.ExtendedManager()186 class Meta:187 db_table = 'test_labels'188class SavedQuery(dbmodels.Model, model_logic.ModelExtensions):189 # TODO: change this to foreign key once DBs are merged190 owner = dbmodels.CharField(maxlength=80)191 name = dbmodels.CharField(maxlength=100)192 url_token = dbmodels.TextField()193 class Meta:194 db_table = 'saved_queries'195class EmbeddedGraphingQuery(dbmodels.Model, model_logic.ModelExtensions):196 url_token = dbmodels.TextField(null=False, blank=False)197 graph_type = dbmodels.CharField(maxlength=16, null=False, blank=False)198 params = dbmodels.TextField(null=False, blank=False)199 last_updated = dbmodels.DateTimeField(null=False, blank=False,200 editable=False)201 # refresh_time shows the time at which a thread is updating the cached202 # image, or NULL if no one is updating the image. This is used so that only203 # one thread is updating the cached image at a time (see204 # graphing_utils.handle_plot_request)205 refresh_time = dbmodels.DateTimeField(editable=False)206 cached_png = dbmodels.TextField(editable=False)207 class Meta:208 db_table = 'embedded_graphing_queries'209# views210class TestViewManager(TempManager):211 def get_query_set(self):212 query = super(TestViewManager, self).get_query_set()213 # add extra fields to selects, using the SQL itself as the "alias"214 extra_select = dict((sql, sql)215 for sql in self.model.extra_fields.iterkeys())216 return query.extra(select=extra_select)217 def _get_include_exclude_suffix(self, exclude):218 if exclude:219 suffix = '_exclude'220 else:221 suffix = '_include'222 return suffix223 def _add_label_joins(self, query_set, suffix=''):224 query_set = self.add_join(query_set, 'test_labels_tests',225 join_key='test_id', suffix=suffix,226 force_left_join=True)227 second_join_alias = 'test_labels' + suffix228 second_join_condition = ('%s.id = %s.testlabel_id' %229 (second_join_alias,230 'test_labels_tests' 
+ suffix))231 filter_object = self._CustomSqlQ()232 filter_object.add_join('test_labels',233 second_join_condition,234 'LEFT JOIN',235 alias=second_join_alias)236 return query_set.filter(filter_object)237 def _add_attribute_join(self, query_set, join_condition='', suffix=None,238 exclude=False):239 join_condition = self.escape_user_sql(join_condition)240 if suffix is None:241 suffix = self._get_include_exclude_suffix(exclude)242 return self.add_join(query_set, 'test_attributes',243 join_key='test_idx',244 join_condition=join_condition,245 suffix=suffix, exclude=exclude)246 def _get_label_ids_from_names(self, label_names):247 if not label_names:248 return []249 query = TestLabel.objects.filter(name__in=label_names).values('id')250 return [str(label['id']) for label in query]251 def get_query_set_with_joins(self, filter_data, include_host_labels=False):252 include_labels = filter_data.pop('include_labels', [])253 exclude_labels = filter_data.pop('exclude_labels', [])254 query_set = self.get_query_set()255 joined = False256 # TODO: make this check more thorough if necessary257 extra_where = filter_data.get('extra_where', '')258 if 'test_labels' in extra_where:259 query_set = self._add_label_joins(query_set)260 joined = True261 include_label_ids = self._get_label_ids_from_names(include_labels)262 if include_label_ids:263 # TODO: Factor this out like what's done with attributes264 condition = ('test_labels_tests_include.testlabel_id IN (%s)' %265 ','.join(include_label_ids))266 query_set = self.add_join(query_set, 'test_labels_tests',267 join_key='test_id',268 suffix='_include',269 join_condition=condition)270 joined = True271 exclude_label_ids = self._get_label_ids_from_names(exclude_labels)272 if exclude_label_ids:273 condition = ('test_labels_tests_exclude.testlabel_id IN (%s)' %274 ','.join(exclude_label_ids))275 query_set = self.add_join(query_set, 'test_labels_tests',276 join_key='test_id',277 suffix='_exclude',278 join_condition=condition,279 exclude=True)280 
joined = True281 include_attributes_where = filter_data.pop('include_attributes_where',282 '')283 exclude_attributes_where = filter_data.pop('exclude_attributes_where',284 '')285 if include_attributes_where:286 query_set = self._add_attribute_join(287 query_set, join_condition=include_attributes_where)288 joined = True289 if exclude_attributes_where:290 query_set = self._add_attribute_join(291 query_set, join_condition=exclude_attributes_where,292 exclude=True)293 joined = True294 if not joined:295 filter_data['no_distinct'] = True296 if include_host_labels or 'test_attributes_host_labels' in extra_where:297 query_set = self._add_attribute_join(298 query_set, suffix='_host_labels',299 join_condition='test_attributes_host_labels.attribute = '300 '"host-labels"')301 return query_set302 def query_test_ids(self, filter_data):303 dicts = self.model.query_objects(filter_data).values('test_idx')304 return [item['test_idx'] for item in dicts]305 def query_test_label_ids(self, filter_data):306 query_set = self.model.query_objects(filter_data)307 query_set = self._add_label_joins(query_set, suffix='_list')308 rows = self._custom_select_query(query_set, ['test_labels_list.id'])309 return [row[0] for row in rows if row[0] is not None]310 def escape_user_sql(self, sql):311 sql = super(TestViewManager, self).escape_user_sql(sql)312 return sql.replace('test_idx', self.get_key_on_this_table('test_idx'))313class TestView(dbmodels.Model, model_logic.ModelExtensions):314 extra_fields = {315 'DATE(job_queued_time)': 'job queued day',316 'DATE(test_finished_time)': 'test finished day',317 }318 group_fields = [319 'test_name',320 'status',321 'kernel',322 'hostname',323 'job_tag',324 'job_name',325 'platform',326 'reason',...
Learn to execute automation testing from scratch with LambdaTest Learning Hub: from setting up the prerequisites and running your first automation test, to following best practices and diving deeper into advanced test scenarios. The LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, and TestNG.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 automation test minutes FREE!!