Best Python code snippet using tavern
bumps.py
Source:bumps.py
'''
===============================================
:mod:`gridcells.analysis.bumps` - bump tracking
===============================================

Classes and functions for processing data related to bump attractors.

Classes
-------
.. inheritance-diagram:: gridcells.analysis.bumps
    :parts: 2

.. autosummary::

    MLFit
    MLFitList
    MLGaussianFit
    MLGaussianFitList
    SingleBumpPopulation
    SymmetricGaussianParams

Functions
---------
.. autosummary::

    fit_gaussian_tt
    fit_gaussian_bump_tt
    fit_maximum_lh
'''
from __future__ import absolute_import, division, print_function

import collections
import logging

# ``collections.Sequence`` was removed in Python 3.10; the ABC lives in
# ``collections.abc``.  Fall back to the old location on Python 2.
try:
    from collections.abc import Sequence
except ImportError:  # Python 2
    from collections import Sequence

import numpy as np
import scipy.optimize

from . import spikes
from ..core.common import Pair2D, twisted_torus_distance

LOGGER = logging.getLogger(__name__)


class SymmetricGaussianParams(object):
    '''Parameters for the symmetric Gaussian function.

    Parameters
    ----------
    amplitude
        Amplitude of the Gaussian, stored as ``A``.
    mu_x, mu_y
        Position of the centre of the Gaussian.
    sigma
        Width of the Gaussian.
    err2
        Squared error of the fit.
    '''

    def __init__(self, amplitude, mu_x, mu_y, sigma, err2):
        self.A = amplitude
        self.mu_x = mu_x
        self.mu_y = mu_y
        self.sigma = sigma
        self.err2 = err2


class MLFit(object):
    '''Maximum likelihood fit data holder.

    Parameters
    ----------
    mu
        Estimated mean.
    sigma2
        Estimated variance.
    ln_lh
        Log-likelihood of the fit.
    err2
        Squared error of the fit.
    '''

    def __init__(self, mu, sigma2, ln_lh, err2):
        self.mu = mu
        self.sigma2 = sigma2
        self.ln_lh = ln_lh
        self.err2 = err2


class MLFitList(MLFit, Sequence):
    '''A container for holding results of maximum likelihood fitting.

    Can be accessed as a Sequence object.  Every attribute inherited from
    :class:`MLFit` holds a list here, with one entry per fit, and ``times``
    holds the time associated with each fit.

    Raises
    ------
    ValueError
        If the supplied lists do not all have the same length.
    '''

    def __init__(self, mu=None, sigma2=None, ln_lh=None, err2=None,
                 times=None):
        # Fresh lists are created per instance; a mutable default argument
        # would be shared between instances.
        mu = [] if mu is None else mu
        sigma2 = [] if sigma2 is None else sigma2
        ln_lh = [] if ln_lh is None else ln_lh
        err2 = [] if err2 is None else err2
        times = [] if times is None else times

        super(MLFitList, self).__init__(mu, sigma2, ln_lh, err2)
        self.times = times

        if not self._consistent():
            raise ValueError('All input arguments must have same length')

    def _consistent(self):
        '''Check if the data is consistent (all fields have equal length).'''
        fields = (self.mu, self.sigma2, self.ln_lh, self.err2, self.times)
        return len(set(len(field) for field in fields)) == 1

    def __getitem__(self, key):
        '''Return a ``(MLFit, time)`` pair for the item(s) selected by `key`.'''
        # Every field is indexed with ``key``; previously ``err2`` and
        # ``times`` were returned as the complete lists.
        return (MLFit(self.mu[key], self.sigma2[key], self.ln_lh[key],
                      self.err2[key]),
                self.times[key])

    def __len__(self):
        return len(self.mu)

    def append_data(self, d, t):
        '''Append the fit `d`, obtained at time `t`.

        `d` must be an instance of :class:`MLFit`, otherwise a ``TypeError``
        is raised.
        '''
        if not isinstance(d, MLFit):
            raise TypeError('ML data must be an instance of MLFit')

        self.mu.append(d.mu)
        self.sigma2.append(d.sigma2)
        self.ln_lh.append(d.ln_lh)
        self.err2.append(d.err2)
        self.times.append(t)


class MLGaussianFit(SymmetricGaussianParams):
    '''Gaussian fit performed by applying maximum likelihood estimator.

    Extends :class:`SymmetricGaussianParams` with the log-likelihood
    (``ln_lh``) and the precision (``lh_precision``) of the fit.
    '''

    def __init__(self, amplitude, mu_x, mu_y, sigma, err2, ln_lh,
                 lh_precision):
        super(MLGaussianFit, self).__init__(amplitude, mu_x, mu_y, sigma, err2)
        self.ln_lh = ln_lh
        self.lh_precision = lh_precision


class MLGaussianFitList(MLGaussianFit, Sequence):
    '''A container for holding maximum likelihood Gaussian fits.

    Can be accessed as a Sequence.  Every attribute inherited from
    :class:`MLGaussianFit` holds a list here, with one entry per fit, and
    ``times`` holds the time associated with each fit.

    Raises
    ------
    ValueError
        If the supplied lists do not all have the same length.
    '''

    def __init__(self, amplitude=None, mu_x=None, mu_y=None, sigma=None,
                 err2=None, ln_lh=None, lh_precision=None, times=None):
        # Fresh lists are created per instance; a mutable default argument
        # would be shared between instances.
        amplitude = [] if amplitude is None else amplitude
        mu_x = [] if mu_x is None else mu_x
        mu_y = [] if mu_y is None else mu_y
        sigma = [] if sigma is None else sigma
        err2 = [] if err2 is None else err2
        ln_lh = [] if ln_lh is None else ln_lh
        lh_precision = [] if lh_precision is None else lh_precision
        times = [] if times is None else times

        super(MLGaussianFitList, self).__init__(amplitude, mu_x, mu_y, sigma,
                                                err2, ln_lh, lh_precision)
        self.times = times

        if not self._consistent():
            raise ValueError('All input arguments must have same length')

    def _consistent(self):
        '''Check if the data is consistent (all fields have equal length).'''
        fields = (self.A, self.mu_x, self.mu_y, self.sigma, self.err2,
                  self.ln_lh, self.lh_precision, self.times)
        return len(set(len(field) for field in fields)) == 1

    def append_data(self, d, t):
        '''Append the fit `d`, obtained at time `t`.

        `d` must be an instance of :class:`MLGaussianFit`, otherwise a
        ``TypeError`` is raised.
        '''
        if not isinstance(d, MLGaussianFit):
            raise TypeError('Data must be an instance of MLGaussianFit')

        self.A.append(d.A)
        self.mu_x.append(d.mu_x)
        self.mu_y.append(d.mu_y)
        self.sigma.append(d.sigma)
        self.err2.append(d.err2)
        self.ln_lh.append(d.ln_lh)
        self.lh_precision.append(d.lh_precision)
        self.times.append(t)

    def __getitem__(self, key):
        '''Return a ``(MLGaussianFit, time)`` pair for the item(s) selected by
        `key`.
        '''
        # Every field is indexed with ``key``; previously ``ln_lh`` and
        # ``lh_precision`` were returned as the complete lists.
        return (MLGaussianFit(self.A[key],
                              self.mu_x[key],
                              self.mu_y[key],
                              self.sigma[key],
                              self.err2[key],
                              self.ln_lh[key],
                              self.lh_precision[key]),
                self.times[key])

    def __len__(self):
        return len(self.A)  # All same length


def fit_gaussian_tt(sig_f, i):
    r'''Fit a 2D circular Gaussian function to a 2D signal using a maximum
    likelihood estimator.

    The Gaussian is not generic: :math:`\sigma_x = \sigma_y = \sigma`, i.e.
    it is circular only.

    The function fitted looks like this:

    .. math::

        f(\mathbf{X}) = |A| \exp\left\{\frac{-|\mathbf{X} -
                        \mathbf{\mu}|^2}{2\sigma^2}\right\}

    where :math:`|\cdot|` is a distance metric on the twisted torus.

    Parameters
    ----------
    sig_f : np.ndarray
        A 2D array that specified the signal to fit the Gaussian onto. The
        dimensions of the torus will be inferred from the shape of `sig_f`:
        (dim.y, dim.x) = `sig_f.shape`.
    i : SymmetricGaussianParams
        Gaussian initialisation parameters. The `err2` field will be ignored.

    Returns
    -------
    :class:`MLGaussianFit`
        Estimated values, together with maximum likelihood value and precision
        (inverse variance of noise: *NOT* of the fitted Gaussian).  The
        ``err2`` field holds the squared residual for every element of
        `sig_f`, flattened.
    '''
    # Fit the Gaussian using least squares
    dim = Pair2D(sig_f.shape[1], sig_f.shape[0])
    X, Y = np.meshgrid(  # pylint: disable=unbalanced-tuple-unpacking
        np.arange(dim.x, dtype=np.double),
        np.arange(dim.y, dtype=np.double))
    others = Pair2D(X.flatten(), Y.flatten())

    # Centre of the Gaussian; overwritten on every evaluation of the residual.
    a = Pair2D(None, None)

    def gaussian_diff(x):
        '''Compute error: model minus signal, x = [A, mu_x, mu_y, sigma].'''
        a.x = x[1]  # mu_x
        a.y = x[2]  # mu_y
        dist = twisted_torus_distance(a, others, dim)
        #               A                                  sigma
        #               |                                    |
        return (np.abs(x[0]) * np.exp(-dist ** 2 / 2. / x[3] ** 2) -
                sig_f.ravel())

    xest, _ = scipy.optimize.leastsq(gaussian_diff,
                                     np.array([i.A, i.mu_x, i.mu_y, i.sigma]))
    err2 = gaussian_diff(xest) ** 2

    # Remap the values modulo torus size
    xest[1] = xest[1] % dim.x
    xest[2] = xest[2] % dim.y

    # Compute the log-likelihood
    n = dim.x * dim.y
    aic_correction = 5  # Number of optimized parameters
    beta = 1.0 / (np.mean(err2))
    ln_lh = -beta / 2. * np.sum(err2) + \
        n / 2. * np.log(beta) - \
        n / 2. * np.log(2 * np.pi) - \
        aic_correction

    return MLGaussianFit(xest[0], xest[1], xest[2], xest[3], err2, ln_lh, beta)


def fit_gaussian_bump_tt(sig):
    '''Fit a 2D Gaussian onto a (potential) firing rate bump on the twisted
    torus.

    Parameters
    ----------
    sig : np.ndarray
        2D firing rate map to fit. Axis 0 is the Y position. This will be
        passed directly to :func:`~analysis.image.fit_gaussian_tt`.

    Returns
    -------
    :class:`analysis.image.MLGaussianFit`
        Estimated values of the fit.

    Notes
    -----
    The function initialises the Gaussian fitting parameters to a position at
    the maximum of `sig`.
    '''
    mu0_y, mu0_x = np.unravel_index(np.argmax(sig), sig.shape)
    a0 = sig[mu0_y, mu0_x]
    sigma0 = np.max(sig.shape) / 4.
    init = SymmetricGaussianParams(a0, mu0_x, mu0_y, sigma0, None)
    return fit_gaussian_tt(sig, init)


def fit_maximum_lh(sig):
    '''Fit a maximum likelihood solution under Gaussian noise.

    Parameters
    ----------
    sig : np.ndarray
        A vector containing the samples

    Returns
    -------
    fit : MLFit
        Maximum likelihood parameters.  The log-likelihood is ``np.inf`` when
        the variance of `sig` is zero.
    '''
    sig = sig.flatten()
    mu = np.mean(sig)
    sigma2 = np.var(sig)
    err2 = (sig - mu) ** 2

    if sigma2 == 0:
        ln_lh = np.inf
    else:
        n = len(sig)
        aic_correction = 2
        ln_lh = -.5 / sigma2 * np.sum(err2) - \
            .5 * n * np.log(sigma2) - \
            .5 * n * np.log(2 * np.pi) - \
            aic_correction

    return MLFit(mu, sigma2, ln_lh, err2)


class SingleBumpPopulation(spikes.TwistedTorusSpikes):
    '''
    A population of neurons that is supposed to form a bump on a twisted torus.

    Parameters
    ----------
    senders : array_like
        An array of neurons' IDs.
    times : array_like
        An array of spike times. Length must be the same as <senders>.
    sheet_size : A pair
        A pair of X and Y dimensions of the torus.
    '''

    def __init__(self, senders, times, sheet_size):
        super(SingleBumpPopulation, self).__init__(senders, times, sheet_size)

    def _perform_fit(self, tstart, tend, dt, win_len, fit_callable, list_cls,
                     full_err=True):
        '''Perform the fit given the requested ``fit_callable``.

        ``fit_callable`` is applied to every time slice ``F[:, :, idx]`` of
        the sliding firing rate and the results are collected, together with
        the corresponding time, in a new instance of ``list_cls``.  When
        `full_err` is false, the ``err2`` field of each fit is replaced by its
        sum.
        '''
        F, Ft = self.sliding_firing_rate(tstart, tend, dt, win_len)
        res = list_cls()
        n_steps = len(Ft)
        for t_idx, t in enumerate(Ft):
            LOGGER.debug('%s:: fitting: %d/%d, %.3f/%.3f ',
                         fit_callable.__name__, t_idx + 1, n_steps, t,
                         Ft[-1])
            fit_params = fit_callable(F[:, :, t_idx])

            if not full_err:
                fit_params.err2 = np.sum(fit_params.err2)

            res.append_data(fit_params, t)
        return res

    def bump_position(self, tstart, tend, dt, win_len, full_err=True):
        '''Estimate bump positions during the simulation time:

            1. Estimates population firing rate for each bin.

            2. Apply the bump position estimation procedure to each of the
               population activity items.

        Parameters
        ----------
        tstart, tend, dt, win_len : float
            Start and end time, time step, and window length. See also
            :meth:`~gridcells.analysis.spikes.PopulationSpikes.sliding_firing_rate`.
        full_err : bool
            If ``True``, save the full error of fit. Otherwise a sum only.

        Returns
        -------
        pos:list :class:`MLGaussianFitList`
            A list of fitted Gaussian parameters

        Notes
        -----
        This method uses the Maximum likelihood estimator to fit the Gaussian
        function (:meth:`~fit_gaussian_bump_tt`)
        '''
        return self._perform_fit(tstart, tend, dt, win_len,
                                 fit_gaussian_bump_tt, MLGaussianFitList,
                                 full_err=full_err)

    def uniform_fit(self, tstart, tend, dt, win_len, full_err=True):
        '''Estimate the mean firing rate using maximum likelihood estimator
        (:func:`~gridcells.analysis.image.fit_maximum_lh`)

            1. Uses :meth:`sliding_firing_rate`.

            2. Apply the estimator.

        Parameters
        ----------
        tstart, tend, dt, win_len
            As in :py:meth:`~analysis.spikes.sliding_firing_rate`.
        full_err : bool
            If ``True``, save the full error of fit. Otherwise a sum only.

        Returns
        -------
        MLFitList
            A list of fitted parameters.
        '''
        # NOTE(review): the listing is cut off after ``fit_maximum_lh,``; the
        # remaining arguments are reconstructed by analogy with
        # ``bump_position`` and the documented return type -- confirm against
        # the upstream file.
        return self._perform_fit(tstart, tend, dt, win_len, fit_maximum_lh,
                                 MLFitList, full_err=full_err)
art_misc.py
Source:art_misc.py
#!/usr/bin/env python
# Copyright (C) 2002-2018 CERN for the benefit of the ATLAS collaboration
"""Miscellaneous functions."""

__author__ = "Tulay Cuhadar Donszelmann <tcuhadar@cern.ch>"

import concurrent.futures
import logging
import os
import shlex
import subprocess
import sys

from datetime import datetime

MODULE = "art.misc"
EOS_MGM_URL = 'root://eosatlas.cern.ch/'

KByte = 1024
MByte = KByte * 1024
GByte = MByte * 1024

# Timestamp format used in the "art-process" marker lines.
_TIME_FORMAT = '%Y-%m-%dT%H:%M:%S'


def set_log(kwargs):
    """Set the default log level and message format depending on --verbose or --quiet options."""
    level = logging.DEBUG if kwargs['verbose'] else logging.WARN if kwargs['quiet'] else logging.INFO
    log = logging.getLogger("art")
    log.setLevel(level)

    # create and attach new handler, disable propagation to root logger to avoid double messages
    handler = logging.StreamHandler(sys.stdout)
    format_string = "%(asctime)s %(name)15s.%(funcName)-15s %(levelname)8s %(message)s"
    date_format_string = None
    formatter = logging.Formatter(format_string, date_format_string)
    handler.setFormatter(formatter)
    log.addHandler(handler)
    log.propagate = False


def get_atlas_env():
    """
    Get all environment variables.

    Returns the tuple (nightly_release, project, platform, nightly_tag).
    Logs a critical message and exits with status 1 if a variable is not set.
    """
    log = logging.getLogger(MODULE)
    try:
        nightly_release = os.environ['AtlasBuildBranch']
        project = os.environ['AtlasProject']
        platform = os.environ[project + '_PLATFORM']
        nightly_tag = os.environ['AtlasBuildStamp']
        return (nightly_release, project, platform, nightly_tag)
    except KeyError as e:
        log.critical("Environment variable not set %s", e)
        sys.exit(1)


def _to_text(data):
    """Return subprocess output as a native string (decodes bytes on Python 3)."""
    # On Python 2 ``bytes`` is ``str``, so the data passes through str() unchanged.
    if isinstance(data, bytes) and not isinstance(data, str):
        return data.decode('utf-8', 'replace')
    return str(data)


def run_command(cmd, dir=None, shell=False, env=None, verbose=True):
    """
    Run the given command locally.

    The command runs as separate subprocesses for every piped command.
    Returns tuple of exit_code, output, err, cmd, start_time and end_time.

    The output and err are those of the last command of the pipe,
    the exit_code is the one of the first command of the pipe.
    """
    # leave at print for basic debugging, log sometimes lost
    start_time = datetime.now()
    if verbose:
        print("Execute: %s" % (cmd,))

    # NOTE: stderr of every process is a pipe but only the one of the last process is read.
    processes = []
    for cmd_part in cmd.split('|'):
        upstream = processes[-1] if processes else None
        process = subprocess.Popen(shlex.split(cmd_part.strip()),
                                   stdin=None if upstream is None else upstream.stdout,
                                   stdout=subprocess.PIPE, stderr=subprocess.PIPE,
                                   cwd=dir, shell=shell, env=env)
        if upstream is not None:
            # the child owns the read end now; closing our copy allows the
            # upstream process to receive a SIGPIPE if the child exits early
            upstream.stdout.close()
        processes.append(process)

    (output, err) = processes[-1].communicate()
    exit_code = processes[0].wait()
    end_time = datetime.now()

    return (exit_code, _to_text(output), _to_text(err), cmd, start_time, end_time)


def _art_section(label, stream, text, start_time, end_time):
    """Return text enclosed by the art-process start (-+-) and end (---) marker lines."""
    tag = "art-process " + label + " " + stream + " "
    return ''.join(("-+-", tag, start_time.strftime(_TIME_FORMAT), "\n",
                    text,
                    "---", tag, end_time.strftime(_TIME_FORMAT), "\n"))


def run_command_parallel(cmd, nthreads, ncores, dir=None, shell=False, env=None, verbose=True):
    """
    Run the given command locally in parallel.

    The command is run once with ArtProcess set to "start", then nthreads times
    (ArtProcess set to the index) on a pool of at most ncores workers, and finally
    once with ArtProcess set to "end".

    Returns tuple of exit_code, output, err, cmd, start_time and end_time, where
    output and err are the concatenation of all runs, each enclosed by marker lines,
    and exit_code is the last non-zero exit code (or 0).
    """
    start_time = datetime.now()
    log = logging.getLogger(MODULE)
    ncores = min(ncores, nthreads)

    if env is None:
        env = os.environ.copy()

    env['ArtThreads'] = str(nthreads)
    env['ArtCores'] = str(ncores)

    # Results
    full_exit_code = 0
    out_parts = []
    err_parts = []

    def collect(label, result):
        """Append the framed output of one run, return its exit code."""
        (exit_code, out, err, _, run_start, run_end) = result
        out_parts.append(_art_section(label, "out", out, run_start, run_end))
        err_parts.append(_art_section(label, "err", err, run_start, run_end))
        return exit_code

    # Start
    env['ArtProcess'] = "start"
    exit_code = collect("start", run_command(cmd, dir=dir, shell=shell, env=env, verbose=verbose))
    full_exit_code = full_exit_code if exit_code == 0 else exit_code

    log.info("Creating executor with cores: %d", ncores)
    executor = concurrent.futures.ThreadPoolExecutor(ncores)
    try:
        # Processing
        log.info("Running threads: %d", nthreads)
        future_set = []
        for index in range(nthreads):
            process_env = env.copy()
            process_env['ArtProcess'] = str(index)
            future_set.append(executor.submit(run_command, cmd, dir=dir, shell=shell,
                                              env=process_env, verbose=verbose))

        log.info("Waiting for threads to finish...")
        concurrent.futures.wait(future_set)
    finally:
        # release the worker threads, the executor was never shut down before
        executor.shutdown()

    for index, future in enumerate(future_set):
        exit_code = collect(str(index), future.result())
        full_exit_code = full_exit_code if exit_code == 0 else exit_code

    # End
    env['ArtProcess'] = "end"
    exit_code = collect("end", run_command(cmd, dir=dir, shell=shell, env=env, verbose=verbose))
    full_exit_code = full_exit_code if exit_code == 0 else exit_code

    end_time = datetime.now()

    return (full_exit_code, ''.join(out_parts), ''.join(err_parts), cmd, start_time, end_time)


def is_exe(path):
    """Return True if path is executable."""
    return os.path.isfile(path) and os.access(path, os.X_OK)


def make_executable(path):
    """Make file executable (chmod +x)."""
    mode = os.stat(path).st_mode
    mode |= (mode & 0o444) >> 2    # copy R bits to X
    os.chmod(path, mode)


def mkdir(path):
    """Make (missing) directories, return the exit code of the mkdir command."""
    log = logging.getLogger(MODULE)
    if path.startswith('/eos'):
        mkdir_cmd = 'eos ' + EOS_MGM_URL + ' mkdir -p'
    else:
        mkdir_cmd = 'mkdir -p'

    (exit_code, out, err, command, start_time, end_time) = run_command(' '.join((mkdir_cmd, path)))
    if exit_code != 0:
        log.error("Mkdir Error: %d %s %s", exit_code, out, err)

    return exit_code


def ls(path):
    """List files in directory, return the exit code of the ls command."""
    if path.startswith('/eos'):
        ls_cmd = 'eos ' + EOS_MGM_URL + ' ls ' + path + '/'
    else:
        ls_cmd = 'ls ' + path + '/'

    (exit_code, out, err, command, start_time, end_time) = run_command(ls_cmd)
    # NOTE(review): the indentation is reconstructed from a flattened listing,
    # confirm that err is meant to be printed on success only
    if exit_code == 0:
        print(out)
        print(err)

    return exit_code


def cp(src, dst):
    """Copy files to directory, return the exit code of the copy command."""
    log = logging.getLogger(MODULE)
    if dst.startswith('/eos'):
        # check which xrdcp we are running
        (exit_code, out, err, command, start_time, end_time) = run_command('which xrdcp')
        print(out)
        print(err)

        # check which version of xrdcp we are running
        (exit_code, out, err, command, start_time, end_time) = run_command('xrdcp --version')
        print(out)
        print(err)

        cmd = ' '.join(('xrdcp -f -N -r -p -v', src, EOS_MGM_URL + dst + '/'))
    else:
        cmd = ' '.join(('xrdcp -f -N -r -p -v', src, dst + '/'))

    # run the actual command
    log.info("Using: %s", cmd)
    (exit_code, exit_out, exit_err, command, start_time, end_time) = run_command(cmd)
    if exit_code != 0:
        log.error("COPY to DST Error: %d %s %s", exit_code, exit_out, exit_err)

    return exit_code


def count_files(path):
    """Count number of files, return -1 if the number cannot be retrieved."""
    log = logging.getLogger(MODULE)
    if path.startswith('/eos'):
        cmd = ' '.join(('eos', EOS_MGM_URL, 'find', path, '|', 'wc', '-l'))
    else:
        cmd = ' '.join(('find', path, '|', 'wc', '-l'))

    (exit_code, out, err, command, start_time, end_time) = run_command(cmd)
    if exit_code == 0:
        nFiles = int(out)
        return nFiles

    log.error("Error retrieving number of files on %s, %s", path, err)
    return -1


def touch(fname, times=None):
    """Touch a file."""
    with open(fname, 'a'):
        os.utime(fname, times)


def rm(fname):
    """Remove a file, a file which cannot be removed is silently ignored."""
    try:
        os.remove(fname)
    except OSError:
        pass


def which(program):
    """Show which program is actually found on the PATH, None if not found."""
    fpath, fname = os.path.split(program)
    if fpath:
        # a path was given, do not search the PATH
        if is_exe(program):
            return program
    else:
        for path in os.environ["PATH"].split(os.pathsep):
            path = path.strip('"')
            exe_file = os.path.join(path, program)
            if is_exe(exe_file):
                return exe_file

    return None


def memory(scale=1):
    """Return free memory."""
    # NOTE(review): the body of this function is cut off in the source listing,
    # it is left out here rather than guessed
ch4ex4.py
Source:ch4ex4.py
# Example 4.4
# Linearization
#
# Integrates y' + y*(1 - y) = 0, y(0) = 1/2 up to t = 1 with two one-step
# schemes and plots the error at t = 1 against the number of steps.
from pylab import *

# Clean up
clf()

# Obtain true solution using sympy if available
try:
    from sympy import dsolve, Derivative, symbols
    # NOTE(review): this relies on the objects returned by symbols() being
    # callable -- confirm against the installed sympy version.
    y, t = symbols("y t", function = True)
    y = dsolve(Derivative(y(t), t) + y(t)*(1 - y(t)), 0, ics={y(0): .5}).rhs
    print("y(t) =", y)
    true_solution = y.evalf(subs = {t:1.})
except ImportError:
    # supply the true soln
    true_solution = 1./(exp(1.) + 1.)

# Setup schemes
# Linearized Trapezoidal method
lin_trap = lambda y, h: y + h*y*(y - 1.)/(1. - h*(y - 1./2.))
# Trapezoidal method
full_trap = lambda y, h: (2./h + 1. - sqrt((2./h + 1.)**2 - 4.*(2./h*y + y*(y - 1.))))/2.

# Collect Data
H = logspace(0., 2.7, 10).round()   # numbers of steps, rounded to whole values
STEPS = zeros(len(H))
LIN_ERR = zeros(len(H))
FULL_ERR = zeros(len(H))
for i, N in enumerate(H):
    h = 1./N
    # both schemes start from the initial condition y(0) = 1/2
    yl = 1./2.
    yf = 1./2.
    for _ in range(int(N)):
        yl = lin_trap(yl, h)
        yf = full_trap(yf, h)
    STEPS[i] = N
    LIN_ERR[i] = abs(yl - true_solution)
    FULL_ERR[i] = abs(yf - true_solution)

# Plot
figure(1); clf()
# LIN_ERR comes from lin_trap and FULL_ERR from full_trap; the two legend
# labels were swapped before.
loglog(STEPS, LIN_ERR, "o-", mfc = "none", label = "Linearized Trapezoidal")
loglog(STEPS, FULL_ERR, "s-", mfc = "none", label = "Trapezoidal")
grid(True)
xlabel("N -- Number of Steps", fontsize = 14)
ylabel("Error", fontsize = 14)
legend(loc = 0)
Learn to execute automation testing from scratch with the LambdaTest Learning Hub: from setting up the prerequisites and running your first automation test, to following best practices and diving deeper into advanced test scenarios. The LambdaTest Learning Hub compiles step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, and TestNG.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 automation test minutes FREE!!