Best Python code snippet using ATX
solutions.py
Source:solutions.py
1from jumpscale.clients.explorer.models import NextAction, WorkloadType2from jumpscale.loader import j3from jumpscale.sals.reservation_chatflow.solutions import ChatflowSolutions4from jumpscale.core.base import StoredFactory5from .models import UserPool6class MarketplaceSolutions(ChatflowSolutions):7 def list_network_solutions(self, username, next_action=NextAction.DEPLOY, sync=True):8 networks = j.sals.marketplace.deployer.list_networks(username, next_action=next_action)9 result = []10 for n in networks.values():11 if len(n.network_workloads) == 0:12 continue13 result.append(14 {15 "Name": n.name[len(username) + 1 :],16 "IP Range": n.network_workloads[-1].network_iprange,17 "nodes": {res.info.node_id: res.iprange for res in n.network_workloads},18 "wids": [res.id for res in n.network_workloads],19 }20 )21 return result22 def list_ubuntu_solutions(self, username, next_action=NextAction.DEPLOY, sync=True):23 return self._list_single_container_solution("ubuntu", next_action, sync, owner=username)24 def list_peertube_solutions(self, username, next_action=NextAction.DEPLOY, sync=True):25 return self._list_proxied_solution("peertube", next_action, sync, owner=username)26 def list_discourse_solutions(self, username, next_action=NextAction.DEPLOY, sync=True):27 return self._list_proxied_solution("discourse", next_action, sync, owner=username)28 def list_taiga_solutions(self, username, next_action=NextAction.DEPLOY, sync=True):29 return self._list_proxied_solution("taiga", next_action, sync, "nginx", owner=username)30 def list_flist_solutions(self, username, next_action=NextAction.DEPLOY, sync=True):31 return self._list_single_container_solution("flist", next_action, sync, owner=username)32 def list_gitea_solutions(self, username, next_action=NextAction.DEPLOY, sync=True):33 return self._list_proxied_solution("gitea", next_action, sync, "nginx", owner=username)34 def list_mattermost_solutions(self, username, next_action=NextAction.DEPLOY, sync=True):35 return self._list_proxied_solution("mattermost", next_action, sync, "nginx", owner=username)36 def list_meetings_solutions(self, username, next_action=NextAction.DEPLOY, sync=True):37 return self._list_proxied_solution("meetings", next_action, sync, "nginx", owner=username)38 def list_publisher_solutions(self, username, next_action=NextAction.DEPLOY, sync=True):39 return self._list_proxied_solution("publisher", next_action, sync, None, owner=username)40 def list_wiki_solutions(self, username, next_action=NextAction.DEPLOY, sync=True):41 return self._list_proxied_solution("wiki", next_action, sync, None, owner=username)42 def list_blog_solutions(self, username, next_action=NextAction.DEPLOY, sync=True):43 return self._list_proxied_solution("blog", next_action, sync, None, owner=username)44 def list_website_solutions(self, username, next_action=NextAction.DEPLOY, sync=True):45 return self._list_proxied_solution("website", next_action, sync, None, owner=username)46 def list_threebot_solutions(self, username, next_action=NextAction.DEPLOY, sync=True):47 return self._list_proxied_solution("threebot", next_action, sync, owner=username)48 def list_gollum_solutions(self, username, next_action=NextAction.DEPLOY, sync=True):49 return self._list_single_container_solution("gollum", next_action, sync, owner=username)50 def list_cryptpad_solutions(self, username, next_action=NextAction.DEPLOY, sync=True):51 return self._list_proxied_solution("cryptpad", next_action, sync, "nginx", owner=username)52 def list_minio_solutions(self, username, next_action=NextAction.DEPLOY, sync=True):53 if sync:54 j.sals.reservation_chatflow.deployer.load_user_workloads(next_action=next_action)55 if not sync and not j.sals.reservation_chatflow.deployer.workloads[next_action][WorkloadType.Container]:56 j.sals.reservation_chatflow.deployer.load_user_workloads(next_action=next_action)57 result = []58 container_workloads = self._list_container_workloads(59 "minio",60 next_action,61 metadata_filters=[lambda metadata: False if metadata.get("owner") != username else True],62 )63 for name in container_workloads:64 primary_dict = container_workloads[name][0]65 solution_dict = {66 "wids": [primary_dict["wid"]] + primary_dict["vol_ids"],67 "Name": name[len(username) + 1 :],68 "Network": primary_dict["network"],69 "Primary IPv4": primary_dict["ipv4"],70 "Primary IPv6": primary_dict["ipv6"],71 "Primary Pool": primary_dict["pool"],72 }73 for key, val in primary_dict["capacity"].items():74 solution_dict[f"Primary {key}"] = val75 if len(container_workloads[name]) == 2:76 secondary_dict = container_workloads[name][1]77 solution_dict["wids"].append(secondary_dict["wid"])78 solution_dict["wids"] += secondary_dict["vol_ids"]79 solution_dict.update(80 {81 "Secondary IPv4": secondary_dict["ipv4"],82 "Secondary IPv6": secondary_dict["ipv6"],83 "Secondary Pool": secondary_dict["pool"],84 }85 )86 for key, val in secondary_dict["capacity"].items():87 solution_dict[f"Secondary {key}"] = val88 result.append(solution_dict)89 return result90 def list_monitoring_solutions(self, username, next_action=NextAction.DEPLOY, sync=True):91 if sync:92 j.sals.reservation_chatflow.deployer.load_user_workloads(next_action=next_action)93 if not sync and not j.sals.reservation_chatflow.deployer.workloads[next_action][WorkloadType.Container]:94 j.sals.reservation_chatflow.deployer.load_user_workloads(next_action=next_action)95 result = []96 container_workloads = self._list_container_workloads(97 "monitoring",98 next_action,99 metadata_filters=[lambda metadata: False if metadata.get("owner") != username else True],100 )101 for name in container_workloads:102 if len(container_workloads[name]) != 3:103 continue104 solution_dict = {"wids": [], "Name": name[len(username) + 1 :]}105 for c_dict in container_workloads[name]:106 solution_dict["wids"].append(c_dict["wid"])107 solution_dict["wids"] += c_dict["vol_ids"]108 if "grafana" in c_dict["flist"]:109 cont_type = "Grafana"110 elif "prometheus" in c_dict["flist"]:111 cont_type = "Prometheus"112 elif "redis_zinit" in c_dict["flist"]:113 cont_type = "Redis"114 else:115 continue116 solution_dict[f"{cont_type} IPv4"] = c_dict["ipv4"]117 solution_dict[f"{cont_type} IPv6"] = c_dict["ipv6"]118 solution_dict[f"{cont_type} Node"] = c_dict["node"]119 solution_dict[f"{cont_type} Pool"] = c_dict["pool"]120 for key, val in c_dict["capacity"].items():121 solution_dict[f"{cont_type} {key}"] = val122 result.append(solution_dict)123 return result124 def list_kubernetes_solutions(self, username, next_action=NextAction.DEPLOY, sync=True):125 if sync:126 j.sals.reservation_chatflow.deployer.load_user_workloads(next_action=next_action)127 if not sync and not j.sals.reservation_chatflow.deployer.workloads[next_action][WorkloadType.Kubernetes]:128 j.sals.reservation_chatflow.deployer.load_user_workloads(next_action=next_action)129 result = {}130 for kube_workloads in j.sals.reservation_chatflow.deployer.workloads[next_action][131 WorkloadType.Kubernetes132 ].values():133 for workload in kube_workloads:134 if not workload.info.metadata:135 continue136 try:137 metadata = j.data.serializers.json.loads(workload.info.metadata)138 except:139 continue140 if not metadata.get("form_info"):141 continue142 if metadata.get("owner") != username:143 continue144 name = metadata["form_info"].get("Solution name", metadata.get("name"))145 if name:146 if name in result:147 if len(workload.master_ips) != 0:148 result[name]["wids"].append(workload.id)149 result[name]["Slave IPs"].append(workload.ipaddress)150 result[name]["Slave Pools"].append(workload.info.pool_id)151 continue152 result[name] = {153 "wids": [workload.id],154 "Name": name[len(username) + 1 :],155 "Network": workload.network_id,156 "Master IP": workload.ipaddress if len(workload.master_ips) == 0 else workload.master_ips[0],157 "Slave IPs": [],158 "Slave Pools": [],159 "Master Pool": workload.info.pool_id,160 }161 result[name].update(self.get_workload_capacity(workload))162 if len(workload.master_ips) != 0:163 result[name]["Slave IPs"].append(workload.ipaddress)164 return list(result.values())165 def list_4to6gw_solutions(self, username, next_action=NextAction.DEPLOY, sync=True):166 if sync:167 j.sals.reservation_chatflow.deployer.load_user_workloads(next_action=next_action)168 if not sync and not j.sals.reservation_chatflow.deployer.workloads[next_action][WorkloadType.Gateway4to6]:169 j.sals.reservation_chatflow.deployer.load_user_workloads(next_action=next_action)170 result = []171 for gateways in j.sals.reservation_chatflow.deployer.workloads[next_action][WorkloadType.Gateway4to6].values():172 for g in gateways:173 if not g.info.metadata:174 continue175 try:176 metadata = j.data.serializers.json.loads(g.info.metadata)177 except:178 continue179 if metadata.get("owner") != username:180 continue181 result.append(182 {183 "wids": [g.id],184 "Name": g.public_key,185 "Public Key": g.public_key,186 "Gateway": g.info.node_id,187 "Pool": g.info.pool_id,188 }189 )190 return result191 def list_delegated_domain_solutions(self, username, next_action=NextAction.DEPLOY, sync=True):192 if sync:193 j.sals.reservation_chatflow.deployer.load_user_workloads(next_action=next_action)194 if not sync and not j.sals.reservation_chatflow.deployer.workloads[next_action][WorkloadType.Domain_delegate]:195 j.sals.reservation_chatflow.deployer.load_user_workloads(next_action=next_action)196 result = []197 for domains in j.sals.reservation_chatflow.deployer.workloads[next_action][198 WorkloadType.Domain_delegate199 ].values():200 for dom in domains:201 try:202 metadata = j.data.serializers.json.loads(dom.info.metadata)203 except:204 continue205 if metadata.get("owner") != username:206 continue207 result.append(208 {"wids": [dom.id], "Name": dom.domain, "Gateway": dom.info.node_id, "Pool": dom.info.pool_id,}209 )210 return result211 def list_exposed_solutions(self, username, next_action=NextAction.DEPLOY, sync=True):212 if sync:213 j.sals.reservation_chatflow.deployer.load_user_workloads(next_action=next_action)214 if not sync and not j.sals.reservation_chatflow.deployer.workloads[next_action][WorkloadType.Reverse_proxy]:215 j.sals.reservation_chatflow.deployer.load_user_workloads(next_action=next_action)216 result = {}217 pools = set()218 name_to_proxy = {}219 for proxies in j.sals.reservation_chatflow.deployer.workloads[next_action][WorkloadType.Reverse_proxy].values():220 for proxy in proxies:221 if proxy.info.metadata:222 metadata = j.data.serializers.json.loads(proxy.info.metadata)223 if not metadata:224 continue225 if metadata.get("owner") != username:226 continue227 chatflow = metadata.get("form_info", {}).get("chatflow")228 if chatflow and chatflow != "exposed":229 continue230 result[f"{proxy.info.pool_id}-{proxy.domain}"] = {231 "wids": [proxy.id],232 "Name": proxy.domain,233 "Gateway": proxy.info.node_id,234 "Pool": proxy.info.pool_id,235 "Domain": proxy.domain,236 }237 name = metadata.get("Solution name", metadata.get("form_info", {}).get("Solution name"),)238 name_to_proxy[name] = f"{proxy.info.pool_id}-{proxy.domain}"239 pools.add(proxy.info.pool_id)240 # link subdomains to proxy_reservations241 for subdomains in j.sals.reservation_chatflow.deployer.workloads[next_action][WorkloadType.Subdomain].values():242 for workload in subdomains:243 metadata = j.data.serializers.json.loads(workload.info.metadata)244 if not metadata:245 continue246 if metadata.get("owner") != username:247 continue248 chatflow = metadata.get("form_info", {}).get("chatflow")249 if chatflow and chatflow != "exposed":250 continue251 solution_name = metadata.get(252 "Solution name", metadata.get("name", metadata.get("form_info", {}).get("Solution name")),253 )254 if not solution_name:255 continue256 domain = workload.domain257 if name_to_proxy.get(solution_name):258 result[name_to_proxy[solution_name]]["wids"].append(workload.id)259 # link tcp router containers to proxy reservations260 for pool_id in pools:261 for container_workload in j.sals.reservation_chatflow.deployer.workloads[next_action][262 WorkloadType.Container263 ][pool_id]:264 if (265 container_workload.flist != "https://hub.grid.tf/tf-official-apps/tcprouter:latest.flist"266 or not container_workload.info.metadata267 ):268 continue269 metadata = j.data.serializers.json.loads(container_workload.info.metadata)270 if not metadata:271 continue272 if metadata.get("owner") != username:273 continue274 chatflow = metadata.get("form_info", {}).get("chatflow")275 if chatflow and chatflow != "exposed":276 continue277 solution_name = metadata.get(278 "Solution name", metadata.get("name", metadata.get("form_info", {}).get("Solution name")),279 )280 if not solution_name:281 continue282 if name_to_proxy.get(solution_name):283 domain = name_to_proxy.get(solution_name)284 result[domain]["wids"].append(container_workload.id)285 return list(result.values())286 def count_solutions(self, username, next_action=NextAction.DEPLOY):287 count_dict = {288 "network": 0,289 "ubuntu": 0,290 "kubernetes": 0,291 "minio": 0,292 "monitoring": 0,293 "flist": 0,294 "gitea": 0,295 "4to6gw": 0,296 "delegated_domain": 0,297 "exposed": 0,298 "publisher": 0,299 "peertube": 0,300 "discourse": 0,301 "mattermost": 0,302 "threebot": 0,303 "cryptpad": 0,304 "pools": 0,305 "wiki": 0,306 "blog": 0,307 "website": 0,308 "taiga": 0,309 "meetings": 0,310 }311 j.sals.reservation_chatflow.deployer.load_user_workloads(next_action=next_action)312 for key in count_dict.keys():313 method = getattr(self, f"list_{key}_solutions")314 count_dict[key] = len(method(username, next_action=next_action, sync=False))315 return count_dict316 def list_pools_solutions(self, username, next_action=NextAction.DEPLOY, sync=False):317 pools = j.sals.marketplace.deployer.list_user_pools(username)318 result = [pool.to_dict() for pool in pools]319 return result320 def list_solutions(self, username, solution_type):321 j.sals.reservation_chatflow.deployer.load_user_workloads(next_action=NextAction.DEPLOY)322 method = getattr(self, f"list_{solution_type}_solutions")323 return method(username, next_action=NextAction.DEPLOY, sync=False)324 def cancel_solution(self, username, solution_wids, delete_pool=False):325 valid = True326 pool_id = None327 for wid in solution_wids:328 workload = j.sals.zos.get().workloads.get(wid)329 if workload.info.workload_type == WorkloadType.Container:330 pool_id = workload.info.pool_id331 metadata_json = j.sals.reservation_chatflow.deployer.decrypt_metadata(workload.info.metadata)332 metadata = j.data.serializers.json.loads(metadata_json)333 if metadata.get("owner") != username:334 valid = False335 break336 if valid:337 if pool_id and delete_pool:338 # deassociate the pool from user339 pool_factory = StoredFactory(UserPool)340 instance_name = f"pool_{username.replace('.3bot', '')}_{pool_id}"341 pool_factory.delete(instance_name)342 super().cancel_solution(solution_wids)...
mconner_vacuum.py
Source:mconner_vacuum.py
1#!/usr/bin/env python32'''3CPSC 415 -- Homework #1 VacuumAgent file4Morgan Conner5'''67from vacuum import VacuumAgent8import random910class MconnerVacuumAgent(VacuumAgent):1112 def __init__(self):13 super().__init__()14 self.moves = 0 #total moves15 self.x = 0 #current x16 self.y = 0 #current y17 self.nx = 0 #next x18 self.ny = 0 #next y19 self.tempx = 0 #temp x20 self.tempy = 0 #temp y21 self.current_action = 'NoOp'22 self.prev_action = 'NoOp'23 self.action_list = list()24 self.visited = set()25 self.visited.add((0,0))26 2728 def program(self, percept):29 if self.moves >= 675:30 return 'NoOp'31 if percept[0] == 'Dirty' and percept[1] == 'None':32 self.moves += 133 return 'Suck'34 if percept[0] == 'Dirty' and percept[1] == 'Bump':35 self.moves += 136 return 'Suck'37 if percept[0] == 'Clean' and percept[1] == 'None':3839 next_action = random.choice(VacuumAgent.possible_actions[:-2:])40 41 #Calculating potential next actions coordinates42 if next_action == 'Right':43 self.tempx = self.x + 144 elif next_action == 'Left':45 self.tempx = self.x - 146 elif next_action == 'Up':47 self.tempy = self.y + 148 elif next_action == 'Down':49 self.tempy = self.y - 15051 self.nxt_coord = (self.tempx, self.tempy)52 53 #If next spot is already visited get new action54 if self.nxt_coord in self.visited:55 UpdatedAction_list = VacuumAgent.possible_actions[:-2:]5657 if next_action == 'Right':58 UpdatedAction_list.pop(0)59 next_action = random.choice(UpdatedAction_list)60 if next_action == 'Left':61 self.nx = self.x - 162 elif next_action == 'Up':63 self.ny = self.y + 164 elif next_action == 'Down':65 self.ny = self.y - 166 67 elif next_action == 'Left':68 UpdatedAction_list.pop(1)69 next_action = random.choice(UpdatedAction_list)70 if next_action == 'Right':71 self.nx = self.x + 172 elif next_action == 'Up':73 self.ny = self.y + 174 elif next_action == 'Down':75 self.ny = self.y - 17677 elif next_action == 'Up':78 UpdatedAction_list.pop(2)79 next_action = random.choice(UpdatedAction_list)80 if next_action == 'Right':81 self.nx = self.x + 182 elif next_action == 'Left':83 self.nx = self.x - 184 elif next_action == 'Down':85 self.ny = self.y - 18687 elif next_action == 'Down':88 UpdatedAction_list.pop(3)89 next_action = random.choice(UpdatedAction_list)90 if next_action == 'Right':91 self.nx = self.x + 192 elif next_action == 'Left':93 self.nx = self.x - 194 elif next_action == 'Up':95 self.ny = self.y + 196 else:97 #set the potential next action coords to next action coords98 self.nx = self.tempx99 self.ny = self.tempy100101 #sets next action coords to current coordinates102 self.x = self.nx103 self.y = self.ny104 self.current = (self.x, self.y)105106 #Updates initializations107 self.visited.add(self.current)108 self.action_list.append(next_action)109110 self.current_action = self.action_list[-1]111 if len(self.action_list) >=2:112 self.prev_action = self.action_list[-2]113114 self.moves += 1115 116 return next_action117118 if percept[0] == 'Clean' and percept[1] == 'Bump':119120 next_action = random.choice(VacuumAgent.possible_actions[:-2:])121 122 #Ensures next action isn't the same action that caused bump123 while next_action == self.current_action:124 next_action = random.choice(VacuumAgent.possible_actions[:-2:]) 125126 if next_action == 'Right':127 self.tempx = self.x + 1128 elif next_action == 'Left':129 self.tempx = self.x - 1130 elif next_action == 'Up':131 self.tempy = self.y + 1132 elif next_action == 'Down':133 self.tempy = self.y - 1134135 self.nxt_coord = (self.tempx, self.tempy)136137 if self.nxt_coord in self.visited:138 UpdatedAction_list = VacuumAgent.possible_actions[:-2:]139 if next_action == 'Right':140 UpdatedAction_list.pop(0)141 next_action = random.choice(UpdatedAction_list)142 if next_action == 'Left':143 self.nx = self.x - 1144 elif next_action == 'Up':145 self.ny = self.y + 1146 elif next_action == 'Down':147 self.ny = self.y - 1148 149 elif next_action == 'Left':150 UpdatedAction_list.pop(1)151 next_action = random.choice(UpdatedAction_list)152 if next_action == 'Right':153 self.nx = self.x + 1154 elif next_action == 'Up':155 self.ny = self.y + 1156 elif next_action == 'Down':157 self.ny = self.y - 1158159 elif next_action == 'Up':160 UpdatedAction_list.pop(2)161 next_action = random.choice(UpdatedAction_list)162 if next_action == 'Right':163 self.nx = self.x + 1164 elif next_action == 'Left':165 self.nx = self.x - 1166 elif next_action == 'Down':167 self.ny = self.y - 1168169 elif next_action == 'Down':170 UpdatedAction_list.pop(3)171 next_action = random.choice(UpdatedAction_list)172 if next_action == 'Right':173 self.nx = self.x + 1174 elif next_action == 'Left':175 self.nx = self.x - 1176 elif next_action == 'Up':177 self.ny = self.y + 1178 else:179 self.nx = self.tempx180 self.ny = self.tempy181182 self.x = self.nx183 self.y = self.ny184 self.current = (self.x, self.y)185 186 self.visited.add(self.current)187 self.action_list.append(next_action)188189 self.current_action = self.action_list[-1]190 if len(self.action_list) >=2:191 self.prev_action = self.action_list[-2]192 193 self.moves += 1194 195 return next_action196
...
action.py
Source:action.py
1import os2import pygetwindow3import traceback4import asyncio5import copy6from prodict import Prodict7from datetime import datetime8from helpers.printfier import Printer9from runner import Runner10p = Printer(bot_name='TOB')11async def run_action(runner: Runner):12 p.info('Taking bot windows')13 next_action = runner.next_action14 bot_windows = get_bot_windows(next_action.config.window_name)15 bot_name_not_in_the_previous_window_list = not (next_action.bot_name in runner.last_window_amount_by_bot)16 if (bot_name_not_in_the_previous_window_list or runner.last_window_amount_by_bot[next_action.bot_name] != len(bot_windows)):17 runner.all_actions[next_action.bot_name] = await run_all_actions(bot_windows, next_action)18 else:19 result = await run_single_action(next_action)20 index = get_action_index_by_window(runner.all_actions, next_action.bot_name, result.window)21 runner.all_actions[next_action.bot_name][index] = result22 runner.last_window_amount_by_bot[next_action.bot_name] = len(bot_windows)23def get_bot_windows(window_name: str):24 amount_windows = 025 while(amount_windows <= 0):26 windows = pygetwindow.getWindowsWithTitle(window_name)27 amount_windows = len(windows)28 if amount_windows <= 0:29 print(f"Rename all the windows to: {window_name}")30 os.system("pause")31 return sorted(windows, key=lambda w: w.title, reverse=False)32def get_action_index_by_window(all_actions: Prodict, bot_name: str, window):33 for i in range(0, len(all_actions[bot_name])):34 if all_actions[bot_name][i]["window"] == window:35 return i36 raise Exception("Couldn't find the the index for the window")37async def run_single_action(next_action: Prodict):38 p.info('Running action')39 return await wrap_bot_function(next_action)40async def run_all_actions(bot_windows, next_action: Prodict):41 result_actions = [] 42 p.info('Running action') 43 for bot_window in bot_windows:44 aux_next_action = Prodict(copy.deepcopy(next_action))45 aux_next_action.window = bot_window46 result_action = Prodict(await wrap_bot_function(aux_next_action))47 result_actions.append(result_action)48 return result_actions49async def wrap_bot_function(next_action: Prodict):50 retry_count = 051 retry_limit = 152 retry_schedule_name = f'retry_failed_{next_action.bot_name}'53 bot_function = next_action.function54 55 while(retry_count <= retry_limit):56 try:57 if retry_schedule_name in next_action.schedules:58 next_action.schedules = Prodict({})59 return await bot_function(next_action)60 except:61 traceback.print_exc()62 await asyncio.sleep(2)63 retry_count += 164 now = datetime.timestamp(datetime.now())65 next_action.schedules = Prodict({})66 next_action.schedules[retry_schedule_name] = now + 30 * 60...
Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!