Best Python code snippet using hypothesis
processpool.py
Source:processpool.py
...249 self._start_if_needed()250 if extra_args is None:251 extra_args = {}252 self._validate_all_known_args(extra_args)253 transfer_id = self._transfer_monitor.notify_new_transfer()254 download_file_request = DownloadFileRequest(255 transfer_id=transfer_id, bucket=bucket, key=key,256 filename=filename, extra_args=extra_args,257 expected_size=expected_size,258 )259 logger.debug(260 'Submitting download file request: %s.', download_file_request)261 self._download_request_queue.put(download_file_request)262 call_args = CallArgs(263 bucket=bucket, key=key, filename=filename, extra_args=extra_args,264 expected_size=expected_size)265 future = self._get_transfer_future(transfer_id, call_args)266 return future267 def shutdown(self):268 """Shutdown the downloader269 It will wait till all downloads are complete before returning.270 """271 self._shutdown_if_needed()272 def __enter__(self):273 return self274 def __exit__(self, exc_type, exc_value, *args):275 if isinstance(exc_value, KeyboardInterrupt):276 if self._transfer_monitor is not None:277 self._transfer_monitor.notify_cancel_all_in_progress()278 self.shutdown()279 def _start_if_needed(self):280 with self._start_lock:281 if not self._started:282 self._start()283 def _start(self):284 self._start_transfer_monitor_manager()285 self._start_submitter()286 self._start_get_object_workers()287 self._started = True288 def _validate_all_known_args(self, provided):289 for kwarg in provided:290 if kwarg not in ALLOWED_DOWNLOAD_ARGS:291 raise ValueError(292 "Invalid extra_args key '%s', "293 "must be one of: %s" % (294 kwarg, ', '.join(ALLOWED_DOWNLOAD_ARGS)))295 def _get_transfer_future(self, transfer_id, call_args):296 meta = ProcessPoolTransferMeta(297 call_args=call_args, transfer_id=transfer_id)298 future = ProcessPoolTransferFuture(299 monitor=self._transfer_monitor, meta=meta)300 return future301 def _start_transfer_monitor_manager(self):302 logger.debug('Starting the TransferMonitorManager.')303 self._manager = 
TransferMonitorManager()304 # We do not want Ctrl-C's to cause the manager to shutdown immediately305 # as worker processes will still need to communicate with it when they306 # are shutting down. So instead we ignore Ctrl-C and let the manager307 # be explicitly shutdown when shutting down the downloader.308 self._manager.start(_add_ignore_handler_for_interrupts)309 self._transfer_monitor = self._manager.TransferMonitor()310 def _start_submitter(self):311 logger.debug('Starting the GetObjectSubmitter.')312 self._submitter = GetObjectSubmitter(313 transfer_config=self._transfer_config,314 client_factory=self._client_factory,315 transfer_monitor=self._transfer_monitor,316 osutil=self._osutil,317 download_request_queue=self._download_request_queue,318 worker_queue=self._worker_queue319 )320 self._submitter.start()321 def _start_get_object_workers(self):322 logger.debug('Starting %s GetObjectWorkers.',323 self._transfer_config.max_request_processes)324 for _ in range(self._transfer_config.max_request_processes):325 worker = GetObjectWorker(326 queue=self._worker_queue,327 client_factory=self._client_factory,328 transfer_monitor=self._transfer_monitor,329 osutil=self._osutil,330 )331 worker.start()332 self._workers.append(worker)333 def _shutdown_if_needed(self):334 with self._start_lock:335 if self._started:336 self._shutdown()337 def _shutdown(self):338 self._shutdown_submitter()339 self._shutdown_get_object_workers()340 self._shutdown_transfer_monitor_manager()341 self._started = False342 def _shutdown_transfer_monitor_manager(self):343 logger.debug('Shutting down the TransferMonitorManager.')344 self._manager.shutdown()345 def _shutdown_submitter(self):346 logger.debug('Shutting down the GetObjectSubmitter.')347 self._download_request_queue.put(SHUTDOWN_SIGNAL)348 self._submitter.join()349 def _shutdown_get_object_workers(self):350 logger.debug('Shutting down the GetObjectWorkers.')351 for _ in self._workers:352 self._worker_queue.put(SHUTDOWN_SIGNAL)353 for worker 
in self._workers:354 worker.join()355class ProcessPoolTransferFuture(BaseTransferFuture):356 def __init__(self, monitor, meta):357 """The future associated to a submitted process pool transfer request358 :type monitor: TransferMonitor359 :param monitor: The monitor associated to the proccess pool downloader360 :type meta: ProcessPoolTransferMeta361 :param meta: The metadata associated to the request. This object362 is visible to the requester.363 """364 self._monitor = monitor365 self._meta = meta366 @property367 def meta(self):368 return self._meta369 def done(self):370 return self._monitor.is_done(self._meta.transfer_id)371 def result(self):372 try:373 return self._monitor.poll_for_result(self._meta.transfer_id)374 except KeyboardInterrupt:375 # For the multiprocessing Manager, a thread is given a single376 # connection to reuse in communicating between the thread in the377 # main process and the Manager's process. If a Ctrl-C happens when378 # polling for the result, it will make the main thread stop trying379 # to receive from the connection, but the Manager process will not380 # know that the main process has stopped trying to receive and381 # will not close the connection. 
As a result if another message is382 # sent to the Manager process, the listener in the Manager383 # processes will not process the new message as it is still trying384 # trying to process the previous message (that was Ctrl-C'd) and385 # thus cause the thread in the main process to hang on its send.386 # The only way around this is to create a new connection and send387 # messages from that new connection instead.388 self._monitor._connect()389 self.cancel()390 raise391 def cancel(self):392 self._monitor.notify_exception(393 self._meta.transfer_id, CancelledError()394 )395class ProcessPoolTransferMeta(BaseTransferMeta):396 """Holds metadata about the ProcessPoolTransferFuture"""397 def __init__(self, transfer_id, call_args):398 self._transfer_id = transfer_id399 self._call_args = call_args400 self._user_context = {}401 @property402 def call_args(self):403 return self._call_args404 @property405 def transfer_id(self):406 return self._transfer_id407 @property408 def user_context(self):409 return self._user_context410class ClientFactory(object):411 def __init__(self, client_kwargs=None):412 """Creates S3 clients for processes413 Botocore sessions and clients are not pickleable so they cannot be414 inherited across Process boundaries. 
Instead, they must be instantiated415 once a process is running.416 """417 self._client_kwargs = client_kwargs418 if self._client_kwargs is None:419 self._client_kwargs = {}420 client_config = deepcopy(self._client_kwargs.get('config', Config()))421 if not client_config.user_agent_extra:422 client_config.user_agent_extra = PROCESS_USER_AGENT423 else:424 client_config.user_agent_extra += " " + PROCESS_USER_AGENT425 self._client_kwargs['config'] = client_config426 def create_client(self):427 """Create a botocore S3 client"""428 return botocore.session.Session().create_client(429 's3', **self._client_kwargs)430class TransferMonitor(object):431 def __init__(self):432 """Monitors transfers for cross-proccess communication433 Notifications can be sent to the monitor and information can be434 retrieved from the monitor for a particular transfer. This abstraction435 is ran in a ``multiprocessing.managers.BaseManager`` in order to be436 shared across processes.437 """438 # TODO: Add logic that removes the TransferState if the transfer is439 # marked as done and the reference to the future is no longer being440 # held onto. Without this logic, this dictionary will continue to441 # grow in size with no limit.442 self._transfer_states = {}443 self._id_count = 0444 self._init_lock = threading.Lock()445 def notify_new_transfer(self):446 with self._init_lock:447 transfer_id = self._id_count448 self._transfer_states[transfer_id] = TransferState()449 self._id_count += 1450 return transfer_id451 def is_done(self, transfer_id):452 """Determine a particular transfer is complete453 :param transfer_id: Unique identifier for the transfer454 :return: True, if done. False, otherwise.455 """456 return self._transfer_states[transfer_id].done457 def notify_done(self, transfer_id):458 """Notify a particular transfer is complete459 :param transfer_id: Unique identifier for the transfer...
volume_transfer.py
Source:volume_transfer.py
...24from cinder.i18n import _, _LI25from cinder import transfer as transferAPI26from cinder import utils27LOG = logging.getLogger(__name__)28def make_transfer(elem):29 elem.set('id')30 elem.set('volume_id')31 elem.set('created_at')32 elem.set('name')33 elem.set('auth_key')34class TransferTemplate(xmlutil.TemplateBuilder):35 def construct(self):36 root = xmlutil.TemplateElement('transfer', selector='transfer')37 make_transfer(root)38 alias = Volume_transfer.alias39 namespace = Volume_transfer.namespace40 return xmlutil.MasterTemplate(root, 1, nsmap={alias: namespace})41class TransfersTemplate(xmlutil.TemplateBuilder):42 def construct(self):43 root = xmlutil.TemplateElement('transfers')44 elem = xmlutil.SubTemplateElement(root, 'transfer',45 selector='transfers')46 make_transfer(elem)47 alias = Volume_transfer.alias48 namespace = Volume_transfer.namespace49 return xmlutil.MasterTemplate(root, 1, nsmap={alias: namespace})50class CreateDeserializer(wsgi.MetadataXMLDeserializer):51 def default(self, string):52 dom = utils.safe_minidom_parse_string(string)53 transfer = self._extract_transfer(dom)54 return {'body': {'transfer': transfer}}55 def _extract_transfer(self, node):56 transfer = {}57 transfer_node = self.find_first_child_named(node, 'transfer')58 attributes = ['volume_id', 'name']59 for attr in attributes:60 if transfer_node.getAttribute(attr):61 transfer[attr] = transfer_node.getAttribute(attr)62 return transfer63class AcceptDeserializer(wsgi.MetadataXMLDeserializer):64 def default(self, string):65 dom = utils.safe_minidom_parse_string(string)66 transfer = self._extract_transfer(dom)67 return {'body': {'accept': transfer}}68 def _extract_transfer(self, node):69 transfer = {}70 transfer_node = self.find_first_child_named(node, 'accept')71 attributes = ['auth_key']72 for attr in attributes:73 if transfer_node.getAttribute(attr):74 transfer[attr] = transfer_node.getAttribute(attr)75 return transfer76class VolumeTransferController(wsgi.Controller):77 """The Volume 
Transfer API controller for the OpenStack API."""78 _view_builder_class = transfer_view.ViewBuilder79 def __init__(self):80 self.transfer_api = transferAPI.API()81 super(VolumeTransferController, self).__init__()82 @wsgi.serializers(xml=TransferTemplate)83 def show(self, req, id):84 """Return data about active transfers."""85 context = req.environ['cinder.context']86 try:87 transfer = self.transfer_api.get(context, transfer_id=id)88 except exception.TransferNotFound as error:89 raise exc.HTTPNotFound(explanation=error.msg)90 return self._view_builder.detail(req, transfer)91 @wsgi.serializers(xml=TransfersTemplate)92 def index(self, req):93 """Returns a summary list of transfers."""94 return self._get_transfers(req, is_detail=False)95 @wsgi.serializers(xml=TransfersTemplate)96 def detail(self, req):97 """Returns a detailed list of transfers."""98 return self._get_transfers(req, is_detail=True)99 def _get_transfers(self, req, is_detail):100 """Returns a list of transfers, transformed through view builder."""101 context = req.environ['cinder.context']102 filters = req.params.copy()103 LOG.debug('Listing volume transfers')104 transfers = self.transfer_api.get_all(context, filters=filters)105 transfer_count = len(transfers)106 limited_list = common.limited(transfers, req)107 if is_detail:108 transfers = self._view_builder.detail_list(req, limited_list,109 transfer_count)110 else:111 transfers = self._view_builder.summary_list(req, limited_list,112 transfer_count)113 return transfers114 @wsgi.response(202)115 @wsgi.serializers(xml=TransferTemplate)116 @wsgi.deserializers(xml=CreateDeserializer)117 def create(self, req, body):118 """Create a new volume transfer."""119 LOG.debug('Creating new volume transfer %s', body)120 if not self.is_valid_body(body, 'transfer'):121 raise exc.HTTPBadRequest()122 context = req.environ['cinder.context']123 try:124 transfer = body['transfer']125 volume_id = transfer['volume_id']126 except KeyError:127 msg = _("Incorrect request body 
format")128 raise exc.HTTPBadRequest(explanation=msg)129 name = transfer.get('name', None)130 LOG.info(_LI("Creating transfer of volume %s"),131 volume_id,132 context=context)133 try:134 new_transfer = self.transfer_api.create(context, volume_id, name)135 except exception.InvalidVolume as error:136 raise exc.HTTPBadRequest(explanation=error.msg)137 except exception.VolumeNotFound as error:138 raise exc.HTTPNotFound(explanation=error.msg)139 transfer = self._view_builder.create(req,140 dict(new_transfer.iteritems()))141 return transfer142 @wsgi.response(202)143 @wsgi.serializers(xml=TransferTemplate)144 @wsgi.deserializers(xml=AcceptDeserializer)145 def accept(self, req, id, body):146 """Accept a new volume transfer."""147 transfer_id = id148 LOG.debug('Accepting volume transfer %s', transfer_id)149 if not self.is_valid_body(body, 'accept'):150 raise exc.HTTPBadRequest()151 context = req.environ['cinder.context']152 try:153 accept = body['accept']154 auth_key = accept['auth_key']155 except KeyError:156 msg = _("Incorrect request body format")157 raise exc.HTTPBadRequest(explanation=msg)158 LOG.info(_LI("Accepting transfer %s"), transfer_id,159 context=context)160 try:161 accepted_transfer = self.transfer_api.accept(context, transfer_id,162 auth_key)163 except exception.VolumeSizeExceedsAvailableQuota as error:164 raise exc.HTTPRequestEntityTooLarge(165 explanation=error.msg, headers={'Retry-After': 0})166 except exception.InvalidVolume as error:167 raise exc.HTTPBadRequest(explanation=error.msg)168 transfer = \169 self._view_builder.summary(req,170 dict(accepted_transfer.iteritems()))171 return transfer172 def delete(self, req, id):173 """Delete a transfer."""174 context = req.environ['cinder.context']175 LOG.info(_LI("Delete transfer with id: %s"), id, context=context)176 try:177 self.transfer_api.delete(context, transfer_id=id)178 except exception.TransferNotFound as error:179 raise exc.HTTPNotFound(explanation=error.msg)180 return 
webob.Response(status_int=202)181class Volume_transfer(extensions.ExtensionDescriptor):182 """Volume transfer management support."""183 name = "VolumeTransfer"184 alias = "os-volume-transfer"185 namespace = "http://docs.openstack.org/volume/ext/volume-transfer/" + \186 "api/v1.1"187 updated = "2013-05-29T00:00:00+00:00"188 def get_resources(self):189 resources = []190 res = extensions.ResourceExtension(Volume_transfer.alias,191 VolumeTransferController(),192 collection_actions={'detail':193 'GET'},194 member_actions={'accept': 'POST'})195 resources.append(res)...
Learn to execute automation testing from scratch with the LambdaTest Learning Hub, from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. The LambdaTest Learning Hub compiles step-by-step guides to help you become proficient with different test automation frameworks, such as Selenium, Cypress, and TestNG.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 minutes of automation testing FREE!!