Best Python code snippet using avocado_python
resource_manager_base_mimo.py
Source:resource_manager_base_mimo.py
#!/usr/bin/env python
#
# Copyright 2014 Institute for Theoretical Information Technology,
#                RWTH Aachen University
#                www.ti.rwth-aachen.de
#
# This is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 3, or (at your option)
# any later version.
#
# This software is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this software; see the file COPYING.  If not, write to
# the Free Software Foundation, Inc., 51 Franklin Street,
# Boston, MA 02110-1301, USA.
#
# Framework for resource managers. Derive specific implementation from this
# base class
#
# NOTE: the listing this file was recovered from was collapsed onto a few
# lines; indentation below is reconstructed.  The syntax is kept valid for
# both Python 2.6+ and Python 3, but the runtime dependencies (omniORB,
# time.clock, the GNU Radio message-queue API) are unchanged.

from omniORB import CORBA, PortableServer
from corba_stubs import ofdm_ti,ofdm_ti__POA
import CosNaming
import CosEventComm__POA, CosEventComm
import CosEventChannelAdmin
import os,sys,time,threading
from omniORB.any import to_any
from gnuradio import gr
import random,cmath
from array import array as array_f
from random import seed,randint
from corba_servants import *
from threading import Timer
from numpy import concatenate
import numpy
import scipy,math
from scipy import sqrt,log, exp,randn, sum, absolute, multiply, array2string, reshape, ceil, array, zeros,ones, log, floor
from pylab import plot, stem, subplot, show, ylim
from numpy.fft import fftshift
import socket
import logging
from time import clock,strftime,gmtime

std_event_channel = "corbaname:rir:/NameService#"


def _array_to_bytes(arr):
    """Serialise an ``array.array`` to its raw byte string.

    ``tobytes`` is used when the interpreter provides it; older
    interpreters only offer ``tostring``.
    """
    to_bytes = getattr(arr, "tobytes", None)
    if to_bytes is None:
        to_bytes = arr.tostring
    return to_bytes()


def _retry_on_comm_failure(operation, retry_message):
    """Call ``operation()`` until it no longer raises CORBA.COMM_FAILURE.

    ``retry_message`` is written to stderr before each one-second back-off.
    Any other exception (including SystemExit) propagates to the caller.
    Returns whatever ``operation()`` returns.
    """
    while True:
        try:
            return operation()
        except CORBA.COMM_FAILURE:
            sys.stderr.write(retry_message)
            time.sleep(1)


class ctrl_event:
    """Record of one pushed transmitter configuration and its parameters."""

    def __init__(self,ctrl=None,txamp=0.0,ber=0.0,constraint=0.0,datarate=0.0):
        self.ctrl = ctrl
        self.timestamp = clock()  # taken at construction time
        self.tx_amplitude = txamp
        self.ber = ber
        self.constraint = constraint
        self.datarate = datarate


class resource_manager_base (ofdm_ti__POA.PA_Ctrl):
    """Base class for resource managers.

    The constructor connects to two CORBA event channels, resolves the
    transmitter-side object references and pushes an initial configuration.
    ``start()`` arms a timer that calls ``periodic_work()``, which in turn
    calls ``self.work()``; ``work`` is not defined here and has to be
    provided by the derived class.
    """

    def __init__(self,orb,loggerbase="",options=None):
        """Connect to the event channels and push the initial configuration.

        orb        -- initialised CORBA ORB
        loggerbase -- prefix for the logger name ("<loggerbase>rmbase")
        options    -- parsed command-line options (see ``add_options``);
                      when ``None`` or incomplete, the defaults declared in
                      ``add_options`` are used

        Raises SystemExit when the "info_tx" object cannot be resolved.
        """
        self.orb = orb
        self.options = options
        # Read options defensively: the signature allows options=None.
        self._dyn_freq = bool(self._get_option("dyn_freq", False))
        data_blocks = self._get_option("data_blocks", 9)

        self.rm_logger = logging.getLogger(loggerbase+"rmbase")
        self.rm_logger.setLevel(logging.DEBUG)
        self.connect_push_supplier()
        self.connect_push_consumer()

        # set initial parameters
        self.strategy_mode = ofdm_ti.PA_Ctrl.reset
        self.required_ber = 0.001
        self.constraint = 1000.0 # rate or power
        self.setup_time = 2000 # ms
        self.data_rate= 365079
        self.tx_amplitude = 1000
        if self._dyn_freq:
            self.tx_freq = 2450000000
            self.rx_freq = self.tx_freq
        self.frame_length = data_blocks + 1 + 2
        self.data_blocks = data_blocks
        self.id_blocks = 1
        self.tx_id = 1
        self.max_tx_id = 1024

        self.store_ctrl_events = False
        self.ctrl_events = dict()

        # shadow copies: the CORBA setters write these, update() latches them
        self._strategy_mode = self.strategy_mode
        self._required_ber = self.required_ber
        self._constraint = self.constraint

        # get object references
        self.info_tx = resolve(orb,"info_tx",ofdm_ti.info_tx)
        self.ci_impulse = resolve(orb,"sounder_cir_c", ofdm_ti.data_buffer)
        self.tx_power_ref = resolve(orb,"txpower", ofdm_ti.push_vector_f)
        self.estim_power_ref = resolve(orb,"estim_power", ofdm_ti.push_vector_f)
        if self._dyn_freq:
            self.tx_freq_ref = resolve(orb,"txfreq", ofdm_ti.push_vector_f)
            self.rx_freq_ref = resolve(orb,"rxfreq", ofdm_ti.push_vector_f)
        self.tx_ac = resolve(orb,"channelcheat", ofdm_ti.push_vector_c)

        # current datarate corba servant
        def dummy():
            pass
        self.datarate_servant = corba_ndata_buffer_servant("cur_datarate",
                                                           self._get_data_rate,
                                                           dummy)
        #self.UDP_client()

        if self.info_tx is None:
            raise SystemExit("Need TX information")

        # latch tx information
        self.subcarriers = self.info_tx._get_subcarriers() #data subcarriers w/o pilot subcarriers
        self.fft_length = self.info_tx._get_fft_window()
        self.cp_length = self.info_tx._get_cp_length()
        self.block_length = self.fft_length+self.cp_length
        self.data_frame_length = self.info_tx._get_burst_length()
        self.bandwidth = self.info_tx._get_bandwidth()

        # modify these variables to change the power allocation scheme or to
        # influence the artificial channel
        self.pa_vector = [float(1.0)]*self.subcarriers
        try:
            self.ac_vlen = self.tx_ac.vlen()
            self.ac_vector = [0.0+0.0j]*self.ac_vlen
            self.ac_vector[0] = 1.0+0.0j
        except Exception:
            # No usable artificial-channel reference: fall back to one tap.
            self.ac_vector=[1]
            self.ac_vlen = 1
        self.mod_map = [2]*self.subcarriers
        self.assignment_map = [1]*self.subcarriers

        self.rm_logger.info("Subcarriers: %d"%(self.subcarriers))

        self.pa_msgq = gr.msg_queue(2)
        self.pa_disp_servant = corba_data_buffer_servant("padisp",self.subcarriers,
                                                         self.pa_msgq)
        self.ra_msgq = gr.msg_queue(2)
        self.ra_disp_servant = corba_data_buffer_servant("radisp",self.subcarriers,
                                                         self.ra_msgq)

        # tell transmitter initial settings and wait for it to stabilize
        self.propagate_changes()
        time.sleep(1.0) # seconds

    def _get_option(self, name, default):
        """Return ``self.options.<name>``, or ``default`` when unavailable."""
        return getattr(self.options, name, default)

    def UDP_client (self):
        """Send the current data rate to host 'tabur', port 5000 (best effort).

        Despite the name, the connection is a TCP stream socket.  Connection
        and send errors are logged at debug level and otherwise ignored; the
        socket is always closed.
        """
        self.serverip = socket.gethostbyname('tabur')
        self.rm_logger.info( "Tabur IP: %s" %(str( self.serverip)))
        vv = self.data_rate
        self.rm_logger.debug( "Data rate: %d"%( vv))
        self.clisock = socket.socket(socket.AF_INET,socket.SOCK_STREAM)
        try:
            self.clisock.connect ((self.serverip,5000))
            self.clisock.sendall((str(vv)+"\n").encode("ascii"))
        except socket.error as exc:
            self.rm_logger.debug("Could not report data rate: %s" % (exc,))
        finally:
            self.clisock.close()

    def connect_push_supplier(self):
        """Register this manager as push supplier on "GNUradio_EventChannel".

        Each CORBA step is retried on COMM_FAILURE; a nil reference or a
        BAD_PARAM / AlreadyConnected error terminates via ``sys.exit(1)``.
        Sets ``self.channel``, ``self.supplier``, ``self.supplier_admin`` and
        ``self.proxy_consumer``.
        """
        # get event channel
        obj = self.orb.string_to_object(std_event_channel+"GNUradio_EventChannel")
        self.channel = obj._narrow(CosEventChannelAdmin.EventChannel)
        assert(self.channel is not None)
        self.supplier=Supplier_i()
        sptr=self.supplier._this() # SIDE EFFECT: Activates object in POA

        # Get Supplier Admin interface - retrying on Comms Failure.
        self.supplier_admin = _retry_on_comm_failure(
            self.channel.for_suppliers,
            "Caught COMM_FAILURE Exception. "
            "obtaining Supplier Admin! Retrying...\n")
        if self.supplier_admin is None:
            sys.stderr.write("Event Channel returned nil Supplier Admin!\n")
            sys.exit(1)
        self.rm_logger.debug( "Obtained SupplierAdmin.")

        # Get proxy consumer - retrying on Comms Failure.
        self.proxy_consumer = _retry_on_comm_failure(
            self.supplier_admin.obtain_push_consumer,
            "Caught COMM_FAILURE Exception "
            "obtaining Proxy Push Consumer! Retrying...\n")
        if self.proxy_consumer is None:
            sys.stderr.write("Supplier Admin returned nil proxy_consumer!\n")
            sys.exit(1)
        self.rm_logger.debug( "Obtained ProxyPushConsumer.")

        # Connect Push Supplier - retrying on Comms Failure.
        def connect():
            try:
                self.proxy_consumer.connect_push_supplier(sptr)
            except CORBA.BAD_PARAM:
                sys.stderr.write(
                    'Caught BAD_PARAM Exception connecting Push Supplier!')
                sys.exit(1)
            except CosEventChannelAdmin.AlreadyConnected:
                sys.stderr.write('Proxy Push Consumer already connected!')
                sys.exit(1)

        _retry_on_comm_failure(
            connect,
            "Caught COMM_FAILURE Exception "
            "connecting Push Supplier! Retrying...")
        self.rm_logger.debug( "Connected Push Supplier.")

    def connect_push_consumer(self):
        """Attach a ``Consumer_i`` push consumer to the "himalaya" channel.

        Same retry/exit policy as ``connect_push_supplier``.  The consumer is
        kept in ``self.pushconsumer``.
        """
        obj = self.orb.string_to_object(std_event_channel+"himalaya")
        channel = obj._narrow(CosEventChannelAdmin.EventChannel)
        assert(channel is not None)

        # Get Consumer Admin interface - retrying on Comms Failure.
        consumer_admin = _retry_on_comm_failure(
            channel.for_consumers,
            "Caught COMM_FAILURE Exception. "
            "obtaining Consumer Admin! Retrying...\n")
        if consumer_admin is None:
            sys.stderr.write("Event Channel returned nil Consumer Admin!\n")
            sys.exit(1)
        self.rm_logger.debug( "Obtained ConsumerAdmin.")

        # Make a Push Consumer.
        self.pushconsumer = consumer = Consumer_i()

        # Get proxy supplier - retrying on Comms Failure.
        proxy_supplier = _retry_on_comm_failure(
            consumer_admin.obtain_push_supplier,
            "Caught COMM_FAILURE Exception. "
            "obtaining Proxy Push Supplier! Retrying...\n")
        if proxy_supplier is None:
            sys.stderr.write("Consumer Admin return nil proxy_supplier!\n")
            sys.exit(1)
        self.rm_logger.debug( "Obtained ProxyPushSupplier.")

        # Connect Push Consumer - retrying on Comms Failure.
        def connect():
            try:
                proxy_supplier.connect_push_consumer(consumer._this())
            except CORBA.BAD_PARAM:
                sys.stderr.write(
                    'Caught BAD_PARAM Exception connecting Push Consumer!\n')
                sys.exit(1)
            except CosEventChannelAdmin.AlreadyConnected:
                sys.stderr.write('Proxy Push Supplier already connected!\n')
                sys.exit(1)

        _retry_on_comm_failure(
            connect,
            "Caught COMM_FAILURE Exception "
            "connecting Push Consumer! Retrying...\n")
        self.rm_logger.debug( "Connected Push Consumer.")

    def push_data(self):
        """Validate the current maps and push them on the supplier channel.

        Builds an ``ofdm_ti.tx_config_data`` from ``pa_vector``, ``mod_map``
        and ``assignment_map``, tags it with the next transmit id and pushes
        it, retrying on COMM_FAILURE.

        Raises SystemError for an unsupported modulation index and
        AssertionError when ``mod_map`` and ``assignment_map`` disagree about
        which subcarriers are in use.  Other push failures terminate via
        ``sys.exit(1)``.
        """
        coding = bool(self._get_option("coding", False))
        pa_vector = [float(x) for x in self.pa_vector]
        assignment_map = [int(x) for x in self.assignment_map]

        # Validate first, so that a bad modulation index is reported as such
        # instead of failing in the rate-table lookup below.
        max_mode = 9 if coding else 8
        for x, mode in enumerate(self.mod_map):
            if mode == 0:
                assert(assignment_map[x] == 0)
            else:
                assert(assignment_map[x] > 0)
            if mode not in range(max_mode + 1):
                raise SystemError("Modulation scheme not supported: %d" % (mode))

        mod_map = _array_to_bytes(array_f('B',self.mod_map))

        # With coding enabled the modulation index selects an entry of this
        # table; without coding the index itself is used as the bit count.
        bits_per_mode = [0.5,1.,1.5,2.,3.,4.,4.5,5.,6.]
        bit_map = []
        for mode in self.mod_map:
            if mode > 0:
                bit_map.append(float(bits_per_mode[mode-1]) if coding
                               else float(mode))
            else:
                bit_map.append(0.0)

        tx_id = self.tx_id
        self.tx_id = (self.tx_id + 1) % self.max_tx_id

        if sum(pa_vector) > self.subcarriers+10:
            self.rm_logger.error( "sum(pa_vector) = %d too big" %(sum(pa_vector)))
            # Fall back to unit power on every subcarrier that carries data.
            pa_vector = [1.0 if mode > 0 else 0.0 for mode in self.mod_map]
            self.rm_logger.error( "ERROR!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!")

        data = ofdm_ti.tx_config_data(
            tx_id = tx_id,
            power_map = pa_vector,
            mod_map = mod_map,
            bit_map = bit_map,
            assignment_map = assignment_map,
            data_blocks = self.data_blocks,
            id_blocks = self.id_blocks
            )
        data_any = to_any(data)

        if self.store_ctrl_events:
            self.ctrl_events[tx_id] = ctrl_event(data,self.tx_amplitude,
                                                 self.required_ber,self.constraint,
                                                 self.data_rate)

        while True:
            try:
                self.rm_logger.debug( "Push Supplier: push() called. ")
                self.proxy_consumer.push(data_any)
                break
            except CosEventComm.Disconnected:
                sys.stderr.write("Failed. Caught Disconnected Exception!")
                sys.exit(1)
            except CORBA.COMM_FAILURE:
                sys.stderr.write("Failed. Caught COMM_FAILURE Exception! Retrying ...")
                self.rm_logger.warning("Failed. Caught COMM_FAILURE Exception! Retrying ...")
                time.sleep(1)
            except Exception:
                sys.stderr.write("Unknown exception, terminating")
                sys.exit(1)

    def start(self):
        """Arm a one-shot timer that runs ``periodic_work`` after setup_time ms."""
        self.timer = Timer(self.setup_time/1000.0, self.periodic_work)
        self.timer.start()

    def periodic_work(self):
        """Run one round: call ``work()``, propagate changes, re-arm the timer.

        ``work()`` may set ``self.do_update`` to False to skip propagation.
        """
        self.rm_logger.info( "Begin of Round -----------------------------------------------------")

        self.do_update = True
        self.work()  # provided by the derived class

        if self.do_update:
            self.propagate_changes()
        else:
            self.rm_logger.debug("Not performing update")

        self.rm_logger.info( "End of Round -------------------------------------------------------")
        self.start()

    def propagate_changes(self):
        """Push amplitude, frequencies, artificial channel and maps downstream.

        A failed push is logged and the corresponding object reference is
        resolved again so that a later round can retry.  Frequencies are only
        pushed when the dyn_freq option is set, because only then are the
        frequency values and references created.
        """
        # NOTE(review): any amplitude outside (0, 32768), including values
        # <= 0, is replaced by 32768 -- confirm that this is intended.
        if not(0 < self.tx_amplitude < 32768):
            self.rm_logger.warning( "TX Power %d" %( self.tx_amplitude))
            self.rm_logger.warning( "WARNING!!! Limiting Power ---------------------------------------")
            self.tx_amplitude = 32768

        assert(len(self.pa_vector) == self.subcarriers)
        assert(len(self.ac_vector) == self.ac_vlen)
        assert(len(self.mod_map) == self.subcarriers)
        assert(len(self.assignment_map) == self.subcarriers)

        # amplifier control
        try:
            self.tx_power_ref.push([float(self.tx_amplitude)])
            self.rm_logger.info( "Pushed new tx power")
        except Exception:
            self.rm_logger.error( "Failed to push new tx power")
            self.tx_power_ref = resolve(self.orb,"txpower", ofdm_ti.push_vector_f)

        try:
            self.estim_power_ref.push([float(self.tx_amplitude)])
            print("self.tx_amplitude for estimation %s" % (self.tx_amplitude,))
            self.rm_logger.info( "Pushed new estim power")
        except Exception:
            self.rm_logger.error( "Failed to push new estim power")
            self.estim_power_ref = resolve(self.orb,"estim_power", ofdm_ti.push_vector_f)

        if self._dyn_freq:
            try:
                self.tx_freq_ref.push([float(self.tx_freq)])
                self.rm_logger.info( "Pushed new tx frequency")
            except Exception:
                self.rm_logger.error( "Failed to push new tx frequency")
                self.tx_freq_ref = resolve(self.orb,"txfreq", ofdm_ti.push_vector_f)

            try:
                self.rx_freq_ref.push([float(self.rx_freq)])
                self.rm_logger.info( "Pushed new rx frequency")
            except Exception:
                self.rm_logger.error( "Failed to push new trx frequency")
                self.rx_freq_ref = resolve(self.orb,"rxfreq", ofdm_ti.push_vector_f)

        # artificial channel: normalise to unit energy, then interleave the
        # real and imaginary parts
        acv = self.ac_vector
        acv = acv/sqrt(sum(absolute(acv)**2))
        ac_vector = []
        for x in acv:
            ac_vector.append(float(numpy.real(x)))
            ac_vector.append(float(numpy.imag(x)))

        try:
            self.tx_ac.push(list(ac_vector))
            self.rm_logger.debug( "Pushed artificial channel update")
        except Exception:
            self.rm_logger.error("FAILED push artificial channel")
            self.tx_ac = resolve(self.orb,"channelcheat", ofdm_ti.push_vector_c)

        # display pa scheme (square root of the allocated power values)
        pa_vector_s = _array_to_bytes(array_f('f',array(self.pa_vector)**(1./2)))
        msg = gr.message_from_string(pa_vector_s)
        if not self.pa_msgq.full_p():
            self.pa_msgq.insert_tail(msg)
        self.rm_logger.debug("Pushed pa scheme to GUI")

        ra_vector_s = _array_to_bytes(array_f('f',array(self.mod_map)))
        msg_ra = gr.message_from_string(ra_vector_s)
        if not self.ra_msgq.full_p():
            self.ra_msgq.insert_tail(msg_ra)
        #self.UDP_client()

        # event channel communication
        self.push_data()
        self.rm_logger.info( "Pushed data")

    ## CORBA interface to control the PA
    ##############################################################################
    # Setters write "shadow" attributes; update() latches them into the live
    # values that the getters return.

    def _set_required_ber(self,val):
        self._required_ber = val

    def _get_required_ber(self):
        return self.required_ber

    def _set_constraint(self,val):
        # FIXME check if in acceptable range
        self._constraint = val

    def _get_constraint(self):
        return self.constraint

    def _set_channel_refresh_interval(self,val):
        self._channel_refresh_interval = val

    def _get_channel_refresh_interval(self):
        # NOTE(review): no initial value is assigned anywhere in this class,
        # so this raises AttributeError until a value has been set and
        # update() has run (or a subclass assigns the attribute).
        return self.channel_refresh_interval

    def _get_data_rate(self):
        return self.data_rate

    def change_strategy(self,mode):
        # FIXME check if contraints etc. fit to the new mode
        self._strategy_mode = mode

    def update(self):
        """Latch the values written through the CORBA setters."""
        self.strategy_mode = self._strategy_mode
        self.required_ber = self._required_ber
        self.constraint = self._constraint
        # Same latch pattern for the refresh interval, once it has been set.
        if hasattr(self, "_channel_refresh_interval"):
            self.channel_refresh_interval = self._channel_refresh_interval

    ##############################################################################

    def query_sounder(self):
        """Refresh ``self.ac_vector`` from the sounder's impulse response.

        The last ``2*ac_vlen`` values returned by the sounder are read as
        interleaved real/imaginary pairs.  On any failure a single unit tap
        followed by zeros is used and the sounder reference is re-resolved.
        """
        try:
            #CTF from sounder
            ci_imp = self.ci_impulse.get_data()
            ci_vector_len = self.ac_vlen
            ci_imp=ci_imp[len(ci_imp)-2*ci_vector_len:len(ci_imp)]
            ci_imp=[ci_imp[2*i]+1j*ci_imp[2*i+1] for i in range(ci_vector_len)]
            ci_imp=array(ci_imp)
        except Exception:
            # Fallback CTF: unit impulse
            ci_imp = concatenate([[1],[0]*(self.ac_vlen-1)])
            self.ci_impulse = resolve(self.orb,"sounder_cir_c", ofdm_ti.data_buffer)
        self.ac_vector = ci_imp

    def get_rx_perf_meas(self):
        """Return and clear the measurements collected by the push consumer."""
        return self.pushconsumer.get_received(True)

    def is_reset_mode(self):
        return self.strategy_mode==ofdm_ti.PA_Ctrl.reset

    def is_margin_adaptive_policy(self):
        return self.strategy_mode==ofdm_ti.PA_Ctrl.margin_adaptive

    def is_rate_adaptive_policy(self):
        return self.strategy_mode==ofdm_ti.PA_Ctrl.rate_adaptive

    @staticmethod
    def add_options(normal, expert):
        """
        Adds receiver-specific options to the Options Parser
        """
        expert.add_option("","--data-blocks",type="intx",
                          default=9,help="Set number of data blocks per OFDM frame")
        normal.add_option("", "--dyn-freq", action="store_true", default=False,
                          help="enable troughput measure, usrp disabled")
        normal.add_option("", "--coding", action="store_true", default=False,
                          help="Enable channel coding")
        normal.add_option("", "--ideal", action="store_true", default=False,
                          help="Ideal conditions (relevant for coding)")
        normal.add_option("", "--rf", action="store_true", default=False,
                          help="RF conditions (relevant for coding)")


################################################################################

class Supplier_i(CosEventComm__POA.PushSupplier):
    """Push supplier servant; only reports its disconnection."""

    def disconnect_push_supplier (self):
        print("Push Supplier: disconnected.")


################################################################################

class Consumer_i(CosEventComm__POA.PushConsumer):
    """Push consumer servant that collects rx_performance_measure events."""

    def __init__(self):
        self._received = []

    def get_received(self,clear=False):
        """Return the collected events; with ``clear`` start a fresh list."""
        t = self._received
        if clear:
            self.clear_received()
        return t

    def clear_received(self):
        self._received = []

    def push(self,data_any):
        """Store the event when it extracts as an rx_performance_measure."""
        v = data_any.value(CORBA.TypeCode(CORBA.id(ofdm_ti.rx_performance_measure)))
        if v is not None:
            self._received.append(v)

    def disconnect_push_consumer(self):
        print("Push Consumer: disconnected.")

#end class Consumer_i

################################################################################

def resolve(orb,uid,c):
    """Look up "ofdm_ti.<uid>" in the name service and narrow it to ``c``.

    Returns the narrowed reference, or ``None`` (after printing a notice)
    when the lookup or the narrowing fails.
    """
    try:
        obj = orb.string_to_object("corbaname:rir:/NameService#ofdm_ti."+str(uid))
        obj_ref = obj._narrow(c)
    except Exception:
        print("Couldn't find object reference to "+str(uid))
        obj_ref=None
    return obj_ref


################################################################################

def start_resource_manager(rmanager,unique_id,options=None):
    """Instantiate ``rmanager``, bind it in the name service and run the ORB.

    rmanager  -- resource manager class, called as rmanager(orb, options=...)
    unique_id -- "kind" part of the name ("ofdm_ti", unique_id) it is bound to
    options   -- parsed command-line options handed to the manager

    Raises SystemExit when the naming context cannot be obtained.
    """
    orb = CORBA.ORB_init(sys.argv,CORBA.ORB_ID)
    poa = orb.resolve_initial_references("RootPOA")
    poaManager = poa._get_the_POAManager()
    poaManager.activate()

    # Pass options by keyword: the second positional parameter of
    # resource_manager_base.__init__ is loggerbase, not options.
    rm_i = rmanager(orb,options=options)
    rm_o = rm_i._this()
    unique_id = str(unique_id)

    try:
        obj = orb.resolve_initial_references("NameService")
        rootContext = obj._narrow(CosNaming.NamingContext)
    except Exception:
        raise SystemExit("Failed to get NamingContext")

    corba_name = [CosNaming.NameComponent("ofdm_ti",unique_id)]
    try:
        rootContext.bind(corba_name,rm_o)
    except CosNaming.NamingContext.AlreadyBound:
        rootContext.rebind(corba_name,rm_o)

    seed()
    rm_i.start()
    try:
        orb.run()
    except (Exception, KeyboardInterrupt):
        rm_i.timer.cancel()


def main():
    # NOTE(review): ``PA`` is not defined in this file; presumably it is
    # expected from another module -- confirm.
    start_resource_manager(PA, "PA")


if __name__ == '__main__':
    try:
        main()
    except KeyboardInterrupt:
        # NOTE(review): the recovered listing is truncated at this point; the
        # original handler body is unknown.
        pass
replication-manager.py
Source:replication-manager.py
#
# File: Replication Manager
# Author: aquinn, nsaizan, ranz2
# Group: TheEnemy'sGateIsDown
# Date: 11/8/2020
#
# Main code for our replication manager.

import socket
import sys
import threading
import traceback
import time

sys.path.append("..")

from helper import Logger
from helper import Messenger
from ports import ports
from ports import HOST
from ports import ACTIVE_REPLICATION

MAX_MSG_LEN = 1024 #Max number of bytes we're willing to receive at once.

member_count = 0
membership = []
gfd_conn = []
members_lock = threading.Lock()  # guards member_count and membership
rm_logger = Logger()


def send2servers(servers_list, msg):
    """Send ``msg`` to every server in ``servers_list`` over a fresh socket.

    Each server is contacted on ``ports["<server>_LISTEN"]``.  A server that
    cannot be reached is logged and skipped so that the remaining servers
    are still notified; every socket is closed after use.
    """
    logger = Logger()
    for s_num in servers_list:
        try:
            with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
                s.connect((HOST, ports[f"{s_num}_LISTEN"]))
                new_messenger = Messenger(s, 'RM', f"{s_num}", logger)
                new_messenger.send(msg)
                logger.info(f"{s_num}, " + msg)
        except (OSError, KeyError) as exc:
            logger.warning(f"RM: could not notify {s_num}: {exc}")


def _notify_async(server_list, msg):
    """Run ``send2servers`` on a daemon thread so the caller never blocks."""
    threading.Thread(target=send2servers,
                     args=(server_list, msg),
                     daemon=True).start()


def _log_membership():
    """Log the current member count and member list."""
    membership_string = ", ".join(membership)
    rm_logger.info(f"RM: {member_count} members: {membership_string}")


def add_member(server):
    """Add ``server`` to the membership and tell the replicas what to do.

    Active replication: the first member is told to mark itself ready;
    afterwards every earlier member is asked to send a checkpoint (the
    newly joined replica is not).  Passive replication: the first member
    is told it is the primary.  A server that is already a member only
    produces a warning.
    """
    global member_count
    with members_lock:
        if server in membership:
            rm_logger.warning(f"Server {server} is already a member")
            return

        membership.append(server)
        member_count += 1
        _log_membership()

        # RM should tell server what to do when membership changed
        if ACTIVE_REPLICATION:
            if member_count == 1:
                _notify_async([membership[0]],
                              "You Must Set I Am Ready To True")
            else:
                # the new joined replica should not send check point
                _notify_async(membership[:-1], "You Must Send Checkpoint")
        elif member_count == 1:
            _notify_async([membership[0]], "You Are The Primary")


def delete_member(server):
    """Remove ``server`` from the membership.

    In passive replication, removing the earliest-joined member (the
    primary) while others remain promotes the next member to primary.
    A server that is not a member only produces a warning.
    """
    global member_count
    with members_lock:
        if server not in membership:
            rm_logger.warning(f"Server {server} is not a member")
            return

        # RM should tell server what to do when membership changed
        if (not ACTIVE_REPLICATION
                and server == membership[0] and member_count > 1):
            # the primary is leaving: choose another primary
            _notify_async([membership[1]], "You Are The Primary")

        membership.remove(server)
        member_count -= 1
        _log_membership()


def handle_gfd_message(message):
    """Dispatch one status update received from the GFD.

    Messages containing "add" or "delete" carry the server name as their
    fourth space-separated field; messages containing "GFD" announce the
    GFD itself.  Messages with too few fields are logged and ignored.
    """
    if "add" in message:
        action = add_member
    elif "delete" in message:
        action = delete_member
    elif "GFD" in message:
        rm_logger.info("GFD registered, 0 members")
        return
    else:
        return

    args = message.split(" ")
    if len(args) < 4:
        rm_logger.warning(f"RM: ignoring malformed GFD message: {message!r}")
        return
    action(args[3])


def main():
    """Accept a single GFD connection and process its updates until an error."""
    # Open the top-level listening socket.
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
        #Set the socket address so it can be reused without a timeout.
        s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)

        # Bind to the RM_LISTEN port.
        s.bind((HOST, ports["RM_LISTEN"]))

        # This should be a listening port.
        s.listen()
        rm_logger.info("RM Initialized! member_count="+str(member_count))

        conn = None
        try:
            # Wait (blocking) for a single connection from the GFD.
            conn, addr = s.accept()
            rm_messenger = Messenger(conn, "RM", "GFD", rm_logger)

            #Run forever
            while True:
                # Wait for status updates from the GFD.
                x = rm_messenger.recv(conn)
                if x:
                    handle_gfd_message(x)
        except (Exception, KeyboardInterrupt):
            traceback.print_exc()
        finally:
            # Gracefully exit by closing all connections; the listening
            # socket is closed by the enclosing ``with``.
            if conn is not None:
                conn.close()


if __name__ == '__main__':
    # NOTE(review): the recovered listing is truncated after this guard;
    # calling main() is the presumed original body -- confirm.
    main()
update_projects.py
Source:update_projects.py
# Script to pre-select projects in github
import os
import logging
import subprocess
import shutil
import sys
from pathlib import Path

logger = logging.getLogger('update_project_logger')

# List initialization
scanned = []
maven = []
ant = []
gradlew = []


def save_data():
    """Write the four module-level URL lists to files in the current directory.

    One entry per line, to "maven_list", "ant_list", "gradlew_list" and
    "scanned_list".
    """
    for file_name, items in (("maven_list", maven),
                             ("ant_list", ant),
                             ("gradlew_list", gradlew),
                             ("scanned_list", scanned)):
        with open(file_name, "w") as f:
            for item in items:
                f.write("%s\n" % item)


def repo_dir_name(url):
    """Return the directory name ``git clone <url>`` creates by default.

    The last path component of the URL is used, ignoring trailing slashes
    and a trailing ".git".
    """
    name = url.rstrip('/').split('/')[-1]
    if name.endswith('.git'):
        name = name[:-len('.git')]
    return name


def rmtree_try(rm_name, rm_logger):
    """Remove the directory tree ``rm_name``.

    A directory that does not exist is not an error (nothing to remove).
    Any other failure is logged, the collected data is saved and the script
    exits with status 1.
    """
    try:
        shutil.rmtree(rm_name)
    except FileNotFoundError:
        # Expected before a first clone and after a clone that never
        # created its target directory.
        return
    except shutil.Error:
        rm_logger.error("remove folder error occurred")
        save_data()
        sys.exit(1)
    except Exception:
        rm_logger.error("Unkown error")
        save_data()
        sys.exit(1)


def update():
    """Re-clone every project of the current directory from its stored URL.

    Each sub-directory is expected to hold a one-line file "URL".  The
    checkout named after the repository is removed and cloned again (180 s
    timeout); after a successful clone the "URL" file is written back into
    it.  Entries without a usable "URL" file are logged and skipped.

    Changes to the parent directory before returning, so the caller must
    have changed into the category directory first.

    Returns the list of URLs that were read, whether or not the clone
    succeeded.
    """
    url_list = []
    for project_name in os.listdir('.'):
        project_dir = Path(project_name)
        if not project_dir.is_dir():
            # Plain files in the category directory are not projects.
            continue

        url_file = project_dir / 'URL'
        if not url_file.is_file():
            logger.error("URL file doesn't exist")
            continue

        with open(url_file) as fp:
            stored_urls = [line.rstrip('\n') for line in fp]
        if len(stored_urls) != 1:
            logger.error("Multiple line of URLs found")
            continue

        url = stored_urls[0]
        logger.info("URL is " + url)
        url_list.append(url)
        name = repo_dir_name(url)

        rmtree_try(name, logger)
        try:
            # check=True turns a non-zero exit status into CalledProcessError
            subprocess.run(["git", "clone", url],
                           shell=False, timeout=180, check=True)
        except subprocess.TimeoutExpired:
            rmtree_try(name, logger)
            logger.info("Timeout occur")
            continue
        except subprocess.CalledProcessError:
            logger.info("Cannot download the repository")
            rmtree_try(name, logger)
            continue

        with open(os.path.join(name, "URL"), "w") as f:
            f.write("%s" % url)
        logger.info("Downloading repository succeed")

    os.chdir('..')
    return url_list


def configure_logging():
    """Send INFO-and-above records of the script logger to update_project.log."""
    logger.setLevel(logging.INFO)
    # create file handler
    fh = logging.FileHandler('update_project.log')
    fh.setLevel(logging.INFO)
    # create formatter and add it to the handlers
    formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
    fh.setFormatter(formatter)
    # add the handlers to the logger
    logger.addHandler(fh)


def main():
    """Update the projects below ./maven, ./gradlew and ./ant in turn."""
    global scanned, maven, ant, gradlew
    configure_logging()

    # 1. scan all projects in maven
    os.chdir("maven")
    maven = update()
    # 2. scan all projects in gradle
    os.chdir("gradlew")
    gradlew = update()
    # 3. scan all projects in ant
    os.chdir("ant")
    ant = update()
    # 4. combine all list into scanned
    scanned = maven + gradlew + ant
    # NOTE(review): the recovered listing is truncated after this statement;
    # whatever followed (e.g. persisting the lists) is not reproduced here.


if __name__ == '__main__':
    main()
Learn to execute automation testing from scratch with LambdaTest Learning Hub, from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, TestNG, etc.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 automation test minutes FREE!!