Best Python code snippet using ATX
commitManager.py
Source:commitManager.py
12from sno import sno3import pyangbind.lib.pybindJSON as pybindJSON4from pyangbind.lib.serialise import pybindJSONEncoder, pybindJSONDecoder5from pyangbind.lib.serialise import pybindIETFXMLEncoder, pybindIETFXMLDecoder6import pprint7import json8#from jsondiff import diff9#from jsondiff.symbols import *10from deviceTransaction import NetworkTransaction11#from xmldiff import formatting, main12#from SNOFormatter import SNOFormatter13from lxml import etree14import xml.etree.ElementTree as ET15import xmltodict16import difflib17from ConfigDB import ConfigDB18from JSONDiff import calculate_diff1920def XMLDiff(left, right):21 root_left = etree.fromstring(left)22 root_right = etree.fromstring(right)2324 tree = etree.ElementTree(root_right)25 26 #etree.register_namespace("nc", "urn:ietf:params:xml:ns:netconf:base:1.0")2728 left_side_recursion(root_left, root_right)29 right_side_recursion(root_left, root_right)3031 return (etree.tostring(root_left).decode())3233def left_side_recursion(root_left, root_right): 343536 if root_left.tag != root_right.tag:37 if not root_right.findall('.//' + root_left.tag):38 root_left.set("operation", "delete")39 for element in root_left.getchildren():40 root_left.remove(element)41 return42 else:43 all_matches = root_right.findall('.//' + root_left.tag)44 all_matches_text = [elem.text for elem in all_matches]45 if root_left.text not in all_matches_text:46 root_left.set("operation", "delete")47 for element in root_left.getchildren():48 root_left.remove(element)49 return50 else:51 for element in root_left.getchildren():52 left_side_recursion(element, root_right)53 else:54 if root_left.text != root_right.text:55 root_left.set("operation", "delete")56 return57 else:58 for element in root_left.getchildren():59 left_side_recursion(element, root_right)6061def right_side_recursion(root_left, root_right):6263 #for element in root_right.getchildren():64 for element in root_right.getchildren():65 if not root_left.findall(element.tag):66 print ("Did not find element tag. so appending")67 print (element.tag)68 print (element.text)69 root_left.append(element)70 else:71 all_matches = root_left.findall(element.tag)72 all_matches_string = [etree.tostring(elem) for elem in all_matches]73 all_matches_text = [elem.text for elem in all_matches]74 if etree.tostring(element) not in all_matches_string:75 best_match = find_best_match(all_matches, element)76 if best_match:77 right_side_recursion(best_match, element)78 else:79 print ("Did not find best match")80 print (element.tag)81 print (element.text)82 root_left.append(element)83 '''84 for elem in all_matches:85 if elem.text == element.text:86 break87 right_side_recursion(elem, element)88 '''89 else:90 return9192def compare_element(left_element, right_element, depth=0):939495 if left_element.tag == right_element.tag and left_element.text == right_element.text:96 depth += 19798 99 print ("inside compare element")100 101 for x,y in zip(left_element, right_element):102 print (x)103 print (y)104105 #print (etree.tostring(left_element))106 for i in range(len(right_element)):107 if i >= len(left_element):108 break109 if etree.tostring(left_element[i]) == etree.tostring(right_element[i]):110 depth += 1111112 return depth113114def find_best_match(all_matches, element):115 best_match = None116 best_depth = 0117118119 for match in all_matches:120 depth = 0121 #depth = compare_element(match, element)122 for elem_child in element.getchildren():123 for match_child in match.getchildren():124 if elem_child.tag == match_child.tag and elem_child.text == match_child.text:125 depth += 1126 break127 else:128 break129 '''130 for elem_child in element.getchildren():131 if match_child.tag == elem_child.tag and match_child.text == elem_child.text:132 print (elem_child.tag)133 print (elem_child.text)134 depth += 1135 break136 '''137 '''138 for match_child in match.getchildren():139 for elem_child in element.getchildren():140 if etree.tostring(match_child) == etree.tostring(elem_child):141 depth += 1142 break143 '''144 145 if depth > best_depth:146 best_depth = depth147 best_match = match148149 return best_match150 151152153154155156157def commit(snoRoot, sessionID, DryRun=False):158 #with open("ConfigDB", "r") as f:159 # originalsno_dict = json.loads(f.read())160161 get_lock = ConfigDB.acquire_lock()162163 if 'Error' in get_lock.keys():164 return get_lock165 else:166 originalsnoRoot = get_lock['ConfigDB']167 168 if DryRun:169 '''170 original_xml = (pybindIETFXMLEncoder.serialise(originalsnoRoot))171 new_xml = (pybindIETFXMLEncoder.serialise(snoRoot))172 diff_xml = XMLDiff(original_xml, new_xml)173 '''174175 diff_xml = calculate_diff(originalsnoRoot, snoRoot)176 ConfigDB.release_lock()177 return diff_xml178179 sno_dict = json.loads(pybindJSON.dumps(snoRoot))180 originalsno_dict = json.loads(pybindJSON.dumps(originalsnoRoot))181 difference = diff(originalsno_dict, sno_dict)182183184 #print ((originalsnoRoot.get()))185 device_config = {}186 if 'devices' in difference.keys() and 'device' in difference['devices'].keys():187 for device in difference['devices']['device'].keys():188 if 'config' in difference['devices']['device'][device]:189 config_dict = difference['devices']['device'][device]['config']190 #print ((originalsnoRoot.devices.device[device].config.get()))191192 #print (config_dict)193194 if device not in originalsnoRoot.devices.device.keys():195 return {"Error" : "Device {} not present in DB. First add the device".format(device)}196197 198 original_config = originalsnoRoot.devices.device[device].config199 new_config = snoRoot.devices.device[device].config200201 '''202 print (original_config)203 print (new_config)204 XMLDiff1 = XMLDiff(original_config, new_config)205206 print (XMLDiff1+"\n")207 configXML_tree = list( etree.fromstring(XMLDiff1) )[0]208 configXML = etree.tostring(configXML_tree).decode()209210 configXML = "<config>\n" + configXML + "</config>"211 '''212213214 XMLDIFF = calculate_diff(original_config, new_config, snoObject=originalsnoRoot.devices.device[device].config)215216 configXML_tree = list(etree.fromstring(XMLDIFF))[0]217 configXML = etree.tostring(configXML_tree).decode()218219 configXML = "<config>\n" + configXML + "</config>"220 print (configXML)221222223 rev_XMLDIFF = calculate_diff(new_config, original_config, snoObject=originalsnoRoot.devices.device[device].config)224 rev_XML_tree = list(etree.fromstring(rev_XMLDIFF))[0]225 rev_XML = etree.tostring(rev_XML_tree).decode()226227 rev_XML = "<config>\n" + rev_XML + "</config>"228229 device_config[device] = {}230 device_config[device]['config'] = configXML231 device_config[device]['rev_config'] = rev_XML232233234 if device_config:235 try:236 NetworkTransaction(device_config, originalsnoRoot)237 except Exception as e:238 return ({"Error" : "Failed to commit. " + str(e)})239240 241 return ({"ConfigDB" : ConfigDB.write(snoRoot, sessionID)})242243 #originalsnoRoot = sno()244 #pybindJSONDecoder.load_json(originalsno_dict, None, None, originalsnoRoot)245246if __name__ == "__main__":247 test_root = sno()248 rt = test_root.devices.device.add('TEST')249 rt.mgmt_ip = "192.168.50.134"250 rt.netconf_port = 8300251 rt.netconf_user = "admin"252 #rt.netconf_password = "CumulusLinux!"253 rt.netconf_password = "admin"254 rt.config.commands.cmd.append('net add interface swp1 ip address 10.1.1.1/24')255 rt.config.commands.cmd.append('net add interface swp2 ip address 10.2.1.1/24')256 rt.config.commands.cmd.append('net add interface swp3 ip address 10.3.1.1/24')257 #rt.config.commands.cmd.append('test1')258 #rt.config.commands.cmd.append('test2')259 #rt.config.commands.cmd.append('test3')260 test_root.devices.device.add('TEST4')261262 test_root_dict = json.loads(pybindJSON.dumps(test_root))263 with open("../ConfigDB", 'w') as conf:264 json.dump(test_root_dict, conf)265266 other_root = sno()267 new_rt = other_root.devices.device.add('TEST')268 new_rt.mgmt_ip = "192.168.50.134"269 new_rt.netconf_port = 8300270 new_rt.netconf_user = "admin"271 new_rt.netconf_password = "admin"272 new_rt.config.commands.cmd.append('net add interface swp1 ip address 20.1.1.1/24')273 new_rt.config.commands.cmd.append('net add interface swp2 ip address 20.2.1.1/24')274 #new_rt.config.commands.cmd.append('test1')275 #new_rt.config.commands.cmd.append('test4')276 new_rt.config.commands.cmd.append('net add interface swp3 ip address 20.3.1.1/24')277 #new_rt.config.commands.cmd.append('test2')278279 '''280 new_rt = other_root.devices.device.add('TEST2')281 new_rt.mgmt_ip = "10.10.10.11"282 new_rt.config.commands.cmd.append('net add swp1 access vlan 10')283284 new_rt = other_root.devices.device.add('TEST3')285 new_rt.mgmt_ip = "10.10.10.11"286 new_rt.config.commands.cmd.append('net add swp1 access vlan 10')287 '''288289 print (commit(other_root, DryRun=True))
...
util.py
Source:util.py
1"""2 This file is part of Picopore.3 Picopore is free software: you can redistribute it and/or modify4 it under the terms of the GNU General Public License as published by5 the Free Software Foundation, either version 3 of the License, or6 (at your option) any later version.7 Picopore is distributed in the hope that it will be useful,8 but WITHOUT ANY WARRANTY; without even the implied warranty of9 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the10 GNU General Public License for more details.11 You should have received a copy of the GNU General Public License12 along with Picopore. If not, see <http://www.gnu.org/licenses/>.13"""14from __future__ import print_function15import os16import numpy as np17import glob18import sys19import re20def log(message='', end='\n'):21 print(message, end=end)22 sys.stdout.flush()23def getPrefixedFilename(filename, prefix=""):24 if prefix is None or prefix == "":25 return filename26 elif os.path.isdir(filename):27 return os.path.join(filename, prefix)28 else:29 return os.path.join(os.path.dirname(filename), ".".join([prefix, os.path.basename(filename)]))30def recursiveFindFast5(inp, skip_root=False, depth=0):31 files = []32 for path in inp:33 if os.path.isdir(path):34 files.extend(recursiveFindFast5([os.path.join(path, i) for i in os.listdir(path)], skip_root, depth+1))35 elif (not skip_root or depth > 1) and os.path.isfile(path) and path.endswith(".fast5"):36 files.append(path)37 elif not skip_root:38 files.extend(glob.glob("{}*.fast5".format(path)))39 if depth == 0 and len(files) == 0:40 log("No files found under {}".format(', '.join(inp)))41 return files42def isType(obj, types):43 try:44 return type(obj).__name__ in types45 except TypeError as e:46 if str(e).endswith("is not iterable"):47 # got a single value, not a list48 return type(obj).__name__ == types49 else:50 raise e51def isGroup(obj):52 return isType(obj, ["Group"])53def isDataset(obj):54 return isType(obj, ["Dataset"])55def isInt(obj):56 return isType(obj, ['int', 'int4', 'int8', 'int16', 'int32', 'int64', 'uint', 'uint4', 'uint8', 'uint16', 'uint32', 'uint64'])57def isStr(obj):58 return isType(obj, ['str', 'string_', 'bytes_', 'bytes', 'unicode'])59def isArray(obj):60 return isType(obj, ['list', 'ndarray', 'MaskedArray'])61def isFloat(obj):62 return isType(obj, ['float', 'float16', 'float32', 'float64'])63def getUIntDtype(num):64 if num < 2**8:65 name='uint8'66 elif num < 2**16:67 name='uint16'68 elif num < 2**32:69 name='uint32'70 else:71 name='uint64'72 return name73def getIntDtype(num):74 if abs(num) < 2**7:75 name='int8'76 elif abs(num) < 2**15:77 name='int16'78 elif abs(num) < 2**31:79 name='int32'80 else:81 name='int64'82 return name83def getDtype(data):84 if isArray(data):85 if isInt(data[0]):86 if min(data) > 0:87 name=getUIntDtype(max(data))88 else:89 name=getIntDtype(max(data))90 elif isStr(data[0]):91 name='|S{}'.format(max(max([len(i) for i in data]),1))92 else:93 name=getDtype(data[0])94 elif isInt(data):95 if data > 0:96 name=getUIntDtype(data)97 else:98 name=getIntDtype(data)99 elif isStr(data):100 name='|S{}'.format(max(len(data),1))101 elif isFloat(data):102 # TODO: is there a better way to type floats? sig figs?103 name=type(data).__name__104 else:105 raise TypeError("Data type for value {} not recognised: {}".format(str(data), type(data).__name__))106 return None107 return np.dtype(name)108def recursiveFindDatasets(group, keyword, match_child):109 eventPaths = []110 if isGroup(group):111 for subgroup in group.values():112 eventPaths.extend(recursiveFindDatasets(subgroup, keyword, match_child))113 name = group.name114 if match_child:115 name = name.split("/")[-1]116 if re.search(keyword, name) is not None:117 eventPaths.append(group.name)118 return eventPaths119def findDatasets(f, group_id="all", keyword="Events", entry_point="Analyses", match_child=False):120 eventPaths = []121 try:122 analyses = f.get(entry_point)123 for group in analyses.values():124 if group_id == "all" or group.endswith(group_id):125 eventPaths.extend(recursiveFindDatasets(group, keyword, match_child))126 except AttributeError:127 # no analyses, dont worry128 pass129 return eventPaths130def rewriteDataset(f, path, compression="gzip", compression_opts=1, dataset=None):131 obj = f.get(path)132 if not isDataset(obj):133 return134 attrs = obj.attrs135 dataset = obj.value if dataset is None else dataset136 del f[path]137 try:138 cols = dataset.dtype.names139 if cols is None:140 raise AttributeError("Array dtype is missing names")141 newtype=[(name, getDtype(dataset[name])) for name in dataset.dtype.names]142 f.create_dataset(path, data=dataset.astype(newtype), dtype=newtype, compression=compression, compression_opts=compression_opts)143 except AttributeError:144 try:145 f.create_dataset(path, data=dataset, dtype=getDtype(dataset), compression=compression, compression_opts=compression_opts)146 except TypeError as e:147 if str(e) == "Scalar datasets don't support chunk/filter options":148 f.create_dataset(path, data=dataset, dtype=getDtype(dataset))149 else:150 log(path)151 raise e152 for name, value in attrs.items():153 f[path].attrs[name] = value154def recursiveCollapseGroups(f, basegroup, path, group):155 for subname, object in group.items():156 subpath = "{}.{}".format(path, subname)157 if isGroup(object):158 recursiveCollapseGroups(f, basegroup, subpath, object)159 else:160 f.move(object.name, "{}/{}".format(basegroup, subpath))161 for k, v in group.attrs.items():162 f[basegroup].attrs.create("{}.{}".format(path, k), v, dtype=getDtype(v))163 del f[group.name]164def uncollapseGroups(f, basegroup):165 for name, object in basegroup.items():166 f.move("{}/{}".format(basegroup.name, name), name.replace(".", "/")) # TODO: does this include basegroup?167 for k, v in basegroup.attrs.items():168 k = k.split(".")169 groupname = "/".join(k[:-1])170 attrname = k[-1]171 try:172 f.create_group(groupname)173 except ValueError as e:174 if groupname in f:175 pass176 else:177 raise e178 f[groupname].attrs.create(attrname, v, dtype=getDtype(v))...
term.py
Source:term.py
...8def build(s_expr):9 node = node_type.Term()10 if match_node(s_expr, s_expression.List):11 assert_child(s_expr, 0, s_expression.Symbol, [lex_token.CALL, lex_token.VAR_REF])12 if match_child(s_expr, 0, s_expression.Symbol, lex_token.CALL):13 node.term_type = node_type.TERM_CALL14 node.name = assert_child(s_expr, 1, s_expression.Symbol).text15 for child_expr in s_expr.children[2:]:16 child_node = build(child_expr)17 node.children.append(child_node)18 return node19 if match_child(s_expr, 0, s_expression.Symbol, lex_token.VAR_REF):20 node.term_type = node_type.TERM_VAR_REF21 node.name = assert_child(s_expr, 1, s_expression.Symbol).text22 return node23 if match_node(s_expr, s_expression.Number):24 node.name = s_expr.value25 node.term_type = node_type.TERM_NUMBER26 return node27 if match_node(s_expr, s_expression.Symbol):28 node.name = s_expr.text29 if lex_token.is_variable(s_expr.text):30 node.term_type = node_type.TERM_VAR31 return node32 if lex_token.is_constant(s_expr.text):33 node.term_type = node_type.TERM_CONST_REF...
Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!