Best Python code snippet using playwright-python
action.py
Source:action.py
1import json2import logging3import requests4from urllib.parse import urljoin5from tabulate import tabulate6from waiter import http_util, terminal7from waiter.format import format_last_request_time8from waiter.format import format_status9from waiter.querying import get_service, get_services_using_token10from waiter.querying import print_no_data, query_service, query_services, query_token11from waiter.util import is_service_current, str2bool, response_message, print_error, wait_until12def ping_on_cluster(cluster, timeout, wait_for_request, token_name, service_exists_fn):13 """Pings using the given token name (or ^SERVICE-ID#) in the given cluster."""14 cluster_name = cluster['name']15 def perform_ping():16 status = None17 try:18 default_queue_timeout_millis = 30000019 timeout_seconds = timeout if wait_for_request else 520 timeout_millis = timeout_seconds * 100021 headers = {22 'X-Waiter-Queue-Timeout': str(max(default_queue_timeout_millis, timeout_millis)),23 'X-Waiter-Token': token_name,24 'X-Waiter-Timeout': str(timeout_millis)25 }26 read_timeout = timeout_seconds if wait_for_request else (timeout_seconds + 5)27 resp = http_util.get(cluster, '/waiter-ping', headers=headers, read_timeout=read_timeout)28 logging.debug(f'Response status code: {resp.status_code}')29 resp_json = resp.json()30 if resp.status_code == 200:31 status = resp_json.get('service-state', {}).get('status')32 ping_response = resp_json['ping-response']33 ping_response_result = ping_response['result']34 if ping_response_result in ['descriptor-error', 'received-response']:35 ping_response_status = ping_response['status']36 if ping_response_status == 200:37 print(terminal.success('Ping successful.'))38 result = True39 else:40 print_error(f'Ping responded with non-200 status {ping_response_status}.')41 try:42 ping_response_waiter_error = json.loads(ping_response['body'])['waiter-error']['message']43 print_error(ping_response_waiter_error)44 except json.JSONDecodeError:45 logging.debug('Ping response is 
not in json format, cannot display waiter-error message.')46 except KeyError:47 logging.debug('Ping response body does not contain waiter-error message.')48 result = False49 elif ping_response_result == 'timed-out':50 if wait_for_request:51 print_error('Ping request timed out.')52 result = False53 else:54 logging.debug('ignoring ping request timeout due to --no-wait')55 result = True56 else:57 print_error(f'Encountered unknown ping result: {ping_response_result}.')58 result = False59 else:60 print_error(response_message(resp_json))61 result = False62 except Exception:63 result = False64 message = f'Encountered error while pinging in {cluster_name}.'65 logging.exception(message)66 if wait_for_request:67 print_error(message)68 return result, status69 def check_service_status():70 result = wait_until(service_exists_fn, timeout=timeout, interval=5)71 if result:72 print(f'Service is currently {format_status(result["status"])}.')73 return True74 else:75 print_error('Timeout while waiting for service to start.')76 return False77 succeeded, service_status = perform_ping()78 if succeeded:79 if service_status is None or service_status == 'Inactive':80 check_service_status()81 else:82 print(f'Service is currently {format_status(service_status)}.')83 return succeeded84def token_has_current_service(cluster, token_name, current_token_etag):85 """If the given token has a "current" service, returns that service else None"""86 services = get_services_using_token(cluster, token_name)87 if services is not None:88 services = [s for s in services if is_service_current(s, current_token_etag, token_name)]89 return services[0] if len(services) > 0 else False90 else:91 cluster_name = cluster['name']92 print_error(f'Unable to retrieve services using token {token_name} in {cluster_name}.')93 return None94def ping_token_on_cluster(cluster, token_name, timeout, wait_for_request,95 current_token_etag):96 """Pings the token with the given token name in the given cluster."""97 cluster_name = 
cluster['name']98 print(f'Pinging token {terminal.bold(token_name)} '99 f'in {terminal.bold(cluster_name)}...')100 return ping_on_cluster(cluster, timeout, wait_for_request,101 token_name, lambda: token_has_current_service(cluster, token_name, current_token_etag))102def service_is_active(cluster, service_id):103 """If the given service id is active, returns the service, else None"""104 service = get_service(cluster, service_id)105 return service if (service and service.get('status') != 'Inactive') else None106def ping_service_on_cluster(cluster, service_id, timeout, wait_for_request):107 """Pings the service with the given service id in the given cluster."""108 cluster_name = cluster['name']109 print(f'Pinging service {terminal.bold(service_id)} '110 f'in {terminal.bold(cluster_name)}...')111 return ping_on_cluster(cluster, timeout, wait_for_request,112 f'^SERVICE-ID#{service_id}', lambda: service_is_active(cluster, service_id))113def kill_service_on_cluster(cluster, service_id, timeout_seconds):114 """Kills the service with the given service id in the given cluster."""115 cluster_name = cluster['name']116 http_util.set_retries(0)117 try:118 print(f'Killing service {terminal.bold(service_id)} in {terminal.bold(cluster_name)}...')119 params = {'timeout': timeout_seconds * 1000}120 resp = http_util.delete(cluster, f'/apps/{service_id}', params=params, read_timeout=timeout_seconds)121 logging.debug(f'Response status code: {resp.status_code}')122 if resp.status_code == 200:123 routers_agree = resp.json().get('routers-agree')124 if routers_agree:125 print(f'Successfully killed {service_id} in {cluster_name}.')126 return True127 else:128 print(f'Successfully killed {service_id} in {cluster_name}. 
'129 f'Server-side timeout waiting for routers to update.')130 return False131 else:132 print_error(response_message(resp.json()))133 return False134 except requests.exceptions.ReadTimeout:135 message = f'Request timed out while killing {service_id} in {cluster_name}.'136 logging.exception(message)137 print_error(message)138 except Exception:139 message = f'Encountered error while killing {service_id} in {cluster_name}.'140 logging.exception(message)141 print_error(message)142def process_kill_request(clusters, token_name_or_service_id, is_service_id, force_flag, timeout_secs,143 no_service_result=False):144 """Kills the service(s) using the given token name or service-id.145 Returns False if no services can be found or if there was a failure in deleting any service.146 Returns True if all services using the token were deleted successfully."""147 if is_service_id:148 query_result = query_service(clusters, token_name_or_service_id)149 num_services = query_result['count']150 if num_services == 0:151 print_no_data(clusters)152 return no_service_result153 else:154 query_result = query_services(clusters, token_name_or_service_id)155 num_services = query_result['count']156 if num_services == 0:157 clusters_text = ' / '.join([terminal.bold(c['name']) for c in clusters])158 print(f'There are no services using token {terminal.bold(token_name_or_service_id)} in {clusters_text}.')159 return no_service_result160 elif num_services > 1:161 print(f'There are {num_services} services using token {terminal.bold(token_name_or_service_id)}:')162 cluster_data_pairs = sorted(query_result['clusters'].items())163 clusters_by_name = {c['name']: c for c in clusters}164 overall_success = True165 for cluster_name, data in cluster_data_pairs:166 if is_service_id:167 service = data['service']168 service['service-id'] = token_name_or_service_id169 services = [service]170 else:171 services = sorted(data['services'], key=lambda s: s.get('last-request-time', None) or '', reverse=True)172 for service 
in services:173 service_id = service['service-id']174 cluster = clusters_by_name[cluster_name]175 status_string = service['status']176 status = format_status(status_string)177 inactive = status_string == 'Inactive'178 should_kill = False179 if force_flag or num_services == 1:180 should_kill = True181 else:182 url = urljoin(cluster['url'], f'apps/{service_id}')183 if is_service_id:184 active_instances = service['instances']['active-instances']185 healthy_count = len([i for i in active_instances if i['healthy?']])186 unhealthy_count = len([i for i in active_instances if not i['healthy?']])187 else:188 healthy_count = service['instance-counts']['healthy-instances']189 unhealthy_count = service['instance-counts']['unhealthy-instances']190 last_request_time = format_last_request_time(service)191 run_as_user = service['service-description']['run-as-user']192 table = [['Status', status],193 ['Healthy', healthy_count],194 ['Unhealthy', unhealthy_count],195 ['URL', url],196 ['Run as user', run_as_user]]197 table_text = tabulate(table, tablefmt='plain')198 print(f'\n'199 f'=== {terminal.bold(cluster_name)} / {terminal.bold(service_id)} ===\n'200 f'\n'201 f'Last Request: {last_request_time}\n'202 f'\n'203 f'{table_text}\n')204 if not inactive:205 answer = input(f'Kill service in {terminal.bold(cluster_name)}? 
')206 should_kill = str2bool(answer)207 if inactive:208 print(f'Service {terminal.bold(service_id)} on {terminal.bold(cluster_name)} cannot be killed because '209 f'it is already {status}.')210 should_kill = False211 if should_kill:212 success = kill_service_on_cluster(cluster, service_id, timeout_secs)213 overall_success = overall_success and success214 return overall_success215def token_explicitly_created_on_cluster(cluster, token_cluster_name):216 """Returns true if the given token cluster matches the configured cluster name of the given cluster"""217 cluster_settings, _ = http_util.make_data_request(cluster, lambda: http_util.get(cluster, '/settings'))218 cluster_config_name = cluster_settings['cluster-config']['name'].upper()219 created_on_this_cluster = token_cluster_name == cluster_config_name220 return created_on_this_cluster221def process_ping_request(clusters, token_name_or_service_id, is_service_id, timeout_secs, wait_for_request):222 """Pings the service using the given token name or service-id.223 Returns False if the ping request fails or times out.224 Returns True if the ping request completes successfully."""225 if is_service_id:226 query_result = query_service(clusters, token_name_or_service_id)227 else:228 query_result = query_token(clusters, token_name_or_service_id)229 if query_result['count'] == 0:230 print_no_data(clusters)231 return False232 http_util.set_retries(0)233 cluster_data_pairs = sorted(query_result['clusters'].items())234 clusters_by_name = {c['name']: c for c in clusters}235 overall_success = True236 for cluster_name, data in cluster_data_pairs:237 cluster = clusters_by_name[cluster_name]238 if is_service_id:239 success = ping_service_on_cluster(cluster, token_name_or_service_id, timeout_secs, wait_for_request)240 else:241 token_data = data['token']242 token_etag = data['etag']243 token_cluster_name = token_data['cluster'].upper()244 if len(clusters) == 1 or token_explicitly_created_on_cluster(cluster, token_cluster_name):245 
success = ping_token_on_cluster(cluster, token_name_or_service_id, timeout_secs,246 wait_for_request, token_etag)247 else:248 print(f'Not pinging token {terminal.bold(token_name_or_service_id)} '249 f'in {terminal.bold(cluster_name)} '250 f'because it was created in {terminal.bold(token_cluster_name)}.')251 success = True252 overall_success = overall_success and success...
test_django.py
Source:test_django.py
...30 User.objects.all().delete()31def test_notify(bugsnag_server, django_client):32 response = django_client.get('/notes/handled-exception/?foo=strawberry')33 assert response.status_code == 20034 bugsnag_server.wait_for_request()35 payload = bugsnag_server.received[0]['json_body']36 event = payload['events'][0]37 exception = event['exceptions'][0]38 assert payload['apiKey'] == 'a05afff2bd2ffaf0ab0f52715bbdcffd'39 assert event['context'] == 'notes.views.handle_notify'40 assert event['severityReason'] == {'type': 'handledException'}41 assert event['device']['runtimeVersions']['django'] == django.__version__42 assert 'environment' not in payload['events'][0]['metaData']43 assert event['metaData']['request'] == {44 'method': 'GET',45 'url': 'http://testserver/notes/handled-exception/?foo=strawberry',46 'path': '/notes/handled-exception/',47 'POST': {},48 'encoding': None,49 'GET': {'foo': ['strawberry']}50 }51 assert event['user'] == {}52 assert exception['errorClass'] == 'KeyError'53 assert exception['message'] == "'nonexistent-item'"54 assert exception['stacktrace'][0] == {55 'code': {56 '39': 'def handle_notify(request):',57 '40': ' items = {}',58 '41': ' try:',59 '42': ' print("item: {}" % items["nonexistent-item"])',60 '43': ' except KeyError as e:',61 '44': " bugsnag.notify(e, unhappy='nonexistent-file')",62 '45': '',63 },64 'file': 'notes/views.py',65 'inProject': True,66 'lineNumber': 42,67 'method': 'handle_notify',68 }69def test_enable_environment(bugsnag_server, django_client):70 bugsnag.configure(send_environment=True)71 response = django_client.get('/notes/handled-exception/?foo=strawberry')72 assert response.status_code == 20073 bugsnag_server.wait_for_request()74 payload = bugsnag_server.received[0]['json_body']75 event = payload['events'][0]76 assert event['metaData']['environment']['REQUEST_METHOD'] == 'GET'77def test_notify_custom_info(bugsnag_server, django_client):78 django_client.get('/notes/handled-exception-custom/')79 
bugsnag_server.wait_for_request()80 payload = bugsnag_server.received[0]['json_body']81 event = payload['events'][0]82 assert payload['apiKey'] == 'a05afff2bd2ffaf0ab0f52715bbdcffd'83 assert event['context'] == 'custom_info'84 assert event['severityReason'] == {'type': 'userSpecifiedSeverity'}85 assert event['severity'] == 'info'86def test_notify_post_body(bugsnag_server, django_client):87 response = django_client.post('/notes/handled-exception/',88 '{"foo": "strawberry"}',89 content_type='application/json')90 assert response.status_code == 20091 bugsnag_server.wait_for_request()92 payload = bugsnag_server.received[0]['json_body']93 event = payload['events'][0]94 exception = event['exceptions'][0]95 assert payload['apiKey'] == 'a05afff2bd2ffaf0ab0f52715bbdcffd'96 assert event['context'] == 'notes.views.handle_notify'97 assert event['severityReason'] == {'type': 'handledException'}98 assert event['severity'] == 'warning'99 assert event['device']['runtimeVersions']['django'] == django.__version__100 assert event['metaData']['request'] == {101 'method': 'POST',102 'url': 'http://testserver/notes/handled-exception/',103 'path': '/notes/handled-exception/',104 'GET': {},105 'encoding': None,106 'POST': {'foo': 'strawberry'}107 }108 assert event['user'] == {}109 assert exception['errorClass'] == 'KeyError'110 assert exception['message'] == "'nonexistent-item'"111 assert exception['stacktrace'][0] == {112 'code': {113 '39': 'def handle_notify(request):',114 '40': ' items = {}',115 '41': ' try:',116 '42': ' print("item: {}" % items["nonexistent-item"])',117 '43': ' except KeyError as e:',118 '44': " bugsnag.notify(e, unhappy='nonexistent-file')",119 '45': '',120 },121 'file': 'notes/views.py',122 'inProject': True,123 'lineNumber': 42,124 'method': 'handle_notify',125 }126def test_unhandled_exception(bugsnag_server, django_client):127 with pytest.raises(RuntimeError):128 django_client.get('/notes/unhandled-crash/')129 bugsnag_server.wait_for_request()130 payload = 
bugsnag_server.received[0]['json_body']131 event = payload['events'][0]132 exception = event['exceptions'][0]133 assert payload['apiKey'] == 'a05afff2bd2ffaf0ab0f52715bbdcffd'134 assert event['context'] == 'crash'135 assert event['severityReason'] == {136 'type': 'unhandledExceptionMiddleware',137 'attributes': {'framework': 'Django'}138 }139 assert event['device']['runtimeVersions']['django'] == django.__version__140 assert event['metaData']['request'] == {141 'method': 'GET',142 'url': 'http://testserver/notes/unhandled-crash/',143 'path': '/notes/unhandled-crash/',144 'POST': {},145 'encoding': None,146 'GET': {}147 }148 assert event['user'] == {}149 assert exception['errorClass'] == 'RuntimeError'150 assert exception['message'] == 'failed to return in time'151 assert exception['stacktrace'][0] == {152 'method': 'unhandled_crash',153 'file': 'notes/views.py',154 'lineNumber': 32,155 'inProject': True,156 'code': {157 '29': '',158 '30': '',159 '31': 'def unhandled_crash(request):',160 '32': " raise RuntimeError('failed to return in time')",161 '33': '',162 '34': '',163 '35': 'def unhandled_crash_in_template(request):',164 },165 }166 assert exception['stacktrace'][1]['inProject'] is False167def test_unhandled_exception_in_template(bugsnag_server, django_client):168 with pytest.raises(TemplateSyntaxError):169 django_client.get('/notes/unhandled-template-crash/')170 bugsnag_server.wait_for_request()171 payload = bugsnag_server.received[0]['json_body']172 event = payload['events'][0]173 exception = event['exceptions'][0]174 assert payload['apiKey'] == 'a05afff2bd2ffaf0ab0f52715bbdcffd'175 assert event['context'] == 'notes.views.unhandled_crash_in_template'176 assert event['severityReason'] == {177 'type': 'unhandledExceptionMiddleware',178 'attributes': {'framework': 'Django'}179 }180 assert event['device']['runtimeVersions']['django'] == django.__version__181 assert event['metaData']['request'] == {182 'method': 'GET',183 'url': 
'http://testserver/notes/unhandled-template-crash/',184 'path': '/notes/unhandled-template-crash/',185 'POST': {},186 'encoding': None,187 'GET': {}188 }189 assert event['user'] == {}190 assert exception['errorClass'] == template_error_class191 assert 'idunno()' in exception['message']192def test_ignores_http404(bugsnag_server, django_client):193 response = django_client.get('/notes/missing_route/')194 assert response.status_code == 404195 with pytest.raises(MissingRequestError):196 bugsnag_server.wait_for_request()197 assert len(bugsnag_server.received) == 0198def test_report_error_from_http404handler(bugsnag_server, django_client):199 with pytest.raises(Exception):200 django_client.get('/notes/poorly-handled-404')201 bugsnag_server.wait_for_request()202 payload = bugsnag_server.received[0]['json_body']203 event = payload['events'][0]204 exception = event['exceptions'][0]205 assert payload['apiKey'] == 'a05afff2bd2ffaf0ab0f52715bbdcffd'206 assert event['context'] == 'GET /notes/poorly-handled-404'207 assert event['severityReason'] == {208 'type': 'unhandledExceptionMiddleware',209 'attributes': {'framework': 'Django'}210 }211 assert event['device']['runtimeVersions']['django'] == django.__version__212 assert event['metaData']['request'] == {213 'method': 'GET',214 'url': 'http://testserver/notes/poorly-handled-404',215 'path': '/notes/poorly-handled-404',216 'POST': {},217 'encoding': None,218 'GET': {}219 }220 assert exception['errorClass'] == 'Exception'221 assert exception['message'] == 'nah'222 assert exception['stacktrace'][0] == {223 'method': 'handler404',224 'file': 'todo/urls.py',225 'lineNumber': 11,226 'inProject': True,227 'code': {228 '8': '',229 '9': 'def handler404(request, *args, **kwargs):',230 '10': " if 'poorly-handled-404' in request.path:",231 '11': " raise Exception('nah')",232 '12': '',233 '13': " response = HttpResponseNotFound('Terrible happenings!',", # noqa: E501234 '14': " content_type='text/plain')", # noqa: E501235 },236 }237def 
test_notify_appends_user_data(bugsnag_server, django_client):238 django_client.login(username='test', password='hunter2')239 response = django_client.get('/notes/handled-exception/?foo=strawberry')240 assert response.status_code == 200241 bugsnag_server.wait_for_request()242 payload = bugsnag_server.received[0]['json_body']243 event = payload['events'][0]244 exception = event['exceptions'][0]245 assert payload['apiKey'] == 'a05afff2bd2ffaf0ab0f52715bbdcffd'246 assert event['context'] == 'notes.views.handle_notify'247 assert event['severityReason'] == {'type': 'handledException'}248 assert event['device']['runtimeVersions']['django'] == django.__version__249 assert event['metaData']['custom']['unhappy'] == 'nonexistent-file'250 assert event['metaData']['request'] == {251 'method': 'GET',252 'url': 'http://testserver/notes/handled-exception/?foo=strawberry',253 'path': '/notes/handled-exception/',254 'POST': {},255 'encoding': None,256 'GET': {'foo': ['strawberry']}257 }258 assert event['user'] == {'email': 'test@example.com', 'id': 'test'}259 assert exception['errorClass'] == 'KeyError'260 assert exception['message'] == "'nonexistent-item'"261 assert exception['stacktrace'][0] == {262 'code': {263 '39': 'def handle_notify(request):',264 '40': ' items = {}',265 '41': ' try:',266 '42': ' print("item: {}" % items["nonexistent-item"])',267 '43': ' except KeyError as e:',268 '44': " bugsnag.notify(e, unhappy='nonexistent-file')",269 '45': '',270 },271 'file': 'notes/views.py',272 'inProject': True,273 'lineNumber': 42,274 'method': 'handle_notify',275 }276def test_crash_appends_user_data(bugsnag_server, django_client):277 django_client.login(username='test', password='hunter2')278 with pytest.raises(RuntimeError):279 django_client.get('/notes/unhandled-crash/')280 bugsnag_server.wait_for_request()281 payload = bugsnag_server.received[0]['json_body']282 event = payload['events'][0]283 exception = event['exceptions'][0]284 assert payload['apiKey'] == 
'a05afff2bd2ffaf0ab0f52715bbdcffd'285 assert event['context'] == 'crash'286 assert event['severityReason'] == {287 'type': 'unhandledExceptionMiddleware',288 'attributes': {'framework': 'Django'}289 }290 assert event['device']['runtimeVersions']['django'] == django.__version__291 assert event['metaData']['request'] == {292 'method': 'GET',293 'url': 'http://testserver/notes/unhandled-crash/',294 'path': '/notes/unhandled-crash/',295 'POST': {},296 'encoding': None,297 'GET': {}298 }299 assert event['user'] == {'email': 'test@example.com', 'id': 'test'}300 assert exception['errorClass'] == 'RuntimeError'301 assert exception['message'] == 'failed to return in time'302 assert exception['stacktrace'][0] == {303 'method': 'unhandled_crash',304 'file': 'notes/views.py',305 'lineNumber': 32,306 'inProject': True,307 'code': {308 '29': '',309 '30': '',310 '31': 'def unhandled_crash(request):',311 '32': " raise RuntimeError('failed to return in time')",312 '33': '',313 '34': '',314 '35': 'def unhandled_crash_in_template(request):',315 },316 }317 assert exception['stacktrace'][1]['inProject'] is False318def test_read_request_in_callback(bugsnag_server, django_client):319 with pytest.raises(RuntimeError):320 django_client.get('/notes/crash-with-callback/?user_id=foo')321 bugsnag_server.wait_for_request()322 payload = bugsnag_server.received[0]['json_body']323 event = payload['events'][0]...
better_go_for_tea_sm.py
Source:better_go_for_tea_sm.py
#!/usr/bin/env python
import rospy
import sys
import smach
from std_msgs.msg import String
from smach_ros import IntrospectionServer, SimpleActionState, ServiceState
from move_base_msgs.msg import MoveBaseAction
from itr_tutorials.srv import Str2Coord, Str2CoordRequest
from smach import CBState


class WaitForRequest(smach.State):
    """State that hands out queued tea requests, one per execution.

    Requests are the ``data`` payloads of ``String`` messages received on
    the ``/get_me_tea`` topic; they are served in arrival order.

    Outcomes:
        request_received: a request was dequeued into ``out_request_loc``
            and ``out_resting`` was set to False.
        not_request_received: the queue is empty and ``in_resting`` is
            truthy; returned after a 0.5 s sleep.
        rest_needed: the queue is empty and ``in_resting`` is falsy.
    """

    def __init__(self):
        smach.State.__init__(self,
                             outcomes=['request_received',
                                       'not_request_received',
                                       'rest_needed'],
                             output_keys=['out_request_loc', 'out_resting'],
                             input_keys=['in_resting'])
        # FIFO of pending request strings: filled by request_cb, drained
        # by execute.
        self.request = []
        self.request_subs = rospy.Subscriber('/get_me_tea', String,
                                             self.request_cb)

    def request_cb(self, msg):
        """Subscriber callback: append the message payload to the queue."""
        self.request.append(msg.data)

    def execute(self, userdata):
        """Dequeue the oldest request, or report whether to idle or rest."""
        rospy.loginfo('Executing state WAIT_FOR_REQUEST with queue: ' + str(self.request))

        if self.request:
            userdata.out_request_loc = self.request.pop(0)
            # A request is about to be served, so the resting flag is cleared.
            userdata.out_resting = False
            return 'request_received'

        if userdata.in_resting:
            # Pause before reporting "nothing to do"; the enclosing state
            # machine loops straight back into this state on that outcome.
            rospy.sleep(0.5)
            return 'not_request_received'

        return 'rest_needed'


class GoForTeaSMNode:
    """Builds and runs the top-level go-for-tea state machine."""

    def __init__(self):
        pass

    def create_sm(self):
        """Create the top-level state machine.

        Returns:
            A ``smach.StateMachine`` with outcomes 'succeeded' and 'aborted'
            whose userdata key ``resting`` starts as True.
        """
        sm = smach.StateMachine(outcomes=['succeeded', 'aborted'])
        sm.userdata.resting = True

        with sm:
            # Remapping format: {key inside the state: key in sm.userdata}.
            smach.StateMachine.add('WAIT_FOR_REQUEST', WaitForRequest(),
                                   transitions={'request_received': 'GO_TO_KITCHEN',
                                                'not_request_received': 'WAIT_FOR_REQUEST',
                                                'rest_needed': 'GO_TO_REST'},
                                   remapping={'out_request_loc': 'request_loc',
                                              'in_resting': 'resting',
                                              'out_resting': 'resting'})

            smach.StateMachine.add('GO_TO_KITCHEN', GetCoordsAndMoveSM('kitchen'),
                                   transitions={'succeeded': 'GO_TO_REQUEST',
                                                'aborted': 'WAIT_FOR_REQUEST'})

            # Both outcomes return to WAIT_FOR_REQUEST, which then decides
            # whether another request is pending or a rest is needed.
            smach.StateMachine.add('GO_TO_REQUEST', GetCoordsAndMoveSM(),
                                   transitions={'succeeded': 'WAIT_FOR_REQUEST',
                                                'aborted': 'WAIT_FOR_REQUEST'},
                                   remapping={'in_request': 'request_loc'})

            # Failing to reach the rest place ends the whole machine.
            smach.StateMachine.add('GO_TO_REST', GetCoordsAndMoveSM('rest'),
                                   transitions={'succeeded': 'SET_RESTING',
                                                'aborted': 'aborted'})

            # A callback state is enough here: one statement, one outcome.
            smach.StateMachine.add('SET_RESTING', CBState(self.set_resting),
                                   transitions={'succeeded': 'WAIT_FOR_REQUEST'},
                                   remapping={'out_resting': 'resting'})

        return sm

    # The callback takes only ``userdata`` (no ``self``), so it is declared
    # static; ``self.set_resting`` then yields the decorated callback itself.
    @staticmethod
    @smach.cb_interface(outcomes=['succeeded'], output_keys=['out_resting'])
    def set_resting(userdata):
        """CBState callback: set ``out_resting`` to True."""
        userdata.out_resting = True
        return 'succeeded'

    def execute_sm(self):
        """Run the state machine with an introspection server attached.

        Returns:
            The outcome returned by ``sm.execute()``.
        """
        sm = self.create_sm()
        # The introspection server is started so the machine can be
        # inspected while it runs.
        sis = IntrospectionServer('tea_Server', sm, 'SM_ROOT')
        sis.start()
        try:
            outcome = sm.execute()
        finally:
            # Stop the server even when execution raises.
            sis.stop()
        return outcome


class GetCoordsAndMoveSM(smach.StateMachine):
    """Two-step sub-machine: look up a place's coordinates, then move there.

    Factors out the GET_COORDS -> MOVE_TO_PLACE sequence shared by every
    destination.

    Args:
        fixed_place: place name to look up. When falsy (the default), the
            place name is read from the ``in_request`` userdata key instead.
    """

    def __init__(self, fixed_place=None):
        # ``in_request`` is only an input when the place is not fixed.
        smach.StateMachine.__init__(self,
                                    outcomes=['succeeded', 'aborted'],
                                    input_keys=[] if fixed_place else ['in_request'])

        with self:
            if fixed_place:
                # Fixed destination: the request is built once, up front.
                # NOTE(review): presumably the positional argument fills the
                # 'place' request field of Str2Coord -- confirm in the .srv file.
                get_coords = ServiceState('/str_to_coord', Str2Coord,
                                          request=Str2CoordRequest(fixed_place),
                                          response_slots=['coordinates'])
                coords_remapping = {'coordinates': 'place_coords'}
            else:
                # Destination from userdata: the 'place' request slot is
                # remapped to this machine's ``in_request`` key.
                get_coords = ServiceState('/str_to_coord', Str2Coord,
                                          request_slots=['place'],
                                          response_slots=['coordinates'])
                coords_remapping = {'coordinates': 'place_coords',
                                    'place': 'in_request'}

            smach.StateMachine.add('GET_COORDS', get_coords,
                                   transitions={'succeeded': 'MOVE_TO_PLACE',
                                                'preempted': 'aborted'},
                                   remapping=coords_remapping)

            # The 'coordinates' response slot lands in ``place_coords``,
            # which feeds the 'target_pose' goal slot here.
            smach.StateMachine.add('MOVE_TO_PLACE',
                                   SimpleActionState('/move_base', MoveBaseAction,
                                                     goal_slots=['target_pose']),
                                   transitions={'succeeded': 'succeeded',
                                                'preempted': 'aborted',
                                                'aborted': 'aborted'},
                                   remapping={'target_pose': 'place_coords'})


if __name__ == '__main__':
    rospy.init_node('go_for_tea_sm', sys.argv)
    instance = GoForTeaSMNode()
    instance.execute_sm()
solution_better_go_for_tea_sm.py
Source:solution_better_go_for_tea_sm.py
#!/usr/bin/env python
import rospy
import sys
import smach
from smach_ros import IntrospectionServer, SimpleActionState, ServiceState
from std_msgs.msg import String
from move_base_msgs.msg import MoveBaseAction
from itr_tutorials.srv import Str2Coord, Str2CoordRequest
from smach import CBState


class WaitForRequest(smach.State):
    """State that hands out queued tea requests, one per execution.

    Requests are the ``data`` payloads of ``String`` messages received on
    the ``/get_me_tea`` topic; they are served in arrival order.

    Outcomes:
        request_received: a request was dequeued into ``out_request_loc``
            and ``out_resting`` was set to False.
        not_request_received: the queue is empty and ``in_resting`` is
            truthy; returned after a 0.5 s sleep.
        rest_needed: the queue is empty and ``in_resting`` is falsy.
    """

    def __init__(self):
        smach.State.__init__(self,
                             outcomes=['request_received', 'not_request_received', 'rest_needed'],
                             output_keys=['out_request_loc', 'out_resting'],
                             input_keys=['in_resting'])
        # FIFO of pending request strings: filled by request_cb, drained
        # by execute.
        self.request = []
        self.request_subs = rospy.Subscriber('/get_me_tea', String, self.request_cb)

    def request_cb(self, msg):
        """Subscriber callback: append the message payload to the queue."""
        self.request.append(msg.data)

    def execute(self, userdata):
        """Dequeue the oldest request, or report whether to idle or rest."""
        rospy.loginfo('Executing state WAITFORREQUEST with queue: ' + str(self.request))

        if self.request:
            userdata.out_request_loc = self.request.pop(0)
            # A request is about to be served, so the resting flag is cleared.
            userdata.out_resting = False
            return 'request_received'

        if userdata.in_resting:
            # Pause before reporting "nothing to do"; the enclosing state
            # machine loops straight back into this state on that outcome.
            rospy.sleep(0.5)
            return 'not_request_received'

        return 'rest_needed'


class GoForTeaSMNode:
    """Builds and runs the top-level go-for-tea state machine."""

    def __init__(self):
        pass

    def create_sm(self):
        """Create the top-level state machine.

        Returns:
            A ``smach.StateMachine`` with outcomes 'succeeded' and 'aborted'
            whose userdata key ``resting`` starts as True.
        """
        sm = smach.StateMachine(outcomes=['succeeded', 'aborted'])
        sm.userdata.resting = True

        with sm:
            # Remapping format: {key inside the state: key in sm.userdata}.
            smach.StateMachine.add('WAIT_FOR_REQUEST', WaitForRequest(),
                                   transitions={'request_received': 'GO_TO_KITCHEN',
                                                'not_request_received': 'WAIT_FOR_REQUEST',
                                                'rest_needed': 'GO_TO_REST'},
                                   remapping={'out_request_loc': 'request_loc',
                                              'in_resting': 'resting',
                                              'out_resting': 'resting'})

            smach.StateMachine.add('GO_TO_KITCHEN', GetCoordsAndMoveSM('kitchen'),
                                   transitions={'succeeded': 'GO_TO_REQUEST', 'aborted': 'WAIT_FOR_REQUEST'})

            smach.StateMachine.add('GO_TO_REQUEST', GetCoordsAndMoveSM(),
                                   transitions={'succeeded': 'WAIT_FOR_REQUEST', 'aborted': 'WAIT_FOR_REQUEST'},
                                   remapping={'in_request': 'request_loc'})

            # A failed trip to the rest place returns to waiting instead of
            # ending the machine.
            smach.StateMachine.add('GO_TO_REST', GetCoordsAndMoveSM('rest'),
                                   transitions={'succeeded': 'SET_RESTING', 'aborted': 'WAIT_FOR_REQUEST'})

            smach.StateMachine.add('SET_RESTING', CBState(self.set_resting),
                                   transitions={'succeeded': 'WAIT_FOR_REQUEST'},
                                   remapping={'out_resting': 'resting'})

        return sm

    # The callback takes only ``userdata`` (no ``self``), so it is declared
    # static; ``self.set_resting`` then yields the decorated callback itself.
    @staticmethod
    @smach.cb_interface(outcomes=['succeeded'], output_keys=['out_resting'])
    def set_resting(userdata):
        """CBState callback: set ``out_resting`` to True."""
        userdata.out_resting = True
        return 'succeeded'

    def execute_sm(self):
        """Run the state machine with an introspection server attached.

        Returns:
            The outcome returned by ``sm.execute()``.
        """
        sm = self.create_sm()
        sis = IntrospectionServer('tea_Server', sm, 'SM_ROOT')
        sis.start()
        try:
            outcome = sm.execute()
        finally:
            # Stop the server even when execution raises.
            sis.stop()
        return outcome


class GetCoordsAndMoveSM(smach.StateMachine):
    """Two-step sub-machine: look up a place's coordinates, then move there.

    Args:
        fixed_place: place name to look up. When falsy (the default), the
            place name is read from the ``in_request`` userdata key instead.
    """

    def __init__(self, fixed_place=None):
        # ``in_request`` is only an input when the place is not fixed.
        smach.StateMachine.__init__(self, outcomes=['succeeded', 'aborted'],
                                    input_keys=[] if fixed_place else ['in_request'])

        with self:
            if fixed_place:
                # Fixed destination: the request is built once, up front.
                # NOTE(review): presumably the positional argument fills the
                # 'place' request field of Str2Coord -- confirm in the .srv file.
                get_coords = ServiceState('/str_to_coord', Str2Coord,
                                          request=Str2CoordRequest(fixed_place),
                                          response_slots=['coordinates'])
                coords_remapping = {'coordinates': 'place_coords'}
            else:
                # Destination from userdata: the 'place' request slot is
                # remapped to this machine's ``in_request`` key.
                get_coords = ServiceState('/str_to_coord', Str2Coord,
                                          request_slots=['place'],
                                          response_slots=['coordinates'])
                coords_remapping = {'coordinates': 'place_coords',
                                    'place': 'in_request'}

            smach.StateMachine.add('GET_COORDS', get_coords,
                                   transitions={'succeeded': 'MOVE_TO_PLACE', 'preempted': 'aborted'},
                                   remapping=coords_remapping)

            # The 'coordinates' response slot lands in ``place_coords``,
            # which feeds the 'target_pose' goal slot here.
            smach.StateMachine.add("MOVE_TO_PLACE",
                                   SimpleActionState('/move_base', MoveBaseAction,
                                                     goal_slots=['target_pose']),
                                   transitions={'succeeded': 'succeeded', 'aborted': 'aborted',
                                                'preempted': 'aborted'},
                                   remapping={'target_pose': 'place_coords'})


if __name__ == "__main__":
    rospy.init_node("go_for_tea_sm", sys.argv)
    get_Tea = GoForTeaSMNode()
    get_Tea.execute_sm()
LambdaTest’s Playwright tutorial gives you a broad overview of the Playwright automation framework, its unique features, and its use cases, with examples to deepen your understanding of Playwright testing. The tutorial offers A-to-Z guidance, from installing the Playwright framework to best practices and advanced concepts.
Get 100 automation test minutes FREE!!