Best Python code snippet using localstack_python
test_waveform.py
Source:test_waveform.py
...16 """17 def setUp(self):18 # directory where the test files are located19 self.path = os.path.join(os.path.dirname(__file__), 'images')20 def _create_stream(self, starttime, endtime, sampling_rate):21 """22 Helper method to create a Stream object that can be used for testing23 waveform plotting.24 Takes the time frame of the Stream to be created and a sampling rate.25 Any other header information will have to be adjusted on a case by case26 basis. Please remember to use the same sampling rate for one Trace as27 merging and plotting will not work otherwise.28 This method will create a single sine curve to a first approximation29 with superimposed 10 smaller sine curves on it.30 :return: Stream object31 """32 time_delta = endtime - starttime33 number_of_samples = int(time_delta * sampling_rate) + 134 # Calculate first sine wave.35 curve = np.linspace(0, 2 * np.pi, number_of_samples // 2)36 # Superimpose it with a smaller but shorter wavelength sine wave.37 curve = np.sin(curve) + 0.2 * np.sin(10 * curve)38 # To get a thick curve alternate between two curves.39 data = np.empty(number_of_samples)40 # Check if even number and adjust if necessary.41 if number_of_samples % 2 == 0:42 data[0::2] = curve43 data[1::2] = curve + 0.244 else:45 data[-1] = 0.046 data[0:-1][0::2] = curve47 data[0:-1][1::2] = curve + 0.248 tr = Trace()49 tr.stats.starttime = starttime50 tr.stats.sampling_rate = float(sampling_rate)51 # Fill dummy header.52 tr.stats.network = 'BW'53 tr.stats.station = 'OBSPY'54 tr.stats.channel = 'TEST'55 tr.data = data56 return Stream(traces=[tr])57 def test_data_remains_unchanged(self):58 """59 Data should not be changed when plotting.60 """61 # Use once with straight plotting with random calibration factor62 st = self._create_stream(UTCDateTime(0), UTCDateTime(1000), 1)63 st[0].stats.calib = 0.234364 org_st = st.copy()65 st.plot(format='png')66 self.assertEqual(st, org_st)67 # Now with min-max list creation (more than 400000 samples).68 st = 
self._create_stream(UTCDateTime(0), UTCDateTime(600000), 1)69 st[0].stats.calib = 0.234370 org_st = st.copy()71 st.plot(format='png')72 self.assertEqual(st, org_st)73 # Now only plot a certain time frame.74 st.plot(75 format='png', starrtime=UTCDateTime(10000),76 endtime=UTCDateTime(20000))77 self.assertEqual(st, org_st)78 def test_plot_empty_stream(self):79 """80 Plotting of an empty stream should raise a warning.81 """82 st = Stream()83 self.assertRaises(IndexError, st.plot)84 def test_plot_same_trace_different_sample_rates(self):85 """86 Plotting of a Stream object, that contains two traces with the same id87 and different sampling rates should raise an exception.88 """89 start = UTCDateTime(0)90 st = self._create_stream(start, start + 10, 1.0)91 st += self._create_stream(start + 10, start + 20, 10.0)92 self.assertRaises(Exception, st.plot)93 def test_plot_one_hour_many_samples(self):94 """95 Plots one hour, starting Jan 1970.96 Uses a frequency of 1000 Hz to get a sample count of over 3 Million and97 get in the range, that plotting will choose to use a minimum maximum98 approach to plot the data.99 """100 start = UTCDateTime(0)101 st = self._create_stream(start, start + 3600, 1000.0)102 # create and compare image103 image_name = 'waveform_one_hour_many_samples.png'104 with ImageComparison(self.path, image_name) as ic:105 st.plot(outfile=ic.name)106 def test_plot_one_hour_few_samples(self):107 """108 Plots one hour, starting Jan 1970.109 Uses a frequency of 10 Hz.110 """111 start = UTCDateTime(0)112 st = self._create_stream(start, start + 3600, 10.0)113 # create and compare image114 image_name = 'waveform_one_hour_few_samples.png'115 with ImageComparison(self.path, image_name) as ic:116 st.plot(outfile=ic.name)117 def test_plot_simple_gap_many_samples(self):118 """119 Plots three hours with a gap.120 There are 45 minutes of data at the beginning and 45 minutes of data at121 the end.122 """123 start = UTCDateTime(0)124 st = self._create_stream(start, start + 3600 
* 3 / 4, 500.0)125 st += self._create_stream(start + 2.25 * 3600, start + 3 * 3600, 500.0)126 # create and compare image127 image_name = 'waveform_simple_gap_many_samples.png'128 with ImageComparison(self.path, image_name) as ic:129 st.plot(outfile=ic.name)130 def test_plot_simple_gap_few_samples(self):131 """132 Plots three hours with a gap.133 There are 45 minutes of data at the beginning and 45 minutes of data at134 the end.135 """136 start = UTCDateTime(0)137 st = self._create_stream(start, start + 3600 * 3 / 4, 5.0)138 st += self._create_stream(start + 2.25 * 3600, start + 3 * 3600, 5.0)139 # create and compare image140 image_name = 'waveform_simple_gap_few_samples.png'141 with ImageComparison(self.path, image_name) as ic:142 st.plot(outfile=ic.name)143 def test_plot_complex_gap_many_samples(self):144 """145 Plots three hours with a gap.146 There are 45 minutes of data at the beginning and 45 minutes of data at147 the end.148 """149 start = UTCDateTime(0)150 st = self._create_stream(start, start + 3600 * 3 / 4, 500.0)151 st += self._create_stream(start + 2.25 * 3600, start + 3 * 3600, 500.0)152 st[0].stats.location = '01'153 st[1].stats.location = '01'154 temp_st = self._create_stream(start + 3600 * 3 / 4,155 start + 2.25 * 3600, 500.0)156 temp_st[0].stats.location = '02'157 st += temp_st158 # create and compare image159 image_name = 'waveform_complex_gap_many_samples.png'160 with ImageComparison(self.path, image_name) as ic:161 st.plot(outfile=ic.name)162 def test_plot_complex_gap_few_samples(self):163 """164 Plots three hours with a gap.165 There are 45 minutes of data at the beginning and 45 minutes of data at166 the end.167 """168 start = UTCDateTime(0)169 st = self._create_stream(start, start + 3600 * 3 / 4, 5.0)170 st += self._create_stream(start + 2.25 * 3600, start + 3 * 3600, 5.0)171 st[0].stats.location = '01'172 st[1].stats.location = '01'173 temp_st = self._create_stream(start + 3600 * 3 / 4,174 start + 2.25 * 3600, 5.0)175 
temp_st[0].stats.location = '02'176 st += temp_st177 # create and compare image178 image_name = 'waveform_complex_gap_few_samples.png'179 with ImageComparison(self.path, image_name) as ic:180 st.plot(outfile=ic.name)181 def test_plot_multiple_traces(self):182 """183 Plots multiple traces underneath.184 """185 # 1 trace186 st = read()[1]187 with ImageComparison(self.path, 'waveform_1_trace.png') as ic:188 st.plot(outfile=ic.name, automerge=False)189 # 3 traces190 st = read()191 with ImageComparison(self.path, 'waveform_3_traces.png') as ic:192 st.plot(outfile=ic.name, automerge=False)193 # 5 traces194 st = st[1] * 5195 with ImageComparison(self.path, 'waveform_5_traces.png') as ic:196 st.plot(outfile=ic.name, automerge=False)197 # 10 traces198 st = st[1] * 10199 with ImageComparison(self.path, 'waveform_10_traces.png') as ic:200 st.plot(outfile=ic.name, automerge=False)201 # 10 traces - huge numbers202 st = st[1] * 10203 for i, tr in enumerate(st):204 # scale data to have huge numbers205 st[i].data = tr.data * 10 ** i206 with ImageComparison(self.path, 'waveform_10_traces_huge.png') as ic:207 st.plot(outfile=ic.name, automerge=False, equal_scale=False)208 # 10 traces - tiny numbers209 st = st[1] * 10210 for i, tr in enumerate(st):211 # scale data to have huge numbers212 st[i].data = tr.data / (10 ** i)213 with ImageComparison(self.path, 'waveform_10_traces_tiny.png') as ic:214 st.plot(outfile=ic.name, automerge=False, equal_scale=False)215 def test_plot_with_labels(self):216 """217 Plots with labels.218 """219 st = read()220 st.label = u"Title #1 üöä?"221 st[0].label = 'Hello World!'222 st[1].label = u'Hällö Wörld & MarÃ'223 st[2].label = '*' * 80224 # create and compare image225 with ImageComparison(self.path, 'waveform_labels.png') as ic:226 st.plot(outfile=ic.name)227 def test_plot_binning_error(self):228 """229 Tests the plotting of a trace with a certain amount of sampling that230 had a binning problem.231 """232 tr = Trace(data=np.sin(np.linspace(0, 200, 
432000)))233 # create and compare image234 with ImageComparison(self.path, 'waveform_binning_error.png') as ic:235 tr.plot(outfile=ic.name)236 tr = Trace(data=np.sin(np.linspace(0, 200, 431979)))237 # create and compare image238 with ImageComparison(self.path, 'waveform_binning_error_2.png') as ic:239 tr.plot(outfile=ic.name)240 def test_plot_default_section(self):241 """242 Tests plotting 10 traces in a horizontal section.243 """244 start = UTCDateTime(0)245 st = Stream()246 for _i in range(10):247 this_start = start + 300 * np.sin(np.pi * _i / 9)248 st += self._create_stream(this_start, this_start + 3600, 100)249 st[-1].stats.distance = _i * 10e3250 # create and compare image251 with ImageComparison(self.path, 'waveform_default_section.png') as ic:252 st.plot(outfile=ic.name, type='section')253 def test_plot_azim_section(self):254 """255 Tests plotting 10 traces in a azimuthal distant section.256 """257 start = UTCDateTime(0)258 st = Stream()259 for _i in range(10):260 st += self._create_stream(start, start + 3600, 100)261 st[-1].stats.coordinates = AttribDict({262 'latitude': _i,263 'longitude': _i})264 # create and compare image265 with ImageComparison(self.path, 'waveform_azim_section.png') as ic:266 st.plot(outfile=ic.name, type='section', dist_degree=True,267 ev_coord=(0.0, 0.0))268 def test_plot_horizontal_section(self):269 """270 Tests plotting 10 traces in a horizontal section.271 """272 start = UTCDateTime(0)273 st = Stream()274 for _i in range(10):275 st += self._create_stream(start, start + 3600, 100)276 st[-1].stats.distance = _i * 10e3277 # create and compare image278 with ImageComparison(self.path, 'waveform_horiz_section.png') as ic:279 st.plot(outfile=ic.name, type='section', orientation='horizontal')280 def test_plot_ref_time_section(self):281 """282 Tests plotting 10 traces in a section with alternate reference time.283 """284 start = UTCDateTime(0)285 reftime = start + 600286 st = Stream()287 for _i in range(10):288 this_start = start + 300 * 
np.sin(np.pi * _i / 9)289 st += self._create_stream(this_start, this_start + 3600, 100)290 st[-1].stats.distance = _i * 10e3291 # create and compare image292 with ImageComparison(self.path, 'waveform_reftime_section.png') as ic:293 st.plot(outfile=ic.name, type='section', reftime=reftime)294 def test_plot_colored_section(self):295 """296 Tests plotting 10 traces in a section colored by channel.297 """298 start = UTCDateTime(0)299 st = Stream()300 for _i in range(10):301 this_start = start + 300 * np.sin(np.pi * _i / 9)302 st += self._create_stream(this_start, this_start + 3600, 100)303 st[-1].stats.distance = _i * 10e3304 st[-1].stats.channel = str(_i % 3)305 # create and compare image306 with ImageComparison(self.path, 'waveform_color_section.png') as ic:307 st.plot(outfile=ic.name, type='section', color='channel')308 def test_plot_wiggles_negative_section(self):309 """310 Tests plotting 10 traces in a horizontal section,311 with colored wiggles only on the negative side.312 """313 start = UTCDateTime(0)314 st = Stream()315 for _i in range(10):316 this_start = start + 300 * np.sin(np.pi * _i / 9)317 st += self._create_stream(this_start, this_start + 3600, 100)318 st[-1].stats.distance = _i * 10e3319 st[-1].stats.channel = str(_i % 3)320 # create and compare image321 with ImageComparison(322 self.path, 'waveform_wiggles_negative_section.png') as ic:323 st.plot(outfile=ic.name, type='section', orientation='horizontal',324 fillcolors=(None, "black"))325 def test_plot_wiggles_positive_section(self):326 """327 Tests plotting 10 traces in a horizontal section,328 with colored wiggles only on the positive side.329 """330 start = UTCDateTime(0)331 st = Stream()332 for _i in range(10):333 this_start = start + 300 * np.sin(np.pi * _i / 9)334 st += self._create_stream(this_start, this_start + 3600, 100)335 st[-1].stats.distance = _i * 10e3336 st[-1].stats.channel = str(_i % 3)337 # create and compare image338 with ImageComparison(339 self.path, 
'waveform_wiggles_positive_section.png') as ic:340 st.plot(outfile=ic.name, type='section', orientation='horizontal',341 fillcolors=("black", None))342 def test_plot_wiggles_horizontal_section(self):343 """344 Tests plotting 10 traces in a horizontal section with colored wiglets.345 """346 start = UTCDateTime(0)347 st = Stream()348 for _i in range(10):349 this_start = start + 300 * np.sin(np.pi * _i / 9)350 st += self._create_stream(this_start, this_start + 3600, 100)351 st[-1].stats.distance = _i * 10e3352 st[-1].stats.channel = str(_i % 3)353 # create and compare image354 with ImageComparison(355 self.path, 'waveform_wiggles_horizontal_section.png') as ic:356 st.plot(outfile=ic.name, type='section', orientation='horizontal',357 fillcolors=("blue", "red"))358 def test_plot_wiggles_vertical_section(self):359 """360 Tests plotting 10 traces in a vertical section with colored wiglets.361 """362 start = UTCDateTime(0)363 st = Stream()364 for _i in range(10):365 this_start = start + 300 * np.sin(np.pi * _i / 9)366 st += self._create_stream(this_start, this_start + 3600, 100)367 st[-1].stats.distance = _i * 10e3368 st[-1].stats.channel = str(_i % 3)369 # create and compare image370 with ImageComparison(371 self.path, 'waveform_wiggles_vertical_section.png') as ic:372 st.plot(outfile=ic.name, type='section', orientation='vertical',373 fillcolors=("blue", "red"))374 def test_plot_default_relative(self):375 """376 Plots one hour, starting Jan 1970, with a relative scale.377 """378 start = UTCDateTime(0)379 st = self._create_stream(start, start + 3600, 100)380 # create and compare image381 image_name = 'waveform_default_relative.png'382 with ImageComparison(self.path, image_name) as ic:383 st.plot(outfile=ic.name, type='relative')384 def test_plot_ref_time_relative(self):385 """386 Plots one hour, starting Jan 1970, with a relative scale.387 The reference time is at 300 seconds after the start.388 """389 start = UTCDateTime(0)390 ref = UTCDateTime(300)391 st = 
self._create_stream(start, start + 3600, 100)392 # create and compare image393 image_name = 'waveform_reftime_relative.png'394 with ImageComparison(self.path, image_name) as ic:395 st.plot(outfile=ic.name, type='relative', reftime=ref)396 def test_plot_day_plot(self):397 '''398 Plots day plot, starting Jan 1970.399 '''400 start = UTCDateTime(0)401 st = self._create_stream(start, start + 3 * 3600, 100)402 # create and compare image403 image_name = 'waveform_dayplot.png'404 with ImageComparison(self.path, image_name) as ic:405 st.plot(outfile=ic.name, type='dayplot',406 timezone='EST', time_offset=-5)407 def test_plot_day_plot_explicit_event(self):408 '''409 Plots day plot, starting Jan 1970, with several events.410 '''411 start = UTCDateTime(0)412 event1 = UTCDateTime(30) # Event: Top left; Note: below right413 event2 = UTCDateTime(14 * 60) # Event: Top right; Note: below left414 event3 = UTCDateTime(46 * 60) # Event: Bottom left; Note: above right415 event4 = UTCDateTime(59 * 60) # Event: Bottom right; Note: above left416 event5 = UTCDateTime(61 * 60) # Should be ignored417 st = self._create_stream(start, start + 3600, 100)418 # create and compare image419 image_name = 'waveform_dayplot_event.png'420 with ImageComparison(self.path, image_name) as ic:421 st.plot(outfile=ic.name, type='dayplot',422 timezone='EST', time_offset=-5,423 events=[{'time': event1, 'text': 'Event 1'},424 {'time': event2, 'text': 'Event 2'},425 {'time': event3, 'text': 'Event 3'},426 {'time': event4, 'text': 'Event 4'},427 {'time': event5, 'text': 'Event 5'}])428 def test_plot_day_plot_catalog(self):429 '''430 Plots day plot, with a catalog of events.431 '''432 start = UTCDateTime(2012, 4, 4, 14, 0, 0)433 cat = read_events()434 st = self._create_stream(start, start + 3600, 100)435 # create and compare image436 image_name = 'waveform_dayplot_catalog.png'437 with ImageComparison(self.path, image_name) as ic:438 st.plot(outfile=ic.name, type='dayplot',439 timezone='EST', time_offset=-5,440 
events=cat)441def suite():442 return unittest.makeSuite(WaveformTestCase, 'test')443if __name__ == '__main__':...
test_display_manager.py
Source:test_display_manager.py
# ... (excerpt of test_display_manager.py: the earlier lines, including the
# imports of unittest, MagicMock, gst, DisplayManager and Trigger, are not
# part of this excerpt)
from rr.display.display_manager import DisplayManagerError
from rr.gstreamer.gst_media import GstMedia


def _get_media_State(media):
    """Return the second element of ``media.get_state()``, waiting without
    a timeout (``gst.CLOCK_TIME_NONE``)."""
    return media.get_state(gst.CLOCK_TIME_NONE)[1]


def _create_stream(id, trigger_list):
    """Build a GstMedia stream with the given id and the test RTSP URI.

    :param id: Value stored under the "id" key of the stream description.
    :param trigger_list: Triggers handed to ``GstMedia.make``.
    :return: Whatever ``GstMedia.make`` returns for the description.
    """
    # Create a stream instance
    stream_desc = {
        "id": id,
        "uri": "rtsp://localhost:5000/stream",
        "triggers": ["trigger_name"]
    }
    return GstMedia.make(stream_desc, trigger_list)


class MockTriggerAction:
    """Trigger action stand-in whose ``execute`` is a MagicMock."""

    def __init__(self):
        self.execute = MagicMock()

    def get_name(self):
        return "mock_action"


class MockTriggerFilter1:
    """Trigger filter stand-in whose ``apply`` is a MagicMock."""

    def __init__(self):
        self.apply = MagicMock()

    def get_name(self):
        return "mock_filter1"


class MockTriggerFilter2:
    """Second trigger filter stand-in whose ``apply`` is a MagicMock."""

    def __init__(self):
        self.apply = MagicMock()

    def get_name(self):
        return "mock_filter2"


class TestDisplayManager(unittest.TestCase):
    """Tests of DisplayManager with one valid stream already added."""

    def setUp(self):
        self.display_manager = DisplayManager()
        # Create a trigger
        desc = {
            "name": "trigger_name",
            "action": "mock_action",
            "filters": ["mock_filter1", "mock_filter2"]
        }
        action = MockTriggerAction()
        filters = [MockTriggerFilter1(), MockTriggerFilter2()]
        trigger = Trigger.make(desc, [action], filters)
        trigger.get_name = MagicMock(return_value=desc['name'])
        # Create a stream instance
        self.stream_desc = {
            "id": "stream0",
            "uri": "rtsp://localhost:5000/stream",
            "triggers": ["trigger_name"]
        }
        self.stream = GstMedia.make(self.stream_desc, [trigger])
        self.display_manager.add_stream(self.stream)
        self.display_media = self.display_manager._get_media()

    def test_add_stream(self):
        stream_list = self.display_manager._get_stream_list()
        self.assertIn(self.stream.get_name(), stream_list)

    def test_remove_stream(self):
        self.display_manager.remove_stream(self.stream_desc['id'])
        stream_list = self.display_manager._get_stream_list()
        self.assertNotIn(self.stream.get_name(), stream_list)

    def test_create_display(self):
        # The expected pipeline description must match character by
        # character, including the trailing space.
        display_desc = 'videomixer name=mixer sink_0::xpos=0 sink_0::ypos=0 ! queue ! videoconvert ! videoscale ! video/x-raw,width=1280,height=720 ! kmssink force-modesetting=true sync=false async=false qos=false appsrc do-timestamp=true name=stream0 format=time ! queue ! video/x-raw,width=320,height=240,format=RGB,framerate=30/1,pixel-aspect-ratio=1/1 ! videoconvert ! videoscale ! video/x-raw,width=320,height=240 ! mixer. '
        self.display_manager.create_display()
        self.assertEqual(
            display_desc,
            self.display_manager._get_display_desc())

    def test_play_display(self):
        # Reuses test_create_display to get a created (but not playing)
        # display.
        self.test_create_display()
        media_state = _get_media_State(self.display_media.get_media())
        self.assertEqual(gst.State.NULL, media_state)
        self.display_manager.play_display()
        media_state = _get_media_State(self.display_media.get_media())
        self.assertEqual(gst.State.PLAYING, media_state)

    def test_stop_display(self):
        # Reuses test_play_display to get a playing display.
        self.test_play_display()
        self.display_manager.stop_display()
        media_state = _get_media_State(self.display_media.get_media())
        self.assertEqual(gst.State.NULL, media_state)

    def test_delete_display(self):
        self.test_create_display()
        self.display_manager.delete_display()
        self.assertIsNone(self.display_manager._get_display_desc())

    def test_full_sequence(self):
        # create -> play -> stop (via the chained tests), then stop a second
        # time and delete.
        self.test_stop_display()
        self.display_manager.stop_display()
        self.display_manager.delete_display()
        self.assertIsNone(self.display_manager._get_display_desc())


class TestDisplayManagerFail(unittest.TestCase):
    """Tests of the DisplayManagerError cases of DisplayManager."""

    def setUp(self):
        self.display_manager = DisplayManager()
        # Create a trigger
        desc = {
            "name": "trigger_name",
            "action": "mock_action",
            "filters": ["mock_filter1", "mock_filter2"]
        }
        action = MockTriggerAction()
        filters = [MockTriggerFilter1(), MockTriggerFilter2()]
        self.trigger = Trigger.make(desc, [action], filters)
        self.trigger.get_name = MagicMock(return_value=desc['name'])
        # Create a stream instance
        self.stream_desc = {
            "id": "stream0",
            "uri": "rtsp://localhost:5000/stream",
            "triggers": ["trigger_name"]
        }
        self.stream = GstMedia.make(self.stream_desc, [self.trigger])
        self.display_manager.add_stream(self.stream)

    def test_add_stream_none(self):
        with self.assertRaisesRegex(DisplayManagerError, "Invalid media object"):
            self.display_manager.add_stream(None)

    def test_add_stream_duplicated(self):
        with self.assertRaisesRegex(DisplayManagerError, "Stream already exists in display manager"):
            self.display_manager.add_stream(self.stream)

    def test_add_stream_after_created(self):
        self.display_manager.create_display()
        with self.assertRaisesRegex(DisplayManagerError, "Display already created, delete before adding a new stream"):
            self.display_manager.add_stream("stream1")

    def test_add_stream_exceed_limit(self):
        # "stream0" was added in setUp; "stream1" to "stream7" must still be
        # accepted.
        for index in range(1, 8):
            stream = _create_stream("stream%d" % index, [self.trigger])
            self.display_manager.add_stream(stream)
        # Build the stream outside of the context manager so that only
        # add_stream() can satisfy the expected error.
        stream8 = _create_stream("stream8", [self.trigger])
        with self.assertRaisesRegex(DisplayManagerError, "Max number of streams reached"):
            self.display_manager.add_stream(stream8)

    def test_remove_stream_none(self):
        with self.assertRaisesRegex(DisplayManagerError, "Invalid key"):
            self.display_manager.remove_stream(None)

    def test_remove_stream_not_string(self):
        with self.assertRaisesRegex(DisplayManagerError, "Invalid key"):
            self.display_manager.remove_stream(1)

    def test_remove_stream_invalid(self):
        with self.assertRaisesRegex(DisplayManagerError, "Stream doesn't exist in display manager"):
            self.display_manager.remove_stream("a_invalid_key")

    def test_remove_stream_after_created(self):
        self.display_manager.create_display()
        with self.assertRaisesRegex(DisplayManagerError, "Display already created, delete before removing a stream"):
            self.display_manager.remove_stream(self.stream_desc['id'])

# ... (the excerpt is truncated here)
stream.py
Source:stream.py
...6 base_url = 'https://stream.shodan.io'7 def __init__(self, api_key, proxies=None):8 self.api_key = api_key9 self.proxies = proxies10 def _create_stream(self, name, timeout=None):11 params = {12 'key': self.api_key,13 }14 stream_url = self.base_url + name15 # The user doesn't want to use a timeout16 # If the timeout is specified as 0 then we also don't want to have a timeout17 if (timeout and timeout <= 0) or (timeout == 0):18 timeout = None19 # If the user requested a timeout then we need to disable heartbeat messages20 # which are intended to keep stream connections alive even if there isn't any data21 # flowing through.22 if timeout:23 params['heartbeat'] = False24 try:25 while True:26 req = requests.get(stream_url, params=params, stream=True, timeout=timeout,27 proxies=self.proxies)28 # Status code 524 is special to Cloudflare29 # It means that no data was sent from the streaming servers which caused Cloudflare30 # to terminate the connection.31 #32 # We only want to exit if there was a timeout specified or the HTTP status code is33 # not specific to Cloudflare.34 if req.status_code != 524 or timeout >= 0:35 break36 except Exception:37 raise APIError('Unable to contact the Shodan Streaming API')38 if req.status_code != 200:39 try:40 data = json.loads(req.text)41 raise APIError(data['error'])42 except APIError:43 raise44 except Exception:45 pass46 raise APIError('Invalid API key or you do not have access to the Streaming API')47 if req.encoding is None:48 req.encoding = 'utf-8'49 return req50 def _iter_stream(self, stream, raw):51 for line in stream.iter_lines(decode_unicode=True):52 # The Streaming API sends out heartbeat messages that are newlines53 # We want to ignore those messages since they don't contain any data54 if line:55 if raw:56 yield line57 else:58 yield json.loads(line)59 def alert(self, aid=None, timeout=None, raw=False):60 if aid:61 stream = self._create_stream('/shodan/alert/%s' % aid, timeout=timeout)62 else:63 stream = 
self._create_stream('/shodan/alert', timeout=timeout)64 try:65 for line in self._iter_stream(stream, raw):66 yield line67 except requests.exceptions.ConnectionError:68 raise APIError('Stream timed out')69 except ssl.SSLError:70 raise APIError('Stream timed out')71 def asn(self, asn, raw=False, timeout=None):72 """73 A filtered version of the "banners" stream to only return banners that match the ASNs of interest.74 :param asn: A list of ASN to return banner data on.75 :type asn: string[]76 """77 stream = self._create_stream('/shodan/asn/%s' % ','.join(asn), timeout=timeout)78 for line in self._iter_stream(stream, raw):79 yield line80 def banners(self, raw=False, timeout=None):81 """A real-time feed of the data that Shodan is currently collecting. Note that this is only available to82 API subscription plans and for those it only returns a fraction of the data.83 """84 stream = self._create_stream('/shodan/banners', timeout=timeout)85 for line in self._iter_stream(stream, raw):86 yield line87 def countries(self, countries, raw=False, timeout=None):88 """89 A filtered version of the "banners" stream to only return banners that match the countries of interest.90 :param countries: A list of countries to return banner data on.91 :type countries: string[]92 """93 stream = self._create_stream('/shodan/countries/%s' % ','.join(countries), timeout=timeout)94 for line in self._iter_stream(stream, raw):95 yield line96 def ports(self, ports, raw=False, timeout=None):97 """98 A filtered version of the "banners" stream to only return banners that match the ports of interest.99 :param ports: A list of ports to return banner data on.100 :type ports: int[]101 """102 stream = self._create_stream('/shodan/ports/%s' % ','.join([str(port) for port in ports]), timeout=timeout)103 for line in self._iter_stream(stream, raw):104 yield line105 def tags(self, tags, raw=False, timeout=None):106 """107 A filtered version of the "banners" stream to only return banners that match the tags of 
interest.108 :param tags: A list of tags to return banner data on.109 :type tags: string[]110 """111 stream = self._create_stream('/shodan/tags/%s' % ','.join(tags), timeout=timeout)112 for line in self._iter_stream(stream, raw):113 yield line114 def vulns(self, vulns, raw=False, timeout=None):115 """116 A filtered version of the "banners" stream to only return banners that match the vulnerabilities of interest.117 :param vulns: A list of vulns to return banner data on.118 :type vulns: string[]119 """120 stream = self._create_stream('/shodan/vulns/%s' % ','.join(vulns), timeout=timeout)121 for line in self._iter_stream(stream, raw):...
Learn to execute automation testing from scratch with the LambdaTest Learning Hub: from setting up the prerequisites and running your first automation test, to following best practices and diving deeper into advanced test scenarios. The LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, and TestNG.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 automation test minutes FREE!!