Best Python code snippet using pypom_form_python
test_iptables_manager.py
Source:test_iptables_manager.py
...156 '%(bn)s-filter\n-I %(bn)s-filter 1 -j DROP\n'157 % iptables_args)158 iptables_args['filter_rules'] = filter_rules159 filter_dump_mod = FILTER_WITH_RULES_TEMPLATE % iptables_args160 raw_dump = _generate_raw_dump(IPTABLES_ARG)161 mangle_dump = _generate_mangle_dump(IPTABLES_ARG)162 expected_calls_and_values = [163 (mock.call(['iptables-save'],164 run_as_root=True),165 ''),166 (mock.call(['iptables-restore', '-n'],167 process_input=(filter_dump_mod + mangle_dump +168 COMMENTED_NAT_DUMP + raw_dump),169 run_as_root=True),170 None),171 (mock.call(['iptables-save'],172 run_as_root=True),173 ''),174 (mock.call(['iptables-restore', '-n'],175 process_input=(FILTER_DUMP + mangle_dump +176 COMMENTED_NAT_DUMP + raw_dump),177 run_as_root=True178 ),179 None),180 ]181 tools.setup_mock_calls(self.execute, expected_calls_and_values)182 self.iptables.ipv4['filter'].add_chain('filter')183 self.iptables.ipv4['filter'].add_rule('filter', '-j DROP')184 self.iptables.ipv4['filter'].add_rule('INPUT',185 '-s 0/0 -d 192.168.0.2 -j'186 ' %(bn)s-filter' % IPTABLES_ARG)187 self.iptables.apply()188 self.iptables.ipv4['filter'].remove_rule('filter', '-j DROP')189 self.iptables.ipv4['filter'].remove_rule('INPUT',190 '-s 0/0 -d 192.168.0.2 -j'191 ' %(bn)s-filter'192 % IPTABLES_ARG)193 self.iptables.ipv4['filter'].remove_chain('filter')194 self.iptables.apply()195 tools.verify_mock_calls(self.execute, expected_calls_and_values)196def _generate_mangle_dump(iptables_args):197 return ('# Generated by iptables_manager\n'198 '*mangle\n'199 ':FORWARD - [0:0]\n'200 ':INPUT - [0:0]\n'201 ':OUTPUT - [0:0]\n'202 ':POSTROUTING - [0:0]\n'203 ':PREROUTING - [0:0]\n'204 ':%(bn)s-FORWARD - [0:0]\n'205 ':%(bn)s-INPUT - [0:0]\n'206 ':%(bn)s-OUTPUT - [0:0]\n'207 ':%(bn)s-POSTROUTING - [0:0]\n'208 ':%(bn)s-PREROUTING - [0:0]\n'209 ':%(bn)s-mark - [0:0]\n'210 '-I FORWARD 1 -j %(bn)s-FORWARD\n'211 '-I INPUT 1 -j %(bn)s-INPUT\n'212 '-I OUTPUT 1 -j %(bn)s-OUTPUT\n'213 '-I POSTROUTING 1 -j %(bn)s-POSTROUTING\n'214 '-I PREROUTING 1 -j %(bn)s-PREROUTING\n'215 '-I %(bn)s-PREROUTING 1 -j %(bn)s-mark\n'216 'COMMIT\n'217 '# Completed by iptables_manager\n' % iptables_args)218def _generate_raw_dump(iptables_args):219 return ('# Generated by iptables_manager\n'220 '*raw\n'221 ':OUTPUT - [0:0]\n'222 ':PREROUTING - [0:0]\n'223 ':%(bn)s-OUTPUT - [0:0]\n'224 ':%(bn)s-PREROUTING - [0:0]\n'225 '-I OUTPUT 1 -j %(bn)s-OUTPUT\n'226 '-I PREROUTING 1 -j %(bn)s-PREROUTING\n'227 'COMMIT\n'228 '# Completed by iptables_manager\n' % iptables_args)229MANGLE_DUMP = _generate_mangle_dump(IPTABLES_ARG)230RAW_DUMP = _generate_raw_dump(IPTABLES_ARG)231class IptablesManagerStateFulTestCase(base.BaseTestCase):232 def setUp(self):233 super(IptablesManagerStateFulTestCase, self).setUp()234 cfg.CONF.set_override('comment_iptables_rules', False, 'AGENT')235 self.iptables = iptables_manager.IptablesManager()236 self.execute = mock.patch.object(self.iptables, "execute").start()237 def test_binary_name(self):238 expected = os.path.basename(sys.argv[0])[:16]239 self.assertEqual(expected, iptables_manager.binary_name)240 def test_get_chain_name(self):241 name = '0123456789' * 5242 # 28 chars is the maximum length of iptables chain name.243 self.assertEqual(iptables_manager.get_chain_name(name, wrap=False),244 name[:28])245 # 11 chars is the maximum length of chain name of iptable_manager246 # if binary_name is prepended.247 self.assertEqual(iptables_manager.get_chain_name(name, wrap=True),248 name[:11])249 def test_defer_apply_with_exception(self):250 self.iptables._apply = mock.Mock(side_effect=Exception)251 with testtools.ExpectedException(n_exc.IpTablesApplyException):252 with self.iptables.defer_apply():253 pass254 def _extend_with_ip6tables_filter(self, expected_calls, filter_dump):255 expected_calls.insert(2, (256 mock.call(['ip6tables-save'],257 run_as_root=True),258 ''))259 expected_calls.insert(3, (260 mock.call(['ip6tables-restore', '-n'],261 process_input=filter_dump,262 run_as_root=True),263 None))264 expected_calls.extend([265 (mock.call(['ip6tables-save'],266 run_as_root=True),267 ''),268 (mock.call(['ip6tables-restore', '-n'],269 process_input=filter_dump,270 run_as_root=True),271 None)])272 def _test_add_and_remove_chain_custom_binary_name_helper(self, use_ipv6):273 bn = ("xbcdef" * 5)274 self.iptables = iptables_manager.IptablesManager(275 binary_name=bn,276 use_ipv6=use_ipv6)277 self.execute = mock.patch.object(self.iptables, "execute").start()278 iptables_args = {'bn': bn[:16], 'filter_rules': ''}279 filter_dump = FILTER_WITH_RULES_TEMPLATE % iptables_args280 filter_dump_ipv6 = FILTER_TEMPLATE % iptables_args281 filter_dump_mod = filter_dump282 nat_dump = NAT_TEMPLATE % iptables_args283 raw_dump = _generate_raw_dump(iptables_args)284 mangle_dump = _generate_mangle_dump(iptables_args)285 expected_calls_and_values = [286 (mock.call(['iptables-save'],287 run_as_root=True),288 ''),289 (mock.call(['iptables-restore', '-n'],290 process_input=(filter_dump_mod + mangle_dump +291 nat_dump + raw_dump),292 run_as_root=True),293 None),294 (mock.call(['iptables-save'],295 run_as_root=True),296 ''),297 (mock.call(['iptables-restore', '-n'],298 process_input=(filter_dump + mangle_dump +299 nat_dump + raw_dump),300 run_as_root=True),301 None),302 ]303 if use_ipv6:304 self._extend_with_ip6tables_filter(expected_calls_and_values,305 filter_dump_ipv6 + raw_dump)306 tools.setup_mock_calls(self.execute, expected_calls_and_values)307 self.iptables.ipv4['filter'].add_chain('filter')308 self.iptables.apply()309 self.iptables.ipv4['filter'].empty_chain('filter')310 self.iptables.apply()311 tools.verify_mock_calls(self.execute, expected_calls_and_values)312 def test_add_and_remove_chain_custom_binary_name(self):313 self._test_add_and_remove_chain_custom_binary_name_helper(False)314 def test_add_and_remove_chain_custom_binary_name_with_ipv6(self):315 self._test_add_and_remove_chain_custom_binary_name_helper(True)316 def _test_empty_chain_custom_binary_name_helper(self, use_ipv6):317 bn = ("xbcdef" * 5)[:16]318 self.iptables = iptables_manager.IptablesManager(319 binary_name=bn,320 use_ipv6=use_ipv6)321 self.execute = mock.patch.object(self.iptables, "execute").start()322 iptables_args = {'bn': bn}323 filter_dump = FILTER_TEMPLATE % iptables_args324 filter_rules = ('-I %(bn)s-filter 1 -s 0/0 -d 192.168.0.2\n'325 % iptables_args)326 iptables_args['filter_rules'] = filter_rules327 filter_dump_mod = FILTER_WITH_RULES_TEMPLATE % iptables_args328 nat_dump = NAT_TEMPLATE % iptables_args329 raw_dump = _generate_raw_dump(iptables_args)330 mangle_dump = _generate_mangle_dump(iptables_args)331 expected_calls_and_values = [332 (mock.call(['iptables-save'],333 run_as_root=True),334 ''),335 (mock.call(['iptables-restore', '-n'],336 process_input=(filter_dump_mod + mangle_dump +337 nat_dump + raw_dump),338 run_as_root=True),339 None),340 (mock.call(['iptables-save'],341 run_as_root=True),342 ''),343 (mock.call(['iptables-restore', '-n'],...
fetch_data.py
Source:fetch_data.py
1import toolkit2import time3from scapy.all import IP, Raw, IPSession4# This script fetches gate data by sniffing traffic and dumping it into a file5STR_INIT_COMM = b'com.threerings.presents.net.SecureResponse'6STR_ROTATING_GATE = b'%town:m.rotating_gate_name'7STR_DATA_CHECK = b'm.terminal_core_desc'8STR_STEAM_VC = b'steamVoiceChatEnabled' # this only works with Steam launcher9PACKET_FILTER = 'dst net 192.168.1 and tcp and ip'10DUMP_DIR = 'raw_dump/'11# TODO: Determine when this stream ends more precisely12print('Monitoring packets...')13state = 014src_ip = None15queue, sniffer = toolkit.get_queue(session=IPSession, filter=PACKET_FILTER)16for pkt in toolkit.queue_yielder(queue):17 if src_ip is not None:18 if IP in pkt:19 if pkt.getlayer(IP).src != src_ip:20 continue21 22 if Raw in pkt:23 raw_data = pkt.getlayer(Raw).load24 if state == 0 and STR_INIT_COMM in raw_data:25 print('Found threerings packet!')26 print('Storing packets...')27 state = 128 src_ip = pkt.getlayer(IP).src29 raw_dump = raw_data30 elif state == 1:31 raw_dump += raw_data32 if raw_dump.count(STR_ROTATING_GATE) >= 4 and raw_dump.count(STR_DATA_CHECK) == 2 + raw_dump.count(STR_ROTATING_GATE):33 # if STR_STEAM_VC in raw_dump:34 state = 235 elif state == 2:36 raw_dump += raw_data37 if raw_dump.count(STR_DATA_CHECK) != 2 + raw_dump.count(STR_ROTATING_GATE):38 state = 139 else:40 break41sniffer.stop()42fn = DUMP_DIR + time.strftime('%Y%m%d%H%M%S', time.localtime())43with open(fn, 'wb') as f:44 f.write(raw_dump)...
Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!