Best Python code snippet using lisa_python
fw_zone.py
Source:fw_zone.py
...353 raise FirewallError(COMMAND_FAILED, msg)354 if not enable:355 self.remove_chain(zone, table, chain)356 def add_interface(self, zone, interface, sender=None):357 self._fw.check_panic()358 _zone = self._fw.check_zone(zone)359 _obj = self._zones[_zone]360 interface_id = self.__interface_id(interface)361 if interface_id in _obj.settings["interfaces"]:362 raise FirewallError(ZONE_ALREADY_SET)363 if self.get_zone_of_interface(interface) != None:364 raise FirewallError(ZONE_CONFLICT)365 self.__interface(True, _zone, interface)366 _obj.settings["interfaces"][interface_id] = \367 self.__gen_settings(0, sender)368 # add information whether we add to default or specific zone369 _obj.settings["interfaces"][interface_id]["__default__"] = (not zone or zone == "")370 return _zone371 def change_zone_of_interface(self, zone, interface, sender=None):372 self._fw.check_panic()373 _old_zone = self.get_zone_of_interface(interface)374 _new_zone = self._fw.check_zone(zone)375 if _new_zone == _old_zone:376 return _old_zone377 if _old_zone != None:378 self.remove_interface(_old_zone, interface)379 return self.add_interface(zone, interface, sender)380 def change_default_zone(self, old_zone, new_zone):381 self._fw.check_panic()382 self.__interface(True, new_zone, "+", True)383 if old_zone != None and old_zone != "":384 self.__interface(False, old_zone, "+", True)385 def remove_interface(self, zone, interface):386 self._fw.check_panic()387 zoi = self.get_zone_of_interface(interface)388 if zoi == None:389 raise FirewallError(UNKNOWN_INTERFACE, interface)390 _zone = zoi if zone == "" else self._fw.check_zone(zone)391 if zoi != _zone:392 raise FirewallError(ZONE_CONFLICT)393 _obj = self._zones[_zone]394 interface_id = self.__interface_id(interface)395 self.__interface(False, _zone, interface)396 if interface_id in _obj.settings["interfaces"]:397 del _obj.settings["interfaces"][interface_id]398 return _zone399 def query_interface(self, zone, interface):400 return self.__interface_id(interface) in self.get_settings(zone)["interfaces"]401 def list_interfaces(self, zone):402 return sorted(self.get_settings(zone)["interfaces"].keys())403 # SOURCES404 def check_source(self, source):405 if checkIPnMask(source):406 return "ipv4"407 elif checkIP6nMask(source):408 return "ipv6"409 else:410 raise FirewallError(INVALID_ADDR, source)411 def __source_id(self, source):412 ipv = self.check_source(source)413 return (ipv, source)414 def __source(self, enable, zone, ipv, source):415 rules = [ ]416 for table in ZONE_CHAINS:417 for chain in ZONE_CHAINS[table]:418 # create needed chains if not done already419 if enable:420 self.add_chain(zone, table, chain)421 # handle trust and block zone directly, accept or reject422 # others will be placed into the proper zone chains423 opt = INTERFACE_ZONE_OPTS[chain]424 # transform INTERFACE_ZONE_OPTS for source address425 if opt == "-i":426 opt = "-s"427 if opt == "-o":428 opt = "-d"429 target = self._zones[zone].target.format(430 chain=SHORTCUTS[chain], zone=zone)431 if target in [ "REJECT", "%%REJECT%%" ] and \432 chain not in [ "INPUT", "FORWARD", "OUTPUT" ]:433 # REJECT is only valid in the INPUT, FORWARD and434 # OUTPUT chains, and user-defined chains which are 435 # only called from those chains436 continue437 if target == "DROP" and table == "nat":438 # DROP is not supported in nat table439 continue440 # append rule441 action = "-g" if "_ZONE_" in target else "-j"442 rule = [ "%s_ZONES_SOURCE" % chain, "-t", table,443 opt, source, action, target ]444 rules.append((ipv, rule))445 # handle rules446 ret = self._fw.handle_rules(rules, enable)447 if ret:448 (cleanup_rules, msg) = ret449 self._fw.handle_rules(cleanup_rules, not enable)450 log.debug2(msg)451 raise FirewallError(COMMAND_FAILED, msg)452 if not enable:453 self.remove_chain(zone, table, chain)454 def add_source(self, zone, source, sender=None):455 self._fw.check_panic()456 _zone = self._fw.check_zone(zone)457 _obj = self._zones[_zone]458 source_id = self.__source_id(source)459 if source_id in _obj.settings["sources"]:460 raise FirewallError(ZONE_ALREADY_SET)461 if self.get_zone_of_source(source) != None:462 raise FirewallError(ZONE_CONFLICT)463 self.__source(True, _zone, source_id[0], source_id[1])464 _obj.settings["sources"][source_id] = \465 self.__gen_settings(0, sender)466 # add information whether we add to default or specific zone467 _obj.settings["sources"][source_id]["__default__"] = (not zone or zone == "")468 return _zone469 def change_zone_of_source(self, zone, source, sender=None):470 self._fw.check_panic()471 _old_zone = self.get_zone_of_source(source)472 _new_zone = self._fw.check_zone(zone)473 if _new_zone == _old_zone:474 return _old_zone475 if _old_zone != None:476 self.remove_source(_old_zone, source)477 return self.add_source(zone, source, sender)478 def remove_source(self, zone, source):479 self._fw.check_panic()480 zos = self.get_zone_of_source(source)481 if zos == None:482 raise FirewallError(UNKNOWN_SOURCE, source)483 _zone = zos if zone == "" else self._fw.check_zone(zone)484 if zos != _zone:485 raise FirewallError(ZONE_CONFLICT)486 _obj = self._zones[_zone]487 source_id = self.__source_id(source)488 self.__source(False, _zone, source_id[0], source_id[1])489 if source_id in _obj.settings["sources"]:490 del _obj.settings["sources"][source_id]491 return _zone492 def query_source(self, zone, source):493 return self.__source_id(source) in self.get_settings(zone)["sources"]494 def list_sources(self, zone):495 return [ k[1] for k in self.get_settings(zone)["sources"].keys() ]496 # RICH LANGUAGE497 def check_rule(self, rule):498 rule.check()499 def __rule_id(self, rule):500 self.check_rule(rule)501 return (str(rule))502 def __rule_source(self, source, command):503 if source:504 if source.invert:505 command.append("!")506 command += [ "-s", source.addr ]507 def __rule_destination(self, destination, command):508 if destination:509 if destination.invert:510 command.append("!")511 command += [ "-d", destination.addr ]512 def __rule_limit(self, limit):513 if limit:514 return [ "-m", "limit", "--limit", limit.value ]515 return [ ]516 def __rule_log(self, ipv, table, target, rule, command, rules):517 if not rule.log:518 return519 chain = "%s_log" % target520 _command = command[:]521 _command += [ "-j", "LOG" ]522 if rule.log.prefix:523 _command += [ "--log-prefix", '%s' % rule.log.prefix ]524 if rule.log.level:525 _command += [ "--log-level", '%s' % rule.log.level ]526 _command += self.__rule_limit(rule.log.limit)527 rules.append((ipv, table, chain, _command))528 def __rule_audit(self, ipv, table, target, rule, command, rules):529 if not rule.audit:530 return531 chain = "%s_log" % target532 _command = command[:]533 if type(rule.action) == Rich_Accept:534 _type = "accept"535 elif type(rule.action) == Rich_Reject:536 _type = "reject"537 elif type(rule.action) == Rich_Drop:538 _type = "drop"539 else:540 _type = "unknown"541 _command += [ "-j", "AUDIT", "--type", _type ]542 _command += self.__rule_limit(rule.audit.limit)543 rules.append((ipv, table, chain, _command))544 def __rule_action(self, ipv, table, target, rule, command, rules):545 if not rule.action:546 return547 _command = command[:]548 if type(rule.action) == Rich_Accept:549 chain = "%s_allow" % target550 _command += [ "-j", "ACCEPT" ]551 elif type(rule.action) == Rich_Reject:552 chain = "%s_deny" % target553 _command += [ "-j", "REJECT" ]554 if rule.action.type:555 _command += [ "--reject-with", rule.action.type ]556 elif type(rule.action) == Rich_Drop:557 chain = "%s_deny" % target558 _command += [ "-j", "DROP" ]559 else:560 raise FirewallError(INVALID_RULE,561 "Unknown action %s" % type(rule.action))562 _command += self.__rule_limit(rule.action.limit)563 rules.append((ipv, table, chain, _command))564 def __rule(self, enable, zone, rule, mark_id):565 566 chains = [ ]567 modules = [ ]568 rules = [ ]569 if rule.family != None:570 ipvs = [ rule.family ]571 else:572 ipvs = [ "ipv4", "ipv6" ]573 for ipv in ipvs:574 # SERVICE575 if type(rule.element) == Rich_Service:576 svc = self._fw.service.get_service(rule.element.name)577 if len(svc.destination) > 0:578 if ipv not in svc.destination:579 # destination is set, only use if it contains ipv580 raise FirewallError(INVALID_RULE,581 "Service %s is not usable with %s" % 582 (rule.element.name, ipv))583 if svc.destination[ipv] != "" and rule.destination:584 # we can not use two destinations at the same time585 raise FirewallError(INVALID_RULE,586 "Destination conflict with service.")587 table = "filter"588 chains.append([table, "INPUT" ])589 if type(rule.action) == Rich_Accept:590 # only load modules for accept action591 modules += svc.modules592 target = DEFAULT_ZONE_TARGET.format(chain=SHORTCUTS["INPUT"],593 zone=zone)594 # create rules595 for (port,proto) in svc.ports:596 table = "filter"597 command = [ ]598 self.__rule_source(rule.source, command)599 self.__rule_destination(rule.destination, command)600 if proto in [ "tcp", "udp" ]:601 command += [ "-m", proto, "-p", proto ]602 else:603 if ipv == "ipv4":604 command += [ "-p", proto ]605 else:606 command += [ "-m", "ipv6header", "--header", proto ]607 if port:608 command += [ "--dport", "%s" % portStr(port) ]609 if ipv in svc.destination and svc.destination[ipv] != "":610 command += [ "-d", svc.destination[ipv] ]611 command += [ "-m", "conntrack", "--ctstate", "NEW" ]612 self.__rule_log(ipv, table, target, rule, command, rules)613 self.__rule_audit(ipv, table, target, rule, command, rules)614 self.__rule_action(ipv, table, target, rule, command, rules)615 # PORT616 elif type(rule.element) == Rich_Port:617 port = rule.element.port618 protocol = rule.element.protocol619 self.check_port(port, protocol)620 table = "filter"621 chains.append([ table, "INPUT" ])622 target = self._zones[zone].target.format(chain=SHORTCUTS["INPUT"],623 zone=zone)624 command = [ ]625 self.__rule_source(rule.source, command)626 self.__rule_destination(rule.destination, command)627 command += [ "-m", protocol, "-p", protocol,628 "--dport", portStr(port),629 "-m", "conntrack", "--ctstate", "NEW" ]630 self.__rule_log(ipv, table, target, rule, command, rules)631 self.__rule_audit(ipv, table, target, rule, command, rules)632 self.__rule_action(ipv, table, target, rule, command, rules)633 # PROTOCOL634 elif type(rule.element) == Rich_Protocol:635 protocol = rule.element.value636 self.check_protocol(protocol)637 table = "filter"638 chains.append([ table, "INPUT" ])639 target = self._zones[zone].target.format(chain=SHORTCUTS["INPUT"],640 zone=zone)641 command = [ ]642 self.__rule_source(rule.source, command)643 self.__rule_destination(rule.destination, command)644 command += [ "-p", protocol, 645 "-m", "conntrack", "--ctstate", "NEW" ]646 self.__rule_log(ipv, table, target, rule, command, rules)647 self.__rule_audit(ipv, table, target, rule, command, rules)648 self.__rule_action(ipv, table, target, rule, command, rules)649 # MASQUERADE650 elif type(rule.element) == Rich_Masquerade:651 if enable:652 enable_ip_forwarding(ipv)653 chains.append([ "nat", "POSTROUTING" ])654 chains.append([ "filter", "FORWARD_OUT" ])655 # POSTROUTING656 target = DEFAULT_ZONE_TARGET.format(657 chain=SHORTCUTS["POSTROUTING"], zone=zone)658 command = [ ]659 self.__rule_source(rule.source, command)660 self.__rule_destination(rule.destination, command)661 command += [ "-j", "MASQUERADE" ]662 rules.append((ipv, "nat", "%s_allow" % target, command))663 # FORWARD_OUT664 target = DEFAULT_ZONE_TARGET.format(665 chain=SHORTCUTS["FORWARD_OUT"], zone=zone)666 command = [ ]667 # reverse source/destination !668 self.__rule_source(rule.destination, command)669 self.__rule_destination(rule.source, command)670 command += [ "-j", "ACCEPT" ]671 rules.append((ipv, "filter", "%s_allow" % target, command))672 # FORWARD PORT673 elif type(rule.element) == Rich_ForwardPort:674 if enable:675 enable_ip_forwarding(ipv)676 port = rule.element.port677 protocol = rule.element.protocol678 toport = rule.element.to_port679 toaddr = rule.element.to_address680 self.check_forward_port(ipv, port, protocol, toport, toaddr)681 if enable:682 mark_id = self._fw.new_mark()683 filter_chain = "INPUT" if not toaddr else "FORWARD_IN"684 chains.append([ "mangle", "PREROUTING" ])685 chains.append([ "nat", "PREROUTING" ])686 chains.append([ "filter", filter_chain ])687 mark_str = "0x%x" % mark_id688 port_str = portStr(port)689 dest = [ ]690 to = ""691 if toaddr:692 to += toaddr693 if toport and toport != "":694 toport_str = portStr(toport)695 dest = [ "--dport", toport_str ]696 to += ":%s" % portStr(toport, "-")697 mark = [ "-m", "mark", "--mark", mark_str ]698 target = DEFAULT_ZONE_TARGET.format(chain=SHORTCUTS["PREROUTING"],699 zone=zone)700 command = [ ]701 self.__rule_source(rule.source, command)702 command += [ "-p", protocol, "--dport", port_str,703 "-j", "MARK", "--set-mark", mark_str ]704 rules.append((ipv, "mangle", "%s_allow" % target, command))705 # local and remote706 command = [ "-p", protocol ] + mark + \707 [ "-j", "DNAT", "--to-destination", to ]708 rules.append((ipv, "nat", "%s_allow" % target, command))709 target = DEFAULT_ZONE_TARGET.format(chain=SHORTCUTS[filter_chain],710 zone=zone)711 command = [ "-m", "conntrack", "--ctstate", "NEW" ] + \712 mark + [ "-j", "ACCEPT" ]713 rules.append((ipv, "filter", "%s_allow" % target, command))714 if not enable:715 self._fw.del_mark(mark_id)716 # ICMP BLOCK717 elif type(rule.element) == Rich_IcmpBlock:718 ict = self._fw.icmptype.get_icmptype(rule.element.name)719 if rule.action and type(rule.action) == Rich_Accept:720 # icmp block might have reject or drop action, but not accept721 raise FirewallError(INVALID_RULE,722 "IcmpBlock not usable with accept action")723 if ict.destination and ipv not in ict.destination:724 raise FirewallError(INVALID_RULE,725 "IcmpBlock %s not usable with %s" % 726 (rule.element.name, ipv))727 table = "filter"728 chains.append([ table, "INPUT" ])729 chains.append([ table, "FORWARD_IN" ])730 if ipv == "ipv4":731 proto = [ "-p", "icmp" ]732 match = [ "-m", "icmp", "--icmp-type", rule.element.name ]733 else:734 proto = [ "-p", "ipv6-icmp" ]735 match = [ "-m", "icmp6", "--icmpv6-type", rule.element.name ]736 # INPUT737 target = DEFAULT_ZONE_TARGET.format(chain=SHORTCUTS["INPUT"],738 zone=zone)739 command = [ ]740 self.__rule_source(rule.source, command)741 self.__rule_destination(rule.destination, command)742 command += proto + match743 self.__rule_log(ipv, table, target, rule, command, rules)744 self.__rule_audit(ipv, table, target, rule, command, rules)745 if rule.action:746 self.__rule_action(ipv, table, target, rule, command, rules)747 else:748 command += [ "-j", "%%REJECT%%" ]749 rules.append((ipv, table, "%s_deny" % target, command))750 # FORWARD_IN751 target = DEFAULT_ZONE_TARGET.format(chain=SHORTCUTS["FORWARD_IN"],752 zone=zone)753 command = [ ]754 self.__rule_source(rule.source, command)755 self.__rule_destination(rule.destination, command)756 command += proto + match757 self.__rule_log(ipv, table, target, rule, command, rules)758 self.__rule_audit(ipv, table, target, rule, command, rules)759 if rule.action:760 self.__rule_action(ipv, table, target, rule, command, rules)761 else:762 command += [ "-j", "%%REJECT%%" ]763 rules.append((ipv, table, "%s_deny" % target, command))764 elif rule.element == None:765 # source action766 table = "filter"767 chains.append([ table, "INPUT" ])768 target = DEFAULT_ZONE_TARGET.format(chain=SHORTCUTS["INPUT"],769 zone=zone)770 command = [ ]771 self.__rule_source(rule.source, command)772 self.__rule_log(ipv, table, target, rule, command, rules)773 self.__rule_audit(ipv, table, target, rule, command, rules)774 self.__rule_action(ipv, table, target, rule, command, rules)775 # EVERYTHING ELSE776 else:777 raise FirewallError(INVALID_RULE, "Unknown element %s" % 778 type(rule.element))779 msg = self.handle_cmr(zone, chains, modules, rules, enable)780 if msg != None:781 raise FirewallError(COMMAND_FAILED, msg)782 return mark_id783 def add_rule(self, zone, rule, timeout=0, sender=None):784 _zone = self._fw.check_zone(zone)785 self._fw.check_panic()786 _obj = self._zones[_zone]787 rule_id = self.__rule_id(rule)788 if rule_id in _obj.settings["rules"]:789 raise FirewallError(ALREADY_ENABLED)790 mark = self.__rule(True, _zone, rule, None)791 _obj.settings["rules"][rule_id] = \792 self.__gen_settings(timeout, sender, mark=mark)793 return _zone794 def remove_rule(self, zone, rule):795 _zone = self._fw.check_zone(zone)796 self._fw.check_panic()797 _obj = self._zones[_zone]798 rule_id = self.__rule_id(rule)799 if not rule_id in _obj.settings["rules"]:800 raise FirewallError(NOT_ENABLED)801 if "mark" in _obj.settings["rules"][rule_id]:802 mark = _obj.settings["rules"][rule_id]["mark"]803 else:804 mark = None805 self.__rule(False, _zone, rule, mark)806 if rule_id in _obj.settings["rules"]:807 del _obj.settings["rules"][rule_id]808 return _zone809 def query_rule(self, zone, rule):810 return self.__rule_id(rule) in self.get_settings(zone)["rules"]811 def list_rules(self, zone):812 return list(self.get_settings(zone)["rules"].keys())813 # SERVICES814 def check_service(self, service):815 self._fw.check_service(service)816 def __service_id(self, service):817 self.check_service(service)818 return service819 def __service(self, enable, zone, service):820 svc = self._fw.service.get_service(service)821 if enable:822 self.add_chain(zone, "filter", "INPUT")823 rules = [ ]824 for ipv in [ "ipv4", "ipv6" ]:825 if len(svc.destination) > 0 and ipv not in svc.destination:826 # destination is set, only use if it contains ipv827 continue828 # handle rules829 for (port,proto) in svc.ports:830 target = DEFAULT_ZONE_TARGET.format(831 chain=SHORTCUTS["INPUT"], zone=zone)832 rule = [ "%s_allow" % (target), "-t", "filter" ]833 if proto in [ "tcp", "udp" ]:834 rule += [ "-m", proto, "-p", proto ]835 else:836 if ipv == "ipv4":837 rule += [ "-p", proto ]838 else:839 rule += [ "-m", "ipv6header", "--header", proto ]840 if port:841 rule += [ "--dport", "%s" % portStr(port) ]842 if ipv in svc.destination and svc.destination[ipv] != "":843 rule += [ "-d", svc.destination[ipv] ]844 rule += [ "-m", "conntrack", "--ctstate", "NEW" ]845 rule += [ "-j", "ACCEPT" ]846 rules.append((ipv, rule))847 cleanup_rules = None848 cleanup_modules = None849 msg = None850 # handle rules851 ret = self._fw.handle_rules(rules, enable)852 if ret == None: # no error, handle modules853 mod_ret = self._fw.handle_modules(svc.modules, enable)854 if mod_ret != None: # error loading modules855 (cleanup_modules, msg) = mod_ret856 cleanup_rules = rules857 else: # ret != None858 (cleanup_rules, msg) = ret859 if cleanup_rules!= None or cleanup_modules != None:860 if cleanup_rules:861 self._fw.handle_rules(cleanup_rules, not enable)862 if cleanup_modules:863 self._fw.handle_modules(cleanup_modules, not enable)864 raise FirewallError(COMMAND_FAILED, msg)865 if not enable:866 self.remove_chain(zone, "filter", "INPUT")867 def add_service(self, zone, service, timeout=0, sender=None):868 _zone = self._fw.check_zone(zone)869 self._fw.check_panic()870 _obj = self._zones[_zone]871 service_id = self.__service_id(service)872 if service_id in _obj.settings["services"]:873 raise FirewallError(ALREADY_ENABLED)874 self.__service(True, _zone, service)875 _obj.settings["services"][service_id] = \876 self.__gen_settings(timeout, sender)877 return _zone878 def remove_service(self, zone, service):879 _zone = self._fw.check_zone(zone)880 self._fw.check_panic()881 _obj = self._zones[_zone]882 service_id = self.__service_id(service)883 if not service_id in _obj.settings["services"]:884 raise FirewallError(NOT_ENABLED)885 self.__service(False, _zone, service)886 if service_id in _obj.settings["services"]:887 del _obj.settings["services"][service_id]888 return _zone889 def query_service(self, zone, service):890 return (self.__service_id(service) in self.get_settings(zone)["services"])891 def list_services(self, zone):892 return sorted(self.get_settings(zone)["services"].keys())893 # PORTS894 def check_port(self, port, protocol):895 self._fw.check_port(port)896 self._fw.check_protocol(protocol)897 def __port_id(self, port, protocol):898 self.check_port(port, protocol)899 return (portStr(port, "-"), protocol)900 def __port(self, enable, zone, port, protocol):901 if enable:902 self.add_chain(zone, "filter", "INPUT")903 rules = [ ]904 for ipv in [ "ipv4", "ipv6" ]:905 target = DEFAULT_ZONE_TARGET.format(chain=SHORTCUTS["INPUT"],906 zone=zone)907 rules.append((ipv, [ "%s_allow" % (target),908 "-t", "filter",909 "-m", protocol, "-p", protocol,910 "--dport", portStr(port),911 "-m", "conntrack", "--ctstate", "NEW",912 "-j", "ACCEPT" ]))913 # handle rules914 ret = self._fw.handle_rules(rules, enable)915 if ret:916 (cleanup_rules, msg) = ret917 self._fw.handle_rules(cleanup_rules, not enable)918 raise FirewallError(COMMAND_FAILED, msg)919 920 if not enable:921 self.remove_chain(zone, "filter", "INPUT")922 def add_port(self, zone, port, protocol, timeout=0, sender=None):923 _zone = self._fw.check_zone(zone)924 self._fw.check_panic()925 _obj = self._zones[_zone]926 port_id = self.__port_id(port, protocol)927 if port_id in _obj.settings["ports"]:928 raise FirewallError(ALREADY_ENABLED)929 self.__port(True, _zone, port, protocol)930 _obj.settings["ports"][port_id] = \931 self.__gen_settings(timeout, sender)932 return _zone933 def remove_port(self, zone, port, protocol):934 _zone = self._fw.check_zone(zone)935 self._fw.check_panic()936 _obj = self._zones[_zone]937 port_id = self.__port_id(port, protocol)938 if not port_id in _obj.settings["ports"]:939 raise FirewallError(NOT_ENABLED)940 self.__port(False, _zone, port, protocol)941 if port_id in _obj.settings["ports"]:942 del _obj.settings["ports"][port_id]943 return _zone944 def query_port(self, zone, port, protocol):945 return self.__port_id(port, protocol) in self.get_settings(zone)["ports"]946 def list_ports(self, zone):947 return list(self.get_settings(zone)["ports"].keys())948 # PROTOCOLS949 def check_protocol(self, protocol):950 if not checkProtocol(protocol):951 raise FirewallError(INVALID_PROTOCOL, protocol)952 # MASQUERADE953 def __masquerade_id(self):954 return True955 def __masquerade(self, enable, zone):956 if enable:957 self.add_chain(zone, "nat", "POSTROUTING")958 self.add_chain(zone, "filter", "FORWARD_OUT")959 enable_ip_forwarding("ipv4")960 rules = [ ]961 for ipv in [ "ipv4" ]: # IPv4 only!962 target = DEFAULT_ZONE_TARGET.format(963 chain=SHORTCUTS["POSTROUTING"], zone=zone)964 rules.append((ipv, [ "%s_allow" % (target), "!", "-i", "lo",965 "-t", "nat", "-j", "MASQUERADE" ]))966 # FORWARD_OUT967 target = DEFAULT_ZONE_TARGET.format(968 chain=SHORTCUTS["FORWARD_OUT"], zone=zone)969 rules.append((ipv, [ "%s_allow" % (target),970 "-t", "filter", "-j", "ACCEPT" ]))971 # handle rules972 ret = self._fw.handle_rules(rules, enable)973 if ret:974 (cleanup_rules, msg) = ret975 self._fw.handle_rules(cleanup_rules, not enable)976 raise FirewallError(COMMAND_FAILED, msg)977 if not enable:978 self.remove_chain(zone, "nat", "POSTROUTING")979 self.remove_chain(zone, "filter", "FORWARD_OUT")980 def add_masquerade(self, zone, timeout=0, sender=None):981 _zone = self._fw.check_zone(zone)982 self._fw.check_panic()983 _obj = self._zones[_zone]984 masquerade_id = self.__masquerade_id()985 if masquerade_id in _obj.settings["masquerade"]:986 raise FirewallError(ALREADY_ENABLED)987 self.__masquerade(True, _zone)988 _obj.settings["masquerade"][masquerade_id] = \989 self.__gen_settings(timeout, sender)990 return _zone991 def remove_masquerade(self, zone):992 _zone = self._fw.check_zone(zone)993 self._fw.check_panic()994 _obj = self._zones[_zone]995 masquerade_id = self.__masquerade_id()996 if masquerade_id not in _obj.settings["masquerade"]:997 raise FirewallError(NOT_ENABLED)998 self.__masquerade(False, _zone)999 if masquerade_id in _obj.settings["masquerade"]:1000 del _obj.settings["masquerade"][masquerade_id]1001 return _zone1002 def query_masquerade(self, zone):1003 return self.__masquerade_id() in self.get_settings(zone)["masquerade"]1004 # PORT FORWARDING1005 def check_forward_port(self, ipv, port, protocol, toport=None, toaddr=None):1006 self._fw.check_port(port)1007 self._fw.check_protocol(protocol)1008 if toport:1009 self._fw.check_port(toport)1010 if toaddr:1011 check_single_address(ipv, toaddr)1012 if not toport and not toaddr:1013 raise FirewallError(INVALID_FORWARD)1014 def __forward_port_id(self, port, protocol, toport=None, toaddr=None):1015 self.check_forward_port("ipv4", port, protocol, toport, toaddr)1016 return (portStr(port, "-"), protocol,1017 portStr(toport, "-"), str(toaddr))1018 def __forward_port(self, enable, zone, port, protocol, toport=None,1019 toaddr=None, mark_id=None):1020 mark_str = "0x%x" % mark_id1021 port_str = portStr(port)1022 dest = [ ]1023 to = ""1024 if toaddr:1025 to += toaddr1026 filter_chain = "INPUT" if not toaddr else "FORWARD_IN"1027 if toport and toport != "":1028 toport_str = portStr(toport)1029 dest = [ "--dport", toport_str ]1030 to += ":%s" % portStr(toport, "-")1031 mark = [ "-m", "mark", "--mark", mark_str ]1032 if enable:1033 self.add_chain(zone, "mangle", "PREROUTING")1034 self.add_chain(zone, "nat", "PREROUTING")1035 self.add_chain(zone, "filter", filter_chain)1036 enable_ip_forwarding("ipv4")1037 rules = [ ]1038 for ipv in [ "ipv4" ]: # IPv4 only!1039 target = DEFAULT_ZONE_TARGET.format(1040 chain=SHORTCUTS["PREROUTING"], zone=zone)1041 rules.append((ipv, [ "%s_allow" % (target),1042 "-t", "mangle",1043 "-p", protocol, "--dport", port_str,1044 "-j", "MARK", "--set-mark", mark_str ]))1045 # local and remote1046 rules.append((ipv, [ "%s_allow" % (target),1047 "-t", "nat",1048 "-p", protocol ] + mark + \1049 [ "-j", "DNAT", "--to-destination", to ]))1050 target = DEFAULT_ZONE_TARGET.format(chain=SHORTCUTS[filter_chain],1051 zone=zone)1052 rules.append((ipv, [ "%s_allow" % (target),1053 "-t", "filter",1054 "-m", "conntrack", "--ctstate", "NEW" ] + \1055 mark + [ "-j", "ACCEPT" ]))1056 # handle rules1057 ret = self._fw.handle_rules(rules, enable)1058 if ret:1059 (cleanup_rules, msg) = ret1060 self._fw.handle_rules(cleanup_rules, not enable)1061 if enable:1062 self._fw.del_mark(mark_id)1063 raise FirewallError(COMMAND_FAILED, msg)1064 if not enable:1065 self.remove_chain(zone, "mangle", "PREROUTING")1066 self.remove_chain(zone, "nat", "PREROUTING")1067 self.remove_chain(zone, "filter", filter_chain)1068 def add_forward_port(self, zone, port, protocol, toport=None,1069 toaddr=None, timeout=0, sender=None):1070 _zone = self._fw.check_zone(zone)1071 self._fw.check_panic()1072 _obj = self._zones[_zone]1073 forward_id = self.__forward_port_id(port, protocol, toport, toaddr)1074 if forward_id in _obj.settings["forward_ports"]:1075 raise FirewallError(ALREADY_ENABLED)1076 mark = self._fw.new_mark()1077 self.__forward_port(True, _zone, port, protocol, toport, toaddr,1078 mark_id=mark)1079 1080 _obj.settings["forward_ports"][forward_id] = \1081 self.__gen_settings(timeout, sender, mark=mark)1082 return _zone1083 def remove_forward_port(self, zone, port, protocol, toport=None,1084 toaddr=None):1085 _zone = self._fw.check_zone(zone)1086 self._fw.check_panic()1087 _obj = self._zones[_zone]1088 forward_id = self.__forward_port_id(port, protocol, toport, toaddr)1089 if not forward_id in _obj.settings["forward_ports"]:1090 raise FirewallError(NOT_ENABLED)1091 mark = _obj.settings["forward_ports"][forward_id]["mark"]1092 self.__forward_port(False, _zone, port, protocol, toport, toaddr,1093 mark_id=mark)1094 if forward_id in _obj.settings["forward_ports"]:1095 del _obj.settings["forward_ports"][forward_id]1096 self._fw.del_mark(mark)1097 return _zone1098 def query_forward_port(self, zone, port, protocol, toport=None,1099 toaddr=None):1100 forward_id = self.__forward_port_id(port, protocol, toport, toaddr)1101 return forward_id in self.get_settings(zone)["forward_ports"]1102 def list_forward_ports(self, zone):1103 return list(self.get_settings(zone)["forward_ports"].keys())1104 # ICMP BLOCK1105 def check_icmp_block(self, icmp):1106 self._fw.check_icmptype(icmp)1107 def __icmp_block_id(self, icmp):1108 self.check_icmp_block(icmp)1109 return icmp1110 def __icmp_block(self, enable, zone, icmp):1111 ict = self._fw.icmptype.get_icmptype(icmp)1112 if enable:1113 self.add_chain(zone, "filter", "INPUT")1114 self.add_chain(zone, "filter", "FORWARD_IN")1115 rules = [ ]1116 for ipv in [ "ipv4", "ipv6" ]:1117 if ict.destination and ipv not in ict.destination:1118 continue1119 if ipv == "ipv4":1120 proto = [ "-p", "icmp" ]1121 match = [ "-m", "icmp", "--icmp-type", icmp ]1122 else:1123 proto = [ "-p", "ipv6-icmp" ]1124 match = [ "-m", "icmp6", "--icmpv6-type", icmp ]1125 target = DEFAULT_ZONE_TARGET.format(chain=SHORTCUTS["INPUT"],1126 zone=zone)1127 rules.append((ipv, [ "%s_deny" % (target),1128 "-t", "filter", ] + proto + \1129 match + [ "-j", "%%REJECT%%" ]))1130 target = DEFAULT_ZONE_TARGET.format(1131 chain=SHORTCUTS["FORWARD_IN"], zone=zone)1132 rules.append((ipv, [ "%s_deny" % (target),1133 "-t", "filter", ] + proto + \1134 match + [ "-j", "%%REJECT%%" ]))1135 # handle rules1136 ret = self._fw.handle_rules(rules, enable)1137 if ret:1138 (cleanup_rules, msg) = ret1139 self._fw.handle_rules(cleanup_rules, not enable)1140 raise FirewallError(COMMAND_FAILED, msg)1141 if not enable:1142 self.remove_chain(zone, "filter", "INPUT")1143 self.remove_chain(zone, "filter", "FORWARD_IN")1144 def add_icmp_block(self, zone, icmp, timeout=0, sender=None):1145 _zone = self._fw.check_zone(zone)1146 self._fw.check_panic()1147 _obj = self._zones[_zone]1148 icmp_id = self.__icmp_block_id(icmp)1149 if icmp_id in _obj.settings["icmp_blocks"]:1150 raise FirewallError(ALREADY_ENABLED)1151 self.__icmp_block(True, _zone, icmp)1152 _obj.settings["icmp_blocks"][icmp_id] = \1153 self.__gen_settings(timeout, sender)1154 return _zone1155 def remove_icmp_block(self, zone, icmp):1156 _zone = self._fw.check_zone(zone)1157 self._fw.check_panic()1158 _obj = self._zones[_zone]1159 icmp_id = self.__icmp_block_id(icmp)1160 if not icmp_id in _obj.settings["icmp_blocks"]:1161 raise FirewallError(NOT_ENABLED)1162 self.__icmp_block(False, _zone, icmp)1163 if icmp_id in _obj.settings["icmp_blocks"]:1164 del _obj.settings["icmp_blocks"][icmp_id]1165 return _zone1166 def query_icmp_block(self, zone, icmp):1167 return self.__icmp_block_id(icmp) in self.get_settings(zone)["icmp_blocks"]1168 def list_icmp_blocks(self, zone):...
urls.py
Source:urls.py
1"""resolute URL Configuration2The `urlpatterns` list routes URLs to views. For more information please see:3 https://docs.djangoproject.com/en/2.0/topics/http/urls/4Examples:5Function views6 1. Add an import: from my_app import views7 2. Add a URL to urlpatterns: path('', views.home, name='home')8Class-based views9 1. Add an import: from other_app.views import Home10 2. Add a URL to urlpatterns: path('', Home.as_view(), name='home')11Including another URLconf12 1. Import the include() function: from django.urls import include, path13 2. Add a URL to urlpatterns: path('blog/', include('blog.urls'))14"""15from django.contrib import admin16from django.urls import path17from . import views18urlpatterns = [19 path('', views.index, name='index'),20 path('power/', views.power, name='power'),21 path('last_read/', views.last_read, name='last_read'),22 path('get_last_read/', views.get_last_read, name='get_last_read'),23 path('score_card/', views.score_card, name='score_card'),24 path('voltage/', views.voltage, name='voltage'),25 path('current/', views.current, name='current'),26 path('readings/', views.readings, name='readings'),27 path('energy_readings/', views.energy_readings, name='energy_readings'),28 path('get_energy_readings/', views.get_energy_readings, name='get_energy_readings'),29 path('get_line_readings/', views.get_line_readings, name='get_line_readings'),30 path('max_demand/', views.max_demand, name='max_demand'),31 path('fetch_vals_period/', views.fetch_vals_period, name='fetch_vals_period'),32 path('fetch_device_vals/', views.fetch_vals_period_per_device, name='fetch_vals_period_per_device'),33 path('load_datalogs/', views.load_datalogs, name='load_datalogs'),34 path('load_readings/', views.load_readings, name='load_readings'),35 path('get_yesterday_today_usage/', views.get_yesterday_today_usage, name='get_yesterday_today_usage'),36 path('get_line_readings_log/', views.get_line_readings_log, name='get_line_readings_log'),37 path('get_capacity_factors/', views.get_capacity_factors, name='get_capacity_factors'),38 path('messaging/', views.messaging, name='messaging'),39 path('fetch_messages/', views.fetch_messages, name='fetch_messages'),40 path('check_new_message/', views.check_new_message, name='check_new_message'),41 path('send_message/', views.send_message, name='send_message'),42 path('simple_upload/', views.simple_upload, name='simple_upload'),43 path('upload_cdd/', views.upload_cdd, name='upload_cdd'),44 path('all_customers/', views.all_customers, name='all_customers'),45 path('view_customer/<int:id>', views.view_customer, name='view_customer'),46 path('view_profile/', views.view_profile, name='view_profile'),47 path('upload_image/', views.upload_image, name='upload_image'),48 path('update_details/', views.update_details, name='update_details'),49 path('add_user/', views.add_user, name='add_user'),50 path('edit_user/', views.edit_user, name='edit_user'),51 path('update_branch/', views.update_branch, name='update_branch'),52 path('update_device/', views.update_device, name='update_device'),53 path('create_branch/', views.create_branch, name='create_branch'),54 path('create_device/', views.create_device, name='create_device'),55 path('update_historic_scores/', views.update_historic_scores, name='update_historic_scores'),56 path('get_total_energy/', views.get_total_energy, name='get_total_energy'),57 path('get_stats/', views.get_stats, name='get_stats'),58 # path('detail_view', views.detail_view, name='detail_view'),59 # path('locationpost', views.locationpost, name='locationpost'),60 # path('incidents', views.incidents, name='incidents'),61 # path('table', views.table, name='table'),62 # path('logs', views.logs, name='logs'),63 # # path('track', views.track, name='track')64 # path('check/<slug:slug>/', views.check, name='check'),65 # path('collection_check/<int:id>/', views.collection_check, name='collection_check'),66 # path('track/<slug:slug>/', views.track, name='track'),67 # path('trail/<slug:slug>/', views.trail, name='trail'),68 # path('mapping/<slug:slug>/', views.mapping, name='mapping'),69 # path('get_lat_lng/<slug:id>', views.get_lat_lng, name='get_lat_lng'),# FOR PROFILE REALTIME MAP70 # path('get_latlng/<slug:username>', views.get_latlng, name='get_latlng'),71 # path('get_latlng_incident/<slug:username>/<slug:incident>/', views.get_latlng_incident, name='get_latlng_incident'),72 # path('post_latlng', views.post_latlng, name='post_latlng'),73 # path('check_panic', views.check_panic, name='check_panic'),74 # path('create_panic', views.create_panic, name='create_panic'),75 # path('resolve_panic_mobile', views.resolve_panic_mobile, name='resolve_panic_mobile'),76 # path('resolve_panic', views.resolve_panic, name='resolve_panic'),77 # path('recurring_gps_post', views.recurring_gps_post, name='recurring_gps_post'),78 # path('farmers', views.farmers, name='farmers'),79 # path('herdsmen', views.herdsmen, name='herdsmen'),80 # path('get_client_data', views.get_client_data, name='get_client_data'),81 # path('profile_page/<int:target_id>/<slug:is_farmer>', views.profile_page, name='profile_page')82 83 # path('test', views.test, name='test'),...
Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!