Best Python code snippet using test_junkie
test_robot_command.py
Source:test_robot_command.py
...83 assert isinstance(command, robot_command_pb2.RobotCommand)84 assert command.HasField("full_body_command")85 assert not command.HasField("mobility_command")86 assert not command.HasField("synchronized_command")87def _test_has_synchronized(command):88 assert isinstance(command, robot_command_pb2.RobotCommand)89 assert command.HasField("synchronized_command")90 assert not command.HasField("full_body_command")91 assert not command.HasField("mobility_command")92def _test_has_mobility(command):93 assert isinstance(command, synchronized_command_pb2.SynchronizedCommand.Request)94 assert command.HasField("mobility_command")95def _test_has_mobility_deprecated(command):96 assert isinstance(command, robot_command_pb2.RobotCommand)97 assert command.HasField("mobility_command")98def _test_has_arm(command):99 assert isinstance(command, synchronized_command_pb2.SynchronizedCommand.Request)100 assert command.HasField("arm_command")101def _test_has_gripper(command):102 assert isinstance(command, synchronized_command_pb2.SynchronizedCommand.Request)103 assert command.HasField("gripper_command")104def test_stop_command():105 command = RobotCommandBuilder.stop_command()106 _test_has_full_body(command)107 assert command.full_body_command.HasField("stop_request")108def test_freeze_command():109 command = RobotCommandBuilder.freeze_command()110 _test_has_full_body(command)111 assert command.full_body_command.HasField("freeze_request")112def test_selfright_command():113 command = RobotCommandBuilder.selfright_command()114 _test_has_full_body(command)115 assert command.full_body_command.HasField("selfright_request")116def test_safe_power_off_command():117 command = RobotCommandBuilder.safe_power_off_command()118 _test_has_full_body(command)119 assert command.full_body_command.HasField("safe_power_off_request")120def test_trajectory_command():121 goal_x = 1.0122 goal_y = 2.0123 goal_heading = 3.0124 frame = ODOM_FRAME_NAME125 command = RobotCommandBuilder.trajectory_command(goal_x, goal_y, goal_heading, frame)126 _test_has_mobility_deprecated(command)127 assert command.mobility_command.HasField("se2_trajectory_request")128 traj = command.mobility_command.se2_trajectory_request.trajectory129 assert len(traj.points) == 1130 assert traj.points[0].pose.position.x == goal_x131 assert traj.points[0].pose.position.y == goal_y132 assert traj.points[0].pose.angle == goal_heading133 assert command.mobility_command.se2_trajectory_request.se2_frame_name == ODOM_FRAME_NAME134def test_velocity_command():135 v_x = 1.0136 v_y = 2.0137 v_rot = 3.0138 command = RobotCommandBuilder.velocity_command(v_x, v_y, v_rot)139 _test_has_mobility_deprecated(command)140 assert command.mobility_command.HasField("se2_velocity_request")141 vel_cmd = command.mobility_command.se2_velocity_request142 assert vel_cmd.velocity.linear.x == v_x143 assert vel_cmd.velocity.linear.y == v_y144 assert vel_cmd.velocity.angular == v_rot145 assert vel_cmd.se2_frame_name == BODY_FRAME_NAME146def test_stand_command():147 command = RobotCommandBuilder.stand_command()148 _test_has_mobility_deprecated(command)149 assert command.mobility_command.HasField("stand_request")150def test_sit_command():151 command = RobotCommandBuilder.sit_command()152 _test_has_mobility_deprecated(command)153 assert command.mobility_command.HasField("sit_request")154def _check_se2_traj_command(command, goal_x, goal_y, goal_heading, frame_name, n_points):155 _test_has_synchronized(command)156 _test_has_mobility(command.synchronized_command)157 assert command.synchronized_command.mobility_command.HasField("se2_trajectory_request")158 request = command.synchronized_command.mobility_command.se2_trajectory_request159 assert len(request.trajectory.points) == n_points160 assert request.trajectory.points[0].pose.position.x == goal_x161 assert request.trajectory.points[0].pose.position.y == goal_y162 assert request.trajectory.points[0].pose.angle == goal_heading163 assert request.se2_frame_name == frame_name164def test_synchro_se2_trajectory_point_command():165 goal_x = 1.0166 goal_y = 2.0167 goal_heading = 3.0168 frame = ODOM_FRAME_NAME169 command = RobotCommandBuilder.synchro_se2_trajectory_point_command(170 goal_x, goal_y, goal_heading, frame)171 _check_se2_traj_command(command, goal_x, goal_y, goal_heading, frame, 1)172 # with a build_on_command173 arm_command = RobotCommandBuilder.arm_stow_command()174 command = RobotCommandBuilder.synchro_se2_trajectory_point_command(175 goal_x, goal_y, goal_heading, frame, build_on_command=arm_command)176 _check_se2_traj_command(command, goal_x, goal_y, goal_heading, frame, 1)177 _test_has_arm(command.synchronized_command)178def test_synchro_se2_trajectory_command():179 goal_x = 1.0180 goal_y = 2.0181 goal_heading = 3.0182 frame = ODOM_FRAME_NAME183 position = geometry_pb2.Vec2(x=goal_x, y=goal_y)184 goal_se2 = geometry_pb2.SE2Pose(position=position, angle=goal_heading)185 command = RobotCommandBuilder.synchro_se2_trajectory_command(goal_se2, frame)186 _check_se2_traj_command(command, goal_x, goal_y, goal_heading, frame, 1)187 # with a build_on_command188 arm_command = RobotCommandBuilder.arm_stow_command()189 command = RobotCommandBuilder.synchro_se2_trajectory_command(goal_se2, frame,190 build_on_command=arm_command)191 _check_se2_traj_command(command, goal_x, goal_y, goal_heading, frame, 1)192 _test_has_arm(command.synchronized_command)193def test_synchro_velocity_command():194 v_x = 1.0195 v_y = 2.0196 v_rot = 3.0197 command = RobotCommandBuilder.synchro_velocity_command(v_x, v_y, v_rot)198 _test_has_synchronized(command)199 _test_has_mobility(command.synchronized_command)200 assert command.synchronized_command.mobility_command.HasField("se2_velocity_request")201 vel_cmd = command.synchronized_command.mobility_command.se2_velocity_request202 assert vel_cmd.velocity.linear.x == v_x203 assert vel_cmd.velocity.linear.y == v_y204 assert vel_cmd.velocity.angular == v_rot205 assert vel_cmd.se2_frame_name == BODY_FRAME_NAME206 # with a build_on_command207 arm_command = RobotCommandBuilder.arm_stow_command()208 command = RobotCommandBuilder.synchro_velocity_command(v_x, v_y, v_rot,209 build_on_command=arm_command)210 _test_has_synchronized(command)211 _test_has_mobility(command.synchronized_command)212 _test_has_arm(command.synchronized_command)213def test_synchro_stand_command():214 command = RobotCommandBuilder.synchro_stand_command()215 _test_has_synchronized(command)216 _test_has_mobility(command.synchronized_command)217 assert command.synchronized_command.mobility_command.HasField("stand_request")218 # basic check with a build_on_command219 arm_command = RobotCommandBuilder.arm_stow_command()220 command = RobotCommandBuilder.synchro_stand_command(build_on_command=arm_command)221 _test_has_synchronized(command)222 _test_has_mobility(command.synchronized_command)223 _test_has_arm(command.synchronized_command)224def test_synchro_sit_command():225 command = RobotCommandBuilder.synchro_sit_command()226 _test_has_synchronized(command)227 _test_has_mobility(command.synchronized_command)228 assert command.synchronized_command.mobility_command.HasField("sit_request")229 # with a build_on_command230 arm_command = RobotCommandBuilder.arm_stow_command()231 command = RobotCommandBuilder.synchro_sit_command(build_on_command=arm_command)232 _test_has_synchronized(command)233 _test_has_mobility(command.synchronized_command)234 _test_has_arm(command.synchronized_command)235def test_arm_stow_command():236 command = RobotCommandBuilder.arm_stow_command()237 _test_has_synchronized(command)238 _test_has_arm(command.synchronized_command)239 assert (command.synchronized_command.arm_command.WhichOneof("command") ==240 'named_arm_position_command')241 # with a build_on_command242 mobility_command = RobotCommandBuilder.synchro_sit_command()243 command = RobotCommandBuilder.arm_stow_command(build_on_command=mobility_command)244 _test_has_synchronized(command)245 _test_has_mobility(command.synchronized_command)246 _test_has_arm(command.synchronized_command)247def test_arm_ready_command():248 command = RobotCommandBuilder.arm_ready_command()249 _test_has_synchronized(command)250 _test_has_arm(command.synchronized_command)251 assert (command.synchronized_command.arm_command.WhichOneof("command") ==252 'named_arm_position_command')253 # with a build_on_command254 mobility_command = RobotCommandBuilder.synchro_sit_command()255 command = RobotCommandBuilder.arm_ready_command(build_on_command=mobility_command)256 _test_has_synchronized(command)257 _test_has_mobility(command.synchronized_command)258 _test_has_arm(command.synchronized_command)259def test_arm_carry_command():260 command = RobotCommandBuilder.arm_ready_command()261 _test_has_synchronized(command)262 _test_has_arm(command.synchronized_command)263 assert (command.synchronized_command.arm_command.WhichOneof("command") ==264 'named_arm_position_command')265 # with a build_on_command266 mobility_command = RobotCommandBuilder.synchro_sit_command()267 command = RobotCommandBuilder.arm_carry_command(build_on_command=mobility_command)268 _test_has_synchronized(command)269 _test_has_mobility(command.synchronized_command)270 _test_has_arm(command.synchronized_command)271def test_arm_pose_command():272 x = 0.75273 y = 0274 z = 0.25275 qw = 1276 qx = 0277 qy = 0278 qz = 0279 command = RobotCommandBuilder.arm_pose_command(x, y, z, qw, qx, qy, qz, BODY_FRAME_NAME)280 _test_has_synchronized(command)281 _test_has_arm(command.synchronized_command)282 assert command.synchronized_command.arm_command.HasField("arm_cartesian_command")283 arm_cartesian_command = command.synchronized_command.arm_command.arm_cartesian_command284 assert arm_cartesian_command.root_frame_name == BODY_FRAME_NAME285 assert arm_cartesian_command.pose_trajectory_in_task.points[0].pose.position.x == x286 assert arm_cartesian_command.pose_trajectory_in_task.points[0].pose.position.y == y287 assert arm_cartesian_command.pose_trajectory_in_task.points[0].pose.position.z == z288 assert arm_cartesian_command.pose_trajectory_in_task.points[0].pose.rotation.x == qx289 assert arm_cartesian_command.pose_trajectory_in_task.points[0].pose.rotation.y == qy290 assert arm_cartesian_command.pose_trajectory_in_task.points[0].pose.rotation.z == qz291 assert arm_cartesian_command.pose_trajectory_in_task.points[0].pose.rotation.w == qw292 # with a build_on_command293 mobility_command = RobotCommandBuilder.synchro_sit_command()294 command = RobotCommandBuilder.arm_pose_command(x, y, z, qw, qx, qy, qz, BODY_FRAME_NAME,295 build_on_command=mobility_command)296 _test_has_synchronized(command)297 _test_has_mobility(command.synchronized_command)298 _test_has_arm(command.synchronized_command)299def test_claw_gripper_open_command():300 command = RobotCommandBuilder.claw_gripper_open_command()301 _test_has_synchronized(command)302 _test_has_gripper(command.synchronized_command)303 assert (command.synchronized_command.gripper_command.WhichOneof("command") ==304 "claw_gripper_command")305 # with a build_on_command306 mobility_command = RobotCommandBuilder.synchro_sit_command()307 command = RobotCommandBuilder.claw_gripper_open_command(build_on_command=mobility_command)308 _test_has_synchronized(command)309 _test_has_mobility(command.synchronized_command)310 _test_has_gripper(command.synchronized_command)311def test_claw_gripper_close_command():312 command = RobotCommandBuilder.claw_gripper_close_command()313 _test_has_synchronized(command)314 _test_has_gripper(command.synchronized_command)315 assert (command.synchronized_command.gripper_command.WhichOneof("command") ==316 "claw_gripper_command")317 # with a build_on_command318 mobility_command = RobotCommandBuilder.synchro_sit_command()319 command = RobotCommandBuilder.claw_gripper_close_command(build_on_command=mobility_command)320 _test_has_synchronized(command)321 _test_has_mobility(command.synchronized_command)322 _test_has_gripper(command.synchronized_command)323def test_build_synchro_command():324 # two synchro subcommands of the same type:325 arm_command_1 = RobotCommandBuilder.arm_ready_command()326 arm_command_2 = RobotCommandBuilder.arm_stow_command()327 command = RobotCommandBuilder.build_synchro_command(arm_command_1, arm_command_2)328 _test_has_synchronized(command)329 _test_has_arm(command.synchronized_command)330 assert not command.synchronized_command.HasField("gripper_command")331 assert not command.synchronized_command.HasField("mobility_command")332 command_position = command.synchronized_command.arm_command.named_arm_position_command.position333 assert command_position == arm_command_pb2.NamedArmPositionsCommand.POSITIONS_STOW334 # two synchro subcommands of a different type:335 arm_command = RobotCommandBuilder.arm_ready_command()336 mobility_command = RobotCommandBuilder.synchro_stand_command()337 command = RobotCommandBuilder.build_synchro_command(arm_command, mobility_command)338 _test_has_synchronized(command)339 _test_has_mobility(command.synchronized_command)340 _test_has_arm(command.synchronized_command)341 assert not command.synchronized_command.HasField("gripper_command")342 assert command.synchronized_command.mobility_command.HasField("stand_request")343 command_position = command.synchronized_command.arm_command.named_arm_position_command.position344 assert command_position == arm_command_pb2.NamedArmPositionsCommand.POSITIONS_READY345 # one synchro command and one deprecated mobility command:346 deprecated_mobility_command = RobotCommandBuilder.sit_command()347 command = RobotCommandBuilder.build_synchro_command(mobility_command,348 deprecated_mobility_command)349 _test_has_synchronized(command)350 _test_has_mobility(command.synchronized_command)351 assert command.synchronized_command.mobility_command.HasField("sit_request")352 # fullbody command is rejected353 full_body_command = RobotCommandBuilder.selfright_command()354 with pytest.raises(Exception):355 command = RobotCommandBuilder.build_synchro_command(arm_command, full_body_command)356def test_edit_timestamps():357 def _set_new_time(key, proto):358 """If proto has a field named key, fill set it to end_time_secs as robot time. """359 if key not in proto.DESCRIPTOR.fields_by_name:360 return # No such field in the proto to be set to the end-time.361 end_time = getattr(proto, key)362 end_time.CopyFrom(timestamp_pb2.Timestamp(seconds=10))363 command = robot_command_pb2.RobotCommand()...
state.py
Source:state.py
1import logging2import threading3from collections import defaultdict, namedtuple4logger = logging.getLogger(__name__)5Offsets = namedtuple("Offsets", "local remote")6class InvalidState(Exception):7 pass8class InvalidStateTransition(Exception):9 pass10class MessageNotReady(Exception):11 pass12class SynchronizedPartitionState:13 # The ``SYNCHRONIZED`` state represents that the local offset is equal to14 # the remote offset. The local consumer should be paused to avoid advancing15 # further beyond the remote consumer.16 SYNCHRONIZED = "SYNCHRONIZED"17 # The ``LOCAL_BEHIND`` state represents that the remote offset is greater18 # than the local offset. The local consumer should be unpaused to avoid19 # falling behind the remote consumer.20 LOCAL_BEHIND = "LOCAL_BEHIND"21 # The ``REMOTE_BEHIND`` state represents that the local offset is greater22 # than the remote offset. The local consumer should be paused to avoid23 # advancing further beyond the remote consumer.24 REMOTE_BEHIND = "REMOTE_BEHIND"25 # The ``UNKNOWN`` state represents that we haven't received enough data to26 # know the current offset state.27 UNKNOWN = "UNKNOWN"28class SynchronizedPartitionStateManager(object):29 """30 This class implements a state machine that can be used to track the31 consumption progress of a Kafka partition (the "local" consumer) relative32 to a the progress of another consumer (the "remote" consumer.)33 This is intended to be paired with the ``SynchronizedConsumer``.34 """35 transitions = { # from state -> set(to states)36 None: frozenset([SynchronizedPartitionState.UNKNOWN]),37 SynchronizedPartitionState.UNKNOWN: frozenset(38 [39 SynchronizedPartitionState.LOCAL_BEHIND,40 SynchronizedPartitionState.REMOTE_BEHIND,41 SynchronizedPartitionState.SYNCHRONIZED,42 ]43 ),44 SynchronizedPartitionState.REMOTE_BEHIND: frozenset(45 [SynchronizedPartitionState.LOCAL_BEHIND, SynchronizedPartitionState.SYNCHRONIZED]46 ),47 SynchronizedPartitionState.LOCAL_BEHIND: frozenset(48 [SynchronizedPartitionState.SYNCHRONIZED, SynchronizedPartitionState.REMOTE_BEHIND]49 ),50 SynchronizedPartitionState.SYNCHRONIZED: frozenset(51 [SynchronizedPartitionState.LOCAL_BEHIND, SynchronizedPartitionState.REMOTE_BEHIND]52 ),53 }54 def __init__(self, callback):55 self.partitions = defaultdict(lambda: (None, Offsets(None, None)))56 self.callback = callback57 self.__lock = threading.RLock()58 def get_state_from_offsets(self, offsets):59 """60 Derive the partition state by comparing local and remote offsets.61 """62 if offsets.local is None or offsets.remote is None:63 return SynchronizedPartitionState.UNKNOWN64 else:65 if offsets.local < offsets.remote:66 return SynchronizedPartitionState.LOCAL_BEHIND67 elif offsets.remote < offsets.local:68 return SynchronizedPartitionState.REMOTE_BEHIND69 else: # local == remote70 return SynchronizedPartitionState.SYNCHRONIZED71 def set_local_offset(self, topic, partition, local_offset):72 """73 Update the local offset for a topic and partition.74 If this update operation results in a state change, the callback75 function will be invoked.76 """77 with self.__lock:78 previous_state, previous_offsets = self.partitions[(topic, partition)]79 if previous_offsets.local is not None and (80 local_offset is None or local_offset < previous_offsets.local81 ):82 logger.info(83 "Local offset for %s/%s has moved backwards (current: %s, previous: %s)",84 topic,85 partition,86 local_offset,87 previous_offsets.local,88 )89 updated_offsets = Offsets(local_offset, previous_offsets.remote)90 updated_state = self.get_state_from_offsets(updated_offsets)91 if (92 previous_state is not updated_state93 and updated_state not in self.transitions[previous_state]94 ):95 raise InvalidStateTransition(96 "Unexpected state transition for {}/{} from {} to {}".format(97 topic, partition, previous_state, updated_state98 )99 )100 self.partitions[(topic, partition)] = (updated_state, updated_offsets)101 if previous_state is not updated_state:102 if updated_state == SynchronizedPartitionState.REMOTE_BEHIND:103 logger.warning(104 "Current local offset for %s/%s (%s) exceeds remote offset (%s)!",105 topic,106 partition,107 updated_offsets.local,108 updated_offsets.remote,109 )110 self.callback(111 topic,112 partition,113 (previous_state, previous_offsets),114 (updated_state, updated_offsets),115 )116 def set_remote_offset(self, topic, partition, remote_offset):117 """118 Update the remote offset for a topic and partition.119 If this update operation results in a state change, the callback120 function will be invoked.121 """122 with self.__lock:123 previous_state, previous_offsets = self.partitions[(topic, partition)]124 if previous_offsets.remote is not None and (125 remote_offset is None or remote_offset < previous_offsets.remote126 ):127 logger.info(128 "Remote offset for %s/%s has moved backwards (current: %s, previous: %s)",129 topic,130 partition,131 remote_offset,132 previous_offsets.remote,133 )134 updated_offsets = Offsets(previous_offsets.local, remote_offset)135 updated_state = self.get_state_from_offsets(updated_offsets)136 if (137 previous_state is not updated_state138 and updated_state not in self.transitions[previous_state]139 ):140 raise InvalidStateTransition(141 "Unexpected state transition for {}/{} from {} to {}".format(142 topic, partition, previous_state, updated_state143 )144 )145 self.partitions[(topic, partition)] = (updated_state, updated_offsets)146 if previous_state is not updated_state:147 self.callback(148 topic,149 partition,150 (previous_state, previous_offsets),151 (updated_state, updated_offsets),152 )153 def validate_local_message(self, topic, partition, offset):154 """155 Check if a message should be consumed by the local consumer.156 The local consumer should be prevented from consuming messages that157 have yet to have been committed by the remote consumer.158 """159 with self.__lock:160 state, offsets = self.partitions[(topic, partition)]161 if state is not SynchronizedPartitionState.LOCAL_BEHIND:162 raise InvalidState(163 "Received a message while consumer is not in LOCAL_BEHIND state!"164 )165 if offset >= offsets.remote:166 raise MessageNotReady(167 "Received a message that has not been committed by remote consumer"168 )169 if offset < offsets.local:170 logger.warning(171 "Received a message prior to local offset (local consumer offset rewound without update?)"...
test_synchronized_lock.py
Source:test_synchronized_lock.py
1from __future__ import print_function2import unittest3import wrapt4@wrapt.synchronized5def function():6 print('function')7class C1(object):8 @wrapt.synchronized9 def function1(self):10 print('function1')11 @wrapt.synchronized12 @classmethod13 def function2(cls):14 print('function2')15 @wrapt.synchronized16 @staticmethod17 def function3():18 print('function3')19c1 = C1()20@wrapt.synchronized21class C2(object):22 pass23@wrapt.synchronized24class C3:25 pass26class C4(object):27 # XXX This yields undesirable results due to how class method is28 # implemented. The classmethod doesn't bind the method to the class29 # before calling. As a consequence, the decorator wrapper function30 # sees the instance as None with the class being explicitly passed31 # as the first argument. It isn't possible to detect and correct32 # this.33 @classmethod34 @wrapt.synchronized35 def function2(cls):36 print('function2')37 @staticmethod38 @wrapt.synchronized39 def function3():40 print('function3')41c4 = C4()42class TestSynchronized(unittest.TestCase):43 def test_synchronized_function(self):44 _lock0 = getattr(function, '_synchronized_lock', None)45 self.assertEqual(_lock0, None)46 function()47 _lock1 = getattr(function, '_synchronized_lock', None)48 self.assertNotEqual(_lock1, None)49 function()50 _lock2 = getattr(function, '_synchronized_lock', None)51 self.assertNotEqual(_lock2, None)52 self.assertEqual(_lock2, _lock1)53 function()54 _lock3 = getattr(function, '_synchronized_lock', None)55 self.assertNotEqual(_lock3, None)56 self.assertEqual(_lock3, _lock2)57 def test_synchronized_inner_staticmethod(self):58 _lock0 = getattr(C1.function3, '_synchronized_lock', None)59 self.assertEqual(_lock0, None)60 c1.function3()61 _lock1 = getattr(C1.function3, '_synchronized_lock', None)62 self.assertNotEqual(_lock1, None)63 C1.function3()64 _lock2 = getattr(C1.function3, '_synchronized_lock', None)65 self.assertNotEqual(_lock2, None)66 self.assertEqual(_lock2, _lock1)67 C1.function3()68 _lock3 = getattr(C1.function3, '_synchronized_lock', None)69 self.assertNotEqual(_lock3, None)70 self.assertEqual(_lock3, _lock2)71 def test_synchronized_outer_staticmethod(self):72 _lock0 = getattr(C4.function3, '_synchronized_lock', None)73 self.assertEqual(_lock0, None)74 c4.function3()75 _lock1 = getattr(C4.function3, '_synchronized_lock', None)76 self.assertNotEqual(_lock1, None)77 C4.function3()78 _lock2 = getattr(C4.function3, '_synchronized_lock', None)79 self.assertNotEqual(_lock2, None)80 self.assertEqual(_lock2, _lock1)81 C4.function3()82 _lock3 = getattr(C4.function3, '_synchronized_lock', None)83 self.assertNotEqual(_lock3, None)84 self.assertEqual(_lock3, _lock2)85 def test_synchronized_inner_classmethod(self):86 if hasattr(C1, '_synchronized_lock'):87 del C1._synchronized_lock88 _lock0 = getattr(C1, '_synchronized_lock', None)89 self.assertEqual(_lock0, None)90 c1.function2()91 _lock1 = getattr(C1, '_synchronized_lock', None)92 self.assertNotEqual(_lock1, None)93 C1.function2()94 _lock2 = getattr(C1, '_synchronized_lock', None)95 self.assertNotEqual(_lock2, None)96 self.assertEqual(_lock2, _lock1)97 C1.function2()98 _lock3 = getattr(C1, '_synchronized_lock', None)99 self.assertNotEqual(_lock3, None)100 self.assertEqual(_lock3, _lock2)101 def test_synchronized_outer_classmethod(self):102 # XXX If all was good, this would be detected as a class103 # method call, but the classmethod decorator doesn't bind104 # the wrapped function to the class before calling and105 # just calls it direct, explicitly passing the class as106 # first argument. This screws things up. Would be nice if107 # Python were fixed, but that isn't likely to happen.108 #_lock0 = getattr(C4, '_synchronized_lock', None)109 _lock0 = getattr(C4.function2, '_synchronized_lock', None)110 self.assertEqual(_lock0, None)111 c4.function2()112 #_lock1 = getattr(C4, '_synchronized_lock', None)113 _lock1 = getattr(C4.function2, '_synchronized_lock', None)114 self.assertNotEqual(_lock1, None)115 C4.function2()116 #_lock2 = getattr(C4, '_synchronized_lock', None)117 _lock2 = getattr(C4.function2, '_synchronized_lock', None)118 self.assertNotEqual(_lock2, None)119 self.assertEqual(_lock2, _lock1)120 C4.function2()121 #_lock3 = getattr(C4, '_synchronized_lock', None)122 _lock3 = getattr(C4.function2, '_synchronized_lock', None)123 self.assertNotEqual(_lock3, None)124 self.assertEqual(_lock3, _lock2)125 def test_synchronized_instancemethod(self):126 if hasattr(C1, '_synchronized_lock'):127 del C1._synchronized_lock128 _lock0 = getattr(c1, '_synchronized_lock', None)129 self.assertEqual(_lock0, None)130 C1.function1(c1)131 _lock1 = getattr(c1, '_synchronized_lock', None)132 self.assertNotEqual(_lock1, None)133 c1.function1()134 _lock2 = getattr(c1, '_synchronized_lock', None)135 self.assertNotEqual(_lock2, None)136 self.assertEqual(_lock2, _lock1)137 c1.function1()138 _lock3 = getattr(c1, '_synchronized_lock', None)139 self.assertNotEqual(_lock3, None)140 self.assertEqual(_lock3, _lock2)141 del c1._synchronized_lock142 C1.function2()143 _lock4 = getattr(C1, '_synchronized_lock', None)144 self.assertNotEqual(_lock4, None)145 c1.function1()146 _lock5 = getattr(c1, '_synchronized_lock', None)147 self.assertNotEqual(_lock5, None)148 self.assertNotEqual(_lock5, _lock4)149 def test_synchronized_type_new_style(self):150 if hasattr(C2, '_synchronized_lock'):151 del C2._synchronized_lock152 _lock0 = getattr(C2, '_synchronized_lock', None)153 self.assertEqual(_lock0, None)154 c2 = C2()155 _lock1 = getattr(C2, '_synchronized_lock', None)156 self.assertNotEqual(_lock1, None)157 c2 = C2()158 _lock2 = getattr(C2, '_synchronized_lock', None)159 self.assertNotEqual(_lock2, None)160 self.assertEqual(_lock2, _lock1)161 c2 = C2()162 _lock3 = getattr(C2, '_synchronized_lock', None)163 self.assertNotEqual(_lock3, None)164 self.assertEqual(_lock3, _lock2)165 def test_synchronized_type_old_style(self):166 if hasattr(C3, '_synchronized_lock'):167 del C3._synchronized_lock168 _lock0 = getattr(C3, '_synchronized_lock', None)169 self.assertEqual(_lock0, None)170 c2 = C3()171 _lock1 = getattr(C3, '_synchronized_lock', None)172 self.assertNotEqual(_lock1, None)173 c2 = C3()174 _lock2 = getattr(C3, '_synchronized_lock', None)175 self.assertNotEqual(_lock2, None)176 self.assertEqual(_lock2, _lock1)177 c2 = C3()178 _lock3 = getattr(C3, '_synchronized_lock', None)179 self.assertNotEqual(_lock3, None)180 self.assertEqual(_lock3, _lock2)181if __name__ == '__main__':...
sharedctypes.py
Source:sharedctypes.py
...67 ctx = ctx or get_context()68 lock = ctx.RLock()69 if not hasattr(lock, 'acquire'):70 raise AttributeError("%r has no method 'acquire'" % lock)71 return synchronized(obj, lock, ctx=ctx)72def Array(typecode_or_type, size_or_initializer, *, lock=True, ctx=None):73 '''74 Return a synchronization wrapper for a RawArray75 '''76 obj = RawArray(typecode_or_type, size_or_initializer)77 if lock is False:78 return obj79 if lock in (True, None):80 ctx = ctx or get_context()81 lock = ctx.RLock()82 if not hasattr(lock, 'acquire'):83 raise AttributeError("%r has no method 'acquire'" % lock)84 return synchronized(obj, lock, ctx=ctx)85def copy(obj):86 new_obj = _new_value(type(obj))87 ctypes.pointer(new_obj)[0] = obj88 return new_obj89def synchronized(obj, lock=None, ctx=None):90 assert not isinstance(obj, SynchronizedBase), 'object already synchronized'91 ctx = ctx or get_context()92 if isinstance(obj, ctypes._SimpleCData):93 return Synchronized(obj, lock, ctx)94 elif isinstance(obj, ctypes.Array):95 if obj._type_ is ctypes.c_char:96 return SynchronizedString(obj, lock, ctx)97 return SynchronizedArray(obj, lock, ctx)98 else:99 cls = type(obj)100 try:101 scls = class_cache[cls]102 except KeyError:103 names = [field[0] for field in cls._fields_]...
Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!