Best Python code snippet using autotest_python
state_cvrp.py
Source:state_cvrp.py
1import torch2from typing import NamedTuple3from utils.boolmask import mask_long2bool, mask_long_scatter4class StateCVRP(NamedTuple):5 # Fixed input6 coords: torch.Tensor # Depot + loc7 demand: torch.Tensor8 # If this state contains multiple copies (i.e. beam search) for the same instance, then for memory efficiency9 # the coords and demands tensors are not kept multiple times, so we need to use the ids to index the correct rows.10 ids: torch.Tensor # Keeps track of original fixed data index of rows11 # State12 prev_a: torch.Tensor13 used_capacity: torch.Tensor14 visited_: torch.Tensor # Keeps track of nodes that have been visited15 lengths: torch.Tensor16 cur_coord: torch.Tensor17 i: torch.Tensor # Keeps track of step18 VEHICLE_CAPACITY = 1.0 # Hardcoded19 @property20 def visited(self):21 if self.visited_.dtype == torch.uint8:22 return self.visited_23 else:24 return mask_long2bool(self.visited_, n=self.demand.size(-1))25 @property26 def dist(self):27 return (self.coords[:, :, None, :] - self.coords[:, None, :, :]).norm(p=2, dim=-1)28 def __getitem__(self, key):29 if torch.is_tensor(key) or isinstance(key, slice): # If tensor, idx all tensors by this tensor:30 return self._replace(31 ids=self.ids[key],32 prev_a=self.prev_a[key],33 used_capacity=self.used_capacity[key],34 visited_=self.visited_[key],35 lengths=self.lengths[key],36 cur_coord=self.cur_coord[key],37 )38 return super(StateCVRP, self).__getitem__(key)39 # Warning: cannot override len of NamedTuple, len should be number of fields, not batch size40 # def __len__(self):41 # return len(self.used_capacity)42 @staticmethod43 def initialize(input, visited_dtype=torch.uint8):44 depot = input['depot']45 loc = input['loc']46 demand = input['demand']47 batch_size, n_loc, _ = loc.size()48 return StateCVRP(49 coords=torch.cat((depot[:, None, :], loc), -2),50 demand=demand,51 ids=torch.arange(batch_size, dtype=torch.int64, device=loc.device)[:, None], # Add steps dimension52 prev_a=torch.zeros(batch_size, 1, dtype=torch.long, device=loc.device),53 used_capacity=demand.new_zeros(batch_size, 1),54 visited_=( # Visited as mask is easier to understand, as long more memory efficient55 # Keep visited_ with depot so we can scatter efficiently56 torch.zeros(57 batch_size, 1, n_loc + 1,58 dtype=torch.uint8, device=loc.device59 )60 if visited_dtype == torch.uint861 else torch.zeros(batch_size, 1, (n_loc + 63) // 64, dtype=torch.int64, device=loc.device) # Ceil62 ),63 lengths=torch.zeros(batch_size, 1, device=loc.device),64 cur_coord=input['depot'][:, None, :], # Add step dimension65 i=torch.zeros(1, dtype=torch.int64, device=loc.device) # Vector with length num_steps66 )67 def get_final_cost(self):68 assert self.all_finished()69 return self.lengths + (self.coords[self.ids, 0, :] - self.cur_coord).norm(p=2, dim=-1)70 def update(self, selected):71 assert self.i.size(0) == 1, "Can only update if state represents single step"72 # Update the state73 selected = selected[:, None] # Add dimension for step74 prev_a = selected75 n_loc = self.demand.size(-1) # Excludes depot76 # Add the length77 cur_coord = self.coords[self.ids, selected]78 # cur_coord = self.coords.gather(79 # 1,80 # selected[:, None].expand(selected.size(0), 1, self.coords.size(-1))81 # )[:, 0, :]82 lengths = self.lengths + (cur_coord - self.cur_coord).norm(p=2, dim=-1) # (batch_dim, 1)83 # Not selected_demand is demand of first node (by clamp) so incorrect for nodes that visit depot!84 #selected_demand = self.demand.gather(-1, torch.clamp(prev_a - 1, 0, n_loc - 1))85 selected_demand = self.demand[self.ids, torch.clamp(prev_a - 1, 0, n_loc - 1)]86 # Increase capacity if depot is not visited, otherwise set to 087 #used_capacity = torch.where(selected == 0, 0, self.used_capacity + selected_demand)88 used_capacity = (self.used_capacity + selected_demand) * (prev_a != 0).float()89 if self.visited_.dtype == torch.uint8:90 # Note: here we do not subtract one as we have to scatter so the first column allows scattering depot91 # Add one dimension since we write a single value92 visited_ = self.visited_.scatter(-1, prev_a[:, :, None], 1)93 else:94 # This works, will not set anything if prev_a -1 == -1 (depot)95 visited_ = mask_long_scatter(self.visited_, prev_a - 1)96 return self._replace(97 prev_a=prev_a, used_capacity=used_capacity, visited_=visited_,98 lengths=lengths, cur_coord=cur_coord, i=self.i + 199 )100 def all_finished(self):101 return self.i.item() >= self.demand.size(-1) and self.visited.all()102 def get_finished(self):103 return self.visited.sum(-1) == self.visited.size(-1)104 def get_current_node(self):105 return self.prev_a106 def get_mask(self):107 """108 Gets a (batch_size, n_loc + 1) mask with the feasible actions (0 = depot), depends on already visited and109 remaining capacity. 0 = feasible, 1 = infeasible110 Forbids to visit depot twice in a row, unless all nodes have been visited111 :return:112 """113 if self.visited_.dtype == torch.uint8:114 visited_loc = self.visited_[:, :, 1:]115 else:116 visited_loc = mask_long2bool(self.visited_, n=self.demand.size(-1))117 # For demand steps_dim is inserted by indexing with id, for used_capacity insert node dim for broadcasting118 exceeds_cap = (self.demand[self.ids, :] + self.used_capacity[:, :, None] > self.VEHICLE_CAPACITY)119 # Nodes that cannot be visited are already visited or too much demand to be served now120 mask_loc = visited_loc.to(exceeds_cap.dtype) | exceeds_cap121 # Cannot visit the depot if just visited and still unserved nodes122 mask_depot = (self.prev_a == 0) & ((mask_loc == 0).int().sum(-1) > 0)123 return torch.cat((mask_depot[:, :, None], mask_loc), -1)124 def construct_solutions(self, actions):...
state_sdvrp.py
Source:state_sdvrp.py
1import torch2from typing import NamedTuple3class StateSDVRP(NamedTuple):4 # Fixed input5 coords: torch.Tensor6 demand: torch.Tensor7 # If this state contains multiple copies (i.e. beam search) for the same instance, then for memory efficiency8 # the coords and demands tensors are not kept multiple times, so we need to use the ids to index the correct rows.9 ids: torch.Tensor # Keeps track of original fixed data index of rows10 # State11 prev_a: torch.Tensor12 used_capacity: torch.Tensor13 demands_with_depot: torch.Tensor # Keeps track of remaining demands14 lengths: torch.Tensor15 cur_coord: torch.Tensor16 i: torch.Tensor # Keeps track of step17 VEHICLE_CAPACITY = 1.0 # Hardcoded18 def __getitem__(self, key):19 if torch.is_tensor(key) or isinstance(key, slice): # If tensor, idx all tensors by this tensor:20 return self._replace(21 ids=self.ids[key],22 prev_a=self.prev_a[key],23 used_capacity=self.used_capacity[key],24 demands_with_depot=self.demands_with_depot[key],25 lengths=self.lengths[key],26 cur_coord=self.cur_coord[key],27 )28 return super(StateSDVRP, self).__getitem__(key)29 @staticmethod30 def initialize(input):31 depot = input['depot']32 loc = input['loc']33 demand = input['demand']34 batch_size, n_loc, _ = loc.size()35 return StateSDVRP(36 coords=torch.cat((depot[:, None, :], loc), -2),37 demand=demand,38 ids=torch.arange(batch_size, dtype=torch.int64, device=loc.device)[:, None], # Add steps dimension39 prev_a=torch.zeros(batch_size, 1, dtype=torch.long, device=loc.device),40 used_capacity=demand.new_zeros(batch_size, 1),41 demands_with_depot=torch.cat((42 demand.new_zeros(batch_size, 1),43 demand[:, :]44 ), 1)[:, None, :],45 lengths=torch.zeros(batch_size, 1, device=loc.device),46 cur_coord=input['depot'][:, None, :], # Add step dimension47 i=torch.zeros(1, dtype=torch.int64, device=loc.device) # Vector with length num_steps48 )49 def get_final_cost(self):50 assert self.all_finished()51 return self.lengths + (self.coords[self.ids, 0, :] - self.cur_coord).norm(p=2, dim=-1)52 def update(self, selected):53 assert self.i.size(0) == 1, "Can only update if state represents single step"54 # Update the state55 selected = selected[:, None] # Add dimension for step56 prev_a = selected57 # Add the length58 cur_coord = self.coords[self.ids, selected]59 lengths = self.lengths + (cur_coord - self.cur_coord).norm(p=2, dim=-1) # (batch_dim, 1)60 # Not selected_demand is demand of first node (by clamp) so incorrect for nodes that visit depot!61 selected_demand = self.demands_with_depot.gather(-1, prev_a[:, :, None])[:, :, 0]62 delivered_demand = torch.min(selected_demand, self.VEHICLE_CAPACITY - self.used_capacity)63 # Increase capacity if depot is not visited, otherwise set to 064 #used_capacity = torch.where(selected == 0, 0, self.used_capacity + delivered_demand)65 used_capacity = (self.used_capacity + delivered_demand) * (prev_a != 0).float()66 # demands_with_depot = demands_with_depot.clone()[:, 0, :]67 # Add one dimension since we write a single value68 demands_with_depot = self.demands_with_depot.scatter(69 -1,70 prev_a[:, :, None],71 self.demands_with_depot.gather(-1, prev_a[:, :, None]) - delivered_demand[:, :, None]72 )73 74 return self._replace(75 prev_a=prev_a, used_capacity=used_capacity, demands_with_depot=demands_with_depot,76 lengths=lengths, cur_coord=cur_coord, i=self.i + 177 )78 def all_finished(self):79 return self.i.item() >= self.demands_with_depot.size(-1) and not (self.demands_with_depot > 0).any()80 def get_current_node(self):81 return self.prev_a82 def get_mask(self):83 """84 Gets a (batch_size, n_loc + 1) mask with the feasible actions (0 = depot), depends on already visited and85 remaining capacity. 0 = feasible, 1 = infeasible86 Forbids to visit depot twice in a row, unless all nodes have been visited87 :return:88 """89 # Nodes that cannot be visited are already visited or too much demand to be served now90 mask_loc = (self.demands_with_depot[:, :, 1:] == 0) | (self.used_capacity[:, :, None] >= self.VEHICLE_CAPACITY)91 # Cannot visit the depot if just visited and still unserved nodes92 mask_depot = (self.prev_a == 0) & ((mask_loc == 0).int().sum(-1) > 0)93 return torch.cat((mask_depot[:, :, None], mask_loc), -1)94 def construct_solutions(self, actions):...
1094-car-pooling.py
Source:1094-car-pooling.py
1''' 2[intervals, ]3sol 1. simply go through start~end and check if capacity exceeds4=> O(Nlog(N))5sol 2. Bucket Sort6=> O (max(N,1001))7'''8class Solution:9 def carPooling(self, trips: List[List[int]], capacity: int) -> bool:10 timestamp = []11 for trip in trips:12 timestamp.append([trip[1], trip[0]])13 timestamp.append([trip[2], -trip[0]])14 timestamp.sort()15 # print(timestamp)16 used_capacity = 017 for time, passenger_change in timestamp:18 # print(used_capacity)19 used_capacity += passenger_change20 if used_capacity > capacity:21 return False22 return True23 24# sol 225class Solution:26 def carPooling(self, trips: List[List[int]], capacity: int) -> bool:27 timestamp = [0] * 100128 for trip in trips:29 timestamp[trip[1]] += trip[0]30 timestamp[trip[2]] -= trip[0]31 used_capacity = 032 for passenger_change in timestamp:33 used_capacity += passenger_change34 if used_capacity > capacity:35 return False...
Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!