Best Python code snippet using assertpy_python
data_io.py
Source:data_io.py
1import os2import shutil3import numpy as np456class PatranMesh:7 """8 Class for Patran mesh compatible with ``meshio``910 Args:11 pat (str): Patran file12 read_celltypes (list): List of cell types to be read: ``line``, ``triangle``, ``quad``, ``tetra`` and ``hexahedron``13 """14 def __init__(self, patfile, read_celltypes=["triangle", "tetra"]):15 self.read(patfile, read_celltypes=read_celltypes)16 self.init_mesh()1718 def read(self, patfile, read_celltypes):19 """20 Read points and cells of a Patran mesh21 """22 import re2324 def get_points(line):25 line = line.strip()26 points = re.findall(r"[-\d\.]+E...", line)27 points = [float(point) for point in points]28 return points2930 def get_cell(line, num_points, pointsID):31 line = line.strip()32 cell = re.findall(r"[\d]+", line)[:num_points]33 cell = [pointsID[int(point)] for point in cell]34 return cell3536 meshio_to_patran_type = {37 "line": 2,38 "triangle": 3,39 "quad": 4,40 "tetra": 5,41 "hexahedron": 8,42 }4344 patran_to_meshio_type = {}45 assert len(read_celltypes) > 046 for celltype in read_celltypes:47 patran_to_meshio_type[meshio_to_patran_type[celltype]] = celltype4849 # Read patran file50 f = open(patfile, "r")51 lines = f.read()52 lines = lines.replace(" ", ",")53 for _ in range(15):54 lines = lines.replace(",,", ",")5556 # Read points57 self.pointsID = {}58 self.points = []59 pointlines = re.findall(r"\n(,1,[\d,]+\n[,\d.EG\+-]+\n1[G,\d]+)", lines)60 for i, n in enumerate(pointlines):61 self.pointsID[int(n.split("\n")[0].split(",")[2])] = i62 self.points.append(get_points(n.split("\n")[1]))63 self.points = np.array(self.points)6465 # Read cells66 self.cellsID = {}67 self.cells = {}68 celllines = re.findall(r"\n,2,([\d,E]+\n[\d\.\+,E]+\n[\d,E]+)", lines)69 for e in celllines:70 celltype = int(e.split(",")[1])71 num_points = int(e.split("\n")[1].split(",")[1])72 if celltype not in patran_to_meshio_type:73 continue74 meshio_type = patran_to_meshio_type[celltype]75 cellID = int(e.split(",")[0])76 cell = get_cell(e.split("\n")[2], num_points, self.pointsID)77 if meshio_type in self.cellsID:78 self.cellsID[meshio_type].append(cellID)79 self.cells[meshio_type].append(cell)80 else:81 self.cellsID[meshio_type] = [cellID]82 self.cells[meshio_type] = [cell]8384 for key in self.cells:85 self.cells[key] = np.array(self.cells[key], dtype=int)86 self.cellsID[key] = np.array(self.cellsID[key], dtype=int)8788 self.point_data = {}89 self.cell_data = {}90 self.field_data = {}9192 def init_mesh(self):93 self.ncells_cumsum = np.cumsum([len(cells) for cells in self.cells.values()])94 self.ncells = self.ncells_cumsum[-1]95 self.remove_free_points()96 self.npoints = len(self.points)97 self.ncells_per_point_ = None9899 def scale(self, factor=1e3):100 """101 Scale the coordinates by a factor102 """103 self.points *= factor104105 def ncells_per_point(self):106 """107 Number of cells that every point is connected to108 """109 if self.ncells_per_point_ is not None:110 return self.ncells_per_point_111 else:112 self.ncells_per_point_ = np.zeros(len(self.points), dtype=int)113 for celltype in self.cells:114 for cell in self.cells[celltype]:115 self.ncells_per_point_[cell] += 1116 return self.ncells_per_point_117118 def remove_free_points(self):119 """120 Remove free points from coordinates and cell connectivities121 """122 # Find which points are not mentioned in the cells123 all_cells_flat = np.concatenate(124 [vals for vals in self.cells.values()]125 ).flatten()126 free_points = np.setdiff1d(np.arange(len(self.points)), all_cells_flat)127 if len(free_points) == 0:128 return129130 # Remove free points131 self.points = np.delete(self.points, free_points, axis=0)132 for key in self.point_data:133 self.point_data[key] = np.delete(self.point_data[key], free_points, axis=0)134135 # Adjust cell connectivities136 diff = np.zeros(len(all_cells_flat), dtype=int)137 for free_point in free_points:138 diff[np.argwhere(all_cells_flat > free_point)] += 1139 all_cells_flat -= diff140 k = 0141 for key in self.cells:142 s = self.cells[key].shape143 n = np.prod(s)144 self.cells[key] = all_cells_flat[k:k + n].reshape(s)145 k += n146147 # Adjust pointsID148 pointsID_keys = np.fromiter(self.pointsID.keys(), int)149 pointsID_keys = np.delete(pointsID_keys, free_points)150 pointsID_values = np.arange(len(pointsID_keys))151 self.pointsID = dict(zip(pointsID_keys, pointsID_values))152153 def average_cell_to_point(self, tetra_data):154 """155 Average data defined at tetra cells to nodes156 """157 if tetra_data.ndim == 1:158 point_data = np.zeros(self.npoints)159 else:160 assert tetra_data.ndim == 2161 point_data = np.zeros((self.npoints, tetra_data.shape[1]))162163 ncells_per_point = np.zeros(self.npoints, dtype=int)164 for i, cell in enumerate(self.cells["tetra"]):165 ncells_per_point[cell] += 1166 point_data[cell] += tetra_data[i]167168 point_data /= ncells_per_point[:, None]169 return point_data170171 def average_point_to_cell(self, point_data):172 """173 Average data defined at points to tetra cells174 """175 if point_data.ndim == 1:176 tetra_data = np.zeros(self.ncells)177 else:178 assert point_data.ndim == 2179 tetra_data = np.zeros((self.ncells, point_data.shape[1]))180 for i, cell in enumerate(self.cells["tetra"]):181 tetra_data[i] = np.mean(point_data[cell], axis=0)182 return tetra_data183184185def read_moldflow_xml(fxml, only_last_step=False):186 """187 Read a Moldflow XML result file188189 data["name"]: result name190 data["type"]: NDDT(Node data), ELDT(Element data) or NMDT(Non-mesh data)191 data["unit"]: physical unit192 data["time"]: list of times if exist193 data["val"]: identifiers and corresponding values194 """195 from lxml import etree as ET196197 parser = ET.XMLParser(encoding="windows-1252", remove_comments=True, huge_tree=True)198 try:199 tree = ET.parse(fxml, parser)200 except ET.ParseError:201 return False, None202 root = tree.getroot()[1]203204 # Basic information205 data = {}206 data["name"] = root.attrib["Name"].strip()207 data["type"] = root.find("DataType").text.strip()208 data["unit"] = root.find("DeptVar").attrib["Unit"]209 data["dim"] = int(root.find("NumberOfComponents").text)210211 if data["type"] != "NMDT(Non-mesh data)":212 datasets = root.xpath("Blocks/Block/Data")213 if only_last_step:214 datasets = [datasets[-1]]215 else:216 datasets = root.xpath("Blocks/Block/DeptValues")217 nsteps = len(datasets)218219 data["time"] = np.zeros(nsteps)220 if data["type"] != "NMDT(Non-mesh data)":221 if nsteps <= 1 or only_last_step:222 data["time"] = None223224 if data["type"] != "NMDT(Non-mesh data)":225 data["val"] = {}226 else:227 if data["dim"] == 1:228 data["val"] = np.zeros(nsteps)229 else:230 data["val"] = np.zeros((nsteps, data["dim"]))231232 for i, dataset in enumerate(datasets):233234 if data["time"] is not None:235 try:236 data["time"][i] = float(dataset.getprevious().getprevious().attrib["Value"])237 except(AttributeError, KeyError):238 data["time"][i] = np.nan239240 # For mesh data, loop241 if data["type"] != "NMDT(Non-mesh data)":242 id_val = {}243 for val in dataset:244 identifier = int(val.attrib["ID"])245 array = np.fromstring(246 val.find("DeptValues").text, sep=" ", count=data["dim"]247 )248 array[array > 1e29] = np.nan249 if len(array) == 1:250 id_val[identifier] = array[0]251 else:252 id_val[identifier] = array253254 if data["time"] is not None:255 data["val"][i] = id_val256 else:257 data["val"] = id_val258259 # For non-mesh data, just read260 else:261 array = np.fromstring(dataset.text, sep=" ", count=data["dim"])262 if len(array) == 1:263 data["val"][i] = array[0]264 else:265 data["val"][i] = array266267 return True, data268269270def convert_to_time_series_xdmf(fxdmf, backup=False):271 """272 Convert a plain XDMF file to273 a time-series one readable by ParaView274 """275 # Backup if needed276 assert os.path.splitext(fxdmf)[1] == ".xdmf"277 if backup is not False:278 assert type(backup) == str279 shutil.copyfile(fxdmf, backup)280281 # Parse XDMF282 from copy import deepcopy283 from lxml import etree as ET284285 parser = ET.XMLParser(remove_blank_text=True)286 tree = ET.parse(fxdmf, parser)287 root = tree.getroot()288289 # Retrieve mesh information290 geometry = root.xpath("//Geometry")[0]291 topology = root.xpath("//Topology")[0]292293 # Read all functions inside HDF5294 # Those that are a function of time is of format [name__time]295 # See self._process_time_series_result296 func_all = root.xpath("//Attribute/@Name")297 func_ele = root.xpath("//Attribute")298299 # Unique functions300 func_set = []301 for f in func_all:302 name = f.split("__")[0]303 if name not in func_set:304 func_set.append(name)305306 # Concatenate all time steps307 timestep = [f.split("__")[1] for f in func_all if len(f.split("__")) > 1]308 timestep = sorted(list(set(timestep)), key=float)309310 # Time steps for a particular function311 def _f_timestep(func):312 f_ts = []313 for f in func_all:314 if f.split("__")[0] == func and len(f.split("__")) > 1:315 f_ts.append(f.split("__")[1])316 return f_ts317318 # Given a time-value and a function, return the corresponding XML element319 def _func_at(func, t):320 f_ts = _f_timestep(func)321 # Single time-step functions322 if len(f_ts) == 0:323 ind = func_all.index(func)324 element = deepcopy(func_ele[ind])325326 # For functions containing several time-steps327 # Use the maximum time-step that is <= t328 # If the above set is empty, use the smallest available time-step329 else:330 if t in f_ts:331 t = t332 else:333 f_t_ll_t = [f_t for f_t in f_ts if float(f_t) < float(t)]334 if len(f_t_ll_t) > 0:335 t = max(f_t_ll_t, key=float)336 else:337 t = f_ts[0]338 ind = func_all.index(func + f"__{t}")339 element = deepcopy(func_ele[ind])340 element.attrib["Name"] = func341 return element342343 # Complete grid at a time344 def _time_series_grid(t):345 grid = ET.Element("Grid", Name="Moldflow results", GridType="Uniform")346347 # Append current time value348 grid.append(ET.Element("Time", Value=t))349350 # Append mesh information351 grid.append(deepcopy(geometry))352 grid.append(deepcopy(topology))353354 # Loop on all unique functions355 for func in func_set:356 grid.append(_func_at(func, t))357358 return grid359360 # Loop on all time steps if time-steps exis361 if len(timestep) > 0:362 domain = root.xpath("//Domain")[0]363 timeseries = ET.Element(364 "Grid", Name="TimeSeries", GridType="Collection", CollectionType="Temporal"365 )366 for t in timestep:367 timeseries.append(_time_series_grid(t))368 domain.append(timeseries)369 domain.remove(domain[0])370371 # Output to replace the current file
...
dvars.py
Source:dvars.py
1import nibabel as nib2import numpy as np3from scipy import stats4def load(func_file, mask_file, check4d=True):5 func_img = nib.load(func_file)6 mask_img = nib.load(mask_file)7 mask = mask_img.get_data()8 func = func_img.get_data().astype(np.float)9 if check4d and len(func.shape) != 4:10 raise Exception("Input functional %s should be 4-dimensional" % func_file)11 func = func[mask.nonzero()].T # will have ntpts x nvoxs12 13 #import code14 #code.interact(local=locals())15 16 return(func)17def robust_stdev(func, interp="fraction"):18 """19 Compute robust estimation of standard deviation20 """21 lower_qs = np.percentile(func, 25, axis=0)22 upper_qs = np.percentile(func, 75, axis=0)23 # note: won't work on roxy with scipy == 0.924 #lower_qs = stats.scoreatpercentile(func, 25, interpolation_method=interp, axis=0)25 #upper_qs = stats.scoreatpercentile(func, 75, interpolation_method=interp, axis=0)26 stdev = (upper_qs - lower_qs)/1.34927 return stdev28def ar_nitime(x, order=1, center=False):29 """30 Borrowed from nipy.algorithms.AR_est_YW.31 aka from nitime import algorithms as alg.32 33 We could speed this up by having the autocorr only compute lag1.34 """35 from nitime.lazy import scipy_linalg as linalg36 import nitime.utils as utils37 if center:38 x = x.copy()39 x = x - x.mean()40 r_m = utils.autocorr(x)[:order + 1]41 Tm = linalg.toeplitz(r_m[:order])42 y = r_m[1:]43 ak = linalg.solve(Tm, y)44 return ak[0]45def ar_statsmodels(x, order=(1,0), trend='nc'):46 import statsmodels.api as sm47 arma_mod = sm.tsa.ARMA(x)48 arma_res = arma_mod.fit(order=order, trend=trend, disp=False)49 return arma_res.arparams[0]50def ar1(func, method=ar_nitime):51 func_centered = func - func.mean(0)52 #import code53 #code.interact(local=locals())54 ar_vals = np.apply_along_axis(method, 0, func_centered)55 return ar_vals56def calc_dvars(func, output_all=False, interp="fraction"):57 # Robust standard deviation58 func_sd = robust_stdev(func, interp)59 60 # AR161 func_ar1 = ar1(func)62 # Predicted standard deviation of temporal derivative63 func_sd_pd = np.sqrt(2 * (1 - func_ar1)) * func_sd64 diff_sd_mean= func_sd_pd.mean()65 # Compute temporal difference time series 66 func_deriv = np.diff(func, axis=0)67 # DVARS68 ## (no standardization)69 dvars_plain = func_deriv.std(1, ddof=1) # TODO: Why are we not ^2 this & getting the sqrt?70 ## standardization71 dvars_stdz = dvars_plain/diff_sd_mean72 ## voxelwise standardization73 diff_vx_stdz= func_deriv/func_sd_pd74 dvars_vx_stdz = diff_vx_stdz.std(1, ddof=1)75 76 if output_all:77 out = np.vstack((dvars_stdz, dvars_plain, dvars_vx_stdz))78 else:79 out = dvars_stdz.reshape(len(dvars_stdz), 1)80 81 return out82def calc_mean_dvars(dvars):83 mean_dvars = dvars.mean(0)84 return mean_dvars85def mean_dvars_wrapper(func_file, mask_file, dvars_out_file=None):86 func = load(func_file, mask_file)87 dvars = calc_dvars(func)88 if dvars_out_file:89 np.savetxt(dvars_out_file, dvars, fmt='%.12f')90 mean_d = calc_mean_dvars(dvars)91 return mean_d[0]92def test():93 func = load("sample_func.nii.gz", "sample_func_mask.nii.gz")94 dvars = calc_dvars(func)95 mean_d = calc_mean_dvars(dvars)96 ref_dvars = np.loadtxt("sample_dvars.txt")97 ref_dvars = ref_dvars[:,[1,0,2]]98 99def specific_tests():100 from numpy.testing import *101 102 ffile = "sample_func.nii.gz"103 mfile = "sample_func_mask.nii.gz"104 func = load(ffile, mfile)105 106 # Robust Standard Deviation107 ## Differences in the two approaches exist because python will handle108 ## ties via averaging109 func_sd = robust_stdev(func)110 ref_sd = load("ref_dvars/DVARS-1033--SD.nii.gz", mfile, False)111 print np.abs(func_sd - ref_sd).mean()112 ## so we can fix this issue by changing the interpolation/ties approach113 ## note however that normally we will use interp="fraction"114 func_sd = robust_stdev(func, interp="higher")115 ## test116 assert_allclose(func_sd, ref_sd)117 print np.abs(func_sd - ref_sd).mean()118 119 # AR1120 func_ar1 = ar1(func)121 ref_ar1 = load("ref_dvars/DVARS-1033--AR1.nii.gz", mfile, False)122 ## test123 assert_allclose(func_ar1, ref_ar1, 1e-6, 1e-6)124 print np.abs(func_ar1 - ref_ar1).mean()125 126 # Predicted standard deviation of temporal derivative127 func_sd_pd = np.sqrt(2 * (1 - func_ar1)) * func_sd128 diff_sd_mean= func_sd_pd.mean()129 ref_sd_pd = load("ref_dvars/DVARS-1033--DiffSDhat.nii.gz", mfile, False)130 ## test131 assert_allclose(func_sd_pd, ref_sd_pd, 1e-5, 1e-5)132 print np.abs(func_sd_pd - ref_sd_pd).mean()133 134 # Compute temporal difference time series135 func_deriv = np.diff(func, axis=0)136 ref_deriv = load("ref_dvars/DVARS-1033--Diff.nii.gz", mfile)137 ## test (these should be flipped in sign)138 assert_equal(func_deriv, -1*ref_deriv)139 print np.abs(func_deriv - (-1*ref_deriv)).mean()140 141 # DVARS (no standardization)142 dvars_plain = func_deriv.std(1, ddof=1)143 ref_plain = np.loadtxt("ref_dvars/DVARS-1033--DiffSD.dat")144 print np.abs(dvars_plain - ref_plain).mean()145 print np.vstack((dvars_plain[:10], ref_plain[:10])).T146 print np.vstack((func_deriv.std(1, ddof=1)[:10], ref_plain[:10])).T147 ## it seems like the differences above stem from an incorrect averaging by my modication of DVARS148 func_img = nib.load("ref_dvars/DVARS-1033--Diff.nii.gz")149 func_all = func_img.get_data().astype(np.float)150 func_all = func_all.reshape(np.prod(func_all.shape[:3]), func_all.shape[-1])151 func_all = func_all.T152 print func_all[0,func_all[0,:].nonzero()].std(ddof=1) - ref_plain[0]153 154 # DVARS155 ## (no standardization)156 dvars_plain = func_deriv.std(1, ddof=1) # TODO: Why are we not ^2 this & getting the sqrt?157 ## standardization158 dvars_stdz = dvars_plain/diff_sd_mean159 ## voxelwise standardization160 diff_vx_stdz= func_deriv/func_sd_pd161 dvars_vx_stdz = diff_vx_stdz.std(1, ddof=1)162 ## reference163 ref_dvars = np.loadtxt("sample_dvars.txt")164 ref_dvars = ref_dvars[:,[1,0,2]]165 ## test166 print np.abs(dvars_plain - ref_dvars[:,0]).mean()167 168## Below tests show that ar_nitime is much faster169## so we will use that170# %timeit ar_nitime(func_centered[:,0])...
Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!