How to use test_stop_event method in locust

Best Python code snippet using locust

test_worker.py

Source:test_worker.py Github

copy

Full Screen

1import os2import logging3import zipfile4import time5import threading6import pytest7try:8 # Python 39 import queue10 from urllib.error import HTTPError, URLError11except ImportError:12 # Python 213 from urllib2 import HTTPError, URLError14 import Queue as queue15import gmaltcli.worker as worker16class TestSafeCounter(object):17 def test_increment_one(self):18 counter = worker.SafeCounter()19 counter.max = 10020 return_value = counter.increment()21 assert return_value[0] == 122 assert return_value[1] == 10023 return_value = counter.increment()24 assert return_value[0] == 225 assert return_value[1] == 10026 def test_increment_five_start_ten(self):27 counter = worker.SafeCounter(start=10, incr=5)28 counter.max = 10029 return_value = counter.increment()30 assert return_value[0] == 1531 assert return_value[1] == 10032 return_value = counter.increment()33 assert return_value[0] == 2034 assert return_value[1] == 10035 def test_get(self):36 counter = worker.SafeCounter()37 counter.max = 10038 counter.increment()39 counter.increment()40 counter.increment()41 assert counter.get() == 342class FalseWorker(worker.Worker):43 def __init__(self, id_, queue_obj, counter, stop_event, processed, sleep=0):44 super(FalseWorker, self).__init__(id_, queue_obj, counter, stop_event)45 processed[id_] = []46 self.processed = processed47 self.sleep = sleep48 def process(self, queue_item, counter_info):49 self.processed[self.id].append(queue_item)50 if self.sleep:51 time.sleep(self.sleep)52 def _on_end(self):53 setattr(self, 'on_end_called', True)54class ErrorWorker(FalseWorker):55 def process(self, queue_item, counter_info):56 if queue_item == 80 or counter_info[0] == 80:57 raise Exception(58 'Exception on 80th item')59 else:60 super(ErrorWorker, self).process(queue_item, counter_info)61class TestWorkerPool(object):62 def setup_method(self, func_method):63 self.processed = {}64 self.pool = worker.WorkerPool(FalseWorker, 5, self.processed, sleep=0.001)65 self.pool.fill(['item' + str(item) for item in range(1, 100)])66 def test__init__(self):67 assert len(self.pool.workers) == 568 assert all([isinstance(item, FalseWorker) for item in self.pool.workers])69 assert all([id(self.processed) == id(item.processed) for item in self.pool.workers])70 def test_fill(self):71 assert self.pool.queue.qsize() == 9972 assert self.pool.counter.max == 9973 def test_start(self):74 self.pool.start()75 assert len(self.processed[1] + self.processed[2] + self.processed[3] +76 self.processed[4] + self.processed[5]) == 9977 assert all([len(self.processed[key]) > 10 for key in self.processed])78 def test_start_and_error(self):79 pool = worker.WorkerPool(ErrorWorker, 5, {}, sleep=0.1)80 pool.fill(['item' + str(item) for item in range(1, 100)])81 with pytest.raises(worker.WorkerPoolException):82 pool.start()83class TestWorker(object):84 def setup_method(self, func_method):85 test_stop_event = threading.Event()86 error_stop_event = threading.Event()87 test_counter = worker.SafeCounter()88 error_counter = worker.SafeCounter()89 test_worker_queue = queue.Queue()90 error_worker_queue = queue.Queue()91 for item in range(1, 100):92 test_worker_queue.put(item)93 error_worker_queue.put(item)94 self.processed = {}95 self.test_worker = FalseWorker(1, test_worker_queue,96 test_counter, test_stop_event,97 self.processed)98 self.error_worker = ErrorWorker(2, error_worker_queue,99 error_counter, error_stop_event,100 self.processed)101 def test_run(self):102 self.test_worker.run()103 assert len(self.processed[1]) == 99104 assert hasattr(self.test_worker, 'on_end_called')105 assert self.test_worker.queue.empty()106 assert not self.test_worker.stop_event.isSet()107 self.error_worker.run()108 assert len(self.processed[2]) == 79109 assert hasattr(self.error_worker, 'on_end_called')110 assert self.error_worker.queue.qsize() == 19111 assert self.error_worker.stop_event.isSet()112 def test__get_queue(self):113 self.test_worker._get_queue()114 assert len(self.processed[1]) == 1115 assert not hasattr(self.test_worker, 'on_end_called')116 assert self.test_worker.queue.qsize() == 98117 assert not self.test_worker.stop_event.isSet()118 for i in range(1, 80):119 self.error_worker._get_queue()120 assert len(self.processed[2]) == 79121 assert not hasattr(self.error_worker, 'on_end_called')122 assert self.error_worker.queue.qsize() == 20123 assert not self.error_worker.stop_event.isSet()124 # 80th event125 self.error_worker._get_queue()126 assert len(self.processed[2]) == 79127 assert not hasattr(self.error_worker, 'on_end_called')128 assert self.error_worker.queue.qsize() == 19129 assert self.error_worker.stop_event.isSet()130class TestDownloadWorker(object):131 def setup_method(self, func_method):132 stop_event = threading.Event()133 counter = worker.SafeCounter()134 worker_queue = queue.Queue()135 folder = None136 self.download_worker = worker.DownloadWorker(1, worker_queue, counter,137 stop_event, folder)138 def test__secured_download_file_connection_error(self, monkeypatch):139 def raise_url_error(url, filename, md5sum=None):140 raise URLError('message')141 monkeypatch.setattr(self.download_worker, '_download_file', raise_url_error)142 monkeypatch.setattr(logging, 'error', lambda x: x)143 with pytest.raises(URLError):144 self.download_worker._secured_download_file('url', 'filename')145 def test__secured_download_file_wrong_url(self, monkeypatch):146 def raise_url_error(url, filename, md5sum=None):147 raise HTTPError('message', None, None, None, None)148 monkeypatch.setattr(self.download_worker, '_download_file', raise_url_error)149 monkeypatch.setattr(logging, 'error', lambda x: x)150 with pytest.raises(HTTPError):151 self.download_worker._secured_download_file('url', 'filename')152 def test__secured_download_file_wrong_checksum(self, monkeypatch):153 def raise_url_error(url, filename, md5sum=None):154 raise worker.InvalidCheckSumException('checksum exception')155 monkeypatch.setattr(self.download_worker, '_download_file', raise_url_error)156 monkeypatch.setattr(logging, 'error', lambda x: x)157 with pytest.raises(worker.InvalidCheckSumException) as e:158 self.download_worker._secured_download_file('url', 'filename', 'checkcsum')159 assert str(e.value) == 'checksum exception'160 def test__secured_download_file_wrong_zip_crc(self, monkeypatch):161 def raise_zip_error(url, filename, md5sum=None):162 raise zipfile.BadZipfile('bad crc')163 monkeypatch.setattr(self.download_worker, '_download_file', raise_zip_error)164 monkeypatch.setattr(logging, 'error', lambda x: x)165 with pytest.raises(zipfile.BadZipfile) as e:166 self.download_worker._secured_download_file('url', 'filename', 'checkcsum')167 assert str(e.value) == 'bad crc'168 def test__download_file(self, tmpdir):169 tmp_folder = str(tmpdir.mkdir('gmaltcli'))170 self.download_worker.folder = tmp_folder171 self.download_worker._download_file(172 'http://dds.cr.usgs.gov/srtm/version2_1/SRTM3/Africa/N00E010.hgt.zip',173 'N00E010.hgt.zip')174 downloaded_path = os.path.join(self.download_worker.folder, 'N00E010.hgt.zip')175 assert os.path.exists(downloaded_path)176 assert os.path.getsize(downloaded_path) == 1743694177 def test__download_file_with_md5sum_check(self, tmpdir):178 tmp_folder = str(tmpdir.mkdir('gmaltcli'))179 self.download_worker.folder = tmp_folder180 self.download_worker._download_file(181 'http://dds.cr.usgs.gov/srtm/version2_1/SRTM3/Africa/N00E010.hgt.zip',182 'N00E010.hgt.zip',183 'dfb52a9b9eae6de945bd2cfbbacdbc7f')184 downloaded_path = os.path.join(self.download_worker.folder, 'N00E010.hgt.zip')185 assert os.path.exists(downloaded_path)186 assert os.path.getsize(downloaded_path) == 1743694187 def test__download_file_with_wrong_md5sum_check(self, tmpdir):188 tmp_folder = str(tmpdir.mkdir('gmaltcli'))189 self.download_worker.folder = tmp_folder190 with pytest.raises(worker.InvalidCheckSumException):191 self.download_worker._download_file(192 'http://dds.cr.usgs.gov/srtm/version2_1/SRTM3/Africa/N00E010.hgt.zip',193 'N00E010.hgt.zip',194 'abcdefgh')195 def test__download_file_with_wrong_crc_check(self, tmpdir, monkeypatch):196 monkeypatch.setattr(zipfile.ZipFile, 'testzip', lambda x: 'bad crc filename')197 tmp_folder = str(tmpdir.mkdir('gmaltcli'))198 self.download_worker.folder = tmp_folder199 with pytest.raises(zipfile.BadZipfile):200 self.download_worker._download_file(201 'http://dds.cr.usgs.gov/srtm/version2_1/SRTM3/Africa/N00E010.hgt.zip',202 'N00E010.hgt.zip',203 'dfb52a9b9eae6de945bd2cfbbacdbc7f')204class TestExtractWorker(object):205 def setup_method(self, func_method):206 stop_event = threading.Event()207 counter = worker.SafeCounter()208 worker_queue = queue.Queue()209 folder = None210 self.extract_worker = worker.ExtractWorker(1, worker_queue, counter,211 stop_event, folder)212 def test__extract_file(self, tmpdir):213 tmp_folder = str(tmpdir.mkdir('gmaltcli'))214 self.extract_worker.folder = tmp_folder215 zip_file = os.path.realpath(os.path.join(os.path.dirname(__file__), 'srtm3', 'N00E010.hgt.zip'))216 self.extract_worker._extract_file(zip_file)217 extracted_file = os.path.join(tmp_folder, 'N00E010.hgt')218 assert os.path.exists(extracted_file)...

Full Screen

Full Screen

stop_test.py

Source:stop_test.py Github

copy

Full Screen

...10 """Setup test case"""11 self.sio = socketio.Client()12 self.mock = Mock()13 self.sio.connect('http://localhost:8080')14 def test_stop_event(self):15 """Test stopping movement with event"""16 self.sio.emit('register_manipulator', 1)17 self.sio.emit('register_manipulator', 2)18 self.sio.emit('bypass_calibration', 1)19 self.sio.emit('bypass_calibration', 2)20 self.sio.emit('goto_pos', {'manipulator_id': 1, 'pos': [0, 0, 0, 0],21 'speed': self.DRIVE_SPEED},22 callback=self.mock)23 self.sio.emit('goto_pos', {'manipulator_id': 2, 'pos': [0, 0, 0, 0],24 'speed': self.DRIVE_SPEED},25 callback=self.mock)26 self.sio.emit('goto_pos',27 {'manipulator_id': 1,28 'pos': [10000, 10000, 10000, 10000],...

Full Screen

Full Screen

test_node.py

Source:test_node.py Github

copy

Full Screen

1import pytest2from valens.structures.node import Node3from torch.multiprocessing import Event4import time5def test_stop_event():6 class TestNode(Node):7 def process(self):8 pass9 t = [TestNode("t1"), TestNode("t2")]10 stop_event = Event()11 for ti in t: ti.set_stop_event(stop_event)12 for ti in t: ti.start()13 time.sleep(10)14 stop_event.set()15 for ti in t: ti.join()16def test_terminate():17 class TestNode(Node):18 def process(self):19 pass...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.

LambdaTest Learning Hubs:

YouTube

You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.

Run locust automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 minutes of automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

NotHelpful