Best Python code snippet using avocado_python
beam_search.py
Source:beam_search.py
1# Copyright 2018 The TensorFlow Authors. All Rights Reserved.2#3# Licensed under the Apache License, Version 2.0 (the "License");4# you may not use this file except in compliance with the License.5# You may obtain a copy of the License at6#7# http://www.apache.org/licenses/LICENSE-2.08#9# Unless required by applicable law or agreed to in writing, software10# distributed under the License is distributed on an "AS IS" BASIS,11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.12# See the License for the specific language governing permissions and13# limitations under the License.14# ==============================================================================15"""Beam search to find the translated sequence with the highest probability.16Source implementation from Tensor2Tensor:17https://github.com/tensorflow/tensor2tensor/blob/master/tensor2tensor/utils/beam_search.py18"""19import tensorflow as tf20from tensorflow.python.util import nest21# Default value for INF22INF = 1. * 1e723class _StateKeys(object):24 """Keys to dictionary storing the state of the beam search loop."""25 # Variable storing the loop index.26 CUR_INDEX = "CUR_INDEX"27 # Top sequences that are alive for each batch item. Alive sequences are ones28 # that have not generated an EOS token. Sequences that reach EOS are marked as29 # finished and moved to the FINISHED_SEQ tensor.30 # Has shape [batch_size, beam_size, CUR_INDEX + 1]31 ALIVE_SEQ = "ALIVE_SEQ"32 # Log probabilities of each alive sequence. Shape [batch_size, beam_size]33 ALIVE_LOG_PROBS = "ALIVE_LOG_PROBS"34 # Dictionary of cached values for each alive sequence. The cache stores35 # the encoder output, attention bias, and the decoder attention output from36 # the previous iteration.37 ALIVE_CACHE = "ALIVE_CACHE"38 # Top finished sequences for each batch item.39 # Has shape [batch_size, beam_size, CUR_INDEX + 1]. Sequences that are40 # shorter than CUR_INDEX + 1 are padded with 0s.41 FINISHED_SEQ = "FINISHED_SEQ"42 # Scores for each finished sequence. Score = log probability / length norm43 # Shape [batch_size, beam_size]44 FINISHED_SCORES = "FINISHED_SCORES"45 # Flags indicating which sequences in the finished sequences are finished.46 # At the beginning, all of the sequences in FINISHED_SEQ are filler values.47 # True -> finished sequence, False -> filler. Shape [batch_size, beam_size]48 FINISHED_FLAGS = "FINISHED_FLAGS"49class SequenceBeamSearch(object):50 """Implementation of beam search loop."""51 def __init__(self, symbols_to_logits_fn, vocab_size, batch_size,52 beam_size, alpha, max_decode_length, eos_id):53 self.symbols_to_logits_fn = symbols_to_logits_fn54 self.vocab_size = vocab_size55 self.batch_size = batch_size56 self.beam_size = beam_size57 self.alpha = alpha58 self.max_decode_length = max_decode_length59 self.eos_id = eos_id60 def search(self, initial_ids, initial_cache):61 """Beam search for sequences with highest scores."""62 state, state_shapes = self._create_initial_state(initial_ids, initial_cache)63 finished_state = tf.while_loop(64 self._continue_search, self._search_step, loop_vars=[state],65 shape_invariants=[state_shapes], parallel_iterations=1, back_prop=False)66 finished_state = finished_state[0]67 alive_seq = finished_state[_StateKeys.ALIVE_SEQ]68 alive_log_probs = finished_state[_StateKeys.ALIVE_LOG_PROBS]69 finished_seq = finished_state[_StateKeys.FINISHED_SEQ]70 finished_scores = finished_state[_StateKeys.FINISHED_SCORES]71 finished_flags = finished_state[_StateKeys.FINISHED_FLAGS]72 # Account for corner case where there are no finished sequences for a73 # particular batch item. In that case, return alive sequences for that batch74 # item.75 finished_seq = tf.where(76 tf.reduce_any(finished_flags, 1), finished_seq, alive_seq)77 finished_scores = tf.where(78 tf.reduce_any(finished_flags, 1), finished_scores, alive_log_probs)79 return finished_seq, finished_scores80 def _create_initial_state(self, initial_ids, initial_cache):81 """Return initial state dictionary and its shape invariants.82 Args:83 initial_ids: initial ids to pass into the symbols_to_logits_fn.84 int tensor with shape [batch_size, 1]85 initial_cache: dictionary storing values to be passed into the86 symbols_to_logits_fn.87 Returns:88 state and shape invariant dictionaries with keys from _StateKeys89 """90 # Current loop index (starts at 0)91 cur_index = tf.constant(0)92 # Create alive sequence with shape [batch_size, beam_size, 1]93 alive_seq = _expand_to_beam_size(initial_ids, self.beam_size)94 alive_seq = tf.expand_dims(alive_seq, axis=2)95 # Create tensor for storing initial log probabilities.96 # Assume initial_ids are prob 1.097 initial_log_probs = tf.constant(98 [[0.] + [-float("inf")] * (self.beam_size - 1)])99 alive_log_probs = tf.tile(initial_log_probs, [self.batch_size, 1])100 # Expand all values stored in the dictionary to the beam size, so that each101 # beam has a separate cache.102 alive_cache = nest.map_structure(103 lambda t: _expand_to_beam_size(t, self.beam_size), initial_cache)104 # Initialize tensor storing finished sequences with filler values.105 finished_seq = tf.zeros(tf.shape(alive_seq), tf.int32)106 # Set scores of the initial finished seqs to negative infinity.107 finished_scores = tf.ones([self.batch_size, self.beam_size]) * -INF108 # Initialize finished flags with all False values.109 finished_flags = tf.zeros([self.batch_size, self.beam_size], tf.bool)110 # Create state dictionary111 state = {112 _StateKeys.CUR_INDEX: cur_index,113 _StateKeys.ALIVE_SEQ: alive_seq,114 _StateKeys.ALIVE_LOG_PROBS: alive_log_probs,115 _StateKeys.ALIVE_CACHE: alive_cache,116 _StateKeys.FINISHED_SEQ: finished_seq,117 _StateKeys.FINISHED_SCORES: finished_scores,118 _StateKeys.FINISHED_FLAGS: finished_flags119 }120 # Create state invariants for each value in the state dictionary. Each121 # dimension must be a constant or None. A None dimension means either:122 # 1) the dimension's value is a tensor that remains the same but may123 # depend on the input sequence to the model (e.g. batch size).124 # 2) the dimension may have different values on different iterations.125 state_shape_invariants = {126 _StateKeys.CUR_INDEX: tf.TensorShape([]),127 _StateKeys.ALIVE_SEQ: tf.TensorShape([None, self.beam_size, None]),128 _StateKeys.ALIVE_LOG_PROBS: tf.TensorShape([None, self.beam_size]),129 _StateKeys.ALIVE_CACHE: nest.map_structure(130 _get_shape_keep_last_dim, alive_cache),131 _StateKeys.FINISHED_SEQ: tf.TensorShape([None, self.beam_size, None]),132 _StateKeys.FINISHED_SCORES: tf.TensorShape([None, self.beam_size]),133 _StateKeys.FINISHED_FLAGS: tf.TensorShape([None, self.beam_size])134 }135 return state, state_shape_invariants136 def _continue_search(self, state):137 """Return whether to continue the search loop.138 The loops should terminate when139 1) when decode length has been reached, or140 2) when the worst score in the finished sequences is better than the best141 score in the alive sequences (i.e. the finished sequences are provably142 unchanging)143 Args:144 state: A dictionary with the current loop state.145 Returns:146 Bool tensor with value True if loop should continue, False if loop should147 terminate.148 """149 i = state[_StateKeys.CUR_INDEX]150 alive_log_probs = state[_StateKeys.ALIVE_LOG_PROBS]151 finished_scores = state[_StateKeys.FINISHED_SCORES]152 finished_flags = state[_StateKeys.FINISHED_FLAGS]153 not_at_max_decode_length = tf.less(i, self.max_decode_length)154 # Calculate largest length penalty (the larger penalty, the better score).155 max_length_norm = _length_normalization(self.alpha, self.max_decode_length)156 # Get the best possible scores from alive sequences.157 best_alive_scores = alive_log_probs[:, 0] / max_length_norm158 # Compute worst score in finished sequences for each batch element159 finished_scores *= tf.to_float(finished_flags) # set filler scores to zero160 lowest_finished_scores = tf.reduce_min(finished_scores, axis=1)161 # If there are no finished sequences in a batch element, then set the lowest162 # finished score to -INF for that element.163 finished_batches = tf.reduce_any(finished_flags, 1)164 lowest_finished_scores += (1. - tf.to_float(finished_batches)) * -INF165 worst_finished_score_better_than_best_alive_score = tf.reduce_all(166 tf.greater(lowest_finished_scores, best_alive_scores)167 )168 return tf.logical_and(169 not_at_max_decode_length,170 tf.logical_not(worst_finished_score_better_than_best_alive_score)171 )172 def _search_step(self, state):173 """Beam search loop body.174 Grow alive sequences by a single ID. Sequences that have reached the EOS175 token are marked as finished. The alive and finished sequences with the176 highest log probabilities and scores are returned.177 A sequence's finished score is calculating by dividing the log probability178 by the length normalization factor. Without length normalization, the179 search is more likely to return shorter sequences.180 Args:181 state: A dictionary with the current loop state.182 Returns:183 new state dictionary.184 """185 # Grow alive sequences by one token.186 new_seq, new_log_probs, new_cache = self._grow_alive_seq(state)187 # Collect top beam_size alive sequences188 alive_state = self._get_new_alive_state(new_seq, new_log_probs, new_cache)189 # Combine newly finished sequences with existing finished sequences, and190 # collect the top k scoring sequences.191 finished_state = self._get_new_finished_state(state, new_seq, new_log_probs)192 # Increment loop index and create new state dictionary193 new_state = {_StateKeys.CUR_INDEX: state[_StateKeys.CUR_INDEX] + 1}194 new_state.update(alive_state)195 new_state.update(finished_state)196 return [new_state]197 def _grow_alive_seq(self, state):198 """Grow alive sequences by one token, and collect top 2*beam_size sequences.199 2*beam_size sequences are collected because some sequences may have reached200 the EOS token. 2*beam_size ensures that at least beam_size sequences are201 still alive.202 Args:203 state: A dictionary with the current loop state.204 Returns:205 Tuple of206 (Top 2*beam_size sequences [batch_size, 2 * beam_size, cur_index + 1],207 Scores of returned sequences [batch_size, 2 * beam_size],208 New alive cache, for each of the 2 * beam_size sequences)209 """210 i = state[_StateKeys.CUR_INDEX]211 alive_seq = state[_StateKeys.ALIVE_SEQ]212 alive_log_probs = state[_StateKeys.ALIVE_LOG_PROBS]213 alive_cache = state[_StateKeys.ALIVE_CACHE]214 beams_to_keep = 2 * self.beam_size215 # Get logits for the next candidate IDs for the alive sequences. Get the new216 # cache values at the same time.217 flat_ids = _flatten_beam_dim(alive_seq) # [batch_size * beam_size]218 flat_cache = nest.map_structure(_flatten_beam_dim, alive_cache)219 flat_logits, flat_cache = self.symbols_to_logits_fn(flat_ids, i, flat_cache)220 # Unflatten logits to shape [batch_size, beam_size, vocab_size]221 logits = _unflatten_beam_dim(flat_logits, self.batch_size, self.beam_size)222 new_cache = nest.map_structure(223 lambda t: _unflatten_beam_dim(t, self.batch_size, self.beam_size),224 flat_cache)225 # Convert logits to normalized log probs226 candidate_log_probs = _log_prob_from_logits(logits)227 # Calculate new log probabilities if each of the alive sequences were228 # extended # by the the candidate IDs.229 # Shape [batch_size, beam_size, vocab_size]230 log_probs = candidate_log_probs + tf.expand_dims(alive_log_probs, axis=2)231 # Each batch item has beam_size * vocab_size candidate sequences. For each232 # batch item, get the k candidates with the highest log probabilities.233 flat_log_probs = tf.reshape(log_probs,234 [-1, self.beam_size * self.vocab_size])235 topk_log_probs, topk_indices = tf.nn.top_k(flat_log_probs, k=beams_to_keep)236 # Extract the alive sequences that generate the highest log probabilities237 # after being extended.238 topk_beam_indices = topk_indices // self.vocab_size239 topk_seq, new_cache = _gather_beams(240 [alive_seq, new_cache], topk_beam_indices, self.batch_size,241 beams_to_keep)242 # Append the most probable IDs to the topk sequences243 topk_ids = topk_indices % self.vocab_size244 topk_ids = tf.expand_dims(topk_ids, axis=2)245 topk_seq = tf.concat([topk_seq, topk_ids], axis=2)246 return topk_seq, topk_log_probs, new_cache247 def _get_new_alive_state(self, new_seq, new_log_probs, new_cache):248 """Gather the top k sequences that are still alive.249 Args:250 new_seq: New sequences generated by growing the current alive sequences251 int32 tensor with shape [batch_size, 2 * beam_size, cur_index + 1]252 new_log_probs: Log probabilities of new sequences253 float32 tensor with shape [batch_size, beam_size]254 new_cache: Dict of cached values for each sequence.255 Returns:256 Dictionary with alive keys from _StateKeys:257 {Top beam_size sequences that are still alive (don't end with eos_id)258 Log probabilities of top alive sequences259 Dict cache storing decoder states for top alive sequences}260 """261 # To prevent finished sequences from being considered, set log probs to -INF262 new_finished_flags = tf.equal(new_seq[:, :, -1], self.eos_id)263 new_log_probs += tf.to_float(new_finished_flags) * -INF264 top_alive_seq, top_alive_log_probs, top_alive_cache = _gather_topk_beams(265 [new_seq, new_log_probs, new_cache], new_log_probs, self.batch_size,266 self.beam_size)267 return {268 _StateKeys.ALIVE_SEQ: top_alive_seq,269 _StateKeys.ALIVE_LOG_PROBS: top_alive_log_probs,270 _StateKeys.ALIVE_CACHE: top_alive_cache271 }272 def _get_new_finished_state(self, state, new_seq, new_log_probs):273 """Combine new and old finished sequences, and gather the top k sequences.274 Args:275 state: A dictionary with the current loop state.276 new_seq: New sequences generated by growing the current alive sequences277 int32 tensor with shape [batch_size, beam_size, i + 1]278 new_log_probs: Log probabilities of new sequences279 float32 tensor with shape [batch_size, beam_size]280 Returns:281 Dictionary with finished keys from _StateKeys:282 {Top beam_size finished sequences based on score,283 Scores of finished sequences,284 Finished flags of finished sequences}285 """286 i = state[_StateKeys.CUR_INDEX]287 finished_seq = state[_StateKeys.FINISHED_SEQ]288 finished_scores = state[_StateKeys.FINISHED_SCORES]289 finished_flags = state[_StateKeys.FINISHED_FLAGS]290 # First append a column of 0-ids to finished_seq to increment the length.291 # New shape of finished_seq: [batch_size, beam_size, i + 1]292 finished_seq = tf.concat(293 [finished_seq,294 tf.zeros([self.batch_size, self.beam_size, 1], tf.int32)], axis=2)295 # Calculate new seq scores from log probabilities.296 length_norm = _length_normalization(self.alpha, i + 1)297 new_scores = new_log_probs / length_norm298 # Set the scores of the still-alive seq in new_seq to large negative values.299 new_finished_flags = tf.equal(new_seq[:, :, -1], self.eos_id)300 new_scores += (1. - tf.to_float(new_finished_flags)) * -INF301 # Combine sequences, scores, and flags.302 finished_seq = tf.concat([finished_seq, new_seq], axis=1)303 finished_scores = tf.concat([finished_scores, new_scores], axis=1)304 finished_flags = tf.concat([finished_flags, new_finished_flags], axis=1)305 # Return the finished sequences with the best scores.306 top_finished_seq, top_finished_scores, top_finished_flags = (307 _gather_topk_beams([finished_seq, finished_scores, finished_flags],308 finished_scores, self.batch_size, self.beam_size))309 return {310 _StateKeys.FINISHED_SEQ: top_finished_seq,311 _StateKeys.FINISHED_SCORES: top_finished_scores,312 _StateKeys.FINISHED_FLAGS: top_finished_flags313 }314def sequence_beam_search(315 symbols_to_logits_fn, initial_ids, initial_cache, vocab_size, beam_size,316 alpha, max_decode_length, eos_id):317 """Search for sequence of subtoken ids with the largest probability.318 Args:319 symbols_to_logits_fn: A function that takes in ids, index, and cache as320 arguments. The passed in arguments will have shape:321 ids -> [batch_size * beam_size, index]322 index -> [] (scalar)323 cache -> nested dictionary of tensors [batch_size * beam_size, ...]324 The function must return logits and new cache.325 logits -> [batch * beam_size, vocab_size]326 new cache -> same shape/structure as inputted cache327 initial_ids: Starting ids for each batch item.328 int32 tensor with shape [batch_size]329 initial_cache: dict containing starting decoder variables information330 vocab_size: int size of tokens331 beam_size: int number of beams332 alpha: float defining the strength of length normalization333 max_decode_length: maximum length to decoded sequence334 eos_id: int id of eos token, used to determine when a sequence has finished335 Returns:336 Top decoded sequences [batch_size, beam_size, max_decode_length]337 sequence scores [batch_size, beam_size]338 """339 batch_size = tf.shape(initial_ids)[0]340 sbs = SequenceBeamSearch(symbols_to_logits_fn, vocab_size, batch_size,341 beam_size, alpha, max_decode_length, eos_id)342 return sbs.search(initial_ids, initial_cache)343def _log_prob_from_logits(logits):344 return logits - tf.reduce_logsumexp(logits, axis=2, keep_dims=True)345def _length_normalization(alpha, length):346 """Return length normalization factor."""347 return tf.pow(((5. + tf.to_float(length)) / 6.), alpha)348def _expand_to_beam_size(tensor, beam_size):349 """Tiles a given tensor by beam_size.350 Args:351 tensor: tensor to tile [batch_size, ...]352 beam_size: How much to tile the tensor by.353 Returns:354 Tiled tensor [batch_size, beam_size, ...]355 """356 tensor = tf.expand_dims(tensor, axis=1)357 tile_dims = [1] * tensor.shape.ndims358 tile_dims[1] = beam_size359 return tf.tile(tensor, tile_dims)360def _shape_list(tensor):361 """Return a list of the tensor's shape, and ensure no None values in list."""362 # Get statically known shape (may contain None's for unknown dimensions)363 shape = tensor.get_shape().as_list()364 # Ensure that the shape values are not None365 dynamic_shape = tf.shape(tensor)366 for i in range(len(shape)): # pylint: disable=consider-using-enumerate367 if shape[i] is None:368 shape[i] = dynamic_shape[i]369 return shape370def _get_shape_keep_last_dim(tensor):371 shape_list = _shape_list(tensor)372 # Only the last373 for i in range(len(shape_list) - 1):374 shape_list[i] = None375 if isinstance(shape_list[-1], tf.Tensor):376 shape_list[-1] = None377 return tf.TensorShape(shape_list)378def _flatten_beam_dim(tensor):379 """Reshapes first two dimensions in to single dimension.380 Args:381 tensor: Tensor to reshape of shape [A, B, ...]382 Returns:383 Reshaped tensor of shape [A*B, ...]384 """385 shape = _shape_list(tensor)386 shape[0] *= shape[1]387 shape.pop(1) # Remove beam dim388 return tf.reshape(tensor, shape)389def _unflatten_beam_dim(tensor, batch_size, beam_size):390 """Reshapes first dimension back to [batch_size, beam_size].391 Args:392 tensor: Tensor to reshape of shape [batch_size*beam_size, ...]393 batch_size: Tensor, original batch size.394 beam_size: int, original beam size.395 Returns:396 Reshaped tensor of shape [batch_size, beam_size, ...]397 """398 shape = _shape_list(tensor)399 new_shape = [batch_size, beam_size] + shape[1:]400 return tf.reshape(tensor, new_shape)401def _gather_beams(nested, beam_indices, batch_size, new_beam_size):402 """Gather beams from nested structure of tensors.403 Each tensor in nested represents a batch of beams, where beam refers to a404 single search state (beam search involves searching through multiple states405 in parallel).406 This function is used to gather the top beams, specified by407 beam_indices, from the nested tensors.408 Args:409 nested: Nested structure (tensor, list, tuple or dict) containing tensors410 with shape [batch_size, beam_size, ...].411 beam_indices: int32 tensor with shape [batch_size, new_beam_size]. Each412 value in beam_indices must be between [0, beam_size), and are not413 necessarily unique.414 batch_size: int size of batch415 new_beam_size: int number of beams to be pulled from the nested tensors.416 Returns:417 Nested structure containing tensors with shape418 [batch_size, new_beam_size, ...]419 """420 # Computes the i'th coodinate that contains the batch index for gather_nd.421 # Batch pos is a tensor like [[0,0,0,0,],[1,1,1,1],..].422 batch_pos = tf.range(batch_size * new_beam_size) // new_beam_size423 batch_pos = tf.reshape(batch_pos, [batch_size, new_beam_size])424 # Create coordinates to be passed to tf.gather_nd. Stacking creates a tensor425 # with shape [batch_size, beam_size, 2], where the last dimension contains426 # the (i, j) gathering coordinates.427 coordinates = tf.stack([batch_pos, beam_indices], axis=2)428 return nest.map_structure(429 lambda state: tf.gather_nd(state, coordinates), nested)430def _gather_topk_beams(nested, score_or_log_prob, batch_size, beam_size):431 """Gather top beams from nested structure."""432 _, topk_indexes = tf.nn.top_k(score_or_log_prob, k=beam_size)...
sampling_module.py
Source:sampling_module.py
1# Copyright 2022 The TensorFlow Authors. All Rights Reserved.2#3# Licensed under the Apache License, Version 2.0 (the "License");4# you may not use this file except in compliance with the License.5# You may obtain a copy of the License at6#7# http://www.apache.org/licenses/LICENSE-2.08#9# Unless required by applicable law or agreed to in writing, software10# distributed under the License is distributed on an "AS IS" BASIS,11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.12# See the License for the specific language governing permissions and13# limitations under the License.14"""Sampling module for top_k, top_p and greedy decoding."""15import abc16from typing import Any, Callable, Dict, Optional17import numpy as np18import tensorflow as tf19from official.nlp.modeling.ops import decoding_module20def greedy(log_probs):21 """Returns the top ids and scores based on greedy decoding."""22 log_probs, ids = tf.math.top_k(log_probs, k=1)23 return log_probs, ids24def sample_logits_with_temperature(logits, temperature):25 """Applies a sampling temperature.26 Temperature skews the distribution towards high probability27 tokens and lowers the mass in tail distribution.28 Args:29 logits: Input logits for next token.30 temperature: Tensor for specifying the sampling temperature.31 Returns:32 Logits with applied temperature.33 """34 return logits / temperature35def sample_top_k(logits, top_k):36 """Chooses top_k logits and sets the others to negative infinity.37 Args:38 logits: Input logits for next token.39 top_k: Tensor to specify the top_k values.40 Returns:41 Logits with top_k filtering applied.42 """43 top_k_logits = tf.math.top_k(logits, k=top_k)44 indices_to_remove = logits < tf.expand_dims(top_k_logits[0][..., -1], -1)45 top_k_logits = set_tensor_by_indices_to_value(logits, indices_to_remove,46 np.NINF)47 return top_k_logits48def sample_top_p(logits, top_p):49 """Chooses most probable logits with cumulative probabilities upto top_p.50 Sets the remaining logits to negative infinity.51 Args:52 logits: Input logits for next token.53 top_p: Float tensor with a value >=0 and < 1.054 Returns:55 Logits with top_p filtering applied.56 """57 sorted_indices = tf.argsort(logits, direction="DESCENDING")58 # Flatten logits as tf.gather on TPU needs axis to be compile time constant.59 logits_shape = decoding_module.shape_list(logits)60 range_for_gather = tf.expand_dims(tf.range(0, logits_shape[0]), axis=1)61 range_for_gather = tf.tile(range_for_gather * logits_shape[1],62 [1, logits_shape[1]]) + sorted_indices63 flattened_logits = tf.reshape(logits, [-1])64 flattened_sorted_indices = tf.reshape(range_for_gather, [-1])65 sorted_logits = tf.reshape(66 tf.gather(flattened_logits, flattened_sorted_indices),67 [logits_shape[0], logits_shape[1]])68 cumulative_probs = tf.cumsum(tf.nn.softmax(sorted_logits, axis=-1), axis=-1)69 # Remove tokens with cumulative probability above the threshold.70 sorted_indices_to_remove = cumulative_probs > top_p71 # Shift the indices to the right to keep the first token above threshold.72 sorted_indices_to_remove = tf.roll(sorted_indices_to_remove, 1, axis=-1)73 sorted_indices_to_remove = tf.concat([74 tf.zeros_like(sorted_indices_to_remove[:, :1]),75 sorted_indices_to_remove[:, 1:]76 ], -1)77 # Scatter sorted indices to original indexes.78 indices_to_remove = scatter_values_on_batch_indices(sorted_indices_to_remove,79 sorted_indices)80 top_p_logits = set_tensor_by_indices_to_value(logits, indices_to_remove,81 np.NINF)82 return top_p_logits83def scatter_values_on_batch_indices(values, batch_indices):84 """Scatter `values` into a tensor using `batch_indices`.85 Args:86 values: tensor of shape [batch_size, vocab_size] containing the values to87 scatter88 batch_indices: tensor of shape [batch_size, vocab_size] containing the89 indices to insert (should be a permutation in range(0, n))90 Returns:91 Tensor of shape [batch_size, vocab_size] with values inserted at92 batch_indices93 """94 tensor_shape = decoding_module.shape_list(batch_indices)95 broad_casted_batch_dims = tf.reshape(96 tf.broadcast_to(97 tf.expand_dims(tf.range(tensor_shape[0]), axis=-1), tensor_shape),98 [1, -1])99 pair_indices = tf.transpose(100 tf.concat([broad_casted_batch_dims,101 tf.reshape(batch_indices, [1, -1])], 0))102 return tf.scatter_nd(pair_indices, tf.reshape(values, [-1]), tensor_shape)103def set_tensor_by_indices_to_value(input_tensor, indices, value):104 """Where indices is True, set the value in input_tensor to value.105 Args:106 input_tensor: float (batch_size, dim)107 indices: bool (batch_size, dim)108 value: float scalar109 Returns:110 output_tensor: same shape as input_tensor.111 """112 value_tensor = tf.zeros_like(input_tensor) + value113 output_tensor = tf.where(indices, value_tensor, input_tensor)114 return output_tensor115class SamplingModule(decoding_module.DecodingModule, metaclass=abc.ABCMeta):116 """Implementation for sampling strategies (go/decoding-tf-nlp)."""117 def __init__(self,118 symbols_to_logits_fn,119 vocab_size: int,120 max_decode_length: int,121 eos_id: int,122 padded_decode: bool,123 length_normalization_fn: Optional[Callable[[int, tf.DType],124 float]] = None,125 top_k=0,126 top_p=1.0,127 sample_temperature=0.0,128 enable_greedy: bool = True,129 dtype: tf.DType = tf.float32):130 """Initialize sampling module."""131 self.symbols_to_logits_fn = symbols_to_logits_fn132 self.length_normalization_fn = length_normalization_fn133 self.eos_id = eos_id134 self.padded_decode = padded_decode135 self.dtype = tf.as_dtype(dtype)136 self.vocab_size = tf.convert_to_tensor(vocab_size, dtype=tf.int32)137 self.max_decode_length = max_decode_length138 self.top_k = tf.convert_to_tensor(top_k, dtype=tf.int32)139 self.top_p = tf.convert_to_tensor(top_p, dtype=tf.float32)140 self.sample_temperature = tf.convert_to_tensor(141 sample_temperature, dtype=tf.float32)142 self.enable_greedy = enable_greedy143 super(SamplingModule, self).__init__(144 length_normalization_fn=length_normalization_fn, dtype=dtype)145 def _grow_alive_seq(self,146 state: Dict[str, Any],147 batch_size: int) -> decoding_module.InternalState:148 """Grow alive sequences by one token.149 This function will implement the decoding strategies like top_p, top_k150 and greedy for the choosing the next logit.151 Args:152 state: A dictionary with the current loop state.153 batch_size: The given batch size154 Returns:155 Tuple of156 (Top sequences [batch, curr_index + 1] or [batch, max_decode_length + 1],157 Scores of returned sequences [batch, 1],158 New ids [batch, 1],159 New alive cache)160 """161 i = state[decoding_module.StateKeys.CUR_INDEX]162 alive_seq = state[decoding_module.StateKeys.ALIVE_SEQ]163 alive_log_probs = state[decoding_module.StateKeys.ALIVE_LOG_PROBS]164 alive_cache = state[decoding_module.StateKeys.ALIVE_CACHE]165 if self.padded_decode:166 ids = tf.slice(alive_seq, [0, i], [batch_size, 1])167 else:168 ids = alive_seq169 new_logits, new_cache = self.symbols_to_logits_fn(ids, i, alive_cache)170 candidate_log_probs = decoding_module.log_prob_from_logits(171 new_logits)172 original_log_probs = candidate_log_probs + alive_log_probs173 topk_log_probs, topk_ids = None, None174 if self.enable_greedy:175 topk_log_probs, topk_ids = greedy(original_log_probs)176 else:177 temperature_fn = sample_logits_with_temperature178 sampled_logits = tf.cond(179 self.sample_temperature > 0.0,180 lambda: temperature_fn(new_logits, self.sample_temperature),181 lambda: new_logits)182 sampled_logits = tf.cond(183 self.top_k > 0,184 lambda: sample_top_k(sampled_logits, self.top_k),185 lambda: sampled_logits)186 sampled_logits = tf.cond(187 self.top_p < 1,188 lambda: sample_top_p(sampled_logits, self.top_p),189 lambda: sampled_logits)190 topk_ids = tf.random.categorical(191 sampled_logits, dtype=tf.int32, num_samples=1)192 topk_log_probs = tf.gather(193 original_log_probs, topk_ids, axis=1, batch_dims=1)194 if self.padded_decode:195 topk_seq = tf.transpose(alive_seq, perm=[1, 0])196 topk_seq = tf.tensor_scatter_nd_update(197 topk_seq, [[i + 1]], tf.expand_dims(tf.squeeze(topk_ids, -1), 0))198 topk_seq = tf.transpose(topk_seq, perm=[1, 0])199 else:200 topk_seq = tf.concat([alive_seq, topk_ids], axis=-1)201 return topk_seq, topk_log_probs, topk_ids, new_cache202 def _create_initial_state(self,203 initial_ids: tf.Tensor,204 initial_cache: Dict[str, tf.Tensor],205 batch_size: int) -> decoding_module.InitialState:206 """Return initial state dictionary and its shape invariants."""207 for key, value in initial_cache.items():208 for inner_value in tf.nest.flatten(value):209 if inner_value.dtype != self.dtype:210 raise TypeError(211 "initial_cache element for key '%s' has dtype %s that does not "212 "match sampling_module's dtype of %s. Value: %s" %213 (key, value.dtype.name, self.dtype.name, inner_value))214 # Current loop index (starts at 0)215 cur_index = tf.constant(0)216 # Alive sequence with shape [batch_size, 1]217 alive_seq = initial_ids218 alive_seq = tf.expand_dims(alive_seq, axis=-1)219 if self.padded_decode:220 alive_seq = tf.tile(alive_seq, [1, self.max_decode_length + 1])221 # Initial log probabilities with shape [batch_size, 1].222 initial_log_probs = tf.constant([[0.]], dtype=self.dtype)223 alive_log_probs = tf.tile(initial_log_probs, [batch_size, 1])224 alive_cache = initial_cache225 # Initialize tensor storing finished sequences [batch_size, 1, 1].226 finished_seq = tf.zeros(tf.shape(alive_seq), tf.int32)227 # Set scores of the initial finished seqs to negative infinity.228 finished_scores = tf.zeros([batch_size, 1], dtype=self.dtype)229 # Initialize finished flags with all False values.230 finished_flags = tf.zeros([batch_size, 1], tf.bool)231 # Create state dictionary and state shapes.232 state = {233 decoding_module.StateKeys.CUR_INDEX: cur_index,234 decoding_module.StateKeys.ALIVE_SEQ: alive_seq,235 decoding_module.StateKeys.ALIVE_LOG_PROBS: alive_log_probs,236 decoding_module.StateKeys.ALIVE_CACHE: alive_cache,237 decoding_module.StateKeys.FINISHED_SEQ: finished_seq,238 decoding_module.StateKeys.FINISHED_SCORES: finished_scores,239 decoding_module.StateKeys.FINISHED_FLAGS: finished_flags240 }241 if self.padded_decode:242 state_shape_invariants = {243 decoding_module.StateKeys.CUR_INDEX:244 tf.TensorShape([]),245 decoding_module.StateKeys.ALIVE_SEQ:246 tf.TensorShape(247 [batch_size, self.max_decode_length + 1]),248 decoding_module.StateKeys.ALIVE_LOG_PROBS:249 tf.TensorShape([batch_size, 1]),250 decoding_module.StateKeys.ALIVE_CACHE:251 tf.nest.map_structure(lambda state: state.get_shape(),252 alive_cache),253 decoding_module.StateKeys.FINISHED_SEQ:254 tf.TensorShape(255 [batch_size, self.max_decode_length + 1]),256 decoding_module.StateKeys.FINISHED_SCORES:257 tf.TensorShape([batch_size, 1]),258 decoding_module.StateKeys.FINISHED_FLAGS:259 tf.TensorShape([batch_size, 1])260 }261 else:262 state_shape_invariants = {263 decoding_module.StateKeys.CUR_INDEX:264 tf.TensorShape([]),265 decoding_module.StateKeys.ALIVE_SEQ:266 tf.TensorShape([None, None]),267 decoding_module.StateKeys.ALIVE_LOG_PROBS:268 tf.TensorShape([None, 1]),269 decoding_module.StateKeys.ALIVE_CACHE:270 tf.nest.map_structure(271 decoding_module.get_shape_keep_last_dim,272 alive_cache),273 decoding_module.StateKeys.FINISHED_SEQ:274 tf.TensorShape([None, None]),275 decoding_module.StateKeys.FINISHED_SCORES:276 tf.TensorShape([None, 1]),277 decoding_module.StateKeys.FINISHED_FLAGS:278 tf.TensorShape([None, 1])279 }280 return state, state_shape_invariants281 def _get_new_alive_state(self, new_seq: tf.Tensor, new_log_probs: tf.Tensor,282 new_finished_flags: tf.Tensor,283 new_cache: Dict[str, tf.Tensor]) -> Dict[str, Any]:284 """Gather the sequences that are still alive.285 This function resets the sequences in the alive_state that are finished.286 Args:287 new_seq: New sequences generated by growing the current alive sequences288 int32 tensor with shape [batch_size, cur_index + 1]289 new_log_probs: Log probabilities of new sequences float32 tensor with290 shape [batch_size, 1]291 new_finished_flags: A boolean Tensor indicates which sequences are live292 inside the beam.293 new_cache: Dict of cached values for each sequence.294 Returns:295 Dictionary with alive keys.296 """297 new_seq = tf.multiply(298 new_seq, tf.cast(tf.logical_not(new_finished_flags), new_seq.dtype))299 return {300 decoding_module.StateKeys.ALIVE_SEQ: new_seq,301 decoding_module.StateKeys.ALIVE_LOG_PROBS: new_log_probs,302 decoding_module.StateKeys.ALIVE_CACHE: new_cache303 }304 def _get_new_finished_state(self, state: Dict[str, Any], new_seq: tf.Tensor,305 new_log_probs: tf.Tensor,306 new_finished_flags: tf.Tensor,307 batch_size: int) -> Dict[str, tf.Tensor]:308 """Combine new and old finished sequences.309 Args:310 state: A dictionary with the current loop state.311 new_seq: New sequences generated by growing the current alive sequences312 int32 tensor [batch, curr_index + 1] or [batch, max_decode_length + 1].313 new_log_probs: Log probabilities of new sequences float32 tensor with314 shape [batch, 1].315 new_finished_flags: A boolean Tensor indicates which sequences are live.316 batch_size: The given batch size.317 Returns:318 Dictionary with finished keys from StateKeys.319 """320 i = state[decoding_module.StateKeys.CUR_INDEX]321 finished_seq = state[decoding_module.StateKeys.FINISHED_SEQ]322 finished_scores = state[decoding_module.StateKeys.FINISHED_SCORES]323 finished_flags = state[decoding_module.StateKeys.FINISHED_FLAGS]324 if not self.padded_decode:325 finished_seq = tf.concat(326 [finished_seq, tf.zeros([batch_size, 1], tf.int32)], axis=-1)327 new_scores = new_log_probs328 if self.length_normalization_fn is not None:329 length_norm = self.length_normalization_fn(i + 1, self.dtype)330 new_scores = new_log_probs / length_norm331 new_seq = tf.multiply(332 new_seq, tf.cast(tf.logical_not(finished_flags), new_seq.dtype))333 new_scores = tf.multiply(334 new_scores, tf.cast(tf.logical_not(finished_flags), new_scores.dtype))335 finished_seq += tf.multiply(new_seq,336 tf.cast(new_finished_flags, new_seq.dtype))337 finished_scores += tf.multiply(338 new_scores, tf.cast(new_finished_flags, new_scores.dtype))339 new_finished_flags = tf.logical_or(new_finished_flags, finished_flags)340 return {341 decoding_module.StateKeys.FINISHED_SEQ: finished_seq,342 decoding_module.StateKeys.FINISHED_SCORES: finished_scores,343 decoding_module.StateKeys.FINISHED_FLAGS: new_finished_flags344 }345 def _process_finished_state(346 self, finished_state: Dict[str, Any]) -> decoding_module.Output:347 """Process the alive/finished state to return final sequences and scores."""348 alive_seq = finished_state[decoding_module.StateKeys.ALIVE_SEQ]349 alive_log_probs = finished_state[decoding_module.StateKeys.ALIVE_LOG_PROBS]350 finished_seq = finished_state[decoding_module.StateKeys.FINISHED_SEQ]351 finished_scores = finished_state[decoding_module.StateKeys.FINISHED_SCORES]352 finished_flags = finished_state[decoding_module.StateKeys.FINISHED_FLAGS]353 finished_cond = tf.reduce_any(finished_flags, 1, name="finished_cond")354 if self.length_normalization_fn is not None:355 length_norm = self.length_normalization_fn(self.max_decode_length + 1,356 self.dtype)357 alive_log_probs = alive_log_probs / length_norm358 seq_cond = decoding_module.expand_to_same_rank(finished_cond, finished_seq)359 score_cond = decoding_module.expand_to_same_rank(finished_cond,360 finished_scores)361 finished_seq = tf.where(seq_cond, finished_seq, alive_seq)362 finished_scores = tf.where(score_cond, finished_scores, alive_log_probs)363 return finished_seq, finished_scores364 def _continue_search(self, state) -> tf.Tensor:365 i = state[decoding_module.StateKeys.CUR_INDEX]366 # Have we reached max decoding length?367 not_at_end = tf.less(i, self.max_decode_length)368 # Have all sampled sequences reached an EOS?369 all_has_eos = tf.reduce_all(370 state[decoding_module.StateKeys.FINISHED_FLAGS],371 axis=None,372 name="search_finish_cond")373 return tf.logical_and(not_at_end, tf.logical_not(all_has_eos))374 def _finished_flags(self, topk_ids, state) -> tf.Tensor:375 new_finished_flags = tf.equal(topk_ids, self.eos_id)376 new_finished_flags = tf.logical_or(377 new_finished_flags, state[decoding_module.StateKeys.FINISHED_FLAGS])...
scheduling_algorithms.py
Source:scheduling_algorithms.py
1from components import *2import operator3from abc import ABC, abstractmethod4class Scheduling(ABC):5 def __init__(self):6 self.turnaround = 07 self.finished = True8 def add_process_at_the_end(self):9 self.cur_processes.append(process)10 def add_process_at_the_beginning(self, process: Process):11 self.cur_processes = [process] + self.cur_processes12 def remove_process_at_the_beginning(self):13 self.cur_processes = self.cur_processes[1:]14 def get_first_process(self):15 return self.cur_processes[0]16 def get_arrived_processes(self, t: int):17 while len(self.processes) > 0 and self.processes[0].arrival_time <= t:18 self.cur_processes.append(self.processes[0])19 self.processes = self.processes[1:]20 def set_executed(self, process: Process, ram: RAM):21 for page in process.pages:22 ram.set_executed(page)23 @abstractmethod24 def execute(self):25 pass26class Premptive(Scheduling):27 def __init__(self, quantum: int, overhead: int):28 self.quantum = quantum29 self.overhead = overhead30 self.cnt_quantum = 031 self.cnt_overhead = -132 super().__init__()33class Non_Premptive(Scheduling):34 def __init__(self):35 super().__init__()36class FIFO(Non_Premptive):37 def __init__(self):38 super().__init__()39 # returns the process executed and a flag determining whether it has finished40 def execute(self, waiting_processes: list, t: int, ram: RAM) -> (Process, bool):41 if len(waiting_processes) == 0:42 print("t = ", "empty")43 return Process(-1, -1, -1, -1, -1, -1), False44 front = waiting_processes[0]45 assert front.exec_time != 046 self.set_executed(front, ram)47 if front.exec_time == 1:48 waiting_processes = waiting_processes[1:]49 self.turnaround = self.turnaround + (t - front.arrival_time + 1)50 self.finished = True51 else:52 waiting_processes[0].exec_time = waiting_processes[0].exec_time - 153 self.finished = False54 return front, self.finished55class SJF(Non_Premptive):56 def __init__(self):57 super().__init__()58 def execute(self, waiting_processes: list, t: int, ram: RAM) -> (Process, bool):59 if len(waiting_processes) == 0:60 print("t = ", t, "empty")61 return Process(-1, -1, -1, -1, -1, -1), False62 if self.finished:63 waiting_processes.sort(key=lambda p: p.exec_time)64 self.finished = False65 front = waiting_processes[0]66 assert front.exec_time != 067 self.set_executed(front, ram)68 finished = False69 if front.exec_time == 1:70 # waiting_processes = waiting_processes[1:]71 self.turnaround = self.turnaround + (t - front.arrival_time + 1)72 finished = True73 else:74 waiting_processes[0].exec_time = waiting_processes[0].exec_time - 175 finished = False76 return front, finished77class Round_Robin(Premptive):78 def __init__(self, quantum: int, overhead: int):79 assert quantum >= 1 and overhead >= 180 super().__init__(quantum=quantum, overhead=overhead)81 def execute(self, waiting_processes: list, t: int, ram: RAM) -> (Process, bool):82 # if cnt_overhead is negative, the process is executing83 if self.cnt_overhead >= 0:84 print("\n\n\n\nOVERHEAD\n\n\n\n")85 self.cnt_overhead += 186 if self.cnt_overhead == self.overhead:87 self.cnt_overhead = -188 self.cnt_quantum = 089 return Process(-1, -1, -1, -1, -1, -1), False90 if len(waiting_processes) == 0:91 print("t = ", t, "empty")92 return Process(-1, -1, -1, -1, -1, -1), False93 front = waiting_processes[0]94 assert front.exec_time != 095 self.set_executed(front, ram)96 finished = False97 if front.exec_time == 1:98 # waiting_processes = waiting_processes[1:]99 self.turnaround = self.turnaround + (t - front.arrival_time + 1)100 finished = True101 self.cnt_overhead = -1102 self.cnt_quantum = 0103 else:104 waiting_processes[0].exec_time = waiting_processes[0].exec_time - 1105 finished = False106 self.cnt_quantum += 1107 if self.cnt_quantum == self.quantum:108 self.cnt_overhead = 0109 self.cnt_quantum = 0110 # move process to the end of the queue if it will run again111 if not finished:112 waiting_processes = waiting_processes[1:] + [front]113 # returns a flag to indicates whether the process has finished and this process114 return front, finished115class EDF(Premptive):116 def __init__(self, quantum: int, overhead: int):117 self.quantum = quantum118 self.overhead = overhead119 super().__init__(quantum, overhead)120 def execute(self, waiting_processes: list, t: int, ram: RAM) -> (Process, bool):121 # if cnt_overhead is negative, the process is executing122 if self.cnt_overhead >= 0:123 print("\n\n\n\nOVERHEAD\n\n\n\n")124 self.cnt_overhead += 1125 if self.cnt_overhead == self.overhead:126 self.cnt_overhead = -1127 self.cnt_quantum = 0128 return Process(-1, -1, -1, -1, -1, -1), False129 if len(waiting_processes) == 0:130 print ("t = ", t, "empty")131 return Process(-1, -1, -1, -1, -1, -1), False132 if self.finished:133 waiting_processes.sort(key=lambda p: p.deadline)134 self.finished = False135 front = waiting_processes[0]136 assert front.exec_time != 0137 self.set_executed(front, ram)138 finished = False139 if front.exec_time == 1:140 # waiting_processes = waiting_processes[1:]141 self.turnaround = self.turnaround + (t - front.arrival_time + 1)142 finished = True143 self.cnt_overhead = -1144 self.cnt_quantum = 0145 else:146 waiting_processes[0].exec_time = waiting_processes[0].exec_time - 1147 finished = False148 self.cnt_quantum += 1149 if self.cnt_quantum == self.quantum:150 if not finished:151 self.cnt_overhead = 0152 self.cnt_quantum = 0153 # move process to the end of the queue if it will run again154 if not finished:155 waiting_processes = waiting_processes[1:] + [front]156 # returns a flag to indicates whether the process has finished and this process...
test_item.py
Source:test_item.py
1from django.urls import reverse2from rest_framework import status3from rest_framework.test import APITestCase4from django.contrib.auth.models import User5from .test_todo import create_todo6class ItemTest(APITestCase):7 """Tests API for items."""8 def prepare(self):9 user = User.objects.create_user("test_user4", "test@test.com", "test_password")10 self.client.force_authenticate(user=user)11 to_do_id_1 = create_todo(self.client, "ToDoList1").data["id"]12 to_do_id_2 = create_todo(self.client, "ToDoList2").data["id"]13 return to_do_id_1, to_do_id_214 def get(self, expected_titles, todo_id=None, finished=None):15 url = reverse("ToDoItems-list")16 data = {}17 if finished is not None:18 data["finished"] = finished19 if todo_id is not None:20 data["parent"] = todo_id21 response = self.client.get(url, data, format="json")22 self.assertEqual(response.status_code, status.HTTP_200_OK)23 real_titles = [(d["text"], d["parent"]) for d in response.data["results"]]24 self.assertEqual(real_titles, expected_titles)25 if finished is not None:26 item_status = [data["finished"] for data in response.data["results"]]27 self.assertEqual(finished, all(item_status))28 def post(self, item_text, todo_id, finished=None):29 url = reverse("ToDoItems-list")30 if finished is not None:31 data = {"text": item_text, "parent": todo_id, "finished": finished}32 else:33 data = {"text": item_text, "parent": todo_id}34 response = self.client.post(url, data, format="json")35 self.assertEqual(response.status_code, status.HTTP_201_CREATED)36 check_finished = False if (finished is None) else finished37 self.assertEqual(response.data["text"], item_text)38 self.assertEqual(response.data["parent"], todo_id)39 self.assertEqual(response.data["finished"], check_finished)40 return response.data["id"], response.data["finished"]41 def get_by_id(self, id, text, finished, parent):42 url_with_id = reverse("ToDoItems-detail", args=(id,))43 response = self.client.get(url_with_id, {id: id}, format="json")44 self.assertEqual(response.status_code, status.HTTP_200_OK)45 self.assertEqual(response.data["text"], text)46 self.assertEqual(response.data["finished"], finished)47 self.assertEqual(response.data["parent"], parent)48 def put(self, id, text, parent, finished=None):49 url_with_id = reverse("ToDoItems-detail", args=(id,))50 data = {"text": text, "parent": parent}51 if finished is not None:52 data["finished"] = finished53 response = self.client.put(url_with_id, data, format="json")54 self.assertEqual(response.status_code, status.HTTP_200_OK)55 self.assertEqual(response.data["text"], text)56 self.assertEqual(response.data["parent"], parent)57 if finished is not None:58 self.assertEqual(response.data["finished"], finished)59 def patch(self, id, text=None, finished=None, parent=None):60 url_with_id = reverse("ToDoItems-detail", args=(id,))61 data = {}62 if text is not None:63 data["text"] = text64 if finished is not None:65 data["finished"] = finished66 if parent is not None:67 data["parent"] = parent68 response = self.client.patch(url_with_id, data, format="json")69 self.assertEqual(response.status_code, status.HTTP_200_OK)70 if text is not None:71 self.assertEqual(response.data["text"], text)72 if finished is not None:73 self.assertEqual(response.data["finished"], finished)74 if parent is not None:75 self.assertEqual(response.data["parent"], parent)76 def delete(self, id, title, finished, to_do_id):77 self.get_by_id(id, title, finished, to_do_id)78 url_with_id = reverse("ToDoItems-detail", args=(id,))79 response = self.client.delete(url_with_id, {}, format="json")80 self.assertEqual(response.status_code, status.HTTP_204_NO_CONTENT)81 def test_create_delete(self):82 """83 /todo_items/: get, post (create)84 /todo_items/{id}/: get (read), delete85 """86 to_do_id_1, to_do_id_2 = self.prepare()87 self.get([], to_do_id_1)88 item_text_1, item_text_2, item_text_3, item_text_4 = "Item1", "Item2", "Item3", "Item4"89 item_id_1, item_finished_1 = self.post(item_text_1, to_do_id_1)90 self.get([(item_text_1, to_do_id_1)], to_do_id_1)91 item_id_2, item_finished_2 = self.post(item_text_2, to_do_id_1, finished=False)92 self.get([(item_text_1, to_do_id_1), (item_text_2, to_do_id_1)], to_do_id_1)93 item_id_3, item_finished_3 = self.post(item_text_3, to_do_id_1, finished=True)94 self.get(95 [(item_text_1, to_do_id_1), (item_text_2, to_do_id_1), (item_text_3, to_do_id_1)],96 to_do_id_1,97 )98 item_id_4, item_finished_4 = self.post(item_text_4, to_do_id_2, finished=False)99 self.get(100 [101 (item_text_1, to_do_id_1),102 (item_text_2, to_do_id_1),103 (item_text_3, to_do_id_1),104 (item_text_4, to_do_id_2),105 ]106 )107 self.get(108 [(item_text_1, to_do_id_1), (item_text_2, to_do_id_1), (item_text_3, to_do_id_1)],109 to_do_id_1,110 )111 self.get([(item_text_1, to_do_id_1), (item_text_2, to_do_id_1)], to_do_id_1, finished=False)112 self.get([(item_text_3, to_do_id_1)], to_do_id_1, finished=True)113 self.get_by_id(item_id_1, item_text_1, item_finished_1, to_do_id_1)114 self.get_by_id(item_id_2, item_text_2, item_finished_2, to_do_id_1)115 self.get_by_id(item_id_3, item_text_3, item_finished_3, to_do_id_1)116 self.delete(item_id_3, item_text_3, item_finished_3, to_do_id_1)117 self.get([(item_text_1, to_do_id_1), (item_text_2, to_do_id_1)], to_do_id_1)118 def test_update(self):119 """120 /todo_items/{id}/: put (update), patch (partial_update)121 """122 to_do_id_1, to_do_id_2 = self.prepare()123 item_text_1 = "Item1"124 item_id_1, item_finished_1 = self.post(item_text_1, to_do_id_1)125 item_text_1_2 = "Item5"126 self.put(item_id_1, item_text_1_2, to_do_id_2)127 self.put(item_id_1, item_text_1_2, to_do_id_2, finished=False)128 self.put(item_id_1, item_text_1_2, to_do_id_2, finished=True)129 item_text_1_3 = "Item6"130 self.patch(item_id_1, parent=to_do_id_1)131 self.patch(item_id_1, finished=True)...
Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!