Best Python code snippet using molecule_python
ansible.py
Source:ansible.py
1# -*- coding: utf-8 -*-2'''3Read in an Ansible inventory file or script4Flat inventory files should be in the regular ansible inventory format.5.. code-block:: ini6 [servers]7 salt.gtmanfred.com ansible_ssh_user=gtmanfred ansible_ssh_host=127.0.0.1 ansible_ssh_port=22 ansible_ssh_pass='password'8 [desktop]9 home ansible_ssh_user=gtmanfred ansible_ssh_host=12.34.56.78 ansible_ssh_port=23 ansible_ssh_pass='password'10 [computers:children]11 desktop12 servers13 [names:vars]14 http_port=8015then salt-ssh can be used to hit any of them16.. code-block:: bash17 [~]# salt-ssh all test.ping18 salt.gtmanfred.com:19 True20 home:21 True22 [~]# salt-ssh desktop test.ping23 home:24 True25 [~]# salt-ssh computers test.ping26 salt.gtmanfred.com:27 True28 home:29 True30 [~]# salt-ssh salt.gtmanfred.com test.ping31 salt.gtmanfred.com:32 True33There is also the option of specifying a dynamic inventory, and generating it on the fly34.. code-block:: bash35 #!/bin/bash36 echo '{37 "servers": {38 "hosts": [39 "salt.gtmanfred.com"40 ]41 },42 "desktop": {43 "hosts": [44 "home"45 ]46 },47 "computers": {48 "hosts":{},49 "children": [50 "desktop",51 "servers"52 ]53 },54 "_meta": {55 "hostvars": {56 "salt.gtmanfred.com": {57 "ansible_ssh_user": "gtmanfred",58 "ansible_ssh_host": "127.0.0.1",59 "ansible_sudo_pass": "password",60 "ansible_ssh_port": 2261 },62 "home": {63 "ansible_ssh_user": "gtmanfred",64 "ansible_ssh_host": "12.34.56.78",65 "ansible_sudo_pass": "password",66 "ansible_ssh_port": 2367 }68 }69 }70 }'71This is the format that an inventory script needs to output to work with ansible, and thus here.72.. code-block:: bash73 [~]# salt-ssh --roster-file /etc/salt/hosts salt.gtmanfred.com test.ping74 salt.gtmanfred.com:75 True76Any of the [groups] or direct hostnames will return. 
The 'all' is special, and returns everything.77'''78from __future__ import absolute_import79import os80import re81import fnmatch82import shlex83import json84import salt.utils85import subprocess86from salt.roster import get_roster_file87CONVERSION = {88 'ansible_ssh_host': 'host',89 'ansible_ssh_port': 'port',90 'ansible_ssh_user': 'user',91 'ansible_ssh_pass': 'passwd',92 'ansible_sudo_pass': 'sudo',93 'ansible_ssh_private_key_file': 'priv'94}95def targets(tgt, tgt_type='glob', **kwargs):96 '''97 Return the targets from the ansible inventory_file98 Default: /etc/salt/roster99 '''100 if tgt == 'all':101 tgt = '*'102 inventory_file = get_roster_file(__opts__)103 if os.path.isfile(inventory_file) and os.access(inventory_file, os.X_OK):104 imatcher = Script(tgt, tgt_type='glob', inventory_file=inventory_file)105 else:106 imatcher = Inventory(tgt, tgt_type='glob', inventory_file=inventory_file)107 return imatcher.targets()108class Target(object):109 def targets(self):110 '''111 Execute the correct tgt_type routine and return112 '''113 try:114 return getattr(self, 'get_{0}'.format(self.tgt_type))()115 except AttributeError:116 return {}117 def get_glob(self):118 '''119 Return minions that match via glob120 '''121 ret = dict()122 for key, value in self.groups.items():123 for host, info in value.items():124 if fnmatch.fnmatch(host, self.tgt):125 ret[host] = info126 for nodegroup in self.groups:127 if fnmatch.fnmatch(nodegroup, self.tgt):128 ret.update(self.groups[nodegroup])129 for parent_nodegroup in self.parents:130 if fnmatch.fnmatch(parent_nodegroup, self.tgt):131 ret.update(self._get_parent(parent_nodegroup))132 return ret133 def _get_parent(self, parent_nodegroup):134 '''135 Recursively resolve all [*:children] group blocks136 '''137 ret = dict()138 for nodegroup in self.parents[parent_nodegroup]:139 if nodegroup in self.parents:140 ret.update(self._get_parent(nodegroup))141 elif nodegroup in self.groups:142 ret.update(self.groups[nodegroup])143 return ret144class 
Inventory(Target):145 '''146 Matcher for static inventory files147 '''148 def __init__(self, tgt, tgt_type='glob', inventory_file='/etc/salt/roster'):149 self.tgt = tgt150 self.tgt_type = tgt_type151 self.groups = dict()152 self.hostvars = dict()153 self.parents = dict()154 blocks = re.compile(r'^\[.*\]$')155 hostvar = re.compile(r'^\[([^:]+):vars\]$')156 parents = re.compile(r'^\[([^:]+):children\]$')157 with salt.utils.fopen(inventory_file) as config:158 for line in config.read().split('\n'):159 if not line or line.startswith('#'):160 continue161 if blocks.match(line):162 if hostvar.match(line):163 proc = '_parse_hostvars_line'164 varname = hostvar.match(line).groups()[0]165 elif parents.match(line):166 proc = '_parse_parents_line'167 varname = parents.match(line).groups()[0]168 else:169 proc = '_parse_group_line'170 varname = line.strip('[]')171 getattr(self, proc)(line, varname)172 continue173 def _parse_group_line(self, line, varname):174 '''175 Parse lines in the inventory file that are under the same group block176 '''177 line_args = shlex.split(line)178 name = line_args[0]179 host = {line_args[0]: dict()}180 for arg in line_args[1:]:181 key, value = arg.split('=')182 host[name][CONVERSION[key]] = value183 if 'sudo' in host[name]:184 host[name]['passwd'], host[name]['sudo'] = host[name]['sudo'], True185 if self.groups.get(varname, ''):186 self.groups[varname].update(host)187 else:188 self.groups[varname] = host189 def _parse_hostvars_line(self, line, varname):190 '''191 Parse lines in the inventory file that are under the same [*:vars] block192 '''193 key, value = line.split('=')194 if varname not in self.hostvars:195 self.hostvars[varname] = dict()196 self.hostvars[varname][key] = value197 def _parse_parents_line(self, line, varname):198 '''199 Parse lines in the inventory file that are under the same [*:children] block200 '''201 if varname not in self.parents:202 self.parents[varname] = []203 self.parents[varname].append(line)204class Script(Target):205 
'''206 Matcher for Inventory scripts207 '''208 def __init__(self, tgt, tgt_type='glob', inventory_file='/etc/salt/roster'):209 self.tgt = tgt210 self.tgt_type = tgt_type211 inventory, error = subprocess.Popen([inventory_file], shell=True, stdout=subprocess.PIPE).communicate()212 self.inventory = json.loads(inventory)213 self.meta = self.inventory.get('_meta', {})214 self.groups = dict()215 self.hostvars = dict()216 self.parents = dict()217 for key, value in self.inventory.items():218 if key == '_meta':219 continue220 if 'hosts' in value:221 self._parse_groups(key, value['hosts'])222 if 'children' in value:223 self._parse_parents(key, value['children'])224 if 'hostvars' in value:225 self._parse_hostvars(key, value['hostvars'])226 def _parse_groups(self, key, value):227 '''228 Parse group data from inventory_file229 '''230 host = dict()231 if key not in self.groups:232 self.groups[key] = dict()233 for server in value:234 tmp = self.meta.get('hostvars', {}).get(server, False)235 if tmp is not False:236 if server not in host:237 host[server] = dict()238 for tmpkey, tmpval in tmp.items():239 host[server][CONVERSION[tmpkey]] = tmpval240 if 'sudo' in host[server]:241 host[server]['passwd'], host[server]['sudo'] = host[server]['sudo'], True242 self.groups[key].update(host)243 def _parse_hostvars(self, key, value):244 '''245 Parse hostvars data from inventory_file246 '''247 if key not in self.hostvars:248 self.hostvars[key] = dict()249 self.hostvars[key] = value250 def _parse_parents(self, key, value):251 '''252 Parse children data from inventory_file253 '''254 if key not in self.parents:255 self.parents[key] = []...
Inventory.py
Source:Inventory.py
#!/usr/bin/env python3
class InventoryItem:
    """One row of a comma-separated inventory file, addressed by valve number.

    Every row is written as ``name,current_quantity,original_quantity`` and
    the row index in the file is ``valve_number``. ``switchValves`` treats
    row 0 as a header, so valid valve numbers start at 1.

    The methods that rewrite the file (``switchValves``, ``replaceItem``,
    ``deleteItem``) do not update the attributes of this object; rebuild the
    item from the file afterwards.
    """

    LOW_MSG = "Inventory item {} is low."
    EMPTY_MSG = "Inventory item {} is empty."

    def __init__(self, path, name, original_quantity, current_quantity, valve_num):
        """Store the item description and compute the initial fill ratio.

        :param path: path of the inventory file this item lives in
        :param name: item name written in the first column
        :param original_quantity: quantity of a full container
        :param current_quantity: quantity currently left
        :param valve_num: row index of this item in the inventory file
        """
        self.inventory_file_path = path
        self.name = name
        self.org_quant = original_quantity
        self.cur_quant = current_quantity
        self.valve_number = valve_num

        # checkRatio() sets self.ratio_left as a side effect.
        self.checkRatio()

    def _read_lines(self):
        """Return every line of the inventory file, newlines included."""
        with open(self.inventory_file_path, "r") as inventory_file:
            return inventory_file.readlines()

    def _write_lines(self, lines):
        """Overwrite the inventory file with ``lines``."""
        with open(self.inventory_file_path, "w") as inventory_file:
            inventory_file.writelines(lines)

    @staticmethod
    def _format_row(name, current_quantity, original_quantity):
        """Build one inventory row in ``name,current,original`` order."""
        return "{},{},{}\n".format(name, current_quantity, original_quantity)

    def checkRatio(self):
        """Refresh ``ratio_left`` and return a status message.

        :return: ``EMPTY_MSG`` when nothing is left, ``LOW_MSG`` when less
            than half is left, otherwise a fixed "at least half" message.
        """
        if self.org_quant:
            self.ratio_left = self.cur_quant / self.org_quant
        else:
            # An original quantity of 0 cannot be divided by; report the item
            # as empty instead of raising ZeroDivisionError.
            self.ratio_left = 0

        if self.ratio_left <= 0:
            return self.EMPTY_MSG.format(self.name)
        if self.ratio_left < 0.5:
            return self.LOW_MSG.format(self.name)
        return "At least half of inventory left."

    def updateQuantityLeft(self, new_quantity):
        """Write ``new_quantity`` into this item's row and refresh the ratio.

        :raises IndexError: if ``valve_number`` is not a row of the file; the
            file is left untouched in that case.
        """
        lines = self._read_lines()
        lines[self.valve_number] = self._format_row(
            self.name, new_quantity, self.org_quant)
        self._write_lines(lines)

        self.cur_quant = new_quantity
        self.checkRatio()

    def switchValves(self, new_valve):
        """Swap this item's row with the row at ``new_valve``.

        Nothing is written when ``new_valve`` is the header row (0), negative
        or past the end of the file. ``self.valve_number`` is not changed, so
        the inventory values should be retrieved from file after this call.
        """
        lines = self._read_lines()
        if new_valve > len(lines) - 1 or new_valve < 1:  # row 0 is the header
            return

        current = self.valve_number
        lines[current], lines[new_valve] = lines[new_valve], lines[current]
        self._write_lines(lines)

    def replaceItem(self, new_name, new_current_quantity, new_original_quantity):
        """Overwrite this item's row with a different item.

        Only the file is changed; the attributes of this object keep their old
        values, so the inventory should be re-read from file afterwards.
        """
        lines = self._read_lines()
        lines[self.valve_number] = self._format_row(
            new_name, new_current_quantity, new_original_quantity)
        self._write_lines(lines)

    def addNewItem(self):
        """Append this item as a new row at the end of the inventory file."""
        new_item = self._format_row(self.name, self.cur_quant, self.org_quant)
        with open(self.inventory_file_path, "a") as inventory_file:
            inventory_file.write(new_item)

    def deleteItem(self):
        """Remove this item's row from the inventory file."""
        lines = self._read_lines()
        del lines[self.valve_number]
        self._write_lines(lines)
get-aws-instance-details.py
Source:get-aws-instance-details.py
import sys

import boto3
import yaml

# Connection settings written onto every generated inventory host line.
ansible_user = "ec2-user"
ansible_ssh_private_key_file = "~/.ssh/dcarmack.pem"

# Output files of this script.
inventory_file = "/Users/dcarmack/Documents/git/splunk-ansible/inventory/hosts"
ansible_vars = "/Users/dcarmack/Documents/git/splunk-ansible/group_vars/dynamic"

# Only look at running instances (u'State': {u'Code': 16, u'Name': 'running'})
# that carry the tag generated_by=ansible.
filter1 = {"Name": "instance-state-name", "Values": ["running"]}
filter2 = {"Name": "tag:generated_by", "Values": ["ansible"]}

# Maps the value of an instance's "Name" tag to the addresses of every
# matching instance:
#   {name: {'public_dns': [...], 'private_dns': [...], 'hostname': [...]}}
results = {}

ec2client = boto3.client('ec2')
response = ec2client.describe_instances(Filters=[filter1, filter2])
for reservation in response["Reservations"]:
    for instance in reservation["Instances"]:
        # Resolve the group name per instance so that a missing "Name" tag can
        # neither raise NameError nor reuse the name of the previous instance.
        instance_type = None
        for x in instance.get('Tags', []):
            if x['Key'] == "Name":
                instance_type = x['Value']
        if instance_type is None:
            sys.stderr.write("Skipping instance %s: no Name tag\n"
                             % instance.get("InstanceId", "<unknown>"))
            continue

        # Accumulate instead of overwriting, so instances that share a Name
        # but were launched in different reservations all end up in the group.
        group = results.setdefault(
            instance_type, {'public_dns': [], 'private_dns': [], 'hostname': []})
        group['public_dns'].append(instance["PublicDnsName"])
        group['private_dns'].append(instance["PrivateDnsName"])
        group['hostname'].append('ip-%s' % instance["PrivateIpAddress"])


def createInventoryFile(inventory_file):
    """Write an Ansible INI inventory built from the module-level ``results``.

    The file starts with a ``[localhost]`` group, followed by one group per
    key of ``results`` listing the public DNS name of each instance together
    with the SSH user and private key file.

    :param inventory_file: path of the inventory file to (over)write
    """
    # The context manager closes the file even if one of the writes fails.
    with open(inventory_file, 'w') as handle:
        handle.write("[localhost]\nlocalhost ansible_connection=local ansible_python_interpreter=python\n\n")
        for each in results:
            handle.write('[%s]\n' % each)
            for dns in results[each]['public_dns']:
                handle.write('%s ansible_user=%s ansible_ssh_private_key_file=%s\n' % (dns, ansible_user, ansible_ssh_private_key_file))
            handle.write("\n")


def updateYaml(value, file):
    """Write the group_vars file describing the cluster master.

    :param value: mapping shaped like ``results``; it must contain a
        ``'cluster-master'`` entry with at least one instance
    :param file: path of the YAML file to (over)write
    :raises KeyError: if ``value`` has no ``'cluster-master'`` entry
    :raises IndexError: if the ``'cluster-master'`` entry has no instances
    """
    cm_private_dns = value['cluster-master']['private_dns'][0]
    # The stored hostname is 'ip-' + dotted IP address; dots are replaced by
    # dashes before it is compared with ansible_hostname in the templates.
    hostname = value['cluster-master']['hostname'][0].replace(".", "-")
    with open(file, 'w') as f:
        f.write("---\n")
        f.write("cm_private_dns: %s\n" % (cm_private_dns))
        f.write("app_loc: '{{ \"master-apps\" if ansible_hostname == \"%s\" else \"deployment-apps\" }}'\n" % hostname)
        f.write("app_loc_outputs: '{{ \"apps\" if ansible_hostname == \"%s\" else \"deployment-apps\" }}'\n" % hostname)


updateYaml(results, ansible_vars)
Learn to execute automation testing from scratch with the LambdaTest Learning Hub, from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. The LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, and TestNG.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 automation test minutes FREE!!