Best Python code snippet using localstack_python
mm_cluster.py
Source: mm_cluster.py
...30 # connection settings31 self.dbname = dbname32 self.username = username33 # generate pairs of ports for multimaster34 self.ports = [(reserve_port(), reserve_port()) for _ in range(nodes)]35 # generate connection string36 conn_strings = self._build_mm_conn_strings(self.ports,37 dbname,38 username)39 for i in range(nodes):40 pg_port, mm_port = self.ports[i]41 node_id = i + 142 node = ClusterNode(name=''.join(['node_', str(node_id)]),43 pg_port=pg_port,44 mm_port=mm_port)45 node.init().mm_init(node_id,46 max_nodes,47 conn_strings,48 max_connections)49 self.nodes.append(node)50 @staticmethod51 def _get_mm_conn_string():52 return (53 "host=127.0.0.1 "54 "dbname={} "55 "user={} "56 "port={} "57 "arbiter_port={}"58 )59 @staticmethod60 def _build_mm_conn_strings(ports, dbname, username):61 return ','.join([62 Cluster63 ._get_mm_conn_string()64 .format(dbname,65 username,66 pg_port,67 mm_port) for pg_port, mm_port in ports68 ])69 def __enter__(self):70 return self71 def __exit__(self, type, value, traceback):72 self.cleanup()73 def free_ports(self):74 for p1, p2 in self.ports:75 release_port(p1)76 release_port(p2)77 def free_nodes(self):78 for node in self.nodes:79 node.cleanup()80 def cleanup(self):81 self.free_nodes()82 self.free_ports()83 return self84 def start(self):85 for node in self.nodes:86 node.start()87 return self88 def stop(self):89 for node in self.nodes:90 node.stop()91 return self92 def restart(self):93 for node in self.nodes:94 node.restart()95 return self96 def reload(self):97 for node in self.nodes:98 node.reload()99 return self100 def print_conninfo(self):101 print(self._build_mm_conn_strings(self.ports,102 self.dbname,103 self.username))104 def install(self):105 for node in self.nodes:106 node.safe_psql("create extension multimaster;",107 dbname=self.dbname,108 username=self.username)109 return self110 def add_node(self):111 if len(self.nodes) == self.max_nodes:112 raise Exception("max amount of nodes reached ({})"113 .format(self.max_nodes))114 pg_port, mm_port = reserve_port(), reserve_port()115 node_id = len(self.nodes) + 1116 # request multimaster config changes117 conn_string = self._get_mm_conn_string().format(self.dbname,118 self.username,119 pg_port,120 mm_port)121 add_node_cmd = "select mtm.add_node('{}')".format(conn_string)122 self.execute_any(dbname=self.dbname,123 username=self.username,124 query=add_node_cmd,125 commit=True)126 # copy node with new ports127 backup = self.node_any().backup()128 node = backup.spawn_primary(name=''.join(['node_', str(node_id)]),...
port.py
Source: port.py
...18 if res:19 return str(res[0][0])20 else:21 return None22 def reserve_port(self, cr, project_name, db_name):23 cr.execute(24 "SELECT port FROM port_mapping WHERE project=%s", (project_name,)25 )26 port_used = [x[0] for x in cr.fetchall()]27 project_config = config["projects"][project_name]28 start_port = project_config["port_mapping_start"]29 stop_port = start_port + project_config["port_mapping_max"]30 for port in range(start_port, stop_port):31 if port not in port_used:32 cr.execute(33 """INSERT INTO port_mapping(34 project, date, db_name, port35 )36 VALUES(%s, now(), %s, %s);37 """,38 (project_name, db_name, port),39 )40 return str(port)41 return abort(42 404, "Not more available port please stop existing review apps"43 )44 def lock(self, project_name, db_name):45 with cursor() as cr:46 port = self.get_existing_port(cr, project_name, db_name)47 if port:48 return port49 else:50 return self.reserve_port(cr, project_name, db_name)51 def release(self, project_name, db_name):52 with cursor() as cr:53 cr.execute(54 """DELETE FROM port_mapping55 WHERE project=%s AND db_name=%s""",56 (project_name, db_name),57 )58 return "OK"59 def redirect(self, project_name, db_name):60 with cursor() as cr:61 port = self.get_existing_port(cr, project_name, db_name)62 if port:63 domain = config["projects"][project_name]["domain"]64 return redirect("https://{}:{}".format(domain, port))...
final.py
Source: final.py
...21 return custom_endpoint.endpoint22 engine_domain = 'opensearch' if engine_type == EngineType.OpenSearch else 'es'23 if config.OPENSEARCH_ENDPOINT_STRATEGY == 'port':24 try:25 assigned_port = external_service_ports.reserve_port(preferred_port)26 except PortNotAvailableException:27 LOG.warning(f"Preferred port {preferred_port} is not available, trying to reserve another port.")28 assigned_port = external_service_ports.reserve_port()29 assigned_port = external_service_ports.reserve_port()30 else:31 return f"{config.LOCALSTACK_HOSTNAME}:{assigned_port}"32 if config.OPENSEARCH_ENDPOINT_STRATEGY == 'path':33 return f"{config.LOCALSTACK_HOSTNAME}:{config.EDGE_PORT}/{engine_domain}/{domain_key.region}/{domain_key.domain_name}"34 return f"{domain_key.domain_name}.{domain_key.region}.{engine_domain}.{LOCALHOST_HOSTNAME}:{config.EDGE_PORT}"...
Check out the latest blogs from LambdaTest on this topic:
The fact is not alien to us anymore that cross browser testing is imperative to enhance your application’s user experience. Enhanced knowledge of popular and highly acclaimed testing frameworks goes a long way in developing a new app. It holds more significance if you are a full-stack developer or expert programmer.
QA testers have a unique role and responsibility to serve the customer. Serving the customer in software testing means protecting customers from application defects, failures, and perceived failures from missing or misunderstood requirements. Testing for known requirements based on documentation or discussion is the core of the testing profession. One unique way QA testers can both differentiate themselves and be innovative occurs when senseshaping is used to improve the application user experience.
Having a good web design can empower business and make your brand stand out. According to a survey by Top Design Firms, 50% of users believe that website design is crucial to an organization’s overall brand. Therefore, businesses should prioritize website design to meet customer expectations and build their brand identity. Your website is the face of your business, so it’s important that it’s updated regularly as per the current web design trends.
Enterprise resource planning (ERP) is a form of business process management software—typically a suite of integrated applications—that assists a company in managing its operations, interpreting data, and automating various back-office processes. The introduction of a new ERP system is analogous to the introduction of a new product into the market. If the product is not handled appropriately, it will fail, resulting in significant losses for the business. Most significantly, the employees’ time, effort, and morale would suffer as a result of the procedure.
Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!