Best Python code snippet using ATX
train.py
Source:train.py
...87 labels.update(arc.ilabel for arc in fst.arcs(state))88 reader.next()89 assert not reader.error()90 return labels91def _mktemp(suffix: str) -> str:92 """Creates temporary file with desired suffix.93 Args:94 suffix: the desired suffix for the temporary file.95 Returns:96 Path to a temporary file.97 """98 path = tempfile.mkstemp(suffix=f".{suffix}")[1]99 logging.debug("New temporary file:\t%s", path)100 return path101def _rmtemp(path: str) -> None:102 """Removes temporary file.103 Args:104 path: path to temporary file to be removed.105 """106 logging.debug("Removing temporary file:\t%s", path)107 os.remove(path)108@dataclasses.dataclass109class RandomStart:110 """Struct representing a random start."""111 idx: int112 seed: int113 ifar_path: str114 ofar_path: str115 cg_path: str116 train_opts: List[str]117 def train(self) -> Tuple[str, float]:118 """Trains a single random start.119 Returns:120 A tuple containing the aligner FST path and the negative log likelihood.121 """122 start = time.time()123 # Randomizes the channel model weights.124 rfst_path = _mktemp(f"random-{self.seed:05d}.fst")125 _log_check_call([126 "baumwelchrandomize",127 f"--seed={self.seed}",128 self.cg_path,129 rfst_path,130 ])131 # Trains model and reads likelihood.132 afst_path = _mktemp(f"aligner-{self.seed:05d}.fst")133 likelihood = INF134 cmd = [135 "baumwelchtrain",136 *self.train_opts,137 self.ifar_path,138 self.ofar_path,139 rfst_path,140 afst_path,141 ]142 logging.debug("Subprocess call:\t%s", " ".join(cmd))143 with subprocess.Popen(cmd, stderr=subprocess.PIPE, text=True) as proc:144 for line in proc.stderr: # type: ignore145 match = re.match(146 r"INFO:\s+Iteration\s+(\d+):\s+(-?\d*(\.\d*)?)",147 line.rstrip(),148 )149 assert match, line150 iteration = int(match.group(1))151 likelihood = float(match.group(2))152 logging.debug(153 "start:\t%3d;\titer:\t%3d;\tLL:\t%.4f;\telapsed:\t%3ds",154 self.idx,155 iteration,156 likelihood,157 time.time() - start,158 )159 _rmtemp(rfst_path)160 return afst_path, likelihood161# Major stages.162def _compile_fars(tsv: str, input_token_type: str,163 output_token_type: str) -> Tuple[str, str]:164 """Compiles FAR files and returns their paths.165 Args:166 tsv: path to the data TSV file.167 input_token_type: input token type (one of: "byte", "utf8", or a symbol168 table).169 output_token_type: output token_type (one of: "byte", "utf8", or a symbol170 table).171 Returns:172 A tuple containing the input FAR path and output FAR path.173 """174 with tempfile.NamedTemporaryFile(suffix=".i.txt", mode="w") as itxt:175 with tempfile.NamedTemporaryFile(suffix=".o.txt", mode="w") as otxt:176 with open(tsv, "r") as source:177 for col1, col2 in csv.reader(source, delimiter="\t"):178 print(col1, file=itxt)179 print(col2, file=otxt)180 ifar_path = _mktemp("i.far")181 _log_check_call([182 "farcompilestrings",183 "--fst_type=compact",184 f"--token_type={input_token_type}",185 itxt.name,186 ifar_path,187 ])188 ofar_path = _mktemp("o.far")189 _log_check_call([190 "farcompilestrings",191 "--fst_type=compact",192 f"--token_type={output_token_type}",193 otxt.name,194 ofar_path,195 ])196 # Temporary text files are now deleted.197 return ifar_path, ofar_path198def _compile_cg(ifar_path: str, ofar_path: str, insertions: bool,199 deletions: bool) -> str:200 """Compiles the covering grammar from the input and output FARs.201 Args:202 ifar_path: path to the input FAR.203 ofar_path: path to the output FAR.204 insertions: should insertions be permitted?205 deletions: should deletions be permitted?206 Returns:207 The path to the CG FST.208 """209 ilabels = _get_far_labels(ifar_path)210 olabels = _get_far_labels(ofar_path)211 cg = pywrapfst.VectorFst()212 state = cg.add_state()213 cg.set_start(state)214 one = pywrapfst.Weight.one(cg.weight_type())215 for ilabel, olabel in itertools.product(ilabels, olabels):216 cg.add_arc(state, pywrapfst.Arc(ilabel, olabel, one, state))217 # Handles epsilons, carefully avoiding adding a useless 0:0 label.218 if insertions:219 for olabel in olabels:220 cg.add_arc(state, pywrapfst.Arc(0, olabel, one, state))221 if deletions:222 for ilabel in ilabels:223 cg.add_arc(state, pywrapfst.Arc(ilabel, 0, one, state))224 cg.set_final(state)225 assert cg.verify(), "Label acceptor is ill-formed"226 cg_path = _mktemp("cg.fst")227 cg.write(cg_path)228 return cg_path229def _train_aligner(230 ifar_path: str,231 ofar_path: str,232 cg_path: str,233 seed: int,234 random_starts: int,235 processes: int,236 batch_size: Optional[int] = None,237 delta: float = None,238 alpha: float = None,239 max_iters: Optional[int] = None,240) -> str:241 """Trains the aligner.242 NB: many arguments inherit default values from the `baumwelchtrain` tool.243 Args:244 ifar_path: path to the input FAR.245 ofar_path: path to the output FAR.246 cg_path: path to the convering grammar FST.247 seed: integer random seed.248 random_starts: number of random starts.249 processes: maximum number of processes running concurrently.250 batch_size: batch size (default: from `baumwelchtrain`).251 delta: comparison/quantization delta (default: from `baumwelchtrain`).252 alpha: learning rate (default: from `baumwelchtrain`).253 max_iters: maximum number of iterations (default: from `baumwelchtrain`).254 Returns:255 The path to the aligner FST.256 """257 train_opts: List[str] = []258 if batch_size:259 train_opts.append(f"--batch_size={batch_size}")260 if delta:261 train_opts.append(f"--delta={delta}")262 if alpha:263 train_opts.append(f"--alpha={alpha}")264 if max_iters:265 train_opts.append(f"--max_iters={max_iters}")266 random.seed(seed)267 # Each random start is associated with a randomly chosen unique integer in268 # the range [1, RAND_MAX).269 starts = [270 RandomStart(idx, seed, ifar_path, ofar_path, cg_path, train_opts)271 for idx, seed in enumerate(272 random.sample(range(1, RAND_MAX), random_starts), 1)273 ]274 with multiprocessing.Pool(processes) as pool:275 # Setting chunksize to 1 means that random starts are processed276 # in roughly the order you'd expect.277 pairs = pool.map(RandomStart.train, starts, chunksize=1)278 # Finds best aligner; we `min` because this is in negative log space.279 best_aligner_path, best_likelihood = min(pairs, key=operator.itemgetter(1))280 logging.debug("Best aligner:\t%s", best_aligner_path)281 logging.debug("Best likelihood:\t%.4f", best_likelihood)282 # Deletes suboptimal aligner FSTs.283 for aligner_path, _ in pairs:284 if aligner_path == best_aligner_path:285 continue286 _rmtemp(aligner_path)287 _rmtemp(cg_path)288 return best_aligner_path289def _align(ifar_path: str, ofar_path: str, afst_path: str) -> str:290 """Computes the alignments FAR.291 Args:292 ifar_path: path to the input FAR.293 ofar_path: path to the output FAR.294 afst_path: path to the aligner FST.295 Returns:296 The path to the alignments FAR.297 """298 afar_path = _mktemp("a.far")299 _log_check_call(300 ["baumwelchdecode", ifar_path, ofar_path, afst_path, afar_path])301 _rmtemp(ifar_path)302 _rmtemp(ofar_path)303 return afar_path304def _encode(afar_path: str) -> Tuple[str, str]:305 """Encodes the alignments FAR.306 Args:307 afar_path: path to the alignments FAR.308 Returns:309 A (path to the encoded FAR, path to the encoder) tuple.310 """311 efar_path = _mktemp("e.far")312 encoder_path = _mktemp("encoder")313 _log_check_call(314 ["farencode", "--encode_labels", afar_path, encoder_path, efar_path])315 _rmtemp(afar_path)316 return efar_path, encoder_path317def _compile_pair_ngram(318 efar_path: str,319 encoder_path: str,320 ofst_path: str,321 order: Optional[int] = None,322 size: Optional[int] = None,323) -> None:324 """Compiles the pair n-gram model.325 Args:326 efar_path: path to the encoded FAR.327 encoder_path: path to the encoder.328 ofst_path: path for the pair n-gram FST.329 order: n-gram model order (default: from `ngramcount`).330 size: n-gram model size to prune to (default: no pruning).331 """332 cfst_path = _mktemp("c.fst")333 cmd = ["ngramcount", "--require_symbols=false"]334 if order:335 cmd.append(f"--order={order}")336 cmd.append(efar_path)337 cmd.append(cfst_path)338 _log_check_call(cmd)339 mfst_path = _mktemp("m.fst")340 _log_check_call(["ngrammake", "--method=kneser_ney", cfst_path, mfst_path])341 _rmtemp(cfst_path)342 if size:343 sfst_path = _mktemp("s.fst")344 _log_check_call([345 "ngramshrink",346 "--method=relative_entropy",347 f"--target_number_of_ngrams={size}",348 mfst_path,349 sfst_path,350 ])351 _rmtemp(mfst_path)352 else:353 sfst_path = mfst_path354 _log_check_call(["fstencode", "--decode", sfst_path, encoder_path, ofst_path])355 _rmtemp(encoder_path)356 _rmtemp(sfst_path)357def main(args: argparse.Namespace) -> None:...
json_store.py
Source:json_store.py
...46 def __iter__(self):47 return iter(self._data)48 def __len__(self):49 return len(self._data)50 def _mktemp(self):51 prefix = os.path.basename(self.path) + "."52 dirname = os.path.dirname(self.path)53 return NamedTemporaryFile(mode='w', prefix=prefix, dir=dirname, delete=False)54 def sync(self, json_kw=None, force=False):55 """Atomically write the entire store to disk if it's changed.56 If a dict is passed in as `json_kw`, it will be used as keyword57 arguments to the json module.58 If force is set True, a new file will be written even if the store59 hasn't changed since last sync.60 """61 json_kw = json_kw or self.json_kw62 if self._synced_json_kw != json_kw:63 self._needs_sync = True64 if not (self._needs_sync or force):65 return False66 with self._mktemp() as fp:67 json.dump(self._data, fp, **json_kw)68 if self.mode != MODE_600: # _mktemp uses 0600 by default69 os.chmod(fp.name, self.mode)70 shutil.move(fp.name, self.path)71 self._synced_json_kw = json_kw72 self._needs_sync = False...
test_tempfile.py
Source:test_tempfile.py
...11 super().setUp()12 self.cassette.storage_file = get_datafile_filename(self.id)13 TempFile.set_cassette(self.cassette)14 def testSimple(self):15 output = TempFile._mktemp()16 self.assertIn(17 f"/tmp/{os.path.basename(self.cassette.storage_file)}/static_tmp_1",18 output,19 )20 def testChangeFile(self):21 self.cassette.storage_file = str(self.cassette.storage_file) + ".x"22 output = TempFile._mktemp()23 self.assertIn(24 f"/tmp/{os.path.basename(self.cassette.storage_file)}/static_tmp_1",25 output,26 )27 output = TempFile._mktemp()28 self.assertIn(29 f"/tmp/{os.path.basename(self.cassette.storage_file)}/static_tmp_2",30 output,31 )32 self.cassette.storage_file = str(self.cassette.storage_file) + ".y"33 self.assertEqual(TempFile.counter, 2)34 output = TempFile._mktemp()35 self.assertEqual(TempFile.counter, 1)36 self.assertIn(37 f"/tmp/{os.path.basename(self.cassette.storage_file)}/static_tmp_1",38 output,39 )40 output = TempFile._mktemp()41 self.assertIn(42 f"/tmp/{os.path.basename(self.cassette.storage_file)}/static_tmp_2",43 output,44 )45@replace(what="tempfile.mktemp", decorate=MkTemp.decorator_plain())46def new_tempfile():47 return tempfile.mktemp()48@replace(what="tempfile.mkdtemp", decorate=MkDTemp.decorator_plain())49def new_tempdir():50 return tempfile.mkdtemp()51class TempFile_New(BaseClass):52 def test_tempfile(self):53 """When regeneration, tempfile will change, so change the expected output"""54 filename = new_tempfile()...
Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!