How to use to_be_disabled method in Playwright Python

Best Python code snippet using playwright-python

models.py

Source:models.py Github

copy

Full Screen

from collections import OrderedDict
import datetime
import re
import itertools
from operator import attrgetter
from hashlib import md5

from moto.core import BaseBackend, BaseModel, CloudFormationModel
from moto.core.utils import unix_time, BackendDict
from moto.core import ACCOUNT_ID
from moto.utilities.paginator import paginate
from .exceptions import (
    ConsumerNotFound,
    StreamNotFoundError,
    ShardNotFoundError,
    ResourceInUseError,
    ResourceNotFoundError,
    InvalidArgumentError,
    InvalidRetentionPeriod,
    InvalidDecreaseRetention,
    InvalidIncreaseRetention,
    ValidationException,
)
from .utils import (
    compose_shard_iterator,
    compose_new_shard_iterator,
    decompose_shard_iterator,
    PAGINATION_MODEL,
)


class Consumer(BaseModel):
    """A consumer registered against a stream."""

    def __init__(self, consumer_name, region_name, stream_arn):
        self.consumer_name = consumer_name
        self.created = unix_time()
        self.stream_arn = stream_arn
        # The stream name is the last path segment of the stream ARN.
        stream_name = stream_arn.split("/")[-1]
        self.consumer_arn = f"arn:aws:kinesis:{region_name}:{ACCOUNT_ID}:stream/{stream_name}/consumer/{consumer_name}"

    def to_json(self, include_stream_arn=False):
        """Return the consumer description; add ``StreamARN`` on request."""
        resp = {
            "ConsumerName": self.consumer_name,
            "ConsumerARN": self.consumer_arn,
            "ConsumerStatus": "ACTIVE",
            "ConsumerCreationTimestamp": self.created,
        }
        if include_stream_arn:
            resp["StreamARN"] = self.stream_arn
        return resp


class Record(BaseModel):
    """A single data record stored in a shard."""

    def __init__(self, partition_key, data, sequence_number, explicit_hash_key):
        self.partition_key = partition_key
        self.data = data
        self.sequence_number = sequence_number
        self.explicit_hash_key = explicit_hash_key
        self.created_at_datetime = datetime.datetime.utcnow()
        self.created_at = unix_time(self.created_at_datetime)

    def to_json(self):
        """Return the record in the shape used by GetRecords responses."""
        return {
            "Data": self.data,
            "PartitionKey": self.partition_key,
            "SequenceNumber": str(self.sequence_number),
            "ApproximateArrivalTimestamp": self.created_at,
        }


class Shard(BaseModel):
    """A shard owning the inclusive hash-key range [starting_hash, ending_hash]."""

    def __init__(
        self, shard_id, starting_hash, ending_hash, parent=None, adjacent_parent=None
    ):
        self._shard_id = shard_id
        self.starting_hash = starting_hash
        self.ending_hash = ending_hash
        # Maps sequence number -> Record, in insertion (= sequence) order.
        self.records = OrderedDict()
        self.is_open = True
        self.parent = parent
        self.adjacent_parent = adjacent_parent

    @property
    def shard_id(self):
        """Public shard id: the numeric id zero-padded to 12 digits."""
        return "shardId-{0}".format(str(self._shard_id).zfill(12))

    def get_records(self, last_sequence_id, limit):
        """Return up to ``limit`` records newer than ``last_sequence_id``.

        Returns a tuple ``(records, last_sequence_id, millis_behind_latest)``
        where ``last_sequence_id`` is the sequence number of the last record
        returned (or the input value if nothing was returned).
        """
        last_sequence_id = int(last_sequence_id)
        results = []
        secs_behind_latest = 0
        for sequence_number, record in self.records.items():
            if sequence_number > last_sequence_id:
                results.append(record)
                last_sequence_id = sequence_number
                very_last_record = self.records[next(reversed(self.records))]
                secs_behind_latest = very_last_record.created_at - record.created_at
            if len(results) == limit:
                break
        millis_behind_latest = int(secs_behind_latest * 1000)
        return results, last_sequence_id, millis_behind_latest

    def put_record(self, partition_key, data, explicit_hash_key):
        """Append a record and return its sequence number as a string."""
        # Note: this function is not safe for concurrency
        sequence_number = self.get_max_sequence_number() + 1
        self.records[sequence_number] = Record(
            partition_key, data, sequence_number, explicit_hash_key
        )
        return str(sequence_number)

    def get_min_sequence_number(self):
        """Return the first sequence number in the shard, or 0 if empty."""
        if self.records:
            # O(1) instead of materialising the full key list.
            return next(iter(self.records))
        return 0

    def get_max_sequence_number(self):
        """Return the last sequence number in the shard, or 0 if empty."""
        if self.records:
            return next(reversed(self.records))
        return 0

    def get_sequence_number_at(self, at_timestamp):
        """Return the sequence number of the last record created before ``at_timestamp``.

        Returns 0 when the shard is empty or no record is strictly older than
        the timestamp.
        """
        if not self.records:
            return 0
        # find the last item in the list that was created before
        # at_timestamp
        r = next(
            (
                r
                for r in reversed(self.records.values())
                if r.created_at < at_timestamp
            ),
            None,
        )
        # ``r`` is None when every record is at/after the timestamp (e.g. the
        # first record was created exactly at ``at_timestamp``).
        return r.sequence_number if r is not None else 0

    def to_json(self):
        """Return the shard description used by DescribeStream/ListShards."""
        response = {
            "HashKeyRange": {
                "EndingHashKey": str(self.ending_hash),
                "StartingHashKey": str(self.starting_hash),
            },
            "SequenceNumberRange": {
                "StartingSequenceNumber": str(self.get_min_sequence_number()),
            },
            "ShardId": self.shard_id,
        }
        if self.parent:
            response["ParentShardId"] = self.parent
        if self.adjacent_parent:
            response["AdjacentParentShardId"] = self.adjacent_parent
        if not self.is_open:
            # Only closed shards report an ending sequence number.
            response["SequenceNumberRange"]["EndingSequenceNumber"] = str(
                self.get_max_sequence_number()
            )
        return response


class Stream(CloudFormationModel):
    """A stream: a set of shards plus tags, consumers and settings."""

    def __init__(self, stream_name, shard_count, retention_period_hours, region_name):
        self.stream_name = stream_name
        self.creation_datetime = datetime.datetime.now().strftime(
            "%Y-%m-%dT%H:%M:%S.%f000"
        )
        self.region = region_name
        self.account_number = ACCOUNT_ID
        self.shards = {}
        self.tags = {}
        self.status = "ACTIVE"
        self.shard_count = None
        self.init_shards(shard_count)
        # Falsy retention (None/0) falls back to 24 hours.
        self.retention_period_hours = (
            retention_period_hours if retention_period_hours else 24
        )
        self.shard_level_metrics = []
        self.encryption_type = "NONE"
        self.key_id = None
        self.consumers = []

    def delete_consumer(self, consumer_arn):
        """Remove the consumer with the given ARN (no-op if absent)."""
        self.consumers = [c for c in self.consumers if c.consumer_arn != consumer_arn]

    def get_consumer_by_arn(self, consumer_arn):
        """Return the consumer with the given ARN, or None."""
        return next((c for c in self.consumers if c.consumer_arn == consumer_arn), None)

    def init_shards(self, shard_count):
        """Create ``shard_count`` shards evenly covering the hash-key space."""
        self.shard_count = shard_count
        step = 2**128 // shard_count
        hash_ranges = itertools.chain(
            map(lambda i: (i, i * step, (i + 1) * step - 1), range(shard_count - 1)),
            [(shard_count - 1, (shard_count - 1) * step, 2**128)],
        )
        for index, start, end in hash_ranges:
            shard = Shard(index, start, end)
            self.shards[shard.shard_id] = shard

    def _next_shard_index(self):
        """Return the numeric id to use for the next shard created."""
        return max(self.shards.values(), key=attrgetter("_shard_id"))._shard_id + 1

    def split_shard(self, shard_to_split, new_starting_hash_key):
        """Split an open shard in two at ``new_starting_hash_key``.

        The parent is closed and its records are re-routed to the children.
        Raises InvalidArgumentError if the key is not strictly inside the
        shard's range, or if the shard is already closed.
        """
        new_starting_hash_key = int(new_starting_hash_key)
        shard = self.shards[shard_to_split]
        if not shard.starting_hash < new_starting_hash_key < shard.ending_hash:
            raise InvalidArgumentError(
                message=f"NewStartingHashKey {new_starting_hash_key} used in SplitShard() on shard {shard_to_split} in stream {self.stream_name} under account {ACCOUNT_ID} is not both greater than one plus the shard's StartingHashKey {shard.starting_hash} and less than the shard's EndingHashKey {(shard.ending_hash - 1)}."
            )
        if not shard.is_open:
            raise InvalidArgumentError(
                message=f"Shard {shard.shard_id} in stream {self.stream_name} under account {ACCOUNT_ID} has already been merged or split, and thus is not eligible for merging or splitting."
            )
        next_index = self._next_shard_index()
        # Create two new shards
        new_shard_1 = Shard(
            next_index,
            starting_hash=shard.starting_hash,
            ending_hash=new_starting_hash_key - 1,
            parent=shard.shard_id,
        )
        new_shard_2 = Shard(
            next_index + 1,
            starting_hash=new_starting_hash_key,
            ending_hash=shard.ending_hash,
            parent=shard.shard_id,
        )
        self.shards[new_shard_1.shard_id] = new_shard_1
        self.shards[new_shard_2.shard_id] = new_shard_2
        shard.is_open = False
        records = shard.records
        shard.records = OrderedDict()
        # Re-route every record; get_shard_for_key only considers open shards,
        # so they end up in the two children rather than the closed parent.
        for record in records.values():
            self.put_record(record.partition_key, record.explicit_hash_key, record.data)

    def merge_shards(self, shard_to_merge, adjacent_shard_to_merge):
        """Merge two adjacent shards into a new shard and close both.

        Raises InvalidArgumentError if the hash ranges are not adjacent.
        """
        shard1 = self.shards[shard_to_merge]
        shard2 = self.shards[adjacent_shard_to_merge]
        # Validate the two shards are adjacent, in either order. Ranges are
        # inclusive, so the lower shard ends one below the upper shard's start.
        if shard1.ending_hash == (shard2.starting_hash - 1):
            lower, upper = shard1, shard2
        elif shard2.ending_hash == (shard1.starting_hash - 1):
            lower, upper = shard2, shard1
        else:
            raise InvalidArgumentError(adjacent_shard_to_merge)
        # Create a new shard spanning both ranges
        new_shard = Shard(
            self._next_shard_index(),
            starting_hash=lower.starting_hash,
            ending_hash=upper.ending_hash,
            parent=shard1.shard_id,
            adjacent_parent=shard2.shard_id,
        )
        self.shards[new_shard.shard_id] = new_shard
        # Close the merged shards
        shard1.is_open = False
        shard2.is_open = False
        # Move all data across
        for source in (shard1, shard2):
            for record in source.records.values():
                new_shard.put_record(
                    record.partition_key, record.data, record.explicit_hash_key
                )

    def _open_shard_count(self):
        """Return the number of shards that are still open."""
        return sum(1 for s in self.shards.values() if s.is_open)

    def update_shard_count(self, target_shard_count):
        """Split and/or merge shards until ``target_shard_count`` are open."""
        current_shard_count = self._open_shard_count()
        if current_shard_count == target_shard_count:
            return
        # Split shards until we have enough shards
        # AWS seems to split until we have (current * 2) shards, and then merge until we reach the target
        # That's what observable at least - the actual algorithm is probably more advanced
        #
        if current_shard_count < target_shard_count:
            open_shards = [
                (shard_id, shard)
                for shard_id, shard in self.shards.items()
                if shard.is_open
            ]
            for shard_id, shard in open_shards:
                # Split the current shard at its midpoint. Integer division
                # keeps the 128-bit value exact (float division would round).
                new_starting_hash_key = str(
                    (shard.ending_hash + shard.starting_hash) // 2
                )
                self.split_shard(shard_id, new_starting_hash_key)
        current_shard_count = self._open_shard_count()
        # If we need to reduce the shard count, merge shards until we get there
        while current_shard_count > target_shard_count:
            # Keep track of how often we need to merge to get to the target shard count
            required_shard_merges = current_shard_count - target_shard_count
            # Get a list of pairs of adjacent shards
            shard_list = sorted(
                [s for s in self.shards.values() if s.is_open],
                key=lambda x: x.starting_hash,
            )
            adjacent_shards = zip(shard_list[0:-1:2], shard_list[1::2])
            for (shard, adjacent) in adjacent_shards:
                self.merge_shards(shard.shard_id, adjacent.shard_id)
                required_shard_merges -= 1
                if required_shard_merges == 0:
                    break
            current_shard_count = self._open_shard_count()
        self.shard_count = target_shard_count

    @property
    def arn(self):
        return "arn:aws:kinesis:{region}:{account_number}:stream/{stream_name}".format(
            region=self.region,
            account_number=self.account_number,
            stream_name=self.stream_name,
        )

    def get_shard(self, shard_id):
        """Return the shard with the given id or raise ShardNotFoundError."""
        if shard_id in self.shards:
            return self.shards[shard_id]
        else:
            raise ShardNotFoundError(shard_id, stream="")

    def get_shard_for_key(self, partition_key, explicit_hash_key):
        """Return the open shard whose hash range contains the record's key.

        The key is ``explicit_hash_key`` when given, otherwise the MD5 of the
        partition key. Returns None if no open shard covers the key.
        Raises InvalidArgumentError for invalid keys.
        """
        if not isinstance(partition_key, str):
            raise InvalidArgumentError("partition_key")
        if len(partition_key) > 256:
            raise InvalidArgumentError("partition_key")
        if explicit_hash_key:
            if not isinstance(explicit_hash_key, str):
                raise InvalidArgumentError("explicit_hash_key")
            key = int(explicit_hash_key)
            if key >= 2**128:
                raise InvalidArgumentError("explicit_hash_key")
        else:
            key = int(md5(partition_key.encode("utf-8")).hexdigest(), 16)
        for shard in self.shards.values():
            # Closed (split/merged) shards no longer accept records, and shard
            # ranges are inclusive on both ends.
            if shard.is_open and shard.starting_hash <= key <= shard.ending_hash:
                return shard

    def put_record(self, partition_key, explicit_hash_key, data):
        """Store a record; return ``(sequence_number, shard_id)``."""
        shard = self.get_shard_for_key(partition_key, explicit_hash_key)
        sequence_number = shard.put_record(partition_key, data, explicit_hash_key)
        return sequence_number, shard.shard_id

    def to_json(self, shard_limit=None):
        """Return the full stream description, optionally limiting shards."""
        all_shards = list(self.shards.values())
        requested_shards = all_shards[0 : shard_limit or len(all_shards)]
        return {
            "StreamDescription": {
                "StreamARN": self.arn,
                "StreamName": self.stream_name,
                "StreamCreationTimestamp": self.creation_datetime,
                "StreamStatus": self.status,
                "HasMoreShards": len(requested_shards) != len(all_shards),
                "RetentionPeriodHours": self.retention_period_hours,
                "EnhancedMonitoring": [{"ShardLevelMetrics": self.shard_level_metrics}],
                "EncryptionType": self.encryption_type,
                "KeyId": self.key_id,
                "Shards": [shard.to_json() for shard in requested_shards],
            }
        }

    def to_json_summary(self):
        """Return the summary description (no per-shard details)."""
        return {
            "StreamDescriptionSummary": {
                "StreamARN": self.arn,
                "StreamName": self.stream_name,
                "StreamStatus": self.status,
                "StreamCreationTimestamp": self.creation_datetime,
                "OpenShardCount": self.shard_count,
            }
        }

    @staticmethod
    def cloudformation_name_type():
        return "Name"

    @staticmethod
    def cloudformation_type():
        # https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-resource-kinesis-stream.html
        return "AWS::Kinesis::Stream"

    @classmethod
    def create_from_cloudformation_json(
        cls, resource_name, cloudformation_json, region_name, **kwargs
    ):
        """Create a stream (and its tags) from a CloudFormation resource."""
        properties = cloudformation_json.get("Properties", {})
        shard_count = properties.get("ShardCount", 1)
        # None lets Stream.__init__ apply its 24h default; the resource name
        # is not a retention period.
        retention_period_hours = properties.get("RetentionPeriodHours")
        tags = {
            tag_item["Key"]: tag_item["Value"]
            for tag_item in properties.get("Tags", [])
        }
        # NOTE: ``kinesis_backends`` is not defined in the visible part of this
        # module; it is expected to be defined at module level further down.
        backend = kinesis_backends[region_name]
        stream = backend.create_stream(
            resource_name, shard_count, retention_period_hours
        )
        if tags:
            backend.add_tags_to_stream(stream.stream_name, tags)
        return stream

    @classmethod
    def update_from_cloudformation_json(
        cls, original_resource, new_resource_name, cloudformation_json, region_name
    ):
        """Apply a CloudFormation update, replacing the stream if required."""
        properties = cloudformation_json["Properties"]
        if Stream.is_replacement_update(properties):
            resource_name_property = cls.cloudformation_name_type()
            if resource_name_property not in properties:
                properties[resource_name_property] = new_resource_name
            new_resource = cls.create_from_cloudformation_json(
                properties[resource_name_property], cloudformation_json, region_name
            )
            # Stream exposes its name as ``stream_name`` (there is no ``name``).
            properties[resource_name_property] = original_resource.stream_name
            cls.delete_from_cloudformation_json(
                original_resource.stream_name, cloudformation_json, region_name
            )
            return new_resource
        else:  # No Interruption
            if "ShardCount" in properties:
                original_resource.update_shard_count(properties["ShardCount"])
            if "RetentionPeriodHours" in properties:
                original_resource.retention_period_hours = properties[
                    "RetentionPeriodHours"
                ]
            if "Tags" in properties:
                original_resource.tags = {
                    tag_item["Key"]: tag_item["Value"]
                    for tag_item in properties.get("Tags", [])
                }
            return original_resource

    @classmethod
    def delete_from_cloudformation_json(
        cls, resource_name, cloudformation_json, region_name
    ):
        backend = kinesis_backends[region_name]
        backend.delete_stream(resource_name)

    @staticmethod
    def is_replacement_update(properties):
        """Return True if any property that forces replacement is present."""
        # NOTE(review): these look like S3 bucket properties rather than
        # Kinesis stream properties - confirm the intended list.
        properties_requiring_replacement_update = ["BucketName", "ObjectLockEnabled"]
        return any(
            property_requiring_replacement in properties
            for property_requiring_replacement in properties_requiring_replacement_update
        )

    @classmethod
    def has_cfn_attr(cls, attr):
        return attr in ["Arn"]

    def get_cfn_attribute(self, attribute_name):
        from moto.cloudformation.exceptions import UnformattedGetAttTemplateException

        if attribute_name == "Arn":
            return self.arn
        raise UnformattedGetAttTemplateException()

    @property
    def physical_resource_id(self):
        return self.stream_name


class KinesisBackend(BaseBackend):
    """Per-region store of streams and the operations on them."""

    def __init__(self, region):
        self.streams = OrderedDict()
        self.region_name = region

    def reset(self):
        """Drop all state and re-initialise for the same region."""
        region = self.region_name
        self.__dict__ = {}
        self.__init__(region)

    @staticmethod
    def default_vpc_endpoint_service(service_region, zones):
        """Default VPC endpoint service."""
        return BaseBackend.default_vpc_endpoint_service_factory(
            service_region, zones, "kinesis", special_service_name="kinesis-streams"
        )

    def create_stream(self, stream_name, shard_count, retention_period_hours):
        """Create and register a stream; raise ResourceInUseError if it exists."""
        if stream_name in self.streams:
            raise ResourceInUseError(stream_name)
        stream = Stream(
            stream_name, shard_count, retention_period_hours, self.region_name
        )
        self.streams[stream_name] = stream
        return stream

    def describe_stream(self, stream_name):
        """Return the named stream or raise StreamNotFoundError."""
        if stream_name in self.streams:
            return self.streams[stream_name]
        else:
            raise StreamNotFoundError(stream_name)

    def describe_stream_summary(self, stream_name):
        return self.describe_stream(stream_name)

    def list_streams(self):
        return self.streams.values()

    def delete_stream(self, stream_name):
        """Remove and return the named stream or raise StreamNotFoundError."""
        if stream_name in self.streams:
            return self.streams.pop(stream_name)
        raise StreamNotFoundError(stream_name)

    def get_shard_iterator(
        self,
        stream_name,
        shard_id,
        shard_iterator_type,
        starting_sequence_number,
        at_timestamp,
    ):
        """Build a shard iterator for the given shard and iterator type."""
        # Validate params
        stream = self.describe_stream(stream_name)
        try:
            shard = stream.get_shard(shard_id)
        except ShardNotFoundError:
            raise ResourceNotFoundError(
                message=f"Shard {shard_id} in stream {stream_name} under account {ACCOUNT_ID} does not exist"
            )
        shard_iterator = compose_new_shard_iterator(
            stream_name,
            shard,
            shard_iterator_type,
            starting_sequence_number,
            at_timestamp,
        )
        return shard_iterator

    def get_records(self, shard_iterator, limit):
        """Return ``(next_shard_iterator, records, millis_behind_latest)``."""
        decomposed = decompose_shard_iterator(shard_iterator)
        stream_name, shard_id, last_sequence_id = decomposed
        stream = self.describe_stream(stream_name)
        shard = stream.get_shard(shard_id)
        records, last_sequence_id, millis_behind_latest = shard.get_records(
            last_sequence_id, limit
        )
        next_shard_iterator = compose_shard_iterator(
            stream_name, shard, last_sequence_id
        )
        return next_shard_iterator, records, millis_behind_latest

    def put_record(
        self,
        stream_name,
        partition_key,
        explicit_hash_key,
        data,
    ):
        """Store one record; return ``(sequence_number, shard_id)``."""
        stream = self.describe_stream(stream_name)
        sequence_number, shard_id = stream.put_record(
            partition_key, explicit_hash_key, data
        )
        return sequence_number, shard_id

    def put_records(self, stream_name, records):
        """Store several records and return the PutRecords-style response."""
        stream = self.describe_stream(stream_name)
        response = {"FailedRecordCount": 0, "Records": []}
        for record in records:
            partition_key = record.get("PartitionKey")
            explicit_hash_key = record.get("ExplicitHashKey")
            data = record.get("Data")
            sequence_number, shard_id = stream.put_record(
                partition_key, explicit_hash_key, data
            )
            response["Records"].append(
                {"SequenceNumber": sequence_number, "ShardId": shard_id}
            )
        return response

    def split_shard(self, stream_name, shard_to_split, new_starting_hash_key):
        """Validate the arguments and split the shard.

        Raises ValidationException for malformed arguments and
        ShardNotFoundError for an unknown shard.
        """
        stream = self.describe_stream(stream_name)
        # fullmatch: the whole value must conform, not just a prefix.
        if not re.fullmatch("[a-zA-Z0-9_.-]+", shard_to_split):
            raise ValidationException(
                value=shard_to_split,
                position="shardToSplit",
                regex_to_match="[a-zA-Z0-9_.-]+",
            )
        if shard_to_split not in stream.shards:
            raise ShardNotFoundError(shard_id=shard_to_split, stream=stream_name)
        if not re.fullmatch(r"0|([1-9]\d{0,38})", new_starting_hash_key):
            raise ValidationException(
                value=new_starting_hash_key,
                position="newStartingHashKey",
                regex_to_match=r"0|([1-9]\d{0,38})",
            )
        stream.split_shard(shard_to_split, new_starting_hash_key)

    def merge_shards(self, stream_name, shard_to_merge, adjacent_shard_to_merge):
        """Merge two shards; raise ShardNotFoundError if either is unknown."""
        stream = self.describe_stream(stream_name)
        if shard_to_merge not in stream.shards:
            raise ShardNotFoundError(shard_to_merge, stream=stream_name)
        if adjacent_shard_to_merge not in stream.shards:
            raise ShardNotFoundError(adjacent_shard_to_merge, stream=stream_name)
        stream.merge_shards(shard_to_merge, adjacent_shard_to_merge)

    def update_shard_count(self, stream_name, target_shard_count):
        """Resize the stream; return the open-shard count before the change."""
        stream = self.describe_stream(stream_name)
        current_shard_count = len([s for s in stream.shards.values() if s.is_open])
        stream.update_shard_count(target_shard_count)
        return current_shard_count

    @paginate(pagination_model=PAGINATION_MODEL)
    def list_shards(self, stream_name):
        stream = self.describe_stream(stream_name)
        shards = sorted(stream.shards.values(), key=lambda x: x.shard_id)
        return [shard.to_json() for shard in shards]

    def increase_stream_retention_period(self, stream_name, retention_period_hours):
        """Raise the retention period (24..8760 hours, not below current)."""
        stream = self.describe_stream(stream_name)
        if retention_period_hours < 24:
            raise InvalidRetentionPeriod(retention_period_hours, too_short=True)
        if retention_period_hours > 8760:
            raise InvalidRetentionPeriod(retention_period_hours, too_short=False)
        if retention_period_hours < stream.retention_period_hours:
            raise InvalidIncreaseRetention(
                name=stream_name,
                requested=retention_period_hours,
                existing=stream.retention_period_hours,
            )
        stream.retention_period_hours = retention_period_hours

    def decrease_stream_retention_period(self, stream_name, retention_period_hours):
        """Lower the retention period (24..8760 hours, not above current)."""
        stream = self.describe_stream(stream_name)
        if retention_period_hours < 24:
            raise InvalidRetentionPeriod(retention_period_hours, too_short=True)
        if retention_period_hours > 8760:
            raise InvalidRetentionPeriod(retention_period_hours, too_short=False)
        if retention_period_hours > stream.retention_period_hours:
            raise InvalidDecreaseRetention(
                name=stream_name,
                requested=retention_period_hours,
                existing=stream.retention_period_hours,
            )
        stream.retention_period_hours = retention_period_hours

    def list_tags_for_stream(
        self, stream_name, exclusive_start_tag_key=None, limit=None
    ):
        """List tags sorted by key, strictly after ``exclusive_start_tag_key``.

        At most ``limit`` tags are returned; ``HasMoreTags`` is set when the
        limit cut the listing short.
        """
        stream = self.describe_stream(stream_name)
        tags = []
        result = {"HasMoreTags": False, "Tags": tags}
        for key, val in sorted(stream.tags.items(), key=lambda x: x[0]):
            if limit and len(tags) >= limit:
                result["HasMoreTags"] = True
                break
            # The start key is exclusive: skip it as well as everything before.
            if exclusive_start_tag_key and key <= exclusive_start_tag_key:
                continue
            tags.append({"Key": key, "Value": val})
        return result

    def add_tags_to_stream(self, stream_name, tags):
        stream = self.describe_stream(stream_name)
        stream.tags.update(tags)

    def remove_tags_from_stream(self, stream_name, tag_keys):
        stream = self.describe_stream(stream_name)
        for key in tag_keys:
            if key in stream.tags:
                del stream.tags[key]

    def enable_enhanced_monitoring(self, stream_name, shard_level_metrics):
        """Add metrics; return ``(previous_metrics, resulting_metrics)``."""
        stream = self.describe_stream(stream_name)
        current_shard_level_metrics = stream.shard_level_metrics
        desired_metrics = list(set(current_shard_level_metrics + shard_level_metrics))
        stream.shard_level_metrics = desired_metrics
        return current_shard_level_metrics, desired_metrics

    def disable_enhanced_monitoring(self, stream_name, to_be_disabled):
        """Remove metrics (``"ALL"`` clears them); return ``(previous, resulting)``."""
        stream = self.describe_stream(stream_name)
        current_metrics = stream.shard_level_metrics
        if "ALL" in to_be_disabled:
            desired_metrics = []
        else:
            desired_metrics = [
                metric for metric in current_metrics if metric not in to_be_disabled
            ]
        stream.shard_level_metrics = desired_metrics
        return current_metrics, desired_metrics

    def _find_stream_by_arn(self, stream_arn):
        """Return the stream with the given ARN, or None if there is none."""
        for stream in self.streams.values():
            if stream.arn == stream_arn:
                return stream

    def list_stream_consumers(self, stream_arn):
        """
        Pagination is not yet implemented
        """
        stream = self._find_stream_by_arn(stream_arn)
        return stream.consumers

    def register_stream_consumer(self, stream_arn, consumer_name):
        consumer = Consumer(consumer_name, self.region_name, stream_arn)
        stream = self._find_stream_by_arn(stream_arn)
        stream.consumers.append(consumer)
        return consumer

    def describe_stream_consumer(self, stream_arn, consumer_name, consumer_arn):
        """Find a consumer by (stream ARN, name) or by consumer ARN.

        Raises ConsumerNotFound if no consumer matches.
        """
        if stream_arn:
            stream = self._find_stream_by_arn(stream_arn)
            # An unknown stream ARN simply yields no match.
            if stream is not None:
                for consumer in stream.consumers:
                    if consumer_name and consumer.consumer_name == consumer_name:
                        return consumer
        if consumer_arn:
            for stream in self.streams.values():
                consumer = stream.get_consumer_by_arn(consumer_arn)
                if consumer:
                    return consumer
        raise ConsumerNotFound(consumer=consumer_name or consumer_arn)

    def deregister_stream_consumer(self, stream_arn, consumer_name, consumer_arn):
        """Remove a consumer by (stream ARN, name) and/or by consumer ARN."""
        if stream_arn:
            stream = self._find_stream_by_arn(stream_arn)
            if stream is not None:
                # Keep every consumer except the one being deregistered.
                stream.consumers = [
                    c for c in stream.consumers if c.consumer_name != consumer_name
                ]
        if consumer_arn:
            for stream in self.streams.values():
                # Only one stream will actually have this consumer
                # It will be a noop for other streams
                stream.delete_consumer(consumer_arn)

    def start_stream_encryption(self, stream_name, encryption_type, key_id):
        stream = self.describe_stream(stream_name)
        stream.encryption_type = encryption_type
        stream.key_id = key_id

    def stop_stream_encryption(self, stream_name):
        stream = self.describe_stream(stream_name)
        stream.encryption_type = "NONE"
        stream.key_id = None


# ... (snippet truncated here in the source page)

Full Screen

Full Screen

Cartesian_Task.py

Source:Cartesian_Task.py Github

copy

Full Screen

#!/usr/bin/env python
import numpy as np
import math as m
from matplotlib import path
from numpy.linalg import inv
import time
import rospy
import roslib
import tf
import geometry_msgs.msg
from cartesian_interface.pyci_all import *

#########################################################
############ AUXILIARY FUNCTIONS AND CLASSES ############
#########################################################

# (x, y) home position of wheels 1..4 expressed in the car frame
_HOME_WHEEL_XY = ((0.35, 0.35), (0.35, -0.35), (-0.35, 0.35), (-0.35, -0.35))


# colors and styles for text to be used as print(color.BOLD + 'Hello World !' + color.END)
class color:
    PURPLE = '\033[95m'
    CYAN = '\033[96m'
    DARKCYAN = '\033[36m'
    BLUE = '\033[94m'
    GREEN = '\033[92m'
    YELLOW = '\033[93m'
    RED = '\033[91m'
    BOLD = '\033[1m'
    UNDERLINE = '\033[4m'
    END = '\033[0m'


# fct giving coords of target_frame wrt reference_frame
def getPose(reference_frame, target_frame):
    """Return (translation, rotation) of target_frame w.r.t. reference_frame.

    Retries the tf lookup until it succeeds; returns None if ROS shuts down
    before a transform could be read.
    """
    listener = tf.TransformListener()
    listener.waitForTransform(reference_frame, target_frame, rospy.Time(), rospy.Duration(4.0))
    while not rospy.is_shutdown():
        try:
            now = rospy.Time(0)
            listener.waitForTransform(reference_frame, target_frame, now, rospy.Duration(4.0))
            (t, r) = listener.lookupTransform(reference_frame, target_frame, now)
            return (t, r)
        except (tf.LookupException, tf.ConnectivityException, tf.ExtrapolationException):
            continue


# computes coefficients of a cubic with given initial and final positions (vi, vf)
# and given initial and final velocities (dvi, dvf) over the given duration
def cubic(vi, vf, dvi, dvf, duration):
    """Return (a, b, c, d) such that p(t) = a*t**3 + b*t**2 + c*t + d.

    p(0) = vi, p(duration) = vf, p'(0) = dvi, p'(duration) = dvf.
    Raises ZeroDivisionError when duration is 0.
    """
    # force true division even with integer arguments under Python 2
    duration = float(duration)
    a = ((dvi + dvf)*duration + 2*(vi-vf))/duration**3
    b = (-(2*dvi + dvf)*duration + 3*(vf-vi))/duration**2
    c = dvi
    d = vi
    return a, b, c, d


# evaluates at time t the cubic whose coefficients (a, b, c, d) come from cubic()
def _eval_cubic(coeffs, t):
    a, b, c, d = coeffs
    return a * t**3 + b * t**2 + c * t + d


#########################################################
############## BRING TO HOME CONFIGURATION ##############
#########################################################
def homing(car, w1, w2, w3, w4, pelvis, com, cli=None):
    """Bring the four wheels to the home stance, then re-center the car frame.

    cli is forwarded to movecar(), which needs it for its TF lookups. It is
    optional only to keep the old call signature; without it the final
    movecar() step cannot run and a TypeError is raised after the wheels
    have been moved (as the original call always did).
    """
    com.disable()
    wheel_tasks = [w1, w2, w3, w4]
    wheel_poses = []
    for task in wheel_tasks:
        T, _, _ = task.getPoseReference()
        wheel_poses.append(T)
    Tcar, _, _ = car.getPoseReference()

    for T, (x_home, y_home) in zip(wheel_poses, _HOME_WHEEL_XY):
        T.translation_ref()[0] = x_home
        T.translation_ref()[1] = y_home

    # send all targets first, then wait, so that the wheels move together
    for task, T in zip(wheel_tasks, wheel_poses):
        task.setPoseTarget(T, 1.0)
    for task in wheel_tasks:
        task.waitReachCompleted(1.5)
    com.enable()

    if cli is None:
        raise TypeError("homing() needs the 'cli' argument to call movecar()")
    movecar(car, wheel_tasks, 1.0, [pelvis, com], cli)


#########################################################
######### FUNCTIONS CORRESPONDING TO PRIMITIVES #########
#########################################################
# fct for rolling along the x axis and y axis
def roll(car, wheels, distance, axis, duration, to_be_disabled, cli):
    """Translate the car frame by distance along its own 'x' or 'y' axis.

    The tasks in to_be_disabled are disabled during the motion and enabled
    again afterwards. Raises ValueError for any other axis value.
    """
    # validate before touching any task, so a bad axis leaves nothing disabled
    if axis not in ('x', 'y'):
        raise ValueError("axis must be 'x' or 'y', got %r" % (axis,))

    for item in to_be_disabled:
        item.disable()

    Tcar = cli.getPoseFromTf('ci/car_frame', 'ci/world')
    wheel_poses = [cli.getPoseFromTf('ci/'+wheels[i].getDistalLink(), 'ci/car_frame') for i in range(4)]

    x_init = Tcar.translation[0]
    y_init = Tcar.translation[1]

    # first column of the rotation matrix: car x axis expressed in the world frame
    R = Tcar.linear
    c = R[0, 0]
    s = R[1, 0]
    t = 0.0
    dt = 0.01
    if axis == 'x':
        x_goal = x_init + c*distance
        y_goal = y_init + s*distance
    else:
        x_goal = x_init + s*distance
        y_goal = y_init - c*distance
    coeffs_x = cubic(x_init, x_goal, 0., 0., duration)
    coeffs_y = cubic(y_init, y_goal, 0., 0., duration)

    while t < duration:
        Tcar.translation_ref()[0] = _eval_cubic(coeffs_x, t)
        Tcar.translation_ref()[1] = _eval_cubic(coeffs_y, t)
        car.setPoseReference(Tcar)  # this publishes the reference
        # wheels keep the pose they had w.r.t. the car frame
        for i in range(4):
            wheels[i].setPoseReference(wheel_poses[i])
        t += dt
        time.sleep(dt)

    for item in to_be_disabled:
        item.enable()


# fct for rolling two wheels only
def rollTwoWheelsandMoveCom(wheels, distance_wheels, com, distance_com, movecom, other_wheels, duration, to_be_disabled, cli):
    """Roll two wheels along x while the other two are held in the world frame.

    When movecom is true, the com task is enabled for the motion and its x
    reference is moved by distance_com along the same kind of trajectory.
    """
    for item in to_be_disabled:
        item.disable()
    if movecom:
        com.enable()

    Tw0 = cli.getPoseFromTf('ci/'+wheels[0].getDistalLink(), 'ci/car_frame')
    Tw1 = cli.getPoseFromTf('ci/'+wheels[1].getDistalLink(), 'ci/car_frame')
    Tcom = cli.getPoseFromTf('ci/'+com.getDistalLink(), 'ci/world')
    # the other two wheels are referenced to the world for the duration of the motion
    other_wheels[0].setBaseLink(u'world')
    other_wheels[1].setBaseLink(u'world')
    Tw2 = cli.getPoseFromTf('ci/'+other_wheels[0].getDistalLink(), 'ci/world')
    Tw3 = cli.getPoseFromTf('ci/'+other_wheels[1].getDistalLink(), 'ci/world')

    Tw0_trans = Tw0.translation
    Tw1_trans = Tw1.translation
    Tcom_trans = Tcom.translation
    coeffs_w0 = cubic(Tw0_trans[0], Tw0_trans[0] + distance_wheels, 0., 0., duration)
    coeffs_w1 = cubic(Tw1_trans[0], Tw1_trans[0] + distance_wheels, 0., 0., duration)
    coeffs_com = cubic(Tcom_trans[0], Tcom_trans[0] + distance_com, 0., 0., duration)
    t = 0.0
    dt = 0.01

    while t < duration:
        Tw0.translation_ref()[0] = _eval_cubic(coeffs_w0, t)
        Tw1.translation_ref()[0] = _eval_cubic(coeffs_w1, t)
        # kept from the original: re-assert the current translation of the held wheels
        for axis_index in range(3):
            Tw2.translation_ref()[axis_index] = Tw2.translation[axis_index]
            Tw3.translation_ref()[axis_index] = Tw3.translation[axis_index]
        if movecom:
            Tcom.translation_ref()[0] = _eval_cubic(coeffs_com, t)
        wheels[0].setPoseReference(Tw0)
        wheels[1].setPoseReference(Tw1)
        other_wheels[0].setPoseReference(Tw2)
        other_wheels[1].setPoseReference(Tw3)
        if movecom:
            com.setPoseReference(Tcom)

        t += dt
        time.sleep(dt)

    # restore the base link of the held wheels
    other_wheels[0].setBaseLink(u'car_frame')
    other_wheels[1].setBaseLink(u'car_frame')
    if movecom:
        com.disable()
    for item in to_be_disabled:
        item.enable()


# fct for spinning (i.e., rotating around the center of the sp -> rotation about the z-axis)
def spin(car, wheels, angle, duration, to_be_disabled, cli):
    """Rotate the car frame by angle (radians) about the z axis."""
    for item in to_be_disabled:
        item.disable()

    Tcar = cli.getPoseFromTf('ci/car_frame', 'ci/world')
    wheel_poses = [cli.getPoseFromTf('ci/'+wheels[i].getDistalLink(), 'ci/car_frame') for i in range(4)]

    # current yaw from the first column of the rotation matrix
    R = Tcar.linear
    c_i = R[0, 0]
    s_i = R[1, 0]
    t = 0.0
    dt = 0.01
    angle_i = m.atan2(s_i, c_i)
    angle_f = angle_i + angle
    coeffs = cubic(angle_i, angle_f, 0., 0., duration)

    while t < duration:
        angle_t = _eval_cubic(coeffs, t)
        c_t = m.cos(angle_t)
        s_t = m.sin(angle_t)
        R = np.array([[c_t, -s_t, 0.], [s_t, c_t, 0.], [0., 0., 1.]])
        for row in range(0, 3):
            for col in range(0, 3):
                Tcar.linear_ref()[row][col] = R[row][col]
        car.setPoseReference(Tcar)
        for i in range(4):
            wheels[i].setPoseReference(wheel_poses[i])

        t += dt
        time.sleep(dt)

    for item in to_be_disabled:
        item.enable()


# fct for moving the car frame keeping the stance --- to be launched after the 4 steps (or 2) in order to bring the car frame at the center of the SP
def movecar(car, wheels, duration, to_be_disabled, cli):
    """Move the car frame to the mean (x, y) of the four wheels.

    At the same time the wheel references w.r.t. the car frame are brought
    to the home stance.
    """
    for item in to_be_disabled:
        item.disable()

    Tcar = cli.getPoseFromTf('ci/car_frame', 'ci/world')
    # NOTE(review): wheel frames are hard-coded here, unlike roll()/spin() which
    # use wheels[i].getDistalLink() - confirm this is intended
    wheel_poses = [cli.getPoseFromTf('ci/wheel_%d' % (i + 1), 'ci/car_frame') for i in range(4)]

    # initial position of car and wheels
    xcar_i = Tcar.translation[0]
    ycar_i = Tcar.translation[1]
    wheel_xy_i = [(T.translation[0], T.translation[1]) for T in wheel_poses]

    # final position of car (center of the SP)
    Tw1_world, Tw2_world, Tw3_world, Tw4_world = [np.matmul(Tcar.matrix(), T.matrix()) for T in wheel_poses]
    xcar_f = (Tw1_world[0, 3] + Tw2_world[0, 3] + Tw3_world[0, 3] + Tw4_world[0, 3])/4
    ycar_f = (Tw1_world[1, 3] + Tw2_world[1, 3] + Tw3_world[1, 3] + Tw4_world[1, 3])/4

    coeffs_carx = cubic(xcar_i, xcar_f, 0., 0., duration)
    coeffs_cary = cubic(ycar_i, ycar_f, 0., 0., duration)
    # final position of wheels wrt car is the home stance
    coeffs_wheels = [
        (cubic(x_i, x_f, 0., 0., duration), cubic(y_i, y_f, 0., 0., duration))
        for (x_i, y_i), (x_f, y_f) in zip(wheel_xy_i, _HOME_WHEEL_XY)
    ]

    t = 0.0
    dt = 0.01

    while t < duration:
        Tcar.translation_ref()[0] = _eval_cubic(coeffs_carx, t)
        Tcar.translation_ref()[1] = _eval_cubic(coeffs_cary, t)
        for T, (coeffs_x, coeffs_y) in zip(wheel_poses, coeffs_wheels):
            T.translation_ref()[0] = _eval_cubic(coeffs_x, t)
            T.translation_ref()[1] = _eval_cubic(coeffs_y, t)

        car.setPoseReference(Tcar)
        for i in range(4):
            wheels[i].setPoseReference(wheel_poses[i])

        t += dt
        # NOTE(review): sleeps 2*dt per dt step, unlike the other primitives - confirm intended
        time.sleep(2*dt)

    for item in to_be_disabled:
        item.enable()


# fct for stepping. the step is a semi-circumference.
every 0.1s, it is checked if the com lies in the support triangle294def step(moving_foot, still_feet, step_length, duration, to_be_disabled, car, com, cli, filename_pos, filename_vel):295 for item in to_be_disabled:296 item.disable()297 com.enable() 298 299 com_pos = np.loadtxt('/home/matteo/catkin_ws/src/centauro_cartesio-devel-cpack/python/' + filename_pos)300 com_vel = np.loadtxt('/home/matteo/catkin_ws/src/centauro_cartesio-devel-cpack/python/' + filename_vel)301 interval_duration = 0.1302 t = 0.00303 dt = 0.01304 period = 4.0305 radius = step_length/2306 T1 = 1.0307 T2 = 3.0308 moving_foot.setBaseLink(u'world')309 for foot in still_feet:310 foot.setBaseLink(u'world')311 Tmoving_foot = (cli.getPoseFromTf('ci/'+moving_foot.getDistalLink(), 'ci/world'))312 Tmoving_init = Tmoving_foot.translation313 Tw1 = cli.getPoseFromTf('ci/'+still_feet[0].getDistalLink(), 'ci/world')314 Tw2 = cli.getPoseFromTf('ci/'+still_feet[1].getDistalLink(), 'ci/world')315 Tw3 = cli.getPoseFromTf('ci/'+still_feet[2].getDistalLink(), 'ci/world')316 Tcar = cli.getPoseFromTf('ci/car_frame', 'ci/world')317 Tcom = cli.getPoseFromTf('ci/com', 'ci/world')318 counter = 0319 i = 0320 total = int(duration*100)321 '''322 # first "control" loop that brings the com from the current position to the one imposed by casadi323 a_x_com, b_x_com, c_x_com, d_x_com = cubic(Tcom.translation[0], data[0][0], 3.0)324 a_y_com, b_y_com, c_y_com, d_y_com = cubic(Tcom.translation[1], data[0][1], 3.0)325 a_z_com, b_z_com, c_z_com, d_z_com = cubic(Tcom.translation[2], data[0][2], 3.0)326 while t < 3.0:327 Tcom.translation_ref()[0] = a_x_com * t**3 + b_x_com * t**2 + c_x_com * t + d_x_com328 Tcom.translation_ref()[1] = a_y_com * t**3 + b_y_com * t**2 + c_y_com * t + d_y_com329 Tcom.translation_ref()[2] = a_z_com * t**3 + b_z_com * t**2 + c_z_com * t + d_z_com330 com.setPoseReference(Tcom)331 t += dt332 time.sleep(dt)333 t = 0.0334 '''335 while counter < total:336 337 if counter%10 == 0:338 com_init = com_pos[i]339 
com_goal = com_pos[i+1]340 dcom_init = com_vel[i]341 dcom_goal = com_vel[i+1]342 343 a_comx, b_comx, c_comx, d_comx = cubic(com_init[0], com_goal[0], dcom_init[0], dcom_goal[0], interval_duration)344 a_comy, b_comy, c_comy, d_comy = cubic(com_init[1], com_goal[1], dcom_init[1], dcom_goal[1], interval_duration)345 a_comz, b_comz, c_comz, d_comz = cubic(com_init[2], com_goal[2], dcom_init[2], dcom_goal[2], interval_duration)346 i += 1347 print 'com_init: ' + str(com_init)348 print 'com_goal: ' + str(com_goal)349 print 'com: ' + str(Tcom.translation)350 Tcom.translation_ref()[0] = a_comx * (t-(i-1)*interval_duration)**3 + b_comx * (t-(i-1)*interval_duration)**2 + c_comx * (t-(i-1)*interval_duration) + d_comx351 Tcom.translation_ref()[1] = a_comy * (t-(i-1)*interval_duration)**3 + b_comy * (t-(i-1)*interval_duration)**2 + c_comy * (t-(i-1)*interval_duration) + d_comy352 Tcom.translation_ref()[2] = a_comz * (t-(i-1)*interval_duration)**3 + b_comz * (t-(i-1)*interval_duration)**2 + c_comz * (t-(i-1)*interval_duration) + d_comz + 0.3353 354 Tw1.translation_ref()[0] = Tw1.translation[0]355 Tw1.translation_ref()[1] = Tw1.translation[1]356 Tw1.translation_ref()[2] = Tw1.translation[2]357 Tw2.translation_ref()[0] = Tw2.translation[0]358 Tw2.translation_ref()[1] = Tw2.translation[1]359 Tw2.translation_ref()[2] = Tw2.translation[2]360 Tw3.translation_ref()[0] = Tw3.translation[0]361 Tw3.translation_ref()[1] = Tw3.translation[1]362 Tw3.translation_ref()[2] = Tw3.translation[2]363 364 if t >= T1 and t <= T2:365 delta_x = radius - radius * m.cos((t-T1)*m.pi/(T2-T1))366 delta_z = radius * m.sin((t-T1)*m.pi/(T2-T1))367 Tmoving_foot.translation_ref()[0] = delta_x + Tmoving_init[0]368 Tmoving_foot.translation_ref()[1] = Tmoving_init[1]369 Tmoving_foot.translation_ref()[2] = delta_z + Tmoving_init[2]370 else: 371 Tmoving_foot.translation_ref()[0] = Tmoving_foot.translation[0]372 Tmoving_foot.translation_ref()[1] = Tmoving_foot.translation[1]373 Tmoving_foot.translation_ref()[2] = 
Tmoving_foot.translation[2]374 com.setPoseReference(Tcom)375 still_feet[0].setPoseReference(Tw1)376 still_feet[1].setPoseReference(Tw2)377 still_feet[2].setPoseReference(Tw3)378 moving_foot.setPoseReference(Tmoving_foot)379 counter += 1380 381 t += dt382 time.sleep(5*dt)383 com.disable()384 for item in to_be_disabled:385 item.enable()386 moving_foot.setBaseLink(u'car_frame')387 for foot in still_feet:388 foot.setBaseLink(u'car_frame')389#########################################################390########################## MAIN #########################391#########################################################392def main():393 start_time = time.time()394 395 k = 0396 397 # load file with primitives and number of times they are applied398 plan = np.loadtxt('/home/matteo/catkin_ws/src/centauro_cartesio-devel-cpack/python/plan.txt')399 primitives = plan[:, 0]400 times = plan[:, 1]401 cli = pyci.CartesianInterfaceRos() # initialization of cartesIO402 # tasks403 com = cli.getTask('Com')404 car = cli.getTask('car_frame')405 pelvis = cli.getTask('pelvis')406 w1 = cli.getTask('wheel_1')407 w2 = cli.getTask('wheel_2')408 w3 = cli.getTask('wheel_3')409 w4 = cli.getTask('wheel_4')410 rw1 = cli.getTask('rolling_wheel_1')411 rw2 = cli.getTask('rolling_wheel_2')412 rw3 = cli.getTask('rolling_wheel_3')413 rw4 = cli.getTask('rolling_wheel_4')414 #homing(car, w1, w2, w3, w4, pelvis, com)415 416 n_primitives = len(primitives)417 # executes planner indications418 for i in range(0, n_primitives):419 420 primitive = primitives[i]421 application = times[i]422 423 if primitive == 0:424 print(color.BOLD + 'Primitive 0: clockwise spin of 10 deg for ' + str(int(application)) + ' times. (' + str(i+1) + '/' + str(n_primitives) + ')' + color.END)425 spin(car, [w1, w2, w3, w4], -m.pi/18 * application, 5.0 * application, [com], cli)426 elif primitive == 1:427 print(color.BOLD + 'Primitive 1: counter-clockwise spin of 10 deg for ' + str(int(application)) + ' times. 
(' + str(i+1) + '/' + str(n_primitives) + ')' + color.END)428 spin(car, [w1, w2, w3, w4], m.pi/18 * application, 5.0 * application, [com], cli)429 elif primitive == 2:430 print(color.BOLD + 'Primitive 2: forward roll of 0.05 m for ' + str(int(application)) + ' times. (' + str(i+1) + '/' + str(n_primitives) + ')' + color.END)431 roll(car, [w1, w2, w3, w4], 0.05 * application, 'x', 1.0 * application, [com], cli)432 elif primitive == 3:433 print(color.BOLD + 'Primitive 3: backward roll of 0.05 m for ' + str(int(application)) + ' times. (' + str(i+1) + '/' + str(n_primitives) + ')' + color.END)434 roll(car, [w1, w2, w3, w4], -0.05 * application, 'x', 1.0 * application, [com], cli)435 elif primitive == 4:436 print(color.BOLD + 'Primitive 4: right roll of 0.05 m for ' + str(int(application)) + ' times. (' + str(i+1) + '/' + str(n_primitives) + ')' + color.END)437 roll(car, [w1, w2, w3, w4], 0.05 * application, 'y', 1.0 * application, [com], cli)438 elif primitive == 5:439 print(color.BOLD + 'Primitive 5: left roll of 0.05 m for ' + str(int(application)) + ' times. (' + str(i+1) + '/' + str(n_primitives) + ')' + color.END)440 roll(car, [w1, w2, w3, w4], -0.05 * application, 'y', 1.0 * application, [com], cli)441 else:442 443 if k == 0:444 445 print(color.BOLD + 'Preparation to step: forward roll 0.20 m with back wheels.' + color.END)446 rollTwoWheelsandMoveCom([w3, w4], 0.2, com, 0., False, [w1, w2], 4.0, [], cli)447 #time.sleep(3.0)448 cli.update()449 #return 0450 451 k += 1452 if primitive == 6:453 print(color.BOLD + 'Primitive 6: step of 0.20 m with BR foot. (' + str(i+1) + '/' + str(n_primitives) + ')' + color.END)454 step(moving_foot = w4, still_feet = [w1, w2, w3], step_length = 0.2, duration = 4.0, to_be_disabled = [pelvis], car=car, com=com, cli=cli, filename_pos='com_traj_with_com_vel02/COMtraj_BR.txt', filename_vel = 'com_traj_with_com_vel02/DCOMtraj_BR.txt')455 elif primitive == 7:456 print(color.BOLD + 'Primitive 7: step of 0.20 m with BL foot. 
(' + str(i+1) + '/' + str(n_primitives) + ')' + color.END)457 step(moving_foot = w3, still_feet = [w1, w2, w4], step_length = 0.2, duration = 4.0, to_be_disabled = [pelvis], car=car, com=com, cli=cli, filename_pos='com_traj_with_com_vel02/COMtraj_BL.txt', filename_vel = 'com_traj_with_com_vel02/DCOMtraj_BL.txt')458 elif primitive == 8:459 print(color.BOLD + 'Primitive 8: step of 0.20 m with FR foot. (' + str(i+1) + '/' + str(n_primitives) + ')' + color.END)460 cli.update()461 step(moving_foot = w2, still_feet = [w1, w3, w4], step_length = 0.2, duration = 4.0, to_be_disabled = [pelvis], car=car, com=com, cli=cli, filename_pos='com_traj_with_com_vel02/COMtraj_FR.txt', filename_vel = 'com_traj_with_com_vel02/DCOMtraj_FR.txt')462 elif primitive == 9:463 print(color.BOLD + 'Primitive 9: step of 0.20 m with FL foot. (' + str(i+1) + '/' + str(n_primitives) + ')' + color.END)464 step(moving_foot = w1, still_feet = [w2, w3, w4], step_length = 0.2, duration = 4.0, to_be_disabled = [pelvis], car=car, com=com, cli=cli, filename_pos='com_traj_with_com_vel02/COMtraj_FL.txt', filename_vel = 'com_traj_with_com_vel02/DCOMtraj_FL.txt')465 466 #time.sleep(3.0)467 cli.update()468 469 if k == 4:470 print(color.BOLD + 'Conclusion of step: forward roll 0.20 m with front wheels.' + color.END)471 rollTwoWheelsandMoveCom([w1, w2], 0.2, com, 0.1, True, [w3, w4], 3.0, [pelvis], cli)472 #time.sleep(3.0)473 cli.update()474 print(color.BOLD + 'Realigning car_frame with the center of the support polygon...' + color.END)475 movecar(car, [w1, w2, w3, w4], 5.0, [pelvis], cli)476 #time.sleep(3.0)477 cli.update()478 479 k = 0480 481 print(color.BOLD + color.GREEN + 'Execution completed in ' + str(time.time() - start_time) + ' s. \n' + color.END)482 483# main484if __name__ == "__main__":485 main()...

Full Screen

Full Screen

_assertions.py

Source:_assertions.py Github

copy

Full Screen

...365 timeout: float = None,366 ) -> None:367 __tracebackhide__ = True368 await self._not.to_be_checked(timeout)369 async def to_be_disabled(370 self,371 timeout: float = None,372 ) -> None:373 __tracebackhide__ = True374 await self._expect_impl(375 "to.be.disabled",376 FrameExpectOptions(timeout=timeout),377 None,378 "Locator expected to be disabled",379 )380 async def not_to_be_disabled(381 self,382 timeout: float = None,383 ) -> None:384 __tracebackhide__ = True385 await self._not.to_be_disabled(timeout)386 async def to_be_editable(387 self,388 timeout: float = None,389 ) -> None:390 __tracebackhide__ = True391 await self._expect_impl(392 "to.be.editable",393 FrameExpectOptions(timeout=timeout),394 None,395 "Locator expected to be editable",396 )397 async def not_to_be_editable(398 self,399 timeout: float = None,...

Full Screen

Full Screen

cli.py

Source:cli.py Github

copy

Full Screen

1from __future__ import absolute_import2from __future__ import print_function3from __future__ import unicode_literals4import importlib5import logging6import signal7import sys8import traceback9import click10from django.core.management import execute_from_command_line11import kolibri12from kolibri.plugins import config13from kolibri.plugins import DEFAULT_PLUGINS14from kolibri.plugins.utils import disable_plugin15from kolibri.plugins.utils import enable_plugin16from kolibri.plugins.utils import iterate_plugins17from kolibri.utils import server18from kolibri.utils.conf import OPTIONS19from kolibri.utils.debian_check import check_debian_user20from kolibri.utils.main import initialize21from kolibri.utils.main import setup_logging22logger = logging.getLogger(__name__)23# We use Unicode strings for Python 2.7 throughout the codebase, so choosing24# to silence these warnings.25# Ref:26# https://github.com/learningequality/kolibri/pull/5494#discussion_r31805738527# https://github.com/PythonCharmers/python-future/issues/2228click.disable_unicode_literals_warning = True29def validate_module(ctx, param, value):30 if value:31 try:32 importlib.import_module(value)33 except ImportError:34 raise click.BadParameter(35 "{param} must be a valid python module import path"36 )37 return value38debug_option = click.Option(39 param_decls=["--debug"],40 default=False,41 is_flag=True,42 help="Display and log debug messages (for development)",43 envvar="KOLIBRI_DEBUG",44)45debug_database_option = click.Option(46 param_decls=["--debug-database"],47 default=False,48 is_flag=True,49 help="Display and log database queries (for development), very noisy!",50 envvar="KOLIBRI_DEBUG_LOG_DATABASE",51)52settings_option = click.Option(53 param_decls=["--settings"],54 callback=validate_module,55 help="Django settings module path",56)57pythonpath_option = click.Option(58 param_decls=["--pythonpath"],59 type=click.Path(exists=True, file_okay=False),60 help="Add a path to the Python 
path",61)62skip_update_option = click.Option(63 param_decls=["--skip-update"],64 default=False,65 is_flag=True,66 help="Do not run update logic. (Useful when running multiple Kolibri commands in parallel)",67)68noinput_option = click.Option(69 param_decls=["--no-input"],70 default=False,71 is_flag=True,72 help="Suppress user prompts",73)74base_params = [debug_option, debug_database_option, noinput_option]75initialize_params = base_params + [76 settings_option,77 pythonpath_option,78 skip_update_option,79]80initialize_kwargs = {param.name: param.default for param in initialize_params}81def get_initialize_params():82 try:83 return {84 k: v85 for k, v in click.get_current_context().params.items()86 if k in initialize_kwargs87 }88 except RuntimeError:89 return initialize_kwargs90class KolibriCommand(click.Command):91 """92 A command class for basic Kolibri commands that do not require93 the django stack. By default adds a debug param for logging purposes94 also invokes setup_logging before invoking the command.95 """96 allow_extra_args = True97 def __init__(self, *args, **kwargs):98 kwargs["params"] = base_params + (99 kwargs["params"] if "params" in kwargs else []100 )101 super(KolibriCommand, self).__init__(*args, **kwargs)102 def invoke(self, ctx):103 # Check if the current user is the kolibri user when running kolibri from Debian installer.104 check_debian_user(ctx.params.get("no_input"))105 setup_logging(106 debug=ctx.params.get("debug"),107 debug_database=ctx.params.get("debug_database"),108 )109 for param in base_params:110 ctx.params.pop(param.name)111 return super(KolibriCommand, self).invoke(ctx)112class KolibriGroupCommand(click.Group):113 """114 A command class for Kolibri commands that do not require115 the django stack, but have subcommands. 
By default adds116 a debug param for logging purposes117 also invokes setup_logging before invoking the command.118 """119 allow_extra_args = True120 def __init__(self, *args, **kwargs):121 kwargs["params"] = base_params + (122 kwargs["params"] if "params" in kwargs else []123 )124 super(KolibriGroupCommand, self).__init__(*args, **kwargs)125 def invoke(self, ctx):126 # Check if the current user is the kolibri user when running kolibri from Debian installer.127 check_debian_user(ctx.params.get("no_input"))128 setup_logging(129 debug=ctx.params.get("debug"),130 debug_database=ctx.params.get("debug_database"),131 )132 for param in base_params:133 ctx.params.pop(param.name)134 return super(KolibriGroupCommand, self).invoke(ctx)135class KolibriDjangoCommand(click.Command):136 """137 A command class for Kolibri commands that do require138 the django stack. By default adds all params needed for139 the initialize function, calls the initialize function and140 also invokes setup_logging before invoking the command.141 """142 allow_extra_args = True143 def __init__(self, *args, **kwargs):144 kwargs["params"] = initialize_params + (145 kwargs["params"] if "params" in kwargs else []146 )147 super(KolibriDjangoCommand, self).__init__(*args, **kwargs)148 def invoke(self, ctx):149 try:150 initialize(**get_initialize_params())151 except Exception:152 raise click.ClickException(traceback.format_exc())153 # Remove parameters that are not for Django management command154 for param in initialize_params:155 ctx.params.pop(param.name)156 return super(KolibriDjangoCommand, self).invoke(ctx)157main_help = """Kolibri command-line utility158Details for each main command: kolibri COMMAND --help159List of additional management commands: kolibri manage help160For more information, see: https://kolibri.readthedocs.io/161"""162@click.group(invoke_without_command=True, help=main_help)163@click.pass_context164@click.version_option(version=kolibri.__version__)165def main(ctx):166 """167 Kolibri's 
main function.168 Utility functions should be callable for unit testing purposes, but remember169 to use main() for integration tests in order to test the argument API.170 """171 if ctx.invoked_subcommand is None:172 click.echo(ctx.get_help())173 ctx.exit(1)174 try:175 signal.signal(signal.SIGINT, signal.SIG_DFL)176 except ValueError:177 pass178@main.command(cls=KolibriDjangoCommand, help="Start the Kolibri process")179@click.option(180 "--port",181 default=None,182 type=int,183 help="Port on which Kolibri is being served",184)185@click.option(186 "--zip-port",187 default=None,188 type=int,189 help="Port on which zip content server is being served",190)191@click.option(192 "--background/--foreground",193 default=True,194 help="Run Kolibri as a background process",195)196def start(port, zip_port, background):197 """198 Start the server on given port.199 """200 port = OPTIONS["Deployment"]["HTTP_PORT"] if port is None else port201 zip_port = (202 OPTIONS["Deployment"]["ZIP_CONTENT_PORT"] if zip_port is None else zip_port203 )204 server.start(205 port=port,206 zip_port=zip_port,207 serve_http=OPTIONS["Server"]["CHERRYPY_START"],208 background=background,209 )210@main.command(cls=KolibriCommand, help="Stop the Kolibri process")211def stop():212 """213 Stops the server unless it isn't running214 """215 try:216 server.get_status()217 except server.NotRunning as e:218 if e.status_code == server.STATUS_STOPPED:219 logging.info(220 "Already stopped: {}".format(221 server.status_messages[server.STATUS_STOPPED]222 )223 )224 sys.exit(0)225 status = server.stop()226 if status == server.STATUS_STOPPED:227 if OPTIONS["Server"]["CHERRYPY_START"]:228 logger.info("Kolibri server has successfully been stopped.")229 else:230 logger.info("Kolibri background services have successfully been stopped.")231 sys.exit(0)232 sys.exit(status)233@main.command(cls=KolibriCommand, help="Show the status of the Kolibri process")234def status():235 """236 How is Kolibri doing?237 Check the server's 
status. For possible statuses, see the status dictionary238 server.status_messages239 Status *always* outputs the current status in the first line of stderr.240 The following lines contain optional information such as the addresses where241 the server is listening.242 TODO: We can't guarantee the above behavior because of the django stack243 being loaded regardless244 Exits with status_code, key has description in server.status_messages245 """246 status_code, urls = server.get_urls()247 if status_code == server.STATUS_RUNNING:248 sys.stderr.write("{msg:s} (0)\n".format(msg=server.status_messages[0]))249 if urls:250 sys.stderr.write("Kolibri running on:\n\n")251 for addr in urls:252 sys.stderr.write("\t{}\n".format(addr))253 else:254 verbose_status = server.status_messages[status_code]255 sys.stderr.write(256 "{msg:s} ({code:d})\n".format(code=status_code, msg=verbose_status)257 )258 sys.exit(status_code)259@main.command(cls=KolibriDjangoCommand, help="Start worker processes")260@click.option(261 "--port",262 default=None,263 type=int,264 help="Port on which Kolibri is running to inform services",265)266@click.option(267 "--background/--foreground",268 default=True,269 help="Run Kolibri services as a background task",270)271def services(port, background):272 """273 Start the kolibri background services.274 """275 port = OPTIONS["Deployment"]["HTTP_PORT"] if port is None else port276 logger.info("Starting Kolibri background services")277 server.start(port=port, zip_port=None, serve_http=False, background=background)278@main.command(cls=KolibriCommand, help="Restart the Kolibri process")279def restart():280 """281 Restarts the server if it is running282 """283 if server.restart():284 logger.info("Kolibri has successfully restarted")285 sys.exit(0)286 logger.info("Kolibri has failed to restart - confirm that the server is running")287 sys.exit(1)288@main.command(289 cls=KolibriDjangoCommand,290 context_settings=dict(ignore_unknown_options=True, 
allow_extra_args=True),291 help="Django management commands. See also 'kolibri manage help'",292)293@click.pass_context294def manage(ctx):295 if ctx.args:296 logger.info("Invoking command {}".format(" ".join(ctx.args)))297 execute_from_command_line(["kolibri manage"] + ctx.args)298@main.command(cls=KolibriDjangoCommand, help="Launch a Django shell")299@click.pass_context300def shell(ctx):301 execute_from_command_line(["kolibri manage", "shell"] + ctx.args)302@main.command(cls=KolibriGroupCommand, help="Manage Kolibri plugins")303def plugin():304 pass305@plugin.command(help="Enable Kolibri plugins")306@click.argument("plugin_names", nargs=-1)307@click.option("-d", "--default-plugins", default=False, is_flag=True)308def enable(plugin_names, default_plugins):309 error = False310 if not plugin_names and default_plugins:311 plugin_names = DEFAULT_PLUGINS312 for name in plugin_names:313 try:314 logger.info("Enabling plugin '{}'".format(name))315 error = error or not enable_plugin(name)316 except Exception as e:317 error = True318 logger.error("Error enabling plugin '{}', error was: {}".format(name, e))319 if error:320 exception = click.ClickException("One or more plugins could not be enabled")321 exception.exit_code = 2322 raise exception323@plugin.command(help="Disable Kolibri plugins")324@click.argument("plugin_names", nargs=-1)325@click.option("-a", "--all-plugins", default=False, is_flag=True)326def disable(plugin_names, all_plugins):327 error = False328 if not plugin_names and all_plugins:329 plugin_names = config.ACTIVE_PLUGINS330 for name in plugin_names:331 try:332 logger.info("Disabling plugin '{}'".format(name))333 error = error or not disable_plugin(name)334 except Exception as e:335 error = True336 logger.error("Error Disabling plugin '{}', error was: {}".format(name, e))337 if error:338 exception = click.ClickException("One or more plugins could not be disabled")339 exception.exit_code = 2340 raise exception341@plugin.command(help="Set Kolibri plugins to be 
enabled and disable all others")342@click.argument("plugin_names", nargs=-1)343@click.pass_context344def apply(ctx, plugin_names):345 to_be_disabled = set(config.ACTIVE_PLUGINS) - set(plugin_names)346 error = False347 try:348 ctx.invoke(disable, plugin_names=to_be_disabled, all_plugins=False)349 except click.ClickException:350 error = True351 try:352 ctx.invoke(enable, plugin_names=plugin_names, default_plugins=False)353 except click.ClickException:354 error = True355 if error:356 exception = click.ClickException(357 "An error occurred applying the plugin configuration"358 )359 exception.exit_code = 2360 raise exception361@plugin.command(help="List all available Kolibri plugins")362def list():363 plugins = [plugin for plugin in iterate_plugins()]364 max_len = max((len(plugin) for plugin in plugins))365 available_plugins = "Available plugins"366 status = "Status"367 click.echo(368 available_plugins + " " * (max_len - len(available_plugins) + 4) + status369 )370 for plugin in sorted(plugins):371 click.echo(372 plugin373 + " " * (max_len - len(plugin) + 4)374 + ("ENABLED" if plugin in config.ACTIVE_PLUGINS else "DISABLED")375 )376@main.command(cls=KolibriGroupCommand, help="Configure Kolibri and enabled plugins")377def configure():378 pass379def _format_env_var(envvar, value):380 if value.get("deprecated", False) or envvar in value.get(381 "deprecated_envvars", tuple()382 ):383 return click.style(384 "{envvar} - DEPRECATED - {description}\n\n".format(385 envvar=envvar, description=value.get("description", "")386 ),387 fg="yellow",388 )389 return "{envvar} - {description}\n\n".format(390 envvar=envvar, description=value.get("description", "")391 )392def _get_env_vars():393 """394 Generator to iterate over all environment variables395 """396 from kolibri.utils.env import ENVIRONMENT_VARIABLES397 for key, value in ENVIRONMENT_VARIABLES.items():398 yield _format_env_var(key, value)399 from kolibri.utils.options import option_spec400 for value in option_spec.values():401 
for v in value.values():402 if "envvars" in v:403 for envvar in v["envvars"]:404 yield _format_env_var(envvar, v)405@configure.command(help="List all available environment variables to configure Kolibri")406def list_env():...

Full Screen

Full Screen

Playwright tutorial

LambdaTest’s Playwright tutorial will give you a broader idea of the Playwright automation framework, its unique features, and its use cases, with examples to deepen your understanding of Playwright testing. This tutorial gives A-to-Z guidance, from installing the Playwright framework to best practices and advanced concepts.

Chapters:

  1. What is Playwright: Playwright is comparatively new but has quickly gained popularity. Get to know the history of Playwright, along with some interesting facts about it.
  2. How To Install Playwright: Learn in detail which basic configuration and dependencies are required to install Playwright and run a test. Get step-by-step directions for installing the Playwright automation framework.
  3. Playwright Futuristic Features: Launched in 2020, Playwright quickly gained huge popularity because of helpful features such as the Playwright Test Generator and Inspector, the Playwright Reporter, the Playwright auto-waiting mechanism, etc. Read up on those features to master Playwright testing.
  4. What is Component Testing: Component testing in Playwright is a unique feature that allows a tester to test a single component of a web application without integrating them with other elements. Learn how to perform Component testing on the Playwright automation framework.
  5. Inputs And Buttons In Playwright: Every website has Input boxes and buttons; learn about testing inputs and buttons with different scenarios and examples.
  6. Functions and Selectors in Playwright: Learn how to launch the Chromium browser with Playwright. Also, gain a better understanding of some important functions like “BrowserContext,” which allows you to run multiple browser sessions, and “newPage” which interacts with a page.
  7. Handling Alerts and Dropdowns in Playwright: Playwright interacts with different types of alerts and pop-ups, such as simple, confirmation, and prompt, and with different types of dropdowns, such as single-select and multi-select. Get hands-on with handling alerts and dropdowns in Playwright testing.
  8. Playwright vs Puppeteer: Get to know about the difference between two testing frameworks and how they are different than one another, which browsers they support, and what features they provide.
  9. Run Playwright Tests on LambdaTest: Playwright testing with LambdaTest takes test performance to the utmost. You can run multiple Playwright tests in parallel on the LambdaTest test cloud. Get a step-by-step guide to running your Playwright tests on the LambdaTest platform.
  10. Playwright Python Tutorial: The Playwright automation framework supports all major languages, such as Python, JavaScript, TypeScript, .NET, etc. However, there are various advantages to Python end-to-end testing with Playwright because of its versatility. Get the hang of Playwright Python testing with this chapter.
  11. Playwright End To End Testing Tutorial: Get hands-on with Playwright end-to-end testing and learn to use some exciting features such as Trace Viewer, debugging, networking, component testing, visual testing, and many more.
  12. Playwright Video Tutorial: Watch video tutorials on Playwright testing from experts and get a step-by-step, in-depth explanation of Playwright automation testing.

Run Playwright Python automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

NotHelpful