How to use get_external_port method in localstack

Best Python code snippet using localstack_python

peers.py

Source:peers.py Github

copy

Full Screen

import asyncio

import structlog

from .messages import (
    create_peers_message,
    create_block_message,
    create_transaction_message,
    create_ping_message,
)
from .transactions import validate_transaction

logger = structlog.getLogger()


class P2PError(Exception):
    """Raised when an incoming peer-to-peer message cannot be dispatched."""


class P2PProtocol:
    """Dispatch incoming peer messages to the matching handler.

    The protocol keeps references to the owning server, its blockchain and
    its connection pool, and answers messages by writing newline-terminated
    payloads to stream writers.

    NOTE(review): ``handle_peers`` reads ``self.server.external_ip`` and
    ``self.server.external_port``, while every other handler passes
    ``self.server.get_external_ip`` / ``self.server.get_external_port``
    without calling them. The server class is not visible here -- confirm
    which of the two the ``create_*_message`` helpers expect.
    """

    def __init__(self, server):
        """Store the server plus shortcuts to its blockchain and pool."""
        self.server = server
        self.blockchain = server.blockchain
        self.connection_pool = server.connection_pool

    @staticmethod
    async def send_message(writer, message):
        """Write ``message`` (a ``str``) to ``writer`` followed by a newline.

        The writer is drained after the write so that back-pressure from a
        slow peer is respected instead of buffering without bound.
        """
        writer.write(message.encode() + b"\n")
        await writer.drain()

    async def handle_message(self, message, writer):
        """Look up the handler for ``message["name"]`` and await it.

        Raises:
            KeyError: if ``message`` has no ``"name"`` entry.
            P2PError: if no handler is registered for the message name.
        """
        message_handlers = {
            "block": self.handle_block,
            "ping": self.handle_ping,
            "peers": self.handle_peers,
            "transaction": self.handle_transaction,
            # The original table only contained this misspelled key; it is
            # kept as an alias so peers already sending it still work.
            "transaciton": self.handle_transaction,
        }

        handler = message_handlers.get(message["name"])
        if not handler:
            # Raise (rather than return) the error so the caller notices.
            raise P2PError("Missing handler for message")

        await handler(message, writer)

    async def handle_ping(self, message, writer):
        """Answer a ping with the known peers and any blocks the peer lacks.

        Records ``is_miner`` from the payload on ``writer``, sends a peers
        message built from ``connection_pool.get_alive_peers(20)``, and, when
        the peer's reported block height is below the height of our last
        block, sends every block in ``chain[block_height + 1:]``.
        """
        block_height = message["payload"]["block_height"]
        writer.is_miner = message["payload"]["is_miner"]

        peers = self.connection_pool.get_alive_peers(20)
        peers_message = create_peers_message(
            self.server.get_external_ip,
            self.server.get_external_port,
            peers,
        )
        await self.send_message(writer, peers_message)

        if block_height < self.blockchain.last_block["height"]:
            for block in self.blockchain.chain[block_height + 1:]:
                await self.send_message(
                    writer,
                    create_block_message(
                        self.server.get_external_ip,
                        self.server.get_external_port,
                        block,
                    ),
                )

    async def handle_transaction(self, message, writer):
        """Validate a received transaction, queue it and relay it to peers.

        Invalid transactions are logged and dropped.
        """
        logger.info("Received transaction")
        tx = message["payload"]

        if validate_transaction(tx) is True:
            self.blockchain.pending_transactions.append(tx)
            for peer in self.connection_pool.get_alive_peers(20):
                await self.send_message(
                    peer,
                    create_transaction_message(
                        self.server.get_external_ip,
                        self.server.get_external_port,
                        tx,
                    ),
                )
        else:
            logger.warning("Received invalid transaction")

    async def handle_block(self, message, writer):
        """Add a received block to the blockchain and relay it to peers."""
        logger.info("Received new block")
        block = message["payload"]
        self.blockchain.add_block(block)

        for peer in self.connection_pool.get_alive_peers(20):
            await self.send_message(
                peer,
                create_block_message(
                    self.server.get_external_ip,
                    self.server.get_external_port,
                    block,
                ),
            )

    async def handle_peers(self, message, writer):
        """Open a connection to every peer listed in the payload.

        Each new stream writer is registered with the connection pool.
        """
        logger.info("Received new peers")
        peers = message["payload"]

        # Not used in the visible part of the listing; presumably sent to
        # each new peer in the truncated remainder -- TODO confirm.
        ping_message = create_ping_message(
            self.server.external_ip,
            self.server.external_port,
            len(self.blockchain.chain),
            len(self.connection_pool.get_alive_peers(50)),
            False,
        )

        for peer in peers:
            reader, writer = await asyncio.open_connection(peer["ip"], peer["port"])
            self.connection_pool.add_peer(writer)
            # ... the listing is truncated here; the rest of the loop body
            # is not shown in the source.

Full Screen

Full Screen

custom_packet1.py

Source:custom_packet1.py Github

copy

Full Screen

...18 self._ip_src = packet[0][1].src19 self._ip_dst = packet[0][1].dst20 self._port_src = packet[0][2].sport21 self._port_dst = packet[0][2].dport22 self._app_proto = self.get_app_proto(self.get_external_port())23 self._trans_proto = self.get_trans_proto(packet)24 self._size = self.get_size(packet)25 self._process = Process(self._ip_src,self._port_src,self._port_dst)26 27 28 def is_incoming(self):29 """ Define if the packet is incoming or outgoing from the interface."""30 if self._ip_src != get_if_addr(conf.iface):31 return True32 return False3334 def get_trans_proto(self,packet):35 """ Define what transportation protocol the packet is using."""36 if packet.haslayer(TCP):37 return "TCP"38 if packet.haslayer(UDP):39 return "UDP"40 if packet.haslayer(ICMP):41 return "ICMP"4243 def get_app_proto(self,port):44 """ Define what application protocol the packet is using.45 It uses the getservbyport from socket module to do this task.46 """47 try:48 return socket.getservbyport(port)49 except:50 return "others"5152 def get_external_port(self):53 """ Define the external port.54 This info is passed to the get_app_proto method in the __init__ method.55 """56 if self._ip_src != get_if_addr(conf.iface):57 return self._port_src 58 else: 59 return self._port_dst6061 def get_external_ip(self):62 """ Define the external IP.63 This info is used in the output_format method.64 """65 if self._ip_src != get_if_addr(conf.iface):66 return self._ip_src ...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with the LambdaTest Learning Hub, from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. The LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, TestNG, etc.

LambdaTest Learning Hubs:

YouTube

You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.

Run localstack automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

Not Helpful