Best Python code snippet using autotest_python
tasks.py
Source:tasks.py
1from __future__ import absolute_import, unicode_literals2import copy3import json4import requests5from celery import task6from django.conf import settings7from django.core.mail import send_mail8from django.http import HttpResponse9from .models import Setting10TOKEN = settings.SMART_HOME_ACCESS_TOKEN 11header = {'Authorization': 'Bearer {}'.format(settings.SMART_HOME_ACCESS_TOKEN)}12GET_ALL_CONTROLLERS = settings.SMART_HOME_API_URL13class Processor(object):14 """ ÐлаÑÑ Ð»Ð¾Ð³Ð¸ÐºÐ¸ ÑабоÑÑ Ñ ÑмнÑм домом """15 # Processor ÑвлÑеÑÑÑ ÑинглÑоном16 obj = None17 def __new__(cls, *args, **kwargs):18 if not cls.obj:19 cls.obj = super().__new__(cls)20 return cls.obj21 storage = {22 'hot_water_target_temperature': 80,23 'bedroom_target_temperature': 2124 } # Ñ
ÑанилиÑе даннÑÑ
на ÑеÑвеÑе25 @classmethod26 def _read_all_controllers(cls):27 """ ФÑнкÑÐ¸Ñ Ð¾Ð¿ÑоÑа вÑеÑ
конÑÑоллеÑов """28 try: # оÑпÑавка запÑоÑа на ÑеÑвеÑ29 r = requests.get(GET_ALL_CONTROLLERS, headers=header)30 print(f'ÐÑпÑавлен запÑÐ¾Ñ Ð¿Ð¾ адÑеÑÑ {GET_ALL_CONTROLLERS}')31 for position in r.json()['data']: # паÑÑинг оÑвеÑа Ð¾Ñ ÑеÑвеÑа32 if position['name'] not in cls.storage:33 cls.storage[position['name']] = None34 cls.storage[position['name']] = position['value'] # лÑбÑе даннÑе запиÑÑваем в Ñ
ÑанилиÑе35 try: # беÑем знаÑÐµÐ½Ð¸Ñ Ð¸Ð· ÐÐ, коÑоÑÑе заполнÑÑÑÑÑ ÑеÑез ÑоÑмÑ36 cls.storage['hot_water_target_temperature'] = Setting.objects.get(37 controller_name='hot_water_target_temperature').value38 cls.storage['bedroom_target_temperature'] = Setting.objects.get(39 controller_name='bedroom_target_temperature').value40 except Setting.DoesNotExist:41 pass42 except json.decoder.JSONDecodeError: # пÑовеÑка Ñого ÑÑо даннÑе пÑоÑли дейÑÑвиÑелÑно валиднÑе 43 print(f'ÐапÑÐ¾Ñ Ð½Ðµ доÑÑл до адÑеÑа')44 return HttpResponse(content='Bad Gateway', status=502)45 return cls.storage46 @classmethod47 def control_bedroom_light(cls, val): # вклÑÑаем ÑÐ²ÐµÑ ÐµÑли даÑÑик дÑма оÑклÑÑен(Ñм. views.py)48 if not cls.storage['smoke_detector']:49 return val50 else:51 return False52 @classmethod53 def control_bathroom_light(cls, val): # вклÑÑаем ÑÐ²ÐµÑ ÐµÑли даÑÑик дÑма оÑклÑÑен(Ñм. views.py)54 if not cls.storage['smoke_detector']:55 return val56 else:57 return False58 @classmethod59 def control_bedroom_target_temperature(cls, val): # пиÑем в ÐÐ ÑÑебÑемÑÑ ÑемпеÑаÑÑÑÑ(Ñм. views.py)60 try:61 s = Setting.objects.filter(controller_name='bedroom_target_temperature')62 if val != s[0].value:63 s.update(value=val)64 print('ÐовÑе даннÑе по ÑемпеÑаÑÑÑе в комнаÑе')65 except Setting.DoesNotExist:66 Setting.objects.create(controller_name='bedroom_target_temperature', label='ÐÐµÐ»Ð°ÐµÐ¼Ð°Ñ ÑемпеÑаÑÑÑа в ÑпалÑне',67 value=val)68 return val69 @classmethod70 def control_hot_water_target_temperature(cls, val): # пиÑем в ÐÐ ÑÑебÑемÑÑ ÑемпеÑаÑÑÑÑ(Ñм. views.py)71 try:72 s = Setting.objects.filter(controller_name='hot_water_target_temperature')73 if val != s[0].value:74 s.update(value=val)75 print('ÐовÑе даннÑе по ÑемпеÑаÑÑÑе гоÑÑÑей водÑ')76 except Setting.DoesNotExist:77 Setting.objects.create(controller_name='hot_water_target_temperature',78 label='ÐÐµÐ»Ð°ÐµÐ¼Ð°Ñ ÑемпеÑаÑÑÑа гоÑÑÑей водÑ', value=val)79 return val80 @classmethod81 def check_signalization(cls): # пÑовеÑÑем вÑе даÑÑики на налиÑие аваÑийнÑÑ
ÑиÑÑаÑий82 # логика ÑабоÑÑ Ð¿Ñи ÑемпеÑаÑÑÑе ниже ÑÑебÑемой83 if cls.storage['bedroom_temperature'] > 1.1 * cls.storage['bedroom_target_temperature']:84 cls.storage['air_conditioner'] = True85 if cls.storage['bedroom_temperature'] < 0.9 * cls.storage['bedroom_target_temperature']:86 cls.storage['air_conditioner'] = False87 # логика ÑабоÑÑ Ð¿Ñи ÑемпеÑаÑÑÑе ниже ÑÑебÑемой88 try:89 if cls.storage['boiler_temperature'] < 0.9 * cls.storage['hot_water_target_temperature']:90 cls.storage['boiler'] = True91 if cls.storage['boiler_temperature'] >= 1.1 * cls.storage['hot_water_target_temperature']:92 cls.storage['boiler'] = False93 except TypeError:94 pass95 if cls.storage['leak_detector']: # логика ÑабоÑÑ Ð¿Ñи пÑоÑеÑке96 send_mail(subject='ÐваÑÐ¸Ñ Ð´Ð¾Ð¼Ð°', message='Рдоме ÑлÑÑилаÑÑ Ð¿ÑоÑеÑка водÑ', from_email='from@example.com',97 recipient_list=[settings.EMAIL_RECEPIENT, ])98 cls.storage['cold_water'] = False99 cls.storage['hot_water'] = False100 if not cls.storage['cold_water']: # логика ÑабоÑÑ Ð¿Ñи оÑклÑÑенной Ñ
олодной воде101 cls.storage['boiler'] = False102 cls.storage['washing_machine'] = 'off'103 if cls.storage['smoke_detector']: # логика ÑабоÑÑ Ð¿Ñи вклÑÑенном даÑÑике задÑмлениÑ104 cls.storage['air_conditioner'] = False105 cls.storage['bedroom_light'] = False106 cls.storage['bathroom_light'] = False107 cls.storage['boiler'] = False108 cls.storage['washing_machine'] = 'off'109 if cls.storage['outdoor_light'] < 50 and cls.storage['bedroom_light'] is False and cls.storage['curtains'] != 'slightly_open':110 cls.storage['curtains'] = 'open'111 elif cls.storage['outdoor_light'] > 50 and cls.storage['curtains'] != 'slightly_open':112 cls.storage['curtains'] = 'close'113 @classmethod114 def _write_all_controllers(cls, some_data): # меÑод Ð´Ð»Ñ Ð·Ð°Ð¿Ð¸Ñи даннÑÑ
в конÑÑоллеÑÑ115 data_in_controllers = []116 for key in some_data.keys():117 data_in_controllers.append({'name': key, 'value': some_data[key]})118 data_to_send = json.dumps({'controllers': data_in_controllers})119 requests.post(GET_ALL_CONTROLLERS, data=data_to_send, headers=header)120 print('ÐапÑÐ¾Ñ Ð¾ÑпÑавлен на ÑеÑвеÑ')121@task()122def smart_home_manager():123 d1 = copy.deepcopy(Processor._read_all_controllers()) # полÑÑÐ°Ñ Ð´Ð°Ð½Ð½Ñе из запÑоÑа к API124 print('ÐаннÑе полÑÑеннÑе Ð¾Ñ ÑеÑвеÑа:')125 print(d1)126 Processor.check_signalization()127 d2 = copy.deepcopy(Processor.storage)128 print('ÐаннÑе поÑле обÑабоÑки:')129 print(d2)130 for key in d1:131 if d1[key] == d2[key]:132 d2.pop(key, None)133 if len(d2) > 0:134 Processor._write_all_controllers(d2)135 print('ÐаннÑе оÑпÑÐ°Ð²Ð»ÐµÐ½Ñ Ð½Ð° ÑеÑвеÑ')136 print('ÐаннÑе Ð´Ð»Ñ Ð¾ÑпÑавки:')137 print(d2)138 else:...
control_arbiter.py
Source:control_arbiter.py
#!/usr/bin/env python
import rospy
import roslib
import threading
from controller.msg import ControlDynamixelContinuousAngleConfig
from dynamixel_servo.msg import DynamixelContinuousAngleConfig
from controller.msg import ControlDynamixelFullConfig
from dynamixel_servo.msg import DynamixelFullConfig
from controller.msg import ControlDynamixelJointConfig
from dynamixel_servo.msg import DynamixelJointConfig
from controller.msg import ControlDynamixelWheelConfig
from dynamixel_servo.msg import DynamixelWheelConfig
from controller.msg import ControlThrustConfig
from std_msgs.msg import Float64, Bool, String
from controller.srv import register_controller, register_controllerResponse
from controller.srv import request_controller, request_controllerResponse
from controller.srv import FloatMode
from controller.srv import get_all_controllers, get_all_controllersResponse
from kill_handling.listener import KillListener
from kill_handling.broadcaster import KillBroadcaster

PORT = ControlThrustConfig.PORT
STARBOARD = ControlThrustConfig.STARBOARD
# Pulse width published to both thrusters by _zero_thrusters().
ZERO_PWM = 1.5e-3

# TODO
#       Add service to get all available controllers
#       Find way that servos and thruster call backs can all fire simultaneously but still
#           are thread locked with request_controller

# Notes
#   Switching controllers happens in a different thread than the thrusters getting published
#       It is possible that after a switch occurs and the zero thrusters method has been called
#       the other thread will still call its last cb so thread lock


class control_arbiter:
    """ROS node that forwards servo/thruster commands from the active controller.

    Incoming messages carry the name of the controller that sent them; a
    message is republished only if that name is 'always_on' or matches the
    currently selected controller.
    """

    def __init__(self):
        # Initialize ros
        rospy.init_node('control_arbiter')

        # Vars
        self.controller = 'none'
        self.controllers = []
        self.floating = False
        self.killed = False
        # One lock per callback; request_controller_cb takes all of them
        # while it switches the active controller.
        self.continuous_angle_lock = threading.Lock()
        self.joint_lock = threading.Lock()
        self.full_lock = threading.Lock()
        self.wheel_lock = threading.Lock()
        self.thrust_lock = threading.Lock()

        # Services
        self.request_controller_srv = rospy.Service('request_controller', request_controller,
                                                    self.request_controller_cb)
        self.register_controller_srv = rospy.Service('register_controller', register_controller,
                                                     self.register_controller_cb)
        self.float_srv = rospy.Service('float_mode', FloatMode,
                                       self.set_float_mode)
        self.get_all_controllers_srv = rospy.Service('get_all_controllers', get_all_controllers,
                                                     self.get_all_controllers_cb)

        # Publishers
        self.continuous_angle_pub = rospy.Publisher('/dynamixel/dynamixel_continuous_angle_config',
                                                    DynamixelContinuousAngleConfig, queue_size = 10)
        self.full_pub = rospy.Publisher('/dynamixel/dynamixel_full_config',
                                        DynamixelFullConfig, queue_size = 10)
        self.joint_pub = rospy.Publisher('/dynamixel/dynamixel_joint_config',
                                         DynamixelJointConfig, queue_size = 10)
        self.wheel_pub = rospy.Publisher('/dynamixel/dynamixel_wheel_config',
                                         DynamixelWheelConfig, queue_size = 10)
        self.port_pub = rospy.Publisher('/stm32f3discovery_imu_driver/pwm1',
                                        Float64, queue_size = 10)
        self.starboard_pub = rospy.Publisher('/stm32f3discovery_imu_driver/pwm2',
                                             Float64, queue_size = 10)
        self.float_status_pub = rospy.Publisher('float_status', Bool, queue_size=1)
        self.current_controller_pub = rospy.Publisher('current_controller', String, queue_size=1)

        # Subscribers
        self.continuous_angle_sub = rospy.Subscriber('dynamixel_continuous_angle_config',
                                                     ControlDynamixelContinuousAngleConfig, self.continuous_angle_cb, queue_size = 10)
        self.full_sub = rospy.Subscriber('dynamixel_full_config',
                                         ControlDynamixelFullConfig, self.full_cb, queue_size = 10)
        self.joint_sub = rospy.Subscriber('dynamixel_joint_config',
                                          ControlDynamixelJointConfig, self.joint_cb, queue_size = 10)
        self.wheel_sub = rospy.Subscriber('dynamixel_wheel_config',
                                          ControlDynamixelWheelConfig, self.wheel_cb, queue_size = 10)
        self.thruster_sub = rospy.Subscriber('thruster_config',
                                             ControlThrustConfig, self.thruster_cb, queue_size = 10)

        # Timers
        self.update_timer = rospy.Timer(rospy.Duration(0.1), self.status_update)

        # Kill
        self.kill_listener = KillListener(self.set_kill_cb, self.clear_kill_cb)
        self.kill_broadcaster = KillBroadcaster(id = 'control_arbiter', description = 'control_arbiter has shutdown')
        try:
            self.kill_broadcaster.clear()
        except rospy.service.ServiceException as e:
            # "except X, e" is Python-2-only syntax; "as e" works on 2.6+ and 3.
            rospy.logwarn(str(e))
        rospy.on_shutdown(self.shutdown)

    # Utility functions

    def _zero_thrusters(self):
        """Publish ZERO_PWM to both the port and the starboard thruster."""
        self.port_pub.publish(Float64(ZERO_PWM))
        self.starboard_pub.publish(Float64(ZERO_PWM))

    def isValidController(self, controller):
        """Return True if `controller` is 'always_on' or the active controller."""
        if controller == 'always_on':
            return True
        if controller == self.controller:
            return True
        return False

    # Servo callbacks: forward msg.config when the sender is allowed to drive.

    def continuous_angle_cb(self, msg):
        with self.continuous_angle_lock:
            if not self.isValidController(msg.controller):
                return
            self.continuous_angle_pub.publish(msg.config)

    def full_cb(self, msg):
        with self.full_lock:
            if not self.isValidController(msg.controller):
                return
            self.full_pub.publish(msg.config)

    def joint_cb(self, msg):
        with self.joint_lock:
            if not self.isValidController(msg.controller):
                return
            self.joint_pub.publish(msg.config)

    def wheel_cb(self, msg):
        with self.wheel_lock:
            if not self.isValidController(msg.controller):
                return
            self.wheel_pub.publish(msg.config)

    # Thrust callback

    def thruster_cb(self, msg):
        """Forward a thruster pulse width, or zero both thrusters when floating/killed."""
        with self.thrust_lock:
            if not self.isValidController(msg.controller):
                return
            if self.floating or self.killed:
                self._zero_thrusters()
                return
            if msg.id == PORT:
                self.port_pub.publish(Float64(msg.pulse_width))
            elif msg.id == STARBOARD:
                self.starboard_pub.publish(Float64(msg.pulse_width))
            else:
                rospy.logerr(str(msg.id) + ' is not a vaild thruster id, valid ids are 2 (starboard) or 3 (port)')

    # Service callbacks

    def register_controller_cb(self, request):
        """Add request.controller to the registered list; fail if already present."""
        response = register_controllerResponse()
        if request.controller not in self.controllers:
            response.success = True
            self.controllers.append(request.controller)
        else:
            response.success = False
            response.failure_description = '%s is already a registered controller' % request.controller
        return response

    def request_controller_cb(self, request):
        """Make request.controller the active controller if it is registered."""
        response = request_controllerResponse()
        if request.controller in self.controllers:
            response.success = True
            response.current_controller = request.controller

            # Hold every callback lock while switching so no callback can
            # publish in between. try/finally guarantees the locks are
            # released even if publishing raises; otherwise every callback
            # would block forever.
            locks = (self.continuous_angle_lock, self.joint_lock, self.full_lock,
                     self.wheel_lock, self.thrust_lock)
            acquired = []
            try:
                for lock in locks:
                    lock.acquire()
                    acquired.append(lock)
                self.controller = request.controller
                self._zero_thrusters()
            finally:
                for lock in acquired:
                    lock.release()
        else:
            response.success = False
            response.current_controller = self.controller
            response.failure_description = request.controller + ' is not a registered controller. Registered controllers are ' + str(self.controllers)
        return response

    def get_all_controllers_cb(self, request):
        """Return the list of registered controllers."""
        response = get_all_controllersResponse()
        response.controllers = self.controllers
        return response

    # Float functions
    # Used to place boat in floating mode (Thrusters are turned off)
    def set_float_mode(self, mode):
        self.floating = mode.float
        if self.floating:
            self._zero_thrusters()
        rospy.loginfo('PropaGator float-mode is ' + str(mode.float))
        return self.floating

    # Status update function
    def status_update(self, event):
        """Timer callback: publish the float flag and the active controller name."""
        self.float_status_pub.publish(self.floating)
        self.current_controller_pub.publish(self.controller)

    # kill functions
    def shutdown(self):
        self.kill_broadcaster.send(True)

    def set_kill_cb(self):
        self.killed = True
        self._zero_thrusters()
        rospy.logwarn('control_arbiter killed because: %s' % self.kill_listener.get_kills())

    def clear_kill_cb(self):
        self.killed = False
        rospy.loginfo('control_arbiter unkilled')


if __name__ == '__main__':
    node = control_arbiter()
    # NOTE(review): the rest of the entry point is truncated in this excerpt.
controllers.py
Source:controllers.py
...13# License for the specific language governing permissions and limitations14# under the License.15from zadarapy.validators import verify_start_limit, verify_vc_index, \16 verify_capacity, get_parameters_options, verify_interval17def get_all_controllers(session, start=None, limit=None, return_type=None,18 **kwargs):19 """20 Retrieves details for all virtual controllers for the VPSAOS.21 :type session: zadarapy.session.Session22 :param session: A valid zadarapy.session.Session object. Required.23 :type start: int24 :param start: The offset to start displaying controllers from. Optional.25 :type: limit: int26 :param limit: The maximum number of controllers to return. Optional.27 :type return_type: str28 :param return_type: If this is set to the string 'json', this function29 will return a JSON string. Otherwise, it will return a Python30 dictionary. Optional (will return a Python dictionary by default).31 :rtype: dict, str...
Learn to execute automation testing from scratch with the LambdaTest Learning Hub: from setting up the prerequisites and running your first automation test, to following best practices and diving deeper into advanced test scenarios. The LambdaTest Learning Hub compiles step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, and TestNG.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 minutes of automation testing FREE!!