Best Python code snippet using localstack_python
test_kinesis.py
Source: test_kinesis.py
...9 region_name='us-east-1',10 aws_access_key_id='FAKE_AWS_ACCESS_KEY_ID',11 aws_secret_access_key='FAKE_AWS_SECRET_ACCESS_KEY'12 )13def create_stream(client, stream_name):14 client.create_stream(StreamName=stream_name, ShardCount=1)15@mock_kinesis16def test_deliver_single_record_dict():17 client = setup_connection()18 create_stream(client, FAKE_STREAM_NAME)19 data = [{"id": "1", "example": "content"}]20 try:21 response = kinesis_deliver(22 client, FAKE_STREAM_NAME, PARTITION_KEY, data)23 assert False24 except Exception:25 assert True26@mock_kinesis27def test_deliver_single_record():28 client = setup_connection()29 create_stream(client, FAKE_STREAM_NAME)30 data = [{"id": "1", "example": "content"}]31 response = kinesis_deliver(client, FAKE_STREAM_NAME, PARTITION_KEY, data)32 assert response['ResponseMetadata']['HTTPStatusCode'] is 20033@mock_kinesis34def test_deliver_multiple_records():35 client = setup_connection()36 create_stream(client, FAKE_STREAM_NAME)37 data = [38 {"id": "1", "example": "content1"},39 {"id": "2", "example": "content2"}40 ]41 response = kinesis_deliver(client, FAKE_STREAM_NAME, PARTITION_KEY, data)42 assert response['ResponseMetadata']['HTTPStatusCode'] is 20043@mock_kinesis44def test_deliver_raise_on_partition_key_missing():45 client = setup_connection()46 create_stream(client, FAKE_STREAM_NAME)47 data = {"example": "content"}48 try:49 kinesis_deliver(client, FAKE_STREAM_NAME, PARTITION_KEY, data)50 assert False51 except Exception:52 assert True53@mock_kinesis54def test_deliver_raise_on_empty_dataset():55 client = setup_connection()56 create_stream(client, FAKE_STREAM_NAME)57 data = []58 try:59 kinesis_deliver(client, FAKE_STREAM_NAME, PARTITION_KEY, data)60 assert False61 except Exception:62 assert True63@mock_kinesis64def test_deliver_raise_on_nonexistent_stream():65 client = setup_connection()66 create_stream(client, FAKE_STREAM_NAME)67 data = {"example": "content"}68 try:69 kinesis_deliver(client, 'another-name', PARTITION_KEY, data)70 assert False71 except Exception:72 assert True73@mock_kinesis74def test_setup_client_kinesis():75 config = {76 "region_name": 'us-east-1',77 "aws_access_key_id": 'FAKE_AWS_ACCESS_KEY_ID',78 "aws_secret_access_key": 'FAKE_AWS_SECRET_ACCESS_KEY'79 }80 client = kinesis_setup_client(config)...
on_create_stream.py
Source: on_create_stream.py
...3from dipdup.context import HandlerContext4import radiate.models as models5from radiate.types.radiate.parameter.create_stream import CreateStreamParameter6from radiate.types.radiate.storage import RadiateStorage, TokenItem, TokenItem17async def on_create_stream(8 ctx: HandlerContext,9 create_stream: Transaction[CreateStreamParameter, RadiateStorage],10) -> None:11 sender = create_stream.data.sender_address12 # temp = await models.Stream()13 id = str((await models.Stream.filter().count()))14 receiver = create_stream.storage.streams[id].receiver15 startTime = create_stream.storage.streams[id].startTime16 stopTime = create_stream.storage.streams[id].stopTime17 ratePerSecond = create_stream.storage.streams[id].ratePerSecond18 deposit = create_stream.storage.streams[id].deposit19 createdOn = create_stream.data.timestamp20 remainingBalance = deposit21 token = models.TokenType.TEZ...
analysis.py
Source: analysis.py
2import typing3import ply.lex4class Analyzer(abc.ABC):5 @abc.abstractmethod6 def create_stream(self, source: str) -> typing.Iterable[str]:7 pass8class Tokenizer(abc.ABC):9 @abc.abstractmethod10 def create_stream(self, source: str) -> typing.Iterable[str]:11 pass12class StandardTokenizer(Tokenizer):13 tokens = ("ALPHANUM", "DECIMAL")14 t_ignore_WHITESPACE = r"\s+"15 t_ALPHANUM = "\\w+"16 t_DECIMAL = (17 r"( 0 | [1-9] [0-9]* ) (DOT [0-9]+)? ( [eE] [+\-]? [0-9]+ )? [fFdD]?"18 )19 def t_error(self, t):20 pass21 def __init__(self, max_token_length: int) -> None:22 self.max_token_length = max_token_length23 def create_stream(self, source: str) -> typing.Iterable[str]:24 lexer = ply.lex.lex(module=self)25 lexer.input(source)26 while True:27 token = lexer.token()28 if token is None:29 break30 if len(token.value) > self.max_token_length:31 continue32 yield token.value33class StandardAnalyzer(Analyzer):34 def __init__(self, max_token_length: int = 255) -> None:35 self.tokenizer = StandardTokenizer(max_token_length)36 def create_stream(self, source: str) -> typing.Iterable[str]:...
Check out the latest blogs from LambdaTest on this topic:
The fact is not alien to us anymore that cross browser testing is imperative to enhance your application’s user experience. Enhanced knowledge of popular and highly acclaimed testing frameworks goes a long way in developing a new app. It holds more significance if you are a full-stack developer or expert programmer.
QA testers have a unique role and responsibility to serve the customer. Serving the customer in software testing means protecting customers from application defects, failures, and perceived failures from missing or misunderstood requirements. Testing for known requirements based on documentation or discussion is the core of the testing profession. One unique way QA testers can both differentiate themselves and be innovative occurs when senseshaping is used to improve the application user experience.
Having a good web design can empower business and make your brand stand out. According to a survey by Top Design Firms, 50% of users believe that website design is crucial to an organization’s overall brand. Therefore, businesses should prioritize website design to meet customer expectations and build their brand identity. Your website is the face of your business, so it’s important that it’s updated regularly as per the current web design trends.
Enterprise resource planning (ERP) is a form of business process management software—typically a suite of integrated applications—that assists a company in managing its operations, interpreting data, and automating various back-office processes. The introduction of a new ERP system is analogous to the introduction of a new product into the market. If the product is not handled appropriately, it will fail, resulting in significant losses for the business. Most significantly, the employees’ time, effort, and morale would suffer as a result of the procedure.
Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!