Best Python code snippet using molotov_python
database.py
Source:database.py
from mysql.connector import errorcode
from utilities.utils import *
from utilities.enums import *
import mysql.connector

DATABASE_ERROR = -1

# Exception types that the query methods below report as
# DatabaseErrors.CONNECTION_LOST instead of letting them propagate.
_CONNECTION_ERRORS = (
    mysql.connector.errors.OperationalError,
    mysql.connector.errors.InterfaceError,
)


class Database:
    """Wrapper around one MySQL connection and one cursor.

    Every statement passes its values as query parameters (``%s``
    placeholders) so that caller-supplied text is escaped by the connector
    and never spliced into the SQL string.

    Most failures are reported by *returning* a ``DatabaseErrors`` member
    rather than by raising; writers return ``None`` on success and readers
    return the fetched row(s).
    """

    def __init__(self, host, username, password, database, port):
        """Store the connection settings; no connection is opened here."""
        self.host = host
        self.username = username
        self.password = password
        self.db_name = database
        self.port = port
        self.cursor = None
        self.connection = None

    def connect(self):
        """Open the connection and return it, or return an error code.

        Returns ``DatabaseErrors.ACCESS_DENIED`` / ``DB_EXCEPTION`` for the
        matching server error numbers, ``CONNECTION_LOST`` for an
        ``OperationalError`` and ``None`` for any other connector error.
        """
        try:
            self.connection = mysql.connector.connect(
                user=self.username,
                password=self.password,
                host=self.host,
                database=self.db_name,
                port=self.port,
            )
            return self.connection
        except mysql.connector.Error as err:
            if err.errno == errorcode.ER_ACCESS_DENIED_ERROR:
                return DatabaseErrors.ACCESS_DENIED
            if err.errno == errorcode.ER_BAD_DB_ERROR:
                return DatabaseErrors.DB_EXCEPTION
            # OperationalError derives from mysql.connector.Error, so a
            # separate ``except OperationalError`` placed after this handler
            # could never run; the check has to live here.
            if isinstance(err, mysql.connector.errors.OperationalError):
                return DatabaseErrors.CONNECTION_LOST
            # NOTE(review): other connector errors are still swallowed and
            # reported as None, as before.
            return None

    def get_cursor(self):
        """Create the cursor used by every query method."""
        self.cursor = self.connection.cursor()

    def shut_down(self):
        """Close the cursor, then the connection."""
        self.cursor.close()
        self.connection.close()

    def get_admin_info(self, email):
        """Return ``(first_name, email, psw)`` of the administrator with ``email``."""
        query = """
            SELECT first_name, email, psw
            FROM person JOIN administrator ON person.id = administrator.person_id
            WHERE person.email = %s
        """
        try:
            self.cursor.execute(query, (email,))
            return self.cursor.fetchone()
        except _CONNECTION_ERRORS:
            return DatabaseErrors.CONNECTION_LOST

    def add_product(self, values):
        """Insert a product from ``values`` = (name, price, quantity) and commit."""
        query = "INSERT INTO product (product_name, price, qnt) VALUES (%s, %s, %s);"
        args = (values[0], f'{values[1]}', values[2])
        try:
            self.cursor.execute(query, args)
        except mysql.connector.errors.IntegrityError:
            return DatabaseErrors.NAME_ALREADY_EXIST
        except mysql.connector.errors.DataError:
            return DatabaseErrors.DATA_ERROR
        except _CONNECTION_ERRORS:
            return DatabaseErrors.CONNECTION_LOST
        self.connection.commit()

    def add_person(self, values):
        """Insert a person from ``values`` = (first, last, email, psw, money) and commit."""
        query = "INSERT INTO person (first_name, last_name, email, psw, money) VALUES (%s, %s, %s, %s, %s);"
        args = (values[0], values[1], values[2], values[3], f'{values[4]}')
        try:
            self.cursor.execute(query, args)
        except mysql.connector.errors.IntegrityError:
            return DatabaseErrors.EMAIL_ALREADY_EXIST
        except mysql.connector.errors.DataError:
            return DatabaseErrors.DATA_ERROR
        except _CONNECTION_ERRORS:
            return DatabaseErrors.CONNECTION_LOST
        self.connection.commit()

    def add_customer(self, customer_email):
        """Create a customer row for the person whose email is ``customer_email``."""
        try:
            self.cursor.execute("SELECT id FROM person WHERE email = %s", (customer_email,))
            id_ = self.cursor.fetchone()
            self.cursor.execute("INSERT INTO customer (person_id) VALUES (%s);", (id_[0],))
        except _CONNECTION_ERRORS:
            return DatabaseErrors.CONNECTION_LOST
        self.connection.commit()

    def get_customers(self, limit):
        """Return up to ``limit`` customers joined with their person data."""
        query = """
            SELECT customer.id, first_name, last_name, email, psw, money
            FROM customer JOIN person
            ON customer.person_id = person.id LIMIT %s
        """
        try:
            # LIMIT must be rendered as a bare integer, hence int().
            self.cursor.execute(query, (int(limit),))
            return self.cursor.fetchall()
        except _CONNECTION_ERRORS:
            return DatabaseErrors.CONNECTION_LOST

    def get_customer_info(self, email):
        """Return ``(email, psw, first_name, last_name, person.id, money)`` for a customer."""
        query = """
            SELECT email, psw, first_name, last_name, person.id, money
            FROM ecommerce.customer JOIN person ON customer.person_id = person.id
            WHERE email = %s;
        """
        try:
            self.cursor.execute(query, (email,))
            return self.cursor.fetchone()
        except _CONNECTION_ERRORS:
            return DatabaseErrors.CONNECTION_LOST

    def get_products(self, limit):
        """Return up to ``limit`` rows of the product table."""
        try:
            self.cursor.execute("SELECT * FROM product LIMIT %s", (int(limit),))
            return self.cursor.fetchall()
        except _CONNECTION_ERRORS:
            return DatabaseErrors.CONNECTION_LOST

    def get_super_root(self):
        """Return ``(email, psw)`` of the administrator whose id is 1."""
        query = """
            SELECT email, psw FROM person
            JOIN administrator ON administrator.person_id = person.id
            WHERE administrator.id = 1
        """
        try:
            self.cursor.execute(query)
            return self.cursor.fetchone()
        except mysql.connector.errors.OperationalError:
            return DatabaseErrors.CONNECTION_LOST

    def get_product_bought(self, prod_name):
        """Return the first product row whose name matches ``prod_name`` (SQL LIKE)."""
        try:
            self.cursor.execute(
                "SELECT * FROM product WHERE product.product_name LIKE %s;",
                (prod_name,),
            )
            return self.cursor.fetchone()
        except _CONNECTION_ERRORS:
            return DatabaseErrors.CONNECTION_LOST

    def get_product_searched(self, prod_name, limit):
        """Return up to ``limit`` products whose name contains ``prod_name``."""
        try:
            self.cursor.execute(
                "SELECT * FROM product WHERE product.product_name LIKE %s LIMIT %s",
                (f"%{prod_name}%", int(limit)),
            )
            return self.cursor.fetchall()
        except mysql.connector.errors.OperationalError:
            return DatabaseErrors.CONNECTION_LOST

    def delete_product(self, id_):
        """Delete the product with id ``id_`` and commit."""
        try:
            self.cursor.execute("DELETE FROM product WHERE id = %s", (id_,))
        except _CONNECTION_ERRORS:
            return DatabaseErrors.CONNECTION_LOST
        self.connection.commit()

    def rmv_qnt_product(self, prod_name):
        """Decrease the stock of ``prod_name`` by one and commit.

        Returns ``DatabaseErrors.OUT_OF_STOCK`` when the stored quantity
        (column index 3 of the product row) is not positive.
        """
        try:
            product = self.get_product_bought(prod_name)
            if product == DatabaseErrors.CONNECTION_LOST:
                # The lookup already failed; pass its error code on instead
                # of subscripting it.
                return product
            # NOTE(review): an unknown product (fetchone() -> None) still
            # raises TypeError on the next line, as it did before.
            if product[3] <= 0:
                return DatabaseErrors.OUT_OF_STOCK
            self.cursor.execute(
                "UPDATE product SET qnt = %s WHERE id = %s;",
                (product[3] - 1, product[0]),
            )
        except mysql.connector.errors.InterfaceError:
            return DatabaseErrors.CONNECTION_LOST
        self.connection.commit()

    def delete_customer(self, id_):
        """Delete customer ``id_`` together with its person row and commit."""
        try:
            # Foreign-key checks are switched off for this session and are
            # not switched back on anywhere in this class.
            self.cursor.execute("SET FOREIGN_KEY_CHECKS = 0;")
            self.connection.commit()
            self.cursor.execute(
                """SELECT customer.person_id FROM customer
                   JOIN person ON person.id = customer.person_id WHERE customer.id = %s;""",
                (id_,),
            )
            person_id = self.cursor.fetchone()
            self.cursor.execute("DELETE FROM customer WHERE customer.id = %s;", (id_,))
            self.cursor.execute("DELETE FROM person WHERE person.id = %s;", (person_id[0],))
            self.connection.commit()
        except _CONNECTION_ERRORS:
            return DatabaseErrors.CONNECTION_LOST
        self.connection.commit()

    def get_customer_searched(self, customer_name, limit):
        """Return up to ``limit`` customers whose first name contains ``customer_name``."""
        query = """
            SELECT person.id, first_name, last_name, email, psw, money
            FROM person JOIN customer ON person.id = customer.person_id
            WHERE person.first_name LIKE %s LIMIT %s
        """
        try:
            self.cursor.execute(query, (f"%{customer_name}%", int(limit)))
            return self.cursor.fetchall()
        except _CONNECTION_ERRORS:
            return DatabaseErrors.CONNECTION_LOST

    def customer_change_money(self, credit, person_id):
        """Set ``person.money`` to ``credit``; return True on success.

        Returns ``False`` on ``OperationalError`` and
        ``DatabaseErrors.CONNECTION_LOST`` on ``InterfaceError``.
        """
        try:
            self.cursor.execute(
                "UPDATE person SET person.money = %s WHERE person.id = %s",
                (credit, person_id),
            )
        except mysql.connector.errors.OperationalError:
            return False
        except mysql.connector.errors.InterfaceError:
            return DatabaseErrors.CONNECTION_LOST
        self.connection.commit()
        return True

    def create_my_order(self, customer_id):
        """Insert an empty order for ``customer_id`` and commit."""
        try:
            self.cursor.execute("INSERT INTO my_order (customer_id) VALUES (%s);", (customer_id,))
        except _CONNECTION_ERRORS:
            return DatabaseErrors.CONNECTION_LOST
        self.connection.commit()

    def create_prod_ordered(self, prod_id, customer_id):
        """Attach ``prod_id`` to the latest order of ``customer_id`` and commit."""
        try:
            order_id = self.get_last_order(customer_id)
            if order_id == DatabaseErrors.CONNECTION_LOST:
                # Pass the lookup's error on instead of subscripting it.
                return order_id
            sql = "INSERT INTO product_ordered(product_id, order_id, date_) VALUES (%s, %s, %s);"
            # get_date() is not defined in this file; presumably it comes
            # from the utilities.utils star import.
            args = (f'{prod_id}', f'{order_id[0]}', get_date())
            self.cursor.execute(sql, args)
        except mysql.connector.errors.OperationalError:
            return DatabaseErrors.CONNECTION_LOST
        self.connection.commit()

    def get_customer_id(self, person_id):
        """Return the one-column row holding the customer id of ``person_id``."""
        query = """
            SELECT customer.id FROM customer
            JOIN person ON person.id = customer.person_id
            WHERE person.id = %s
        """
        try:
            self.cursor.execute(query, (person_id,))
            return self.cursor.fetchone()
        except _CONNECTION_ERRORS:
            return DatabaseErrors.CONNECTION_LOST

    def get_last_order(self, customer_id):
        """Return the one-column row holding the highest order id of ``customer_id``."""
        try:
            self.cursor.execute(
                "SELECT id FROM my_order WHERE my_order.customer_id = %s ORDER BY id DESC LIMIT 1",
                (customer_id,),
            )
            return self.cursor.fetchone()
        except mysql.connector.errors.OperationalError:
            return DatabaseErrors.CONNECTION_LOST

    def get_orders(self, customer_id, limit):
        """Return up to ``limit`` ordered products of ``customer_id``."""
        query = """
            SELECT person.first_name,
                   person.last_name,
                   product.product_name,
                   product_ordered.date_,
                   my_order.id, product.id

            FROM my_order
            JOIN customer ON my_order.customer_id = customer.id
            JOIN person ON person.id = customer.person_id
            JOIN product_ordered ON product_ordered.order_id = my_order.id
            JOIN product ON product.id = product_ordered.product_id

            WHERE customer_id = %s
            LIMIT %s
        """
        try:
            self.cursor.execute(query, (customer_id, int(limit)))
            return self.cursor.fetchall()
        except _CONNECTION_ERRORS:
            return DatabaseErrors.CONNECTION_LOST
        except TypeError:
            return DatabaseErrors.CONNECTION_LOST

    def get_orders_searched(self, customer_id, prod_name, limit):
        """Like ``get_orders`` but only products whose name contains ``prod_name``."""
        query = """
            SELECT person.first_name,
                   person.last_name,
                   product.product_name,
                   product_ordered.date_,
                   my_order.id,
                   product.id

            FROM my_order
            JOIN customer ON my_order.customer_id = customer.id
            JOIN person ON person.id = customer.person_id
            JOIN product_ordered ON product_ordered.order_id = my_order.id
            JOIN product ON product.id = product_ordered.product_id

            WHERE customer_id = %s AND product.product_name LIKE %s
            LIMIT %s;
        """
        try:
            self.cursor.execute(query, (customer_id, f"%{prod_name}%", int(limit)))
            return self.cursor.fetchall()
        except _CONNECTION_ERRORS:
            return DatabaseErrors.CONNECTION_LOST

    def delete_order(self, order_id):
        """Delete order ``order_id`` and its ``product_ordered`` rows, committing each step."""
        try:
            self.cursor.execute("SET FOREIGN_KEY_CHECKS = 0;")
            self.connection.commit()
            self.cursor.execute("DELETE FROM my_order WHERE my_order.id = %s;", (order_id,))
            self.connection.commit()
            self.cursor.execute(
                "DELETE FROM product_ordered WHERE product_ordered.order_id = %s;",
                (order_id,),
            )
            self.connection.commit()
        except _CONNECTION_ERRORS:
            return DatabaseErrors.CONNECTION_LOST

    def update_qnt_product(self, prod_id, new_qnt):
        """Set the quantity of product ``prod_id`` and commit."""
        try:
            self.cursor.execute(
                "UPDATE product SET product.qnt = %s WHERE product.id = %s",
                (new_qnt, prod_id),
            )
        except mysql.connector.errors.DataError:
            return DatabaseErrors.DATA_ERROR
        except _CONNECTION_ERRORS:
            return DatabaseErrors.CONNECTION_LOST
        self.connection.commit()

    def update_name_product(self, prod_id, new_name):
        """Rename product ``prod_id`` and commit."""
        try:
            self.cursor.execute(
                "UPDATE product SET product.product_name = %s WHERE product.id = %s",
                (new_name, prod_id),
            )
        except mysql.connector.errors.DataError:
            return DatabaseErrors.DATA_ERROR
        except mysql.connector.errors.IntegrityError:
            return DatabaseErrors.NAME_ALREADY_EXIST
        except _CONNECTION_ERRORS:
            return DatabaseErrors.CONNECTION_LOST
        self.connection.commit()

    def update_price_product(self, prod_id, new_price):
        """Set the price of product ``prod_id`` and commit."""
        try:
            self.cursor.execute(
                "UPDATE product SET product.price = %s WHERE product.id = %s",
                (new_price, prod_id),
            )
        except _CONNECTION_ERRORS:
            return DatabaseErrors.CONNECTION_LOST
        self.connection.commit()

    def get_last_person(self):
        """Return the one-column row holding the highest ``person.id``."""
        try:
            self.cursor.execute("SELECT person.id FROM person ORDER BY person.id DESC LIMIT 1")
            return self.cursor.fetchone()
        except _CONNECTION_ERRORS:
            return DatabaseErrors.CONNECTION_LOST

    def add_root(self):
        """Make the most recently inserted person an administrator and commit."""
        try:
            last_person_id = self.get_last_person()
            if last_person_id == DatabaseErrors.CONNECTION_LOST:
                # Pass the lookup's error on instead of subscripting it.
                return last_person_id
            self.cursor.execute(
                "INSERT INTO administrator(person_id) VALUES (%s);",
                (last_person_id[0],),
            )
        except mysql.connector.IntegrityError:
            return DatabaseErrors.EMAIL_ALREADY_EXIST
        except _CONNECTION_ERRORS:
            return DatabaseErrors.CONNECTION_LOST
        self.connection.commit()

    def get_admins(self, limit):
        """Return up to ``limit`` administrators joined with their person data."""
        query = """
            SELECT administrator.id,
                   person.first_name,
                   person.last_name,
                   person.email,
                   person.psw,
                   person.money
            FROM ecommerce.administrator JOIN person ON person.id = administrator.person_id
            LIMIT %s
        """
        try:
            self.cursor.execute(query, (int(limit),))
            return self.cursor.fetchall()
        except _CONNECTION_ERRORS:
            return DatabaseErrors.CONNECTION_LOST

    def get_person_id_super_root(self, id_):
        """Return the one-column row holding the person id of administrator ``id_``."""
        query = """
            SELECT administrator.person_id
            FROM ecommerce.administrator JOIN person ON person.id = administrator.person_id
            WHERE administrator.id = %s
        """
        try:
            self.cursor.execute(query, (id_,))
            return self.cursor.fetchone()
        except _CONNECTION_ERRORS:
            return DatabaseErrors.CONNECTION_LOST

    def delete_admin(self, admin_id):
        """Delete administrator ``admin_id`` and its person row.

        Returns ``DatabaseErrors.NO_ADMIN_FOUND`` when no such administrator
        exists.
        """
        try:
            id_ = self.get_person_id_super_root(admin_id)
            if id_ is None:
                return DatabaseErrors.NO_ADMIN_FOUND
            if id_ == DatabaseErrors.CONNECTION_LOST:
                # Pass the lookup's error on instead of subscripting it.
                return id_
            self.cursor.execute("SET FOREIGN_KEY_CHECKS = 0")
            self.connection.commit()
            self.cursor.execute("DELETE FROM person WHERE person.id = %s;", (id_[0],))
            self.connection.commit()
            self.cursor.execute(
                "DELETE FROM administrator WHERE administrator.person_id = %s;",
                (id_[0],),
            )
            self.connection.commit()
        except _CONNECTION_ERRORS:
            return DatabaseErrors.CONNECTION_LOST

    def get_admin_searched(self, admin_name, limit):
        """Return up to ``limit`` administrators whose first name contains ``admin_name``."""
        query = """
            SELECT administrator.id,
                   person.first_name,
                   person.last_name,
                   person.email,
                   person.psw,
                   person.money

            FROM administrator

            JOIN person ON person.id = administrator.person_id

            WHERE person.first_name LIKE %s LIMIT %s
        """
        try:
            self.cursor.execute(query, (f"%{admin_name}%", int(limit)))
            return self.cursor.fetchall()
        except mysql.connector.errors.OperationalError:
            return DatabaseErrors.CONNECTION_LOST

    # The update_person_* methods take ``person_id`` as a row-like sequence
    # and use its first element as the id.

    def update_person_first_name(self, new_name, person_id):
        """Set the first name of the person ``person_id[0]`` and commit."""
        try:
            self.cursor.execute(
                "UPDATE person SET person.first_name = %s WHERE person.id = %s;",
                (new_name, person_id[0]),
            )
        except mysql.connector.errors.DataError:
            return DatabaseErrors.DATA_ERROR
        except _CONNECTION_ERRORS:
            return DatabaseErrors.CONNECTION_LOST
        self.connection.commit()

    def update_person_last_name(self, new_name, person_id):
        """Set the last name of the person ``person_id[0]`` and commit."""
        try:
            self.cursor.execute(
                "UPDATE person SET person.last_name = %s WHERE person.id = %s;",
                (new_name, person_id[0]),
            )
        except mysql.connector.errors.DataError:
            return DatabaseErrors.DATA_ERROR
        except _CONNECTION_ERRORS:
            return DatabaseErrors.CONNECTION_LOST
        self.connection.commit()

    def update_person_email(self, new_email, person_id):
        """Set the email of the person ``person_id[0]`` and commit."""
        try:
            self.cursor.execute(
                "UPDATE person SET person.email = %s WHERE person.id = %s;",
                (new_email, person_id[0]),
            )
        except mysql.connector.errors.IntegrityError:
            return DatabaseErrors.EMAIL_ALREADY_EXIST
        except _CONNECTION_ERRORS:
            return DatabaseErrors.CONNECTION_LOST
        self.connection.commit()

    def update_person_password(self, new_psw, person_id):
        """Set the password of the person ``person_id[0]`` and commit."""
        try:
            self.cursor.execute(
                "UPDATE person SET person.psw = %s WHERE person.id = %s;",
                (new_psw, person_id[0]),
            )
        except _CONNECTION_ERRORS:
            return DatabaseErrors.CONNECTION_LOST
        self.connection.commit()

    def update_person_money(self, new_money, person_id):
        """Set the balance of the person ``person_id[0]`` and commit."""
        try:
            self.cursor.execute(
                "UPDATE person SET person.money = %s WHERE person.id = %s",
                (new_money, person_id[0]),
            )
        except _CONNECTION_ERRORS:
            # The original OperationalError branch returned the
            # DatabaseErrors class itself rather than one of its members.
            return DatabaseErrors.CONNECTION_LOST
        # NOTE(review): the listing is cut off right after the handlers
        # above; this commit mirrors the sibling update_person_* methods.
        self.connection.commit()

# ... (listing truncated here)
__init__.py
Source:__init__.py
# ... (excerpt begins inside the ``Protocol`` base class; its ``class`` line
# and the module imports are above the visible part of the listing)

    def connection_made(self, transport):
        """Called when reader thread is started"""

    def data_received(self, data):
        """Called with snippets received from the serial port"""

    def connection_lost(self, exc):
        """\
        Called when the serial port is closed or the reader loop terminated
        otherwise.
        """
        # Re-raise the error that ended the reader loop; a clean shutdown
        # passes None here and returns silently.
        if isinstance(exc, Exception):
            raise exc


class Packetizer(Protocol):
    """
    Read binary packets from serial port. Packets are expected to be terminated
    with a TERMINATOR byte (null byte by default).

    The class also keeps track of the transport.
    """

    TERMINATOR = b'\0'

    def __init__(self):
        self.buffer = bytearray()
        self.transport = None

    def connection_made(self, transport):
        """Store transport"""
        self.transport = transport

    def connection_lost(self, exc):
        """Forget transport"""
        self.transport = None
        super(Packetizer, self).connection_lost(exc)

    def data_received(self, data):
        """Buffer received data, find TERMINATOR, call handle_packet"""
        self.buffer.extend(data)
        # One chunk may complete several packets; bytes after the last
        # terminator stay in self.buffer for the next call.
        while self.TERMINATOR in self.buffer:
            packet, self.buffer = self.buffer.split(self.TERMINATOR, 1)
            self.handle_packet(packet)

    def handle_packet(self, packet):
        """Process packets - to be overridden by subclassing"""
        raise NotImplementedError('please implement functionality in handle_packet')


class FramedPacket(Protocol):
    """
    Read binary packets. Packets are expected to have a start and stop marker.

    The class also keeps track of the transport.
    """

    START = b'('
    STOP = b')'

    def __init__(self):
        self.packet = bytearray()
        self.in_packet = False
        self.transport = None

    def connection_made(self, transport):
        """Store transport"""
        self.transport = transport

    def connection_lost(self, exc):
        """Forget transport"""
        self.transport = None
        self.in_packet = False
        # Empty the bytearray in place rather than rebinding the attribute.
        del self.packet[:]
        super(FramedPacket, self).connection_lost(exc)

    def data_received(self, data):
        """Find data enclosed in START/STOP, call handle_packet"""
        for byte in serial.iterbytes(data):
            if byte == self.START:
                # START only sets the flag; bytes already collected are kept.
                self.in_packet = True
            elif byte == self.STOP:
                # This branch does not check in_packet, so a STOP without a
                # preceding START also calls handle_packet.
                self.in_packet = False
                self.handle_packet(bytes(self.packet)) # make read-only copy
                del self.packet[:]
            elif self.in_packet:
                self.packet.extend(byte)
            else:
                self.handle_out_of_packet_data(byte)

    def handle_packet(self, packet):
        """Process packets - to be overridden by subclassing"""
        raise NotImplementedError('please implement functionality in handle_packet')

    def handle_out_of_packet_data(self, data):
        """Process data that is received outside of packets"""
        pass


class LineReader(Packetizer):
    """
    Read and write (Unicode) lines from/to serial port.
    The encoding is applied.
    """

    TERMINATOR = b'\r\n'
    ENCODING = 'utf-8'
    UNICODE_HANDLING = 'replace'

    def handle_packet(self, packet):
        # Decode each terminated packet and hand it on as one text line.
        self.handle_line(packet.decode(self.ENCODING, self.UNICODE_HANDLING))

    def handle_line(self, line):
        """Process one line - to be overridden by subclassing"""
        raise NotImplementedError('please implement functionality in handle_line')

    def write_line(self, text):
        """
        Write text to the transport. ``text`` is a Unicode string and the encoding
        is applied before sending and also the newline is appended.
        """
        # + is not the best choice but bytes does not support % or .format in py3 and we want a single write call
        self.transport.write(text.encode(self.ENCODING, self.UNICODE_HANDLING) + self.TERMINATOR)


class ReaderThread(threading.Thread):
    """\
    Implement a serial port read loop and dispatch to a Protocol instance (like
    the asyncio.Protocol) but do it with threads.

    Calls to close() will close the serial port but it is also possible to just
    stop() this thread and continue the serial port instance otherwise.
    """

    def __init__(self, serial_instance, protocol_factory):
        """\
        Initialize thread.

        Note that the serial_instance' timeout is set to one second!
        Other settings are not changed.
        """
        super(ReaderThread, self).__init__()
        self.daemon = True
        self.serial = serial_instance
        self.protocol_factory = protocol_factory
        self.alive = True
        # Held by write() and close().
        self._lock = threading.Lock()
        # Set by run() once connection_made() has returned or raised.
        self._connection_made = threading.Event()
        self.protocol = None

    def stop(self):
        """Stop the reader thread"""
        self.alive = False
        if hasattr(self.serial, 'cancel_read'):
            self.serial.cancel_read()
        # Wait at most two seconds for the reader loop to finish.
        self.join(2)

    def run(self):
        """Reader loop"""
        # The timeout is only changed for ports without cancel_read();
        # presumably so the loop can re-check self.alive after each read.
        if not hasattr(self.serial, 'cancel_read'):
            self.serial.timeout = 1
        self.protocol = self.protocol_factory()
        try:
            self.protocol.connection_made(self)
        except Exception as e:
            self.alive = False
            self.protocol.connection_lost(e)
            # Wake up connect()/__enter__ waiters; they see alive == False.
            self._connection_made.set()
            return
        error = None
        self._connection_made.set()
        while self.alive and self.serial.is_open:
            try:
                # read all that is there or wait for one byte (blocking)
                data = self.serial.read(self.serial.in_waiting or 1)
            except serial.SerialException as e:
                # probably some I/O problem such as disconnected USB serial
                # adapters -> exit
                error = e
                break
            else:
                if data:
                    # make a separated try-except for called user code
                    try:
                        self.protocol.data_received(data)
                    except Exception as e:
                        error = e
                        break
        self.alive = False
        # error is None when the loop ended without an exception.
        self.protocol.connection_lost(error)
        self.protocol = None

    def write(self, data):
        """Thread safe writing (uses lock)"""
        with self._lock:
            return self.serial.write(data)

    def close(self):
        """Close the serial port and exit reader thread (uses lock)"""
        # use the lock to let other threads finish writing
        with self._lock:
            # first stop reading, so that closing can be done on idle port
            self.stop()
            self.serial.close()

    def connect(self):
        """
        Wait until connection is set up and return the transport and protocol
        instances.
        """
        if self.alive:
            self._connection_made.wait()
            # alive is re-checked because run() clears it when
            # connection_made() raised.
            if not self.alive:
                raise RuntimeError('connection_lost already called')
            return (self, self.protocol)
        else:
            raise RuntimeError('already stopped')

    # - - context manager, returns protocol

    def __enter__(self):
        """\
        Enter context handler. May raise RuntimeError in case the connection
        could not be created.
        """
        self.start()
        self._connection_made.wait()
        if not self.alive:
            raise RuntimeError('connection_lost already called')
        return self.protocol

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Leave context: close port"""
        self.close()


# - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
# test
if __name__ == '__main__':
    # pylint: disable=wrong-import-position
    import sys
    import time
    import traceback

    #~ PORT = 'spy:///dev/ttyUSB0'
    PORT = 'loop://'

    class PrintLines(LineReader):
        def connection_made(self, transport):
            super(PrintLines, self).connection_made(transport)
            sys.stdout.write('port opened\n')
            self.write_line('hello world')

        def handle_line(self, data):
            sys.stdout.write('line received: {!r}\n'.format(data))

        def connection_lost(self, exc):
            if exc:
                # NOTE(review): the first positional parameter of
                # traceback.print_exc() is ``limit``, so ``exc`` is passed as
                # the limit here - confirm this is intended.
                traceback.print_exc(exc)
            sys.stdout.write('port closed\n')

    ser = serial.serial_for_url(PORT, baudrate=115200, timeout=1)
    with ReaderThread(ser, PrintLines) as protocol:
        protocol.write_line('hello')
        time.sleep(2)

    # alternative usage
    ser = serial.serial_for_url(PORT, baudrate=115200, timeout=1)
    t = ReaderThread(ser, PrintLines)
    t.start()
    transport, protocol = t.connect()
    protocol.write_line('hello')
    time.sleep(2)

# ... (listing truncated here)
fdesc.py
Source:fdesc.py
# -*- test-case-name: twisted.test.test_fdesc -*-
# Copyright (c) 2001-2009 Twisted Matrix Laboratories.
# See LICENSE for details.

"""
Utility functions for dealing with POSIX file descriptors.
"""

import os
import errno
try:
    import fcntl
except ImportError:
    fcntl = None

# twisted imports
from twisted.internet.main import CONNECTION_LOST, CONNECTION_DONE
from twisted.python.runtime import platformType


def setNonBlocking(fd):
    """
    Make a file descriptor non-blocking.
    """
    flags = fcntl.fcntl(fd, fcntl.F_GETFL)
    flags = flags | os.O_NONBLOCK
    fcntl.fcntl(fd, fcntl.F_SETFL, flags)


def setBlocking(fd):
    """
    Make a file descriptor blocking.
    """
    flags = fcntl.fcntl(fd, fcntl.F_GETFL)
    flags = flags & ~os.O_NONBLOCK
    fcntl.fcntl(fd, fcntl.F_SETFL, flags)


if fcntl is None:
    # fcntl isn't available on Windows.  By default, handles aren't
    # inherited on Windows, so we can do nothing here.
    _setCloseOnExec = _unsetCloseOnExec = lambda fd: None
else:
    def _setCloseOnExec(fd):
        """
        Make a file descriptor close-on-exec.
        """
        flags = fcntl.fcntl(fd, fcntl.F_GETFD)
        flags = flags | fcntl.FD_CLOEXEC
        fcntl.fcntl(fd, fcntl.F_SETFD, flags)


    def _unsetCloseOnExec(fd):
        """
        Make a file descriptor I{not} close-on-exec, i.e. clear the
        C{FD_CLOEXEC} flag.
        """
        flags = fcntl.fcntl(fd, fcntl.F_GETFD)
        flags = flags & ~fcntl.FD_CLOEXEC
        fcntl.fcntl(fd, fcntl.F_SETFD, flags)


def readFromFD(fd, callback):
    """
    Read from file descriptor, calling callback with resulting data.

    If successful, call 'callback' with a single argument: the
    resulting data.

    Returns same thing FileDescriptor.doRead would: CONNECTION_LOST,
    CONNECTION_DONE, or None.

    @type fd: C{int}
    @param fd: non-blocking file descriptor to be read from.
    @param callback: a callable which accepts a single argument. If
    data is read from the file descriptor it will be called with this
    data. Handling exceptions from calling the callback is up to the
    caller.

    Note that if the descriptor is still connected but no data is read,
    None will be returned but callback will not be called.

    @return: CONNECTION_LOST on error, CONNECTION_DONE when fd is
    closed, otherwise None.
    """
    try:
        output = os.read(fd, 8192)
    except (OSError, IOError) as ioe:
        # EAGAIN / EINTR are treated as "no data this time" (None is
        # returned); every other error is reported as a lost connection.
        if ioe.errno in (errno.EAGAIN, errno.EINTR):
            return
        else:
            return CONNECTION_LOST
    if not output:
        # os.read() returned an empty result.
        return CONNECTION_DONE
    callback(output)


def writeToFD(fd, data):
    """
    Write data to file descriptor.

    Returns same thing FileDescriptor.writeSomeData would.

    @type fd: C{int}
    @param fd: non-blocking file descriptor to be written to.
    @type data: C{str} or C{buffer}
    @param data: bytes to write to fd.

    @return: number of bytes written, or CONNECTION_LOST.
    """
    try:
        return os.write(fd, data)
    except (OSError, IOError) as ioe:
        # EAGAIN / EINTR are reported as "zero bytes written".
        if ioe.errno in (errno.EAGAIN, errno.EINTR):
            return 0
        return CONNECTION_LOST

# ... (listing truncated here)
helpers.py
Source:helpers.py
...34 return zone_combinations35class HandleUDPBroadcast:36 def connection_made(self, transport):37 pass38 def connection_lost(self, exc):39 pass40class HandleServer(asyncio.Protocol):41 def __init__(self, connection_made, data_received, connection_lost):42 self._connection_made_callback = connection_made43 self._data_received_callback = data_received44 self._connection_lost_callback = connection_lost45 def connection_made(self, transport):46 self._connection_made_callback(transport)47 def data_received(self, data):48 self._data_received_callback(data)49 def connection_lost(self, exc):...
Learn to execute automation testing from scratch with the LambdaTest Learning Hub, from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. The LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, and TestNG.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 automation test minutes FREE!!