Best Python code snippet using pytest-django_python
environment.py
Source:environment.py
# Authored by elia on 22/10/2020
# Feature: #Enter feature name here
# Enter feature description here
# Scenario: # Enter scenario name here
"""Gym environment built on the load and line data of an electrical grid."""
import gym
from gym import spaces
import numpy as np
from main.env.data import *
from main.util.binary_ops import *
from main.util.java_utils import dlf_analyse
from main.util.model_utils import *

np.seterr(all='raise')


class Environment(gym.Env):
    """Custom Environment that follows gym interface.

    Arguments:
        grid_name: string (name of the electrical grid)
        action_type: string ("continous" or "discrete"); matched
            case-insensitively and stored lower-cased.
        load_shedding: accepted for backward compatibility, currently unused.

    Columns of ``load_data`` used by this class: 1 = load, 3 = status
    (1 means the node is switched on), 4 = priority.
    """
    metadata = {'render.modes': ['human']}

    def __init__(self, grid_name="bus33", action_type="discrete", load_shedding=100):
        super(Environment, self).__init__()
        self.grid_name = grid_name
        # Normalise once: every later check compares against the lower-case
        # spelling, so storing the raw string would make "Discrete" build an
        # action space in here and then match no branch anywhere else.
        self.action_type = action_type.lower()
        self.load_data, self.line_data = get_data_from_csv(grid_name)
        self.max_capacity, _ = get_mva_kva(self.grid_name)  # MVa to KVa
        self.max_capacity = self.max_capacity * 1000
        self.n_nodes = len(self.line_data) + 1
        self.current_reward = self.calculate_reward(
            self.load_data[:, 3], self.load_data[:, 1], self.load_data[:, 4], self.load_data)
        self.reward_range = spaces.Box(low=0, high=1000, shape=(1,))
        high = np.array([10000] * self.n_nodes)
        self.observation_space = spaces.Box(-high, high)
        if self.action_type == "discrete":
            self.action_space = spaces.Discrete(self.n_nodes + 1)
        elif self.action_type == "continous":
            self.action_space = spaces.MultiBinary(self.n_nodes)
        else:
            # ValueError is a subclass of Exception, so existing handlers still
            # catch it; the offending value is now actually interpolated.
            raise ValueError(
                "Action type: {} not implemented. Use 'discrete' or 'continous'".format(action_type))
        self.num_actions = 0
        self.done = True

    def get_remaining_power(self):
        """Return ``max_capacity`` minus the summed load of switched-on nodes."""
        power_assigned = np.sum(self.load_data[:, 1] * self.load_data[:, 3])
        return self.max_capacity - power_assigned

    def calculate_reward(self, status, load, priority, load_data):
        """Score a candidate status vector.

        A copy of ``load_data`` with its status column replaced by ``status``
        is passed to ``dlf_analyse``. If any returned value lies outside the
        open interval (0.9, 1.1) the result is ``-sum(status)``; otherwise it
        is ``sum(load * status * priority**2)``.
        """
        # NOTE(review): the values are printed as "VOL" elsewhere in this
        # class, presumably per-unit bus voltages - confirm against dlf_analyse.
        candidate = load_data.copy()
        candidate[:, 3] = status
        dlf_values, _ = dlf_analyse(self.line_data, candidate, grid_name=self.grid_name)
        dlf_values = np.array(dlf_values)
        within_limits = dlf_values.min() > 0.9 and dlf_values.max() < 1.1
        if not within_limits:
            return -np.sum(status)
        return np.sum(load * status * np.square(priority))

    def step(self, action):
        """Execute one time step within the environment.

        Returns ``(observation, reward, done, info)`` where ``info`` holds the
        min/max value returned by ``dlf_analyse`` and the restored load in
        percent. ``done`` is always set to True by this method.
        """
        print("ACTION: {}".format(action))
        continuous = self.action_type == "continous"
        if continuous:
            action = np.array(action)
            action[0] = 1  # the first node is always kept switched on
        obs = self.get_observation(action)
        reward = self.reward(action)
        print("REWARD {}".format(reward))

        evaluated = self.load_data.copy()
        if continuous:
            # The proposed status vector is reported on, even when
            # get_observation() did not accept it.
            evaluated[:, 3] = action
        # In discrete mode the action is a node index (or an encoded integer),
        # not a status vector: writing it into the status column would
        # broadcast that integer to every node. The action has already been
        # applied to self.load_data, so the copy is evaluated as it is.
        dlf_values, _ = dlf_analyse(self.line_data, evaluated, grid_name=self.grid_name)
        dlf_values = np.array(dlf_values)
        reference_load, _ = get_data_from_csv(self.grid_name)
        restored_load = np.sum(reference_load[:, 1][(evaluated[:, 3] == 1)]) / (
            np.sum(reference_load[:, 1]) + 0.001) * 100
        self.done = True
        if continuous:
            # calculate_reward() returns a NumPy scalar, on which a plain [0]
            # fails; flattening first accepts scalars and 1-D arrays.
            reward = np.asarray(reward).reshape(-1)[0]
        info = {"min": dlf_values.min(), "max": dlf_values.max(), "load": restored_load}
        return obs, reward, self.done, info

    def reset(self):
        """Reload the grid data from CSV, keeping the current status column.

        Returns the current status vector as the initial observation.
        """
        print("**** EPISODE STARTS ...\n")
        # Both action types reset the same way: re-read the data and carry
        # the present status column over into it.
        status = self.load_data[:, 3]
        self.load_data, self.line_data = get_data_from_csv(self.grid_name)
        self.load_data[:, 3] = status
        self.num_actions = 0
        self.done = False
        print("OBSERVATIONS: {}".format(self.current_state()))
        return self.current_state()

    def get_observation(self, action=np.inf):
        """Apply ``action`` and return the resulting status column.

        ``np.inf`` (the default) means "no action": the current status is
        returned untouched. In discrete mode values up to ``n_nodes`` toggle a
        single node, larger values are decoded with
        ``get_bin_str_with_max_count``. In continuous mode the proposed status
        vector is adopted only if its reward beats ``current_reward``.
        """
        if self.action_type == "discrete":
            if action == np.inf:
                return self.current_state()
            if action <= self.n_nodes:
                self.act_from_num(action=action)
                print(action)
            else:
                action_str = get_bin_str_with_max_count(action, self.n_nodes)
                self.act(action_str)
                print(action_str)
            return self.current_state()
        elif self.action_type == "continous":
            if not isinstance(action, np.ndarray) and action == np.inf:
                return self.current_state()
            proposed = np.array(action)
            # Evaluate once: every calculate_reward() call runs dlf_analyse.
            proposed_reward = self.calculate_reward(
                proposed, self.load_data[:, 1], self.load_data[:, 4], self.load_data)
            print("ACTION REWARD {}".format(proposed_reward))
            print("OBSERVATION REWARD {}".format(self.current_reward))
            print("PRIORITY: " + str(self.load_data[:, 4]))
            if proposed_reward > self.current_reward:
                self.load_data[:, 3] = proposed
            return self.load_data[:, 3]

    def act_from_num(self, action):
        """Toggle the status of node ``action - 1``; ``0`` toggles nothing.

        The status of the first node is forced to 1 afterwards.
        """
        status = self.load_data[:, 3]  # view: writes go through to load_data
        if action != 0:
            node = action - 1
            status[node] = 1 if status[node] == 0 else 0
        status[0] = 1

    def act(self, action_str):
        """Set each node's status from the matching character of ``action_str``.

        The status of the first node is forced to 1 afterwards.
        """
        print(action_str)  # printed once; it used to be repeated per character
        status = self.load_data[:, 3]  # view: writes go through to load_data
        for node, bit in enumerate(action_str):
            status[node] = int(bit)
        status[0] = 1
        return

    def current_state(self):
        """Return the status column of ``load_data``."""
        return self.load_data[:, 3]

    def restored_load_percentage(self):
        """Return the share (in percent) of the CSV's total load that is switched on."""
        reference_load, _ = get_data_from_csv(self.grid_name)
        switched_on = self.load_data[:, 3] == 1
        return np.sum(reference_load[:, 1][switched_on] / np.sum(reference_load[:, 1])) * 100

    def reward(self, action):
        """Return the reward for ``action`` and update ``current_reward``.

        In continuous mode a proposal that does not beat ``current_reward``
        yields the (non-positive) difference and leaves ``current_reward``
        unchanged. Otherwise the reward of the current status is computed,
        stored in ``current_reward`` and returned.
        """
        print("CURRENT STATE: " + str(self.load_data[:, 3]))
        print("RESTORED LOAD: {}%".format(self.restored_load_percentage()))
        dlf_values, _ = dlf_analyse(self.line_data, self.load_data, grid_name=self.grid_name)
        dlf_values = np.array(dlf_values)
        print("MIN VOL: {}".format(dlf_values.min()))
        print("MAX VOL: {}".format(dlf_values.max()))
        if self.action_type == "continous":
            # Evaluate once: every calculate_reward() call runs dlf_analyse.
            proposed_reward = self.calculate_reward(
                np.array(action), self.load_data[:, 1], self.load_data[:, 4], self.load_data)
            if proposed_reward <= self.current_reward:
                return proposed_reward - self.current_reward
        status_reward = self.calculate_reward(
            self.load_data[:, 3], self.load_data[:, 1], self.load_data[:, 4], self.load_data)
        self.current_reward = status_reward
        return status_reward

    def power_assigned(self):
        ...  # the rest of this method is elided in the scraped source
ner_test.py
Source:ner_test.py
...13 s = {'read_model':True,'seed':345, 'epoch':20, 'lr':0.1, 'decay':0.95, 'wsize':5, 'hnum':100 , 'dnum':100, 'ynum':17, 'wnum':6336, 'L2': 0.000001,14 'f0':33716, 'f1':34238, 'f2':34189, 'f3':33768, 'f4':42828, 'fsize':1, 'kalpha':0.2}15 16 print 'load train data'17 train_word = load_data("data/train/train.word.txt") 18 train_label = load_data("data/train/train.label.txt")19 train_f0 = load_data('data/train/trainfeature0.txt')20 train_f1 = load_data('data/train/trainfeature1.txt')21 train_f2 = load_data('data/train/trainfeature2.txt')22 train_f3 = load_data('data/train/trainfeature3.txt')23 train_f4 = load_data('data/train/trainfeature4.txt')24 print 'load sighan data'25 sighan_word = load_data("data/train_sighan/trainsighan.word.txt") 26 sighan_label = load_data("data/train_sighan/trainsighan.label.txt")27 sighan_f0 = load_data('data/train_sighan/trainsighanfeature0.txt')28 sighan_f1 = load_data('data/train_sighan/trainsighanfeature1.txt')29 sighan_f2 = load_data('data/train_sighan/trainsighanfeature2.txt')30 sighan_f3 = load_data('data/train_sighan/trainsighanfeature3.txt')31 sighan_f4 = load_data('data/train_sighan/trainsighanfeature4.txt')32 sighan_simi = load_emb('data/train_sighan/cos.txt')#cross / cos / poly / gaussian33 print 'load dev data'34 dev_word = load_data("data/test/test.word.txt")35 dev_label = load_data("data/test/test.label.txt")36 dev_f0 = load_data('data/test/testfeature0.txt')37 dev_f1 = load_data('data/test/testfeature1.txt')38 dev_f2 = load_data('data/test/testfeature2.txt')39 dev_f3 = load_data('data/test/testfeature3.txt')40 dev_f4 = load_data('data/test/testfeature4.txt')41 print 'load test data'42 test_word = load_data("data/test/test.word.txt")43 test_label = load_data("data/test/test.label.txt")44 test_f0 = load_data('data/test/testfeature0.txt')45 test_f1 = load_data('data/test/testfeature1.txt')46 test_f2 = load_data('data/test/testfeature2.txt')47 test_f3 = load_data('data/test/testfeature3.txt')48 test_f4 = 
load_data('data/test/testfeature4.txt')49 sys.stdout.flush()50 51 print 'load baseline predict data'52 baseline_label = load_data("data/semi_test_pred.txt")53 for i in range(len(baseline_label)):54 for j in range(len(baseline_label[i])):55 if int(baseline_label[i][j])<=8:56 baseline_label[i][j]=057 else:58 baseline_label[i][j]=int(baseline_label[i][j])59 60 nsentences = len(train_word)61 np.random.seed(s['seed'])62 random.seed(s['seed'])63 rnn = model(read_model=s['read_model'],hnum = s['hnum'], ynum = s['ynum'], wnum = s['wnum'], dnum = s['dnum'], wsize = s['wsize'], fsize = s['fsize'], L2 = s['L2'],64 fnum0 = s['f0'], fnum1 = s['f1'], fnum2 = s['f2'], fnum3 = s['f3'], fnum4 = s['f4'], kalpha=s['kalpha'])65 #rnn.emb = load_emb("data/embeddingsall")66 s['cur_lr'] = s['lr']...
extract_peaks.py
Source:extract_peaks.py
import pandas as pd
import numpy as np
import datetime
from plotly import graph_objs as go
import plotly.express as px


def get_peak_days(load_data,
                  peak_level=2,
                  weekday_only=False,
                  peak_hours_only=False,
                  peak_hours=(7,18)):
    """
    Return the list of dates that contain the highest 'net_load_after_pv'
    samples of each (month, year) group.

    load_data: input data to be analyzed; its index must expose
               weekday/hour/month/year (a datetime-like index),
    peak_level: amount of peaks to be considered for each month - values
                between 1 to 10; int(peak_level*10) samples are kept per group,
    weekday_only: consider only weekdays,
    peak_hours_only: consider only peak hours,
    peak_hours: tuple with start and end of peak hours (both inclusive)
    """
    if weekday_only:
        on_weekday = load_data.index.weekday <= 4
        load_data = load_data[on_weekday]
    if peak_hours_only:
        hour_of_day = load_data.index.hour
        in_window = (hour_of_day <= peak_hours[1]) & (hour_of_day >= peak_hours[0])
        load_data = load_data[in_window]

    samples_per_group = int(peak_level*10)
    month_year_groups = load_data.groupby([load_data.index.month,
                                           load_data.index.year])
    top_samples = month_year_groups['net_load_after_pv'].nlargest(samples_per_group)
    # nlargest() prepends the two group keys to the index; strip them so only
    # the original timestamps remain.
    top_samples.index = top_samples.index.droplevel([0, 1])

    stamps = top_samples.index
    per_day_max = top_samples.groupby([stamps.year, stamps.month, stamps.date]).max()
    peak_dates = per_day_max.index.levels[2]
    return list(peak_dates)


def get_peak_statistics(load_data,
                        peak_days=False,
                        peak_level=3,
                        weekday_only=False,
                        peak_hours_only=False,
                        peak_hours=(7,18)):
    """
    load_data: input data to be analyzed,
    peak_days: consider only peak days identified from get_peak_days()
    peak_level: amount of peaks to be considered for each month - values between 1 to 10,
    weekday_only: consider only weekdays,
    peak_hours_only: consider only peak hours,
    peak_hours: tuple with start and end of peak hours
    """
    title = "all days"
    if weekday_only:
        load_data = load_data[load_data.index.weekday <= 4]
    if peak_hours_only:
        load_data = load_data[(load_data.index.hour <= peak_hours[1]) &\
                              (load_data.index.hour >= peak_hours[0])]
    if peak_days:
        title = "selected peak days"
        load_data = load_data.loc[np.isin(load_data.index.date,
                                          get_peak_days(load_data=load_data,
                                                        peak_level=peak_level,
                                                        weekday_only=weekday_only,
                                                        peak_hours_only=peak_hours_only,
                                                        peak_hours=peak_hours))]
    peak_time = load_data['net_load_after_pv'].groupby(load_data.index.date).idxmax()
    peak_hours = peak_time.dt.hour
    peak_hours = pd.DataFrame(peak_hours.values, index = peak_hours.index)
    peak_hours['peak'] = 1
    peak_hours.columns = ['hour', 'peak']
    peak_distribution = peak_hours.groupby('hour').sum()
    fig = px.bar(peak_distribution, x=peak_distribution.index, y='peak')
    fig.update_xaxes(range=[0, 24])
    fig.update_layout(title=f"Peak Distribution for {title}")
    # ... the rest of this function is elided in the scraped source
process_csv_util.py
Source:process_csv_util.py
import calendar
import holidays


def _add_date_feature(load_data, column, extract):
    """Write ``extract(date)`` for every value of ``load_data['Date']`` into ``column``."""
    # Column-wise apply: a row-wise DataFrame.apply(axis=1) builds a Series per
    # row and does not yield a column when the frame is empty.
    load_data[column] = load_data['Date'].apply(extract)
    return load_data


def _holiday_name(holiday_calendar, date):
    """Return the holiday name for ``date`` in ``holiday_calendar`` or 'Non-Holiday'."""
    holiday = holiday_calendar.get(date)
    if holiday is None:
        return 'Non-Holiday'
    return holiday


def add_year(load_data):
    """Add a 'Year' column taken from the 'Date' column; returns the same frame."""
    return _add_date_feature(load_data, 'Year', lambda date: date.year)


def add_month(load_data):
    """Add a 'Month' column taken from the 'Date' column; returns the same frame."""
    return _add_date_feature(load_data, 'Month', lambda date: date.month)


def add_season(load_data):
    """Add a 'Season' column computed by calculate_season(); returns the same frame."""
    return _add_date_feature(load_data, 'Season', calculate_season)


def add_day(load_data):
    """Add a 'Day' column holding ``date.weekday()``; returns the same frame."""
    return _add_date_feature(load_data, 'Day', lambda date: date.weekday())


def add_day_of_year(load_data):
    """Add a 'Dayofyear' column holding ``date.dayofyear``; returns the same frame."""
    return _add_date_feature(load_data, 'Dayofyear', lambda date: date.dayofyear)


def add_is_weekend(load_data):
    """Add an 'Is_Weekend' column, True where ``date.isoweekday() > 5``."""
    return _add_date_feature(load_data, 'Is_Weekend', lambda date: date.isoweekday() > 5)


def add_holiday(load_data, country, prov=None, state=None):
    """Add a 'Holiday' column with the holiday name or 'Non-Holiday'.

    ``country`` is the name of an attribute of the ``holidays`` module; it is
    called with ``prov`` and ``state`` to build the calendar.
    """
    # Build the calendar once instead of once per row.
    holiday_calendar = getattr(holidays, country)(prov=prov, state=state)
    return _add_date_feature(
        load_data, 'Holiday', lambda date: _holiday_name(holiday_calendar, date))


def calculate_holiday(date, country, prov, state):
    """Return the holiday name for ``date`` or 'Non-Holiday' if there is none."""
    holiday_list = getattr(holidays, country)(prov=prov, state=state)
    return _holiday_name(holiday_list, date)


def calculate_season(date):
    # Maps the month of `date` to a season number: spring 1, summer 2, autumn 3.
    # NOTE(review): calendar.month_name is locale-dependent, so the English
    # names below only match under an English locale - consider comparing
    # date.month numbers instead.
    spring = ['March', 'April', 'May']
    summer = ['June', 'July','August']
    autumn = ['September', 'October', 'November']
    month = calendar.month_name[date.month]

    if (month in spring):
        return 1
    elif (month in summer):
        return 2
    elif (month in autumn):
        return 3
    # ... the rest of this function is elided in the scraped source
Learn to execute automation testing from scratch with the LambdaTest Learning Hub, from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. The LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, and TestNG.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 automation test minutes FREE!!