Best Python code snippet using Kiwi_python
net.py
Source:net.py
1# -*- coding: utf8 -*-2# This file is part of PywerView.3# PywerView is free software: you can redistribute it and/or modify4# it under the terms of the GNU General Public License as published by5# the Free Software Foundation, either version 3 of the License, or6# (at your option) any later version.7# PywerView is distributed in the hope that it will be useful,8# but WITHOUT ANY WARRANTY; without even the implied warranty of9# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the10# GNU General Public License for more details.11# You should have received a copy of the GNU General Public License12# along with PywerView. If not, see <http://www.gnu.org/licenses/>.13# Yannick Méheut [yannick (at) meheut (dot) org] - Copyright © 201614import socket15from impacket.dcerpc.v5.ndr import NULL16from impacket.dcerpc.v5 import wkst, srvs, samr17from impacket.dcerpc.v5.samr import DCERPCSessionError18from impacket.dcerpc.v5.rpcrt import DCERPCException19from bs4 import BeautifulSoup20from pywerview.requester import LDAPRPCRequester21import pywerview.objects.adobjects as adobj22import pywerview.objects.rpcobjects as rpcobj23import pywerview.functions.misc24class NetRequester(LDAPRPCRequester):25 @LDAPRPCRequester._ldap_connection_init26 def get_adobject(self, queried_domain=str(), queried_sid=str(),27 queried_name=str(), queried_sam_account_name=str(),28 ads_path=str(), custom_filter=str()):29 for attr_desc, attr_value in (('objectSid', queried_sid), ('name', queried_name),30 ('samAccountName', queried_sam_account_name)):31 if attr_value:32 object_filter = '(&({}={}){})'.format(attr_desc, attr_value, custom_filter)33 break34 return self._ldap_search(object_filter, adobj.ADObject)35 @LDAPRPCRequester._ldap_connection_init36 def get_netuser(self, queried_username=str(), queried_domain=str(),37 ads_path=str(), admin_count=False, spn=False,38 unconstrained=False, allow_delegation=False,39 custom_filter=str()):40 if unconstrained:41 custom_filter += '(userAccountControl:1.2.840.113556.1.4.803:=524288)'42 if allow_delegation:43 custom_filter += '(!(userAccountControl:1.2.840.113556.1.4.803:=1048574))'44 if admin_count:45 custom_filter += '(admincount=1)'46 user_search_filter = '(samAccountType=805306368){}'.format(custom_filter)47 if queried_username:48 user_search_filter += '(samAccountName={})'.format(queried_username)49 elif spn:50 user_search_filter += '(servicePrincipalName={})'.format(spn)51 user_search_filter = '(&{})'.format(user_search_filter)52 return self._ldap_search(user_search_filter, adobj.User)53 @LDAPRPCRequester._ldap_connection_init54 def get_netgroup(self, queried_groupname='*', queried_sid=str(),55 queried_username=str(), queried_domain=str(),56 ads_path=str(), admin_count=False, full_data=False,57 custom_filter=str()):58 if admin_count:59 custom_filter += '(admincount=1)'60 group_search_filter = custom_filter61 if queried_username:62 group_search_filter += '(samAccountName={})'.format(queried_username)63 attributes = ['memberOf']64 else:65 group_search_filter += '(objectCategory=group)'66 if queried_sid:67 group_search_filter += '(objectSid={})'.format(queried_sid)68 elif queried_groupname:69 group_search_filter += '(name={})'.format(queried_groupname)70 if full_data:71 attributes=list()72 else:73 attributes=['samaccountname']74 group_search_filter = '(&{})'.format(group_search_filter)75 return self._ldap_search(group_search_filter, adobj.Group, attributes=attributes)76 @LDAPRPCRequester._ldap_connection_init77 def get_netcomputer(self, queried_computername='*', queried_spn=str(),78 queried_os=str(), queried_sp=str(), queried_domain=str(),79 ads_path=str(), printers=False, unconstrained=False,80 ping=False, full_data=False, custom_filter=str()):81 if unconstrained:82 custom_filter += '(userAccountControl:1.2.840.113556.1.4.803:=524288)'83 if printers:84 custom_filter += '(objectCategory=printQueue)'85 computer_search_filter = '(samAccountType=805306369){}'.format(custom_filter)86 for (attr_desc, attr_value) in (('servicePrincipalName', queried_spn),87 ('operatingSystem', queried_os), ('operatingsystemservicepack', queried_sp),88 ('dnsHostName', queried_computername)):89 if attr_value:90 computer_search_filter += '({}={})'.format(attr_desc, attr_value)91 if full_data:92 attributes=list()93 else:94 attributes=['dnsHostName']95 computer_search_filter = '(&{})'.format(computer_search_filter)96 return self._ldap_search(computer_search_filter, adobj.Computer, attributes=attributes)97 @LDAPRPCRequester._ldap_connection_init98 def get_netdomaincontroller(self, queried_domain=str()):99 domain_controller_filter = '(userAccountControl:1.2.840.113556.1.4.803:=8192)'100 return self.get_netcomputer(queried_domain=queried_domain, full_data=True,101 custom_filter=domain_controller_filter)102 @LDAPRPCRequester._ldap_connection_init103 def get_netfileserver(self, queried_domain=str(), target_users=list()):104 def split_path(path):105 split_path = path.split('\\')106 if len(split_path) >= 3:107 return split_path[2]108 results = list()109 if target_users:110 users = list()111 for target_user in target_users:112 users += self.get_netuser(target_user, queried_domain)113 else:114 users = self.get_netuser(queried_domain=queried_domain)115 for user in users:116 if user.homedirectory:117 results.append(split_path(user.homedirectory))118 if user.scriptpath:119 results.append(split_path(user.scriptpath))120 if user.profilepath:121 results.append(split_path(user.profilepath))122 return results123 @LDAPRPCRequester._ldap_connection_init124 def get_dfsshare(self, version=['v1', 'v2'], queried_domain=str(), ads_path=str()):125 def _get_dfssharev1():126 dfs_search_filter = '(objectClass=fTDfs)'127 intermediate_results = self._ldap_search(dfs_search_filter, adobj.ADObject,128 attributes=['remoteservername', 'name'])129 results = list()130 for dfs in intermediate_results:131 for remote_server in dfs.remoteservername:132 remote_server = str(remote_server)133 if '\\' in remote_server:134 attributes = list()135 attributes.append({'type': 'name', 'vals': [dfs.name]})136 attributes.append({'type': 'remoteservername', 'vals': [remote_server.split('\\')[2]]})137 results.append(adobj.DFS(attributes))138 return results139 def _get_dfssharev2():140 dfs_search_filter = '(objectClass=msDFS-Linkv2)'141 intermediate_results = self._ldap_search(dfs_search_filter, adobj.ADObject,142 attributes=['msdfs-linkpathv2','msDFS-TargetListv2'])143 results = list()144 for dfs in intermediate_results:145 attributes = list()146 share_name = getattr(dfs, 'msdfs-linkpathv2')147 xml_target_list = getattr(dfs, 'msdfs-targetlistv2')[2:].decode('utf-16le')148 soup_target_list = BeautifulSoup(xml_target_list, 'xml')149 for target in soup_target_list.targets.contents:150 if '\\' in target.string:151 server_name, dfs_root = target.string.split('\\')[2:4]152 attributes.append({'type': 'remoteservername',153 'vals': [server_name]})154 attributes.append({'type': 'name',155 'vals': ['{}{}'.format(dfs_root, share_name)]})156 results.append(adobj.DFS(attributes))157 return results158 version_to_function = {'v1': _get_dfssharev1, 'v2': _get_dfssharev2}159 results = list()160 for v in version:161 results += version_to_function[v]()162 return results163 @LDAPRPCRequester._ldap_connection_init164 def get_netou(self, queried_domain=str(), queried_ouname='*',165 queried_guid=str(), ads_path=str(), full_data=False):166 ou_search_filter = '(objectCategory=organizationalUnit)'167 if queried_ouname:168 ou_search_filter += '(name={})'.format(queried_ouname)169 if queried_guid:170 ou_search_filter += '(gplink=*{}*)'.format(queried_guid)171 if full_data:172 attributes = list()173 else:174 attributes = ['distinguishedName']175 ou_search_filter = '(&{})'.format(ou_search_filter)176 return self._ldap_search(ou_search_filter, adobj.OU, attributes=attributes)177 @LDAPRPCRequester._ldap_connection_init178 def get_netsite(self, queried_domain=str(), queried_sitename=str(),179 queried_guid=str(), ads_path=str(), full_data=False):180 site_search_filter = '(objectCategory=site)'181 if queried_sitename:182 site_search_filter += '(name={})'.format(queried_sitename)183 if queried_guid:184 site_search_filter += '(gplink=*{}*)'.format(queried_guid)185 if full_data:186 attributes = list()187 else:188 attributes = ['name']189 site_search_filter = '(&{})'.format(site_search_filter)190 return self._ldap_search(site_search_filter, adobj.Site, attributes=attributes)191 @LDAPRPCRequester._ldap_connection_init192 def get_netsubnet(self, queried_domain=str(), queried_sitename=str(),193 ads_path=str(), full_data=False):194 subnet_search_filter = '(objectCategory=subnet)'195 if queried_sitename:196 if not queried_sitename.endswith('*'):197 queried_sitename += '*'198 subnet_search_filter += '(siteobject=*CN={})'.format(queried_sitename)199 if full_data:200 attributes = list()201 else:202 attributes = ['name', 'siteobject']203 subnet_search_filter = '(&{})'.format(subnet_search_filter)204 return self._ldap_search(subnet_search_filter, adobj.Subnet, attributes=attributes)205 @LDAPRPCRequester._ldap_connection_init206 def get_netgroupmember(self, queried_groupname=str(), queried_sid=str(),207 queried_domain=str(), ads_path=str(), recurse=False,208 use_matching_rule=False, full_data=False,209 custom_filter=str()):210 def _get_members(_groupname=str(), _sid=str()):211 try:212 if _groupname:213 groups = self.get_netgroup(queried_groupname=_groupname, full_data=True)214 else:215 if _sid:216 queried_sid = _sid217 else:218 with pywerview.functions.misc.Misc(self._domain_controller,219 self._domain, self._user,220 self._password, self._lmhash,221 self._nthash) as misc_requester:222 queried_sid = misc_requester.get_domainsid(queried_domain) + '-512'223 groups = self.get_netgroup(queried_sid=queried_sid, full_data=True)224 except IndexError:225 raise ValueError('The group {} was not found'.format(_groupname))226 final_members = list()227 for group in groups:228 members = list()229 if recurse and use_matching_rule:230 group_memberof_filter = '(&(samAccountType=805306368)(memberof:1.2.840.113556.1.4.1941:={}){})'.format(group.distinguishedname, custom_filter)231 members = self.get_netuser(custom_filter=group_memberof_filter)232 else:233 # TODO: range cycling234 try:235 for member in group.member:236 dn_filter = '(distinguishedname={}){}'.format(member, custom_filter)237 members += self.get_netuser(custom_filter=dn_filter)238 members += self.get_netgroup(custom_filter=dn_filter, full_data=True)239 # The group doesn't have any members240 except AttributeError:241 continue242 for member in members:243 if full_data:244 final_member = member245 else:246 final_member = adobj.ADObject(list())247 member_dn = member.distinguishedname248 try:249 member_domain = member_dn[member_dn.index('DC='):].replace('DC=', '').replace(',', '.')250 except IndexError:251 member_domain = str()252 is_group = (member.samaccounttype != '805306368')253 attributes = list()254 if queried_domain:255 attributes.append({'type': 'groupdomain', 'vals': [queried_domain]})256 else:257 attributes.append({'type': 'groupdomain', 'vals': [self._domain]})258 attributes.append({'type': 'groupname', 'vals': [group.name]})259 attributes.append({'type': 'membername', 'vals': [member.samaccountname]})260 attributes.append({'type': 'memberdomain', 'vals': [member_domain]})261 attributes.append({'type': 'isgroup', 'vals': [is_group]})262 attributes.append({'type': 'memberdn', 'vals': [member_dn]})263 attributes.append({'type': 'membersid', 'vals': [member.objectsid]})264 final_member.add_attributes(attributes)265 final_members.append(final_member)266 return final_members267 results = list()268 groups_to_process = [(queried_groupname, queried_sid)]269 while groups_to_process:270 groupname, sid = groups_to_process.pop(0)271 members = _get_members(groupname, sid)272 for member in members:273 results.append(member)274 if (recurse and (not use_matching_rule) and member.isgroup and member.membername):275 groups_to_process.append((member.membername, str()))276 return results277 @LDAPRPCRequester._rpc_connection_init(r'\srvsvc')278 def get_netsession(self):279 try:280 resp = srvs.hNetrSessionEnum(self._rpc_connection, '\x00', NULL, 10)281 except DCERPCException:282 return list()283 results = list()284 for session in resp['InfoStruct']['SessionInfo']['Level10']['Buffer']:285 results.append(rpcobj.Session(session))286 return results287 @LDAPRPCRequester._rpc_connection_init(r'\srvsvc')288 def get_netshare(self):289 resp = srvs.hNetrShareEnum(self._rpc_connection, 1)290 results = list()291 for share in resp['InfoStruct']['ShareInfo']['Level1']['Buffer']:292 results.append(rpcobj.Share(share))293 return results294 @LDAPRPCRequester._rpc_connection_init(r'\srvsvc')295 def get_localdisks(self):296 resp = srvs.hNetrServerDiskEnum(self._rpc_connection, 0)297 results = list()298 for disk in resp['DiskInfoStruct']['Buffer']:299 if disk['Disk'] != '\x00':300 results.append(rpcobj.Disk(disk))301 return results302 @LDAPRPCRequester._rpc_connection_init(r'\samr')303 def get_netdomain(self):304 resp = samr.hSamrConnect(self._rpc_connection)305 server_handle = resp['ServerHandle']306 # We first list every domain in the SAM307 resp = samr.hSamrEnumerateDomainsInSamServer(self._rpc_connection, server_handle)308 results = list()309 for domain in resp['Buffer']['Buffer']:310 results.append(domain['Name'])311 return results312 @LDAPRPCRequester._rpc_connection_init(r'\wkssvc')313 def get_netloggedon(self):314 try:315 resp = wkst.hNetrWkstaUserEnum(self._rpc_connection, 1)316 except DCERPCException:317 return list()318 results = list()319 for wksta_user in resp['UserInfo']['WkstaUserInfo']['Level1']['Buffer']:320 results.append(rpcobj.WkstaUser(wksta_user))321 return results322 # TODO: if self._target_computer == self._domain_controller, check that323 # self._domain_controller is indeed a domain controller324 @LDAPRPCRequester._ldap_connection_init325 @LDAPRPCRequester._rpc_connection_init(r'\samr')326 def get_netlocalgroup(self, queried_groupname=str(), list_groups=False,327 recurse=False):328 from impacket.nt_errors import STATUS_MORE_ENTRIES329 results = list()330 resp = samr.hSamrConnect(self._rpc_connection)331 server_handle = resp['ServerHandle']332 # We first list every domain in the SAM333 resp = samr.hSamrEnumerateDomainsInSamServer(self._rpc_connection, server_handle)334 domains = resp['Buffer']['Buffer']335 domain_handles = dict()336 for local_domain in domains:337 resp = samr.hSamrLookupDomainInSamServer(self._rpc_connection, server_handle, local_domain['Name'])338 domain_sid = 'S-1-5-{}'.format('-'.join(str(x) for x in resp['DomainId']['SubAuthority']))339 resp = samr.hSamrOpenDomain(self._rpc_connection, serverHandle=server_handle, domainId=resp['DomainId'])340 domain_handles[domain_sid] = resp['DomainHandle']341 # If we list the groups342 if list_groups:343 # We browse every domain344 for domain_sid, domain_handle in domain_handles.items():345 # We enumerate local groups in every domain346 enumeration_context = 0347 groups = list()348 while True:349 resp = samr.hSamrEnumerateAliasesInDomain(self._rpc_connection, domain_handle,350 enumerationContext=enumeration_context)351 groups += resp['Buffer']['Buffer']352 enumeration_context = resp['EnumerationContext']353 if resp['ErrorCode'] != STATUS_MORE_ENTRIES:354 break355 # We get information on every group356 for group in groups:357 resp = samr.hSamrRidToSid(self._rpc_connection, domain_handle, rid=group['RelativeId'])358 sid = 'S-1-5-{}'.format('-'.join(str(x) for x in resp['Sid']['SubAuthority']))359 resp = samr.hSamrOpenAlias(self._rpc_connection, domain_handle, aliasId=group['RelativeId'])360 alias_handle = resp['AliasHandle']361 resp = samr.hSamrQueryInformationAlias(self._rpc_connection, alias_handle)362 final_group = rpcobj.Group(resp['Buffer']['General'])363 final_group.add_atributes({'server': self._target_computer, 'sid': sid})364 results.append(final_group)365 samr.hSamrCloseHandle(self._rpc_connection, alias_handle)366 samr.hSamrCloseHandle(self._rpc_connection, domain_handle)367 # If we query a group368 else:369 queried_group_rid = None370 queried_group_domain_handle = None371 # If the user is looking for a particular group372 if queried_groupname:373 # We look for it in every domain374 for _, domain_handle in domain_handles.items():375 try:376 resp = samr.hSamrLookupNamesInDomain(self._rpc_connection, domain_handle, [queried_groupname])377 queried_group_rid = resp['RelativeIds']['Element'][0]['Data']378 queried_group_domain_handle = domain_handle379 break380 except (DCERPCSessionError, KeyError, IndexError):381 continue382 else:383 raise ValueError('The group \'{}\' was not found on the target server'.format(queried_groupname))384 # Otherwise, we look for the local Administrators group385 else:386 queried_group_rid = 544387 resp = samr.hSamrLookupDomainInSamServer(self._rpc_connection, server_handle, 'BUILTIN')388 resp = samr.hSamrOpenDomain(self._rpc_connection, serverHandle=server_handle, domainId=resp['DomainId'])389 queried_group_domain_handle = resp['DomainHandle']390 # We get a handle on the group, and list its members391 try:392 group = samr.hSamrOpenAlias(self._rpc_connection, queried_group_domain_handle, aliasId=queried_group_rid)393 resp = samr.hSamrGetMembersInAlias(self._rpc_connection, group['AliasHandle'])394 except DCERPCSessionError:395 raise ValueError('The name \'{}\' is not a valid group on the target server'.format(queried_groupname))396 # For every user, we look for information in every local domain397 for member in resp['Members']['Sids']:398 attributes = dict()399 member_rid = member['SidPointer']['SubAuthority'][-1]400 member_sid = 'S-1-5-{}'.format('-'.join(str(x) for x in member['SidPointer']['SubAuthority']))401 attributes['server'] = self._target_computer402 attributes['sid'] = member_sid403 for domain_sid, domain_handle in domain_handles.items():404 # We've found a local member405 if member_sid.startswith(domain_sid):406 attributes['isdomain'] = False407 resp = samr.hSamrQueryInformationDomain(self._rpc_connection, domain_handle)408 member_domain = resp['Buffer']['General2']['I1']['DomainName']409 try:410 resp = samr.hSamrOpenUser(self._rpc_connection, domain_handle, userId=member_rid)411 member_handle = resp['UserHandle']412 attributes['isgroup'] = False413 resp = samr.hSamrQueryInformationUser(self._rpc_connection, member_handle)414 attributes['name'] = '{}/{}'.format(member_domain, resp['Buffer']['General']['UserName'])415 except DCERPCSessionError:416 resp = samr.hSamrOpenAlias(self._rpc_connection, domain_handle, aliasId=member_rid)417 member_handle = resp['AliasHandle']418 attributes['isgroup'] = True419 resp = samr.hSamrQueryInformationAlias(self._rpc_connection, member_handle)420 attributes['name'] = '{}/{}'.format(member_domain, resp['Buffer']['General']['Name'])421 attributes['lastlogin'] = str()422 break423 # It's a domain member424 else:425 attributes['isdomain'] = True426 try:427 ad_object = self.get_adobject(queried_sid=member_sid)[0]428 member_dn = ad_object.distinguishedname429 member_domain = member_dn[member_dn.index('DC='):].replace('DC=', '').replace(',', '.')430 try:431 attributes['name'] = '{}/{}'.format(member_domain, ad_object.samaccountname)432 except AttributeError:433 # Here, the member is a foreign security principal434 # TODO: resolve it properly435 attributes['name'] = '{}/{}'.format(member_domain, ad_object.objectsid)436 attributes['isgroup'] = ad_object.isgroup437 try:438 attributes['lastlogin'] = ad_object.lastlogon439 except AttributeError:440 attributes['lastlogin'] = str()441 except IndexError:442 # We did not manage to resolve this SID against the DC443 attributes['isdomain'] = False444 attributes['isgroup'] = False445 attributes['name'] = attributes['sid']446 attributes['lastlogin'] = str()447 results.append(rpcobj.RPCObject(attributes))448 # If we recurse and the member is a domain group, we query every member449 # TODO: implement check on self._domain_controller here?450 if self._domain_controller and recurse and attributes['isdomain'] and attributes['isgroup']:451 for domain_member in self.get_netgroupmember(full_data=True, recurse=True, queried_sid=attributes['sid']):452 domain_member_attributes = dict()453 domain_member_attributes['isdomain'] = True454 member_dn = domain_member.distinguishedname455 member_domain = member_dn[member_dn.index('DC='):].replace('DC=', '').replace(',', '.')456 domain_member_attributes['name'] = '{}/{}'.format(member_domain, domain_member.samaccountname)457 domain_member_attributes['isgroup'] = domain_member.isgroup458 domain_member_attributes['isdomain'] = True459 domain_member_attributes['server'] = attributes['name']460 domain_member_attributes['sid'] = domain_member.objectsid461 try:462 domain_member_attributes['lastlogin'] = ad_object.lastlogon463 except AttributeError:464 domain_member_attributes['lastlogin'] = str()465 results.append(rpcobj.RPCObject(domain_member_attributes))...
wmirpc.py
Source:wmirpc.py
...32 self._pipe = None33 self._rpc_connection = None34 self._dcom = None35 self._wmi_connection = None36 def _create_rpc_connection(self, pipe):37 # Here we build the DCE/RPC connection38 self._pipe = pipe39 binding_strings = dict()40 binding_strings['srvsvc'] = srvs.MSRPC_UUID_SRVS41 binding_strings['wkssvc'] = wkst.MSRPC_UUID_WKST42 binding_strings['samr'] = samr.MSRPC_UUID_SAMR43 binding_strings['svcctl'] = scmr.MSRPC_UUID_SCMR44 binding_strings['drsuapi'] = drsuapi.MSRPC_UUID_DRSUAPI45 # TODO: try to fallback to TCP/139 if tcp/445 is closed46 if self._pipe == r'\drsuapi':47 string_binding = epm.hept_map(self._target_computer, drsuapi.MSRPC_UUID_DRSUAPI,48 protocol='ncacn_ip_tcp')49 rpctransport = transport.DCERPCTransportFactory(string_binding)50 rpctransport.set_credentials(username=self._user, password=self._password,51 domain=self._domain, lmhash=self._lmhash,52 nthash=self._nthash)53 else:54 rpctransport = transport.SMBTransport(self._target_computer, 445, self._pipe,55 username=self._user, password=self._password,56 domain=self._domain, lmhash=self._lmhash,57 nthash=self._nthash)58 rpctransport.set_connect_timeout(10)59 dce = rpctransport.get_dce_rpc()60 if self._pipe == r'\drsuapi':61 dce.set_auth_level(RPC_C_AUTHN_LEVEL_PKT_PRIVACY)62 try:63 dce.connect()64 except socket.error:65 self._rpc_connection = None66 else:67 dce.bind(binding_strings[self._pipe[1:]])68 self._rpc_connection = dce69 def _create_wmi_connection(self, namespace='root\\cimv2'):70 try:71 self._dcom = DCOMConnection(self._target_computer, self._user, self._password,72 self._domain, self._lmhash, self._nthash)73 except DCERPCException:74 self._dcom = None75 else:76 i_interface = self._dcom.CoCreateInstanceEx(wmi.CLSID_WbemLevel1Login,77 wmi.IID_IWbemLevel1Login)78 i_wbem_level1_login = wmi.IWbemLevel1Login(i_interface)79 self._wmi_connection = i_wbem_level1_login.NTLMLogin(ntpath.join('\\\\{}\\'.format(self._target_computer), namespace),80 NULL, NULL)81 @staticmethod82 def _rpc_connection_init(pipe=r'\srvsvc'):83 def decorator(f):84 def wrapper(*args, **kwargs):85 instance = args[0]86 if (not instance._rpc_connection) or (pipe != instance._pipe):87 if instance._rpc_connection:88 instance._rpc_connection.disconnect()89 instance._create_rpc_connection(pipe=pipe)90 if instance._rpc_connection is None:91 return None92 return f(*args, **kwargs)93 return wrapper94 return decorator95 @staticmethod96 def _wmi_connection_init():97 def decorator(f):98 def wrapper(*args, **kwargs):99 instance = args[0]100 if not instance._wmi_connection:101 instance._create_wmi_connection()102 if instance._dcom is None:103 return None104 return f(*args, **kwargs)105 return wrapper106 return decorator107 def __enter__(self):108 # Picked because it's the most used by the net* functions109 self._create_rpc_connection(r'\srvsvc')110 return self111 def __exit__(self, type, value, traceback):112 try:113 self._rpc_connection.disconnect()114 except AttributeError:115 pass...
wallet.py
Source:wallet.py
...4 rpcuser = settings.DASHD_RPCUSER5 rpcpassword = settings.DASHD_RPCPASSWORD6 account_name = settings.DASHD_ACCOUNT_NAME7 @property8 def _rpc_connection(self):9 return AuthServiceProxy(settings.DASHD_URL)10 def get_address_balance(self, address, min_confirmations):11 return self._rpc_connection.getreceivedbyaddress(12 address,13 min_confirmations,14 )15 def get_new_address(self):16 return self._rpc_connection.getnewaddress(self.account_name)17 def send_to_address(self, address, amount):18 return self._rpc_connection.sendtoaddress(address, amount)19 def check_address_valid(self, address):...
Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!