Best Python code snippet using playwright-python
mod_turnByTurn.py
Source:mod_turnByTurn.py
1# -*- coding: utf-8 -*-2#----------------------------------------------------------------------------3# A turn by turn navigation module.4#----------------------------------------------------------------------------5# Copyright 2007, Oliver White6#7# This program is free software: you can redistribute it and/or modify8# it under the terms of the GNU General Public License as published by9# the Free Software Foundation, either version 3 of the License, or10# (at your option) any later version.11#12# This program is distributed in the hope that it will be useful,13# but WITHOUT ANY WARRANTY; without even the implied warranty of14# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the15# GNU General Public License for more details.16#17# You should have received a copy of the GNU General Public License18# along with this program. If not, see <http://www.gnu.org/licenses/>.19#---------------------------------------------------------------------------20from modules.base_module import RanaModule21from core import geo22from core import threads23from core import constants24from core.signal import Signal25import time26from threading import RLock27REROUTE_CHECK_INTERVAL = 5000 # in ms28# in m/s, about 46 km/h - if this speed is reached, the rerouting threshold is multiplied29# by REROUTING_THRESHOLD_MULTIPLIER30INCREASE_REROUTING_THRESHOLD_SPEED = 2031REROUTING_DEFAULT_THRESHOLD = 3032# not enabled at the moment - needs more field testing33REROUTING_THRESHOLD_MULTIPLIER = 1.034# how many times needs the threshold be crossed to35# trigger rerouting36REROUTING_TRIGGER_COUNT = 337MAX_CONSECUTIVE_AUTOMATIC_REROUTES = 338AUTOMATIC_REROUTE_COUNTER_EXPIRATION_TIME = 600 # in seconds39def getModule(*args, **kwargs):40 return TurnByTurn(*args, **kwargs)41class TurnByTurn(RanaModule):42 """A turn by turn navigation module."""43 def __init__(self, *args, **kwargs):44 RanaModule.__init__(self, *args, **kwargs)45 # initial colors46 self.navigationBoxBackground = (0, 0, 1, 0.3) # very transparent blue47 self.navigationBoxText = (1, 1, 1, 1) # non-transparent white48 self._tbt_worker_lock = RLock()49 self._tbt_worker_enabled = False50 self._go_to_initial_state()51 self._automatic_reroute_counter = 0 # counts consecutive automatic reroutes52 self._last_automatic_reroute_timestamp = time.time()53 # reroute even though the route was not yet reached (for special cases)54 self._override_route_reached = False55 # signals56 self.navigation_started = Signal()57 self.navigation_stopped = Signal()58 self.destination_reached = Signal()59 self.rerouting_triggered = Signal()60 self.current_step_changed = Signal()61 def _go_to_initial_state(self):62 """restore initial state"""63 self._route = None64 self._current_step_index_value = 065 self._current_step_indicator = None66 self._espeak_first_and_half_trigger = False67 self._espeak_first_trigger = False68 self._espeak_second_trigger = False69 self._current_distance = None70 self._current_step = None71 self._navigation_box_hidden = False72 self._m_route_length = 073 self._location_watch_id = None74 self._on_route = False75 # rerouting is enabled once the route is reached for the first time76 self._route_reached = False77 self._rerouting_threshold_multiplier = 1.078 self._rerouting_threshold_crossed_counter = 079 def firstTime(self):80 icons = self.m.get('icons', None)81 if icons:82 icons.subscribeColorInfo(self, self._colors_changed_cb)83 def _colors_changed_cb(self, colors):84 self.navigationBoxBackground = colors['navigation_box_background'].getCairoColor()85 self.navigationBoxText = colors['navigation_box_text'].getCairoColor()86 def handleMessage(self, message, messageType, args):87 if message == 'start':88 if messageType == 'ms':89 self.start_tbt(from_where=args)90 elif message == 'stop':91 self.stop_tbt()92 elif message == 'reroute': # manual rerouting93 # reset automatic reroute counter94 self._automatic_reroute_counter = 095 self._reroute()96 elif message == "toggleBoxHiding":97 self.log.info("toggling navigation box visibility")98 self._navigation_box_hidden = not self._navigation_box_hidden99 elif message == "switchToPreviousTurn":100 self.switch_to_previous_step()101 elif message == "switchToNextTurn":102 self.switch_to_next_step()103 elif message == "showMessageInsideNotification":104 if self.current_step:105 message = "<i>turn description:</i>\n%s" % self.current_step.description106 if self.dmod.has_custom_notification_support:107 self.dmod.notify(message, 7000)108 # TODO: add support for modRana notifications once they support line wrapping109 def _reroute_auto(self):110 """This function is called when automatic rerouting is triggered."""111 # check time from last automatic reroute112 dt = time.time() - self._last_automatic_reroute_timestamp113 if dt >= AUTOMATIC_REROUTE_COUNTER_EXPIRATION_TIME:114 # reset the automatic reroute counter115 self._automatic_reroute_counter = 0116 self.log.debug('automatic reroute counter expired, clearing')117 # on some routes, when we are moving away from the start of the route, it118 # is needed to reroute a couple of times before the correct way is found119 # on the other hand there should be a limit on the number of times120 # modRana reroutes in a row121 #122 # SOLUTION:123 # 1. enable automatic rerouting even though the route was not yet reached124 # (as we are moving away from it)125 # 2. do this only a limited number of times (up to 3 times in a row)126 # 3. the counter is reset by manual rerouting, by reaching the route or after 10 minutes127 if self._automatic_reroute_counter < MAX_CONSECUTIVE_AUTOMATIC_REROUTES:128 self.log.debug('faking that route was reached to enable new rerouting')129 self._override_route_reached = True130 else:131 self.log.info('tbt: too many consecutive reroutes (%d),', self._automatic_reroute_counter)132 self.log.info('reach the route to enable automatic rerouting')133 self.log.info('or reroute manually')134 # increment the automatic reroute counter & update the timestamp135 self._automatic_reroute_counter += 1136 self._last_automatic_reroute_timestamp = time.time()137 # trigger rerouting138 self._reroute()139 self.rerouting_triggered()140 def _reroute(self):141 # 1. say rerouting is in progress142 voiceMessage = "rerouting"143 voice = self.m.get('voice', None)144 if voice:145 voice.say(voiceMessage, "en") # make sure rerouting said with english voice146 time.sleep(2) # TODO: improve this147 route = self.m.get('route', None)148 if route:149 # 2. get a new route from current position to destination150 route.reroute()151 # 3. restart routing for to this new route from the closest point152 self.sendMessage("ms:turnByTurn:start:closest")153 @property154 def automatic_rerouting_enabled(self):155 """Report if automatic rerouting is enabled.156 Automatic rerouting is considered to be enabled if the reroutingThreshold key157 holds a value that is not None (in general a string representation of a floating158 point number).159 :return: if automatic rerouting is enabled160 :rtype: bool161 """162 return self.get('reroutingThreshold', REROUTING_DEFAULT_THRESHOLD)163 def _say_turn(self, message, distanceInMeters, force_language_code=False):164 """Say a text-to-speech message about a turn.165 This basically wraps the simple say method from voice and adds some more information,166 like current distance to the turn.167 """168 voice = self.m.get('voice', None)169 units = self.m.get('units', None)170 if voice and units:171 (dist_string, short, long) = units.humanRound(distanceInMeters)172 if dist_string == "0":173 dist_string = ""174 else:175 dist_string = '<p xml:lang="en">in <emphasis level="strong">' + dist_string + ' ' + long + '</emphasis></p><br>'176 # TODO: language specific distance strings177 text = dist_string + message178 # """ the message can contain unicode, this might cause an exception when printing it179 # in some systems (SHR-u on Neo, for example)"""180 # try:181 # print("saying: %s" % text)182 # pass183 # except UnicodeEncodeError:184 # print("voice: printing the current message to stdout failed do to unicode conversion error")185 if force_language_code:186 espeak_language_code = force_language_code187 else:188 # the espeak language code is the first part of this whitespace delimited string189 espeak_language_code = self.get('directionsLanguage', 'en en').split(" ")[0]190 return voice.say(text, espeak_language_code)191 @property192 def _max_step_index(self):193 return self._route.message_point_count - 1194 def _get_starting_step(self, which='first'):195 if self._route:196 if which == 'first':197 return self._get_step(0)198 if which == 'closest':199 return self._get_closest_step()200 def _get_closest_step(self):201 """Get the geographically closest step."""202 proj = self.m.get('projection', None) # we also need the projection module203 pos = self.get('pos', None) # and current position204 if pos and proj:205 (lat1, lon1) = pos206 temp_steps = self._route.message_points207 for step in temp_steps:208 (lat2, lon2) = step.getLL()209 step.current_distance = geo.distance(lat1, lon1, lat2, lon2) * 1000 # km to m210 closest_step = sorted(temp_steps, key=lambda x: x.current_distance)[0]211 return closest_step212 def _get_step(self, index):213 """Return steps for valid index, None otherwise."""214 max_index = self._max_step_index215 if index > max_index or index < -(max_index + 1):216 self.log.error("wrong turn index: %d, max index is: %d", index, max_index)217 return None218 else:219 return self._route.get_message_point_by_index(index)220 def _get_step_index(self, step):221 return self._route.get_message_point_index(step)222 @property223 def current_step(self):224 """Return current step."""225 return self._route.get_message_point_by_index(self._current_step_index)226 @current_step.setter227 def current_step(self, step):228 """Set a given step as current step."""229 mp_index = self._route.get_message_point_index(step)230 self._current_step_index = mp_index231 self.current_step_changed(step)232 @property233 def _current_step_index(self):234 return self._current_step_index_value235 @_current_step_index.setter236 def _current_step_index(self, index):237 self._current_step_index_value = index238 # trigger a navigation update every time current step index is set239 self._do_navigation_update()240 def switch_to_previous_step(self):241 """Switch to previous step and clean up."""242 next_index = self._current_step_index - 1243 if next_index >= 0:244 self._current_step_index = next_index245 self._espeak_first_trigger = False246 self._espeak_second_trigger = False247 self.log.info("switching to previous step")248 self.current_step_changed(self.current_step)249 else:250 self.log.info("previous step reached")251 def switch_to_next_step(self):252 """Switch to next step and clean up."""253 max_index = self._max_step_index254 next_index = self._current_step_index + 1255 if next_index <= max_index:256 self._current_step_index = next_index257 self._espeak_first_and_half_trigger = False258 self._espeak_first_trigger = False259 self._espeak_second_trigger = False260 self.log.info("switching to next step")261 self.current_step_changed(self.current_step)262 else:263 self.log.info("last step reached")264 self._last_step_reached()265 def _last_step_reached(self):266 """Handle all tasks that are needed once the last step is reached."""267 # disable automatic rerouting268 self._stop_tbt_worker()269 # automatic rerouting needs to be disabled to prevent rerouting270 # once the destination was reached271 self.destination_reached()272 def enabled(self):273 """Return if Turn by Turn navigation is enabled."""274 if self._route:275 return True276 else:277 return False278 def start_tbt(self, from_where='first'):279 """Start Turn-by-turn navigation."""280 # clean up any possible previous navigation data281 self._go_to_initial_state()282 # NOTE: turn and step are used interchangeably in the documentation283 m = self.m.get('route', None)284 if m:285 route = m.get_current_directions()286 if route: # is the route nonempty ?287 self._route = route288 # get route in radians for automatic rerouting289 self.radiansRoute = route.get_points_lle_radians(drop_elevation=True)290 # start rerouting watch291 self._start_tbt_worker()292 # for some reason the combined distance does not account for the last step293 self._m_route_length = route.length294 # some statistics295 meters_per_sec_speed = self.get('metersPerSecSpeed', None)296 dt = m.route_lookup_duration297 self.log.info("route lookup took: %f s" % dt)298 if dt and meters_per_sec_speed:299 dm = dt * meters_per_sec_speed300 self.log.info("distance traveled during lookup: %f m" % dm)301 # the duration of the road lookup and other variables are currently not used302 # in the heuristics but might be added later to make the heuristics more robust303 # now we decide if we use the closest turn, or the next one,304 # as we might be already past it and on our way to the next turn305 cs = self._get_closest_step() # get geographically closest step306 pos = self.get('pos', None) # get current position307 p_reached_dist = int(self.get('pointReachedDistance', 30)) # get the trigger distance308 next_turn_id = self._get_step_index(cs) + 1309 next_step = self._get_step(next_turn_id)310 # check if we have all the data needed for our heuristics311 self.log.info("trying to guess correct step to start navigation")312 if next_step and pos and p_reached_dist:313 (lat, lon) = pos314 (csLat, csLon) = cs.getLL()315 (nsLat, nsLon) = next_step.getLL()316 pos2next_step = geo.distance(lat, lon, nsLat, nsLon) * 1000317 pos2current_step = geo.distance(lat, lon, csLat, csLon) * 1000318 current_step2next_step = geo.distance(csLat, csLon, nsLat, nsLon) * 1000319 # self.log.debug("pos",(lat,lon))320 # self.log.debug("cs",(csLat,csLon))321 # self.log.debug("ns",(nsLat,nsLon))322 self.log.debug("position to next turn: %f m" % pos2next_step)323 self.log.debug("position to current turn: %f m" % pos2current_step)324 self.log.debug("current turn to next turn: %f m" % current_step2next_step)325 self.log.debug("turn reached trigger distance: %f m" % p_reached_dist)326 if pos2current_step > p_reached_dist:327 # this means we are out of the "capture circle" of the closest step328 # what is more distant, the closest or the next step ?329 if pos2next_step < current_step2next_step:330 # we are mostly probably already past the closest step,331 # so we switch to the next step at once332 self.log.debug("already past closest turn, switching to next turn")333 self.current_step = next_step334 # we play the message for the next step,335 # with current distance to this step,336 # to assure there is some voice output immediately after337 # getting a new route or rerouting"""338 plaintextMessage = next_step.ssml_message339 self._say_turn(plaintextMessage, pos2next_step)340 else:341 # we have probably not yet reached the closest step,342 # so we start navigation from it343 self.log.debug("closest turn not yet reached")344 self.current_step = cs345 else:346 # we are inside the "capture circle" of the closest step,347 # this means the navigation will trigger the voice message by itself348 # and correctly switch to next step349 # -> no need to switch to next step from here350 self.log.debug("inside reach distance of closest turn")351 self.current_step = cs352 else:353 # we dont have some of the data, that is needed to decide354 # if we start the navigation from the closest step of from the step that is after it355 # -> we just start from the closest step356 self.log.debug("not enough data to decide, using closest turn")357 self.current_step = cs358 self._do_navigation_update() # run a first time navigation update359 self._location_watch_id = self.watch('locationUpdated', self.location_update_cb)360 self.log.info("started and ready")361 # trigger the navigation-started signal362 self.navigation_started()363 def stop_tbt(self):364 """Stop Turn-by-Turn navigation."""365 # remove location watch366 if self._location_watch_id:367 self.removeWatch(self._location_watch_id)368 # cleanup369 self._go_to_initial_state()370 self._stop_tbt_worker()371 self.log.info("stopped")372 self.navigation_stopped()373 def location_update_cb(self, key, newValue, oldValue):374 """Position changed, do a tbt navigation update."""375 # TODO: use a Signal for this ?376 if key == "locationUpdated": # just to be sure377 self._do_navigation_update()378 else:379 self.log.error("invalid key: %r", key)380 def _do_navigation_update(self):381 """Do a navigation update."""382 # make sure there really are some steps383 if not self._route:384 self.log.error("no route")385 return386 pos = self.get('pos', None)387 if pos is None:388 self.log.error("skipping update, invalid position")389 return390 # get/compute/update necessary the values391 lat1, lon1 = pos392 lat2, lon2 = self.current_step.getLL()393 current_distance = geo.distance(lat1, lon1, lat2, lon2) * 1000 # km to m394 self._current_distance = current_distance # update current distance395 # use some sane minimum distance396 distance = int(self.get('minAnnounceDistance', 100))397 # GHK: make distance speed-sensitive398 #399 # I came up with this formula after a lot of experimentation with400 # gnuplot. The idea is to give the user some simple parameters to401 # adjust yet let him have a lot of control. There are five402 # parameters in the equation:403 #404 # low_speed Speed below which the pre-announcement time is constant.405 # low_time Announcement time at and below lowSpeed.406 # high_speed Speed above which the announcement time is constant.407 # high_time Announcement time at and above highSpeed.408 # power Exponential power used in the formula; good values are 0.5-5409 #410 # The speeds are in m/s. Obviously highXXX must be greater than lowXXX.411 # If power is 1.0, announcement times increase linearly above lowSpeed.412 # If power < 1.0, times rise rapidly just above lowSpeed and more413 # gradually approaching highSpeed. If power > 1.0, times rise414 # gradually at first and rapidly near highSpeed. I like power > 1.0.415 #416 # The reasoning is that at high speeds you are probably on a417 # motorway/freeway and will need extra time to get into the proper418 # lane to take your exit. That reasoning is pretty harmless on a419 # high-speed two-lane road, but it breaks down if you are stuck in420 # heavy traffic on a four-lane freeway (like in Los Angeles421 # where I live) because you might need quite a while to work your422 # way across the traffic even though you're creeping along. But I423 # don't know a good way to detect whether you're on a multi-lane road,424 # I chose speed as an acceptable proxy.425 #426 # Regardless of speed, we always warn a certain distance ahead (see427 # "distance" above). That distance comes from the value in the current428 # step of the directions.429 #430 # BTW, if you want to use gnuplot to play with the curves, try:431 # max(a,b) = a > b ? a : b432 # min(a,b) = a < b ? a : b433 # warn(x,t1,s1,t2,s2,p) = min(t2,(max(s1,x)-s1)**p*(t2-t1)/(s2-s1)**p+t1)434 # plot [0:160][0:] warn(x,10,50,60,100,2.0)435 #436 meters_per_sec_speed = self.get('metersPerSecSpeed', None)437 point_reached_distance = int(self.get('pointReachedDistance', 30))438 if meters_per_sec_speed:439 # check if we can miss the point by going too fast -> mps speed > point reached distance440 # also enlarge the rerouting threshold as it looks like it needs to be larger441 # when moving at high speed to prevent unnecessary rerouting442 if meters_per_sec_speed > point_reached_distance * 0.75:443 point_reached_distance = meters_per_sec_speed * 2444 # self.log.debug("tbt: enlarging point reached distance to: %1.2f m due to large speed (%1.2f m/s)". (pointReachedDistance, metersPerSecSpeed)445 if meters_per_sec_speed > INCREASE_REROUTING_THRESHOLD_SPEED:446 self._rerouting_threshold_multiplier = REROUTING_THRESHOLD_MULTIPLIER447 else:448 self._rerouting_threshold_multiplier = 1.0449 # speed & time based triggering450 low_speed = float(self.get('minAnnounceSpeed', 13.89))451 high_speed = float(self.get('maxAnnounceSpeed', 27.78))452 high_speed = max(high_speed, low_speed + 0.1)453 low_time = int(self.get('minAnnounceTime', 10))454 high_time = int(self.get('maxAnnounceTime', 60))455 high_time = max(high_time, low_time)456 power = float(self.get('announcePower', 2.0))457 warn_time = (max(low_speed, meters_per_sec_speed) - low_speed) ** power \458 * (high_time - low_time) / (high_speed - low_speed) ** power \459 + low_time460 warn_time = min(high_time, warn_time)461 distance = max(distance, warn_time * meters_per_sec_speed)462 if self.get('debugTbT', False):463 self.log.debug("#####")464 self.log.debug("min/max announce time: %d/%d s", low_time, high_time)465 self.log.debug("trigger distance: %1.2f m (%1.2f s warning)", distance, distance / float(meters_per_sec_speed))466 self.log.debug("current distance: %1.2f m", current_distance)467 self.log.debug("current speed: %1.2f m/s (%1.2f km/h)", meters_per_sec_speed, meters_per_sec_speed * 3.6)468 self.log.debug("point reached distance: %f m", point_reached_distance)469 self.log.debug("1. triggered=%r, 1.5. triggered=%r, 2. triggered=%r",470 self._espeak_first_trigger, self._espeak_first_and_half_trigger, self._espeak_second_trigger)471 if warn_time > 30:472 self.log.debug("optional (20 s) trigger distance: %1.2f", 20.0 * meters_per_sec_speed)473 if current_distance <= point_reached_distance:474 # this means we reached the point"""475 if self._espeak_second_trigger is False:476 self.log.debug("triggering espeak nr. 2")477 # say the message without distance478 plaintextMessage = self.current_step.ssml_message479 # consider turn said even if it was skipped (ignore errors)480 self._say_turn(plaintextMessage, 0)481 self.current_step.visited = True # mark this point as visited482 self._espeak_first_trigger = True # everything has been said, again :D483 self._espeak_second_trigger = True # everything has been said, again :D484 self.switch_to_next_step() # switch to next step485 else:486 if current_distance <= distance:487 # this means we reached an optimal distance for saying the message"""488 if self._espeak_first_trigger is False:489 self.log.debug("triggering espeak nr. 1")490 plaintextMessage = self.current_step.ssml_message491 if self._say_turn(plaintextMessage, current_distance):492 self._espeak_first_trigger = True # first message done493 if self._espeak_first_and_half_trigger is False and warn_time > 30:494 if current_distance <= (20.0 * meters_per_sec_speed):495 # in case that the warning time gets too big, add an intermediate warning at 20 seconds496 # NOTE: this means it is said after the first trigger497 plaintextMessage = self.current_step.ssml_message498 if self._say_turn(plaintextMessage, current_distance):499 self._espeak_first_and_half_trigger = True # intermediate message done500 ## automatic rerouting ##501 # is automatic rerouting enabled from options502 # enabled == threshold that is not not None503 if self.automatic_rerouting_enabled:504 # rerouting is enabled only once the route is reached for the first time505 if self._on_route and not self._route_reached:506 self._route_reached = True507 self._automatic_reroute_counter = 0508 self.log.info('route reached, rerouting enabled')509 # did the TBT worker detect that the rerouting threshold was reached ?510 if self._rerouting_conditions_met():511 # test if enough consecutive divergence point were recorded512 if self._rerouting_threshold_crossed_counter >= REROUTING_TRIGGER_COUNT:513 # reset the routeReached override514 self._override_route_reached = False515 # trigger rerouting516 self._reroute_auto()517 else:518 # reset the counter519 self._rerouting_threshold_crossed_counter = 0520 def _rerouting_conditions_met(self):521 return (self._route_reached or self._override_route_reached) and not self._on_route522 def _following_route(self):523 """Are we still following the route or is rerouting needed ?"""524 start1 = time.perf_counter()525 pos = self.get('pos', None)526 proj = self.m.get('projection', None)527 if pos and proj:528 pLat, pLon = pos529 # we use Radians to get rid of radian conversion overhead for530 # the geographic distance computation method531 radians_ll = self.radiansRoute532 pLat = geo.radians(pLat)533 pLon = geo.radians(pLon)534 if len(radians_ll) == 0:535 self.log.error("Divergence: can't follow a zero point route")536 return False537 elif len(radians_ll) == 1: # 1 point route538 aLat, aLon = radians_ll[0]539 min_distance = geo.distance_approx_radians(pLat, pLon, aLat, aLon)540 else: # 2+ points route541 aLat, aLon = radians_ll[0]542 bLat, bLon = radians_ll[1]543 min_distance = geo.distance_point_to_line_radians(pLat, pLon, aLat, aLon, bLat, bLon)544 aLat, aLon = bLat, bLon545 for point in radians_ll[1:]:546 bLat, bLon = point547 dist = geo.distance_point_to_line_radians(pLat, pLon, aLat, aLon, bLat, bLon)548 if dist < min_distance:549 min_distance = dist550 aLat, aLon = bLat, bLon551 # the multiplier tries to compensate for high speed movement552 threshold = float(553 self.get('reroutingThreshold', REROUTING_DEFAULT_THRESHOLD)) * self._rerouting_threshold_multiplier554 self.log.debug("Divergence from route: %1.2f/%1.2f m computed in %1.0f ms",555 min_distance * 1000, float(threshold), (1000 * (time.perf_counter() - start1)))556 return min_distance * 1000 < threshold557 def _start_tbt_worker(self):558 with self._tbt_worker_lock:559 # reuse previous thread or start new one560 if self._tbt_worker_enabled:561 self.log.info("reusing TBT worker thread")562 else:563 self.log.info("starting new TBT worker thread")564 t = threads.ModRanaThread(name=constants.THREAD_TBT_WORKER,565 target=self._tbt_worker)566 threads.threadMgr.add(t)567 self._tbt_worker_enabled = True568 def _stop_tbt_worker(self):569 with self._tbt_worker_lock:570 self.log.info("stopping the TBT worker thread")571 self._tbt_worker_enabled = False572 def _tbt_worker(self):573 """This function runs in its own thread and checks if we are still following the route."""574 self.log.info("TBTWorker: started")575 # The _tbt_worker_enabled variable is needed as once the end of the route is reached576 # there will be a route set but further rerouting should not be performed.577 while True:578 with self._tbt_worker_lock:579 # Either tbt has been shut down (no route is set)580 # or rerouting is no longer needed.581 if not self._route or not self._tbt_worker_enabled:582 self.log.info("TBTWorker: shutting down")583 break584 # first make sure automatic rerouting is enabled585 # eq. reroutingThreshold != None586 if self.automatic_rerouting_enabled:587 # check if we are still following the route588 # self.log.debug('TBTWorker: checking divergence from route')589 self._on_route = self._following_route()590 if self._rerouting_conditions_met():591 self.log.info('TBTWorker: divergence detected')592 # switch to quick updates593 for i in range(0, REROUTING_TRIGGER_COUNT + 1):594 time.sleep(1)595 onRoute = self._following_route()596 if onRoute: # divergence stopped597 self._on_route = onRoute598 self.log.info('TBTWorker: false alarm')599 break600 else: # still diverging from current route601 self._on_route = onRoute602 # increase divergence counter603 self._rerouting_threshold_crossed_counter += 1604 self.log.debug('TBTWorker: increasing divergence counter (%d)',605 self._rerouting_threshold_crossed_counter)606 time.sleep(REROUTE_CHECK_INTERVAL / 1000.0)607 def shutdown(self):608 # cleanup...
_page.py
Source:_page.py
...193 ),194 )195 self._channel.on(196 "route",197 lambda params: self._on_route(198 from_channel(params["route"]), from_channel(params["request"])199 ),200 )201 self._channel.on(202 "video",203 lambda params: cast(Video, self.video)._set_relative_path(204 params["relativePath"]205 ),206 )207 self._channel.on(208 "webSocket",209 lambda params: self.emit(210 Page.Events.WebSocket, from_channel(params["webSocket"])211 ),212 )213 self._channel.on(214 "worker", lambda params: self._on_worker(from_channel(params["worker"]))215 )216 def _set_browser_context(self, context: "BrowserContext") -> None:217 self._browser_context = context218 self._timeout_settings = TimeoutSettings(context._timeout_settings)219 def _on_request_failed(220 self,221 request: Request,222 response_end_timing: float,223 failure_text: str = None,224 ) -> None:225 request._failure_text = failure_text226 if request._timing:227 request._timing["responseEnd"] = response_end_timing228 self.emit(Page.Events.RequestFailed, request)229 def _on_request_finished(230 self, request: Request, response_end_timing: float231 ) -> None:232 if request._timing:233 request._timing["responseEnd"] = response_end_timing234 self.emit(Page.Events.RequestFinished, request)235 def _on_frame_attached(self, frame: Frame) -> None:236 frame._page = self237 self._frames.append(frame)238 self.emit(Page.Events.FrameAttached, frame)239 def _on_frame_detached(self, frame: Frame) -> None:240 self._frames.remove(frame)241 frame._detached = True242 self.emit(Page.Events.FrameDetached, frame)243 def _on_route(self, route: Route, request: Request) -> None:244 for handler_entry in self._routes:245 if handler_entry.matcher.matches(request.url):246 result = cast(Any, handler_entry.handler)(route, request)247 if inspect.iscoroutine(result):248 asyncio.create_task(result)249 return250 self._browser_context._on_route(route, request)251 def _on_binding(self, binding_call: "BindingCall") -> None:252 func = self._bindings.get(binding_call._initializer["name"])253 if func:254 asyncio.create_task(binding_call.call(func))255 self._browser_context._on_binding(binding_call)256 def _on_worker(self, worker: "Worker") -> None:257 self._workers.append(worker)258 worker._page = self259 self.emit(Page.Events.Worker, worker)260 def _on_close(self) -> None:261 self._is_closed = True262 self._browser_context._pages.remove(self)263 self.emit(Page.Events.Close)264 def _on_crash(self) -> None:...
two_way_env.py
Source:two_way_env.py
...75 #self.print_obs_space(ref_vehicle=self.vehicle)76 #self.print_obs_space(ref_vehicle=self.idmdp_vehicle)77 #self._set_curriculam(curriculam_reward_threshold=0.6*self.GOAL_REWARD)78 return (obs, rew, done, info)79 def _on_route(self, veh=None):80 if veh is None:81 veh = self.vehicle82 lane_ID = veh.lane_index[2]83 onroute = (lane_ID == 1)84 return onroute85 86 def _on_road(self, veh=None):87 if veh is None:88 veh = self.vehicle89 return (veh.position[0] < self.ROAD_LENGTH) and (veh.position[0] > 0)90 def _goal_achieved(self, veh=None):91 if veh is None:92 veh = self.vehicle93 return (veh.position[0] > self.config["GOAL_LENGTH"] and \94 self._on_route(veh))95 def _reward(self, action):96 """97 The vehicle is rewarded for driving with high velocity98 :param action: the action performed99 :return: the reward of the state-action transition100 """101 #neighbours = self.road.network.all_side_lanes(self.vehicle.lane_index)102 collision_reward = self.config["COLLISION_REWARD"] * self.vehicle.crashed103 velocity_reward = self.config["VELOCITY_REWARD"] * (self.vehicle.velocity_index -1) / (self.vehicle.SPEED_COUNT - 1)104 if (velocity_reward > 0):105 velocity_reward *= self._on_route()106 goal_reward = self.config["GOAL_REWARD"]107 if self.vehicle.crashed:108 reward = collision_reward + min(0.0, velocity_reward)109 elif self._goal_achieved():110 reward = goal_reward + velocity_reward111 else:112 reward = velocity_reward113 if not self.vehicle.action_validity:114 reward = reward + self.config["INVALID_ACTION_REWARD"]115 return reward116 def _is_terminal(self):117 """118 The episode is over if the ego vehicle crashed or the time is out.119 """...
multilane_env.py
Source:multilane_env.py
...75 self.ego_x0 = None76 EnvViewer.SCREEN_HEIGHT = self.config['screen_height']77 EnvViewer.SCREEN_WIDTH = self.config['screen_width']78 self.reset()79 def _on_route(self, veh=None):80 return True81 82 def _on_road(self, veh=None):83 if veh is None:84 veh = self.vehicle85 return (veh.position[0] < self.ROAD_LENGTH) and (veh.position[0] > 0)86 def _goal_achieved(self, veh=None):87 if veh is None:88 veh = self.vehicle89 return (veh.position[0] > 0.99 * self.ROAD_LENGTH) and \90 self._on_route(veh)91 def reset(self):92 self.steps = 093 self._create_road()94 self._create_vehicles()95 return super(MultilaneEnv, self).reset()96 def step(self, action):97 self.steps += 198 self.previous_action = action99 obs, rew, done, info = super(MultilaneEnv, self).step(action)100 self.episode_travel = self.vehicle.position[0] - self.ego_x0101 return (obs, rew, done, info)102 def _create_road(self):103 """104 Create a road composed of straight adjacent lanes.105 """106 self.road = Road(network=RoadNetwork.straight_road_network(lanes=self.config["lanes_count"], length=self.ROAD_LENGTH),107 np_random=self.np_random)108 def _create_vehicles(self):109 """110 Create some new random vehicles of a given type, and add them on the road.111 """112 self.vehicle = MDPVehicle.create_random(road=self.road,113 velocity=np.random.randint(low=15,high=35),114 spacing=self.config["initial_spacing"],115 config=self.config)116 self.vehicle.is_ego_vehicle = True117 self.road.vehicles.append(self.vehicle)118 self.ego_x0 = self.vehicle.position[0]119 vehicles_type = utils.class_from_path(self.config["other_vehicles_type"])120 ahead_vehicles = self.config["vehicles_count"] // 2121 behind_vehicles = self.config["vehicles_count"] - ahead_vehicles122 for _ in range(ahead_vehicles):123 self.road.vehicles.append(vehicles_type.create_random(road=self.road,124 ahead=True,125 config=self.config)126 )127 for _ in range(behind_vehicles):128 self.road.vehicles.append(vehicles_type.create_random(road=self.road,129 ahead=False,130 config=self.config)131 )132 for _ in range(10):133 self.road.vehicles.append(Obstacle.create_random(road=self.road,134 ahead=False,135 config=self.config136 )137 )138 # Add the virtual obstacles139 lane = self.road.network.lanes_list()[0]140 x0 = lane.length/2141 position = lane.position(x0, -3.5)142 lane_index = self.road.network.get_closest_lane_index(143 position=position,144 heading=0 145 ) 146 virtual_obstacle_left = Obstacle(self.road,147 position=position,148 heading=lane.heading_at(x0),149 velocity=0,150 target_velocity=0,151 lane_index=lane_index,152 target_lane_index=lane_index, 153 enable_lane_change=False,154 config=self.config)155 virtual_obstacle_left.LENGTH = lane.length156 virtual_obstacle_left.virtual = True157 self.road.vehicles.append(virtual_obstacle_left)158 self.road.virtual_vehicles.append(virtual_obstacle_left)159 lane = self.road.network.lanes_list()[-1]160 x0 = lane.length/2161 position = lane.position(x0, 3.5)162 virtual_obstacle_right = Obstacle(self.road,163 position=position,164 heading=lane.heading_at(x0),165 velocity=0,166 target_velocity=0,167 lane_index=lane_index,168 target_lane_index=lane_index, 169 enable_lane_change=False,170 config=self.config)171 virtual_obstacle_right.LENGTH = lane.length172 virtual_obstacle_right.virtual = True173 self.road.vehicles.append(virtual_obstacle_right)174 self.road.virtual_vehicles.append(virtual_obstacle_right)175 def _reward(self, action):176 """177 The reward is defined to foster driving at high speed, on the rightmost lanes, and to avoid collisions.178 :param action: the last action performed179 :return: the corresponding reward180 """181 action_lookup = dict(map(reversed, AbstractEnv.ACTIONS.items()))182 action_reward = {action_lookup['LANE_LEFT']: self.LANE_CHANGE_REWARD, 183 action_lookup['IDLE']: 0, 184 action_lookup['LANE_RIGHT']: self.LANE_CHANGE_REWARD, 185 action_lookup['FASTER']: 0, 186 action_lookup['SLOWER']: 0,187 action_lookup['LANE_LEFT_AGGRESSIVE']: self.AGGRESSIVE_LANE_CHANGE_REWARD,188 action_lookup['LANE_RIGHT_AGGRESSIVE']: self.AGGRESSIVE_LANE_CHANGE_REWARD189 }190 neighbours = self.road.network.all_side_lanes(self.vehicle.lane_index)191 collision_reward = self.config["COLLISION_REWARD"] * self.vehicle.crashed192 velocity_reward = self.config["VELOCITY_REWARD"] * (self.vehicle.velocity_index -1) / (self.vehicle.SPEED_COUNT - 1)193 if (velocity_reward > 0):194 velocity_reward *= self._on_route()195 goal_reward = self.config["GOAL_REWARD"]196 if self.vehicle.crashed:197 reward = collision_reward + min(0.0, velocity_reward + action_reward[action])198 elif self._goal_achieved():199 reward = goal_reward + velocity_reward + action_reward[action]200 else:201 reward = velocity_reward + action_reward[action]202 return reward203 def _is_terminal(self):204 """205 The episode is over if the ego vehicle crashed or the time is out.206 """207 return self.vehicle.crashed or \208 self._goal_achieved() or \...
LambdaTest’s Playwright tutorial will give you a broader idea about the Playwright automation framework, its unique features, and use cases with examples to exceed your understanding of Playwright testing. This tutorial will give A to Z guidance, from installing the Playwright framework to some best practices and advanced concepts.
Get 100 minutes of automation test minutes FREE!!