Best Python code snippet using hypothesis
test_utils.py
Source:test_utils.py
1from __future__ import division, print_function, absolute_import2from dipy.utils.six.moves import xrange3import numpy as np4import nose5from dipy.io.bvectxt import orientation_from_string6from dipy.tracking.utils import (affine_for_trackvis, connectivity_matrix,7 density_map, length, move_streamlines,8 ndbincount, reduce_labels,9 reorder_voxels_affine, seeds_from_mask,10 random_seeds_from_mask, target,11 target_line_based, _rmi, unique_rows, near_roi,12 reduce_rois, path_length, flexi_tvis_affine,13 get_flexi_tvis_affine, _min_at)14from dipy.tracking._utils import _to_voxel_coordinates15import dipy.tracking.metrics as metrix16from dipy.tracking.vox2track import streamline_mapping17import numpy.testing as npt18from numpy.testing import assert_array_almost_equal, assert_array_equal19from nose.tools import assert_equal, assert_raises, assert_true20def make_streamlines():21 streamlines = [np.array([[0, 0, 0],22 [1, 1, 1],23 [2, 2, 2],24 [5, 10, 12]], 'float'),25 np.array([[1, 2, 3],26 [3, 2, 0],27 [5, 20, 33],28 [40, 80, 120]], 'float')]29 return streamlines30def test_density_map():31 # One streamline diagonal in volume32 streamlines = [np.array([np.arange(10)] * 3).T]33 shape = (10, 10, 10)34 x = np.arange(10)35 expected = np.zeros(shape)36 expected[x, x, x] = 1.37 dm = density_map(streamlines, vol_dims=shape, voxel_size=(1, 1, 1))38 assert_array_equal(dm, expected)39 # add streamline, make voxel_size smaller. Each streamline should only be40 # counted once, even if multiple points lie in a voxel41 streamlines.append(np.ones((5, 3)))42 shape = (5, 5, 5)43 x = np.arange(5)44 expected = np.zeros(shape)45 expected[x, x, x] = 1.46 expected[0, 0, 0] += 147 dm = density_map(streamlines, vol_dims=shape, voxel_size=(2, 2, 2))48 assert_array_equal(dm, expected)49 # should work with a generator50 dm = density_map(iter(streamlines), vol_dims=shape, voxel_size=(2, 2, 2))51 assert_array_equal(dm, expected)52 # Test passing affine53 affine = np.diag([2, 2, 2, 1.])54 affine[:3, 3] = 1.55 dm = density_map(streamlines, shape, affine=affine)56 assert_array_equal(dm, expected)57 # Shift the image by 2 voxels, ie 4mm58 affine[:3, 3] -= 4.59 expected_old = expected60 new_shape = [i + 2 for i in shape]61 expected = np.zeros(new_shape)62 expected[2:, 2:, 2:] = expected_old63 dm = density_map(streamlines, new_shape, affine=affine)64 assert_array_equal(dm, expected)65def test_to_voxel_coordinates_precision():66 # To simplify tests, use an identity affine. This would be the result of67 # a call to _mapping_to_voxel with another identity affine.68 transfo = np.array([[1.0, 0.0, 0.0],69 [0.0, 1.0, 0.0],70 [0.0, 0.0, 1.0]])71 # Offset is computed by _mapping_to_voxel. With a 1x1x1 dataset72 # having no translation, the offset is half the voxel size, i.e. 0.5.73 offset = np.array([0.5, 0.5, 0.5])74 # Without the added tolerance in _to_voxel_coordinates, this streamline75 # should raise an Error in the call to _to_voxel_coordinates.76 failing_strl = [np.array([[-0.5000001, 0.0, 0.0], [0.0, 1.0, 0.0]],77 dtype=np.float32)]78 indices = _to_voxel_coordinates(failing_strl, transfo, offset)79 expected_indices = np.array([[[0, 0, 0], [0, 1, 0]]])80 assert_array_equal(indices, expected_indices)81def test_connectivity_matrix():82 label_volume = np.array([[[3, 0, 0],83 [0, 0, 0],84 [0, 0, 4]]])85 streamlines = [np.array([[0, 0, 0], [0, 0, 0], [0, 2, 2]], 'float'),86 np.array([[0, 0, 0], [0, 1, 1], [0, 2, 2]], 'float'),87 np.array([[0, 2, 2], [0, 1, 1], [0, 0, 0]], 'float')]88 expected = np.zeros((5, 5), 'int')89 expected[3, 4] = 290 expected[4, 3] = 191 # Check basic Case92 matrix = connectivity_matrix(streamlines, label_volume, (1, 1, 1),93 symmetric=False)94 assert_array_equal(matrix, expected)95 # Test mapping96 matrix, mapping = connectivity_matrix(streamlines, label_volume, (1, 1, 1),97 symmetric=False, return_mapping=True)98 assert_array_equal(matrix, expected)99 assert_equal(mapping[3, 4], [0, 1])100 assert_equal(mapping[4, 3], [2])101 assert_equal(mapping.get((0, 0)), None)102 # Test mapping and symmetric103 matrix, mapping = connectivity_matrix(streamlines, label_volume, (1, 1, 1),104 symmetric=True, return_mapping=True)105 assert_equal(mapping[3, 4], [0, 1, 2])106 # When symmetric only (3,4) is a key, not (4, 3)107 assert_equal(mapping.get((4, 3)), None)108 # expected output matrix is symmetric version of expected109 expected = expected + expected.T110 assert_array_equal(matrix, expected)111 # Test mapping_as_streamlines, mapping dict has lists of streamlines112 matrix, mapping = connectivity_matrix(streamlines, label_volume, (1, 1, 1),113 symmetric=False,114 return_mapping=True,115 mapping_as_streamlines=True)116 assert_true(mapping[3, 4][0] is streamlines[0])117 assert_true(mapping[3, 4][1] is streamlines[1])118 assert_true(mapping[4, 3][0] is streamlines[2])119 # Test passing affine to connectivity_matrix120 expected = matrix121 affine = np.diag([-1, -1, -1, 1.])122 streamlines = [-i for i in streamlines]123 matrix = connectivity_matrix(streamlines, label_volume, affine=affine)124 # In the symmetrical case, the matrix should be, well, symmetric:125 assert_equal(matrix[4, 3], matrix[4, 3])126def test_ndbincount():127 def check(expected):128 assert_equal(bc[0, 0], expected[0])129 assert_equal(bc[0, 1], expected[1])130 assert_equal(bc[1, 0], expected[2])131 assert_equal(bc[2, 2], expected[3])132 x = np.array([[0, 0], [0, 0], [0, 1], [0, 1], [1, 0], [2, 2]]).T133 expected = [2, 2, 1, 1]134 # count occurrences in x135 bc = ndbincount(x)136 assert_equal(bc.shape, (3, 3))137 check(expected)138 # pass in shape139 bc = ndbincount(x, shape=(4, 5))140 assert_equal(bc.shape, (4, 5))141 check(expected)142 # pass in weights143 weights = np.arange(6.)144 weights[-1] = 1.23145 expeceted = [1., 5., 4., 1.23]146 bc = ndbincount(x, weights=weights)147 check(expeceted)148 # raises an error if shape is too small149 assert_raises(ValueError, ndbincount, x, None, (2, 2))150def test_reduce_labels():151 shape = (4, 5, 6)152 # labels from 100 to 220153 labels = np.arange(100, np.prod(shape) + 100).reshape(shape)154 # new labels form 0 to 120, and lookup maps range(0,120) to range(100, 220)155 new_labels, lookup = reduce_labels(labels)156 assert_array_equal(new_labels, labels - 100)157 assert_array_equal(lookup, labels.ravel())158def test_move_streamlines():159 streamlines = make_streamlines()160 affine = np.eye(4)161 new_streamlines = move_streamlines(streamlines, affine)162 for i, test_sl in enumerate(new_streamlines):163 assert_array_equal(test_sl, streamlines[i])164 affine[:3, 3] += (4, 5, 6)165 new_streamlines = move_streamlines(streamlines, affine)166 for i, test_sl in enumerate(new_streamlines):167 assert_array_equal(test_sl, streamlines[i] + (4, 5, 6))168 affine = np.eye(4)169 affine = affine[[2, 1, 0, 3]]170 new_streamlines = move_streamlines(streamlines, affine)171 for i, test_sl in enumerate(new_streamlines):172 assert_array_equal(test_sl, streamlines[i][:, [2, 1, 0]])173 affine[:3, 3] += (4, 5, 6)174 new_streamlines = move_streamlines(streamlines, affine)175 undo_affine = move_streamlines(new_streamlines, np.eye(4),176 input_space=affine)177 for i, test_sl in enumerate(undo_affine):178 assert_array_almost_equal(test_sl, streamlines[i])179 # Test that changing affine does affect moving streamlines180 affineA = affine.copy()181 affineB = affine.copy()182 streamlinesA = move_streamlines(streamlines, affineA)183 streamlinesB = move_streamlines(streamlines, affineB)184 affineB[:] = 0185 for (a, b) in zip(streamlinesA, streamlinesB):186 assert_array_equal(a, b)187def test_target():188 streamlines = [np.array([[0., 0., 0.],189 [1., 0., 0.],190 [2., 0., 0.]]),191 np.array([[0., 0., 0],192 [0, 1., 1.],193 [0, 2., 2.]])]194 _target(target, streamlines, (0, 0, 0), (1, 0, 0), True)195def test_target_lb():196 streamlines = [np.array([[0., 1., 1.],197 [3., 1., 1.]]),198 np.array([[0., 0., 0.],199 [2., 2., 2.]]),200 np.array([[1., 1., 1.]])] # Single-point streamline201 _target(target_line_based, streamlines, (1, 1, 1), (2, 1, 1), False)202def _target(target_f, streamlines, voxel_both_true, voxel_one_true,203 test_bad_points):204 affine = np.eye(4)205 mask = np.zeros((4, 4, 4), dtype=bool)206 # Both pass though207 mask[voxel_both_true] = True208 new = list(target_f(streamlines, mask, affine=affine))209 assert_equal(len(new), 2)210 new = list(target_f(streamlines, mask, affine=affine, include=False))211 assert_equal(len(new), 0)212 # only first213 mask[:] = False214 mask[voxel_one_true] = True215 new = list(target_f(streamlines, mask, affine=affine))216 assert_equal(len(new), 1)217 assert_true(new[0] is streamlines[0])218 new = list(target_f(streamlines, mask, affine=affine, include=False))219 assert_equal(len(new), 1)220 assert_true(new[0] is streamlines[1])221 # Test that bad points raise a value error222 if test_bad_points:223 bad_sl = streamlines + [np.array([[10.0, 10.0, 10.0]])]224 new = target_f(bad_sl, mask, affine=affine)225 assert_raises(ValueError, list, new)226 bad_sl = streamlines + [-np.array([[10.0, 10.0, 10.0]])]227 new = target_f(bad_sl, mask, affine=affine)228 assert_raises(ValueError, list, new)229 # Test smaller voxels230 affine = np.random.random((4, 4)) - .5231 affine[3] = [0, 0, 0, 1]232 streamlines = list(move_streamlines(streamlines, affine))233 new = list(target_f(streamlines, mask, affine=affine))234 assert_equal(len(new), 1)235 assert_true(new[0] is streamlines[0])236 new = list(target_f(streamlines, mask, affine=affine, include=False))237 assert_equal(len(new), 1)238 assert_true(new[0] is streamlines[1])239 # Test that changing mask or affine does not break target/target_line_based240 include = target_f(streamlines, mask, affine=affine)241 exclude = target_f(streamlines, mask, affine=affine, include=False)242 affine[:] = np.eye(4)243 mask[:] = False244 include = list(include)245 exclude = list(exclude)246 assert_equal(len(include), 1)247 assert_true(include[0] is streamlines[0])248 assert_equal(len(exclude), 1)249 assert_true(exclude[0] is streamlines[1])250def test_near_roi():251 streamlines = [np.array([[0., 0., 0.9],252 [1.9, 0., 0.],253 [3, 2., 2.]]),254 np.array([[0.1, 0., 0],255 [0, 1., 1.],256 [0, 2., 2.]]),257 np.array([[2, 2, 2],258 [3, 3, 3]])]259 affine = np.eye(4)260 mask = np.zeros((4, 4, 4), dtype=bool)261 mask[0, 0, 0] = True262 mask[1, 0, 0] = True263 assert_array_equal(near_roi(streamlines, mask, tol=1),264 np.array([True, True, False]))265 assert_array_equal(near_roi(streamlines, mask),266 np.array([False, True, False]))267 # If there is an affine, we need to use it:268 affine[:, 3] = [-1, 100, -20, 1]269 # Transform the streamlines:270 x_streamlines = [sl + affine[:3, 3] for sl in streamlines]271 assert_array_equal(near_roi(x_streamlines, mask, affine=affine, tol=1),272 np.array([True, True, False]))273 assert_array_equal(near_roi(x_streamlines, mask, affine=affine,274 tol=None),275 np.array([False, True, False]))276 # Test for use of the 'all' mode:277 assert_array_equal(near_roi(x_streamlines, mask, affine=affine, tol=None,278 mode='all'), np.array([False, False, False]))279 mask[0, 1, 1] = True280 mask[0, 2, 2] = True281 # Test for use of the 'all' mode, also testing that setting the tolerance282 # to a very small number gets overridden:283 assert_array_equal(near_roi(x_streamlines, mask, affine=affine, tol=0.1,284 mode='all'), np.array([False, True, False]))285 mask[2, 2, 2] = True286 mask[3, 3, 3] = True287 assert_array_equal(near_roi(x_streamlines, mask, affine=affine,288 tol=None,289 mode='all'),290 np.array([False, True, True]))291 # Test for use of endpoints as selection criteria:292 mask = np.zeros((4, 4, 4), dtype=bool)293 mask[0, 1, 1] = True294 mask[3, 2, 2] = True295 assert_array_equal(near_roi(streamlines, mask, tol=0.87,296 mode="either_end"),297 np.array([True, False, False]))298 assert_array_equal(near_roi(streamlines, mask, tol=0.87,299 mode="both_end"),300 np.array([False, False, False]))301 mask[0, 0, 0] = True302 mask[0, 2, 2] = True303 assert_array_equal(near_roi(streamlines, mask, mode="both_end"),304 np.array([False, True, False]))305 # Test with a generator input:306 def generate_sl(streamlines):307 for sl in streamlines:308 yield sl309 assert_array_equal(near_roi(generate_sl(streamlines),310 mask, mode="both_end"),311 np.array([False, True, False]))312def test_voxel_ornt():313 sh = (40, 40, 40)314 sz = (1, 2, 3)315 I4 = np.eye(4)316 ras = orientation_from_string('ras')317 sra = orientation_from_string('sra')318 lpi = orientation_from_string('lpi')319 srp = orientation_from_string('srp')320 affine = reorder_voxels_affine(ras, ras, sh, sz)321 assert_array_equal(affine, I4)322 affine = reorder_voxels_affine(sra, sra, sh, sz)323 assert_array_equal(affine, I4)324 affine = reorder_voxels_affine(lpi, lpi, sh, sz)325 assert_array_equal(affine, I4)326 affine = reorder_voxels_affine(srp, srp, sh, sz)327 assert_array_equal(affine, I4)328 streamlines = make_streamlines()329 box = np.array(sh) * sz330 sra_affine = reorder_voxels_affine(ras, sra, sh, sz)331 toras_affine = reorder_voxels_affine(sra, ras, sh, sz)332 assert_array_equal(np.dot(toras_affine, sra_affine), I4)333 expected_sl = (sl[:, [2, 0, 1]] for sl in streamlines)334 test_sl = move_streamlines(streamlines, sra_affine)335 for ii in xrange(len(streamlines)):336 assert_array_equal(next(test_sl), next(expected_sl))337 lpi_affine = reorder_voxels_affine(ras, lpi, sh, sz)338 toras_affine = reorder_voxels_affine(lpi, ras, sh, sz)339 assert_array_equal(np.dot(toras_affine, lpi_affine), I4)340 expected_sl = (box - sl for sl in streamlines)341 test_sl = move_streamlines(streamlines, lpi_affine)342 for ii in xrange(len(streamlines)):343 assert_array_equal(next(test_sl), next(expected_sl))344 srp_affine = reorder_voxels_affine(ras, srp, sh, sz)345 toras_affine = reorder_voxels_affine(srp, ras, (40, 40, 40), (3, 1, 2))346 assert_array_equal(np.dot(toras_affine, srp_affine), I4)347 expected_sl = [sl.copy() for sl in streamlines]348 for sl in expected_sl:349 sl[:, 1] = box[1] - sl[:, 1]350 expected_sl = (sl[:, [2, 0, 1]] for sl in expected_sl)351 test_sl = move_streamlines(streamlines, srp_affine)352 for ii in xrange(len(streamlines)):353 assert_array_equal(next(test_sl), next(expected_sl))354def test_streamline_mapping():355 streamlines = [np.array([[0, 0, 0], [0, 0, 0], [0, 2, 2]], 'float'),356 np.array([[0, 0, 0], [0, 1, 1], [0, 2, 2]], 'float'),357 np.array([[0, 2, 2], [0, 1, 1], [0, 0, 0]], 'float')]358 mapping = streamline_mapping(streamlines, (1, 1, 1))359 expected = {(0, 0, 0): [0, 1, 2], (0, 2, 2): [0, 1, 2],360 (0, 1, 1): [1, 2]}361 assert_equal(mapping, expected)362 mapping = streamline_mapping(streamlines, (1, 1, 1),363 mapping_as_streamlines=True)364 expected = dict((k, [streamlines[i] for i in indices])365 for k, indices in expected.items())366 assert_equal(mapping, expected)367 # Test passing affine368 affine = np.eye(4)369 affine[:3, 3] = .5370 mapping = streamline_mapping(streamlines, affine=affine,371 mapping_as_streamlines=True)372 assert_equal(mapping, expected)373 # Make the voxel size smaller374 affine = np.diag([.5, .5, .5, 1.])375 affine[:3, 3] = .25376 expected = dict((tuple(i * 2 for i in key), value)377 for key, value in expected.items())378 mapping = streamline_mapping(streamlines, affine=affine,379 mapping_as_streamlines=True)380 assert_equal(mapping, expected)381def test_rmi():382 I1 = _rmi([3, 4], [10, 10])383 assert_equal(I1, 34)384 I1 = _rmi([0, 0], [10, 10])385 assert_equal(I1, 0)386 assert_raises(ValueError, _rmi, [10, 0], [10, 10])387 try:388 from numpy import ravel_multi_index389 except ImportError:390 raise nose.SkipTest()391 # Dtype of random integers is system dependent392 A, B, C, D = np.random.randint(0, 1000, size=[4, 100])393 I1 = _rmi([A, B], dims=[1000, 1000])394 I2 = ravel_multi_index([A, B], dims=[1000, 1000])395 assert_array_equal(I1, I2)396 I1 = _rmi([A, B, C, D], dims=[1000] * 4)397 I2 = ravel_multi_index([A, B, C, D], dims=[1000] * 4)398 assert_array_equal(I1, I2)399 # Check for overflow with small int types400 indices = np.random.randint(0, 255, size=(2, 100))401 dims = (1000, 1000)402 I1 = _rmi(indices, dims=dims)403 I2 = ravel_multi_index(indices, dims=dims)404 assert_array_equal(I1, I2)405def test_affine_for_trackvis():406 voxel_size = np.array([1., 2, 3.])407 affine = affine_for_trackvis(voxel_size)408 origin = np.dot(affine, [0, 0, 0, 1])409 assert_array_almost_equal(origin[:3], voxel_size / 2)410def test_length():411 # Generate a simulated bundle of fibers:412 n_streamlines = 50413 n_pts = 100414 t = np.linspace(-10, 10, n_pts)415 bundle = []416 for i in np.linspace(3, 5, n_streamlines):417 pts = np.vstack((np.cos(2 * t / np.pi), np.zeros(t.shape) + i, t)).T418 bundle.append(pts)419 start = np.random.randint(10, 30, n_streamlines)420 end = np.random.randint(60, 100, n_streamlines)421 bundle = [10 * streamline[start[i]:end[i]] for (i, streamline) in422 enumerate(bundle)]423 bundle_lengths = length(bundle)424 for idx, this_length in enumerate(bundle_lengths):425 assert_equal(this_length, metrix.length(bundle[idx]))426def test_seeds_from_mask():427 mask = np.random.random_integers(0, 1, size=(10, 10, 10))428 seeds = seeds_from_mask(mask, density=1)429 assert_equal(mask.sum(), len(seeds))430 assert_array_equal(np.argwhere(mask), seeds)431 mask[:] = False432 mask[3, 3, 3] = True433 seeds = seeds_from_mask(mask, density=[3, 4, 5])434 assert_equal(len(seeds), 3 * 4 * 5)435 assert_true(np.all((seeds > 2.5) & (seeds < 3.5)))436 mask[4, 4, 4] = True437 seeds = seeds_from_mask(mask, density=[3, 4, 5])438 assert_equal(len(seeds), 2 * 3 * 4 * 5)439 assert_true(np.all((seeds > 2.5) & (seeds < 4.5)))440 in_333 = ((seeds > 2.5) & (seeds < 3.5)).all(1)441 assert_equal(in_333.sum(), 3 * 4 * 5)442 in_444 = ((seeds > 3.5) & (seeds < 4.5)).all(1)443 assert_equal(in_444.sum(), 3 * 4 * 5)444def test_random_seeds_from_mask():445 mask = np.random.random_integers(0, 1, size=(4, 6, 3))446 seeds = random_seeds_from_mask(mask,447 seeds_count=24,448 seed_count_per_voxel=True)449 assert_equal(mask.sum() * 24, len(seeds))450 seeds = random_seeds_from_mask(mask,451 seeds_count=0,452 seed_count_per_voxel=True)453 assert_equal(0, len(seeds))454 mask[:] = False455 mask[2, 2, 2] = True456 seeds = random_seeds_from_mask(mask,457 seeds_count=8,458 seed_count_per_voxel=True)459 assert_equal(mask.sum() * 8, len(seeds))460 assert_true(np.all((seeds > 1.5) & (seeds < 2.5)))461 seeds = random_seeds_from_mask(mask,462 seeds_count=24,463 seed_count_per_voxel=False)464 assert_equal(24, len(seeds))465 seeds = random_seeds_from_mask(mask,466 seeds_count=0,467 seed_count_per_voxel=False)468 assert_equal(0, len(seeds))469 mask[:] = False470 mask[2, 2, 2] = True471 seeds = random_seeds_from_mask(mask,472 seeds_count=100,473 seed_count_per_voxel=False)474 assert_equal(100, len(seeds))475 assert_true(np.all((seeds > 1.5) & (seeds < 2.5)))476def test_connectivity_matrix_shape():477 # Labels: z-planes have labels 0,1,2478 labels = np.zeros((3, 3, 3), dtype=int)479 labels[:, :, 1] = 1480 labels[:, :, 2] = 2481 # Streamline set, only moves between first two z-planes.482 streamlines = [np.array([[0., 0., 0.],483 [0., 0., 0.5],484 [0., 0., 1.]]),485 np.array([[0., 1., 1.],486 [0., 1., 0.5],487 [0., 1., 0.]])]488 matrix = connectivity_matrix(streamlines, labels, affine=np.eye(4))489 assert_equal(matrix.shape, (3, 3))490def test_unique_rows():491 """492 Testing the function unique_coords493 """494 arr = np.array([[1, 2, 3], [1, 2, 3], [2, 3, 4], [3, 4, 5]])495 arr_w_unique = np.array([[1, 2, 3], [2, 3, 4], [3, 4, 5]])496 assert_array_equal(unique_rows(arr), arr_w_unique)497 # Should preserve order:498 arr = np.array([[2, 3, 4], [1, 2, 3], [1, 2, 3], [3, 4, 5]])499 arr_w_unique = np.array([[2, 3, 4], [1, 2, 3], [3, 4, 5]])500 assert_array_equal(unique_rows(arr), arr_w_unique)501 # Should work even with longer arrays:502 arr = np.array([[2, 3, 4], [1, 2, 3], [1, 2, 3], [3, 4, 5],503 [6, 7, 8], [0, 1, 0], [1, 0, 1]])504 arr_w_unique = np.array([[2, 3, 4], [1, 2, 3], [3, 4, 5],505 [6, 7, 8], [0, 1, 0], [1, 0, 1]])506 assert_array_equal(unique_rows(arr), arr_w_unique)507def test_reduce_rois():508 roi1 = np.zeros((4, 4, 4), dtype=np.bool)509 roi2 = np.zeros((4, 4, 4), dtype=np.bool)510 roi1[1, 1, 1] = 1511 roi2[2, 2, 2] = 1512 include_roi, exclude_roi = reduce_rois([roi1, roi2], [True, True])513 npt.assert_equal(include_roi, roi1 + roi2)514 npt.assert_equal(exclude_roi, np.zeros((4, 4, 4)))515 include_roi, exclude_roi = reduce_rois([roi1, roi2], [True, False])516 npt.assert_equal(include_roi, roi1)517 npt.assert_equal(exclude_roi, roi2)518 # Array input:519 include_roi, exclude_roi = reduce_rois(np.array([roi1, roi2]),520 [True, True])521 npt.assert_equal(include_roi, roi1 + roi2)522 npt.assert_equal(exclude_roi, np.zeros((4, 4, 4)))523def test_flexi_tvis_affine():524 sl_vox_order = 'RPI'525 grid_affine = np.array(526 [[-1.08566022e+00, 1.42664334e-03, 2.43463114e-01, 1.34783203e+02],527 [2.43251352e-03, 1.09376717e+00, 1.48301506e-02, -1.07367630e+02],528 [1.33170187e-01, -8.34854878e-03, 1.98454463e+00, -9.98151169e+01],529 [0.00000000e+00, 0.00000000e+00, 0.00000000e+00, 1.00000000e+00]])530 dim = (256, 256, 86)531 voxel_size = np.array([1.09379995, 1.09379995, 1.99947774])532 affine = flexi_tvis_affine(sl_vox_order, grid_affine, dim, voxel_size)533 origin = np.dot(affine, [0, 0, 0, 1])534 assert_array_almost_equal(origin[:3],535 np.multiply(dim, voxel_size) - voxel_size / 2)536def test_get_flexi_tvis_affine():537 tvis_hdr = {'voxel_order': 'RPI', 'dim': (30, 40, 50),538 'voxel_size': [2, 3, 4]}539 grid_affine = np.array([[-2, 0, 0, 0],540 [0, 3, 0, 0],541 [0, 0, 4, 0],542 [0, 0, 0, 1.]])543 affine = get_flexi_tvis_affine(tvis_hdr, grid_affine)544 origin = np.dot(affine, [0, 0, 0, 1])545 vsz = np.array(tvis_hdr['voxel_size'])546 assert_array_almost_equal(origin[:3],547 np.multiply(tvis_hdr['dim'], vsz) - vsz / 2)548 # grid_affine =549 tvis_hdr['voxel_order'] = 'ASL'550 vsz = tvis_hdr['voxel_size'] = np.array([3, 4, 2.])551 affine = get_flexi_tvis_affine(tvis_hdr, grid_affine)552 vox_point = np.array([9, 8, 7])553 trk_point = np.dot(affine, np.append(vox_point, 1))554 assert_array_almost_equal(trk_point[:3],555 (vox_point[[1, 2, 0]] + 0.5) * vsz)556def test_path_length():557 aoi = np.zeros((20, 20, 20), dtype=bool)558 aoi[0, 0, 0] = 1559 # A few tests for basic usage560 x = np.arange(20)561 streamlines = [np.array([x, x, x]).T]562 pl = path_length(streamlines, aoi, affine=np.eye(4))563 expected = x.copy() * np.sqrt(3)564 # expected[0] = np.inf565 npt.assert_array_almost_equal(pl[x, x, x], expected)566 aoi[19, 19, 19] = 1567 pl = path_length(streamlines, aoi, affine=np.eye(4))568 expected = np.minimum(expected, expected[::-1])569 npt.assert_array_almost_equal(pl[x, x, x], expected)570 aoi[19, 19, 19] = 0571 aoi[1, 1, 1] = 1572 pl = path_length(streamlines, aoi, affine=np.eye(4))573 expected = (x - 1) * np.sqrt(3)574 expected[0] = 0575 npt.assert_array_almost_equal(pl[x, x, x], expected)576 z = np.zeros(x.shape, x.dtype)577 streamlines.append(np.array([x, z, z]).T)578 pl = path_length(streamlines, aoi, affine=np.eye(4))579 npt.assert_array_almost_equal(pl[x, x, x], expected)580 npt.assert_array_almost_equal(pl[x, 0, 0], x)581 # Only streamlines that pass through aoi contribute to path length so if582 # all streamlines are duds, plm will be all inf.583 aoi[:] = 0584 aoi[0, 0, 0] = 1585 streamlines = []586 for i in range(1000):587 rando = np.random.random(size=(100, 3)) * 19 + .5588 assert (rando > .5).all()589 assert (rando < 19.5).all()590 streamlines.append(rando)591 pl = path_length(streamlines, aoi, affine=np.eye(4))592 npt.assert_array_almost_equal(pl, -1)593 pl = path_length(streamlines, aoi, affine=np.eye(4), fill_value=-12.)594 npt.assert_array_almost_equal(pl, -12.)595def test_min_at():596 k = np.array([3, 2, 2, 2, 1, 1, 1])597 values = np.array([10., 1, 2, 3, 31, 21, 11])598 i = np.zeros(k.shape, int)599 j = np.zeros(k.shape, int)600 a = np.zeros([1, 1, 4]) + 100.601 _min_at(a, (i, j, k), values)...
results.py
Source:results.py
1# Copyright 2020 Google LLC2#3# Licensed under the Apache License, Version 2.0 (the "License");4# you may not use this file except in compliance with the License.5# You may obtain a copy of the License at6#7# http://www.apache.org/licenses/LICENSE-2.08#9# Unless required by applicable law or agreed to in writing, software10# distributed under the License is distributed on an "AS IS" BASIS,11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.12# See the License for the specific language governing permissions and13# limitations under the License.14from sklearn.metrics import roc_auc_score15from sklearn.metrics import average_precision_score16from sklearn.metrics import brier_score_loss17from sklearn.calibration import calibration_curve18import pandas as pd19import numpy as np20import os, sys21import json22np.random.seed(2020)23def config_get(name):24 FILEDIR = os.path.dirname(os.path.realpath(__file__))25 CONFIG_JSON = 'config.json'26 return json.load(open(os.path.join(FILEDIR, CONFIG_JSON)))[name]27WORKDIR = config_get('work-dir')28DATADIR = os.path.join(WORKDIR, 'data')29SLICE_SETS = list(config_get('partitions').values())30TASKS = config_get('labels')31PREDDIR = os.path.join(WORKDIR, 'predictions')32FEATDIR = os.path.join(DATADIR, 'fixedlen')33MODELS = config_get('models-fixedlen')34RESDIR = os.path.join(WORKDIR, 'results')35SLICE_NAMES = config_get('slice-desc')36SLICE_TESTS = { 'mimic-adult' : ['mimic-adult', 'mimic-neonate', 'picdb-paed'],37 'mimic-neonate' : ['mimic-neonate', 'mimic-adult', 'picdb-paed'],38 'picdb-paed' : ['picdb-paed', 'mimic-adult', 'mimic-neonate'],39 'mimic-male' : ['mimic-male', 'mimic-female'],40 'mimic-female' : ['mimic-female', 'mimic-male'],41 'mimic-lt50' : ['mimic-lt50', 'mimic-5060', 'mimic-6070', 'mimic-7080', 'mimic-gt80'] }42METRICS = [ ('AUC', 'ABAR', 'blue'),43 ('ECE', 'EBAR', 'brown'),44 ('OOD', 'OBAR', 'red'),45 ('POOD', 'POBAR', 'black')]46# Expected Calibration Error47def ECE(Y, P, n_bins=10):48 #return brier_score_loss(Y, P)49 P, Y = list(Y), list(P)50 l = 1. * len(Y)51 Y_buckets = [ [], [], [], [], [], [], [], [], [], [] ]52 P_buckets = [ [], [], [], [], [], [], [], [], [], [] ]53 for y, p in zip(Y, P):54 idx = int(p * 10)55 if idx == 10:56 idx = 957 Y_buckets[idx].append(y)58 P_buckets[idx].append(p)59 ece = sum([(len(y_l) / l) *abs(np.mean(y_l) - np.mean(p_l))60 for y_l, p_l in zip(Y_buckets, P_buckets) if len(y_l) > 0])61 return ece62def bootstrap(Y, P, scorefn):63 B = 10064 l = len(Y)65 try:66 Y0 = Y[Y == 0].values67 P0 = P[Y == 0].values68 Y1 = Y[Y == 1].values69 P1 = P[Y == 1].values70 except:71 Y = Y.flatten()72 P = P.flatten()73 Y0 = Y[Y == 0]74 P0 = P[Y == 0]75 Y1 = Y[Y == 1]76 P1 = P[Y == 1]77 l0 = len(Y0)78 l1 = len(Y1)79 choices0 = [np.random.choice(range(l0), l0, replace=True) for _ in range(B)]80 choices1 = [np.random.choice(range(l1), l1, replace=True) for _ in range(B)]81 scores = [scorefn(np.concatenate([Y0[choice0], Y1[choice1]]),82 np.concatenate([P0[choice0], P1[choice1]]))83 for choice0, choice1 in zip(choices0, choices1)]84 return np.mean(scores), 2 * np.std(scores)85def task_print(task):86 d = {'HOSPITAL_EXPIRE_FLAG' : 'Mortality',87 'REMAINING_LOS_3' : 'LoS 3+',88 'REMAINING_LOS_7' : 'LoS 7+' }89 if task in d:90 return d[task]91 return task92def task_desc(task):93 d = {'Mortality' : 'In Hospital Mortality',94 'LoS 3+' : 'Length of Stay 3+ days',95 'LoS 7+' : 'Length of Stay 7+ days'}96 if task in d:97 return d[task]98 return task99def task_BRNN(task):100 d = {'HOSPITAL_EXPIRE_FLAG' : 'mortality',101 'REMAINING_LOS_3' : 'los3',102 'REMAINING_LOS_7' : 'los7' }103 if task in d:104 return d[task]105 return task106def collect(fd, mname):107 for SLICES in SLICE_SETS:108 for train_sl in SLICES:109 try:110 Y_in = pd.read_csv('%s/train-%s/test/%s_Y.csv' % (FEATDIR, train_sl, train_sl))111 P_in = pd.read_csv('%s/%s/train-%s/test/%s_P.csv' % (PREDDIR, mname, train_sl, train_sl))112 except:113 continue114 Y_in['IO'] = 0115 for task in TASKS:116 for test_sl in SLICES:117 try:118 Y = pd.read_csv('%s/train-%s/test/%s_Y.csv' % (FEATDIR, train_sl, test_sl))119 P = pd.read_csv('%s/%s/train-%s/test/%s_P.csv' % (PREDDIR, mname, train_sl, test_sl))120 except:121 continue122 Y['IO'] = 1123 auc, abar = bootstrap(Y[task], P[task], roc_auc_score)124 ece, ebar = bootstrap(Y[task], P[task], ECE)125 prev = Y[task].mean()126 def ood_auroc(Y, P):127 Pfix = P * (1 - prev) / (P + prev - 2 * prev * P)128 return roc_auc_score(Y, Pfix * (1 - Pfix))129 Y_io = Y_in.append(Y)130 P_io = P_in.append(P)131 ood, obar = bootstrap(Y_io['IO'], P_io[task] * (1 - P_io[task]), roc_auc_score)132 pood, pobar = bootstrap(Y_io['IO'], P_io[task], ood_auroc)133 row = '%s,%s,%s,%s,%0.3f,%0.3f,%0.3f,%0.3f,%0.3f,%0.3f,%0.3f,%0.3f' % \134 (mname, train_sl, test_sl, task_print(task), auc, abar, ece, ebar, ood, obar, pood, pobar)135 print(row)136 fd.write(row + '\n')137 pass138def collect_BRNN(fd, basedir, ts='20201210_192808'):139 for SLICES in SLICE_SETS:140 for train_sl in SLICES:141 for task in TASKS:142 try:143 Y_in = np.load('%s/%s/predictions/train-%s/test-%s/%s/test/labels.npy' % \144 (basedir, task_BRNN(task), train_sl, train_sl, ts))145 L_in = np.load('%s/%s/predictions/train-%s/test-%s/%s/test/logits.npy' % \146 (basedir, task_BRNN(task), train_sl, train_sl, ts))147 except:148 continue149 P_in = 1/(1 + np.exp(-L_in))150 for test_sl in SLICES:151 try:152 Y = np.load('%s/%s/predictions/train-%s/test-%s/%s/test/labels.npy' % \153 (basedir, task_BRNN(task), train_sl, test_sl, ts))154 L = np.load('%s/%s/predictions/train-%s/test-%s/%s/test/logits.npy' % \155 (basedir, task_BRNN(task), train_sl, test_sl, ts))156 except:157 continue158 P = 1/(1 + np.exp(-L))159 auc, abar = bootstrap(Y, P, roc_auc_score)160 ece, ebar = bootstrap(Y, P, ECE)161 prev = Y.mean()162 def ood_auroc(Y, P):163 Pfix = P * (1 - prev) / (P + prev - 2 * prev * P)164 return roc_auc_score(Y, Pfix * (1 - Pfix))165 Y_io = np.append(np.zeros(len(Y_in)), np.ones(len(Y)))166 P_io = np.append(P_in, P)167 ood, obar = bootstrap(Y_io, P_io * (1 - P_io), roc_auc_score)168 pood, pobar = bootstrap(Y_io, P_io, ood_auroc)169 row = '%s,%s,%s,%s,%0.3f,%0.3f,%0.3f,%0.3f,%0.3f,%0.3f,%0.3f,%0.3f' % \170 ('BRNN', train_sl, test_sl, task_print(task), auc, abar, ece, ebar, ood, obar, pood, pobar)171 print(row)172 fd.write(row + '\n')173 pass174def create_results_df():175 brnn_dpath = '/tmp/medical_uncertainty/bayesian_rnn'176 brnn_rpath = '%s/BRNN.csv' % RESDIR177 TS = '20201210_192808'178 if not os.path.exists(brnn_rpath) or os.stat(brnn_rpath).st_size < 100:179 fd = open(brnn_rpath, 'w')180 fd.write('Model,Train,Test,Task,AUC,ABAR,ECE,EBAR,OOD,OBAR,POOD,POBAR\n')181 print('Collecting BRNN...')182 collect_BRNN(fd, brnn_dpath, TS)183 fd.close()184 stats = pd.read_csv(brnn_rpath)185 for mname in MODELS:186 rpath = '%s/%s.csv' % (RESDIR, mname)187 if not os.path.exists(rpath) or os.stat(rpath).st_size < 100:188 fd = open(rpath, 'w')189 fd.write('Model,Train,Test,Task,AUC,ABAR,ECE,EBAR,OOD,OBAR,POOD,POBAR\n')190 print('Collecting %s...' % mname)191 collect(fd, mname)192 fd.close()193 df = pd.read_csv(rpath)194 stats = stats.append(df)195 return stats196def gen_latex(df):197 os.makedirs(os.path.join(RESDIR, 'latex'), exist_ok=True)198 fd = open(os.path.join(RESDIR, 'latex', 'results.tex'), 'w')199 TASKNAMES = df['Task'].unique()200 def P(*args):201 line = ' '.join(str(w) for w in args) + '\n'202 fd.write(line)203 hdr = r'''\documentclass{article}204%%\usepackage[landscape]{geometry}205\usepackage{xcolor}206\usepackage{graphicx}207\usepackage{fullpage}208\usepackage{diagbox}209\usepackage{multirow}210\begin{document}211\centering212\tiny213'''214 P(hdr)215 for task in TASKNAMES:216 P(r'\section*{', task_desc(task), '}')217 P(r"\begin{tabular}{|l|l|l|" + ''.join([ 'c|' for _ in MODELS ]) + "}")218 P(r"\hline")219 P(r"Train & Test & Metric & " + ' & '.join(MODELS) + r'\\')220 P(r"\hline")221 for sl, SLICES in SLICE_TESTS.items():222 P(r"\multirow{", len(METRICS) * len(SLICES), "}{*}{\\rotatebox{90}{", SLICE_NAMES[sl], "}}")223 for test_sl in SLICES:224 P(r"& \multirow{", len(METRICS), "}{*}{", SLICE_NAMES[test_sl], "}")225 for i, (metric, bar, color) in enumerate(METRICS):226 if (i > 0):227 P('&')228 P(r'& \color{%s}{%s}' % (color, metric))229 # All stats (across models) to decide which model "wins" the row230 all_stats = []231 for model in MODELS:232 stats = df.query('Train=="%s" and Test=="%s" and Task=="%s" and Model=="%s"' % \233 (sl, test_sl, task, model))234 if stats.shape[0] != 1:235 continue236 all_stats.append(stats.iloc[0][metric])237 for model in MODELS:238 stats = df.query('Train=="%s" and Test=="%s" and Task=="%s" and Model=="%s"' % \239 (sl, test_sl, task, model))240 if stats.shape[0] != 1:241 P('& NA')242 elif sl == test_sl and (metric == 'OOD' or metric == 'POOD'):243 P('& NA')244 else:245 val = stats.iloc[0][metric]246 valbar = stats.iloc[0][bar]247 if metric == 'ECE':248 best = min(all_stats)249 else:250 best = max(all_stats)251 if val != best:252 P(r'& \color{%s}{%0.3f $\pm$ %0.3f}' % (color, val, valbar))253 else:254 P(r'& \textbf{\color{%s}{%0.3f $\pm$ %0.3f}}' % (color, val, valbar))255 P(r"\\")256 P(r"\cline{2-",len(MODELS)+3,"}")257 P(r"\hline")258 P(r"\hline")259 P(r"\end{tabular}")260 P(r"\end{document}")261def main(args):262 os.makedirs(RESDIR, exist_ok=True)263 df = create_results_df()264 df.to_csv('%s/results.csv' % RESDIR)265 gen_latex(df)266if __name__ == '__main__':...
data_preparation.py
Source:data_preparation.py
1import pandas as pd2import random3from configuration import CONSTANTS as C4def data_cls_csv():5 train_sl = pd.read_csv("data/slang_train_10000_split.csv")6 train_st = pd.read_csv("data/standard_train_10000.csv")7 test_sl = pd.read_csv("data/slang_test_10000_split.csv")8 test_st = pd.read_csv("data/standard_test_10000.csv")9 train_sl["label"] = 110 train_st["label"] = 011 test_sl["label"] = 112 test_st["label"] = 013 train_sl = train_sl[['example', 'label']]14 train_st = train_st[['train', 'label']]15 test_sl = test_sl[['example', 'label']]16 test_st = test_st[['test', 'label']]17 train_st.columns = ['example', 'label']18 test_st.columns = ['example', 'label']19 eval_sl = test_sl[:5000]20 eval_st = test_st[:5000]21 test_sl = test_sl[5000:]22 test_st = test_st[5000:]23 trainset = pd.concat([train_sl,train_st], axis = 0).reset_index(drop = True)24 evalset = pd.concat([eval_sl,eval_st], axis = 0).reset_index(drop = True)25 testset = pd.concat([test_sl,test_st], axis = 0).reset_index(drop = True)26 trainset.to_csv('data/train_cls.csv')27 evalset.to_csv('data/eval_cls.csv')28 testset.to_csv('data/test_cls.csv')29def example_gener(word,wordlist):30 s= ""31# print("word",word)32# print("wordlist",wordlist)33 for i in range(len(wordlist)):34# print("i",i)35# print("wordlist[i]",wordlist[i])36 s = s+str(i+1) + ". " + str(word) + " : "+str(wordlist[i])+"\n"37 s = s + str(len(wordlist)+1) + ". " +str(word) + " : "38 return s39def data_trigger_csv():40 filedir = C.DATA_DIR + "slang_augment_50000_updated.csv"41 data_cleaned = pd.read_csv(filedir).sort_values(['word'])42 temp_list = []43 tempword = data_cleaned.iloc[0, 0]44 trigger_list = []45 trigger_word = []46 trigger_len = []47 for i in range(len(data_cleaned)):48 if (data_cleaned.iloc[i, 0] == tempword):49 temp_list.append(data_cleaned.iloc[i, 1])50 else:51 s = example_gener(tempword, temp_list)52 trigger_list.append(s)53 trigger_word.append(tempword)54 trigger_len.append(len(temp_list))55 tempword = data_cleaned.iloc[i, 0]56 temp_list = [data_cleaned.iloc[i, 1]]57 df_trigger = pd.DataFrame(columns = ['word', 'trigger', 'length'])58 df_trigger['word'] = trigger_word59 df_trigger['length'] = trigger_len60 df_trigger['trigger'] = trigger_list61 df_trigger = df_trigger.reset_index()62 df_trigger.to_csv(C.DATA_DIR+'trigger_data.csv', index = False)63def augment_split_csv():64 filedir = C.DATA_DIR + 'augment_result_06251547.csv'65 data_augment = pd.read_csv(filedir, index_col=0) 66 random.seed(122)67 sample_idx = random.sample(range(0, data_augment.shape[0]), k=data_augment.shape[0])68 train_cnt = int(data_augment.shape[0]* 0.8)69 train = data_augment.iloc[sample_idx[:train_cnt]]70 eval = data_augment.iloc[sample_idx[train_cnt:]]71 train.to_csv(C.DATA_DIR + 'augment_train.csv')72 eval.to_csv(C.DATA_DIR + 'augment_eval.csv')73def rsearch_trigger_csv():74 filedir = C.DATA_DIR + 'trigger_data.csv'75 trigger_new = pd.read_csv(filedir, index_col=0)76 random.seed(122)77 sample_idx = random.sample(range(0, trigger_new.shape[0]), k=1000)78 rsearch_trigger = trigger_new.iloc[sample_idx].reset_index(drop = True)79 rsearch_trigger.to_csv(C.DATA_DIR + 'rsearch_trigger.csv')80if __name__ == '__main__':81 data_cls_csv()82 data_trigger_csv()83 # augment_split_csv()...
Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!