Best Python code snippet using tempest_python
neptune.py
Source:neptune.py
from __future__ import print_function
from gremlin_python import statics
from gremlin_python.structure.graph import Graph
from gremlin_python.process.graph_traversal import __
from gremlin_python.process.anonymous_traversal import *
from gremlin_python.process.strategies import *
from gremlin_python.driver.driver_remote_connection import DriverRemoteConnection
from gremlin_python.process.traversal import *
from tornado.httpclient import HTTPError
'''
Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved.
SPDX-License-Identifier: MIT-0
'''
import os
import json
import urllib.request
import time
import warnings
import sys
from neptune_python_utils import bulkload
from neptune_python_utils.gremlin_utils import GremlinUtils
from neptune_python_utils.endpoints import Endpoints, Endpoint
from neptune_python_utils.bulkload import BulkLoad


class Neptune:
    """Thin facade over the ``neptune_python_utils`` helpers.

    Every connection handed out or registered through this object is kept
    in ``self.connections`` so that ``close()`` can shut them all down.
    """

    def __init__(self):
        # Connections tracked for later clean-up by close().
        self.connections = []

    def close(self):
        """Close every tracked connection and forget about it.

        The list is detached before closing so that calling ``close()`` a
        second time does not try to close the same connections again.
        """
        tracked, self.connections = self.connections, []
        for connection in tracked:
            connection.close()

    def remoteConnection(self, neptune_endpoint=None, neptune_port=None, show_endpoint=True):
        """Create a remote connection via ``GremlinUtils`` and track it.

        Returns the connection object produced by
        ``GremlinUtils(...).remote_connection(show_endpoint)``.
        """
        connection = GremlinUtils(Endpoints(neptune_endpoint, neptune_port)).remote_connection(show_endpoint)
        self.connections.append(connection)
        return connection

    def graphTraversal(self, neptune_endpoint=None, neptune_port=None, show_endpoint=True, connection=None):
        """Return a traversal source bound to ``connection``.

        When ``connection`` is None a new one is created through
        ``remoteConnection``. Each connection is tracked exactly once.
        """
        if connection is None:
            # remoteConnection() already registers what it creates; adding it
            # again here would make close() close the same connection twice.
            connection = self.remoteConnection(neptune_endpoint, neptune_port, show_endpoint)
        elif not any(known is connection for known in self.connections):
            # NOTE(review): assumes caller-supplied connections were meant to
            # be tracked as well (indentation was lost in this snippet) - confirm.
            self.connections.append(connection)
        return GremlinUtils(Endpoints(neptune_endpoint, neptune_port)).traversal_source(show_endpoint, connection)

    def bulkLoadAsync(self, source, format='csv', role=None, region=None, neptune_endpoint=None, neptune_port=None):
        """Build a ``BulkLoad`` for ``source`` and return ``load_async()``'s result."""
        # Local is named `loader` so it does not shadow the imported `bulkload` module.
        loader = BulkLoad(source, format, role, region=region, endpoints=Endpoints(neptune_endpoint, neptune_port))
        return loader.load_async()

    def bulkLoad(self, source, format='csv', role=None, region=None, neptune_endpoint=None,
                 neptune_port=None, interval=2):
        """Build a ``BulkLoad`` for ``source`` and call ``load(interval)``.

        Returns None; the result of ``load`` is not propagated.
        """
        loader = BulkLoad(source, format, role, region=region, endpoints=Endpoints(neptune_endpoint, neptune_port))
        loader.load(interval)

    def bulkLoadStatus(self, status_url):
        """Return ``status_url.status()``.

        Despite its name, ``status_url`` must be an object exposing a
        ``status()`` method (presumably what ``bulkLoadAsync`` returns -
        TODO confirm).
        """
        return status_url.status()

    def sparql_endpoint(self, neptune_endpoint=None, neptune_port=None):
        """Return ``Endpoints(neptune_endpoint, neptune_port).sparql_endpoint()``."""
        return Endpoints(neptune_endpoint, neptune_port).sparql_endpoint()

    def clear(self, neptune_endpoint=None, neptune_port=None, batch_size=200, edge_batch_size=None, vertex_batch_size=None):
        """Clear the property graph data, then the RDF data."""
        print('clearing data...')
        self.clearGremlin(neptune_endpoint, neptune_port, batch_size, edge_batch_size, vertex_batch_size)
        self.clearSparql(neptune_endpoint, neptune_port)
        print('done')

    def clearGremlin(self, neptune_endpoint=None, neptune_port=None, batch_size=200, edge_batch_size=None, vertex_batch_size=None):
        """Drop all edges, then all vertices, in batches.

        ``edge_batch_size`` and ``vertex_batch_size`` fall back to
        ``batch_size`` when they are None. After each batch the remaining
        element count is queried and the loop stops once it reaches zero.
        """
        if edge_batch_size is None:
            edge_batch_size = batch_size
        if vertex_batch_size is None:
            vertex_batch_size = batch_size
        g = self.graphTraversal(neptune_endpoint, neptune_port, False)

        # Edges go first; the count is unknown until the first batch is dropped.
        edge_count = None
        has_edges = True
        while has_edges:
            if edge_count is None:
                print('clearing property graph data [edge_batch_size={}, edge_count=Unknown]...'.format(edge_batch_size))
            else:
                print('clearing property graph data [edge_batch_size={}, edge_count={}]...'.format(edge_batch_size, edge_count))
            g.E().limit(edge_batch_size).drop().toList()
            edge_count = g.E().count().next()
            has_edges = (edge_count > 0)

        # Then vertices, using the same drop-and-recount loop.
        vertex_count = None
        has_vertices = True
        while has_vertices:
            if vertex_count is None:
                print('clearing property graph data [vertex_batch_size={}, vertex_count=Unknown]...'.format(vertex_batch_size))
            else:
                print('clearing property graph data [vertex_batch_size={}, vertex_count={}]...'.format(vertex_batch_size, vertex_count))
            g.V().limit(vertex_batch_size).drop().toList()
            vertex_count = g.V().count().next()
            has_vertices = (vertex_count > 0)

    def clearSparql(self, neptune_endpoint=None, neptune_port=None):
        """POST a ``DROP ALL`` update to the SPARQL endpoint.

        Errors raised by ``urllib.request.urlopen`` propagate to the caller.
        """
        print('clearing rdf data...')
        sparql_endpoint = self.sparql_endpoint(neptune_endpoint, neptune_port)
        data = 'update=DROP%20ALL'
        request_parameters = sparql_endpoint.prepare_request('POST', data, headers={'Content-Type':'application/x-www-form-urlencoded'})
        req = urllib.request.Request(request_parameters.uri, data=data.encode('utf8'), headers=request_parameters.headers)
        # The response body is not used; the context manager only makes sure
        # the underlying HTTP connection is closed instead of being leaked.
        with urllib.request.urlopen(req, timeout=3600):
            pass


statics.load_statics(globals())
del globals()['range']
del globals()['map']
# ... (the snippet is truncated here in the source page)
neptune-util.py
Source:neptune-util.py
from __future__ import print_function
from gremlin_python import statics
from gremlin_python.structure.graph import Graph
from gremlin_python.process.graph_traversal import __
from gremlin_python.process.anonymous_traversal import *
from gremlin_python.process.strategies import *
from gremlin_python.driver.driver_remote_connection import DriverRemoteConnection
from gremlin_python.process.traversal import *
from tornado.httpclient import HTTPError
import os
import json
import urllib.request
import time
import warnings
import sys


class Neptune:
    """Helper for opening Gremlin connections to a Neptune cluster.

    The endpoint and port default to the ``NEPTUNE_CLUSTER_ENDPOINT`` and
    ``NEPTUNE_CLUSTER_PORT`` environment variables when not supplied.
    """

    def remoteConnection(self, neptune_endpoint=None, neptune_port=None, show_endpoint=True):
        """Open a ``DriverRemoteConnection`` to the Gremlin endpoint.

        An ``HTTPError`` is retried up to three times (four attempts in
        total); after that the last ``HTTPError`` is re-raised unchanged.
        """
        neptune_gremlin_endpoint = self.gremlin_endpoint(neptune_endpoint, neptune_port)
        if show_endpoint:
            print('gremlin: ' + neptune_gremlin_endpoint)
        retry_count = 0
        while True:
            try:
                return DriverRemoteConnection(neptune_gremlin_endpoint, 'g')
            except HTTPError:
                if retry_count >= 3:
                    # A bare raise keeps the original exception and traceback.
                    raise
                retry_count += 1
                print('Connection timeout. Retrying...')

    def graphTraversal(self, neptune_endpoint=None, neptune_port=None, show_endpoint=True, connection=None):
        """Return ``traversal().withRemote(connection)``.

        A new connection is opened with ``remoteConnection`` when
        ``connection`` is None.
        """
        if connection is None:
            connection = self.remoteConnection(neptune_endpoint, neptune_port, show_endpoint)
        return traversal().withRemote(connection)

    def __wait(self, status_url, interval):
        """Poll the bulk-load status until the load completes.

        Sleeps ``interval`` seconds between polls while the status is
        ``LOAD_IN_PROGRESS``; raises ``Exception(jsonresponse)`` for any
        status other than ``LOAD_COMPLETED`` or ``LOAD_IN_PROGRESS``.
        """
        # NOTE(review): bulkLoadStatus is not defined in the visible part of
        # this class - confirm it exists elsewhere before relying on __wait.
        while True:
            status, jsonresponse = self.bulkLoadStatus(status_url)
            if status == 'LOAD_COMPLETED':
                print('load completed')
                break
            elif status == 'LOAD_IN_PROGRESS':
                print('loading... {} records inserted'.format(jsonresponse['payload']['overallStatus']['totalRecords']))
                time.sleep(interval)
            else:
                raise Exception(jsonresponse)

    def gremlin_endpoint(self, neptune_endpoint=None, neptune_port=None):
        """Return the ``ws://<endpoint>:<port>/gremlin`` URL."""
        return self.__endpoint('ws', self.__neptune_endpoint(neptune_endpoint), self.__neptune_port(neptune_port), 'gremlin')

    def __endpoint(self, protocol, neptune_endpoint, neptune_port, suffix):
        """Format ``protocol://endpoint:port/suffix``."""
        return '{}://{}:{}/{}'.format(protocol, neptune_endpoint, neptune_port, suffix)

    def __neptune_endpoint(self, neptune_endpoint=None):
        """Return ``neptune_endpoint``, or ``NEPTUNE_CLUSTER_ENDPOINT`` from the environment.

        Raises ``KeyError`` when neither is available.
        """
        if neptune_endpoint is None:
            neptune_endpoint = os.environ['NEPTUNE_CLUSTER_ENDPOINT']
        return neptune_endpoint

    def __neptune_port(self, neptune_port=None):
        """Return ``neptune_port``, or ``NEPTUNE_CLUSTER_PORT`` from the environment.

        Raises ``KeyError`` when neither is available.
        """
        if neptune_port is None:
            neptune_port = os.environ['NEPTUNE_CLUSTER_PORT']
        return neptune_port


statics.load_statics(globals())
del globals()['range']
del globals()['map']
# ... (the snippet is truncated here in the source page)
api.py
Source:api.py
1from functools import partial2from .schema import BaseSchema3from .views import BaseAPIView, api_error4class Api:5 def __init__(self, app=None, db=None):6 self.resources = []7 if app and db:8 self.init_app(app, db)9 def init_app(self, app, db):10 self.app = app11 self.db = db12 for model, endpoint in self.resources:13 self._register_resource(model, endpoint)14 self.add_error_handlers()15 app.extensions["api"] = self16 def add_error_handlers(self):17 for status_code in (400, 405, 500):18 self.app.errorhandler(status_code)(partial(api_error, status_code))19 def register_resource(self, model, endpoint):20 if self.app is None or self.db is None:21 self.resources.append((model, endpoint))22 else:23 self._register_resource(model, endpoint)24 def _register_resource(self, model, endpoint):25 resource_name = model.__tablename__.title().replace("_", "")26 schema_meta = type(27 f"{resource_name}SchemaMeta",28 (BaseSchema.Meta,),29 {"model": model, "sqla_session": self.db.session},30 )31 schema = type(f"{resource_name}Schema", (BaseSchema,), {"Meta": schema_meta},)32 view = type(33 f"{resource_name}API",34 (BaseAPIView,),35 {"model": model, "schema_cls": schema, "db": self.db},36 )37 self._register_endpoint(view, endpoint)38 def _register_endpoint(self, view, url):39 model_name = view.model.__tablename__40 base_endpoint = f"{model_name}_api"41 index_endpoint = base_endpoint + "/index"42 self.app.add_url_rule(43 url,44 defaults={"id": None},45 view_func=view.as_view(index_endpoint),46 methods=["GET"],47 endpoint=index_endpoint,48 )49 create_endpoint = base_endpoint + "/create"50 self.app.add_url_rule(51 url,52 view_func=view.as_view(create_endpoint),53 methods=["POST"],54 endpoint=create_endpoint,55 )56 show_endpoint = base_endpoint + "/show"57 self.app.add_url_rule(58 f"{url}<int:id>/",59 view_func=view.as_view(show_endpoint),60 methods=["GET", "PUT", "DELETE"],61 endpoint=show_endpoint,...
Learn to execute automation testing from scratch with the LambdaTest Learning Hub. From setting up the prerequisites and running your first automation test, to following best practices and diving deeper into advanced test scenarios, the LambdaTest Learning Hub compiles step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, TestNG, etc.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 minutes of automation testing FREE!!