How to use the json method in yandex-tank

Best Python code snippet using yandex-tank

views.py

Source:views.py Github

copy

Full Screen

"""JSON API views for the ``fafs_api`` models.

Every endpoint replies with an envelope of the form
``{"status": <bool>, "response": <payload>}`` built by
``json_encode_dict_and_status``.
"""
import json
from datetime import datetime, timezone

from django.contrib.auth import hashers
from django.core.exceptions import ValidationError
from django.http import HttpResponse, JsonResponse
from django.shortcuts import render, get_object_or_404
from django.utils.decorators import method_decorator
from django.views.decorators.csrf import csrf_exempt
from django.views.decorators.http import require_POST
from django.views.generic import View

from fafs_api.models import User, Address, School, Category, Product, Transaction, Authenticator


def get_key(dictionary, key):
    """Return ``dictionary[key]``, or ``None`` when the key is missing."""
    try:
        return dictionary[key]
    except KeyError:
        return None


def retrieve_all_fields(dictionary, field_list):
    """Build a dict holding every name in ``field_list``.

    Names absent from ``dictionary`` are mapped to ``None``, so the result
    always contains exactly the keys of ``field_list``.
    """
    return {field: get_key(dictionary, field) for field in field_list}


def get_object_or_none(model, **kwargs):
    """Return the single ``model`` row matching ``kwargs``, or ``None``."""
    try:
        return model.objects.get(**kwargs)
    except model.DoesNotExist:
        return None


def json_encode_dict_and_status(dictionary, status):
    """Wrap ``dictionary`` in the ``{"status", "response"}`` envelope."""
    return {"status": status, "response": dictionary}


def _json_response(payload, status):
    """Return a ``JsonResponse`` carrying the standard envelope."""
    return JsonResponse(json_encode_dict_and_status(payload, status))


def _request_payload(request):
    """Decode the request body as UTF-8 JSON."""
    return json.loads(request.body.decode('utf-8'))


def _request_fields(request, field_list):
    """Decode the request body and pick ``field_list`` out of it."""
    return retrieve_all_fields(_request_payload(request), field_list)


def _validation_errors(error):
    """Turn a ``ValidationError`` into a JSON-serialisable dict.

    ``message_dict`` only exists when the error was built from a dict; an
    error built from a plain message or list would raise ``AttributeError``
    there, so those are reported under the "Error" key used elsewhere in
    this module.
    """
    if hasattr(error, 'error_dict'):
        return error.message_dict
    return {"Error": error.messages}


def _delete_response(model, **lookup):
    """Delete the row matching ``lookup``; status tells whether it existed."""
    obj = get_object_or_none(model, **lookup)
    if obj is None:
        return _json_response({}, False)
    obj.delete()
    return _json_response({}, True)


class _ApiView(View):
    """Shared plumbing: CSRF-exempt dispatch and delete-by-primary-key."""

    model = None

    @method_decorator(csrf_exempt)
    def dispatch(self, request, *args, **kwargs):
        return super().dispatch(request, *args, **kwargs)

    def delete(self, request, pk=None):
        """Delete the ``model`` row with primary key ``pk``."""
        return _delete_response(self.model, pk=pk)


class AuthView(_ApiView):
    """Create, look up and delete ``Authenticator`` tokens."""

    model = Authenticator
    required_fields = ['user_id']

    def get(self, request, token=None):
        """Return one authenticator by ``token``, or all of them."""
        if token is None:
            rows = self.model.objects.all().values('token', 'user_id', 'date_created')
            return _json_response(list(rows), True)

        auth = get_object_or_none(self.model, token=token)
        if auth is None:
            return _json_response({"message": "No authenticator with token found"}, False)
        return _json_response({
            "token": auth.token,
            "user_id": auth.user.pk,
            "date_created": auth.date_created,
        }, True)

    def post(self, request):
        """Create an authenticator for the user named by ``user_id``."""
        field_dict = _request_fields(request, self.required_fields)
        try:
            user = User.objects.get(pk=field_dict['user_id'])
        except User.DoesNotExist:
            return _json_response({"message": "Invalid user id"}, False)

        auth = Authenticator()
        auth.user = user
        auth.save()
        return _json_response({
            "token": auth.token,
            "user_id": auth.user.pk,
            "date_created": auth.date_created,
        }, True)

    def delete(self, request, token=None):
        """Delete the authenticator identified by ``token``."""
        return _delete_response(self.model, token=token)


# These are plain functions, so ``csrf_exempt`` is applied directly
# (``method_decorator`` is meant for methods).  ``require_POST`` answers any
# other verb with 405 instead of letting the view return ``None``.
@csrf_exempt
@require_POST
def auth_check(request):
    """Resolve an authenticator token to the id of the user it belongs to."""
    field_dict = _request_fields(request, ['authenticator'])
    try:
        auth = Authenticator.objects.get(token=field_dict['authenticator'])
    except Authenticator.DoesNotExist:
        return _json_response({"message": "Invalid authenticator"}, False)
    return _json_response({"user_id": auth.user.pk}, True)


@csrf_exempt
@require_POST
def users_check_pass(request):
    """Check an email/password pair and return the matching user id."""
    field_dict = _request_fields(request, ['email', 'password'])
    # The same message is used for "no such user" and "wrong password".
    failure = {"message": "Incorrect email/password"}
    try:
        login_user = User.objects.get(email=field_dict['email'])
    except User.DoesNotExist:
        return _json_response(failure, False)

    if hashers.check_password(field_dict['password'], login_user.password):
        return _json_response({"user_id": login_user.pk}, True)
    return _json_response(failure, False)


class UserView(_ApiView):
    """CRUD endpoints for ``User``."""

    required_fields = ['email', 'school_id', 'password']
    update_fields = ['user_pk', 'email', 'password']
    model = User

    @staticmethod
    def _public_fields(user):
        """Fields returned after a create or update."""
        return {
            "pk": user.pk,
            "email": user.email,
            "school_id": user.school_id.pk,
        }

    def get(self, request, pk=None):
        """Return one user by ``pk``, or all users."""
        # NOTE(review): both branches return the stored (hashed) password to
        # the client.  Kept for backward compatibility; confirm whether any
        # caller still needs it, since ``users_check_pass`` exists.
        if pk is None:
            rows = self.model.objects.all().values('pk', 'email', 'school_id', 'password')
            return _json_response(list(rows), True)

        user = get_object_or_none(self.model, pk=pk)
        if user is None:
            return _json_response({}, False)
        return _json_response({
            "pk": user.pk,
            "email": user.email,
            "school_id": user.school_id.pk,
            "password": user.password,
        }, True)

    def post(self, request):
        """Create a user through the manager's ``create_user``."""
        field_dict = _request_fields(request, self.required_fields)
        try:
            user = self.model.objects.create_user(
                email=field_dict['email'],
                password=field_dict['password'],
                school=field_dict['school_id'],
            )
        except ValidationError as e:
            return _json_response(_validation_errors(e), False)
        return _json_response(self._public_fields(user), True)

    def patch(self, request):
        """Update email and/or password through the manager's ``update_user``."""
        field_dict = _request_fields(request, self.update_fields)
        try:
            user = self.model.objects.update_user(
                user_pk=field_dict['user_pk'],
                email=field_dict['email'],
                password=field_dict['password'],
            )
        except ValidationError as e:
            return _json_response(_validation_errors(e), False)
        return _json_response(self._public_fields(user), True)


class AddressView(_ApiView):
    """Create, read and delete ``Address`` rows."""

    required_fields = ['street_number', 'street_name', 'city', 'state', 'zipcode', 'description']
    model = Address

    def get(self, request, pk=None):
        """Return one address by ``pk``, or all addresses."""
        if pk is None:
            rows = self.model.objects.all().values(
                'pk', 'street_number', 'street_name',
                'city', 'state', 'zipcode', 'address_2')
            return _json_response(list(rows), True)

        address = get_object_or_none(self.model, pk=pk)
        if address is None:
            return _json_response({}, False)
        return _json_response({
            "pk": address.pk,
            "street_number": address.street_number,
            "street_name": address.street_name,
            "city": address.city,
            "state": address.state,
            "zipcode": address.zipcode,
            "description": address.description,
            "address_2": address.address_2,
        }, True)

    def post(self, request):
        """Create an address from the JSON body."""
        payload = _request_payload(request)
        field_dict = retrieve_all_fields(payload, self.required_fields)
        # ``address_2`` is not part of ``required_fields``; reading it from
        # ``field_dict`` raised KeyError on every request.  It is taken from
        # the raw payload and treated as optional instead.
        address_2 = get_key(payload, 'address_2')
        try:
            address = self.model(
                street_number=field_dict['street_number'],
                street_name=field_dict['street_name'],
                city=field_dict['city'],
                state=field_dict['state'],
                zipcode=field_dict['zipcode'],
                # ``description`` is required and is read back by ``get``.
                description=field_dict['description'],
                address_2=address_2,
            )
            address.clean()
            address.save()
        except ValidationError as e:
            return _json_response(_validation_errors(e), False)
        return _json_response({
            "street_number": address.street_number,
            "street_name": address.street_name,
            "city": address.city,
            "state": address.state,
            "zipcode": address.zipcode,
            "description": address.description,
            "address_2": address.address_2,
        }, True)


class SchoolView(_ApiView):
    """Create, read and delete ``School`` rows."""

    required_fields = ['name', 'city', 'state']
    model = School

    def get(self, request, pk=None):
        """Return one school by ``pk``, or all schools."""
        if pk is None:
            rows = self.model.objects.all().values('pk', 'name', 'city', 'state')
            return _json_response(list(rows), True)

        school = get_object_or_none(self.model, pk=pk)
        if school is None:
            return _json_response({}, False)
        return _json_response({
            "pk": school.pk,
            "name": school.name,
            "city": school.city,
            "state": school.state,
        }, True)

    def post(self, request):
        """Create a school from the JSON body."""
        field_dict = _request_fields(request, self.required_fields)
        try:
            school = self.model(
                name=field_dict['name'],
                city=field_dict['city'],
                state=field_dict['state'],
            )
            school.clean()
            school.save()
        except ValidationError as e:
            return _json_response(_validation_errors(e), False)
        return _json_response({
            "name": school.name,
            "city": school.city,
            "state": school.state,
            "pk": school.pk,
        }, True)


class CategoryView(_ApiView):
    """Create, read and delete ``Category`` rows."""

    required_fields = ['name', 'description']
    model = Category

    def get(self, request, pk=None):
        """Return one category by ``pk``, or all categories."""
        if pk is None:
            rows = self.model.objects.all().values('pk', 'name', 'description')
            return _json_response(list(rows), True)

        category = get_object_or_none(self.model, pk=pk)
        if category is None:
            return _json_response({}, False)
        return _json_response({
            "pk": category.pk,
            "name": category.name,
            "description": category.description,
        }, True)

    def post(self, request):
        """Create a category from the JSON body."""
        field_dict = _request_fields(request, self.required_fields)
        try:
            category = self.model(
                name=field_dict['name'],
                description=field_dict['description'],
            )
            category.clean()
            category.save()
        except ValidationError as e:
            return _json_response(_validation_errors(e), False)
        return _json_response({
            "name": category.name,
            "description": category.description,
        }, True)


class ProductView(_ApiView):
    """Create, read and delete ``Product`` rows."""

    required_fields = ['name', 'description', 'category_id', 'price', 'owner_id', 'pick_up']
    model = Product

    @staticmethod
    def _serialize(product):
        """Full JSON representation of a single product."""
        return {
            "pk": product.pk,
            "name": product.name,
            "description": product.description,
            "category_id": product.category_id.pk,
            "price": product.price,
            "owner_id": product.owner_id.pk,
            "pick_up": product.pick_up,
            "time_posted": product.time_posted,
            "time_updated": product.time_updated,
        }

    def get(self, request, pk=None):
        """Return one product by ``pk``, or all products."""
        if pk is None:
            rows = self.model.objects.all().values(
                'pk', 'name', 'description', 'category_id', 'price',
                'owner_id', 'pick_up', 'time_posted', 'time_updated')
            return _json_response(list(rows), True)

        product = get_object_or_none(self.model, pk=pk)
        if product is None:
            return _json_response({}, False)
        return _json_response(self._serialize(product), True)

    def post(self, request):
        """Create a product; ``category_id`` and ``owner_id`` must exist."""
        field_dict = _request_fields(request, self.required_fields)
        try:
            try:
                category = Category.objects.get(pk=field_dict['category_id'])
                owner = User.objects.get(pk=field_dict['owner_id'])
            except (Category.DoesNotExist, User.DoesNotExist):
                # Re-raised as ValidationError so it is reported through the
                # same path (and in the same shape) as model validation.
                raise ValidationError({
                    "Error": "Invalid category or owner id"
                })
            product = self.model(
                name=field_dict['name'],
                description=field_dict['description'],
                category_id=category,
                price=field_dict['price'],
                owner_id=owner,
                pick_up=field_dict['pick_up'],
            )
            product.clean()
            product.save()
        except ValidationError as e:
            return _json_response(_validation_errors(e), False)
        return _json_response(self._serialize(product), True)


class TransactionView(_ApiView):
    """Create, read and delete ``Transaction`` rows."""

    # NOTE(review): the snippet this file was recovered from is cut off
    # inside this class's ``delete``; it is assumed to end like every other
    # ``delete`` in the module (inherited here) - confirm.

    required_fields = ['seller', 'buyer', 'product_id']
    model = Transaction

    def get(self, request, pk=None):
        """Return one transaction by ``pk``, or all transactions."""
        if pk is None:
            rows = self.model.objects.all().values('pk', 'seller', 'buyer', 'product_id')
            return _json_response(list(rows), True)

        transaction = get_object_or_none(self.model, pk=pk)
        if transaction is None:
            return _json_response({}, False)
        return _json_response({
            "pk": transaction.pk,
            "seller": transaction.seller.pk,
            "buyer": transaction.buyer.pk,
            "product_id": transaction.product_id.pk,
        }, True)

    def post(self, request):
        """Create a transaction; seller, buyer and product must exist."""
        field_dict = _request_fields(request, self.required_fields)
        try:
            try:
                seller = User.objects.get(pk=field_dict['seller'])
                buyer = User.objects.get(pk=field_dict['buyer'])
                product = Product.objects.get(pk=field_dict['product_id'])
            except (User.DoesNotExist, Product.DoesNotExist):
                # Product.DoesNotExist used to escape uncaught, and the
                # message blamed the seller whichever lookup had failed.
                raise ValidationError({
                    "Error": "Invalid seller, buyer or product id"
                })
            transaction = self.model(
                seller=seller,
                buyer=buyer,
                product_id=product,
            )
            transaction.clean()
            transaction.save()
        except ValidationError as e:
            return _json_response(_validation_errors(e), False)
        return _json_response({
            "seller": transaction.seller.pk,
            "buyer": transaction.buyer.pk,
            "product_id": transaction.product_id.pk,
        }, True)

Full Screen

Full Screen

interface_rest.py

Source:interface_rest.py Github

copy

Full Screen

#!/usr/bin/env python3
# Copyright (c) 2014-2017 The Bitcoin Core developers
# Distributed under the MIT software license, see the accompanying
# file COPYING or http://www.opensource.org/licenses/mit-license.php.
"""Test the REST API."""
from test_framework.test_framework import BitcoinTestFramework
from test_framework.util import *
from struct import *
from io import BytesIO
from codecs import encode
from decimal import Decimal
import http.client
import json
import urllib.parse


def deser_uint256(f):
    """Read eight little-endian 32-bit words from ``f`` as one integer."""
    r = 0
    for i in range(8):
        t = unpack(b"<I", f.read(4))[0]
        r += t << (i * 32)
    return r


def http_get_call(host, port, path, response_object = 0):
    """Issue a GET request.

    Returns the ``HTTPResponse`` itself when ``response_object`` is truthy
    (the caller then reads it); otherwise returns the body decoded as UTF-8.
    """
    conn = http.client.HTTPConnection(host, port)
    conn.request('GET', path)
    response = conn.getresponse()
    if response_object:
        return response
    try:
        return response.read().decode('utf-8')
    finally:
        # The body has been consumed, so the connection is no longer needed.
        conn.close()


def http_post_call(host, port, path, requestdata = '', response_object = 0):
    """Issue a POST request with ``requestdata`` as the body.

    Returns the ``HTTPResponse`` itself when ``response_object`` is truthy;
    otherwise returns the raw body bytes.
    """
    conn = http.client.HTTPConnection(host, port)
    conn.request('POST', path, requestdata)
    response = conn.getresponse()
    if response_object:
        return response
    try:
        return response.read()
    finally:
        conn.close()


class RESTTest (BitcoinTestFramework):
    """Exercise the /rest/ endpoints of node 0 on a three-node network."""

    FORMAT_SEPARATOR = "."

    def set_test_params(self):
        self.setup_clean_chain = True
        self.num_nodes = 3

    def setup_network(self, split=False):
        super().setup_network()
        connect_nodes_bi(self.nodes, 0, 2)

    def run_test(self):
        url = urllib.parse.urlparse(self.nodes[0].url)
        sep = self.FORMAT_SEPARATOR

        def rest_get(path, fmt, response_object=False):
            """GET ``path`` from node 0 in REST format ``fmt``."""
            return http_get_call(url.hostname, url.port, path + sep + fmt, response_object)

        def rest_get_json(path):
            """GET ``path`` as JSON and return the parsed object."""
            return json.loads(rest_get(path, 'json'))

        def rest_post(path, fmt, requestdata='', response_object=False):
            """POST ``requestdata`` to ``path`` on node 0 in format ``fmt``."""
            return http_post_call(url.hostname, url.port, path + sep + fmt,
                                  requestdata, response_object)

        def outpoint_index(tx_json):
            """Return ``n`` of the 0.1 output of ``tx_json`` (0 if none)."""
            n = 0
            for vout in tx_json['vout']:
                if vout['value'] == 0.1:
                    n = vout['n']
            return n

        self.log.info("Mining blocks...")

        self.nodes[0].generate(1)
        self.sync_all()
        self.nodes[2].generate(100)
        self.sync_all()

        assert_equal(self.nodes[0].getbalance(), 50)

        txid = self.nodes[0].sendtoaddress(self.nodes[1].getnewaddress(), 0.1)
        self.sync_all()
        self.nodes[2].generate(1)
        self.sync_all()
        bb_hash = self.nodes[0].getbestblockhash()

        # balance now should be 0.1 on node 1
        assert_equal(self.nodes[1].getbalance(), Decimal("0.1"))

        # load the latest 0.1 tx over the REST API
        json_obj = rest_get_json('/rest/tx/' + txid)
        # keep the input's txid to later check for a utxo (spent by then)
        vintx = json_obj['vin'][0]['txid']
        n = outpoint_index(json_obj)

        #######################################
        # GETUTXOS: query an unspent outpoint #
        #######################################
        json_request = '/checkmempool/' + txid + '-' + str(n)
        json_obj = rest_get_json('/rest/getutxos' + json_request)

        # check chainTip response
        assert_equal(json_obj['chaintipHash'], bb_hash)

        # make sure there is one utxo
        assert_equal(len(json_obj['utxos']), 1)
        assert_equal(json_obj['utxos'][0]['value'], 0.1)

        #################################################
        # GETUTXOS: now query an already spent outpoint #
        #################################################
        json_request = '/checkmempool/' + vintx + '-0'
        json_obj = rest_get_json('/rest/getutxos' + json_request)

        # check chainTip response
        assert_equal(json_obj['chaintipHash'], bb_hash)

        # no utxo in the response because this outpoint has been spent
        assert_equal(len(json_obj['utxos']), 0)

        # check bitmap
        assert_equal(json_obj['bitmap'], "0")

        ##################################################
        # GETUTXOS: now check both with the same request #
        ##################################################
        json_request = '/checkmempool/' + txid + '-' + str(n) + '/' + vintx + '-0'
        json_obj = rest_get_json('/rest/getutxos' + json_request)
        assert_equal(len(json_obj['utxos']), 1)
        assert_equal(json_obj['bitmap'], "10")

        # test binary response
        bb_hash = self.nodes[0].getbestblockhash()

        binaryRequest = b'\x01\x02'
        binaryRequest += hex_str_to_bytes(txid)
        binaryRequest += pack("i", n)
        binaryRequest += hex_str_to_bytes(vintx)
        binaryRequest += pack("i", 0)

        bin_response = rest_post('/rest/getutxos', 'bin', binaryRequest)
        output = BytesIO()
        output.write(bin_response)
        output.seek(0)
        chainHeight = unpack("i", output.read(4))[0]
        hashFromBinResponse = hex(deser_uint256(output))[2:].zfill(64)

        # check if getutxo's chaintip during calculation was fine
        assert_equal(bb_hash, hashFromBinResponse)
        # 1 + 100 + 1 blocks have been generated so far
        assert_equal(chainHeight, 102)

        ############################
        # GETUTXOS: mempool checks #
        ############################

        # do a tx and don't sync
        txid = self.nodes[0].sendtoaddress(self.nodes[1].getnewaddress(), 0.1)
        json_obj = rest_get_json('/rest/tx/' + txid)
        vintx = json_obj['vin'][0]['txid']
        n = outpoint_index(json_obj)

        # without /checkmempool/ the outpoint must NOT be reported: the tx is
        # only in the mempool, not in a block yet
        json_request = '/' + txid + '-' + str(n)
        json_obj = rest_get_json('/rest/getutxos' + json_request)
        assert_equal(len(json_obj['utxos']), 0)

        # with /checkmempool/ there should be an outpoint because the tx has
        # just been added to the mempool
        json_request = '/checkmempool/' + txid + '-' + str(n)
        json_obj = rest_get_json('/rest/getutxos' + json_request)
        assert_equal(len(json_obj['utxos']), 1)

        # do some invalid requests
        json_request = '{"checkmempool'
        response = rest_post('/rest/getutxos', 'json', json_request, True)
        # must be a 400 because we send an invalid json request
        assert_equal(response.status, 400)

        json_request = '{"checkmempool'
        response = rest_post('/rest/getutxos', 'bin', json_request, True)
        # must be a 400 because we send an invalid bin request
        assert_equal(response.status, 400)

        response = rest_post('/rest/getutxos/checkmempool', 'bin', '', True)
        # must be a 400 because we send an invalid bin request
        assert_equal(response.status, 400)

        # test limits: 20 outpoints exceed them (400), 15 are within (200)
        for outpoint_count, expected_status in ((20, 400), (15, 200)):
            json_request = '/checkmempool/' + '/'.join([txid + '-' + str(n)] * outpoint_count)
            response = rest_post('/rest/getutxos' + json_request, 'json', '', True)
            assert_equal(response.status, expected_status)

        self.nodes[0].generate(1)  # generate block to not affect upcoming tests
        self.sync_all()

        ################
        # /rest/block/ #
        ################

        # check binary format
        response = rest_get('/rest/block/' + bb_hash, "bin", True)
        assert_equal(response.status, 200)
        assert_greater_than(int(response.getheader('content-length')), 80)
        response_str = response.read()

        # compare with block header
        response_header = rest_get('/rest/headers/1/' + bb_hash, "bin", True)
        assert_equal(response_header.status, 200)
        assert_equal(int(response_header.getheader('content-length')), 80)
        response_header_str = response_header.read()
        assert_equal(response_str[0:80], response_header_str)

        # check block hex format
        response_hex = rest_get('/rest/block/' + bb_hash, "hex", True)
        assert_equal(response_hex.status, 200)
        assert_greater_than(int(response_hex.getheader('content-length')), 160)
        response_hex_str = response_hex.read()
        assert_equal(encode(response_str, "hex_codec")[0:160], response_hex_str[0:160])

        # compare with hex block header
        response_header_hex = rest_get('/rest/headers/1/' + bb_hash, "hex", True)
        assert_equal(response_header_hex.status, 200)
        assert_greater_than(int(response_header_hex.getheader('content-length')), 160)
        response_header_hex_str = response_header_hex.read()
        assert_equal(response_hex_str[0:160], response_header_hex_str[0:160])
        assert_equal(encode(response_header_str, "hex_codec")[0:160], response_header_hex_str[0:160])

        # check json format
        block_json_obj = rest_get_json('/rest/block/' + bb_hash)
        assert_equal(block_json_obj['hash'], bb_hash)

        # compare with json block header
        response_header_json = rest_get('/rest/headers/1/' + bb_hash, "json", True)
        assert_equal(response_header_json.status, 200)
        response_header_json_str = response_header_json.read().decode('utf-8')
        json_obj = json.loads(response_header_json_str, parse_float=Decimal)
        assert_equal(len(json_obj), 1)  # ensure that there is one header in the json response
        assert_equal(json_obj[0]['hash'], bb_hash)  # request/response hash should be the same

        # compare with normal RPC block response
        rpc_block_json = self.nodes[0].getblock(bb_hash)
        for key in ('hash', 'confirmations', 'height', 'version', 'merkleroot',
                    'time', 'nonce', 'bits', 'difficulty', 'chainwork',
                    'previousblockhash'):
            assert_equal(json_obj[0][key], rpc_block_json[key])

        # see if we can get 5 headers in one response
        self.nodes[1].generate(5)
        self.sync_all()
        response_header_json = rest_get('/rest/headers/5/' + bb_hash, "json", True)
        assert_equal(response_header_json.status, 200)
        response_header_json_str = response_header_json.read().decode('utf-8')
        json_obj = json.loads(response_header_json_str)
        assert_equal(len(json_obj), 5)  # now we should have 5 header objects

        # do tx test
        tx_hash = block_json_obj['tx'][0]['txid']
        json_obj = rest_get_json('/rest/tx/' + tx_hash)
        assert_equal(json_obj['txid'], tx_hash)

        # check hex format response
        hex_string = rest_get('/rest/tx/' + tx_hash, "hex", True)
        assert_equal(hex_string.status, 200)
        # This used to inspect ``response`` (the earlier block request), so
        # the tx response's length was never actually checked.
        assert_greater_than(int(hex_string.getheader('content-length')), 10)

        # check block tx details
        # let's make 3 tx and mine them on node 1
        txs = [self.nodes[0].sendtoaddress(self.nodes[2].getnewaddress(), 11)
               for _ in range(3)]
        self.sync_all()

        # check that there are exactly 3 transactions in the TX memory pool
        # before generating the block
        json_obj = rest_get_json('/rest/mempool/info')
        assert_equal(json_obj['size'], 3)
        # the size of the memory pool should be greater than 3x ~100 bytes
        assert_greater_than(json_obj['bytes'], 300)

        # check that there are our submitted transactions in the TX memory pool
        json_obj = rest_get_json('/rest/mempool/contents')
        for tx in txs:
            assert_equal(tx in json_obj, True)

        # now mine the transactions
        newblockhash = self.nodes[1].generate(1)
        self.sync_all()

        # check if the 3 tx show up in the new block
        json_obj = rest_get_json('/rest/block/' + newblockhash[0])
        for tx in json_obj['tx']:
            if 'coinbase' not in tx['vin'][0]:  # exclude coinbase
                assert_equal(tx['txid'] in txs, True)

        # check the same but without tx details
        json_obj = rest_get_json('/rest/block/notxdetails/' + newblockhash[0])
        for tx in txs:
            assert_equal(tx in json_obj['tx'], True)

        # test rest bestblock
        bb_hash = self.nodes[0].getbestblockhash()

        json_obj = rest_get_json('/rest/chaininfo')
        assert_equal(json_obj['bestblockhash'], bb_hash)


if __name__ == '__main__':
    # NOTE(review): the recovered snippet is truncated right after this
    # guard; ``main()`` is presumed to be the framework's entry point - confirm.
    RESTTest().main()

Full Screen

Full Screen

rest.py

Source:rest.py Github

copy

Full Screen

#!/usr/bin/env python3
# Copyright (c) 2014-2016 The Bitcoin Core developers
# Distributed under the MIT software license, see the accompanying
# file COPYING or http://www.opensource.org/licenses/mit-license.php.
"""Test the REST API."""

from test_framework.test_framework import BitcoinTestFramework
from test_framework.util import *
from struct import *
from io import BytesIO
from codecs import encode
from decimal import Decimal

import http.client
import json
import urllib.parse


def deser_uint256(f):
    """Read a 256-bit integer from the binary stream *f*.

    Consumes 32 bytes as eight little-endian unsigned 32-bit words, the
    least significant word first, and returns the combined integer.
    """
    r = 0
    for i in range(8):
        t = unpack(b"<I", f.read(4))[0]
        r += t << (i * 32)
    return r


def _read_response(conn, response_object):
    """Return the response object itself, or its raw body bytes.

    When the caller wants the response object, the connection has to stay
    open so the body can still be read; otherwise the body is read here and
    the connection is closed instead of being left to the garbage collector.
    """
    response = conn.getresponse()
    if response_object:
        return response
    try:
        return response.read()
    finally:
        conn.close()


def http_get_call(host, port, path, response_object = 0):
    """Issue a simple HTTP GET request.

    Returns the ``HTTPResponse`` when *response_object* is truthy, otherwise
    the response body decoded as UTF-8 text.
    """
    conn = http.client.HTTPConnection(host, port)
    conn.request('GET', path)
    result = _read_response(conn, response_object)
    if response_object:
        return result
    return result.decode('utf-8')


def http_post_call(host, port, path, requestdata = '', response_object = 0):
    """Issue a simple HTTP POST request with *requestdata* as the body.

    Returns the ``HTTPResponse`` when *response_object* is truthy, otherwise
    the raw (undecoded) response body.
    """
    conn = http.client.HTTPConnection(host, port)
    conn.request('POST', path, requestdata)
    return _read_response(conn, response_object)


class RESTTest (BitcoinTestFramework):
    """Functional test exercising the node's /rest/ HTTP endpoints."""

    # Separates the resource path from the requested output format
    # ("json", "bin" or "hex").
    FORMAT_SEPARATOR = "."

    def set_test_params(self):
        """Start from a clean chain with three nodes."""
        self.setup_clean_chain = True
        self.num_nodes = 3

    def setup_network(self, split=False):
        """Use the default network setup and additionally connect nodes 0 and 2."""
        super().setup_network()
        connect_nodes_bi(self.nodes, 0, 2)

    def run_test(self):
        """Run the getutxos, block, headers, tx, mempool and chaininfo checks."""
        url = urllib.parse.urlparse(self.nodes[0].url)
        self.log.info("Mining blocks...")

        self.nodes[0].generate(1)
        self.sync_all()
        self.nodes[2].generate(100)
        self.sync_all()

        assert_equal(self.nodes[0].getbalance(), 50)

        txid = self.nodes[0].sendtoaddress(self.nodes[1].getnewaddress(), 0.1)
        self.sync_all()
        self.nodes[2].generate(1)
        self.sync_all()
        bb_hash = self.nodes[0].getbestblockhash()

        assert_equal(self.nodes[1].getbalance(), Decimal("0.1")) #balance now should be 0.1 on node 1

        # load the latest 0.1 tx over the REST API
        json_string = http_get_call(url.hostname, url.port, '/rest/tx/'+txid+self.FORMAT_SEPARATOR+"json")
        json_obj = json.loads(json_string)
        vintx = json_obj['vin'][0]['txid'] # get the vin to later check for utxo (should be spent by then)
        # get n of 0.1 outpoint
        n = 0
        for vout in json_obj['vout']:
            if vout['value'] == 0.1:
                n = vout['n']

        #######################################
        # GETUTXOS: query an unspent outpoint #
        #######################################
        json_request = '/checkmempool/'+txid+'-'+str(n)
        json_string = http_get_call(url.hostname, url.port, '/rest/getutxos'+json_request+self.FORMAT_SEPARATOR+'json')
        json_obj = json.loads(json_string)

        #check chainTip response
        assert_equal(json_obj['chaintipHash'], bb_hash)

        #make sure there is one utxo
        assert_equal(len(json_obj['utxos']), 1)
        assert_equal(json_obj['utxos'][0]['value'], 0.1)

        #################################################
        # GETUTXOS: now query an already spent outpoint #
        #################################################
        json_request = '/checkmempool/'+vintx+'-0'
        json_string = http_get_call(url.hostname, url.port, '/rest/getutxos'+json_request+self.FORMAT_SEPARATOR+'json')
        json_obj = json.loads(json_string)

        #check chainTip response
        assert_equal(json_obj['chaintipHash'], bb_hash)

        #make sure there is no utxo in the response because this outpoint has been spent
        assert_equal(len(json_obj['utxos']), 0)

        #check bitmap
        assert_equal(json_obj['bitmap'], "0")

        ##################################################
        # GETUTXOS: now check both with the same request #
        ##################################################
        json_request = '/checkmempool/'+txid+'-'+str(n)+'/'+vintx+'-0'
        json_string = http_get_call(url.hostname, url.port, '/rest/getutxos'+json_request+self.FORMAT_SEPARATOR+'json')
        json_obj = json.loads(json_string)
        assert_equal(len(json_obj['utxos']), 1)
        assert_equal(json_obj['bitmap'], "10")

        #test binary response
        bb_hash = self.nodes[0].getbestblockhash()

        binaryRequest = b'\x01\x02'
        binaryRequest += hex_str_to_bytes(txid)
        binaryRequest += pack("i", n)
        binaryRequest += hex_str_to_bytes(vintx)
        binaryRequest += pack("i", 0)

        bin_response = http_post_call(url.hostname, url.port, '/rest/getutxos'+self.FORMAT_SEPARATOR+'bin', binaryRequest)
        output = BytesIO(bin_response)
        chainHeight = unpack("i", output.read(4))[0]
        hashFromBinResponse = hex(deser_uint256(output))[2:].zfill(64)

        assert_equal(bb_hash, hashFromBinResponse) #check if getutxo's chaintip during calculation was fine
        assert_equal(chainHeight, 102) #chain height must be 102

        ############################
        # GETUTXOS: mempool checks #
        ############################

        # do a tx and don't sync
        txid = self.nodes[0].sendtoaddress(self.nodes[1].getnewaddress(), 0.1)
        json_string = http_get_call(url.hostname, url.port, '/rest/tx/'+txid+self.FORMAT_SEPARATOR+"json")
        json_obj = json.loads(json_string)
        vintx = json_obj['vin'][0]['txid'] # get the vin to later check for utxo (should be spent by then)
        # get n of 0.1 outpoint
        n = 0
        for vout in json_obj['vout']:
            if vout['value'] == 0.1:
                n = vout['n']

        json_request = '/'+txid+'-'+str(n)
        json_string = http_get_call(url.hostname, url.port, '/rest/getutxos'+json_request+self.FORMAT_SEPARATOR+'json')
        json_obj = json.loads(json_string)
        # there should be no outpoint: the tx has not been mined yet and this
        # request does not ask for the mempool to be checked
        assert_equal(len(json_obj['utxos']), 0)

        json_request = '/checkmempool/'+txid+'-'+str(n)
        json_string = http_get_call(url.hostname, url.port, '/rest/getutxos'+json_request+self.FORMAT_SEPARATOR+'json')
        json_obj = json.loads(json_string)
        assert_equal(len(json_obj['utxos']), 1) #there should be an outpoint because it has just been added to the mempool

        #do some invalid requests
        json_request = '{"checkmempool'
        response = http_post_call(url.hostname, url.port, '/rest/getutxos'+self.FORMAT_SEPARATOR+'json', json_request, True)
        assert_equal(response.status, 400) #must be a 400 because we send an invalid json request

        json_request = '{"checkmempool'
        response = http_post_call(url.hostname, url.port, '/rest/getutxos'+self.FORMAT_SEPARATOR+'bin', json_request, True)
        assert_equal(response.status, 400) #must be a 400 because we send an invalid bin request

        response = http_post_call(url.hostname, url.port, '/rest/getutxos/checkmempool'+self.FORMAT_SEPARATOR+'bin', '', True)
        assert_equal(response.status, 400) #must be a 400 because we send an invalid bin request

        #test limits
        outpoint = txid+'-'+str(n)
        json_request = '/checkmempool/' + '/'.join([outpoint] * 20)
        response = http_post_call(url.hostname, url.port, '/rest/getutxos'+json_request+self.FORMAT_SEPARATOR+'json', '', True)
        assert_equal(response.status, 400) #must be a 400 because we are exceeding the limits

        json_request = '/checkmempool/' + '/'.join([outpoint] * 15)
        response = http_post_call(url.hostname, url.port, '/rest/getutxos'+json_request+self.FORMAT_SEPARATOR+'json', '', True)
        assert_equal(response.status, 200) #must be a 200 because we are within the limits

        self.nodes[0].generate(1) #generate block to not affect upcoming tests
        self.sync_all()

        ################
        # /rest/block/ #
        ################

        # check binary format
        response = http_get_call(url.hostname, url.port, '/rest/block/'+bb_hash+self.FORMAT_SEPARATOR+"bin", True)
        assert_equal(response.status, 200)
        assert_greater_than(int(response.getheader('content-length')), 80)
        response_str = response.read()

        # compare with block header
        response_header = http_get_call(url.hostname, url.port, '/rest/headers/1/'+bb_hash+self.FORMAT_SEPARATOR+"bin", True)
        assert_equal(response_header.status, 200)
        assert_equal(int(response_header.getheader('content-length')), 80)
        response_header_str = response_header.read()
        assert_equal(response_str[0:80], response_header_str)

        # check block hex format
        response_hex = http_get_call(url.hostname, url.port, '/rest/block/'+bb_hash+self.FORMAT_SEPARATOR+"hex", True)
        assert_equal(response_hex.status, 200)
        assert_greater_than(int(response_hex.getheader('content-length')), 160)
        response_hex_str = response_hex.read()
        assert_equal(encode(response_str, "hex_codec")[0:160], response_hex_str[0:160])

        # compare with hex block header
        response_header_hex = http_get_call(url.hostname, url.port, '/rest/headers/1/'+bb_hash+self.FORMAT_SEPARATOR+"hex", True)
        assert_equal(response_header_hex.status, 200)
        assert_greater_than(int(response_header_hex.getheader('content-length')), 160)
        response_header_hex_str = response_header_hex.read()
        assert_equal(response_hex_str[0:160], response_header_hex_str[0:160])
        assert_equal(encode(response_header_str, "hex_codec")[0:160], response_header_hex_str[0:160])

        # check json format
        block_json_string = http_get_call(url.hostname, url.port, '/rest/block/'+bb_hash+self.FORMAT_SEPARATOR+'json')
        block_json_obj = json.loads(block_json_string)
        assert_equal(block_json_obj['hash'], bb_hash)

        # compare with json block header
        response_header_json = http_get_call(url.hostname, url.port, '/rest/headers/1/'+bb_hash+self.FORMAT_SEPARATOR+"json", True)
        assert_equal(response_header_json.status, 200)
        response_header_json_str = response_header_json.read().decode('utf-8')
        json_obj = json.loads(response_header_json_str, parse_float=Decimal)
        assert_equal(len(json_obj), 1) #ensure that there is one header in the json response
        assert_equal(json_obj[0]['hash'], bb_hash) #request/response hash should be the same

        #compare with normal RPC block response
        rpc_block_json = self.nodes[0].getblock(bb_hash)
        for key in ('hash', 'confirmations', 'height', 'version', 'merkleroot',
                    'time', 'nonce', 'bits', 'difficulty', 'chainwork',
                    'previousblockhash'):
            assert_equal(json_obj[0][key], rpc_block_json[key])

        #see if we can get 5 headers in one response
        self.nodes[1].generate(5)
        self.sync_all()
        response_header_json = http_get_call(url.hostname, url.port, '/rest/headers/5/'+bb_hash+self.FORMAT_SEPARATOR+"json", True)
        assert_equal(response_header_json.status, 200)
        response_header_json_str = response_header_json.read().decode('utf-8')
        json_obj = json.loads(response_header_json_str)
        assert_equal(len(json_obj), 5) #now we should have 5 header objects

        # do tx test
        tx_hash = block_json_obj['tx'][0]['txid']
        json_string = http_get_call(url.hostname, url.port, '/rest/tx/'+tx_hash+self.FORMAT_SEPARATOR+"json")
        json_obj = json.loads(json_string)
        assert_equal(json_obj['txid'], tx_hash)

        # check hex format response
        hex_string = http_get_call(url.hostname, url.port, '/rest/tx/'+tx_hash+self.FORMAT_SEPARATOR+"hex", True)
        assert_equal(hex_string.status, 200)
        # inspect the hex response itself, not the earlier block `response`
        assert_greater_than(int(hex_string.getheader('content-length')), 10)

        # check block tx details
        # let's make 3 tx and mine them on node 1
        txs = [
            self.nodes[0].sendtoaddress(self.nodes[2].getnewaddress(), 11)
            for _ in range(3)
        ]
        self.sync_all()

        # check that there are exactly 3 transactions in the TX memory pool before generating the block
        json_string = http_get_call(url.hostname, url.port, '/rest/mempool/info'+self.FORMAT_SEPARATOR+'json')
        json_obj = json.loads(json_string)
        assert_equal(json_obj['size'], 3)
        # the size of the memory pool should be greater than 3x ~100 bytes
        assert_greater_than(json_obj['bytes'], 300)

        # check that there are our submitted transactions in the TX memory pool
        json_string = http_get_call(url.hostname, url.port, '/rest/mempool/contents'+self.FORMAT_SEPARATOR+'json')
        json_obj = json.loads(json_string)
        for tx in txs:
            assert_equal(tx in json_obj, True)

        # now mine the transactions
        newblockhash = self.nodes[1].generate(1)
        self.sync_all()

        #check if the 3 tx show up in the new block
        json_string = http_get_call(url.hostname, url.port, '/rest/block/'+newblockhash[0]+self.FORMAT_SEPARATOR+'json')
        json_obj = json.loads(json_string)
        for tx in json_obj['tx']:
            if 'coinbase' not in tx['vin'][0]: #exclude coinbase
                assert_equal(tx['txid'] in txs, True)

        #check the same but without tx details
        json_string = http_get_call(url.hostname, url.port, '/rest/block/notxdetails/'+newblockhash[0]+self.FORMAT_SEPARATOR+'json')
        json_obj = json.loads(json_string)
        for tx in txs:
            assert_equal(tx in json_obj['tx'], True)

        #test rest bestblock
        bb_hash = self.nodes[0].getbestblockhash()

        json_string = http_get_call(url.hostname, url.port, '/rest/chaininfo.json')
        json_obj = json.loads(json_string)
        assert_equal(json_obj['bestblockhash'], bb_hash)


if __name__ == '__main__':
    ...  # the remainder of the script is truncated in this excerpt

Full Screen

Full Screen

test_unicode.py

Source:test_unicode.py Github

copy

Full Screen

import ast
import sys
import codecs
from unittest import TestCase

import simplejson as json
from simplejson.compat import unichr, text_type, b, u, BytesIO


class TestUnicode(TestCase):
    # Encoding/decoding behaviour of simplejson for non-ASCII text.

    def test_encoding1(self):
        # A unicode string and its UTF-8 bytes must encode identically.
        encoder = json.JSONEncoder(encoding='utf-8')
        u = u'\N{GREEK SMALL LETTER ALPHA}\N{GREEK CAPITAL LETTER OMEGA}'
        s = u.encode('utf-8')
        ju = encoder.encode(u)
        js = encoder.encode(s)
        self.assertEqual(ju, js)

    def test_encoding2(self):
        # Same as test_encoding1, through the module-level dumps().
        u = u'\N{GREEK SMALL LETTER ALPHA}\N{GREEK CAPITAL LETTER OMEGA}'
        s = u.encode('utf-8')
        ju = json.dumps(u, encoding='utf-8')
        js = json.dumps(s, encoding='utf-8')
        self.assertEqual(ju, js)

    def test_encoding3(self):
        u = u'\N{GREEK SMALL LETTER ALPHA}\N{GREEK CAPITAL LETTER OMEGA}'
        j = json.dumps(u)
        self.assertEqual(j, '"\\u03b1\\u03a9"')

    def test_encoding4(self):
        u = u'\N{GREEK SMALL LETTER ALPHA}\N{GREEK CAPITAL LETTER OMEGA}'
        j = json.dumps([u])
        self.assertEqual(j, '["\\u03b1\\u03a9"]')

    def test_encoding5(self):
        u = u'\N{GREEK SMALL LETTER ALPHA}\N{GREEK CAPITAL LETTER OMEGA}'
        j = json.dumps(u, ensure_ascii=False)
        self.assertEqual(j, u'"' + u + u'"')

    def test_encoding6(self):
        u = u'\N{GREEK SMALL LETTER ALPHA}\N{GREEK CAPITAL LETTER OMEGA}'
        j = json.dumps([u], ensure_ascii=False)
        self.assertEqual(j, u'["' + u + u'"]')

    def test_big_unicode_encode(self):
        u = u'\U0001d120'
        self.assertEqual(json.dumps(u), '"\\ud834\\udd20"')
        self.assertEqual(json.dumps(u, ensure_ascii=False), u'"\U0001d120"')

    def test_big_unicode_decode(self):
        u = u'z\U0001d120x'
        self.assertEqual(json.loads('"' + u + '"'), u)
        self.assertEqual(json.loads('"z\\ud834\\udd20x"'), u)

    def test_unicode_decode(self):
        # Every code point below the surrogate range, U+D7FF included
        # (range() excludes its end, so the bound is 0xd800).
        for i in range(0, 0xd800):
            u = unichr(i)
            #s = '"\\u{0:04x}"'.format(i)
            s = '"\\u%04x"' % (i,)
            self.assertEqual(json.loads(s), u)

    def test_object_pairs_hook_with_unicode(self):
        s = u'{"xkd":1, "kcw":2, "art":3, "hxm":4, "qrt":5, "pad":6, "hoy":7}'
        p = [(u"xkd", 1), (u"kcw", 2), (u"art", 3), (u"hxm", 4),
             (u"qrt", 5), (u"pad", 6), (u"hoy", 7)]
        # The document is also a valid Python dict literal; literal_eval
        # parses it without executing arbitrary code.
        self.assertEqual(json.loads(s), ast.literal_eval(s))
        self.assertEqual(json.loads(s, object_pairs_hook=lambda x: x), p)
        od = json.loads(s, object_pairs_hook=json.OrderedDict)
        self.assertEqual(od, json.OrderedDict(p))
        self.assertEqual(type(od), json.OrderedDict)
        # the object_pairs_hook takes priority over the object_hook
        self.assertEqual(json.loads(s,
                                    object_pairs_hook=json.OrderedDict,
                                    object_hook=lambda x: None),
                         json.OrderedDict(p))

    def test_default_encoding(self):
        self.assertEqual(json.loads(u'{"a": "\xe9"}'.encode('utf-8')),
            {'a': u'\xe9'})

    def test_unicode_preservation(self):
        self.assertEqual(type(json.loads(u'""')), text_type)
        self.assertEqual(type(json.loads(u'"a"')), text_type)
        self.assertEqual(type(json.loads(u'["a"]')[0]), text_type)

    def test_ensure_ascii_false_returns_unicode(self):
        # http://code.google.com/p/simplejson/issues/detail?id=48
        for value in ([], 0, {}, ""):
            self.assertEqual(type(json.dumps(value, ensure_ascii=False)), text_type)

    def test_ensure_ascii_false_bytestring_encoding(self):
        # http://code.google.com/p/simplejson/issues/detail?id=48
        doc1 = {u'quux': b('Arr\xc3\xaat sur images')}
        doc2 = {u'quux': u('Arr\xeat sur images')}
        doc_ascii = '{"quux": "Arr\\u00eat sur images"}'
        doc_unicode = u'{"quux": "Arr\xeat sur images"}'
        self.assertEqual(json.dumps(doc1), doc_ascii)
        self.assertEqual(json.dumps(doc2), doc_ascii)
        self.assertEqual(json.dumps(doc1, ensure_ascii=False), doc_unicode)
        self.assertEqual(json.dumps(doc2, ensure_ascii=False), doc_unicode)

    def test_ensure_ascii_linebreak_encoding(self):
        # http://timelessrepo.com/json-isnt-a-javascript-subset
        s1 = u'\u2029\u2028'
        s2 = s1.encode('utf8')
        expect = '"\\u2029\\u2028"'
        self.assertEqual(json.dumps(s1), expect)
        self.assertEqual(json.dumps(s2), expect)
        self.assertEqual(json.dumps(s1, ensure_ascii=False), expect)
        self.assertEqual(json.dumps(s2, ensure_ascii=False), expect)

    def test_invalid_escape_sequences(self):
        # incomplete escape sequence
        incomplete = ['"\\u', '"\\u1', '"\\u12', '"\\u123', '"\\u1234']
        # invalid escape sequence
        invalid = ['"\\u123x"', '"\\u12x4"', '"\\u1x34"', '"\\ux234"']
        for doc in incomplete + invalid:
            self.assertRaises(json.JSONDecodeError, json.loads, doc)
        if sys.maxunicode > 65535:
            # invalid escape sequence for low surrogate
            bad_low_surrogates = [
                '"\\ud800\\u"',
                '"\\ud800\\u0"',
                '"\\ud800\\u00"',
                '"\\ud800\\u000"',
                '"\\ud800\\u000x"',
                '"\\ud800\\u00x0"',
                '"\\ud800\\u0x00"',
                '"\\ud800\\ux000"',
            ]
            for doc in bad_low_surrogates:
                self.assertRaises(json.JSONDecodeError, json.loads, doc)

    def test_ensure_ascii_still_works(self):
        # in the ascii range, ensure that everything is the same
        for c in map(unichr, range(0, 127)):
            self.assertEqual(
                json.dumps(c, ensure_ascii=False),
                json.dumps(c))
        # outside the ascii range the character must be emitted unescaped;
        # assert on the snowman itself, not on the leftover loop variable
        snowman = u'\N{SNOWMAN}'
        self.assertEqual(
            json.dumps(snowman, ensure_ascii=False),
            u'"' + snowman + u'"')

    def test_strip_bom(self):
        content = u"\u3053\u3093\u306b\u3061\u308f"
        json_doc = codecs.BOM_UTF8 + b(json.dumps(content))
        self.assertEqual(json.load(BytesIO(json_doc)), content)
        for doc in json_doc, json_doc.decode('utf8'):
            ...  # the remainder of this test is truncated in this excerpt

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with LambdaTest Learning Hub: from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, and TestNG.

LambdaTest Learning Hubs:

YouTube

You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.

Run yandex-tank automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

NotHelpful