Best Python code snippet using lisa_python
sallen_key.py
Source:sallen_key.py
...115 self.choose_random_result()116 # ------------------------ #117 # Private Internal Methods #118 # ------------------------ #119 def load_variables(self):120 return (121 self.components["R1"],122 self.components["R2"],123 self.components["C1"],124 self.components["C2"],125 self.components["Ra"],126 self.components["Rb"],127 )128 def k(self):129 R1, R2, C1, C2, Ra, Rb = self.load_variables()130 return 1 + Rb / Ra131 def wp(self):132 R1, R2, C1, C2, Ra, Rb = self.load_variables()133 return 1 / sqrt(R1 * R2 * C1 * C2)134 def qp(self):135 R1, R2, C1, C2, Ra, Rb = self.load_variables()136 k = self.k()137 return (1 / sqrt(R1 * R2 * C1 * C2)) / (1 / (C1 * R1) + 1 / (C1 * R2) + (1 - k) / (R2 * C2))138 def sens_k_rb(self):139 k = self.k()140 return 1 - (1 / k)141 def sens_k_ra(self):142 return (-1) * self.sens_k_rb()143 def sens_q_r1(self):144 R1, R2, C1, C2, Ra, Rb = self.load_variables()145 qp = self.qp()146 return (-1 / 2) + qp * sqrt((R2 * C2) / (R1 * C1))147 def sens_q_r2(self):148 R1, R2, C1, C2, Ra, Rb = self.load_variables()149 qp = self.qp()150 k = self.k()151 return (-1 / 2) + qp * (152 sqrt((R1 * C2) / (R2 * C1)) + (1 - k) * sqrt((R1 * C1) / (R2 * C2))153 )154 def sens_q_c1(self):155 R1, R2, C1, C2, Ra, Rb = self.load_variables()156 qp = self.qp()157 return (-1 / 2) + qp * (158 sqrt((R1 * C1) / (R2 * C2)) + sqrt((R2 * C1) / (R1 * C2))159 )160 def sens_q_c2(self):161 return self.sens_q_c1() * (-1)162 def sens_q_rb(self):163 R1, R2, C1, C2, Ra, Rb = self.load_variables()164 qp = self.qp()165 k = self.k()166 return (-1) * (1 - k) * qp * sqrt((R1 * C1) / (R2 * C2))167 def sens_q_ra(self):168 return (-1) * self.sens_q_rb()169class SallenKeyLowPassUnityGain(Cell):170 def __init__(self):171 super(SallenKeyLowPassUnityGain, self).__init__(172 "Sallen Key Low Pass",173 CellType.LOW_PASS.value,174 "app/images/sallen_key_low_pass_unity.png"175 )176 self.options = {177 "inverter": False,178 "canGain": False,179 "canUnityGain": True,180 "canAttenuate": False181 }182 self.components = {183 "R1": None,184 "R2": None,185 "C1": None,186 "C2": None187 }188 # -------------- #189 # Public Methods #190 # -------------- #191 def get_parameters(self) -> tuple:192 zeros = {}193 gain = self.k()194 poles = {"wp": self.wp(), "qp": self.qp()}195 return zeros, poles, gain196 def get_sensitivities(self) -> dict:197 return {198 "k": {199 "R1": 0,200 "R2": 0,201 "C1": 0,202 "C2": 0203 },204 "wp": {205 "R1": -1/2,206 "R2": -1/2,207 "C1": -1/2,208 "C2": -1/2209 },210 "qp": {211 "R1": self.sens_q_r1(),212 "R2": self.sens_q_r2(),213 "C1": self.sens_q_c1(),214 "C2": self.sens_q_c2()215 }216 }217 def design_components(self, zeros: dict, poles: dict, gain: float, stop_at_first=False) -> dict:218 if "wp" not in poles.keys() or "qp" not in poles.keys() or gain != 1:219 raise CellError(CellErrorCodes.INVALID_PARAMETERS)220 else:221 # Declaring and cleaning222 self.results = []223 # Secondly, but using the relationship R1=R2, calculates C1, and C2224 c2_c1 = compute_commercial_by_iteration(225 ComponentType.Capacitor, ComponentType.Capacitor,226 lambda c1: c1 / (4 * poles["qp"] ** 2),227 self.error228 )229 # Finally, calculates with the previous C1, C2 values, options for R = R1 = R2230 r1_r2_c1_c2 = []231 for c2, c1 in c2_c1:232 r = 1 / (poles["wp"] * sqrt(c1 * c2))233 matches, commercial = matches_commercial_values(ComponentType.Resistor, r, self.error)234 if matches:235 r1_r2_c1_c2.append((commercial, commercial, c1, c2))236 # Cross selection of possible values of components237 self.results = nexpand_component_list(self.results, r1_r2_c1_c2, "R1", "R2", "C1", "C2")238 self.flush_results()239 self.choose_random_result()240 # ------------------------ #241 # Private Internal Methods #242 # ------------------------ #243 def load_variables(self):244 return (245 self.components["R1"],246 self.components["R2"],247 self.components["C1"],248 self.components["C2"],249 )250 def k(self):251 return 1252 def wp(self):253 R1, R2, C1, C2 = self.load_variables()254 return 1 / sqrt(R1 * R2 * C1 * C2)255 def qp(self):256 R1, R2, C1, C2 = self.load_variables()257 return (1 / sqrt(R1 * R2 * C1 * C2)) / (1 / (C1 * R1) + 1 / (C1 * R2))258 def sens_q_r1(self):259 R1, R2, C1, C2 = self.load_variables()260 qp = self.qp()261 return (-1 / 2) + qp * sqrt((R2 * C2) / (R1 * C1))262 def sens_q_r2(self):263 R1, R2, C1, C2 = self.load_variables()264 qp = self.qp()265 return (-1 / 2) + qp * (266 sqrt((R1 * C2) / (R2 * C1))267 )268 def sens_q_c1(self):269 R1, R2, C1, C2 = self.load_variables()270 qp = self.qp()271 return (-1 / 2) + qp * (272 sqrt((R1 * C1) / (R2 * C2)) + sqrt((R2 * C1) / (R1 * C2))273 )274 def sens_q_c2(self):275 return self.sens_q_c1() * (-1)276class SallenKeyLowPassAttenuation(Cell):277 def __init__(self):278 super(SallenKeyLowPassAttenuation, self).__init__(279 "Sallen Key Low Pass",280 CellType.LOW_PASS.value,281 "app/images/sallen_key_low_pass_attenuation.png"282 )283 self.options = {284 "inverter": False,285 "canGain": False,286 "canUnityGain": False,287 "canAttenuate": True288 }289 self.components = {290 "R1A": None,291 "R1B": None,292 "R2": None,293 "C1": None,294 "C2": None295 }296 # -------------- #297 # Public Methods #298 # -------------- #299 def get_parameters(self) -> tuple:300 zeros = {}301 gain = self.k()302 poles = {"wp": self.wp(), "qp": self.qp()}303 return zeros, poles, gain304 def get_sensitivities(self) -> dict:305 return {306 "k": {307 "R1A": self.sens_k_r1a(),308 "R1B": self.sens_k_r1b(),309 "R2": 0,310 "C1": 0,311 "C2": 0312 },313 "wp": {314 "R1A": -1/2,315 "R1B": -1/2,316 "R2": -1/2,317 "C1": -1/2,318 "C2": -1/2319 },320 "qp": {321 "R1A": -1/2,322 "R1B": -1/2,323 "R2": self.sens_q_r2(),324 "C1": self.sens_q_c1(),325 "C2": self.sens_q_c2()326 }327 }328 def design_components(self, zeros: dict, poles: dict, gain: float, stop_at_first=False) -> dict:329 if "wp" not in poles.keys() or "qp" not in poles.keys() or gain >= 1:330 raise CellError(CellErrorCodes.INVALID_PARAMETERS)331 else:332 # Declaring and cleaning333 self.results = []334 # Secondly, but using the relationship R1=R2, calculates C1, and C2335 c2_c1 = compute_commercial_by_iteration(336 ComponentType.Capacitor, ComponentType.Capacitor,337 lambda c1: c1 / (4 * poles["qp"] ** 2),338 self.error339 )340 # Finally, calculates with the previous C1, C2 values, options for R = R1 = R2341 r1_r2_c1_c2 = []342 for c2, c1 in c2_c1:343 r = 1 / (poles["wp"] * sqrt(c1 * c2))344 matches, commercial = matches_commercial_values(ComponentType.Resistor, r, self.error)345 if matches:346 r1_r2_c1_c2.append((commercial, commercial, c1, c2))347 r1a_r1b = []348 for r1, r2, c1, c2 in r1_r2_c1_c2:349 r1a = r1 / gain350 r1b = r1 / (1 - gain)351 r1a_r1b.append((r1a, r1b))352 # Cross selection of possible values of components353 self.results = nexpand_component_list(self.results, [(r2, c1, c2) for _, r2, c1, c2 in r1_r2_c1_c2], "R2", "C1", "C2")354 self.results = nexpand_component_list(self.results, r1a_r1b, "R1A", "R1B")355 self.flush_results()356 self.choose_random_result()357 # ------------------------ #358 # Private Internal Methods #359 # ------------------------ #360 def load_variables(self):361 return (362 self.components["R1A"],363 self.components["R1B"],364 self.components["R2"],365 self.components["C1"],366 self.components["C2"],367 )368 def r1(self):369 R1A, R1B, R2, C1, C2 = self.load_variables()370 return (R1A * R1B) / (R1A + R1B)371 def k(self):372 R1A, R1B, R2, C1, C2 = self.load_variables()373 return R1B / (R1A + R1B)374 def wp(self):375 R1A, R1B, R2, C1, C2 = self.load_variables()376 R1 = self.r1()377 return 1 / sqrt(R1 * R2 * C1 * C2)378 def qp(self):379 R1A, R1B, R2, C1, C2 = self.load_variables()380 R1 = self.r1()381 return (1 / sqrt(R1 * R2 * C1 * C2)) / (1 / (C1 * R1) + 1 / (C1 * R2))382 def sens_k_r1a(self):383 R1A, R1B, R2, C1, C2 = self.load_variables()384 return (-R1A) / (R1A + R1B)385 def sens_k_r1b(self):386 return (-1) * self.sens_k_r1a()387 def sens_q_r2(self):388 R1A, R1B, R2, C1, C2 = self.load_variables()389 R1 = self.r1()390 qp = self.qp()391 return (-1 / 2) + qp * (392 sqrt((R1 * C2) / (R2 * C1))393 )394 def sens_q_c1(self):395 R1A, R1B, R2, C1, C2 = self.load_variables()396 R1 = self.r1()397 qp = self.qp()398 return (-1 / 2) + qp * (399 sqrt((R1 * C1) / (R2 * C2)) + sqrt((R2 * C1) / (R1 * C2))400 )401 def sens_q_c2(self):402 return self.sens_q_c1() * (-1)403class SallenKeyHighPassGain(SallenKeyLowPassGain):404 def __init__(self):405 super(SallenKeyHighPassGain, self).__init__()406 self.circuit = "app/images/sallen_key_high_pass.png"407 self.name = "Sallen Key High Pass"408 self.type = CellType.HIGH_PASS.value409 # -------------- #410 # Public Methods #411 # -------------- #412 def get_parameters(self) -> tuple:413 _, poles, gain = super(SallenKeyHighPassGain, self).get_parameters()414 zeros = {"wz": 0, "nz": 2}415 return zeros, poles, gain416 def design_components(self, zeros: dict, poles: dict, gain: float, stop_at_first=False) -> dict:417 if "wp" not in poles.keys() or "qp" not in poles.keys() or gain <= 1:418 raise CellError(CellErrorCodes.INVALID_PARAMETERS)419 else:420 # Declaring and cleaning421 self.results = []422 # First, calculate the easy gain resistors423 ra_rb = compute_commercial_by_iteration(424 ComponentType.Resistor, ComponentType.Resistor,425 lambda rb: rb / (gain - 1),426 self.error427 )428 # Secondly, but using the relationship C1=C2, calculates R1, and R2429 r1_r2 = compute_commercial_by_iteration_list(430 ComponentType.Resistor, ComponentType.Resistor,431 [lambda r2: (-r2 * (4 * (1 - gain) - 1) + r2 * sqrt(1 - 8 * poles["qp"] ** 2 * (1 - gain))) / (432 8 * poles["qp"] ** 2),433 lambda r2: (-r2 * (4 * (1 - gain) - 1) - r2 * sqrt(1 - 8 * poles["qp"] ** 2 * (1 - gain))) / (434 8 * poles["qp"] ** 2)435 ],436 self.error437 )438 # Finally, calculates with the previous R1, R2 values, options for C = C1 = C2439 r1_r2_c1_c2 = []440 for r1, r2 in r1_r2:441 c1 = 1 / (poles["wp"] * sqrt(r1 * r2))442 matches, commercial = matches_commercial_values(ComponentType.Capacitor, c1, self.error)443 if matches:444 r1_r2_c1_c2.append((r1, r2, commercial, commercial))445 # Cross selection of possible values of components446 self.results = nexpand_component_list(self.results, ra_rb, "Ra", "Rb")447 self.results = nexpand_component_list(self.results, r1_r2_c1_c2, "R1", "R2", "C1", "C2")448 self.flush_results()449 self.choose_random_result()450 # -------------- #451 # Static Methods #452 # -------------- #453 def qp(self):454 R1, R2, C1, C2, Ra, Rb = self.load_variables()455 k = self.k()456 return (1 / sqrt(R1 * R2 * C1 * C2)) / (1 / (C2 * R2) + 1 / (C1 * R2) + (1 - k) / (R1 * C1))457class SallenKeyHighPassUnityGain(SallenKeyLowPassUnityGain):458 def __init__(self):459 super(SallenKeyHighPassUnityGain, self).__init__()460 self.circuit = "app/images/sallen_key_high_pass_unity.png"461 self.name = "Sallen Key High Pass"462 self.type = CellType.HIGH_PASS.value463 # -------------- #464 # Public Methods #465 # -------------- #466 def get_parameters(self) -> tuple:467 _, poles, gain = super(SallenKeyHighPassUnityGain, self).get_parameters()468 zeros = {"wz": 0, "nz": 2}469 return zeros, poles, gain470 # -------------- #471 # Static Methods #472 # -------------- #473 def qp(self):474 R1, R2, C1, C2 = self.load_variables()...
test_specific_transforms.py
Source:test_specific_transforms.py
1from typing import Tuple2from pandas.testing import assert_frame_equal3import pandas as pd4import numpy as np5from datacode import VariableCollection, Variable, Column, Index, StringType, ColumnIndex, SourceTransform6from tests.test_source import SourceTest7from tests.test_variables import VC_NAME8class SpecificTransformsTest(SourceTest):9 time_index = Index('time', dtype='datetime')10 by_index = Index('id', dtype=StringType(categorical=True))11 test_df_with_ids_and_dates = pd.DataFrame(12 [13 (1, 2, 'd'),14 (2, 4, 'd'),15 (3, 6, 'd'),16 (4, 8, 'e'),17 (5, 10, 'e'),18 (6, 12, 'e'),19 ],20 columns=['a', 'b', 'c']21 )22 orig_date_index = pd.date_range(start='1/1/2000', periods=3, freq='d')23 date_index_with_gaps = pd.Index.union(pd.DatetimeIndex([pd.to_datetime('1/1/2000')]),24 pd.date_range(start='1/3/2000', periods=2, freq='d'))25 full_date_index = pd.Index.append(orig_date_index, date_index_with_gaps)26 test_df_with_ids_and_dates['date'] = full_date_index27 c_df_index = pd.Index(['d', 'd', 'e'], name='C')28 expect_loaded_df_with_lags = pd.DataFrame(29 [30 (np.nan, np.nan, 'd'),31 (1, 2, 'd'),32 (3, 4, 'e')33 ],34 columns=['A$_{t - 1}$', 'B$_{t - 1}$', 'C'],35 ).convert_dtypes()36 expect_loaded_df_with_two_lags = pd.DataFrame(37 [38 (np.nan, np.nan, 'd'),39 (np.nan, np.nan, 'd'),40 (1, 2, 'e')41 ],42 columns=['A$_{t - 2}$', 'B$_{t - 2}$', 'C'],43 ).convert_dtypes()44 expect_loaded_df_with_lags_and_by_var = pd.DataFrame(45 [46 (np.nan, np.nan),47 (1, 2),48 (np.nan, np.nan)49 ],50 columns=['A$_{t - 1}$', 'B$_{t - 1}$'],51 index=c_df_index,52 ).convert_dtypes()53 expect_lag_df_with_ids_and_dates = pd.DataFrame(54 [55 (np.nan, np.nan, 'd'),56 (1, 2, 'd'),57 (2, 4, 'd'),58 (np.nan, np.nan, 'e'),59 (np.nan, np.nan, 'e'),60 (5, 10, 'e'),61 ],62 columns=['A$_{t - 1}$', 'B$_{t - 1}$', 'C'],63 ).convert_dtypes()64 expect_lag_df_with_ids_and_dates['Date'] = full_date_index65 expect_lag_df_with_ids_and_dates.set_index(['C', 'Date'], inplace=True)66 expect_loaded_df_with_change = pd.DataFrame(67 [68 (np.nan, np.nan, 'd'),69 (2, 2, 'd'),70 (2, 2, 'e')71 ],72 columns=['A Change', 'B Change', 'C'],73 ).convert_dtypes()74 expect_loaded_df_with_dual_change = pd.DataFrame(75 [76 (np.nan, np.nan, 'd'),77 (np.nan, np.nan, 'd'),78 (0, 0, 'e')79 ],80 columns=['A Change Change', 'B Change Change', 'C'],81 ).convert_dtypes()82 expect_change_df_with_ids_and_dates = pd.DataFrame(83 [84 (np.nan, np.nan, 'd'),85 (1, 2, 'd'),86 (1, 2, 'd'),87 (np.nan, np.nan, 'e'),88 (np.nan, np.nan, 'e'),89 (1, 2, 'e'),90 ],91 columns=['A Change', 'B Change', 'C'],92 ).convert_dtypes()93 expect_change_df_with_ids_and_dates['Date'] = full_date_index94 expect_change_df_with_ids_and_dates.set_index(['C', 'Date'], inplace=True)95 def create_variable_collection(self, **kwargs) -> Tuple[VariableCollection, Variable, Variable, Variable]:96 config_dict = dict(97 name=VC_NAME98 )99 config_dict.update(**kwargs)100 a, b, c = self.create_variables()101 vc = VariableCollection(a, b, c, **config_dict)102 return vc, a, b, c103class TestLag(SpecificTransformsTest):104 def test_lag_with_defaults_no_indices(self):105 vc, a, b, c = self.create_variable_collection()106 self.create_csv()107 all_cols = self.create_columns()108 load_variables = [109 vc.a.lag(),110 vc.b.lag(),111 c112 ]113 ds = self.create_source(df=None, columns=all_cols, load_variables=load_variables)114 assert_frame_equal(ds.df, self.expect_loaded_df_with_lags)115 assert str(vc.a.lag().symbol) == r'\text{A}_{t - 1}'116 assert str(vc.b.lag().symbol) == r'\text{B}_{t - 1}'117 assert str(vc.c.symbol) == r'\text{C}'118 def test_two_lags_no_indices(self):119 vc, a, b, c = self.create_variable_collection()120 self.create_csv()121 all_cols = self.create_columns()122 load_variables = [123 vc.a.lag(2),124 vc.b.lag(2),125 c126 ]127 ds = self.create_source(df=None, columns=all_cols, load_variables=load_variables)128 assert_frame_equal(ds.df, self.expect_loaded_df_with_two_lags)129 assert str(vc.a.lag(2).symbol) == r'\text{A}_{t - 2}'130 assert str(vc.b.lag(2).symbol) == r'\text{B}_{t - 2}'131 assert str(vc.c.symbol) == r'\text{C}'132 def test_two_separate_lags_no_indices(self):133 vc, a, b, c = self.create_variable_collection()134 self.create_csv()135 all_cols = self.create_columns()136 load_variables = [137 vc.a.lag().lag(),138 vc.b.lag().lag(),139 c140 ]141 ds = self.create_source(df=None, columns=all_cols, load_variables=load_variables)142 assert_frame_equal(ds.df, self.expect_loaded_df_with_two_lags)143 assert str(vc.a.lag().lag().symbol) == r'\text{A}_{t - 2}'144 assert str(vc.b.lag().lag().symbol) == r'\text{B}_{t - 2}'145 assert str(vc.c.symbol) == r'\text{C}'146 def test_lags_with_by_index(self):147 vc, a, b, c = self.create_variable_collection()148 self.create_csv()149 by_colindex = [ColumnIndex(self.by_index, [c])]150 ac = Column(a, 'a', by_colindex)151 bc = Column(b, 'b', by_colindex)152 cc = Column(c, 'c')153 all_cols = [154 ac, bc, cc155 ]156 load_variables = [157 vc.a.lag(),158 vc.b.lag(),159 c160 ]161 ds = self.create_source(df=None, columns=all_cols, load_variables=load_variables)162 assert_frame_equal(ds.df, self.expect_loaded_df_with_lags_and_by_var)163 def test_lags_with_by_index_and_time_index_with_gaps(self):164 vc, a, b, c = self.create_variable_collection()165 d = Variable('d', 'Date', dtype='datetime')166 self.create_csv(df=self.test_df_with_ids_and_dates)167 by_colindex = ColumnIndex(self.by_index, [c])168 time_colindex = ColumnIndex(self.time_index, [d])169 by_time_colindex = [by_colindex, time_colindex]170 ac = Column(a, 'a', by_time_colindex)171 bc = Column(b, 'b', by_time_colindex)172 cc = Column(c, 'c')173 dd = Column(d, 'date')174 all_cols = [175 ac, bc, cc, dd176 ]177 load_variables = [178 vc.a.lag(fill_method=None),179 vc.b.lag(fill_method=None),180 c,181 d182 ]183 ds = self.create_source(df=None, columns=all_cols, load_variables=load_variables)184 assert_frame_equal(ds.df, self.expect_lag_df_with_ids_and_dates)185 def test_lags_as_source_transform_with_subset_and_no_preserve(self):186 vc, a, b, c = self.create_variable_collection()187 self.create_csv()188 all_cols = self.create_columns()189 load_variables = [190 vc.a,191 vc.b,192 c193 ]194 ds = self.create_source(df=None, columns=all_cols, load_variables=load_variables)195 lag_transform = [transform for transform in vc.transforms if transform.key == 'lag'][0]196 source_transform = SourceTransform.from_transform(lag_transform, subset=[a, b])197 source_transform.apply(ds, preserve_original=False)198 assert_frame_equal(ds.df, self.expect_loaded_df_with_lags)199 assert str(vc.a.lag().symbol) == r'\text{A}_{t - 1}'200 assert str(vc.b.lag().symbol) == r'\text{B}_{t - 1}'201 assert str(vc.c.symbol) == r'\text{C}'202class TestChange(SpecificTransformsTest):203 def test_change_with_defaults_no_indices(self):204 vc, a, b, c = self.create_variable_collection()205 self.create_csv()206 all_cols = self.create_columns()207 load_variables = [208 vc.a.change(),209 vc.b.change(),210 c211 ]212 ds = self.create_source(df=None, columns=all_cols, load_variables=load_variables)213 assert_frame_equal(ds.df, self.expect_loaded_df_with_change)214 assert str(vc.a.change().symbol) == r'\delta \text{A}'215 assert str(vc.b.change().symbol) == r'\delta \text{B}'216 assert str(vc.c.symbol) == r'\text{C}'217 def test_two_separate_changes_no_indices(self):218 vc, a, b, c = self.create_variable_collection()219 self.create_csv()220 all_cols = self.create_columns()221 load_variables = [222 vc.a.change().change(),223 vc.b.change().change(),224 c225 ]226 ds = self.create_source(df=None, columns=all_cols, load_variables=load_variables)227 assert_frame_equal(ds.df, self.expect_loaded_df_with_dual_change)228 assert str(vc.a.change().change().symbol) == r'\delta \delta \text{A}'229 assert str(vc.b.change().change().symbol) == r'\delta \delta \text{B}'230 assert str(vc.c.symbol) == r'\text{C}'231 def test_change_with_by_index_and_time_index_with_gaps(self):232 vc, a, b, c = self.create_variable_collection()233 d = Variable('d', 'Date', dtype='datetime')234 self.create_csv(df=self.test_df_with_ids_and_dates)235 by_colindex = ColumnIndex(self.by_index, [c])236 time_colindex = ColumnIndex(self.time_index, [d])237 by_time_colindex = [by_colindex, time_colindex]238 ac = Column(a, 'a', by_time_colindex)239 bc = Column(b, 'b', by_time_colindex)240 cc = Column(c, 'c')241 dd = Column(d, 'date')242 all_cols = [243 ac, bc, cc, dd244 ]245 load_variables = [246 vc.a.change(fill_method=None),247 vc.b.change(fill_method=None),248 c,249 d250 ]251 ds = self.create_source(df=None, columns=all_cols, load_variables=load_variables)...
task5Airflow.py
Source:task5Airflow.py
...4from airflow import DAG5from airflow.operators.python import PythonOperator6from dotenv import load_dotenv7from pymongo import MongoClient8def load_variables(ti):9 load_dotenv()10 db_name = os.getenv("DB_NAME")11 db_host = os.getenv("DB_HOST")12 db_port = int(os.getenv("DB_PORT"))13 source_file_path = os.getenv("SOURCE_FILE_PATH")14 tmp_file_path = os.getenv("TMP_FILE_PATH")15 ti.xcom_push(key="db_name", value=db_name)16 ti.xcom_push(key="db_host", value=db_host)17 ti.xcom_push(key="db_port", value=db_port)18 ti.xcom_push(key="source_file_path", value=source_file_path)19 ti.xcom_push(key="tmp_file_path", value=tmp_file_path)20def prepare_data(ti):21 source_file_path = ti.xcom_pull(key="source_file_path", task_ids="load_variables")22 tmp_file_path = ti.xcom_pull(key="tmp_file_path", task_ids="load_variables")...
Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!