Best Python code snippet using localstack_python
graph.py
Source:graph.py
1import heapq2import collections3from Compiler.exceptions import *4class GraphError(CompilerError):5 pass6class SparseDiGraph(object):7 """ Directed graph suitable when each node only has a small number of edges.8 Edges are stored as a list instead of a dictionary to save memory, leading9 to slower searching for dense graphs.10 Node attributes must be specified in advance, as these are stored in the11 same list as edges.12 """13 def __init__(self, max_nodes, default_attributes=None):14 """ max_nodes: maximum no of nodes15 default_attributes: dict of node attributes and default values """16 if default_attributes is None:17 default_attributes = { 'merges': None }18 self.default_attributes = default_attributes19 self.attribute_pos = dict(list(zip(list(default_attributes.keys()), list(range(len(default_attributes))))))20 self.n = max_nodes21 # each node contains list of default attributes, followed by outgoing edges22 self.nodes = [list(self.default_attributes.values()) for i in range(self.n)]23 self.succ = [collections.OrderedDict() for i in range(self.n)]24 self.pred = [set() for i in range(self.n)]25 self.weights = {}26 def __len__(self):27 return self.n28 def __getitem__(self, i):29 """ Get list of the neighbours of node i """30 return self.succ[i].keys()31 def __iter__(self):32 pass #return iter(self.nodes)33 def __contains__(self, i):34 return i >= 0 and i < self.n35 def add_node(self, i, **attr):36 if i >= self.n:37 raise CompilerError('Cannot add node %d to graph of size %d' % (i, self.n))38 node = self.nodes[i]39 for a,value in list(attr.items()):40 if a in self.default_attributes:41 node[self.attribute_pos[a]] = value42 else:43 raise CompilerError('Invalid attribute %s for graph node' % a)44 def set_attr(self, i, attr, value):45 if attr in self.default_attributes:46 self.nodes[i][self.attribute_pos[attr]] = value47 else:48 raise CompilerError('Invalid attribute %s for graph node' % attr)49 def get_attr(self, i, attr):50 return self.nodes[i][self.attribute_pos[attr]]51 def remove_node(self, i):52 """ Remove node i and all its edges """53 succ = self[i]54 pred = self.pred[i]55 for v in succ:56 self.pred[v].remove(i)57 #del self.weights[(i,v)]58 for v in pred:59 del self.succ[v][i]60 #del self.weights[(v,i)]61 #self.nodes[v].remove(i)62 self.pred[i] = []63 self.nodes[i] = list(self.default_attributes.values())64 def add_edge(self, i, j, weight=1):65 if j not in self[i]:66 self.pred[j].add(i)67 self.succ[i][j] = None68 self.weights[(i,j)] = weight69 def add_edges_from(self, tuples):70 for edge in tuples:71 if len(edge) == 3:72 # use weight73 self.add_edge(edge[0], edge[1], edge[2])74 else:75 self.add_edge(edge[0], edge[1])76 def remove_edge(self, i, j):77 del self.succ[i][j]78 self.pred[j].remove(i)79 del self.weights[(i,j)]80 def remove_edges_from(self, pairs):81 for i,j in pairs:82 self.remove_edge(i, j)83 def degree(self, i):84 return len(self.succ[i])85def topological_sort(G, nbunch=None, pref=None):86 seen={}87 order_explored=[] # provide order and 88 explored={} # fast search without more general priorityDictionary89 90 if pref is None:91 def get_children(node):92 return G[node]93 else:94 def get_children(node):95 if node in pref:96 pref_set = set(pref[node])97 for i in G[node]:98 if i not in pref_set:99 yield i100 for i in reversed(pref[node]):101 yield i102 else:103 for i in G[node]:104 yield i105 if nbunch is None:106 nbunch = reversed(list(range(len(G))))107 for v in nbunch: # process all vertices in G108 if v in explored: 109 continue110 fringe=[v] # nodes yet to look at111 while fringe:112 w=fringe[-1] # depth first search113 if w in explored: # already looked down this branch114 fringe.pop()115 continue116 seen[w]=1 # mark as seen117 # Check successors for cycles and for new nodes118 new_nodes=[]119 for n in get_children(w):120 if n not in explored:121 if n in seen: #CYCLE !!122 raise GraphError("Graph contains a cycle at %d (%s,%s)." % \123 (n, G[n], G.pred[n]))124 new_nodes.append(n)125 if new_nodes: # Add new_nodes to fringe126 fringe.extend(new_nodes)127 else: # No new nodes so w is fully explored128 explored[w]=1129 order_explored.append(w)130 fringe.pop() # done considering this node131 132 order_explored.reverse() # reverse order explored133 return order_explored134def dag_shortest_paths(G, source):135 top_order = topological_sort(G)136 dist = [None] * len(G)137 dist[source] = 0138 for u in top_order:139 if dist[u] is None:140 continue141 for v in G[u]:142 if dist[v] is None or dist[v] > dist[u] + G.weights[(u,v)]:143 dist[v] = dist[u] + G.weights[(u,v)]144 return dist145def reverse_dag_shortest_paths(G, source):146 top_order = reversed(topological_sort(G))147 dist = [None] * len(G)148 dist[source] = 0149 for u in top_order:150 if u ==68273:151 print('dist[68273]', dist[u])152 print('pred[u]', G.pred[u])153 if dist[u] is None:154 continue155 for v in G.pred[u]:156 if dist[v] is None or dist[v] > dist[u] + G.weights[(v,u)]:157 dist[v] = dist[u] + G.weights[(v,u)]158 return dist159def single_source_longest_paths(G, source, reverse=False):160 # make weights negative, then do shortest paths161 for edge in G.weights:162 G.weights[edge] = -G.weights[edge]163 if reverse:164 dist = reverse_dag_shortest_paths(G, source)165 else:166 dist = dag_shortest_paths(G, source)167 #dist = johnson(G, sources)168 # reset weights169 for edge in G.weights:170 G.weights[edge] = -G.weights[edge]171 for i,n in enumerate(dist):172 if n is None:173 dist[i] = 0174 else:175 dist[i] = -dist[i]176 #for k, v in dist.iteritems():177 # dist[k] = -v178 return dist179def longest_paths(G, sources=None):180 # make weights negative, then do shortest paths181 for edge in G.weights:182 G.weights[edge] = -G.weights[edge]183 dist = {}184 for source in sources:185 print(('%s, ' % source), end=' ')186 dist[source] = dag_shortest_paths(G, source)187 #dist = johnson(G, sources)188 # reset weights189 for edge in G.weights:190 G.weights[edge] = -G.weights[edge]...
models.py
Source:models.py
1import datetime2from contextlib import closing3import copy4def utcnow():5 return datetime.datetime.utcnow()6class Model:7 def create(self, content):8 message = dict()9 for key in self.default_attributes.keys():10 if key in content.keys():11 message[key] = content[key]12 else:13 message[key] = self.default_attributes[key]()14 15 columns = ', '.join(message.keys())16 named_map = ':' + ', :'.join(message.keys())17 query_string = '''18 INSERT INTO {} 19 ({})20 VALUES21 ({});22 '''.format(self.table_name, columns, named_map)23 return query_string, message24 def delete(self, where):25 values = []26 conditions = []27 for key in where.keys():28 if key in self.default_attributes.keys():29 conditions.append(key + where[key][0] + ':' + key)30 values.append(where[key][1])31 query_string = '''32 DELETE FROM {} 33 WHERE ({})34 '''.format(self.table_name, ', '.join(conditions))35 return query_string, values36 def find(self, where=None, order=None, limit=None):37 values = []38 query_string = '''39 SELECT * FROM {} 40 '''.format(self.table_name)41 if where:42 conditions = []43 for key in where.keys():44 if key in self.default_attributes.keys():45 conditions.append(key + where[key][0] + ':' + key)46 values.append(where[key][1])47 query_string += '''48 WHERE ({})49 '''.format(', '.join(conditions))50 if order:51 query_string += '''52 ORDER BY {} {}53 '''.format(order[0], order[1])54 55 if limit:56 query_string += '''57 LIMIT {}58 '''.format(limit)59 60 return query_string, values61 62 def formatResult(self, result):63 formattedResult = copy.deepcopy(self.default_attributes)64 i = 065 for key in self.default_attributes.keys():66 defaultType = self.default_attributes[key]()67 if type(defaultType) == datetime.datetime:68 formattedResult[key] = datetime.datetime.fromisoformat(result[i])69 elif type(defaultType) == str:70 formattedResult[key] = result[i]71 elif type(defaultType) == bool:72 formattedResult[key] = bool(result[i])73 i += 174 return formattedResult75class Messages(Model):76 table_name = 'Messages'77 default_attributes = {78 'Timestamp': utcnow,79 'Message': str,80 'User': str,81 'ID': str82 }83 84 def __init__(self, conn):85 create_table_query = '''86 CREATE TABLE IF NOT EXISTS Messages (87 Timestamp DATE,88 Message TEXT,89 User TEXT,90 ID INTEGER PRIMARY KEY91 );92 '''93 with closing(conn.cursor()) as cursor:94 cursor.execute(create_table_query)95 conn.commit()96class Members(Model):97 table_name = 'Members'98 default_attributes = {99 'JoinAt': utcnow,100 'Verified': bool,101 'ID': str102 }103 104 def __init__(self, conn):105 create_table_query = '''106 CREATE TABLE IF NOT EXISTS Members (107 JoinAt DATE,108 Verified BOOLEAN,109 ID INTEGER PRIMARY KEY110 );111 '''112 with closing(conn.cursor()) as cursor:113 cursor.execute(create_table_query)...
Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!