Best Python code snippet using autotest_python
test_threading.py
Source:test_threading.py
import os
import threading
import uuid

import pytest

from ddtrace.internal import nogevent
from ddtrace.profiling import recorder
from ddtrace.profiling.collector import threading as collector_threading

from . import test_collector


TESTING_GEVENT = os.getenv("DD_PROFILE_TEST_GEVENT", False)  # a string when set, so any non-empty value is truthy


def test_repr():  # the repr check itself is delegated to test_collector._test_repr
    test_collector._test_repr(
        collector_threading.ThreadingLockCollector,
        "ThreadingLockCollector(status=<ServiceStatus.STOPPED: 'stopped'>, "
        "recorder=Recorder(default_max_events=16384, max_events={}), capture_pct=1.0, nframes=64, "
        "endpoint_collection_enabled=True, tracer=None)",
    )


def test_wrapper():  # threading.Lock must stay usable as a class attribute while the collector runs
    r = recorder.Recorder()
    collector = collector_threading.ThreadingLockCollector(r)
    with collector:

        class Foobar(object):
            lock_class = threading.Lock

            def __init__(self):
                lock = self.lock_class()
                assert lock.acquire()
                lock.release()

        # Try to access the attribute
        lock = Foobar.lock_class()
        assert lock.acquire()
        lock.release()

        # Try this way too
        Foobar()


def test_patch():  # the name captured before start() must compare equal to threading.Lock throughout
    r = recorder.Recorder()
    lock = threading.Lock
    collector = collector_threading.ThreadingLockCollector(r)
    collector.start()
    assert lock == collector.original
    # wrapt makes this true
    assert lock == threading.Lock
    collector.stop()
    assert lock == threading.Lock
    assert collector.original == threading.Lock


def test_lock_acquire_events():  # one acquire -> exactly one acquire event and no release event
    r = recorder.Recorder()
    with collector_threading.ThreadingLockCollector(r, capture_pct=100):
        lock = threading.Lock()  # this line's number is hard-coded in the lock_name assertion below
        lock.acquire()  # this line's number is hard-coded in the frames[0] assertion below
    assert len(r.events[collector_threading.ThreadingLockAcquireEvent]) == 1
    assert len(r.events[collector_threading.ThreadingLockReleaseEvent]) == 0
    event = r.events[collector_threading.ThreadingLockAcquireEvent][0]
    assert event.lock_name == "test_threading.py:64"
    assert event.thread_id == nogevent.thread_get_ident()
    assert event.wait_time_ns >= 0
    # It's called through pytest so I'm sure it's gonna be that long, right?
    assert len(event.frames) > 3
    assert event.nframes > 3
    assert event.frames[0] == (__file__.replace(".pyc", ".py"), 65, "test_lock_acquire_events", "")
    assert event.sampling_pct == 100


def test_lock_acquire_events_class():  # same as above, but the lock is taken inside a method
    r = recorder.Recorder()
    with collector_threading.ThreadingLockCollector(r, capture_pct=100):

        class Foobar(object):
            def lockfunc(self):
                lock = threading.Lock()  # line number hard-coded in the lock_name assertion below
                lock.acquire()  # line number hard-coded in the frames[0] assertion below

        Foobar().lockfunc()

    assert len(r.events[collector_threading.ThreadingLockAcquireEvent]) == 1
    assert len(r.events[collector_threading.ThreadingLockReleaseEvent]) == 0
    event = r.events[collector_threading.ThreadingLockAcquireEvent][0]
    assert event.lock_name == "test_threading.py:85"
    assert event.thread_id == nogevent.thread_get_ident()
    assert event.wait_time_ns >= 0
    # It's called through pytest so I'm sure it's gonna be that long, right?
    assert len(event.frames) > 3
    assert event.nframes > 3
    assert event.frames[0] == (__file__.replace(".pyc", ".py"), 86, "lockfunc", "Foobar")  # 4th item: class name
    assert event.sampling_pct == 100


def test_lock_events_tracer(tracer):  # lock events recorded while a span is active vs. outside of it
    resource = str(uuid.uuid4())
    span_type = str(uuid.uuid4())
    r = recorder.Recorder()
    with collector_threading.ThreadingLockCollector(r, tracer=tracer, capture_pct=100):
        lock = threading.Lock()  # created and acquired before the span starts
        lock.acquire()
        with tracer.trace("test", resource=resource, span_type=span_type) as t:
            lock2 = threading.Lock()  # created and acquired inside the span
            lock2.acquire()
            lock.release()
            trace_id = t.trace_id
            span_id = t.span_id
        lock2.release()
    events = r.reset()
    # The tracer might use locks, so we need to look into every event to assert we got ours
    for event_type in (collector_threading.ThreadingLockAcquireEvent, collector_threading.ThreadingLockReleaseEvent):
        assert {"test_threading.py:108", "test_threading.py:111"}.issubset({e.lock_name for e in events[event_type]})
        for event in events[event_type]:
            if event.name == "test_threading.py:84":  # NOTE(review): .name vs .lock_name above -- may never match
                assert event.trace_id is None
                assert event.span_id is None
                assert event.trace_resource_container is None
                assert event.trace_type is None
            elif event.name == "test_threading.py:87":
                assert event.trace_id == trace_id
                assert event.span_id == span_id
                assert event.trace_resource_container[0] == t.resource
                assert event.trace_type == t.span_type


def test_lock_events_tracer_late_finish(tracer):  # span.resource is assigned only after the locks were used
    resource = str(uuid.uuid4())
    span_type = str(uuid.uuid4())
    r = recorder.Recorder()
    with collector_threading.ThreadingLockCollector(r, tracer=tracer, capture_pct=100):
        lock = threading.Lock()  # created and acquired before the span starts
        lock.acquire()
        span = tracer.start_span("test", span_type=span_type)
        lock2 = threading.Lock()  # created and acquired after start_span()
        lock2.acquire()
        lock.release()
        trace_id = span.trace_id
        span_id = span.span_id
        lock2.release()
    span.resource = resource
    span.finish()
    events = r.reset()
    # The tracer might use locks, so we need to look into every event to assert we got ours
    for event_type in (collector_threading.ThreadingLockAcquireEvent, collector_threading.ThreadingLockReleaseEvent):
        assert {"test_threading.py:139", "test_threading.py:142"}.issubset({e.lock_name for e in events[event_type]})
        for event in events[event_type]:
            if event.name == "test_threading.py:115":  # NOTE(review): .name vs .lock_name above -- may never match
                assert event.trace_id is None
                assert event.span_id is None
                assert event.trace_resource_container is None
                assert event.trace_type is None
            elif event.name == "test_threading.py:118":
                assert event.trace_id == trace_id
                assert event.span_id == span_id
                assert event.trace_resource_container[0] == span.resource
                assert event.trace_type == span.span_type


def test_resource_not_collected(monkeypatch, tracer):  # endpoint collection is switched off through the env var
    monkeypatch.setenv("DD_PROFILING_ENDPOINT_COLLECTION_ENABLED", "false")
    resource = str(uuid.uuid4())
    span_type = str(uuid.uuid4())
    r = recorder.Recorder()
    with collector_threading.ThreadingLockCollector(r, tracer=tracer, capture_pct=100):
        lock = threading.Lock()  # created and acquired before the span starts
        lock.acquire()
        with tracer.trace("test", resource=resource, span_type=span_type) as t:
            lock2 = threading.Lock()  # created and acquired inside the span
            lock2.acquire()
            lock.release()
            trace_id = t.trace_id
            span_id = t.span_id
        lock2.release()
    events = r.reset()
    # The tracer might use locks, so we need to look into every event to assert we got ours
    for event_type in (collector_threading.ThreadingLockAcquireEvent, collector_threading.ThreadingLockReleaseEvent):
        assert {"test_threading.py:173", "test_threading.py:176"}.issubset({e.lock_name for e in events[event_type]})
        for event in events[event_type]:
            if event.name == "test_threading.py:149":  # NOTE(review): .name vs .lock_name above -- may never match
                assert event.trace_id is None
                assert event.span_id is None
                assert event.trace_resource_container is None
                assert event.trace_type is None
            elif event.name == "test_threading.py:152":
                assert event.trace_id == trace_id
                assert event.span_id == span_id
                assert event.trace_resource_container is None  # differs from the tests above: no resource expected
                assert event.trace_type == t.span_type


def test_lock_release_events():  # acquire + release -> exactly one event of each kind
    r = recorder.Recorder()
    with collector_threading.ThreadingLockCollector(r, capture_pct=100):
        lock = threading.Lock()  # this line's number is hard-coded in the lock_name assertion below
        lock.acquire()
        lock.release()  # this line's number is hard-coded in the frames[0] assertion below
    assert len(r.events[collector_threading.ThreadingLockAcquireEvent]) == 1
    assert len(r.events[collector_threading.ThreadingLockReleaseEvent]) == 1
    event = r.events[collector_threading.ThreadingLockReleaseEvent][0]
    assert event.lock_name == "test_threading.py:202"
    assert event.thread_id == nogevent.thread_get_ident()
    assert event.locked_for_ns >= 0
    # It's called through pytest so I'm sure it's gonna be that long, right?
    assert len(event.frames) > 3
    assert event.nframes > 3
    assert event.frames[0] == (__file__.replace(".pyc", ".py"), 204, "test_lock_release_events", "")
    assert event.sampling_pct == 100


@pytest.mark.skipif(not TESTING_GEVENT, reason="Not testing gevent")
def test_lock_gevent_tasks():  # lock used from a named thread; events must carry that thread as task id/name
    r = recorder.Recorder()

    def play_with_lock():
        lock = threading.Lock()  # line number hard-coded in the lock_name checks below
        lock.acquire()  # line number hard-coded in the acquire event's frames[0] check
        lock.release()  # line number hard-coded in the release event's frames[0] check

    with collector_threading.ThreadingLockCollector(r, capture_pct=100):
        t = threading.Thread(name="foobar", target=play_with_lock)
        t.start()
        t.join()

    assert len(r.events[collector_threading.ThreadingLockAcquireEvent]) >= 1
    assert len(r.events[collector_threading.ThreadingLockReleaseEvent]) >= 1

    for event in r.events[collector_threading.ThreadingLockAcquireEvent]:
        if event.lock_name == "test_threading.py:223":
            assert event.thread_id == nogevent.main_thread_id
            assert event.wait_time_ns >= 0
            assert event.task_id == t.ident
            assert event.task_name == "foobar"
            # It's called through pytest so I'm sure it's gonna be that long, right?
            assert len(event.frames) > 3
            assert event.nframes > 3
            assert event.frames[0] == (__file__.replace(".pyc", ".py"), 224, "play_with_lock", "")
            assert event.sampling_pct == 100
            assert event.task_id == t.ident  # repeats the task_id/task_name checks made a few lines up
            assert event.task_name == "foobar"
            break
    else:  # loop ended without break: no acquire event carried the expected lock_name
        pytest.fail("Lock event not found")

    for event in r.events[collector_threading.ThreadingLockReleaseEvent]:
        if event.lock_name == "test_threading.py:223":
            assert event.thread_id == nogevent.main_thread_id
            assert event.locked_for_ns >= 0
            assert event.task_id == t.ident
            assert event.task_name == "foobar"
            # It's called through pytest so I'm sure it's gonna be that long, right?
            assert len(event.frames) > 3
            assert event.nframes > 3
            assert event.frames[0] == (__file__.replace(".pyc", ".py"), 225, "play_with_lock", "")
            assert event.sampling_pct == 100
            assert event.task_id == t.ident  # repeats the task_id/task_name checks made a few lines up
            assert event.task_name == "foobar"
            break
    else:  # loop ended without break: no release event carried the expected lock_name
        pytest.fail("Lock event not found")


@pytest.mark.benchmark(
    group="threading-lock-create",
)
def test_lock_create_speed_patched(benchmark):  # lock creation cost while the collector is active
    r = recorder.Recorder()
    with collector_threading.ThreadingLockCollector(r):
        benchmark(threading.Lock)


@pytest.mark.benchmark(
    group="threading-lock-create",
)
def test_lock_create_speed(benchmark):  # same benchmark group, without any collector running
    benchmark(threading.Lock)


def _lock_acquire_release(lock):  # benchmark payload: a single acquire/release pair
    lock.acquire()
    lock.release()


@pytest.mark.benchmark(
    group="threading-lock-acquire-release",
)
@pytest.mark.parametrize(
    "pct",
    range(5, 61, 5),  # capture_pct values 5, 10, ..., 60
)
def test_lock_acquire_release_speed_patched(benchmark, pct):  # acquire/release cost at each capture_pct
    r = recorder.Recorder()
    with collector_threading.ThreadingLockCollector(r, capture_pct=pct):
        benchmark(_lock_acquire_release, threading.Lock())


@pytest.mark.benchmark(
    group="threading-lock-acquire-release",
)
def test_lock_acquire_release_speed(benchmark):...  # body is cut off in this excerpt
perf_tests.py
Source:perf_tests.py
...142 print("Attack detection - 10: ", t50)143 print("Attack detection - 10: ", t100)144# ---------------------------------------------------------------145# ---------------THREAD MODULE TESTING---------------------146def test_threading(iterations):147 SETUP_CODE = """ 148from __main__ import start_thread"""149 TEST_CODE = """ 150start_thread()151 """152 times = timeit.repeat(setup=SETUP_CODE, stmt=TEST_CODE, repeat=3, number=iterations)153 print("Server connect time for ", iterations, ": ", min(times))154 return min(times)155def thread_func():156 while True:157 data = getPerfMetrics()158 nb_classifier = pd.read_pickle("application/components/NB_classifier.pkl")159 predicted_values = nb_classifier.predict(data)160 print(predicted_values)161def threading_begin():162 server_thread = threading.Thread(target=thread_func, daemon=True)163 server_thread.start()164def start_thread():165 for x in range(10):166 threading_begin()167def test_thread_module():168 print("testing thread module")169 t1 = test_threading(1)170 t10 = test_threading(10)171 t50 = test_threading(50)172 t100 = test_threading(100)173 # test_threading(50)174 # test_threading(100)175 print("Threading - 1: ", t1)176 print("Threading - 10: ", t10)177 print("Threading - 50: ", t50)178 print("Threading - 100: ", t100)179# ---------------------------------------------------------------180if __name__ == "__main__":181 print("initiating testing")182 # test_server_connection_module()183 # test_command_exec_module()184 # test_detection_module()...
Learn to execute automation testing from scratch with LambdaTest Learning Hub, from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. The LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, and TestNG.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 minutes of automation testing FREE!!