Best Python code snippet using Kiwi_python
RankedVoteRunner.py
Source:RankedVoteRunner.py
1from RankedVote import RankedVote2import operator3class RankedVoteRunner:4 def __init__(self, rankedVote, add_remove):5 self.ranked_vote = rankedVote6 self.add_remove_allowed = add_remove7 if add_remove is True:8 self.add_order = self.best_add_order()9 self.remove_order = self.best_remove_order()10 else:11 self.add_order = []12 self.remove_order = []13 # This generates the best order to add the candidates in so it doesn't have to be re-calculated every election14 # system15 def best_add_order(self):16 loop_length = len(self.ranked_vote.backup_candidates_copy)17 returndict = []18 for i in range(loop_length):19 add = self.ranked_vote.find_best_add()20 # if add == " that means that it is no longer beneficial to add more candidates21 if add == "":22 break23 returndict.append(add)24 self.ranked_vote.delete_backup(add)25 return returndict26 # This generates the best order to remove the candidates in so it doesn't have to be re-calculated every election27 # system28 def best_remove_order(self):29 loop_length = len(self.ranked_vote.valid_candidates_copy)30 returndict = []31 for i in range(loop_length):32 add = self.ranked_vote.find_best_remove()33 # if add == "" that means it is no longer beneficial to continue to remove candidates from the ballot34 if add == "":35 break36 returndict.append(add)37 self.ranked_vote.delete_valid(add)38 self.ranked_vote.reset()39 return returndict40 # This is where the computation is done, this function runs every voting method for ranked votes41 def run_election(self):42 return_dict = {}43 candidate_to_win = self.ranked_vote.candidateToWin44 if self.add_remove_allowed is False:45 # Here is runs every vote where editing is not allowed46 # each methid returns the candidate that won so if this == candidate_to_win add it to the47 # dictionary that will be returned48 runoff = self.run_instantrunoff()49 self.ranked_vote.reset()50 if runoff == candidate_to_win:51 return_dict.update({"IRN": ""})52 av_plus = self.run_avplus()53 self.ranked_vote.reset()54 if av_plus == candidate_to_win:55 return_dict.update({"AVP": ""})56 borda = self.run_bordacount()57 self.ranked_vote.reset()58 if borda == candidate_to_win:59 return_dict.update({"BC": ""})60 copeland = self.run_copeland()61 self.ranked_vote.reset()62 if copeland == candidate_to_win:63 return_dict.update({"CPLN": ""})64 minmax = self.run_minmax()65 self.ranked_vote.reset()66 if minmax == candidate_to_win:67 return_dict.update({"MNX":""})68 rankedpairs = self.run_rankedpairs()69 self.ranked_vote.reset()70 if rankedpairs == candidate_to_win:71 return_dict.update({"RP":""})72 else:73 # This runs every voting method with editing allowed here it will return the candidate that won along74 # with any edditing that was required to help the chosen candidate win75 runoff = self.run_instantrunnoff_true()76 self.ranked_vote.reset()77 if runoff[0] == candidate_to_win:78 return_dict.update(runoff[1])79 avplus = self.run_avplus_true()80 self.ranked_vote.reset()81 if avplus[0] == candidate_to_win:82 return_dict.update(avplus[1])83 borda = self.run_boardacount_true()84 self.ranked_vote.reset()85 if borda[0] == candidate_to_win:86 return_dict.update(borda[1])87 copeland = self.run_copeland_true()88 self.ranked_vote.reset()89 if copeland[0] == candidate_to_win:90 return_dict.update(copeland[1])91 minmax = self.run_minmax_true()92 self.ranked_vote.reset()93 if minmax[0] == candidate_to_win:94 return_dict.update(minmax[1])95 rankedpairs = self.run_rankedpairs_true()96 self.ranked_vote.reset()97 if rankedpairs[0] == candidate_to_win:98 return_dict.update(rankedpairs[1])99 return return_dict100 def run_instantrunoff(self):101 updates = {}102 breakdown = self.ranked_vote.instantrunoffmethod()103 winner = max(breakdown, key=breakdown.get)104 return winner105 def run_instantrunnoff_true(self):106 updates = {}107 json = {"IRN": {}}108 loop_len = len(self.add_order)109 breakdown = self.ranked_vote.instantrunoffmethod()110 winner = max(breakdown, key=breakdown.get)111 if winner == self.ranked_vote.candidateToWin:112 return winner, json113 else:114 # First it will loop adding candidates and re-checking if the chosen candidate has won115 for loop in range(loop_len):116 add = self.add_order[loop]117 self.ranked_vote.add_candidate(add)118 updates[add] = "added"119 # keep not of which candidates have been added to the ballot120 breakdown = self.ranked_vote.instantrunoffmethod()121 winner = max(breakdown, key=breakdown.get)122 if winner == self.ranked_vote.candidateToWin:123 json["IRN"] = updates124 return winner, json125 # If it runs out of candidates to add then it will start removing candidates note that candidates126 # that have been previously added will not be removed127 for loop in range(len(self.remove_order)):128 remove = self.remove_order[loop]129 self.ranked_vote.remove_candidate(remove)130 updates[remove] = "removed"131 breakdown = self.ranked_vote.instantrunoffmethod()132 winner = max(breakdown, key=breakdown.get)133 if winner == self.ranked_vote.candidateToWin:134 json["IRN"] = updates135 return winner, json136 return winner, json137 def run_avplus(self):138 breakdown = self.ranked_vote.av_plus()139 winner = max(breakdown, key=breakdown.get)140 return winner141 def run_avplus_true(self):142 updates = {}143 json = {"AVP": {}}144 loop_len = len(self.add_order)145 breakdown = self.ranked_vote.av_plus()146 winner = max(breakdown, key=breakdown.get)147 if winner == self.ranked_vote.candidateToWin:148 return winner, json149 else:150 for loop in range(loop_len):151 add = self.add_order[loop]152 self.ranked_vote.add_candidate(add)153 updates[add] = "added"154 breakdown = self.ranked_vote.av_plus()155 winner = max(breakdown, key=breakdown.get)156 if winner == self.ranked_vote.candidateToWin:157 json["AVP"] = updates158 return winner, json159 for loop in range(len(self.remove_order)):160 remove = self.remove_order[loop]161 self.ranked_vote.remove_candidate(remove)162 updates[remove] = "removed"163 breakdown = self.ranked_vote.av_plus()164 winner = max(breakdown, key=breakdown.get)165 if winner == self.ranked_vote.candidateToWin:166 json["AVP"] = updates167 return winner, json168 return winner, json169 def run_bordacount(self):170 json = {"election": "BC"}171 breakdown = self.ranked_vote.borda_count()172 winner = max(breakdown, key=breakdown.get)173 return winner174 def run_boardacount_true(self):175 updates = {}176 json = {"BC": {}}177 loop_len = len(self.add_order)178 breakdown = self.ranked_vote.borda_count()179 winner = max(breakdown, key=breakdown.get)180 if winner == self.ranked_vote.candidateToWin:181 return winner, json182 else:183 for loop in range(loop_len):184 add = self.ranked_vote.find_borda_add()185 self.ranked_vote.add_candidate(add)186 updates[add] = "added"187 breakdown = self.ranked_vote.borda_count()188 winner = max(breakdown, key=breakdown.get)189 if winner == self.ranked_vote.candidateToWin:190 json["BC"] = updates191 return winner, json192 for loop in range(len(self.remove_order)):193 remove = self.remove_order[loop]194 self.ranked_vote.remove_candidate(remove)195 updates[remove] = "removed"196 breakdown = self.ranked_vote.borda_count()197 winner = max(breakdown, key=breakdown.get)198 if winner == self.ranked_vote.candidateToWin:199 json["BC"] = updates200 return winner, json201 return winner, json202 def run_copeland(self):203 updates = {"nil": "nil"}204 json = {"CPLN": updates}205 breakdown = self.ranked_vote.copeland_method()206 if len(breakdown) <= 0:207 return ""208 winner = max(breakdown, key=breakdown.get)209 return winner210 def run_copeland_true(self):211 updates = {}212 json = {"CPLN": {}}213 loop_len = len(self.add_order)214 breakdown = self.ranked_vote.copeland_method()215 if len(breakdown) > 0:216 winner = max(breakdown, key=breakdown.get)217 else:218 return "", json219 if winner == self.ranked_vote.candidateToWin:220 return winner, json221 else:222 for loop in range(loop_len):223 add = self.add_order[loop]224 self.ranked_vote.add_candidate(add)225 updates[add] = "added"226 breakdown = self.ranked_vote.copeland_method()227 winner = max(breakdown, key=breakdown.get)228 if winner == self.ranked_vote.candidateToWin:229 json["CPLN"] = updates230 return winner, json231 for loop in range(len(self.remove_order)):232 remove = self.remove_order[loop]233 self.ranked_vote.remove_candidate(remove)234 updates[remove] = "removed"235 breakdown = self.ranked_vote.copeland_method()236 winner = max(breakdown, key=breakdown.get)237 if winner == self.ranked_vote.candidateToWin:238 json["CPLN"] = updates239 return winner, json240 return winner, json241 def run_minmax(self):242 updates = {"nil": "nil"}243 json = {"MNX": updates}244 breakdown = self.ranked_vote.minmax_method()245 winner = min(breakdown, key=breakdown.get)246 return winner247 def run_minmax_true(self):248 updates = {}249 json = {"MNX": {}}250 loop_len = len(self.add_order)251 breakdown = self.ranked_vote.minmax_method()252 winner = min(breakdown, key=breakdown.get)253 if winner == self.ranked_vote.candidateToWin:254 return winner, json255 else:256 for loop in range(loop_len):257 add = self.add_order[loop]258 self.ranked_vote.add_candidate(add)259 updates[add] = "added"260 breakdown = self.ranked_vote.minmax_method()261 winner = min(breakdown, key=breakdown.get)262 if winner == self.ranked_vote.candidateToWin:263 json["MNX"] = updates264 return winner, json265 for loop in range(len(self.remove_order)):266 remove = self.remove_order[loop]267 self.ranked_vote.remove_candidate(remove)268 updates[remove] = "removed"269 breakdown = self.ranked_vote.minmax_method()270 winner = min(breakdown, key=breakdown.get)271 if winner == self.ranked_vote.candidateToWin:272 json["MNX"] = updates273 return winner, json274 return winner, json275 def run_rankedpairs(self):276 updates = {"nil": "nil"}277 json = {"RP": updates}278 winner = self.ranked_vote.ranked_pairs()279 return winner280 def run_rankedpairs_true(self):281 updates = {}282 json = {"RP": {}}283 loop_len = len(self.add_order)284 breakdown = self.ranked_vote.ranked_pairs()285 winner = breakdown #max(breakdown, key=breakdown.get)286 if winner == self.ranked_vote.candidateToWin:287 return winner, json288 else:289 for loop in range(loop_len):290 add = self.add_order[loop]291 self.ranked_vote.add_candidate(add)292 updates[add] = "added"293 breakdown = self.ranked_vote.ranked_pairs()294 winner = breakdown #max(breakdown, key=breakdown.get)295 if winner == self.ranked_vote.candidateToWin:296 json["RP"] = updates297 return winner, json298 for loop in range(len(self.remove_order)):299 remove = self.remove_order[loop]300 self.ranked_vote.remove_candidate(remove)301 updates[remove] = "removed"302 breakdown = self.ranked_vote.ranked_pairs()303 winner = breakdown304 if winner == self.ranked_vote.candidateToWin:305 json["RP"] = updates306 return winner, json...
test_SBUIdentifier.py
Source:test_SBUIdentifier.py
1import unittest2from MofIdentifier.fileIO import XyzReader3from MofIdentifier.fileIO.CifReader import get_mof4class SBUIdentifierTest(unittest.TestCase):5 def test_simple_mof(self):6 mof_abavij = get_mof('../MofIdentifier/mofsForTests/ABAVIJ_clean.cif')7 sbu_breakdown = mof_abavij.sbus()8 assert (len(sbu_breakdown.clusters) == 1)9 assert (sbu_breakdown.clusters[0].frequency == 4)10 assert (len(sbu_breakdown.clusters[0].adjacent_connector_ids) == 6)11 assert (len(sbu_breakdown.clusters[0].atoms) == 1)12 assert (len(sbu_breakdown.connectors) == 1)13 assert (sbu_breakdown.connectors[0].frequency == 8)14 assert (len(sbu_breakdown.connectors[0].adjacent_cluster_ids) == 3)15 assert (len(sbu_breakdown.connectors[0].atoms) == 13)16 assert (len(sbu_breakdown.auxiliaries) == 0)17 def test_complex_mof(self):18 mof_808 = get_mof('../MofIdentifier/mofsForTests/smod7-pos-1.cif')19 sbu_breakdown = mof_808.sbus()20 assert (len(sbu_breakdown.clusters) == 1)21 assert (sbu_breakdown.clusters[0].frequency == 4)22 assert (len(sbu_breakdown.clusters[0].adjacent_cluster_ids) == 0)23 assert (len(sbu_breakdown.clusters[0].adjacent_connector_ids) == 6)24 assert (len(sbu_breakdown.clusters[0].adjacent_auxiliary_ids) == 10)25 assert (len(sbu_breakdown.clusters[0].atoms) == 18)26 assert (len(sbu_breakdown.connectors) == 1)27 assert (sbu_breakdown.connectors[0].frequency == 8)28 assert (len(sbu_breakdown.connectors[0].adjacent_cluster_ids) == 3)29 assert (len(sbu_breakdown.connectors[0].adjacent_connector_ids) == 0)30 assert (len(sbu_breakdown.connectors[0].adjacent_auxiliary_ids) == 0)31 assert (len(sbu_breakdown.connectors[0].atoms) == 18)32 assert (len(sbu_breakdown.auxiliaries) == 3)33 def test_small_mof(self):34 mof_abetae = get_mof('../MofIdentifier/mofsForTests/ABETAE_clean.cif')35 sbu_breakdown = mof_abetae.sbus()36 assert (len(sbu_breakdown.clusters) == 1)37 assert (sbu_breakdown.clusters[0].frequency == 20)38 assert (len(sbu_breakdown.clusters[0].adjacent_cluster_ids) == 0)39 assert (len(sbu_breakdown.clusters[0].adjacent_connector_ids) == 4)40 assert (len(sbu_breakdown.clusters[0].adjacent_auxiliary_ids) == 0)41 assert (len(sbu_breakdown.clusters[0].atoms) == 1)42 assert (len(sbu_breakdown.connectors) == 1)43 assert (sbu_breakdown.connectors[0].frequency == 16)44 assert (len(sbu_breakdown.connectors[0].adjacent_cluster_ids) == 5)45 assert (len(sbu_breakdown.connectors[0].adjacent_connector_ids) == 0)46 assert (len(sbu_breakdown.connectors[0].adjacent_auxiliary_ids) == 0)47 assert (len(sbu_breakdown.connectors[0].atoms) == 5)48 def test_notalladjacent_core(self):49 mof_akoheo = get_mof('../MofIdentifier/mofsForTests/AKOHEO_clean.cif')50 sbu_breakdown = mof_akoheo.sbus()51 assert (len(sbu_breakdown.clusters) == 2)52 iron_node = None53 big_node = None54 for cluster in sbu_breakdown.clusters:55 if len(cluster.atoms) == 1:56 iron_node = cluster57 elif len(cluster.atoms) == 31:58 big_node = cluster59 assert (iron_node is not None and big_node is not None)60 assert (iron_node.frequency == 4)61 assert (big_node.frequency == 2)62 assert (len(iron_node.adjacent_cluster_ids) == len(big_node.adjacent_cluster_ids) == 0)63 assert (len(iron_node.adjacent_connector_ids) == 5)64 assert (len(iron_node.adjacent_auxiliary_ids) == 1)65 assert (len(big_node.adjacent_connector_ids) == 8)66 assert (len(big_node.adjacent_auxiliary_ids) == 0)67 assert (len(sbu_breakdown.connectors) == 2)68 central_phosphate = None69 straight_phosphate = None70 for connector in sbu_breakdown.connectors:71 if len(connector.adjacent_cluster_ids) == 3:72 central_phosphate = connector73 elif len(connector.adjacent_cluster_ids) == 2:74 straight_phosphate = connector75 assert (central_phosphate is not None and straight_phosphate is not None)76 assert (central_phosphate.frequency == 4)77 assert (straight_phosphate.frequency == 12)78 assert (len(central_phosphate.adjacent_connector_ids) == len(straight_phosphate.adjacent_connector_ids) == 0)79 assert (len(central_phosphate.adjacent_auxiliary_ids) == len(straight_phosphate.adjacent_auxiliary_ids) == 0)80 assert (len(sbu_breakdown.auxiliaries) == 1)81 assert (sbu_breakdown.auxiliaries[0].frequency == 4)82 assert (len(sbu_breakdown.auxiliaries[0].adjacent_cluster_ids) == 1)83 assert (len(sbu_breakdown.auxiliaries[0].adjacent_connector_ids) == 0)84 assert (len(sbu_breakdown.auxiliaries[0].adjacent_auxiliary_ids) == 0)85 assert (len(sbu_breakdown.auxiliaries[0].atoms) == 2)86 def test_abnormal_fractional_coordinates(self):87 mof_ja_007 = get_mof('../MofIdentifier/mofsForTests/ja500330a_si_007_auto.cif')88 sbu_breakdown = mof_ja_007.sbus()89 self.assertEqual(len(sbu_breakdown.clusters), 1)90 self.assertEqual(sbu_breakdown.clusters[0].frequency, 16)91 self.assertEqual(len(sbu_breakdown.clusters[0].adjacent_cluster_ids), 0)92 self.assertEqual(len(sbu_breakdown.clusters[0].adjacent_connector_ids), 6)93 self.assertEqual(len(sbu_breakdown.clusters[0].adjacent_auxiliary_ids), 12)94 self.assertEqual(len(sbu_breakdown.clusters[0].atoms), 18)95 self.assertEqual(len(sbu_breakdown.connectors), 1)96 self.assertEqual(sbu_breakdown.connectors[0].frequency, 32)97 self.assertEqual(len(sbu_breakdown.connectors[0].adjacent_cluster_ids), 3)98 self.assertEqual(len(sbu_breakdown.connectors[0].adjacent_connector_ids), 0)99 self.assertEqual(len(sbu_breakdown.connectors[0].adjacent_auxiliary_ids), 0)100 self.assertEqual(len(sbu_breakdown.connectors[0].atoms), 18)101 self.assertEqual(len(sbu_breakdown.auxiliaries), 1)102 def test_single_metal_wide_unit_cell(self):103 mof_russaa = get_mof('../MofIdentifier/mofsForTests/RUSSAA_clean.cif')104 sbu_breakdown = mof_russaa.sbus()105 assert (len(sbu_breakdown.clusters) == 2)106 assert (len(sbu_breakdown.clusters[0].atoms) == 1)107 assert (len(sbu_breakdown.clusters[0].atoms) == 1)108 assert (len(sbu_breakdown.auxiliaries) == 0)109 assert (len(sbu_breakdown.connectors) == 3)110 assert (sbu_breakdown.connectors[0].frequency == 4)111 assert (sbu_breakdown.connectors[1].frequency == 4)112 assert (sbu_breakdown.connectors[2].frequency == 4)113 def test_infinite_band_connector(self):114 mof_24205 = get_mof('../MofIdentifier/mofsForTests/acscombsci.5b00188_24205_clean.cif')115 sbu_breakdown = mof_24205.sbus()116 assert (len(sbu_breakdown.clusters) == 1)117 assert (len(sbu_breakdown.auxiliaries) == 0)118 assert (len(sbu_breakdown.connectors) == 3)119 infinite_connector = [c for c in sbu_breakdown.connectors if len(c.atoms) == 23][0]120 def test_h_connections_in_xyz(self):121 mof_maxhoj = get_mof('../MofIdentifier/mofsForTests/MAXHOJ01_clean.cif')122 cif_connector = mof_maxhoj.sbus().connectors[0]123 xyz_connector = XyzReader.get_molecule('../MofIdentifier/ligands/test_resources/connector_1826.xyz')124 self.assertEqual(cif_connector, xyz_connector)125 def test_connector_across_unit_cell(self):126 mof_amoyor = get_mof('../MofIdentifier/mofsForTests/AMOYOR_clean.cif')127 for sbu in mof_amoyor:128 pass129class BreakLargeClustersTest(unittest.TestCase):130 def test_breaking_up_large_clusters(self):131 mof_dotyes = get_mof('../MofIdentifier/mofsForTests/DOTYES_clean.cif')132 sbu_breakdown = mof_dotyes.sbus()133 self.assertEqual(2, len(sbu_breakdown.clusters))134 large_cluster = [c for c in sbu_breakdown.clusters if len(c.atoms) == 57][0]135 small_cluster = [c for c in sbu_breakdown.clusters if len(c.atoms) == 1][0]136 self.assertEqual(8, large_cluster.frequency)137 self.assertEqual(12, small_cluster.frequency)138 self.assertEqual(21, len([atom for atom in large_cluster.atoms if atom.is_metal()]))139 def test_breaking_up_large_clusters_in_complex_mof(self):140 mof_yojman = get_mof(r'../MofIdentifier/mofsForTests/YOJMAN_clean.cif')141 sbu_breakdown = mof_yojman.sbus()142 self.assertGreater(9, len(sbu_breakdown.clusters))143 self.assertGreater(len(sbu_breakdown.clusters), 3)144 large_cluster = [c for c in sbu_breakdown.clusters if len(c.atoms) == 54][0]145 med_cluster = [c for c in sbu_breakdown.clusters if len(c.atoms) == 3][0]146 self.assertEqual(4, large_cluster.frequency)147 self.assertEqual(4, med_cluster.frequency)148 self.assertEqual(18, len([atom for atom in large_cluster.atoms if atom.is_metal()]))149 def test_breaking_up_different_clusters_when_they_are_connected_in_multiple_spots(self):150 mof_ocuvuf = get_mof(r'../MofIdentifier/mofsForTests/OCUVUF_clean.cif')151 sbu_breakdown = mof_ocuvuf.sbus()152 self.assertEqual(3, len(sbu_breakdown.clusters))153 large_cluster = [c for c in sbu_breakdown.clusters if len(c.atoms) == 59][0]154 self.assertEqual(8, large_cluster.frequency)155 def test_comparing_auxiliaries_with_connectors_to_add_bonds(self):156 mof_nopjuz = get_mof(r'../MofIdentifier/mofsForTests/NOPJUZ_clean.cif')157 sbu_breakdown = mof_nopjuz.sbus()158 self.assertEqual(0, len(sbu_breakdown.auxiliaries))159if __name__ == '__main__':...
fact2sql.py
Source:fact2sql.py
1import json2# SELECT [DISTINCT|ALL ] { * | [fieldExpression [AS newName]} FROM tableName [alias] [WHERE condition][GROUP BY fieldName(s)] [HAVING condition] ORDER BY fieldName(s)3def fact2sql(factjson, table):4 fact = json.loads(factjson)5 clause = ""6 for subspace in fact['subspace']:7 if clause == "":8 clause += "WHERE"9 else:10 clause += " AND"11 clause += " %s='%s'"%(subspace['field'], subspace['value'])12 if fact['type'] == 'value':13 measure = fact['measure'][0]['field']14 aggregate = fact['measure'][0]['aggregate']15 sql = "SELECT %s(%s) FROM %s %s"%(agg(aggregate),measure, table, clause)16 return sql17 elif fact['type'] == 'difference':18 measure = fact['measure'][0]['field']19 aggregate = fact['measure'][0]['aggregate']20 breakdown = fact['breakdown'][0]21 if clause == "":22 clause += "WHERE"23 else:24 clause += " AND"25 clause += " (%s='%s'"%(fact['focus'][0]['field'], fact['focus'][0]['value'])26 clause += " OR %s='%s')"%(fact['focus'][1]['field'], fact['focus'][1]['value'])27 sql = "SELECT %s(%s), %s FROM %s %s GROUP BY %s"%(agg(aggregate), measure, breakdown, table, clause, breakdown)28 return sql29 elif fact['type'] == 'proportion':30 measure = fact['measure'][0]['field']31 aggregate = fact['measure'][0]['aggregate']32 breakdown = fact['breakdown'][0]33 sql = "SELECT %s(%s), %s FROM %s %s GROUP BY %s"%(agg(aggregate), measure, breakdown, table, clause, breakdown)34 return sql35 elif fact['type'] == 'trend':36 measure = fact['measure'][0]['field']37 aggregate = fact['measure'][0]['aggregate']38 breakdown = fact['breakdown'][0]39 sql = "SELECT %s(%s), %s FROM %s %s GROUP BY %s ORDER BY %s ASC"%(agg(aggregate), measure, breakdown, table, clause, breakdown, breakdown)40 return sql41 elif fact['type'] == 'categorization':42 breakdown = fact['breakdown'][0]43 sql = "SELECT COUNT(*), %s FROM %s %s GROUP BY %s"%(breakdown, table, clause, breakdown)44 return sql45 elif fact['type'] == 'distribution':46 measure = fact['measure'][0]['field']47 aggregate = fact['measure'][0]['aggregate']48 breakdown = fact['breakdown'][0]49 sql = "SELECT %s(%s), %s FROM %s %s GROUP BY %s"%(agg(aggregate), measure, breakdown, table, clause, breakdown)50 return sql51 elif fact['type'] == 'rank':52 measure = fact['measure'][0]['field']53 aggregate = fact['measure'][0]['aggregate']54 breakdown = fact['breakdown'][0]55 sql = "SELECT %s(%s) AS rank_measure, %s FROM %s %s GROUP BY %s ORDER BY rank_measure DESC"%(agg(aggregate), measure, breakdown, table, clause, breakdown)56 return sql57 elif fact['type'] == 'association':58 measure1 = fact['measure'][0]['field']59 aggregate1 = fact['measure'][0]['aggregate']60 measure2 = fact['measure'][1]['field']61 aggregate2 = fact['measure'][1]['aggregate']62 breakdown = fact['breakdown'][0]63 sql = "SELECT %s(%s), %s(%s), %s FROM %s %s GROUP BY %s"%(agg(aggregate1), measure1, agg(aggregate2), measure2, breakdown, table, clause, breakdown)64 return sql65 elif fact['type'] == 'extreme':66 measure = fact['measure'][0]['field']67 aggregate = fact['measure'][0]['aggregate']68 breakdown = fact['breakdown'][0]69 sql = "SELECT %s(%s), %s FROM %s GROUP BY %s"%(agg(aggregate), measure, breakdown, table, breakdown)70 return sql71 elif fact['type'] == 'outlier':72 measure = fact['measure'][0]['field']73 aggregate = fact['measure'][0]['aggregate']74 breakdown = fact['breakdown'][0]75 sql = "SELECT %s(%s), %s FROM %s GROUP BY %s"%(agg(aggregate), measure, breakdown, table, breakdown)76 return sql77 else:78 return ""79def agg(aggregate):80 if aggregate == 'sum':81 return 'SUM'82 elif aggregate == 'max':83 return 'MAX'84 elif aggregate == 'min':85 return 'MIN'86 elif aggregate == 'avg':87 return 'AVG'88 elif aggregate == 'count':89 return 'COUNT'90 else:...
Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!