Best Python code snippet using robotframework-ioslibrary_python
test_rotation.py
Source:test_rotation.py
1from .util import journalpump_initialized2from journalpump import senders3from journalpump.journalpump import JournalPump, JournalReader, PumpReader, statsd4from journalpump.senders.base import MsgBuffer5from pathlib import Path6from typing import Callable, Dict, List, Optional7import json8import logging9import os10import pytest11import shutil12import subprocess13import threading14import time15_LOG = logging.getLogger(__name__)16class LogFiles:17 """Emulates rotation of log files.18 journald rotates files using following algorithm:19 * Move user-1000.journal -> user-1000@....journal20 * Create new user-1000.journal21 * Delete old file22 """23 def __init__(self, destination: Path) -> None:24 orig_path = Path(__file__).parent / "data" / "rotated_logs"25 self._orig_log_files: List[Path] = list(reversed(sorted(orig_path.glob("*.journal"))))26 self._rotate_to: Optional[Path] = None27 self._destination: Path = destination28 self._current_log_files: List[Path] = []29 @property30 def log_files(self):31 return self._current_log_files.copy()32 def rotate(self):33 log_file = self._orig_log_files.pop()34 tail_file = self._destination / "user-1000.journal"35 if self._rotate_to:36 shutil.move(tail_file, self._rotate_to)37 _LOG.info("Moved %s log file to %s", tail_file, self._rotate_to)38 self._current_log_files.insert(1, self._rotate_to)39 else:40 assert not self._current_log_files41 self._current_log_files.insert(0, self._destination / "user-1000.journal")42 dst_path = self._destination / "user-1000.journal"43 _LOG.info("Added %s log file", dst_path)44 shutil.copyfile(log_file, dst_path)45 self._rotate_to = self._destination / log_file.name46 def remove(self, *, last: int):47 for _ in range(last):48 log_file = self._current_log_files.pop()49 log_file.unlink()50 _LOG.info("Removed %s log file", log_file)51 if not self._current_log_files:52 self._rotate_to = None53def test_log_rotator(tmp_path):54 log_path = tmp_path / "logs"55 log_path.mkdir()56 log_file_handler = 
LogFiles(log_path)57 assert len(list(log_path.glob("*"))) == 058 log_file_handler.rotate()59 log_files = [p.name for p in log_path.glob("*")]60 assert len(log_files) == 161 assert "user-1000.journal" in log_files62 log_file_handler.rotate()63 log_files = [p.name for p in log_path.glob("*")]64 assert len(log_files) == 265 assert "user-1000.journal" in log_files66 log_file_handler.remove(last=1)67 log_files = [p.name for p in log_path.glob("*")]68 assert len(log_files) == 169 assert "user-1000.journal" in log_files70class _MsgBuffer(MsgBuffer):71 """Wrapper around MsgBuffer which allows to wait for messages."""72 def __init__(self) -> None:73 super().__init__()74 self.has_messages = threading.Event()75 self.wait_threshold: Optional[int] = None76 def add_item(self, *, item, cursor):77 super().add_item(item=item, cursor=cursor)78 if self.wait_threshold and len(self.messages) >= self.wait_threshold:79 self.has_messages.set()80 def get_items(self):81 res = super().get_items()82 self.wait_threshold = None83 self.has_messages.clear()84 return res85 def set_threshold(self, th: int) -> None:86 self.wait_threshold = th87 if len(self.messages) >= self.wait_threshold:88 self.has_messages.set()89class StubSender:90 field_filter: Optional[dict] = None91 unit_log_levels: Optional[dict] = None92 extra_field_values: Optional[dict] = None93 def __init__(self, *args, **kwargs): # pylint: disable=unused-argument94 self.msg_buffer = _MsgBuffer()95 def start(self):96 pass97 def request_stop(self):98 pass99 def refresh_stats(self, *args, **kwargs): # pylint: disable=unused-argument100 pass101 def __call__(self, *args, **kwargs): # pylint: disable=unused-argument102 return self103 def get_messages(self, count: int, timeout: int):104 self.msg_buffer.set_threshold(count)105 assert self.msg_buffer.has_messages.wait(timeout), f"Timeout. 
Total messages received: {len(self.msg_buffer)}"106 messages = self.msg_buffer.get_items()107 return [json.loads(m[0]) for m in messages]108@pytest.fixture(name="journal_log_dir")109def fixture_journal_log_dir(tmp_path):110 log_path = tmp_path / "logs"111 log_path.mkdir()112 return log_path113@pytest.fixture(name="journalpump_factory")114def fixture_journalpump_factory(mocker, tmp_path, journal_log_dir):115 pump_thread = None116 pump = None117 def _start_journalpump(sender, *, pump_conf=None):118 nonlocal pump_thread119 nonlocal pump120 pump_conf = pump_conf or {}121 mocker.patch.object(PumpReader, "has_persistent_files", return_value=True)122 mocker.patch.object(PumpReader, "has_runtime_files", return_value=True)123 senders.output_type_to_sender_class["stub_sender"] = sender124 config_path = tmp_path / "journalpump.json"125 with open(config_path, "w") as fp:126 json.dump(127 {128 "readers": {129 "my-reader": {130 "journal_path": str(journal_log_dir),131 "initial_position": "head",132 "senders": {133 "test-sender": {"output_type": "stub_sender"},134 },135 "searches": [136 {137 "fields": {138 "MESSAGE": "Message.*",139 },140 "name": "test-messages",141 }142 ],143 },144 },145 **pump_conf,146 },147 fp,148 )149 pump = JournalPump(str(config_path))150 pump.poll_interval_ms = 100151 pump_thread = threading.Thread(target=pump.run)152 pump_thread.start()153 assert journalpump_initialized(pump)154 return pump155 yield _start_journalpump156 if pump_thread and pump:157 pump.running = False158 pump_thread.join(timeout=3)159def test_journalpump_rotated_files(journalpump_factory, journal_log_dir):160 stub_sender = StubSender()161 journalpump_factory(stub_sender)162 lf = LogFiles(journal_log_dir)163 lf.rotate()164 messages = stub_sender.get_messages(10, timeout=3)165 assert set(m["MESSAGE"] for m in messages) == {f"Message {i}" for i in range(0, 10)}166 lf.rotate()167 lf.rotate()168 messages = stub_sender.get_messages(20, timeout=3)169 assert set(m["MESSAGE"] for m in messages) 
== {f"Message {i}" for i in range(10, 30)}170@pytest.mark.parametrize("msg_buffer_max_length", [3, 5, 10])171def test_journalpump_rotated_files_threshold(journalpump_factory, journal_log_dir, msg_buffer_max_length):172 stub_sender = StubSender()173 pump = journalpump_factory(stub_sender, pump_conf={"msg_buffer_max_length": msg_buffer_max_length})174 lf = LogFiles(journal_log_dir)175 lf.rotate()176 reader: JournalReader = next(iter(pump.readers.values()))177 assert reader.msg_buffer_max_length == msg_buffer_max_length178 lf.rotate()179 lf.rotate()180 for it in range(30 // msg_buffer_max_length):181 messages = stub_sender.get_messages(msg_buffer_max_length, timeout=3)182 expected = {f"Message {i}" for i in range(it * msg_buffer_max_length, msg_buffer_max_length * (it + 1))}183 assert len(messages) == msg_buffer_max_length184 assert set(m["MESSAGE"] for m in messages) == expected185@pytest.mark.parametrize("size,num_messages", [(50, 1), (1000, 2)])186def test_journalpump_rotated_files_threshold_bytes(journalpump_factory, journal_log_dir, size, num_messages):187 stub_sender = StubSender()188 pump = journalpump_factory(stub_sender, pump_conf={"msg_buffer_max_bytes": size})189 lf = LogFiles(journal_log_dir)190 lf.rotate()191 lf.rotate()192 lf.rotate()193 reader: JournalReader = next(iter(pump.readers.values()))194 assert reader.msg_buffer_max_bytes == size195 for it in range(30 // num_messages):196 messages = stub_sender.get_messages(num_messages, timeout=3)197 expected = {f"Message {i}" for i in range(it * num_messages, num_messages * (it + 1))}198 assert len(messages) == num_messages199 assert set(m["MESSAGE"] for m in messages) == expected200def _lsof_is_file_open(filenames: List[str]) -> Dict[str, bool]:201 """Check if file is open using lsof"""202 # psutil doesn't show deleted files, but this exactly what we want to test203 output = subprocess.check_output(["lsof", "-p", str(os.getpid()), "-w"]).decode().split("\n")204 result = {fn: False for fn in filenames}205 for 
line in output:206 for fn in result:207 if fn not in line:208 continue209 result[fn] = True210 return result211def _wait_for(predicate: Callable[[], bool], timeout: int):212 cur = time.monotonic()213 deadline = cur + timeout214 while cur < deadline:215 if predicate():216 return True217 cur = time.monotonic()218 time.sleep(0.2)219 return False220@pytest.mark.skipif(not shutil.which("lsof"), reason="lsof is not available")221def test_journalpump_rotated_files_deletion(journalpump_factory, journal_log_dir):222 stub_sender = StubSender()223 journalpump_factory(stub_sender, pump_conf={"msg_buffer_max_length": 1})224 def _get_message():225 messages = stub_sender.get_messages(1, timeout=3)226 assert len(messages) == 1227 return messages[0]["MESSAGE"]228 lf = LogFiles(journal_log_dir)229 lf.rotate()230 assert _get_message() == "Message 0"231 lf.rotate()232 # Loop once to get new files open233 assert _get_message() == "Message 1"234 log_files = [str(f) for f in lf.log_files]235 assert len(log_files) == 2236 assert _wait_for(lambda: all(_lsof_is_file_open(log_files).values()), timeout=3)237 lf.remove(last=1)238 # Took buffered message239 # Dropped messages 3-9 as file was deleted240 assert _get_message() in ["Message 2", "Message 10"]241 assert _get_message() in ["Message 10", "Message 11"]242 def _only_head_open():243 open_files = _lsof_is_file_open(log_files)244 return open_files[log_files[0]] and not open_files[log_files[-1]]245 assert _wait_for(_only_head_open, timeout=3), f"Expected {log_files[-1]} to not be open"246def test_journalpump_stats_sender(mocker, journalpump_factory, journal_log_dir):247 stub_sender = StubSender()248 stats: Dict[str, int] = {}249 class StatStub:250 def increase(self, name: str, **kwargs): # pylint: disable=unused-argument251 stats[name] = stats.get(name, 0) + 1252 def gauge(self, *args, **kwargs): # pylint: disable=unused-argument253 pass254 def unexpected_exception(self, *args, **kwargs): # pylint: disable=unused-argument255 pass256 
mocker.patch.object(statsd, "StatsClient", return_value=StatStub())257 journalpump_factory(258 stub_sender,259 pump_conf={260 "readers": {261 "my-stats-reader": {262 "journal_path": str(journal_log_dir),263 "initial_position": "head",264 "senders": {},265 "searches": [266 {267 "fields": {268 "MESSAGE": "Message [123]",269 },270 "name": "stats-messages",271 }272 ],273 },274 "my-reader": {275 "journal_path": str(journal_log_dir),276 "initial_position": "head",277 "senders": {278 "test-sender": {"output_type": "stub_sender"},279 },280 "searches": [281 {282 "fields": {283 "MESSAGE": "Message.*",284 },285 "name": "test-messages",286 }287 ],288 },289 },290 },291 )292 lf = LogFiles(journal_log_dir)293 lf.rotate()294 lf.rotate()295 assert _wait_for(296 lambda: stats.get("stats-messages") == 13, timeout=3...
servo.py
Source:servo.py
...107 self._rotation_speed = 0108 else:109 position = -1 if rotation_speed < 0 else 1110 asyncio.create_task(111 self._rotate_to(112 position,113 rotation_speed,114 time.time(),115 )116 )117 async def rotate_to(self, position, rotation_speed=None):118 """Rotate to some position, optionally with a spesific speed119 :param position: position between -1 and 1, see position property120 :type position: int121 :param rotation_speed: rotation speed between -1 and 1,122 see rotation_speed property. Defaults to None,123 which is changed to the max speed in the correct direction.124 :type rotation_speed: int, optional125 """126 self._check_if_stopped()127 await self._rotate_to(position, rotation_speed, time.time())128 async def _rotate_to(self, position, rotation_speed, rotation_start_time):129 # Asyncio task starts after a delay, this handles the case where130 # stop() is called before the task has started or there was another131 # _rotate_to call in between132 if self._stopped or (133 self._latest_rotation_start_time is not None134 and self._latest_rotation_start_time > rotation_start_time135 ):136 return137 current_position = self.position138 # Guess the middle position if the position is not set139 if self.position is None:140 logging.warning("Servo position not known, guessing middle 0")141 current_position = 0142 # Set the default rotation speed, if not specified...
biot_savart.py
Source:biot_savart.py
...148 # major_axis = major_axis / np.linalg.norm(major_axis)149 r = np.cross(z_ax, normal)150 if np.linalg.norm(r) == 0:151 r = np.array([1, 0, 0])152 normal_rotate = _rotate_to(z_ax, normal, r)153 major_rotate = _rotate_to(normal_rotate @ x_ax, major_axis, normal)154 segments = major_rotate @ normal_rotate @ segments155 return segments + center.reshape((1, 3, 1))156# TODO Docstring157def _rotate_to(v1, v2, r):158 assert(np.round(np.linalg.norm(r), decimals=9) != 0)159 assert(np.round(np.dot(v1, r), decimals=9) == 0)160 assert(np.round(np.dot(v2, r), decimals=9) == 0)161 v1_hat = v1 / np.linalg.norm(v1)162 v2_hat = v2 / np.linalg.norm(v2)163 r_hat = r / np.linalg.norm(r)164 cs = np.dot(v1_hat, v2_hat)165 sn = np.linalg.norm(np.cross(v1_hat, v2_hat))166 if sn != 0:167 r = r_hat * sn168 r_cross_mat = np.array([[0, -r[2], r[1]],169 [r[2], 0, -r[0]],170 [-r[1], r[0], 0]])171 return np.identity(3) + r_cross_mat + r_cross_mat @ r_cross_mat * (1 - cs)/sn**2...
Learn to execute automation testing from scratch with the LambdaTest Learning Hub, from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. The LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, and TestNG.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 automation test minutes FREE!!