Best Python code snippet using pytest-benchmark
test_bus2.py
Source:test_bus2.py
...59 def setUp(self):60 self.subscriber = BasicSubscriber("test")61 self.publisher = BasicPublisher("test")62 def tearDown(self):63 # This, along with code in minibus._cleanup(), was found via trail 64 # and error using suggestions from http://blackjml.livejournal.com/23029.html65 s1 = self.subscriber._cleanup()66 p1 = self.publisher._cleanup()67 return defer.gatherResults([s1, p1])68 def examin(self):69 self.publisher.fini()70 self.subscriber.fini()71# print "sent=", self.publisher.counter72# print "received=", self.subscriber.counter73 self.assertEqual(self.publisher.counter, self.subscriber.counter)74 def test_test(self):75 d = task.deferLater(reactor, 0.5, self.examin)76 return d77class TestMultipleSeperateTopics(unittest.TestCase):78 """ Test two sets of publisher and subscriber pairs talking on different topics.79 There should not be any cross talk.80 """81 def setUp(self):82 self.subscriber1 = BasicSubscriber("test1")83 self.publisher1 = BasicPublisher("test1")84 self.subscriber2 = BasicSubscriber("test2")85 self.publisher2 = BasicPublisher("test2")86 def tearDown(self):87 a = self.subscriber1._cleanup()88 b = self.publisher1._cleanup()89 c = self.subscriber2._cleanup()90 d = self.publisher2._cleanup()91 return defer.gatherResults([a, b, c, d])92 def examin(self):93 self.publisher1.fini()94 self.publisher2.fini()95 self.subscriber1.fini()96 self.subscriber2.fini()97 self.assertEqual(self.publisher1.counter,self.subscriber1.counter)98 self.assertEqual(self.publisher2.counter,self.subscriber2.counter)99 def test_test(self):100 d = task.deferLater(reactor, 0.5, self.examin)101 return d102class TestMultipleSubscribersToTopic(unittest.TestCase):103 """ Tests a single publisher sending to multiple subscribers.104 Each subscriber should hear exactly the same set of messages.105 """106 def setUp(self):107 self.subscriber1 = BasicSubscriber("test")108 self.subscriber2 = BasicSubscriber("test")109 self.subscriber3 = BasicSubscriber("test")110 self.publisher = 
BasicPublisher("test")111 def tearDown(self):112 a = self.subscriber1._cleanup()113 b = self.subscriber2._cleanup()114 c = self.subscriber3._cleanup()115 d = self.publisher._cleanup()116 return defer.gatherResults([a, b, c, d])117 def examin(self):118 self.publisher.fini()119 self.subscriber1.fini()120 self.subscriber2.fini()121 self.subscriber3.fini()122# print "sent=", self.publisher.counter123# print "received=", self.subscriber1.counter124# print "received=", self.subscriber2.counter125# print "received=", self.subscriber3.counter126 self.assertEqual(self.publisher.counter,self.subscriber1.counter)127 self.assertEqual(self.publisher.counter,self.subscriber2.counter)128 self.assertEqual(self.publisher.counter,self.subscriber3.counter)129 def test_test(self):130 d = task.deferLater(reactor, 0.5, self.examin)131 return d132class TestMultiplePublishersToTopic(unittest.TestCase):133 """ Tests multiple publishers sending to a single subscriber.134 The subscriber should hear from all the publishers.135 """136 def setUp(self):137 self.subscriber = BasicSubscriber("test")138 self.publisher1 = BasicPublisher("test")139 self.publisher2 = BasicPublisher("test")140 self.publisher3 = BasicPublisher("test")141 def tearDown(self):142 a = self.subscriber._cleanup()143 b = self.publisher1._cleanup()144 c = self.publisher2._cleanup()145 d = self.publisher3._cleanup()146 return defer.gatherResults([a, b, c, d])147 def examin(self):148 self.publisher1.fini()149 self.publisher2.fini()150 self.publisher3.fini()151 self.subscriber.fini()152# print "sent=", self.publisher1.counter153# print "sent=", self.publisher2.counter154# print "sent=", self.publisher3.counter155# print "received=", self.subscriber.counter156 total_sent = self.publisher1.counter + self.publisher2.counter + self.publisher3.counter157 total_recv = self.subscriber.counter158 self.assertEqual(total_sent,total_recv)159 def test_test(self):160 d = task.deferLater(reactor, 0.5, self.examin)161 return d162class 
TestMixedTopics(unittest.TestCase):163 """ Test that the regular expressions which define topic names are working164 correctly to route data. """165 def setUp(self):166 self.pub1 = BasicPublisher("foo1")167 self.pub2 = BasicPublisher("foo2")168 self.pub3 = BasicPublisher("bar1")169 self.sub1 = BasicSubscriber(".*1")170 self.sub2 = BasicSubscriber("foo.")171 self.sub3 = BasicSubscriber("foo2")172 self.sub4 = BasicSubscriber("bar")173 def tearDown(self):174 self.pub1._cleanup()175 self.pub2._cleanup()176 self.pub3._cleanup()177 self.sub1._cleanup()178 self.sub2._cleanup()179 self.sub3._cleanup()180 self.sub4._cleanup()181 def test_test(self):182 def deferred_test():183 self.pub1.fini()184 self.pub2.fini()185 self.pub3.fini()186 self.assertEqual(self.pub1.counter + self.pub3.counter, self.sub1.counter)187 self.assertEqual(self.pub1.counter + self.pub2.counter, self.sub2.counter)188 self.assertEqual(self.pub2.counter, self.sub3.counter)189 self.assertEqual(0, self.sub4.counter)190 return task.deferLater(reactor, 0.5, deferred_test)191class TestSchema(unittest.TestCase):192 """ Tests multiple publishers sending to a single subscriber.193 The subscriber should hear from all the publishers.194 """195 def setUp(self):196 self.client = MiniBusTwistedClient()197 self.pub = self.client.publisher("test", {"type": "string"})198 self.client.subscribe("test", {"type": "string"}, self.callback)199 self.count = 0200 def tearDown(self):201 a = self.client._cleanup()202 return defer.gatherResults([a])203 def callback(self, data):204 self.count += 1205 def test_good_schema(self):206 def deferred_good_schema():207 self.pub("this is a test")208 d = task.deferLater(reactor, 0.5, deferred_good_schema)209 return d210 def test_good_schema_callback(self):211 def deferred_good_callback():212 if not self.count == 1:213 raise Exception("bad count")214 def deferred_good_callback_pub():215 self.pub("this is a test")216 d1 = task.deferLater(reactor, 0.5, deferred_good_callback_pub)217 d = 
task.deferLater(reactor, 1, deferred_good_callback)218 return d219 def test_bad_schema(self):220 def deferred_bad_schema():221 try:222 self.pub({"one": 1, "two": 2})223 except Exception:224 return225 raise Exception("I should not have passed")226 d = task.deferLater(reactor, 0.5, deferred_bad_schema)227 return d228 def test_conflict_schema(self):229 try: 230 self.client.subscribe("test", {"type": "number"}, self.callback)231 except Exception:232 return233 raise Exception("I should not have passed")234class Service1Test(unittest.TestCase):235 def add_two_ints(self, data):236 return sum(data)237 def setUp(self):238 self.srvreq = {'type': 'array' }239 self.srvrply = { 'type': 'integer' }240 self.node1 = MiniBusTwistedClient()241 self.node1.service("add_two_ints", self.srvreq, self.srvrply, self.add_two_ints)242 self.node2 = MiniBusTwistedClient()243 def tearDown(self):244 a = self.node1._cleanup()245 b = self.node2._cleanup()246 @MiniBusTwistedClient.inlineServiceCallbacks247 def test_call(self):248 adder = self.node2.service_client("add_two_ints", self.srvreq, self.srvrply)249 ret = yield task.deferLater(reactor, 0.5, adder, [4,5,6])250 self.assertEqual(ret, 15)251 @MiniBusTwistedClient.inlineServiceCallbacks252 def test_bad_input_call(self):253 adder = self.node2.service_client("add_two_ints", self.srvreq, self.srvrply)254 try:255 ret = yield task.deferLater(reactor, 0.5, adder, 4)256 raise Exception("I should have failed due to not being passed an array")257 except jsonschema.exceptions.ValidationError:258 pass259 @MiniBusTwistedClient.inlineServiceCallbacks260 def test_bad_output_call(self):261 adder = self.node2.service_client("add_two_ints", self.srvreq, {'type': 'string'})262 try:263 ret = yield task.deferLater(reactor, 0.5, adder, [4,5,6])264 raise Exception("I should have failed due to being returned a number instead of a string")265 except jsonschema.exceptions.ValidationError:266 pass267 @MiniBusTwistedClient.inlineServiceCallbacks268 def 
test_bad_input_data(self):269 adder = self.node2.service_client("add_two_ints", self.srvreq, self.srvrply)270 try:271 ret = yield task.deferLater(reactor, 0.5, adder, [4,'5',6])272 raise Exception("I should have thrown a RemoteServiceException")273 except RemoteServiceException:274 pass275class TestMultiService(unittest.TestCase):276 def add_ints(self, data):277 return sum(data)278 def double_ints(self, data):279 return [x*2 for x in data]280 def print_ints(self, data):281 return " ".join(data)282 def setUp(self):283 self.srvreq = {'type': 'array' }284 self.srvrply = { 'type': 'integer' }285 self.server1 = MiniBusTwistedClient()286 self.server1.service("add_ints", {'type': 'array'}, {'type': 'integer'}, self.add_ints)287 self.server1.service("double_ints", {'type': 'array'}, {'type': 'array'}, self.double_ints)288 self.server2 = MiniBusTwistedClient()289 self.server2.service("print_ints", {'type': 'array'}, {'type': 'string'}, self.print_ints)290 self.client1 = MiniBusTwistedClient()291 self.client2 = MiniBusTwistedClient()292 def tearDown(self):293 a = self.server1._cleanup()294 b = self.server2._cleanup()295 c = self.client1._cleanup()296 d = self.client2._cleanup()297 @MiniBusTwistedClient.inlineServiceCallbacks298 def test_calls(self):299 adder = self.client1.service_client("add_ints", {'type': 'array'}, {'type': 'integer'})300 printer = self.client1.service_client("print_ints", {'type': 'array'}, {'type': 'string'})301 double = self.client1.service_client("double_ints", {'type': 'array'}, {'type': 'array'})302 ret1 = yield task.deferLater(reactor, 0.5, adder, [4,5,6])303 ret2 = yield task.deferLater(reactor, 0.5, printer, ['a','b','c'])304 ret3 = yield task.deferLater(reactor, 0.5, double, [1, 2, 3])305 self.assertEqual(ret1, 15)306 self.assertEqual(ret2, "a b c")307 self.assertEqual(ret3, [2, 4, 6])308 @MiniBusTwistedClient.inlineServiceCallbacks309 def test_multiclients(self):310 adder = self.client1.service_client("add_ints", {'type': 'array'}, {'type': 
'integer'})311 adder2 = self.client2.service_client("add_ints", {'type': 'array'}, {'type': 'integer'})312 ret1 = yield task.deferLater(reactor, 0.5, adder, [1,2,3])313 ret2 = yield task.deferLater(reactor, 0.5, adder2, [4,5,6])314 self.assertEqual(ret1, 6)315 self.assertEqual(ret2, 15)316# This is not working yet317# class TestChainedServices(unittest.TestCase):318# @MiniBusTwistedClient.inlineServiceCallbacks319# def service1(self, data):320# data = data + " stuff"321# call2val = yield self.call2(data)322# defer.returnValue(call2val)323# 324# def service2(self, data):325# return data + " things"326# 327# def setUp(self):328# self.server1 = MiniBusTwistedClient()329# self.server1.service("service1", {'type':'string'}, {}, self.service1)330# self.client1 = MiniBusTwistedClient()331# self.call1 = self.client1.service_client("service1",{'type':'string'}, {})332# self.server2 = MiniBusTwistedClient()333# self.server2.service("service2", {'type':'string'}, {'type':'string'}, self.service2)334# self.client2 = MiniBusTwistedClient()335# self.call2 = self.client2.service_client("service2",{'type':'string'}, {'type':'string'})336# 337# 338# def tearDown(self):339# a = self.server1._cleanup()340# b = self.server2._cleanup()341# c = self.client1._cleanup()342# d = self.client2._cleanup()343# 344# @MiniBusTwistedClient.inlineServiceCallbacks345# def test_calls(self):346# retval = yield task.deferLater(reactor, 0.5, self.call1, "mine")347# self.assertEqual(retval, "mine stuff things")...
test_bus.py
Source:test_bus.py
...58 def setUp(self):59 self.subscriber = BasicSubscriber("test")60 self.publisher = BasicPublisher("test")61 def tearDown(self):62 # This, along with code in minibus._cleanup(), was found via trail 63 # and error using suggestions from http://blackjml.livejournal.com/23029.html64 s1 = self.subscriber._cleanup()65 p1 = self.publisher._cleanup()66 return defer.gatherResults([s1, p1])67 def examin(self):68 self.publisher.fini()69 self.subscriber.fini()70# print "sent=", self.publisher.counter71# print "received=", self.subscriber.counter72 self.assertEqual(self.publisher.counter, self.subscriber.counter)73 def test_test(self):74 d = task.deferLater(reactor, 0.5, self.examin)75 return d76class TestMultipleSeperateTopics(unittest.TestCase):77 """ Test two sets of publisher and subscriber pairs talking on different topics.78 There should not be any cross talk.79 """80 def setUp(self):81 self.subscriber1 = BasicSubscriber("test1")82 self.publisher1 = BasicPublisher("test1")83 self.subscriber2 = BasicSubscriber("test2")84 self.publisher2 = BasicPublisher("test2")85 def tearDown(self):86 a = self.subscriber1._cleanup()87 b = self.publisher1._cleanup()88 c = self.subscriber2._cleanup()89 d = self.publisher2._cleanup()90 return defer.gatherResults([a, b, c, d])91 def examin(self):92 self.publisher1.fini()93 self.publisher2.fini()94 self.subscriber1.fini()95 self.subscriber2.fini()96 self.assertEqual(self.publisher1.counter,self.subscriber1.counter)97 self.assertEqual(self.publisher2.counter,self.subscriber2.counter)98 def test_test(self):99 d = task.deferLater(reactor, 0.5, self.examin)100 return d101class TestMultipleSubscribersToTopic(unittest.TestCase):102 """ Tests a single publisher sending to multiple subscribers.103 Each subscriber should hear exactly the same set of messages.104 """105 def setUp(self):106 self.subscriber1 = BasicSubscriber("test")107 self.subscriber2 = BasicSubscriber("test")108 self.subscriber3 = BasicSubscriber("test")109 self.publisher = 
BasicPublisher("test")110 def tearDown(self):111 a = self.subscriber1._cleanup()112 b = self.subscriber2._cleanup()113 c = self.subscriber3._cleanup()114 d = self.publisher._cleanup()115 return defer.gatherResults([a, b, c, d])116 def examin(self):117 self.publisher.fini()118 self.subscriber1.fini()119 self.subscriber2.fini()120 self.subscriber3.fini()121# print "sent=", self.publisher.counter122# print "received=", self.subscriber1.counter123# print "received=", self.subscriber2.counter124# print "received=", self.subscriber3.counter125 self.assertEqual(self.publisher.counter,self.subscriber1.counter)126 self.assertEqual(self.publisher.counter,self.subscriber2.counter)127 self.assertEqual(self.publisher.counter,self.subscriber3.counter)128 def test_test(self):129 d = task.deferLater(reactor, 0.5, self.examin)130 return d131class TestMultiplePublishersToTopic(unittest.TestCase):132 """ Tests multiple publishers sending to a single subscriber.133 The subscriber should hear from all the publishers.134 """135 def setUp(self):136 self.subscriber = BasicSubscriber("test")137 self.publisher1 = BasicPublisher("test")138 self.publisher2 = BasicPublisher("test")139 self.publisher3 = BasicPublisher("test")140 def tearDown(self):141 a = self.subscriber._cleanup()142 b = self.publisher1._cleanup()143 c = self.publisher2._cleanup()144 d = self.publisher3._cleanup()145 return defer.gatherResults([a, b, c, d])146 def examin(self):147 self.publisher1.fini()148 self.publisher2.fini()149 self.publisher3.fini()150 self.subscriber.fini()151# print "sent=", self.publisher1.counter152# print "sent=", self.publisher2.counter153# print "sent=", self.publisher3.counter154# print "received=", self.subscriber.counter155 total_sent = self.publisher1.counter + self.publisher2.counter + self.publisher3.counter156 total_recv = self.subscriber.counter157 self.assertEqual(total_sent,total_recv)158 def test_test(self):159 d = task.deferLater(reactor, 0.5, self.examin)160 return d161class 
TestMixedTopics(unittest.TestCase):162 """ Test that the regular expressions which define topic names are working163 correctly to route data. """164 def setUp(self):165 self.pub1 = BasicPublisher("foo1")166 self.pub2 = BasicPublisher("foo2")167 self.pub3 = BasicPublisher("bar1")168 self.sub1 = BasicSubscriber(".*1")169 self.sub2 = BasicSubscriber("foo.")170 self.sub3 = BasicSubscriber("foo2")171 self.sub4 = BasicSubscriber("bar")172 def tearDown(self):173 self.pub1._cleanup()174 self.pub2._cleanup()175 self.pub3._cleanup()176 self.sub1._cleanup()177 self.sub2._cleanup()178 self.sub3._cleanup()179 self.sub4._cleanup()180 def test_test(self):181 def deferred_test():182 self.pub1.fini()183 self.pub2.fini()184 self.pub3.fini()185 self.assertEqual(self.pub1.counter + self.pub3.counter, self.sub1.counter)186 self.assertEqual(self.pub1.counter + self.pub2.counter, self.sub2.counter)187 self.assertEqual(self.pub2.counter, self.sub3.counter)188 self.assertEqual(0, self.sub4.counter)189 return task.deferLater(reactor, 0.5, deferred_test)190class TestSchema(unittest.TestCase):191 """ Tests multiple publishers sending to a single subscriber.192 The subscriber should hear from all the publishers.193 """194 def setUp(self):195 self.client = MiniBusClient()196 self.pub = self.client.publisher("test", {"type": "string"})197 self.client.subscribe("test", {"type": "string"}, self.callback)198 self.count = 0199 def tearDown(self):200 a = self.client._cleanup()201 return defer.gatherResults([a])202 def callback(self, data):203 self.count += 1204 def test_good_schema(self):205 def deferred_good_schema():206 self.pub("this is a test")207 d = task.deferLater(reactor, 0.5, deferred_good_schema)208 return d209 def test_good_schema_callback(self):210 def deferred_good_callback():211 if not self.count == 1:212 raise Exception("bad count")213 def deferred_good_callback_pub():214 self.pub("this is a test")215 d1 = task.deferLater(reactor, 0.5, deferred_good_callback_pub)216 d = 
task.deferLater(reactor, 1, deferred_good_callback)217 return d218 def test_bad_schema(self):219 def deferred_bad_schema():220 try:221 self.pub({"one": 1, "two": 2})222 except Exception:223 return224 raise Exception("I should not have passed")225 d = task.deferLater(reactor, 0.5, deferred_bad_schema)226 return d227 def test_conflict_schema(self):228 try: 229 self.client.subscribe("test", {"type": "number"}, self.callback)230 except Exception:231 return232 raise Exception("I should not have passed")233class Service1Test(unittest.TestCase):234 def add_two_ints(self, data):235 return sum(data)236 def setUp(self):237 self.srvreq = {'type': 'array' }238 self.srvrply = { 'type': 'integer' }239 self.node1 = MiniBusClient()240 self.node1.service("add_two_ints", self.srvreq, self.srvrply, self.add_two_ints)241 self.node2 = MiniBusClient()242 def tearDown(self):243 a = self.node1._cleanup()244 b = self.node2._cleanup()245 @MiniBusClient.inlineServiceCallbacks246 def test_call(self):247 adder = self.node2.service_client("add_two_ints", self.srvreq, self.srvrply)248 ret = yield task.deferLater(reactor, 0.5, adder, [4,5,6])249 self.assertEqual(ret, 15)250 @MiniBusClient.inlineServiceCallbacks251 def test_bad_input_call(self):252 adder = self.node2.service_client("add_two_ints", self.srvreq, self.srvrply)253 try:254 ret = yield task.deferLater(reactor, 0.5, adder, 4)255 raise Exception("I should have failed due to not being passed an array")256 except jsonschema.exceptions.ValidationError:257 pass258 @MiniBusClient.inlineServiceCallbacks259 def test_bad_output_call(self):260 adder = self.node2.service_client("add_two_ints", self.srvreq, {'type': 'string'})261 try:262 ret = yield task.deferLater(reactor, 0.5, adder, [4,5,6])263 raise Exception("I should have failed due to being returned a number instead of a string")264 except jsonschema.exceptions.ValidationError:265 pass266 @MiniBusClient.inlineServiceCallbacks267 def test_bad_input_data(self):268 adder = 
self.node2.service_client("add_two_ints", self.srvreq, self.srvrply)269 try:270 ret = yield task.deferLater(reactor, 0.5, adder, [4,'5',6])271 raise Exception("I should have thrown a RemoteServiceException")272 except RemoteServiceException:273 pass274class TestMultiService(unittest.TestCase):275 def add_ints(self, data):276 return sum(data)277 def double_ints(self, data):278 return [x*2 for x in data]279 def print_ints(self, data):280 return " ".join(data)281 def setUp(self):282 self.srvreq = {'type': 'array' }283 self.srvrply = { 'type': 'integer' }284 self.server1 = MiniBusClient()285 self.server1.service("add_ints", {'type': 'array'}, {'type': 'integer'}, self.add_ints)286 self.server1.service("double_ints", {'type': 'array'}, {'type': 'array'}, self.double_ints)287 self.server2 = MiniBusClient()288 self.server2.service("print_ints", {'type': 'array'}, {'type': 'string'}, self.print_ints)289 self.client1 = MiniBusClient()290 self.client2 = MiniBusClient()291 def tearDown(self):292 a = self.server1._cleanup()293 b = self.server2._cleanup()294 c = self.client1._cleanup()295 d = self.client2._cleanup()296 @MiniBusClient.inlineServiceCallbacks297 def test_calls(self):298 adder = self.client1.service_client("add_ints", {'type': 'array'}, {'type': 'integer'})299 printer = self.client1.service_client("print_ints", {'type': 'array'}, {'type': 'string'})300 double = self.client1.service_client("double_ints", {'type': 'array'}, {'type': 'array'})301 ret1 = yield task.deferLater(reactor, 0.5, adder, [4,5,6])302 ret2 = yield task.deferLater(reactor, 0.5, printer, ['a','b','c'])303 ret3 = yield task.deferLater(reactor, 0.5, double, [1, 2, 3])304 self.assertEqual(ret1, 15)305 self.assertEqual(ret2, "a b c")306 self.assertEqual(ret3, [2, 4, 6])307 @MiniBusClient.inlineServiceCallbacks308 def test_multiclients(self):309 adder = self.client1.service_client("add_ints", {'type': 'array'}, {'type': 'integer'})310 adder2 = self.client2.service_client("add_ints", {'type': 'array'}, 
{'type': 'integer'})311 ret1 = yield task.deferLater(reactor, 0.5, adder, [1,2,3])312 ret2 = yield task.deferLater(reactor, 0.5, adder2, [4,5,6])313 self.assertEqual(ret1, 6)314 self.assertEqual(ret2, 15)315# This is not working yet316# class TestChainedServices(unittest.TestCase):317# @MiniBusClient.inlineServiceCallbacks318# def service1(self, data):319# data = data + " stuff"320# call2val = yield self.call2(data)321# defer.returnValue(call2val)322# 323# def service2(self, data):324# return data + " things"325# 326# def setUp(self):327# self.server1 = MiniBusClient()328# self.server1.service("service1", {'type':'string'}, {}, self.service1)329# self.client1 = MiniBusClient()330# self.call1 = self.client1.service_client("service1",{'type':'string'}, {})331# self.server2 = MiniBusClient()332# self.server2.service("service2", {'type':'string'}, {'type':'string'}, self.service2)333# self.client2 = MiniBusClient()334# self.call2 = self.client2.service_client("service2",{'type':'string'}, {'type':'string'})335# 336# 337# def tearDown(self):338# a = self.server1._cleanup()339# b = self.server2._cleanup()340# c = self.client1._cleanup()341# d = self.client2._cleanup()342# 343# @MiniBusClient.inlineServiceCallbacks344# def test_calls(self):345# retval = yield task.deferLater(reactor, 0.5, self.call1, "mine")346# self.assertEqual(retval, "mine stuff things")...
test_fops.py
Source:test_fops.py
...25from unittest import mock26from pathlib import Path27from eljef.core import fops28logging.disable(logging.ERROR)29def _cleanup(*args) -> None:30 for p in args:31 if os.path.exists(p):32 fops.delete(p)33def _get_empty_file(suffix: str) -> str:34 path = os.path.join(tempfile.gettempdir(), "testFile.{0!s}".format(suffix))35 Path(path).touch()36 return path37def _get_file(data: str, extension: str) -> str:38 fd, path = tempfile.mkstemp(extension, None, tempfile.gettempdir(), True)39 os.write(fd, data.encode('UTF-8'))40 os.close(fd)41 return path42# noinspection PyBroadException43class TestBackupPath(unittest.TestCase):44 def test_backup_path(self):45 path = _get_empty_file('tmp')46 expected = "{0!s}.bak".format(path)47 fops.backup_path(path)48 try:49 self.assertTrue(os.path.isfile(expected))50 _cleanup(expected, path)51 except Exception:52 _cleanup(expected, path)53 raise54 def test_backup_path_multiple_files(self):55 path = _get_empty_file('tmp')56 path2 = _get_empty_file("tmp.bak")57 path3 = _get_empty_file("tmp.bak.1")58 expected = "{0!s}.bak.2".format(path)59 fops.backup_path(path)60 try:61 self.assertTrue(os.path.isfile(expected))62 _cleanup(expected, path, path2, path3)63 except Exception:64 _cleanup(expected, path, path2, path3)65 raise66# noinspection PyBroadException67class TestDelete(unittest.TestCase):68 def test_delete_exception(self):69 path = _get_empty_file('tmp')70 try:71 with mock.patch('os.remove', new=mock.Mock(side_effect=OSError(5, 'test'))):72 self.assertRaises(OSError, fops.delete, path)73 except Exception:74 _cleanup(path)75 raise76 _cleanup(path)77 def test_delete_path_does_not_exist(self):78 raised = False79 path = os.path.join(tempfile.gettempdir(), "hopefully_this_file_does_not_exist.tmp")80 try:81 fops.delete(path)82 except Exception:83 raised = True84 self.assertFalse(raised)85 def test_delete_symlink_only(self):86 path = _get_empty_file('tmp')87 link = "{0!s}.lnk".format(path)88 os.symlink(path, link)89 fops.delete(link)90 try:91 
self.assertFalse(os.path.exists(link))92 _cleanup(link, path)93 except Exception:94 _cleanup(link, path)95 raise96 def test_delete_symlink_backup(self):97 path = _get_empty_file('tmp')98 link = "{0!s}.lnk".format(path)99 os.symlink(path, link)100 fops.delete(link, follow=True, backup=True)101 try:102 self.assertFalse(os.path.exists(link))103 self.assertTrue(os.path.isfile(path))104 _cleanup(link, path)105 except Exception:106 _cleanup(link, path)107 raise108 def test_delete_symlink_and_parent(self):109 path = _get_empty_file('tmp')110 link = "{0!s}.lnk".format(path)111 os.symlink(path, link)112 fops.delete(link, follow=True)113 try:114 self.assertFalse(os.path.exists(link))115 self.assertFalse(os.path.exists(path))116 _cleanup(link, path)117 except Exception:118 _cleanup(link, path)119 raise120 def test_delete_backup(self):121 path = _get_empty_file('tmp')122 expected = "{0!s}.bak".format(path)123 fops.delete(path, backup=True)124 try:125 self.assertFalse(os.path.exists(path))126 self.assertTrue(os.path.exists(expected))127 _cleanup(path, expected)128 except Exception:129 _cleanup(path, expected)130 raise131 def test_delete_directory(self):132 path = os.path.join(tempfile.gettempdir(), "testDir")133 os.mkdir(path)134 fops.delete(path)135 try:136 self.assertFalse(os.path.exists(path))137 except Exception:138 _cleanup(path)139 raise140 def test_delete_file(self):141 path = _get_empty_file('tmp')142 fops.delete(path)143 try:144 self.assertFalse(os.path.exists(path))145 _cleanup(path)146 except Exception:147 _cleanup(path)148 raise149class TestFileRead(unittest.TestCase):150 def test_file_read_no_strip(self):151 data = """test file data152 test file new line153 """154 path = _get_file(data, ".tmp")155 got = fops.file_read(path)156 os.remove(path)157 self.assertEqual(data, got)158 def test_file_read_strip(self):159 data = """test file data160 test file new line161 """162 path = _get_file(data, ".tmp")163 got = fops.file_read(path, True)164 os.remove(path)165 
self.assertEqual(data.strip(), got)166class TestFileReadConvert(unittest.TestCase):167 def test_file_read_convert_unknown_type(self):168 self.assertRaises(ValueError, fops.file_read_convert, 'no_path', 'unknown_type')169 def test_file_read_convert_file_does_not_exist(self):170 self.assertRaises(FileNotFoundError, fops.file_read_convert, 'should_not_exist_hopefully_maybe', fops.JSON)171 def test_file_read_convert_not_file(self):172 self.assertRaises(IOError, fops.file_read_convert, tempfile.gettempdir(), fops.JSON)173 def test_file_read_convert_file_does_not_exist_get_default(self):174 want = dict()175 got = fops.file_read_convert("should_not_exist_hopefully_maybe", fops.JSON, default=True)176 self.assertDictEqual(got, want)177 def test_file_read_convert_json(self):178 data = """{179 "test": "test"180 }181 """182 want = {183 'test': 'test'184 }185 path = _get_file(data, ".json")186 got = fops.file_read_convert(path, fops.JSON)187 os.remove(path)188 self.assertDictEqual(got, want)189 def test_file_read_convert_kv(self):190 data = """test=test191 """192 want = {193 'test': 'test'194 }195 path = _get_file(data, ".tmp")196 got = fops.file_read_convert(path, fops.KV)197 os.remove(path)198 self.assertDictEqual(got, want)199 def test_file_read_convert_xml(self):200 data = """<?xml version="1.0"?>201 <test>test</test>202 """203 want = {204 'test': 'test'205 }206 path = _get_file(data, ".xml")207 got = fops.file_read_convert(path, fops.XML)208 os.remove(path)209 self.assertDictEqual(got, want)210 def test_file_read_convert_yaml(self):211 data = '''test: "test"'''212 want = {213 'test': 'test'214 }215 path = _get_file(data, ".yml")216 got = fops.file_read_convert(path, fops.YAML)217 os.remove(path)218 self.assertDictEqual(got, want)219# noinspection PyBroadException220class TestFileWrite(unittest.TestCase):221 def test_file_write_file_does_not_exist(self):222 data = """test data"""223 path = os.path.join(tempfile.gettempdir(), "testFile.tmp")224 fops.file_write(path, data, 
newline='\n')225 try:226 self.assertTrue(os.path.isfile(path))227 except Exception:228 _cleanup(path)229 raise230 got = fops.file_read(path, strip=True)231 _cleanup(path)232 self.assertEqual(data, got)233 def test_file_write_file_exists(self):234 data = """test data"""235 path = _get_file("testing data", ".tmp")236 fops.file_write(path, data, newline='\n')237 got = fops.file_read(path, strip=True)238 _cleanup(path)239 self.assertEqual(data, got)240 def test_file_write_with_backup(self):241 data = """test data"""242 path = _get_file("testing data", ".tmp")243 expected = "{0!s}.bak".format(path)244 fops.file_write(path, data, backup=True, newline='\n')245 try:246 self.assertTrue(os.path.isfile(expected))247 except Exception:248 _cleanup(path, expected)249 raise250 got = fops.file_read(path, strip=True)251 _cleanup(path, expected)252 self.assertEqual(data, got)253class TestFileWriteCovertDefaults(unittest.TestCase):254 def test_file_write_convert_defaults_unknown_type(self):255 self.assertRaises(ValueError, fops.file_write_convert_defaults, 'unknown')256 def test_file_write_convert_defaults(self):257 tests = {258 fops.JSON: {'indent': 4},259 fops.KV: {'spaced': False},260 fops.XML: {'pretty': True, 'full_document': True, 'indent': ' '},261 fops.YAML: {'default_flow_style': False}262 }263 for key, value in tests.items():264 got = fops.file_write_convert_defaults(key)265 self.assertDictEqual(value, got, msg=key)266# noinspection PyBroadException267class TestFileWriteConvert(unittest.TestCase):268 def test_file_write_convert(self):269 data = {"test": "test"}270 tests = {271 fops.JSON: '{\n "test": "test"\n}',272 fops.KV: 'test=test',273 fops.XML: '<?xml version="1.0" encoding="utf-8"?>\n<test>test</test>',274 fops.YAML: 'test: test'275 }276 for key, value in tests.items():277 path = os.path.join(tempfile.gettempdir(), "tempFile.{0!s}".format(key))278 args = fops.file_write_convert_defaults(key)279 fops.file_write_convert(path, key, data, dumper_args=args)280 try:281 
self.assertTrue(os.path.isfile(path))282 except Exception:283 _cleanup(path)284 raise285 got = fops.file_read(path, strip=True)286 _cleanup(path)287 self.assertEqual(value, got, msg=key)288class TestListDirsByExtension(unittest.TestCase):289 def test_list_dirs_by_extension(self):290 path = tempfile.mkdtemp(dir=tempfile.gettempdir())291 path1 = tempfile.mkdtemp(dir=path)292 path2 = tempfile.mkdtemp(dir=path)293 path3 = tempfile.mkdtemp(dir=path)294 path4 = tempfile.mkdtemp(dir=path)295 path5 = tempfile.mkdtemp(dir=path)296 data = {os.path.basename(path1), os.path.basename(path2), os.path.basename(path5)}297 Path(os.path.join(path1, 'test.txt')).touch()298 Path(os.path.join(path2, 'test.txt')).touch()299 Path(os.path.join(path3, 'test.json')).touch()300 Path(os.path.join(path4, 'test.not_txt')).touch()301 Path(os.path.join(path5, 'test.txt')).touch()302 got = fops.list_dirs_by_extension(path, 'txt')303 _cleanup(path)304 self.assertCountEqual(data, got)305 self.assertSetEqual(data, got)306class TestListFilesByExtension(unittest.TestCase):307 def test_list_files_by_extension(self):308 data = ['test1.txt', 'test2.txt']309 path = tempfile.mkdtemp(dir=tempfile.gettempdir())310 Path(os.path.join(path, 'test1.txt')).touch()311 Path(os.path.join(path, 'test2.txt')).touch()312 Path(os.path.join(path, 'test3.txt.nope')).touch()313 got = fops.list_files_by_extension(path, 'txt')314 _cleanup(path)315 self.assertCountEqual(data, got)316 self.assertListEqual(sorted(data), sorted(got))317class TestPushd(unittest.TestCase):318 def test_pushd_directory_does_not_exist(self):319 def child():320 with fops.pushd('something_that_should_not_exist'):321 print('')322 self.assertRaises(FileNotFoundError, child)323 def test_pushd(self):324 data = 'pushd test data'325 path = os.path.join(tempfile.gettempdir(), 'testFile.tmp')326 with fops.pushd(tempfile.gettempdir()):327 fops.file_write('testFile.tmp', data)328 got = fops.file_read(path, True)329 _cleanup(path)330 self.assertEqual(data, 
got)331# noinspection PyBroadException332class TestRequiredExecutables(unittest.TestCase):333 def test_required_executables_exist(self):334 try:335 fops.required_executables(['python'])336 test_var = True337 except Exception:338 test_var = False339 self.assertTrue(test_var)340 def test_required_executables_not_exist(self):341 def child():342 fops.required_executables(['this_definitely_should_not_exist_at_all'])343 self.assertRaises(SystemExit, child)...
nmea.py
Source:nmea.py
############################################################################
#                                                                          #
# Copyright (c)2008, 2009, Digi International (Digi). All Rights Reserved. #
#                                                                          #
# Permission to use, copy, modify, and distribute this software and its    #
# documentation, without fee and without a signed licensing agreement, is  #
# hereby granted, provided that the software is used on Digi products only #
# and that the software contain this copyright notice, and the following   #
# two paragraphs appear in all copies, modifications, and distributions as #
# well. Contact Product Management, Digi International, Inc., 11001 Bren   #
# Road East, Minnetonka, MN, +1 952-912-3444, for commercial licensing     #
# opportunities for non-Digi products.                                     #
#                                                                          #
# DIGI SPECIFICALLY DISCLAIMS ANY WARRANTIES, INCLUDING, BUT NOT LIMITED   #
# TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A          #
# PARTICULAR PURPOSE. THE SOFTWARE AND ACCOMPANYING DOCUMENTATION, IF ANY, #
# PROVIDED HEREUNDER IS PROVIDED "AS IS" AND WITHOUT WARRANTY OF ANY KIND. #
# DIGI HAS NO OBLIGATION TO PROVIDE MAINTENANCE, SUPPORT, UPDATES,         #
# ENHANCEMENTS, OR MODIFICATIONS.                                          #
#                                                                          #
# IN NO EVENT SHALL DIGI BE LIABLE TO ANY PARTY FOR DIRECT, INDIRECT,      #
# SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES, INCLUDING LOST PROFITS,   #
# ARISING OUT OF THE USE OF THIS SOFTWARE AND ITS DOCUMENTATION, EVEN IF   #
# DIGI HAS BEEN ADVISED OF THE POSSIBILITY OF SUCH DAMAGES.                #
#                                                                          #
############################################################################
"""Parse NMEA data to extract useful GPS information
"""

# Module to accept an NMEA stream and provide asynchronous access to
# the contained data content.

import re as _re

# NMEA sentence, begins with a '$', ends with EOL, may have an
# optional checksum sequence of a '*' and two hex digits at the end.
# Group 1 is sentence proper, Group 2 is the optional checksum.
_sentence_def = r"\$(.*?)(?:\*([0-9a-fA-F]{2}))?[\n\r]+"
_sentence_re = _re.compile(_sentence_def, _re.MULTILINE)

# Field names, in order, for each sentence type this module understands.
_sentence_templates = {
    # GPS fix data
    "GGA": ["fix_time", "latitude_magnitude", "latitude_hemisphere",
            "longitude_magnitude", "longitude_hemisphere",
            "fix_quality", "num_satellites", "hdop",
            "altitude", "altitude_units",
            "geoid_height", "geoid_units", "DGPS_time", "DGPS_station_id"],
    # Latitude and Longitude
    "GLL": ["latitude_magnitude", "latitude_hemisphere",
            "longitude_magnitude", "longitude_hemisphere",
            "fix_time", "fix_good"],
    # Recommended minimum GPS info
    "RMC": ["fix_time", "fix_good",
            "latitude_magnitude", "latitude_hemisphere",
            "longitude_magnitude", "longitude_hemisphere",
            "speed_over_ground", "course_over_ground", "fix_date",
            "magnetic_variation", "variation_direction"],
}

# Fields whose arrival means the decimal position must be recomputed.
_position_items = set(['latitude_magnitude',
                       'longitude_magnitude',
                       'latitude_hemisphere',
                       'longitude_hemisphere'])

# Report unit information for items as appropriate
units = {
    "fix_time": "UTC",
    "latitude_magnitude": "degrees",
    "longitude_magnitude": "degrees",
    "speed_over_ground": "knots",
    "course_over_ground": "degrees",
    "altitude": "meters",
    "num_satellites": "satellites",
}

# Convert string values to cleaner abstracted and typed values for consumption
_cleanup = {
    # ddmm.mmmm -> decimal degrees
    'latitude_magnitude': lambda val: int(val[:2]) + float(val[2:])/60,
    # dddmm.mmmm -> decimal degrees
    'longitude_magnitude': lambda val: int(val[:3]) + float(val[3:])/60,
    'fix_time': lambda val: "%s:%s:%s" % (val[0:2], val[2:4], val[4:6]),
    # The wire value is ddmmyy; the first two fields are swapped on output.
    'fix_date': lambda val: "%s/%s/%s" % (val[2:4], val[0:2], val[4:6]),
    'speed_over_ground': lambda val: float(val),
    'course_over_ground': lambda val: float(val),
    'num_satellites': lambda val: int(val),
    'altitude': lambda val: float(val),
    'hdop': lambda val: float(val),
    'magnetic_variation': lambda val: float(val),
    'fix_good': lambda val: val == 'A',
}


class NMEA:
    """NMEA 0183 data stream parsing object

    feed(string) - Parse stream of NMEA protocol data
    position() -> (latitude, longitude)

    Once parsing has begun, raw NMEA data elements can be retrieved
    from the object using direct attribute access.
    """

    def __init__(self):
        self._position = (51.4772, 0)  # Greenwich royal observatory
        self._working_sentence = ""

    def _valid(self, sentence):
        """Return True when the sentence's check sequence is acceptable.

        `sentence` is the match object produced in feed().  A sentence
        without a check sequence (no group 2) has to be believed valid;
        otherwise the XOR of every character of group 1 must equal the
        two hex digits of group 2.
        """
        if not sentence.group(2):
            return True

        checkcalc = 0
        for char in sentence.group(1):
            checkcalc ^= ord(char)

        return checkcalc == int(sentence.group(2), 16)

    def set_position(self):
        """Recompute the signed decimal position from the parsed fields.

        Does nothing until all four magnitude/hemisphere fields have been
        seen.  South latitudes and west longitudes are made negative.
        """
        try:
            lat = self.latitude_magnitude
            lon = self.longitude_magnitude
            if self.latitude_hemisphere == 'S':
                lat = -lat
            if self.longitude_hemisphere == 'W':
                lon = -lon
        except (AttributeError, TypeError):
            # Not every field is available (or usable) yet: keep the old
            # position.  Only these errors are ignored, so unrelated
            # exceptions such as KeyboardInterrupt still propagate.
            return

        self.latitude_degrees = lat
        self.longitude_degrees = lon
        self._position = (lat, lon)

    def position(self):
        """position() -> (latitude, longitude)

        Return the position tuple with coordinates represented in
        numeric decimal format.
        """
        self.set_position()
        return self._position

    def _extract(self, sentence, report):
        """Store the fields of one sentence as attributes of this object.

        `sentence` is the sentence text with the check sequence and the
        header/footer data removed.  `report`, when truthy, is called as
        report(name, value) for every stored field.

        Returns True when a position-related field was stored, and a false
        value otherwise (including for sentence types with no template).
        """
        fields = sentence.split(",")
        try:
            template = _sentence_templates[fields[0][2:5]]
        except KeyError:
            return False  # Don't understand this sentence

        update_position = False
        # zip() stops at the shorter sequence, so a sentence carrying fewer
        # fields than its template is consumed as far as it goes.
        for name, value in zip(template, fields[1:]):
            # Sometimes a field is not populated.
            # If it is not, we need to just simply skip over it.
            if value == "":
                continue

            convert = _cleanup.get(name)
            if convert is not None:
                try:
                    value = convert(value)
                except ValueError:
                    # Malformed field: skip it.  Raising here would abort
                    # feed() before the buffer is trimmed, so every later
                    # call would hit the same sentence again.
                    continue

            setattr(self, name, value)
            if report:
                report(name, value)
            if name in _position_items:
                update_position = True

        return update_position

    def feed(self, stream, report=None):
        """feed(string) - Parse NMEA 0183 data stream

        As data from your NMEA source is received, provide it to the
        object with this routine.  This function updates the state of
        the object with extracted position information.

        `report`, when given, is called as report(name, value) for every
        field parsed, and additionally with 'latitude_degrees' and
        'longitude_degrees' once a position is available.  Text after the
        last complete sentence is kept for the next call.
        """
        self._working_sentence += stream
        end = 0
        sentence = _sentence_re.search(self._working_sentence)
        while sentence:
            if self._valid(sentence):
                if self._extract(sentence.group(1), report):
                    self.set_position()
                    # The degrees attributes exist only after set_position()
                    # has succeeded at least once.
                    if report and hasattr(self, 'latitude_degrees'):
                        report('latitude_degrees', self.latitude_degrees)
                        report('longitude_degrees', self.longitude_degrees)

            end = sentence.end()
            sentence = _sentence_re.search(self._working_sentence, end)

        self._working_sentence = self._working_sentence[end:]


if __name__ == "__main__":
    def print_args(*args):
        # print(x) with one argument behaves the same on Python 2 and 3.
        print(args)
        try:
            print(units[args[0]])
        except (KeyError, IndexError, TypeError):
            pass

    def dummy(*args):
        pass

    for i in range(10000):
        gps = NMEA()

        #print "RMC"
        gps.feed(
            "$GPRMC,225446,A,4916.45,N,12311.12,W,000.5,054.7,191194,020.3,E*68\r",
            dummy)
        #print gps.position()
        #print '-'*70

        #print "GGA"
        gps.feed(
            "$GPGGA,123519,4807.038,N,01131.000,E,1,08,0.9,545.4,M,46.9,M,,*47\r",
            dummy)
        #print gps.position()
        #print '-'*70

        #print "GLL"
        gps.feed("$GPGLL,4916.45,N,12311.12,W,225444,A,*1D\r",
                 dummy)
        #print gps.position()
        #print '-'*70

    from pprint import pprint
    # (the remainder of this demo is truncated in the source excerpt)
Learn to execute automation testing from scratch with LambdaTest Learning Hub. From setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios, the LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, TestNG, etc.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 minutes of automation testing FREE!!