Best Python code snippet using lisa_python
schema.py
Source:schema.py
...1685 def _get_scale(self):1686 return self._attributes['RDB$FIELD_SCALE']1687 def _get_field_type(self):1688 return self._attributes['RDB$FIELD_TYPE']1689 def _get_sub_type(self):1690 return self._attributes['RDB$FIELD_SUB_TYPE']1691 def _get_segment_length(self):1692 return self._attributes['RDB$SEGMENT_LENGTH']1693 def _get_external_length(self):1694 return self._attributes['RDB$EXTERNAL_LENGTH']1695 def _get_external_scale(self):1696 return self._attributes['RDB$EXTERNAL_SCALE']1697 def _get_external_type(self):1698 return self._attributes['RDB$EXTERNAL_TYPE']1699 def _get_dimensions(self):1700 if self._attributes['RDB$DIMENSIONS']:1701 return self.schema._get_field_dimensions(self)1702 else:1703 return []1704 def _get_character_length(self):1705 return self._attributes['RDB$CHARACTER_LENGTH']1706 def _get_collation(self):1707 return self.schema.get_collation_by_id(self._attributes['RDB$CHARACTER_SET_ID'],1708 self._attributes['RDB$COLLATION_ID'])1709 def _get_character_set(self):1710 return self.schema.get_character_set_by_id(self._attributes['RDB$CHARACTER_SET_ID'])1711 def _get_precision(self):1712 return self._attributes['RDB$FIELD_PRECISION']1713 def _get_datatype(self):1714 l = []1715 precision_known = False1716 if self.field_type in (FBT_SMALLINT,FBT_INTEGER,FBT_BIGINT):1717 if self.precision != None:1718 if (self.sub_type > 0) and (self.sub_type < MAX_INTSUBTYPES):1719 l.append('%s(%d, %d)' % \1720 (INTEGRAL_SUBTYPES[self.sub_type],self.precision,-self.scale))1721 precision_known = True1722 if not precision_known:1723 if (self.field_type == FBT_SMALLINT) and (self.scale < 0):1724 l.append('NUMERIC(4, %d)' % -self.scale)1725 elif (self.field_type == FBT_INTEGER) and (self.scale < 0):1726 l.append('NUMERIC(9, %d)' % -self.scale)1727 elif (self.field_type == FBT_DOUBLE_PRECISION) and (self.scale < 0):1728 l.append('NUMERIC(15, %d)' % -self.scale)1729 else:1730 l.append(COLUMN_TYPES[self.field_type])1731 if self.field_type in (FBT_CHAR,FBT_VARCHAR):1732 l.append('(%d)' % (self.length if self.character_length == None else self.character_length))1733 if self._attributes['RDB$DIMENSIONS'] != None:1734 l.append('[%s]' % ', '.join('%d' % u if l == 1 1735 else '%d:%d' % (l,u) 1736 for l,u in self.dimensions))1737 if self.field_type == FBT_BLOB:1738 if self.sub_type >= 0 and self.sub_type <= MAX_BLOBSUBTYPES:1739 l.append(' SUB_TYPE %s' % BLOB_SUBTYPES[self.sub_type])1740 else:1741 l.append(' SUB_TYPE %d' % self.sub_type)1742 l.append(' SEGMENT SIZE %d' % self.segment_length)1743 if self.field_type in (FBT_CHAR,FBT_VARCHAR,FBT_BLOB):1744 if self._attributes['RDB$CHARACTER_SET_ID'] is not None and \1745 (self.character_set.name != self.schema.default_character_set.name) or \1746 self._attributes['RDB$COLLATION_ID']:1747 if (self._attributes['RDB$CHARACTER_SET_ID'] is not None):1748 l.append(' CHARACTER SET %s' % self.character_set.name)1749 if self._attributes['RDB$COLLATION_ID'] is not None:1750 cname = self.collation.name1751 if self.character_set._attributes['RDB$DEFAULT_COLLATE_NAME'] != cname:1752 l.append(' COLLATE %s' % cname)1753 return ''.join(l)1754 #--- Properties1755 expression = LateBindingProperty(_get_expression,None,None,1756 "Expression that defines the COMPUTED BY column or None.")1757 validation = LateBindingProperty(_get_validation,None,None,1758 "CHECK constraint for the domain or None.")1759 default = LateBindingProperty(_get_default,None,None,1760 "Expression that defines the default value or None.")1761 length = LateBindingProperty(_get_length,None,None,1762 "Length of the column in bytes.")1763 scale = LateBindingProperty(_get_scale,None,None,1764 "Negative number representing the scale of NUMBER and DECIMAL column.")1765 field_type = LateBindingProperty(_get_field_type,None,None,1766 "Number code of the data type defined for the column.")1767 sub_type = LateBindingProperty(_get_sub_type,None,None,"BLOB subtype.")1768 segment_length = LateBindingProperty(_get_segment_length,None,None,1769 "For BLOB columns, a suggested length for BLOB buffers.")1770 external_length = LateBindingProperty(_get_external_length,None,None,1771 "Length of field as it is in an external table. Always 0 for regular tables.")1772 external_scale = LateBindingProperty(_get_external_scale,None,None,1773 "Scale factor of an integer field as it is in an external table.")1774 external_type = LateBindingProperty(_get_external_type,None,None,1775 "Data type of the field as it is in an external table.")1776 dimensions = LateBindingProperty(_get_dimensions,None,None,1777 "List of dimension definition pairs if column is an array type. Always empty for non-array columns.")1778 character_length = LateBindingProperty(_get_character_length,None,None,1779 "Length of CHAR and VARCHAR column, in characters (not bytes).")1780 collation = LateBindingProperty(_get_collation,None,None,1781 "Collation object for a character column or None.")1782 character_set = LateBindingProperty(_get_character_set,None,None,1783 "CharacterSet object for a character or text BLOB column, or None.")1784 precision = LateBindingProperty(_get_precision,None,None,1785 "Indicates the number of digits of precision available to the data type of the column.")1786 datatype = LateBindingProperty(_get_datatype,None,None,1787 "Comlete SQL datatype definition.")1788 #--- Public1789 1790 def accept_visitor(self,visitor):1791 """Visitor Pattern support. Calls `visitDomain(self)` on parameter object.1792 1793 :param visitor: Visitor object of Vistior Pattern.1794 """1795 visitor.visitDomain(self)1796 def issystemobject(self):1797 "Return True if this database object is system object."1798 return (self._attributes['RDB$SYSTEM_FLAG'] == 1) or self.name.startswith('RDB$')1799 def isnullable(self):1800 "Returns True if domain is not defined with NOT NULL."1801 return not self._attributes['RDB$NULL_FLAG']1802 def iscomputed(self):1803 "Returns True if domain is computed."1804 return bool(self._attributes['RDB$COMPUTED_SOURCE'])1805 def isvalidated(self):1806 "Returns True if domain has validation constraint."1807 return bool(self._attributes['RDB$VALIDATION_SOURCE'])1808 def isarray(self):1809 "Returns True if domain defines an array."1810 return bool(self._attributes['RDB$DIMENSIONS'])1811 def has_default(self):1812 "Returns True if domain has default value."1813 return bool(self._attributes['RDB$DEFAULT_SOURCE'])1814class Dependency(BaseSchemaItem):1815 """Maps dependency between database objects.1816 Supported SQL actions: none1817 """1818 def __init__(self,schema,attributes):1819 super(Dependency,self).__init__(schema,attributes)1820 1821 self._strip_attribute('RDB$DEPENDENT_NAME')1822 self._strip_attribute('RDB$DEPENDED_ON_NAME')1823 self._strip_attribute('RDB$FIELD_NAME')1824 #--- Protected1825 1826 def _get_dependent_name(self):1827 return self._attributes['RDB$DEPENDENT_NAME']1828 def _get_dependent_type(self):1829 return self._attributes['RDB$DEPENDENT_TYPE']1830 def _get_field_name(self):1831 return self._attributes['RDB$FIELD_NAME']1832 def _get_depended_on_name(self):1833 return self._attributes['RDB$DEPENDED_ON_NAME']1834 def _get_depended_on_type(self):1835 return self._attributes['RDB$DEPENDED_ON_TYPE']1836 def _get_dependent(self):1837 if self.dependent_type == 0: # TABLE1838 t = self.schema.get_table(self.dependent_name)1839 elif self.dependent_type == 1: # VIEW1840 return self.schema.get_view(self.dependent_name)1841 elif self.dependent_type == 2: # TRIGGER1842 return self.schema.get_trigger(self.dependent_name)1843 elif self.dependent_type == 3: # COMPUTED FIELD (i.e. DOMAIN)1844 return self.schema.get_domain(self.dependent_name)1845 elif self.dependent_type == 4: 1846 ## ToDo: Implement handler for VALIDATION if necessary1847 return None1848 elif self.dependent_type == 5: #PROCEDURE1849 return self.schema.get_procedure(self.dependent_name)1850 elif self.dependent_type == 6: # EXPRESSION INDEX1851 return self.schema.get_index(self.dependent_name)1852 elif self.dependent_type == 7: # EXCEPTION1853 return self.schema.get_exception(self.dependent_name)1854 elif self.dependent_type == 8:1855 ## ToDo: Implement handler for USER if necessary1856 return None1857 elif self.dependent_type == 9: # FIELD (i.e. DOMAIN)1858 return self.schema.get_domain(self.dependent_name)1859 elif self.dependent_type == 10: # INDEX1860 return self.schema.get_index(self.dependent_name)1861 elif self.dependent_type == 11:1862 ## ToDo: Implement handler for DEPENDENT COUNT if necessary1863 return None1864 elif self.dependent_type == 12:1865 ## ToDo: Implement handler for USER GROUP if necessary1866 return None1867 elif self.dependent_type == 13: # ROLE1868 return self.schema.get_role(self.dependent_name)1869 elif self.dependent_type == 14: # GENERATOR1870 return self.schema.get_generator(self.dependent_name)1871 elif self.dependent_type == 15: # UDF1872 return self.schema.get_function(self.dependent_name)1873 elif self.dependent_type == 16:1874 ## ToDo: Implement handler for BLOB_FILTER1875 return None1876 return None1877 def _get_depended_on(self):1878 if self.depended_on_type == 0: # TABLE1879 t = self.schema.get_table(self.depended_on_name)1880 if self.field_name:1881 return t.get_column(self.field_name)1882 else:1883 return t1884 elif self.depended_on_type == 1: # VIEW1885 return self.schema.get_view(self.depended_on_name)1886 elif self.depended_on_type == 2: # TRIGGER1887 return self.schema.get_trigger(self.depended_on_name)1888 elif self.depended_on_type == 3: # COMPUTED FIELD (i.e. DOMAIN)1889 return self.schema.get_domain(self.depended_on_name)1890 elif self.depended_on_type == 4:1891 ## ToDo: Implement handler for VALIDATION if necessary1892 return None1893 elif self.depended_on_type == 5: #PROCEDURE1894 return self.schema.get_procedure(self.depended_on_name)1895 elif self.depended_on_type == 6: # EXPRESSION INDEX1896 return self.schema.get_index(self.depended_on_name)1897 elif self.depended_on_type == 7: # EXCEPTION1898 return self.schema.get_exception(self.depended_on_name)1899 elif self.depended_on_type == 8:1900 ## ToDo: Implement handler for USER if necessary1901 return None1902 elif self.depended_on_type == 9: # FIELD (i.e. DOMAIN)1903 return self.schema.get_domain(self.depended_on_name)1904 elif self.depended_on_type == 10: # INDEX1905 return self.schema.get_index(self.depended_on_name)1906 elif self.depended_on_type == 11:1907 ## ToDo: Implement handler for DEPENDENT COUNT if necessary1908 return None1909 elif self.depended_on_type == 12:1910 ## ToDo: Implement handler for USER GROUP if necessary1911 return None1912 elif self.depended_on_type == 13: # ROLE1913 return self.schema.get_role(self.depended_on_name)1914 elif self.depended_on_type == 14: # GENERATOR1915 return self.schema.get_generator(self.depended_on_name)1916 elif self.depended_on_type == 15: # UDF1917 return self.schema.get_function(self.depended_on_name)1918 elif self.depended_on_type == 16:1919 ## ToDo: Implement handler for BLOB_FILTER1920 return None1921 return None1922 1923 #--- Properties1924 1925 dependent = LateBindingProperty(_get_dependent,None,None,1926 "Dependent database object.")1927 dependent_name = LateBindingProperty(_get_dependent_name,None,None,1928 "Dependent database object name.")1929 dependent_type = LateBindingProperty(_get_dependent_type,None,None,1930 "Dependent database object type.")1931 field_name = LateBindingProperty(_get_field_name,None,None,1932 "Name of one column in `depended on` object.")1933 depended_on = LateBindingProperty(_get_depended_on,None,None,1934 "Database object on which dependent depends.")1935 depended_on_name = LateBindingProperty(_get_depended_on_name,None,None,1936 "Name of db object on which dependent depends.")1937 depended_on_type = LateBindingProperty(_get_depended_on_type,None,None,1938 "Type of db object on which dependent depends.")1939 1940 #--- Public1941 1942 def accept_visitor(self,visitor):1943 """Visitor Pattern support. Calls `visitDependency(self)` on parameter object.1944 1945 :param visitor: Visitor object of Vistior Pattern.1946 """1947 visitor.visitDependency(self)1948 def issystemobject(self):1949 "Returns True as dependency entries are considered as system objects." 1950 return True1951 def get_dependents(self):1952 "Returns empty list because Dependency object never has dependents."1953 return []1954 def get_dependencies(self):1955 "Returns empty list because Dependency object never has dependencies."1956 return []1957 1958class Constraint(BaseSchemaItem):1959 """Represents table or column constraint.1960 Supported SQL actions: 1961 1962 - Constraint on user table except NOT NULL constraint: create, drop1963 - Constraint on system table: none1964 """1965 def __init__(self,schema,attributes):1966 super(Constraint,self).__init__(schema,attributes)1967 self._strip_attribute('RDB$CONSTRAINT_NAME')1968 self._strip_attribute('RDB$CONSTRAINT_TYPE')1969 self._strip_attribute('RDB$RELATION_NAME')1970 self._strip_attribute('RDB$DEFERRABLE')1971 self._strip_attribute('RDB$INITIALLY_DEFERRED')1972 self._strip_attribute('RDB$INDEX_NAME')1973 self._strip_attribute('RDB$TRIGGER_NAME')1974 self._strip_attribute('RDB$CONST_NAME_UQ')1975 self._strip_attribute('RDB$MATCH_OPTION')1976 self._strip_attribute('RDB$UPDATE_RULE')1977 self._strip_attribute('RDB$DELETE_RULE')1978 if not (self.issystemobject() or self.isnotnull()):1979 self._actions = ['create','drop']1980 #--- Protected1981 def _get_create_sql(self,**params):1982 self._check_params(params,[])1983 const_def = 'ALTER TABLE %s ADD ' % self.table.get_quoted_name()1984 if not self.name.startswith('INTEG_'):1985 const_def += 'CONSTRAINT %s\n ' % self.get_quoted_name()1986 if self.ischeck():1987 const_def += self.triggers[0].source1988 elif self.ispkey() or self.isunique():1989 const_def += 'PRIMARY KEY' if self.ispkey() else 'UNIQUE'1990 i = self.index1991 const_def += ' (%s)' % ','.join(i.segment_names)1992 if not i.issystemobject():1993 const_def += '\n USING %s INDEX %s' % (i.index_type,i.get_quoted_name())1994 elif self.isfkey():1995 const_def += 'FOREIGN KEY (%s)\n ' % ','.join(self.index.segment_names)1996 p = self.partner_constraint1997 const_def += 'REFERENCES %s (%s)' % (p.table.get_quoted_name(),1998 ','.join(p.index.segment_names))1999 if self.delete_rule != 'RESTRICT':2000 const_def += '\n ON DELETE %s' % self.delete_rule2001 if self.update_rule != 'RESTRICT':2002 const_def += '\n ON UPDATE %s' % self.update_rule2003 i = self.index2004 if not i.issystemobject():2005 const_def += '\n USING %s INDEX %s' % (i.index_type,i.get_quoted_name())2006 else:2007 raise fdb_embedded.OperationalError("Unrecognized constraint type '%s'" % self.constraint_type)2008 return const_def2009 def _get_drop_sql(self,**params):2010 self._check_params(params,[])2011 return 'ALTER TABLE %s DROP CONSTRAINT %s' % (self.table.get_quoted_name(),2012 self.get_quoted_name())2013 def _get_name(self):2014 return self._attributes['RDB$CONSTRAINT_NAME']2015 def _get_constraint_type(self):2016 return self._attributes['RDB$CONSTRAINT_TYPE']2017 def _get_table(self):2018 return self.schema.get_table(self._attributes['RDB$RELATION_NAME'])2019 def _get_index(self):2020 return self.schema.get_index(self._attributes['RDB$INDEX_NAME'])2021 def _get_trigger_names(self):2022 if self.ischeck():2023 return self._attributes['RDB$TRIGGER_NAME']2024 else:2025 return []2026 def _get_triggers(self):2027 return [self.schema.get_trigger(tname) for tname in self.trigger_names]2028 def _get_column_name(self):2029 if self.isnotnull():2030 return self._attributes['RDB$TRIGGER_NAME']2031 else:2032 return None2033 def _get_partner_constraint(self):2034 return self.schema.get_constraint(self._attributes['RDB$CONST_NAME_UQ'])2035 def _get_match_option(self):2036 return self._attributes['RDB$MATCH_OPTION']2037 def _get_update_rule(self):2038 return self._attributes['RDB$UPDATE_RULE']2039 def _get_delete_rule(self):2040 return self._attributes['RDB$DELETE_RULE']2041 #--- Properties2042 constraint_type = LateBindingProperty(_get_constraint_type,None,None,2043 "primary key/unique/foreign key/check/not null.")2044 table = LateBindingProperty(_get_table,None,None,2045 ":class:`Table` instance this constraint applies to.")2046 index = LateBindingProperty(_get_index,None,None,2047 ":class:`Index` instance that enforces the constraint.\n`None` if constraint is not primary key/unique or foreign key.")2048 trigger_names = LateBindingProperty(_get_trigger_names,None,None,2049 "For a CHECK constraint contains trigger names that enforce the constraint.")2050 triggers = LateBindingProperty(_get_triggers,None,None,2051 "For a CHECK constraint contains :class:`Trigger` instances that enforce the constraint.")2052 column_name = LateBindingProperty(_get_column_name,None,None,2053 "For a NOT NULL constraint, this is the name of the column to which the constraint applies.")2054 partner_constraint = LateBindingProperty(_get_partner_constraint,None,None,2055 "For a FOREIGN KEY constraint, this is the unique or primary key :class:`Constraint` referred.")2056 match_option = LateBindingProperty(_get_match_option,None,None,2057 "For a FOREIGN KEY constraint only. Current value is FULL in all cases.")2058 update_rule = LateBindingProperty(_get_update_rule,None,None,2059 "For a FOREIGN KEY constraint, this is the action applicable to when primary key is updated.")2060 delete_rule = LateBindingProperty(_get_delete_rule,None,None,2061 "For a FOREIGN KEY constraint, this is the action applicable to when primary key is deleted.")2062 #--- Public2063 2064 def accept_visitor(self,visitor):2065 """Visitor Pattern support. Calls `visitConstraint(self)` on parameter object.2066 2067 :param visitor: Visitor object of Vistior Pattern.2068 """2069 visitor.visitConstraint(self)2070 def issystemobject(self):2071 "Returns True if this database object is system object."2072 return self.schema.get_table(self._attributes['RDB$RELATION_NAME']).issystemobject()2073 def isnotnull(self):2074 "Returns True if it's NOT NULL constraint."2075 return self.constraint_type == 'NOT NULL'2076 def ispkey(self):2077 "Returns True if it's PRIMARY KEY constraint."2078 return self.constraint_type == 'PRIMARY KEY'2079 def isfkey(self):2080 "Returns True if it's FOREIGN KEY constraint."2081 return self.constraint_type == 'FOREIGN KEY'2082 def isunique(self):2083 "Returns True if it's UNIQUE constraint."2084 return self.constraint_type == 'UNIQUE'2085 def ischeck(self):2086 "Returns True if it's CHECK constraint."2087 return self.constraint_type == 'CHECK'2088 def isdeferrable(self):2089 "Returns True if it's DEFERRABLE constraint."2090 return self._attributes['RDB$DEFERRABLE'] != 'NO'2091 def isdeferred(self):2092 "Returns True if it's INITIALLY DEFERRED constraint."2093 return self._attributes['RDB$INITIALLY_DEFERRED'] != 'NO'2094class Table(BaseSchemaItem):2095 """Represents Table in database.2096 Supported SQL actions: 2097 2098 - User table: create, recreate, drop2099 - System table: none2100 """2101 def __init__(self,schema,attributes):2102 super(Table,self).__init__(schema,attributes)2103 self._type_code = [0,]2104 self.__columns = None2105 self._strip_attribute('RDB$RELATION_NAME')2106 self._strip_attribute('RDB$OWNER_NAME')2107 self._strip_attribute('RDB$SECURITY_CLASS')2108 self._strip_attribute('RDB$DEFAULT_CLASS')2109 if not self.issystemobject():2110 self._actions = ['create','recreate','drop']2111 2112 #--- Protected2113 def _get_create_sql(self,**params):2114 self._check_params(params,[])2115 tabdef = 'CREATE %sTABLE %s' % ('GLOBAL TEMPORARY ' if self.isgtt() else '',2116 self.get_quoted_name())2117 if self.isexternal():2118 tabdef += " EXTERNAL FILE '%s'\n" % self.external_file2119 tabdef += '\n('2120 partdefs = []2121 for col in self.columns:2122 coldef = '\n %s ' % col.get_quoted_name()2123 collate = ''2124 if col.isdomainbased():2125 coldef += '%s' % col.domain.get_quoted_name()2126 elif col.iscomputed():2127 coldef += 'COMPUTED BY %s' % col.get_computedby()2128 else:2129 datatype = col.datatype2130 if datatype.rfind(' COLLATE ') > 0:2131 datatype, collate = datatype.split(' COLLATE ')2132 coldef += '%s' % datatype2133 if col.has_default():2134 coldef += ' DEFAULT %s' % col.default2135 if not col.isnullable():2136 coldef += ' NOT NULL'2137 if col._attributes['RDB$COLLATION_ID'] is not None:2138 cname = col.collation.name2139 if col.domain.character_set._attributes['RDB$DEFAULT_COLLATE_NAME'] != cname:2140 collate = cname2141 if collate:2142 coldef += ' COLLATE %s' % collate2143 partdefs.append(coldef)2144 if self.has_pkey():2145 pk = self.primary_key2146 pkdef = '\n '2147 if not pk.name.startswith('INTEG_'):2148 pkdef += 'CONSTRAINT %s\n ' % pk.get_quoted_name()2149 i = pk.index2150 pkdef += 'PRIMARY KEY (%s)' % ','.join(i.segment_names)2151 if not i.issystemobject():2152 pkdef += '\n USING %s INDEX %s' % (i.index_type,i.get_quoted_name())2153 partdefs.append(pkdef)2154 for uq in self.constraints:2155 if uq.isunique():2156 uqdef = '\n '2157 if not uq.name.startswith('INTEG_'):2158 uqdef += 'CONSTRAINT %s\n ' % uq.get_quoted_name()2159 i = uq.index2160 uqdef += 'UNIQUE (%s)' % ','.join(i.segment_names)2161 if not i.issystemobject():2162 uqdef += '\n USING %s INDEX %s' % (i.index_type,i.get_quoted_name())2163 partdefs.append(uqdef)2164 tabdef += ','.join(partdefs)2165 tabdef += '\n)'2166 return tabdef2167 def _get_drop_sql(self,**params):2168 self._check_params(params,[])2169 return 'DROP TABLE %s' % self.get_quoted_name()2170 def _get_name(self):2171 return self._attributes['RDB$RELATION_NAME']2172 def _get_id(self):2173 return self._attributes['RDB$RELATION_ID']2174 def _get_dbkey_length(self):2175 return self._attributes['RDB$DBKEY_LENGTH']2176 def _get_format(self):2177 return self._attributes['RDB$FORMAT']2178 def _get_table_type(self):2179 return self.schema.enum_relation_types.get(self._attributes.get('RDB$RELATION_TYPE'),2180 'PERSISTENT')2181 def _get_security_class(self):2182 return self._attributes['RDB$SECURITY_CLASS']2183 def _get_external_file(self):2184 return self._attributes['RDB$EXTERNAL_FILE']2185 def _get_owner_name(self):2186 return self._attributes['RDB$OWNER_NAME']2187 def _get_default_class(self):2188 return self._attributes['RDB$DEFAULT_CLASS']2189 def _get_flags(self):2190 return self._attributes['RDB$FLAGS']2191 def _get_indices(self):2192 return [i for i in self.schema._get_all_indices() 2193 if i._attributes['RDB$RELATION_NAME'] == self.name]2194 def _get_triggers(self):2195 return [t for t in self.schema.triggers 2196 if t._attributes['RDB$RELATION_NAME'] == self.name]2197 def _get_constraints(self):2198 return [c for c in self.schema.constraints 2199 if c._attributes['RDB$RELATION_NAME'] == self.name]2200 def _get_columns(self):2201 if self.__columns is None:2202 self.__columns = [TableColumn(self.schema,self,row) for row in 2203 self.schema._select("""select RDB$FIELD_NAME, RDB$RELATION_NAME,2204RDB$FIELD_SOURCE, RDB$FIELD_POSITION, RDB$UPDATE_FLAG, RDB$FIELD_ID, RDB$DESCRIPTION, 2205RDB$SYSTEM_FLAG, RDB$SECURITY_CLASS, RDB$NULL_FLAG, RDB$DEFAULT_SOURCE, RDB$COLLATION_ID 2206from RDB$RELATION_FIELDS where RDB$RELATION_NAME = ? order by RDB$FIELD_POSITION""",(self.name,))]2207 return self.__columns2208 def _get_primary_key(self):2209 for const in self.constraints:2210 if const.ispkey():2211 return const2212 return None2213 def _get_foreign_keys(self):2214 return [c for c in self.constraints if c.isfkey()]2215 def _get_privileges(self):2216 return [p for p in self.schema.privileges2217 if ((p.subject_name == self.name) and 2218 (p.subject_type in self._type_code))]2219 #--- Properties2220 id = LateBindingProperty(_get_id,None,None,"Internam number ID for the table.")2221 dbkey_length = LateBindingProperty(_get_dbkey_length,None,None,2222 "Length of the RDB$DB_KEY column in bytes.")2223 format = LateBindingProperty(_get_format,None,None,2224 "Internal format ID for the table.")2225 table_type = LateBindingProperty(_get_table_type,None,None,"Table type.")2226 security_class = LateBindingProperty(_get_security_class,None,None,2227 "Security class that define access limits to the table.")2228 external_file = LateBindingProperty(_get_external_file,None,None,2229 "Full path to the external data file, if any.")2230 owner_name = LateBindingProperty(_get_owner_name,None,None,2231 "User name of table's creator.")2232 default_class = LateBindingProperty(_get_default_class,None,None,2233 "Default security class.")2234 flags = LateBindingProperty(_get_flags,None,None,"Internal flags.")2235 primary_key = LateBindingProperty(_get_primary_key,None,None,2236 "PRIMARY KEY :class:`Constraint` for this table or None.")2237 foreign_keys = LateBindingProperty(_get_foreign_keys,None,None,2238 "List of FOREIGN KEY :class:`Constraint` instances for this table.")2239 columns = LateBindingProperty(_get_columns,None,None,2240 "Returns list of columns defined for table.\nItems are :class:`TableColumn` objects.")2241 constraints = LateBindingProperty(_get_constraints,None,None,2242 "Returns list of constraints defined for table.\nItems are :class:`Constraint` objects.")2243 indices = LateBindingProperty(_get_indices,None,None,2244 "Returns list of indices defined for table.\nItems are :class:`Index` objects.")2245 triggers = LateBindingProperty(_get_triggers,None,None,2246 "Returns list of triggers defined for table.\nItems are :class:`Trigger` objects.")2247 privileges = LateBindingProperty(_get_privileges,None,None,2248 "List of :class:`Privilege` objects granted to this object.")2249 #--- Public2250 2251 def accept_visitor(self,visitor):2252 """Visitor Pattern support. Calls `visitTable(self)` on parameter object.2253 2254 :param visitor: Visitor object of Vistior Pattern.2255 """2256 visitor.visitTable(self)2257 def get_column(self,name):2258 "Return :class:`TableColumn` object with specified name."2259 for col in self.columns:2260 if col.name == name:2261 return col2262 return None2263 def isgtt(self):2264 "Returns True if table is GLOBAL TEMPORARY table."2265 return self.table_type.startswith('GLOBAL_TEMPORARY')2266 def ispersistent(self):2267 "Returns True if table is persistent one."2268 return self.table_type in ['PERSISTENT','EXTERNAL']2269 def isexternal(self):2270 "Returns True if table is external table."2271 return bool(self.external_file)2272 def has_pkey(self):2273 "Returns True if table has PRIMARY KEY defined."2274 for const in self.constraints:2275 if const.ispkey():2276 return True2277 return False2278 def has_fkey(self):2279 "Returns True if table has any FOREIGN KEY constraint."2280 for const in self.constraints:2281 if const.isfkey():2282 return True2283 return False2284 2285class View(BaseSchemaItem):2286 """Represents database View.2287 Supported SQL actions: 2288 2289 - User views: create, recreate, alter(columns=string_or_list,query=string,check=bool), 2290 create_or_alter, drop2291 - System views: none2292 """2293 def __init__(self,schema,attributes):2294 super(View,self).__init__(schema,attributes)2295 self._type_code = [1,]2296 self.__columns = None2297 self._strip_attribute('RDB$RELATION_NAME')2298 self._strip_attribute('RDB$VIEW_SOURCE')2299 self._strip_attribute('RDB$OWNER_NAME')2300 self._strip_attribute('RDB$SECURITY_CLASS')2301 self._strip_attribute('RDB$DEFAULT_CLASS')2302 if not self.issystemobject():2303 self._actions = ['create','recreate','alter','create_or_alter','drop']2304 2305 #--- Protected2306 def _get_create_sql(self,**params):2307 self._check_params(params,[])2308 return "CREATE VIEW %s (%s)\n AS\n %s" % (self.get_quoted_name(),2309 ','.join([col.get_quoted_name() for col in self.columns]),self.sql)2310 def _get_alter_sql(self,**params):2311 self._check_params(params,['columns','query','check'])2312 columns = params.get('columns')2313 if isinstance(columns,(list,tuple)):2314 columns = ','.join(columns)2315 query = params.get('query')2316 check = params.get('check',False)2317 if query:2318 return "ALTER VIEW %s %s\n AS\n %s" % (self.get_quoted_name(),2319 '(%s)' % columns if columns else '',2320 '%s\n WITH CHECK OPTION' % query if check else query)2321 else:2322 raise fdb_embedded.ProgrammingError("Missing required parameter: 'query'.")2323 def _get_drop_sql(self,**params):2324 self._check_params(params,[])2325 return 'DROP VIEW %s' % self.get_quoted_name()2326 def _get_name(self):2327 return self._attributes['RDB$RELATION_NAME']2328 def _get_sql(self):2329 return self._attributes['RDB$VIEW_SOURCE']2330 def _get_id(self):2331 return self._attributes['RDB$RELATION_ID']2332 def _get_dbkey_length(self):2333 return self._attributes['RDB$DBKEY_LENGTH']2334 def _get_format(self):2335 return self._attributes['RDB$FORMAT']2336 def _get_security_class(self):2337 return self._attributes['RDB$SECURITY_CLASS']2338 def _get_owner_name(self):2339 return self._attributes['RDB$OWNER_NAME']2340 def _get_default_class(self):2341 return self._attributes['RDB$DEFAULT_CLASS']2342 def _get_flags(self):2343 return self._attributes['RDB$FLAGS']2344 def _get_triggers(self):2345 return [t for t in self.schema.triggers 2346 if t._attributes['RDB$RELATION_NAME'] == self.name]2347 def _get_columns(self):2348 if self.__columns is None:2349 self.__columns = [ViewColumn(self.schema,self,row) for row in 2350 self.schema._select("""select r.RDB$FIELD_NAME, r.RDB$RELATION_NAME, 2351r.RDB$FIELD_SOURCE, r.RDB$FIELD_POSITION, r.RDB$UPDATE_FLAG, r.RDB$FIELD_ID, 2352r.RDB$DESCRIPTION, r.RDB$SYSTEM_FLAG, r.RDB$SECURITY_CLASS, r.RDB$NULL_FLAG, 2353r.RDB$DEFAULT_SOURCE, r.RDB$COLLATION_ID, r.RDB$BASE_FIELD, 2354v.RDB$RELATION_NAME as BASE_RELATION 2355 from RDB$RELATION_FIELDS r2356 left join RDB$VIEW_RELATIONS v on r.RDB$VIEW_CONTEXT = v.RDB$VIEW_CONTEXT2357 where r.RDB$RELATION_NAME = ? 2358 order by RDB$FIELD_POSITION""",(self.name,))]2359 return self.__columns2360 def _get_privileges(self):2361 return [p for p in self.schema.privileges2362 if ((p.subject_name == self.name) and 2363 (p.subject_type == 0))] # Views are logged as Tables in RDB$USER_PRIVILEGES2364 #--- Properties2365 id = LateBindingProperty(_get_id,None,None,"Internal number ID for the view.")2366 sql= LateBindingProperty(_get_sql,None,None,"The query specification.")2367 dbkey_length = LateBindingProperty(_get_dbkey_length,None,None,2368 "Length of the RDB$DB_KEY column in bytes.")2369 format = LateBindingProperty(_get_format,None,None,"Internal format ID for the view.")2370 security_class = LateBindingProperty(_get_security_class,None,None,2371 "Security class that define access limits to the view.")2372 owner_name = LateBindingProperty(_get_owner_name,None,None,"User name of view's creator.")2373 default_class = LateBindingProperty(_get_default_class,None,None,"Default security class.")2374 flags = LateBindingProperty(_get_flags,None,None,"Internal flags.")2375 columns = LateBindingProperty(_get_columns,None,None,2376 "Returns list of columns defined for view.\nItems are :class:`ViewColumn` objects.")2377 triggers = LateBindingProperty(_get_triggers,None,None,2378 "Returns list of triggers defined for view.\nItems are :class:`Trigger` objects.")2379 privileges = LateBindingProperty(_get_privileges,None,None,2380 "List of :class:`Privilege` objects granted to this object.")2381 #--- Public2382 2383 def accept_visitor(self,visitor):2384 """Visitor Pattern support. Calls `visitView(self)` on parameter object.2385 2386 :param visitor: Visitor object of Vistior Pattern.2387 """2388 visitor.visitView(self)2389 def get_column(self,name):2390 "Return :class:`TableColumn` object with specified name."2391 for col in self.columns:2392 if col.name == name:2393 return col2394 return None2395 def get_trigger(self,name):2396 "Return :class:`Trigger` object with specified name."2397 for t in self.triggers:2398 if t.name == name:2399 return t2400 return None2401 def has_checkoption(self):2402 "Returns True if View has WITH CHECK OPTION defined."2403 return "WITH CHECK OPTION" in self.sql.upper()2404 2405class Trigger(BaseSchemaItem):2406 """Represents trigger.2407 Supported SQL actions: 2408 2409 - User trigger: create, recreate, create_or_alter, drop,2410 alter(fire_on=string,active=bool,sequence=int,declare=string_or_list,2411 code=string_or_list)2412 - System trigger: none2413 """2414 def __init__(self,schema,attributes):2415 super(Trigger,self).__init__(schema,attributes)2416 self._type_code = [2,]2417 self._strip_attribute('RDB$TRIGGER_NAME')2418 self._strip_attribute('RDB$RELATION_NAME')2419 if not self.issystemobject():2420 self._actions = ['create','recreate','alter','create_or_alter','drop']2421 #--- Protected2422 def _get_create_sql(self,**params):2423 self._check_params(params,[])2424 result = 'CREATE TRIGGER %s' % self.get_quoted_name()2425 if self._attributes['RDB$RELATION_NAME']:2426 result += ' FOR %s' % self.relation.get_quoted_name()2427 result += ' %s\n%s POSITION %d\n%s' % ('ACTIVE' if self.isactive() else 'INACTIVE',2428 self.get_type_as_string(),2429 self.sequence,self.source)2430 return result2431 def _get_alter_sql(self,**params):2432 self._check_params(params,['fire_on','active','sequence','declare','code'])2433 action = params.get('fire_on')2434 active = params.get('active')2435 sequence = params.get('sequence')2436 declare = params.get('declare')2437 code = params.get('code')2438 #2439 header = ''2440 if active is not None:2441 header += ' ACTIVE' if active else ' INACTIVE'2442 if action is not None:2443 dbaction = action.upper().startswith('ON ')2444 if ((dbaction and not self.isdbtrigger()) 2445 or (not dbaction and self.isdbtrigger())):2446 raise fdb_embedded.ProgrammingError("Trigger type change is not allowed.")2447 header += '\n %s' % action2448 if sequence is not None:2449 header += '\n POSITION %d' % sequence2450 #2451 if code is not None:2452 if declare is None:2453 d = ''2454 elif isinstance(declare,(list,tuple)):2455 d = ''2456 for x in declare:2457 d += ' %s\n' % x2458 else:2459 d = '%s\n' % declare2460 if isinstance(code,(list,tuple)):2461 c = ''2462 for x in code:2463 c += ' %s\n' % x2464 else:2465 c = '%s\n' % code2466 body = '\nAS\n%sBEGIN\n%sEND' % (d,c)2467 else:2468 body = ''2469 #2470 if not (header or body):2471 raise fdb_embedded.ProgrammingError("Header or body definition required.")2472 return 'ALTER TRIGGER %s%s%s' % (self.get_quoted_name(),header,body)2473 def _get_drop_sql(self,**params):2474 self._check_params(params,[])2475 return 'DROP TRIGGER %s' % self.get_quoted_name()2476 def _get_action_time(self):2477 return (self.trigger_type + 1) & 12478 def _get_action_type(self,slot):2479 return ((self.trigger_type + 1) >> (slot * 2 - 1)) & 32480 def _get_name(self):2481 return self._attributes['RDB$TRIGGER_NAME']2482 def _get_relation(self):2483 relname = self._attributes['RDB$RELATION_NAME']2484 rel = self.schema.get_table(relname)2485 if not rel:2486 rel = self.schema.get_view(relname)2487 return rel2488 def _get_sequence(self):2489 return self._attributes['RDB$TRIGGER_SEQUENCE']2490 def _get_trigger_type(self):2491 return self._attributes['RDB$TRIGGER_TYPE']2492 def _get_source(self):2493 return self._attributes['RDB$TRIGGER_SOURCE']2494 def _get_flags(self):2495 return self._attributes['RDB$FLAGS']2496 def _istype(self,type_code):2497 atype = self._get_action_type(1)2498 if atype == type_code:2499 return True2500 atype = self._get_action_type(2)2501 if atype and atype == type_code:2502 return True2503 atype = self._get_action_type(3)2504 if atype and atype == type_code:2505 return True2506 return False2507 #--- Properties2508 relation = LateBindingProperty(_get_relation,None,None,2509 ":class:`Table` or :class:`View` that the trigger is for, or None for database triggers")2510 sequence = LateBindingProperty(_get_sequence,None,None,2511 "Sequence (position) of trigger. Zero usually means no sequence defined.")2512 trigger_type = LateBindingProperty(_get_trigger_type,None,None,2513 "Numeric code for trigger type that define what event and when are covered by trigger.")2514 source = LateBindingProperty(_get_source,None,None,"PSQL source code.")2515 flags = LateBindingProperty(_get_flags,None,None,"Internal flags.")2516 #--- Public2517 2518 def accept_visitor(self,visitor):2519 """Visitor Pattern support. Calls `visitTrigger(self)` on parameter object.2520 2521 :param visitor: Visitor object of Vistior Pattern.2522 """2523 visitor.visitTrigger(self)2524 def isactive(self):2525 "Returns True if this trigger is active."2526 return self._attributes['RDB$TRIGGER_INACTIVE'] == 02527 def isbefore(self):2528 "Returns True if this trigger is set for BEFORE action."2529 return self._get_action_time() == 02530 def isafter(self):2531 "Returns True if this trigger is set for AFTER action."2532 return self._get_action_time() == 12533 def isdbtrigger(self):2534 "Returns True if this trigger is database trigger."2535 return (self.trigger_type & TRIGGER_TYPE_MASK) == TRIGGER_TYPE_DB2536 def isinsert(self):2537 "Returns True if this trigger is set for INSERT operation."2538 return self._istype(1)2539 def isupdate(self):2540 "Returns True if this trigger is set for UPDATE operation."2541 return self._istype(2)2542 def isdelete(self):2543 "Returns True if this trigger is set for DELETE operation."2544 return self._istype(3)2545 def get_type_as_string(self):2546 "Return string with action and operation specification."2547 l = []2548 if self.isdbtrigger():2549 l.append('ON '+TRIGGER_DB_TYPES[self.trigger_type & ~TRIGGER_TYPE_DB])2550 else:2551 l.append(TRIGGER_PREFIX_TYPES[self._get_action_time()])2552 l.append(TRIGGER_SUFFIX_TYPES[self._get_action_type(1)])2553 sufix = self._get_action_type(2)2554 if sufix:2555 l.append('OR')2556 l.append(TRIGGER_SUFFIX_TYPES[sufix])2557 sufix = self._get_action_type(3)2558 if sufix:2559 l.append('OR')2560 l.append(TRIGGER_SUFFIX_TYPES[sufix])2561 return ' '.join(l)2562class ProcedureParameter(BaseSchemaItem):2563 """Represents procedure parameter.2564 Supported SQL actions: none.2565 """2566 def __init__(self,schema,proc,attributes):2567 super(ProcedureParameter,self).__init__(schema,attributes)2568 self.__proc = proc2569 self._strip_attribute('RDB$PARAMETER_NAME')2570 self._strip_attribute('RDB$PROCEDURE_NAME')2571 self._strip_attribute('RDB$FIELD_SOURCE')2572 self._strip_attribute('RDB$RELATION_NAME')2573 self._strip_attribute('RDB$FIELD_NAME')2574 #--- Protected2575 def _get_name(self):2576 return self._attributes['RDB$PARAMETER_NAME']2577 def _get_procedure(self):2578 return self.schema.get_procedure(self._attributes['RDB$PROCEDURE_NAME'])2579 def _get_sequence(self):2580 return self._attributes['RDB$PARAMETER_NUMBER']2581 def _get_domain(self):2582 return self.schema.get_domain(self._attributes['RDB$FIELD_SOURCE'])2583 def _get_datatype(self):2584 return self.domain.datatype2585 def _get_type_from(self):2586 m = self.mechanism2587 if m is None:2588 return PROCPAR_DATATYPE2589 elif m == 0:2590 return PROCPAR_DATATYPE if self.domain.issystemobject() else PROCPAR_DOMAIN2591 elif m == 1:2592 if self._attributes.get('RDB$RELATION_NAME') is None:2593 return PROCPAR_TYPE_OF_DOMAIN2594 else:2595 return PROCPAR_TYPE_OF_COLUMN2596 else:2597 raise fdb_embedded.InternalError("Unknown parameter mechanism code: %d" % m)2598 def _get_default(self):2599 result = self._attributes.get('RDB$DEFAULT_SOURCE')2600 if result:2601 if result.upper().startswith('= '):2602 result = result[2:]2603 elif result.upper().startswith('DEFAULT '):2604 result = result[8:]2605 return result2606 def _get_collation(self):2607 cid = self._attributes.get('RDB$COLLATION_ID')2608 return (None if cid is None 2609 else self.schema.get_collation_by_id(self.domain._attributes['RDB$CHARACTER_SET_ID'],cid))2610 def _get_mechanism(self):2611 return self._attributes.get('RDB$PARAMETER_MECHANISM')2612 def _get_column(self):2613 rname = self._attributes.get('RDB$RELATION_NAME')2614 return (None if rname is None 2615 else self.schema.get_table(rname).get_column(self._attributes['RDB$FIELD_NAME']))2616 #--- Properties2617 procedure = LateBindingProperty(_get_procedure,None,None,2618 "Name of the stored procedure.")2619 sequence = LateBindingProperty(_get_sequence,None,None,2620 "Sequence (position) of parameter.")2621 domain = LateBindingProperty(_get_domain,None,None,2622 ":class:`Domain` for this parameter.")2623 datatype = LateBindingProperty(_get_datatype,None,None,2624 "Comlete SQL datatype definition.")2625 type_from = LateBindingProperty(_get_type_from,None,None,2626 "Numeric code. See :attr:`Schema.enum_param_type_from`.`")2627 2628 # FB 2.12629 default = LateBindingProperty(_get_default,None,None,"Default value.")2630 collation = LateBindingProperty(_get_collation,None,None,2631 ":class:`collation` for this parameter.")2632 mechanism = LateBindingProperty(_get_mechanism,None,None,2633 "Parameter mechanism code.")2634 # FB 2.52635 column = LateBindingProperty(_get_column,None,None,2636 ":class:`TableColumn` for this parameter.")2637 #--- Public2638 2639 def accept_visitor(self,visitor):2640 """Visitor Pattern support. Calls `visitProcedureParameter(self)` on parameter object.2641 2642 :param visitor: Visitor object of Vistior Pattern.2643 """2644 visitor.visitProcedureParameter(self)2645 def get_sql_definition(self):2646 "Returns SQL definition for parameter."2647 typedef = self.datatype2648 if self.type_from == PROCPAR_DOMAIN:2649 typedef = self.domain.get_quoted_name()2650 elif self.type_from == PROCPAR_TYPE_OF_DOMAIN:2651 typedef = 'TYPE OF %s' % self.domain.get_quoted_name()2652 elif self.type_from == PROCPAR_TYPE_OF_COLUMN:2653 typedef = 'TYPE OF COLUMN %s.%s' % (self.column.table.get_quoted_name(),2654 self.column.get_quoted_name())2655 result = '%s %s%s' % (self.get_quoted_name(),typedef,2656 '' if self.isnullable() else ' NOT NULL')2657 c = self.collation2658 if c is not None:2659 result += ' COLLATE %s' % c.get_quoted_name()2660 if self.isinput() and self.has_default():2661 result += ' = %s' % self.default2662 return result2663 def isinput(self):2664 "Returns True if parameter is INPUT parameter."2665 return self._attributes['RDB$PARAMETER_TYPE'] == 02666 def isnullable(self):2667 "Returns True if parameter allows NULL."2668 return not bool(self._attributes.get('RDB$NULL_FLAG'))2669 def has_default(self):2670 "Returns True if parameter has default value."2671 return bool(self._attributes.get('RDB$DEFAULT_SOURCE'))2672 2673class Procedure(BaseSchemaItem):2674 """Represents stored procedure.2675 Supported SQL actions: 2676 2677 - User procedure: create(no_code=bool), recreate(no_code=bool), 2678 create_or_alter(no_code=bool), drop,2679 alter(input=string_or_list,output=string_or_list,declare=string_or_list,2680 code=string_or_list)2681 - System procedure: none2682 """2683 def __init__(self,schema,attributes):2684 super(Procedure,self).__init__(schema,attributes)2685 self._type_code = [5,]2686 self.__inputParams = self.__outputParams = None2687 2688 self._strip_attribute('RDB$PROCEDURE_NAME')2689 self._strip_attribute('RDB$OWNER_NAME')2690 self._strip_attribute('RDB$SECURITY_CLASS')2691 self.__ods = schema._con.ods2692 if not self.issystemobject():2693 self._actions = ['create','recreate','alter','create_or_alter','drop']2694 #--- Protected2695 def _get_create_sql(self,**params):2696 self._check_params(params,['no_code'])2697 no_code = params.get('no_code')2698 result = 'CREATE PROCEDURE %s' % self.get_quoted_name()2699 if self.has_input():2700 if self._attributes['RDB$PROCEDURE_INPUTS'] == 1:2701 result += ' (%s)\n' % self.input_params[0].get_sql_definition()2702 else:2703 result += ' (\n'2704 for p in self.input_params:2705 result += ' %s%s\n' % (p.get_sql_definition(),2706 '' if p.sequence+1 == self._attributes['RDB$PROCEDURE_INPUTS']2707 else ',')2708 result += ')\n'2709 else:2710 result += '\n'2711 if self.has_output():2712 if self._attributes['RDB$PROCEDURE_OUTPUTS'] == 1:2713 result += 'RETURNS (%s)\n' % self.output_params[0].get_sql_definition()2714 else:2715 result += 'RETURNS (\n'2716 for p in self.input_params:2717 result += ' %s%s\n' % (p.get_sql_definition(),2718 '' if p.sequence+1 == self._attributes['RDB$PROCEDURE_OUTPUTS']2719 else ',')2720 result += ')\n'2721 return result+'AS\n'+('BEGIN\nEND' if no_code else self.source)2722 def _get_alter_sql(self,**params):2723 self._check_params(params,['input','output','declare','code'])2724 inpars = params.get('input')2725 outpars = params.get('output')2726 declare = params.get('declare')2727 code = params.get('code')2728 if code is None:2729 raise fdb_embedded.ProgrammingError("Missing required parameter: 'code'.")2730 #2731 header = ''2732 if inpars is not None:2733 if isinstance(inpars,(list,tuple)):2734 numpars = len(inpars)2735 if numpars == 1:2736 header = ' (%s)\n' % inpars2737 else:2738 header = ' (\n'2739 i = 12740 for p in inpars:2741 header += ' %s%s\n' % (p,'' if i == numpars else ',')2742 i += 12743 header += ')\n'2744 else:2745 header = ' (%s)\n' % inpars2746 #2747 if outpars is not None:2748 if not header:2749 header += '\n'2750 if isinstance(outpars,(list,tuple)):2751 numpars = len(outpars)2752 if numpars == 1:2753 header += 'RETURNS (%s)\n' % outpars2754 else:2755 header += 'RETURNS (\n'2756 i = 12757 for p in outpars:2758 header += ' %s%s\n' % (p,'' if i == numpars else ',')2759 i += 12760 header += ')\n'2761 else:2762 header += 'RETURNS (%s)\n' % outpars2763 #2764 if code:2765 if declare is None:2766 d = ''2767 elif isinstance(declare,(list,tuple)):2768 d = ''2769 for x in declare:2770 d += ' %s\n' % x2771 else:2772 d = '%s\n' % declare2773 if isinstance(code,(list,tuple)):2774 c = ''2775 for x in code:2776 c += ' %s\n' % x2777 else:2778 c = '%s\n' % code2779 body = '%sAS\n%sBEGIN\n%sEND' % ('' if header else '\n',d,c)2780 else:2781 body = '%sAS\nBEGIN\nEND' % ('' if header else '\n')2782 #2783 return 'ALTER PROCEDURE %s%s%s' % (self.get_quoted_name(),header,body)2784 def _get_drop_sql(self,**params):2785 self._check_params(params,[])2786 return 'DROP PROCEDURE %s' % self.get_quoted_name()2787 def __param_columns(self):2788 cols = ['RDB$PARAMETER_NAME','RDB$PROCEDURE_NAME','RDB$PARAMETER_NUMBER',2789 'RDB$PARAMETER_TYPE','RDB$FIELD_SOURCE','RDB$DESCRIPTION', 2790 'RDB$SYSTEM_FLAG']2791 if self.__ods >= fdb_embedded.ODS_FB_21:2792 cols.extend(['RDB$DEFAULT_SOURCE','RDB$COLLATION_ID','RDB$NULL_FLAG',2793 'RDB$PARAMETER_MECHANISM'])2794 if self.__ods >= fdb_embedded.ODS_FB_25:2795 cols.extend(['RDB$FIELD_NAME','RDB$RELATION_NAME'])2796 return ','.join(cols)2797 def _get_name(self):2798 return self._attributes['RDB$PROCEDURE_NAME']2799 def _get_id(self):2800 return self._attributes['RDB$PROCEDURE_ID']2801 def _get_source(self):2802 return self._attributes['RDB$PROCEDURE_SOURCE']2803 def _get_security_class(self):2804 return self._attributes['RDB$SECURITY_CLASS']2805 def _get_owner_name(self):2806 return self._attributes['RDB$OWNER_NAME']2807 def _get_input_params(self):2808 if self.__inputParams is None:2809 if self.has_input():2810 self.__inputParams = [ProcedureParameter(self.schema,self,row) for row in 2811 self.schema._select("""select %s from rdb$procedure_parameters 2812where rdb$procedure_name = ? 2813and rdb$parameter_type = 0 2814order by rdb$parameter_number""" % self.__param_columns(),(self.name,))]2815 else:2816 self.__inputParams = []2817 return self.__inputParams2818 def _get_output_params(self):2819 if self.__outputParams is None:2820 if self.has_output():2821 self.__outputParams = [ProcedureParameter(self.schema,self,row) for row in 2822 self.schema._select("""select %s from rdb$procedure_parameters 2823where rdb$procedure_name = ? 2824and rdb$parameter_type = 1 2825order by rdb$parameter_number""" % self.__param_columns(),(self.name,))]2826 else:2827 self.__outputParams = []2828 return self.__outputParams2829 def _get_proc_type(self):2830 return self._attributes.get('RDB$PROCEDURE_TYPE',0)2831 def _get_valid_blr(self):2832 result = self._attributes.get('RDB$VALID_BLR')2833 return bool(result) if result is not None else None2834 def _get_privileges(self):2835 return [p for p in self.schema.privileges2836 if ((p.subject_name == self.name) and 2837 (p.subject_type in self._type_code))]2838 2839 #--- Properties2840 id = LateBindingProperty(_get_id,None,None,"Internal unique ID number.")2841 source = LateBindingProperty(_get_source,None,None,"PSQL source code.")2842 security_class = LateBindingProperty(_get_security_class,None,None,2843 "Security class that define access limits to the procedure.")2844 owner_name = LateBindingProperty(_get_owner_name,None,None,2845 "User name of procedure's creator.")2846 input_params = LateBindingProperty(_get_input_params,None,None,2847 "List of input parameters.\nInstances are :class:`ProcedureParameter` instances.")2848 output_params = LateBindingProperty(_get_output_params,None,None,2849 "List of output parameters.\nInstances are :class:`ProcedureParameter` instances.")2850 privileges = LateBindingProperty(_get_privileges,None,None,2851 "List of :class:`Privilege` objects granted to this object.")2852 # FB 2.12853 proc_type = LateBindingProperty(_get_proc_type,None,None,2854 "Procedure type code. See :attr:`fdb_embedded.Connection.enum_procedure_types`.")2855 valid_blr = LateBindingProperty(_get_valid_blr,None,None,2856 "Procedure BLR invalidation flag. Coul be True/False or None.")2857 2858 #--- Public2859 2860 def accept_visitor(self,visitor):2861 """Visitor Pattern support. Calls `visitProcedure(self)` on parameter object.2862 2863 :param visitor: Visitor object of Vistior Pattern.2864 """2865 visitor.visitProcedure(self)2866 def get_param(self,name):2867 "Returns :class:`ProcedureParameter` with specified name or None"2868 for p in self.output_params:2869 if p.name == name:2870 return p2871 for p in self.input_params:2872 if p.name == name:2873 return p2874 return None2875 def has_input(self):2876 "Returns True if procedure has any input parameters."2877 return bool(self._attributes['RDB$PROCEDURE_INPUTS'])2878 def has_output(self):2879 "Returns True if procedure has any output parameters."2880 return bool(self._attributes['RDB$PROCEDURE_OUTPUTS'])2881class Role(BaseSchemaItem):2882 """Represents user role.2883 Supported SQL actions: 2884 2885 - User role: create, drop2886 - System role: none2887 """2888 def __init__(self,schema,attributes):2889 super(Role,self).__init__(schema,attributes)2890 self._type_code = [13,]2891 self._strip_attribute('RDB$ROLE_NAME')2892 self._strip_attribute('RDB$OWNER_NAME')2893 if not self.issystemobject():2894 self._actions = ['create','drop']2895 #--- Protected2896 def _get_create_sql(self,**params):2897 self._check_params(params,[])2898 return 'CREATE ROLE %s' % self.get_quoted_name()2899 def _get_drop_sql(self,**params):2900 self._check_params(params,[])2901 return 'DROP ROLE %s' % self.get_quoted_name()2902 def _get_name(self):2903 return self._attributes['RDB$ROLE_NAME']2904 def _get_owner_name(self):2905 return self._attributes['RDB$OWNER_NAME']2906 def _get_privileges(self):2907 return [p for p in self.schema.privileges2908 if ((p.subject_name == self.name) and 2909 (p.subject_type in self._type_code))]2910 #--- Properties2911 owner_name = LateBindingProperty(_get_owner_name,None,None,"User name of role owner.")2912 privileges = LateBindingProperty(_get_privileges,None,None,2913 "List of :class:`Privilege` objects granted to this object.")2914 #--- Public2915 2916 def accept_visitor(self,visitor):2917 """Visitor Pattern support. Calls `visitRole(self)` on parameter object.2918 2919 :param visitor: Visitor object of Vistior Pattern.2920 """2921 visitor.visitRole(self)2922class FunctionArgument(BaseSchemaItem):2923 """Represets UDF argument.2924 Supported SQL actions: none.2925 """2926 def __init__(self,schema,function,attributes):2927 super(FunctionArgument,self).__init__(schema,attributes)2928 self._type_code = [15,]2929 self.__function = function2930 self._strip_attribute('RDB$FUNCTION_NAME')2931 #--- Protected2932 def _get_name(self):2933 return self._attributes['RDB$FUNCTION_NAME']+'_'+str(self._get_position())2934 def _get_function(self):2935 return self.__function2936 #return self.schema.get_function(self._attributes['RDB$FUNCTION_NAME'])2937 def _get_position(self):2938 return self._attributes['RDB$ARGUMENT_POSITION']2939 def _get_mechanism(self):2940 return abs(self._attributes['RDB$MECHANISM'])2941 def _get_length(self):2942 return self._attributes['RDB$FIELD_LENGTH']2943 def _get_scale(self):2944 return self._attributes['RDB$FIELD_SCALE']2945 def _get_field_type(self):2946 return self._attributes['RDB$FIELD_TYPE']2947 def _get_sub_type(self):2948 return self._attributes['RDB$FIELD_SUB_TYPE']2949 def _get_character_length(self):2950 return self._attributes['RDB$CHARACTER_LENGTH']2951 def _get_character_set(self):2952 return self.schema.get_character_set_by_id(self._attributes['RDB$CHARACTER_SET_ID'])2953 def _get_precision(self):2954 return self._attributes['RDB$FIELD_PRECISION']2955 def _get_datatype(self):2956 l = []2957 precision_known = False2958 if self.field_type in (FBT_SMALLINT,FBT_INTEGER,FBT_BIGINT):2959 if self.precision != None:2960 if (self.sub_type > 0) and (self.sub_type < MAX_INTSUBTYPES):2961 l.append('%s(%d, %d)' % \...
bilibili.py
Source:bilibili.py
...198# å ID 解æå¨çåç±»åºäº https://github.com/mengshouer/nonebot_plugin_analysis_bilibili ç c79dbe9 æ交çæ¬199class BaseID(abc.ABC):200 def __init__(self, text: str):201 search_result: Optional[Match[str]] = re.compile(self._get_pattern(), re.I).search(text)202 self.subtype: str = self._get_sub_type()203 self.suburl: Optional[str] = self._get_api_url(search_result) if search_result else None204 @abc.abstractmethod205 def _get_pattern(self) -> str:206 """è¿åä¸ä¸ªæ£å表达å¼ï¼ç¨äºä»ç¨æ·æ¶æ¯ä¸å¹é
åºåªä½id"""207 raise NotImplementedError208 @abc.abstractmethod209 def _get_sub_type(self) -> str:210 """è¿å该idçåªä½ç±»åï¼ï¼"""211 raise NotImplementedError212 @abc.abstractmethod213 def _get_api_url(self, search_result: Match[str]) -> str:214 """æ ¹æ®search_resultè¿åä¸ä¸ªè¯¥åªä½å¯¹åºçapiå°å"""215 raise NotImplementedError216class BVID(BaseID):217 def _get_pattern(self): return r'BV([A-Za-z0-9]{10})+'218 def _get_sub_type(self): return 'video'219 def _get_api_url(self, search_result): return f'https://api.bilibili.com/x/web-interface/view?bvid={search_result[0]}'220class AID(BaseID):221 def _get_pattern(self): return r'av\d+'222 def _get_sub_type(self): return 'video'223 def _get_api_url(self, search_result): return f'https://api.bilibili.com/x/web-interface/view?aid={search_result[0][2:]}'224class EPID(BaseID):225 def _get_pattern(self): return r'ep\d+'226 def _get_sub_type(self): return 'bangumi'227 def _get_api_url(self, search_result): return f'https://bangumi.bilibili.com/view/web_api/season?ep_id={search_result[0][2:]}'228class SeasonID(BaseID):229 def _get_pattern(self): return r'ss\d+'230 def _get_sub_type(self): return 'bangumi'231 def _get_api_url(self, search_result): return f'https://bangumi.bilibili.com/view/web_api/season?season_id={search_result[0][2:]}'232class MediaID(BaseID):233 def _get_pattern(self): return r'md\d+'234 def _get_sub_type(self): return 'bangumi'235 def _get_api_url(self, search_result): return f'https://bangumi.bilibili.com/view/web_api/season?media_id={search_result[0][2:]}'236class RoomID(BaseID):237 def _get_pattern(self): return r'live.bilibili.com/(blanc/|h5/)?(\d+)'238 def _get_sub_type(self): return 'live'239 def _get_api_url(self, search_result): return f'https://api.live.bilibili.com/xlive/web-room/v1/index/getInfoByRoom?room_id={search_result[2]}'240class CVID(BaseID):241 def _get_pattern(self): return r'(cv|/read/(mobile|native)(/|\?id=))(\d+)'242 def _get_sub_type(self): return 'article'243 def _get_api_url(self, search_result): return f'https://api.bilibili.com/x/article/viewinfo?id={search_result[4]}&mobi_app=pc&from=web'244class Dynamic2ID(BaseID):245 def _get_pattern(self): return r'([tm]).bilibili.com/(\d+)\?(.*?)(&|&)type=2'246 def _get_sub_type(self): return 'dynamic'247 def _get_api_url(self, search_result): return f'https://api.vc.bilibili.com/dynamic_svr/v1/dynamic_svr/get_dynamic_detail?rid={search_result[2]}&type=2'248class DynamicID(BaseID):249 def _get_pattern(self): return r'([tm]).bilibili.com/(\d+)'250 def _get_sub_type(self): return 'dynamic'251 def _get_api_url(self, search_result): return f'https://api.vc.bilibili.com/dynamic_svr/v1/dynamic_svr/get_dynamic_detail?dynamic_id={search_result[2]}'252class VideoResponse(BaseModel):253 class Owner(BaseModel):254 name: str255 class Stat(BaseModel):256 view: int257 danmaku: int258 reply: int259 favorite: int260 coin: int261 share: int262 like: int263 dislike: int264 pubdate: int...
subclasses.py
Source:subclasses.py
...51 f"registered: " f"[{', '.join([name for name in self.keys()])}]"52 )53 def load_typed_runbook(self, raw_runbook: Any) -> T_BASECLASS:54 type_name = raw_runbook[constants.TYPE]55 sub_type = self._get_sub_type(type_name)56 instance: Any = schema.load_by_type(sub_type, raw_runbook)57 if hasattr(instance, "extended_schemas"):58 if instance.extended_schemas:59 raise LisaException(60 f"found unknown fields: {instance.extended_schemas}"61 )62 return cast(T_BASECLASS, instance)63 def create_by_type_name(self, type_name: str, **kwargs: Any) -> T_BASECLASS:64 sub_type = self._get_sub_type(type_name)65 return cast(T_BASECLASS, sub_type(**kwargs))66 def create_by_runbook(67 self, runbook: schema.TypedSchema, **kwargs: Any68 ) -> T_BASECLASS:69 sub_type = self._get_sub_type(runbook.type)70 sub_type_with_runbook = cast(Type[BaseClassWithRunbookMixin], sub_type)71 sub_object = sub_type_with_runbook.create_with_runbook(72 runbook=runbook, **kwargs73 )74 assert isinstance(75 sub_object, BaseClassWithRunbookMixin76 ), f"actual: {type(sub_object)}"77 return cast(T_BASECLASS, sub_object)78 def _get_subclasses(79 self, type: Type[BaseClassMixin]80 ) -> Iterable[Type[BaseClassMixin]]:81 # recursive loop subclasses of subclasses82 for subclass_type in type.__subclasses__():83 yield subclass_type84 yield from self._get_subclasses(subclass_type)85 def _get_sub_type(self, type_name: str) -> type:86 self.initialize()87 sub_type = self.get(type_name)88 if sub_type is None:89 raise LisaException(90 f"cannot find subclass '{type_name}' of {self._base_type.__name__}. "91 f"Supported types include: {list(self.keys())}. "92 f"Are you missing an import in 'mixin_modules.py' or an extension?"93 )...
Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!