Best Python code snippet using localstack_python
_ssh_server.py
Source:_ssh_server.py
from ipaddress import ip_address
import logging
from re import Pattern
import random
import datetime
from typing import Optional, Set, Tuple
from paramiko.common import (AUTH_FAILED, AUTH_SUCCESSFUL, OPEN_FAILED_ADMINISTRATIVELY_PROHIBITED,
                             OPEN_SUCCEEDED)
import paramiko
from paramiko.channel import Channel
from frontend.honeylogger import SSHSession
from frontend.config import config
from ._proxy_handler import ProxyHandler

logger = logging.getLogger(__name__)


class Server(paramiko.ServerInterface):
    """paramiko ``ServerInterface`` implementation that logs requests and relays them.

    Every callback first records the time of the request (and lazily starts the
    logging session), then either logs the request on the ``SSHSession`` or
    delegates it to the ``ProxyHandler``.
    """

    # The set of channels that have successfully issued a shell or exec request
    _channels_done: Set[int]

    def __init__(
            self,
            transport: paramiko.Transport,
            session: SSHSession,
            proxy_handler: ProxyHandler,
            usernames: Optional[Pattern],
            passwords: Optional[Pattern]) -> None:
        """Store the collaborators and initialise the activity bookkeeping.

        :param transport: transport whose ``remote_version`` is reported to the session
        :param session: logging session that receives the request records
        :param proxy_handler: handler the channel requests are delegated to
        :param usernames: pattern a username must match, or ``None`` to accept any
        :param passwords: pattern a password must match, or ``None`` to accept any
        """
        super().__init__()
        self._transport = transport
        self._usernames = usernames
        self._passwords = passwords
        self._session = session
        self._proxy_handler = proxy_handler
        self._channels_done = set()
        self._last_activity = datetime.datetime.now()
        self._logging_session_started = False

    def get_last_activity(self) -> datetime.datetime:
        """Get the datetime of the last activity

        :return: datetime of the last activity
        """
        return self._last_activity

    def _update_last_activity(self) -> None:
        """Record the current time as the last activity and start logging if needed.

        :raises Exception: re-raises whatever the session raised while starting
        """
        self._last_activity = datetime.datetime.now()
        # Make sure to start the logging session if it isn't started
        if not self._logging_session_started:
            try:
                self._session.set_remote_version(self._transport.remote_version)
                self._session.begin()
                self._logging_session_started = True
            except Exception as exc:
                logger.exception("Failed to start the SSH logging session", exc_info=exc)
                raise

    def check_auth_password(self, username: str, password: str) -> int:
        """Log a password login attempt and decide whether it succeeds.

        :param username: username supplied by the client
        :param password: password supplied by the client
        :return: ``AUTH_SUCCESSFUL`` or ``AUTH_FAILED``
        """
        self._update_last_activity()
        self._session.log_login_attempt(username, password)
        # Verify the login attempt against our username and password regex
        # (``Pattern.match`` only anchors at the start of the string).
        if self._usernames is not None and not self._usernames.match(username):
            return AUTH_FAILED
        if self._passwords is not None and not self._passwords.match(password):
            return AUTH_FAILED
        # Check if we have the LOGIN_SUCCESS_RATE set and apply it if we do
        if config.SSH_LOGIN_SUCCESS_RATE != -1:
            if random.randint(1, 100) > config.SSH_LOGIN_SUCCESS_RATE:
                return AUTH_FAILED
        self._proxy_handler.set_attacker_credentials(username, password)
        return AUTH_SUCCESSFUL

    def check_auth_publickey(self, username: str, key: paramiko.PKey) -> int:
        """Reject every public-key authentication attempt."""
        self._update_last_activity()
        return AUTH_FAILED

    def get_allowed_auths(self, username: str) -> str:
        """Advertise password authentication as the only allowed method."""
        self._update_last_activity()
        return 'password'

    def check_channel_request(self, kind: str, chanid: int) -> int:
        """Handle a channel-open request.

        For ``"session"`` channels the proxy handler is asked to create the
        backend connection and open the channel; every other kind returns
        ``OPEN_SUCCEEDED``.
        """
        self._update_last_activity()
        logger.info("%s Channel request (id: %s, kind: %s)",
                    self._session, chanid, kind)
        if kind == "session":
            # NOTE(review): this returns the raw result of the ``and`` expression
            # rather than an explicit OPEN_* code - confirm that the proxy
            # handler methods return values paramiko interprets as intended.
            return (self._proxy_handler.create_backend_connection()
                    and self._proxy_handler.open_channel(kind, chanid))
        return OPEN_SUCCEEDED

    def check_channel_shell_request(self, channel: paramiko.Channel) -> bool:
        """Allow one shell request per channel, provided the proxy handler accepts it."""
        self._update_last_activity()
        logger.info("%s Shell request for channel %s",
                    self._session, channel.chanid)
        if channel.chanid in self._channels_done or not self._proxy_handler.handle_shell_request(
                channel):
            return False
        self._channels_done.add(channel.chanid)
        return True

    def check_channel_exec_request(self, channel: paramiko.Channel, command: bytes) -> bool:
        """Allow one exec request per channel, provided the proxy handler accepts it."""
        self._update_last_activity()
        if channel.chanid in self._channels_done or not self._proxy_handler.handle_exec_request(
                channel, command):
            return False
        self._channels_done.add(channel.chanid)
        return True

    def check_channel_pty_request(self, channel: paramiko.Channel, term: bytes,
                                  width: int, height: int, pixelwidth: int,
                                  pixelheight: int, _: bytes) -> bool:
        """Log a pty request and forward it to the proxy handler.

        :return: ``False`` if ``term`` is not valid UTF-8, otherwise the
            proxy handler's result
        """
        self._update_last_activity()
        logger.info("%s Pty request on channel %s",
                    self._session, channel.chanid)
        try:
            term_string = term.decode("utf-8")
            self._session.log_pty_request(term_string, width, height, pixelwidth, pixelheight)
        except UnicodeError:
            logger.exception("%s Pty request failed to decode the term %s to utf8",
                             self._session, term)
            return False
        return self._proxy_handler.handle_pty_request(
            channel, term_string, width, height, pixelwidth, pixelheight)

    def check_channel_window_change_request(self, channel: Channel, width: int, height: int,
                                            pixelwidth: int, pixelheight: int) -> bool:
        """Forward a window-change request to the proxy handler."""
        self._update_last_activity()
        return self._proxy_handler.handle_window_change_request(
            channel, width, height, pixelwidth, pixelheight)

    def check_channel_env_request(self, channel: Channel, name: bytes, value: bytes) -> bool:
        """Log an environment-variable request; the request itself is always refused."""
        self._update_last_activity()
        try:
            name_string = name.decode("utf-8")
            value_string = value.decode("utf-8")
        except UnicodeDecodeError:
            logger.error(
                "%s Env request failed to decode the values (name: %s, value: %s)",
                self._session, name, value)
            return False
        self._session.log_env_request(channel.chanid, name_string, value_string)
        return False

    def check_channel_direct_tcpip_request(
            self, chanid: int, origin: Tuple[str, int],
            destination: Tuple[str, int]) -> int:
        """Log a direct-tcpip request; the request itself is always refused."""
        self._update_last_activity()
        try:
            ip = ip_address(origin[0])
        except ValueError:
            logger.error("%s Direct TCPIP request failed to decode the origin IP %s",
                         self._session, origin[0])
            return OPEN_FAILED_ADMINISTRATIVELY_PROHIBITED
        self._session.log_direct_tcpip_request(
            chanid, ip, origin[1],
            destination[0],
            destination[1])
        return OPEN_FAILED_ADMINISTRATIVELY_PROHIBITED

    def check_channel_x11_request(
            self, channel: Channel, single_connection: bool, auth_protocol: str, auth_cookie: bytes,
            screen_number: int) -> bool:
        """Log an X11 forwarding request; the request itself is always refused."""
        self._update_last_activity()
        self._session.log_x11_request(channel.chanid, single_connection,
                                      auth_protocol, memoryview(auth_cookie), screen_number)
        return False

    def check_channel_forward_agent_request(self, channel: Channel) -> bool:
        """Log an agent-forwarding request; the request itself is always refused."""
        self._update_last_activity()
        logger.info("%s Forward agent request on channel %s",
                    self._session, channel.chanid)
        return False

    def check_port_forward_request(self, address: str, port: int) -> int:
        self._update_last_activity()
        self._session.log_port_forward_request(address, port)
        ...  # the remainder of this method is truncated in the source excerpt
test_ssh_server.py
Source:test_ssh_server.py
import datetime
import re
import logging
import pytest
from ipaddress import ip_address
from unittest.mock import MagicMock, create_autospec
import paramiko
from paramiko.common import (AUTH_FAILED, AUTH_SUCCESSFUL, )
from frontend.protocols.ssh._proxy_handler import ProxyHandler
from frontend.honeylogger import SSHSession
from frontend.honeylogger._console import ConsoleLogSSHSession
from frontend.protocols.ssh._ssh_server import Server
import frontend.protocols.ssh._ssh_server
from frontend.target_systems import TargetSystemProvider

debug_log = logging.getLogger(frontend.protocols.ssh._ssh_server.__name__)


def time_in_range(start, end, x):
    """Return whether ``x`` lies in ``[start, end]``.

    When ``start > end`` the range is treated as wrapping around, so ``x``
    matches if it is at or after ``start`` or at or before ``end``.
    """
    if start <= end:
        return start <= x <= end
    else:
        return start <= x or x <= end


@pytest.fixture()
def logger() -> SSHSession:
    """Console-backed SSH logging session with fixed example endpoints."""
    return ConsoleLogSSHSession(ip_address("1.1.1.1"), 1, ip_address("2.2.2.2"), 2)


@pytest.fixture()
def target_system_provider(logger) -> TargetSystemProvider:
    """Autospec'd stand-in for the target system provider."""
    return create_autospec(TargetSystemProvider)


@pytest.fixture()
def proxy_handler(logger, target_system_provider) -> ProxyHandler:
    """Real proxy handler wired to the fixture session and mocked provider."""
    return ProxyHandler(logger, target_system_provider)


@pytest.fixture()
def ssh_server(logger, proxy_handler) -> Server:
    """Server under test, using a mocked transport and match-everything patterns."""
    transport_mock = MagicMock()
    transport_mock.remote_version = "ExampleVersion"
    return Server(transport_mock, logger, proxy_handler, re.compile(""), re.compile(""))


def test_update_last_activity_without_started_session(ssh_server: Server, logger: SSHSession):
    logger.begin = MagicMock()
    now = datetime.datetime.now()
    ssh_server._update_last_activity()
    logger.begin.assert_called_once()
    assert ssh_server._logging_session_started
    assert time_in_range(now, datetime.datetime.now(), ssh_server._last_activity)


def test_update_last_activity_exception(ssh_server: Server, logger: SSHSession):
    logger.begin = MagicMock(side_effect=Exception)
    now = datetime.datetime.now()
    with pytest.raises(Exception):
        ssh_server._update_last_activity()
    # Checked after the ``raises`` block: anything placed after the raising
    # call inside the block would never execute.
    logger.begin.assert_called_once()
    assert not ssh_server._logging_session_started
    assert time_in_range(now, datetime.datetime.now(), ssh_server._last_activity)


def test_get_last_activity(ssh_server: Server):
    time = datetime.datetime.now()
    ssh_server._last_activity = time
    assert ssh_server.get_last_activity() == time


def test_check_username_password(ssh_server: Server):
    ssh_server._usernames = re.compile(r"linus")
    ssh_server._passwords = re.compile(r"torvalds")
    now = datetime.datetime.now()
    assert ssh_server.check_auth_password("linus", "torvalds") == AUTH_SUCCESSFUL
    assert ssh_server.check_auth_password("inus", "torvalds") == AUTH_FAILED
    assert ssh_server.check_auth_password("linus", "lol") == AUTH_FAILED
    assert ssh_server.check_auth_password("inus", "") == AUTH_FAILED
    assert ssh_server.check_auth_password("", "") == AUTH_FAILED
    assert ssh_server.check_auth_password("Linus", "") == AUTH_FAILED
    ssh_server._usernames = None
    assert ssh_server.check_auth_password("", "torvalds") == AUTH_SUCCESSFUL
    assert ssh_server.check_auth_password("TSRA%$#B$WQ", "torvalds") == AUTH_SUCCESSFUL
    ssh_server._usernames = None
    ssh_server._passwords = None
    assert ssh_server.check_auth_password("TSRA%$#B$WQ", "") == AUTH_SUCCESSFUL
    assert ssh_server.check_auth_password("", "") == AUTH_SUCCESSFUL
    assert time_in_range(now, datetime.datetime.now(), ssh_server._last_activity)


def test_check_auth_publickey(ssh_server: Server):
    now = datetime.datetime.now()
    key = paramiko.RSAKey(filename="./host.key")
    assert ssh_server.check_auth_publickey("", key) == AUTH_FAILED
    assert time_in_range(now, datetime.datetime.now(), ssh_server._last_activity)


def test_check_channel_request_and_exec_once_per_channel(ssh_server: Server):
    now = datetime.datetime.now()
    cmd = b"ls -la"
    ssh_server._channels_done = {1, 2}
    ssh_server._proxy_handler.handle_exec_request = MagicMock(return_value=True)
    # Channels that already issued a request are refused without consulting the handler.
    assert ssh_server.check_channel_exec_request(paramiko.Channel(1), cmd) is False
    assert ssh_server.check_channel_shell_request(paramiko.Channel(2)) is False
    channel = paramiko.Channel(3)
    assert ssh_server.check_channel_exec_request(channel, cmd) is True
    assert ssh_server.check_channel_shell_request(channel) is False
    ssh_server._proxy_handler.handle_exec_request.assert_called_once_with(channel, cmd)
    ssh_server._proxy_handler.handle_exec_request.assert_called_once()
    assert time_in_range(now, datetime.datetime.now(), ssh_server._last_activity)


def test_channel_pty_request(ssh_server: Server):
    now = datetime.datetime.now()
    ssh_server._session.log_pty_request = MagicMock()
    ssh_server._proxy_handler.handle_pty_request = MagicMock(return_value=True)
    channel = paramiko.Channel(3)
    term = b"screen"
    height = 432
    width = 32
    pixel_width = 432
    pixel_height = 432
    ssh_server.check_channel_pty_request(
        channel, term, width, height, pixel_width, pixel_height, b"")
    ssh_server._session.log_pty_request.assert_called_with(
        term.decode("utf-8"), width, height, pixel_width, pixel_height)
    ssh_server._proxy_handler.handle_pty_request.assert_called_once_with(
        channel, term.decode("utf-8"), width, height, pixel_width, pixel_height)
    # ... (the rest of this test module is truncated in the source excerpt)
tracker.py
Source:tracker.py
import urllib.parse
import urllib.request
from typing import Dict, Optional, Sequence, Union
import btorrent.transport.tracker as transport


class Tracker:
    """A tracker endpoint identified by its parsed address, with optional proxies."""

    def __init__(self, tracker_addr: bytes, proxies: Optional[Dict[str, str]] = None) -> None:
        """Parse the tracker address and prepare the proxy handler.

        :param tracker_addr: tracker URL; it is parsed with ``urllib.parse.urlparse``
        :param proxies: scheme-to-proxy-URL mapping; ``None`` means no proxies
        """
        self.tracker_addr = urllib.parse.urlparse(tracker_addr)
        self.proxies: Dict[str, str] = proxies or {}
        self._proxy_handler = urllib.request.ProxyHandler(self.proxies)
        self.interval = 0
        self.last_announce_time = 0

    def __hash__(self):
        return hash(self.tracker_addr)

    def __eq__(self, other: object) -> bool:
        """Trackers are equal when their parsed addresses are equal."""
        if not isinstance(other, Tracker):
            # Let Python try the reflected comparison instead of raising
            # AttributeError on an unrelated operand.
            return NotImplemented
        return self.tracker_addr == other.tracker_addr

    def set_proxies(self, proxies: Dict[str, str]) -> None:
        """Replace the proxy mapping used for subsequent requests.

        :param proxies: scheme-to-proxy-URL mapping
        """
        self.proxies = proxies
        # Build a fresh handler: ProxyHandler.__init__ installs one
        # ``<scheme>_open`` attribute per proxy, so re-running it on the old
        # instance would keep handlers for schemes no longer in the mapping.
        self._proxy_handler = urllib.request.ProxyHandler(self.proxies)

    def _send_request(self, req: transport.Request) -> transport.Response:
        """Open ``req`` through an opener built with the current proxy handler."""
        opener = transport.build_opener(self._proxy_handler)
        return opener.open(req.get_req())

    def announce(self,
                 info_hash: bytes,
                 peer_id: bytes,
                 left: int,
                 downloaded=0,
                 uploaded=0,
                 event=transport.AnnounceEvent.NONE,
                 key=-1,
                 numwant=-1,
                 ip=0,
                 port=6881,
                 udp_port=8881) -> Union[transport.Response, transport.AnnounceResponse]:
        """Send an announce request to the tracker and return its response.

        When the tracker address uses the ``udp`` scheme, ``udp_port`` is
        announced instead of ``port``.
        """
        if self.tracker_addr.scheme == b'udp':
            port = udp_port
        return self._send_request(transport.AnnounceRequest(
            self.tracker_addr, info_hash, peer_id,
            downloaded, left, uploaded, event,
            key, numwant, ip, port
        ))

    def scrape(self, info_hashes: Sequence[bytes] = ()) -> Union[transport.Response, transport.ScrapeResponse]:
        ...  # the body of this method is truncated in the source excerpt
Learn to execute automation testing from scratch with the LambdaTest Learning Hub: from setting up the prerequisites and running your first automation test, to following best practices and diving deeper into advanced test scenarios. The LambdaTest Learning Hub compiles step-by-step guides to help you become proficient with different test automation frameworks, such as Selenium, Cypress, and TestNG.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 minutes of automation testing FREE!!