Best Python code snippet using localstack_python
test_session_recording_list.py
Source:test_session_recording_list.py
...27 team = self.team28 event_factory(29 team=team, event=event_name, timestamp=timestamp, distinct_id=distinct_id, properties=properties,30 )31 def create_snapshot(32 self, distinct_id, session_id, timestamp, window_id="", team_id=None, has_full_snapshot=True33 ):34 if team_id is None:35 team_id = self.team.pk36 session_recording_event_factory(37 team_id=team_id,38 distinct_id=distinct_id,39 timestamp=timestamp,40 session_id=session_id,41 window_id=window_id,42 snapshot_data={"timestamp": timestamp.timestamp(), "has_full_snapshot": has_full_snapshot,},43 )44 @property45 def base_time(self):46 return now() - relativedelta(hours=1)47 @test_with_materialized_columns(["$current_url"])48 @freeze_time("2021-01-21T20:00:00.000Z")49 def test_basic_query(self):50 Person.objects.create(team=self.team, distinct_ids=["user"], properties={"email": "bla"})51 self.create_snapshot("user", "1", self.base_time)52 self.create_snapshot("user", "1", self.base_time + relativedelta(seconds=10))53 self.create_snapshot("user", "2", self.base_time + relativedelta(seconds=20))54 filter = SessionRecordingsFilter(team=self.team, data={"no_filter": None})55 session_recording_list_instance = session_recording_list(filter=filter, team_id=self.team.pk)56 (session_recordings, more_recordings_available) = session_recording_list_instance.run()57 self.assertEqual(len(session_recordings), 2)58 self.assertEqual(session_recordings[0]["start_time"], self.base_time + relativedelta(seconds=20))59 self.assertEqual(session_recordings[0]["session_id"], "2")60 self.assertEqual(session_recordings[0]["distinct_id"], "user")61 self.assertEqual(session_recordings[1]["start_time"], self.base_time)62 self.assertEqual(session_recordings[1]["session_id"], "1")63 self.assertEqual(session_recordings[1]["distinct_id"], "user")64 self.assertEqual(more_recordings_available, False)65 @freeze_time("2021-01-21T20:00:00.000Z")66 def test_recordings_dont_leak_data_between_teams(self):67 another_team = 
Team.objects.create(organization=self.organization)68 Person.objects.create(team=self.team, distinct_ids=["user"], properties={"email": "bla"})69 Person.objects.create(team=another_team, distinct_ids=["user"], properties={"email": "bla"})70 self.create_snapshot("user", "1", self.base_time, team_id=another_team.pk)71 self.create_snapshot("user", "2", self.base_time)72 filter = SessionRecordingsFilter(team=self.team, data={"no_filter": None})73 session_recording_list_instance = session_recording_list(filter=filter, team_id=self.team.pk)74 (session_recordings, _) = session_recording_list_instance.run()75 self.assertEqual(len(session_recordings), 1)76 self.assertEqual(session_recordings[0]["session_id"], "2")77 self.assertEqual(session_recordings[0]["distinct_id"], "user")78 @freeze_time("2021-01-21T20:00:00.000Z")79 def test_all_sessions_recording_object_keys(self):80 Person.objects.create(team=self.team, distinct_ids=["user"], properties={"email": "bla"})81 self.create_snapshot("user", "1", self.base_time)82 self.create_snapshot("user", "1", self.base_time + relativedelta(seconds=30))83 filter = SessionRecordingsFilter(team=self.team, data={"no_filter": None})84 session_recording_list_instance = session_recording_list(filter=filter, team_id=self.team.pk)85 (session_recordings, _) = session_recording_list_instance.run()86 self.assertEqual(len(session_recordings), 1)87 self.assertEqual(session_recordings[0]["session_id"], "1")88 self.assertEqual(session_recordings[0]["distinct_id"], "user")89 self.assertEqual(session_recordings[0]["start_time"], self.base_time)90 self.assertEqual(session_recordings[0]["end_time"], self.base_time + relativedelta(seconds=30))91 self.assertEqual(session_recordings[0]["duration"], 30)92 @freeze_time("2021-01-21T20:00:00.000Z")93 def test_event_filter(self):94 Person.objects.create(team=self.team, distinct_ids=["user"], properties={"email": "bla"})95 self.create_snapshot("user", "1", self.base_time)96 self.create_event("user", 
self.base_time)97 self.create_snapshot("user", "1", self.base_time + relativedelta(seconds=30))98 filter = SessionRecordingsFilter(99 team=self.team,100 data={"events": [{"id": "$pageview", "type": "events", "order": 0, "name": "$pageview"}]},101 )102 session_recording_list_instance = session_recording_list(filter=filter, team_id=self.team.pk)103 (session_recordings, _) = session_recording_list_instance.run()104 self.assertEqual(len(session_recordings), 1)105 self.assertEqual(session_recordings[0]["session_id"], "1")106 filter = SessionRecordingsFilter(107 team=self.team,108 data={"events": [{"id": "$autocapture", "type": "events", "order": 0, "name": "$autocapture"}]},109 )110 session_recording_list_instance = session_recording_list(filter=filter, team_id=self.team.pk)111 (session_recordings, _) = session_recording_list_instance.run()112 self.assertEqual(len(session_recordings), 0)113 @test_with_materialized_columns(["$current_url", "$browser"])114 @freeze_time("2021-01-21T20:00:00.000Z")115 def test_event_filter_with_properties(self):116 Person.objects.create(team=self.team, distinct_ids=["user"], properties={"email": "bla"})117 self.create_snapshot("user", "1", self.base_time)118 self.create_event("user", self.base_time, properties={"$browser": "Chrome"})119 self.create_snapshot("user", "1", self.base_time + relativedelta(seconds=30))120 filter = SessionRecordingsFilter(121 team=self.team,122 data={123 "events": [124 {125 "id": "$pageview",126 "type": "events",127 "order": 0,128 "name": "$pageview",129 "properties": [130 {"key": "$browser", "value": ["Chrome"], "operator": "exact", "type": "event"}131 ],132 },133 ]134 },135 )136 session_recording_list_instance = session_recording_list(filter=filter, team_id=self.team.pk)137 (session_recordings, _) = session_recording_list_instance.run()138 self.assertEqual(len(session_recordings), 1)139 self.assertEqual(session_recordings[0]["session_id"], "1")140 filter = SessionRecordingsFilter(141 team=self.team,142 data={143 
"events": [144 {145 "id": "$pageview",146 "type": "events",147 "order": 0,148 "name": "$pageview",149 "properties": [150 {"key": "$browser", "value": ["Firefox"], "operator": "exact", "type": "event"}151 ],152 },153 ]154 },155 )156 session_recording_list_instance = session_recording_list(filter=filter, team_id=self.team.pk)157 (session_recordings, _) = session_recording_list_instance.run()158 self.assertEqual(len(session_recordings), 0)159 @freeze_time("2021-01-21T20:00:00.000Z")160 def test_multiple_event_filters(self):161 Person.objects.create(team=self.team, distinct_ids=["user"], properties={"email": "bla"})162 self.create_snapshot("user", "1", self.base_time)163 self.create_event("user", self.base_time)164 self.create_event("user", self.base_time, event_name="new-event")165 self.create_snapshot("user", "1", self.base_time + relativedelta(seconds=30))166 filter = SessionRecordingsFilter(167 team=self.team,168 data={169 "events": [170 {"id": "$pageview", "type": "events", "order": 0, "name": "$pageview"},171 {"id": "new-event", "type": "events", "order": 0, "name": "new-event"},172 ]173 },174 )175 session_recording_list_instance = session_recording_list(filter=filter, team_id=self.team.pk)176 (session_recordings, _) = session_recording_list_instance.run()177 self.assertEqual(len(session_recordings), 1)178 self.assertEqual(session_recordings[0]["session_id"], "1")179 filter = SessionRecordingsFilter(180 team=self.team,181 data={182 "events": [183 {"id": "$pageview", "type": "events", "order": 0, "name": "$pageview"},184 {"id": "new-event2", "type": "events", "order": 0, "name": "new-event2"},185 ]186 },187 )188 session_recording_list_instance = session_recording_list(filter=filter, team_id=self.team.pk)189 (session_recordings, _) = session_recording_list_instance.run()190 self.assertEqual(len(session_recordings), 0)191 @test_with_materialized_columns(["$current_url", "$browser"])192 @freeze_time("2021-01-21T20:00:00.000Z")193 def test_action_filter(self):194 
Person.objects.create(team=self.team, distinct_ids=["user"], properties={"email": "bla"})195 action1 = self.create_action("custom-event", properties=[{"key": "$browser", "value": "Firefox"}])196 action2 = self.create_action(name="custom-event")197 self.create_snapshot("user", "1", self.base_time)198 self.create_event("user", self.base_time, event_name="custom-event", properties={"$browser": "Chrome"})199 self.create_snapshot("user", "1", self.base_time + relativedelta(seconds=30))200 # An action with properties201 filter = SessionRecordingsFilter(202 team=self.team,203 data={"actions": [{"id": action1.id, "type": "actions", "order": 1, "name": "custom-event",}]},204 )205 session_recording_list_instance = session_recording_list(filter=filter, team_id=self.team.pk)206 (session_recordings, _) = session_recording_list_instance.run()207 self.assertEqual(len(session_recordings), 0)208 # An action without properties209 filter = SessionRecordingsFilter(210 team=self.team,211 data={"actions": [{"id": action2.id, "type": "actions", "order": 1, "name": "custom-event",}]},212 )213 session_recording_list_instance = session_recording_list(filter=filter, team_id=self.team.pk)214 (session_recordings, _) = session_recording_list_instance.run()215 self.assertEqual(len(session_recordings), 1)216 self.assertEqual(session_recordings[0]["session_id"], "1")217 # Adding properties to an action218 filter = SessionRecordingsFilter(219 team=self.team,220 data={221 "actions": [222 {223 "id": action2.id,224 "type": "actions",225 "order": 1,226 "name": "custom-event",227 "properties": [228 {"key": "$browser", "value": ["Firefox"], "operator": "exact", "type": "event"}229 ],230 }231 ]232 },233 )234 session_recording_list_instance = session_recording_list(filter=filter, team_id=self.team.pk)235 (session_recordings, _) = session_recording_list_instance.run()236 self.assertEqual(len(session_recordings), 0)237 @freeze_time("2021-01-21T20:00:00.000Z")238 def 
test_all_sessions_recording_object_keys_with_entity_filter(self):239 Person.objects.create(team=self.team, distinct_ids=["user"], properties={"email": "bla"})240 self.create_snapshot("user", "1", self.base_time)241 self.create_event("user", self.base_time)242 self.create_snapshot("user", "1", self.base_time + relativedelta(seconds=30))243 filter = SessionRecordingsFilter(244 team=self.team,245 data={"events": [{"id": "$pageview", "type": "events", "order": 0, "name": "$pageview"}]},246 )247 session_recording_list_instance = session_recording_list(filter=filter, team_id=self.team.pk)248 (session_recordings, _) = session_recording_list_instance.run()249 self.assertEqual(len(session_recordings), 1)250 self.assertEqual(session_recordings[0]["session_id"], "1")251 self.assertEqual(session_recordings[0]["distinct_id"], "user")252 self.assertEqual(session_recordings[0]["start_time"], self.base_time)253 self.assertEqual(session_recordings[0]["end_time"], self.base_time + relativedelta(seconds=30))254 self.assertEqual(session_recordings[0]["duration"], 30)255 @freeze_time("2021-01-21T20:00:00.000Z")256 def test_duration_filter(self):257 Person.objects.create(team=self.team, distinct_ids=["user"], properties={"email": "bla"})258 self.create_snapshot("user", "1", self.base_time)259 self.create_snapshot("user", "1", self.base_time + relativedelta(seconds=30))260 self.create_snapshot("user", "2", self.base_time)261 self.create_snapshot("user", "2", self.base_time + relativedelta(minutes=4))262 filter = SessionRecordingsFilter(263 team=self.team,264 data={"session_recording_duration": '{"type":"recording","key":"duration","value":60,"operator":"gt"}'},265 )266 session_recording_list_instance = session_recording_list(filter=filter, team_id=self.team.pk)267 (session_recordings, _) = session_recording_list_instance.run()268 self.assertEqual(len(session_recordings), 1)269 self.assertEqual(session_recordings[0]["session_id"], "2")270 filter = SessionRecordingsFilter(271 
team=self.team,272 data={"session_recording_duration": '{"type":"recording","key":"duration","value":60,"operator":"lt"}'},273 )274 session_recording_list_instance = session_recording_list(filter=filter, team_id=self.team.pk)275 (session_recordings, _) = session_recording_list_instance.run()276 self.assertEqual(len(session_recordings), 1)277 self.assertEqual(session_recordings[0]["session_id"], "1")278 @freeze_time("2021-01-21T20:00:00.000Z")279 def test_date_from_filter(self):280 Person.objects.create(team=self.team, distinct_ids=["user"], properties={"email": "bla"})281 self.create_snapshot("user", "1", self.base_time - relativedelta(days=3))282 self.create_snapshot("user", "1", self.base_time - relativedelta(days=3) + relativedelta(seconds=30))283 filter = SessionRecordingsFilter(team=self.team, data={"date_from": self.base_time.strftime("%Y-%m-%d")})284 session_recording_list_instance = session_recording_list(filter=filter, team_id=self.team.pk)285 (session_recordings, _) = session_recording_list_instance.run()286 self.assertEqual(len(session_recordings), 0)287 filter = SessionRecordingsFilter(288 team=self.team, data={"date_from": (self.base_time - relativedelta(days=4)).strftime("%Y-%m-%d")}289 )290 session_recording_list_instance = session_recording_list(filter=filter, team_id=self.team.pk)291 (session_recordings, _) = session_recording_list_instance.run()292 self.assertEqual(len(session_recordings), 1)293 self.assertEqual(session_recordings[0]["session_id"], "1")294 @freeze_time("2021-01-21T20:00:00.000Z")295 def test_date_to_filter(self):296 Person.objects.create(team=self.team, distinct_ids=["user"], properties={"email": "bla"})297 self.create_snapshot("user", "1", self.base_time - relativedelta(days=3))298 self.create_snapshot("user", "1", self.base_time - relativedelta(days=3) + relativedelta(seconds=30))299 filter = SessionRecordingsFilter(300 team=self.team, data={"date_to": (self.base_time - relativedelta(days=4)).strftime("%Y-%m-%d")}301 )302 
session_recording_list_instance = session_recording_list(filter=filter, team_id=self.team.pk)303 (session_recordings, _) = session_recording_list_instance.run()304 self.assertEqual(len(session_recordings), 0)305 filter = SessionRecordingsFilter(team=self.team, data={"date_to": (self.base_time).strftime("%Y-%m-%d")})306 session_recording_list_instance = session_recording_list(filter=filter, team_id=self.team.pk)307 (session_recordings, _) = session_recording_list_instance.run()308 self.assertEqual(len(session_recordings), 1)309 self.assertEqual(session_recordings[0]["session_id"], "1")310 @freeze_time("2021-01-21T20:00:00.000Z")311 def test_recording_that_spans_time_bounds(self):312 Person.objects.create(team=self.team, distinct_ids=["user"], properties={"email": "bla"})313 day_line = datetime(2021, 11, 5)314 self.create_snapshot("user", "1", day_line - relativedelta(hours=3))315 self.create_snapshot("user", "1", day_line + relativedelta(hours=3))316 filter = SessionRecordingsFilter(317 team=self.team,318 data={319 "date_to": day_line.strftime("%Y-%m-%d"),320 "date_from": (day_line - relativedelta(days=10)).strftime("%Y-%m-%d"),321 },322 )323 session_recording_list_instance = session_recording_list(filter=filter, team_id=self.team.pk)324 (session_recordings, _) = session_recording_list_instance.run()325 self.assertEqual(len(session_recordings), 1)326 self.assertEqual(session_recordings[0]["session_id"], "1")327 self.assertEqual(session_recordings[0]["duration"], 6 * 60 * 60)328 @freeze_time("2021-01-21T20:00:00.000Z")329 def test_person_id_filter(self):330 p = Person.objects.create(team=self.team, distinct_ids=["user", "user2"], properties={"email": "bla"})331 self.create_snapshot("user", "1", self.base_time)332 self.create_snapshot("user2", "2", self.base_time + relativedelta(seconds=10))333 self.create_snapshot("user3", "3", self.base_time + relativedelta(seconds=20))334 filter = SessionRecordingsFilter(team=self.team, data={"person_uuid": str(p.uuid),})335 
session_recording_list_instance = session_recording_list(filter=filter, team_id=self.team.pk)336 (session_recordings, _) = session_recording_list_instance.run()337 self.assertEqual(len(session_recordings), 2)338 self.assertEqual(session_recordings[0]["session_id"], "2")339 self.assertEqual(session_recordings[1]["session_id"], "1")340 @freeze_time("2021-01-21T20:00:00.000Z")341 def test_all_filters_at_once(self):342 p = Person.objects.create(team=self.team, distinct_ids=["user", "user2"], properties={"email": "bla"})343 action2 = self.create_action(name="custom-event")344 self.create_snapshot("user", "1", self.base_time - relativedelta(days=3))345 self.create_event("user", self.base_time - relativedelta(days=3))346 self.create_event(347 "user",348 self.base_time - relativedelta(days=3),349 event_name="custom-event",350 properties={"$browser": "Chrome"},351 )352 self.create_snapshot("user", "1", self.base_time - relativedelta(days=3) + relativedelta(hours=6))353 filter = SessionRecordingsFilter(354 team=self.team,355 data={356 "person_uuid": str(p.uuid),357 "date_to": (self.base_time + relativedelta(days=3)).strftime("%Y-%m-%d"),358 "date_from": (self.base_time - relativedelta(days=10)).strftime("%Y-%m-%d"),359 "session_recording_duration": '{"type":"recording","key":"duration","value":60,"operator":"gt"}',360 "events": [{"id": "$pageview", "type": "events", "order": 0, "name": "$pageview"}],361 "actions": [{"id": action2.id, "type": "actions", "order": 1, "name": "custom-event",}],362 },363 )364 session_recording_list_instance = session_recording_list(filter=filter, team_id=self.team.pk)365 (session_recordings, _) = session_recording_list_instance.run()366 self.assertEqual(len(session_recordings), 1)367 self.assertEqual(session_recordings[0]["session_id"], "1")368 @freeze_time("2021-01-21T20:00:00.000Z")369 def test_pagination(self):370 Person.objects.create(team=self.team, distinct_ids=["user"], properties={"email": "bla"})371 self.create_snapshot("user", "1", 
self.base_time)372 self.create_snapshot("user", "2", self.base_time + relativedelta(seconds=10))373 self.create_snapshot("user", "3", self.base_time + relativedelta(seconds=20))374 filter = SessionRecordingsFilter(team=self.team, data={"limit": 2,})375 session_recording_list_instance = session_recording_list(filter=filter, team_id=self.team.pk)376 (session_recordings, more_recordings_available) = session_recording_list_instance.run()377 self.assertEqual(len(session_recordings), 2)378 self.assertEqual(session_recordings[0]["session_id"], "3")379 self.assertEqual(session_recordings[1]["session_id"], "2")380 self.assertEqual(more_recordings_available, True)381 filter = SessionRecordingsFilter(team=self.team, data={"limit": 2, "offset": 0})382 session_recording_list_instance = session_recording_list(filter=filter, team_id=self.team.pk)383 (session_recordings, more_recordings_available) = session_recording_list_instance.run()384 self.assertEqual(len(session_recordings), 2)385 self.assertEqual(session_recordings[0]["session_id"], "3")386 self.assertEqual(session_recordings[1]["session_id"], "2")387 self.assertEqual(more_recordings_available, True)388 filter = SessionRecordingsFilter(team=self.team, data={"limit": 2, "offset": 1})389 session_recording_list_instance = session_recording_list(filter=filter, team_id=self.team.pk)390 (session_recordings, more_recordings_available) = session_recording_list_instance.run()391 self.assertEqual(len(session_recordings), 2)392 self.assertEqual(session_recordings[0]["session_id"], "2")393 self.assertEqual(session_recordings[1]["session_id"], "1")394 self.assertEqual(more_recordings_available, False)395 filter = SessionRecordingsFilter(team=self.team, data={"limit": 2, "offset": 2})396 session_recording_list_instance = session_recording_list(filter=filter, team_id=self.team.pk)397 (session_recordings, more_recordings_available) = session_recording_list_instance.run()398 self.assertEqual(len(session_recordings), 1)399 
self.assertEqual(session_recordings[0]["session_id"], "1")400 self.assertEqual(more_recordings_available, False)401 @freeze_time("2021-01-21T20:00:00.000Z")402 def test_recording_without_fullsnapshot_dont_appear(self):403 Person.objects.create(team=self.team, distinct_ids=["user"], properties={"email": "bla"})404 self.create_snapshot("user", "1", self.base_time, has_full_snapshot=False)405 filter = SessionRecordingsFilter(team=self.team, data={"no-filter": True})406 session_recording_list_instance = session_recording_list(filter=filter, team_id=self.team.pk)407 (session_recordings, _) = session_recording_list_instance.run()408 self.assertEqual(len(session_recordings), 0)409 @freeze_time("2021-01-21T20:00:00.000Z")410 def test_teams_dont_leak_event_filter(self):411 Person.objects.create(team=self.team, distinct_ids=["user"], properties={"email": "bla"})412 another_team = Team.objects.create(organization=self.organization)413 self.create_snapshot("user", "1", self.base_time)414 self.create_event(1, self.base_time + relativedelta(seconds=15), team=another_team)415 self.create_snapshot("user", "1", self.base_time + relativedelta(seconds=30))416 filter = SessionRecordingsFilter(417 team=self.team,418 data={"events": [{"id": "$pageview", "type": "events", "order": 0, "name": "$pageview"}]},419 )420 session_recording_list_instance = session_recording_list(filter=filter, team_id=self.team.pk)421 (session_recordings, _) = session_recording_list_instance.run()422 self.assertEqual(len(session_recordings), 0)...
test_dirsnapshot.py
Source:test_dirsnapshot.py
...87 d1 = tmp_path / "dir1"88 d1.mkdir()89 snapshot_file = tmp_path / f"{d1}.snapshot"90 populate_dir(d1)91 create_snapshot(str(d1), str(snapshot_file))92 assert is_snapshot_file(snapshot_file)93 snapshot_file = tmp_path / "not_a_snapshot"94 snapshot_file.touch()95 assert not is_snapshot_file(snapshot_file)96def test_create_snapshot_in_memory(tmp_path: pathlib.Path):97 """Test create_snapshot with in-memory snapshot"""98 d1 = tmp_path / "dir1"99 d1.mkdir()100 populate_dir(d1)101 snapshot = create_snapshot(str(d1), None)102 assert type(snapshot) == DirSnapshot103def test_dirdiff_two_snapshots(tmp_path: pathlib.Path, capsys):104 """Test DirDiff with two snapshots"""105 d1 = tmp_path / "dir1"106 d1.mkdir()107 files = populate_dir(d1)108 # snapshot 1109 snapshot_file1 = tmp_path / "1.snapshot"110 snapshot_1 = create_snapshot(str(d1), str(snapshot_file1), description="snapshot-1")111 assert snapshot_1.description == "snapshot-1"112 # create some changes113 added, removed, modified = modify_files(d1)114 # snapshot 2115 snapshot_file2 = tmp_path / "2.snapshot"116 snapshot_2 = create_snapshot(str(d1), str(snapshot_file2), description="snapshot-2")117 assert snapshot_2.description == "snapshot-2"118 dirdiff = DirDiff(119 str(snapshot_file1),120 str(snapshot_file2),121 )122 diff = dirdiff.diff()123 assert isinstance(diff, DirDiffResults)124 assert sorted(diff.removed) == removed125 assert sorted(diff.added) == added126 assert sorted(diff.modified) == modified127 # remove files we changed/removed from files list for comparing identical128 for f in removed + modified:129 files.remove(f)130 assert sorted(diff.identical) == sorted(files)131 # test json132 json_dict = json.loads(dirdiff.diff().json())133 assert sorted(json_dict["added"]) == sorted(diff.added)134 assert sorted(json_dict["removed"]) == sorted(diff.removed)135 assert sorted(json_dict["modified"]) == sorted(diff.modified)136 assert sorted(json_dict["identical"]) == sorted(diff.identical)137 # test description138 
dirdiff.report()139 output = capsys.readouterr().out.strip()140 assert "(snapshot-1)" in output141 assert "(snapshot-2)" in output142def test_dirdiff_two_snapshot_objects(tmp_path: pathlib.Path, capsys):143 """Test DirDiff with two snapshot objects"""144 d1 = tmp_path / "dir1"145 d1.mkdir()146 files = populate_dir(d1)147 # snapshot 1148 snapshot_file1 = tmp_path / "1.snapshot"149 snapshot_1 = create_snapshot(str(d1), str(snapshot_file1), description="snapshot-1")150 assert snapshot_1.description == "snapshot-1"151 added, removed, modified = modify_files(d1)152 # snapshot 2153 snapshot_file2 = tmp_path / "2.snapshot"154 snapshot_2 = create_snapshot(str(d1), str(snapshot_file2), description="snapshot-2")155 assert snapshot_2.description == "snapshot-2"156 dirdiff = DirDiff(snapshot_1, snapshot_2)157 diff = dirdiff.diff()158 assert isinstance(diff, DirDiffResults)159 assert sorted(diff.removed) == removed160 assert sorted(diff.added) == added161 assert sorted(diff.modified) == modified162 # remove files we changed/removed from files list for comparing identical163 for f in removed + modified:164 files.remove(f)165 assert sorted(diff.identical) == sorted(files)166 # test json167 json_dict = json.loads(dirdiff.diff().json())168 assert sorted(json_dict["added"]) == sorted(diff.added)169 assert sorted(json_dict["removed"]) == sorted(diff.removed)170 assert sorted(json_dict["modified"]) == sorted(diff.modified)171 assert sorted(json_dict["identical"]) == sorted(diff.identical)172 # test description173 dirdiff.report()174 output = capsys.readouterr().out.strip()175 assert "(snapshot-1)" in output176 assert "(snapshot-2)" in output177def test_dirdiff_snapshot_dir(tmp_path: pathlib.Path):178 """Test DirDiff with snapshot and directory"""179 d1 = tmp_path / "dir1"180 d1.mkdir()181 files = populate_dir(d1)182 # snapshot 1183 snapshot_file1 = tmp_path / "1.snapshot"184 create_snapshot(str(d1), str(snapshot_file1), description="snapshot-1")185 added, removed, modified = 
modify_files(d1)186 dirdiff = DirDiff(187 str(snapshot_file1),188 str(d1),189 )190 diff = dirdiff.diff()191 assert isinstance(diff, DirDiffResults)192 assert sorted(diff.removed) == removed193 assert sorted(diff.added) == added194 assert sorted(diff.modified) == modified195 # remove files we changed/removed from files list for comparing identical196 for f in removed + modified:197 files.remove(f)198 assert sorted(diff.identical) == sorted(files)199def test_dirdiff_diff_dirs(tmp_path: pathlib.Path):200 """Test DirDiff with dirs=False"""201 d1 = tmp_path / "dir1"202 d1.mkdir()203 files = populate_dir(d1)204 # snapshot 1205 snapshot_file1 = tmp_path / "1.snapshot"206 create_snapshot(str(d1), str(snapshot_file1), description="snapshot-1")207 added, removed, modified = modify_files(d1)208 dirdiff = DirDiff(209 str(snapshot_file1),210 str(d1),211 )212 diff = dirdiff.diff(dirs=False)213 assert sorted(diff.removed) == removed214 assert sorted(diff.added) == added215 # filter out directories216 modified = [f for f in modified if pathlib.Path(f).name not in ["dir_0", "dir_1"]]217 assert sorted(diff.modified) == modified218 # remove files we changed/removed from files list for comparing identical219 for f in removed + modified:220 files.remove(f)221 # filter out directories222 files = [f for f in files if pathlib.Path(f).name not in ["dir_0", "dir_1"]]223 assert sorted(diff.identical) == sorted(files)224def test_dirdiff_filter_function_two_snapshots(tmp_path: pathlib.Path):225 """Test DirDiff with filter function and two snapshots"""226 d1 = tmp_path / "dir1"227 d1.mkdir()228 files = populate_dir(d1)229 # snapshot 1230 snapshot_file1 = tmp_path / "1.snapshot"231 create_snapshot(str(d1), str(snapshot_file1))232 added, removed, modified = modify_files(d1)233 # snapshot 2234 snapshot_file2 = tmp_path / "2.snapshot"235 create_snapshot(str(d1), str(snapshot_file2))236 def filter_function(path):237 if path.name == "file_1":238 return False239 return True240 # remove file_1 as the 
filter function will ignore it241 modified.remove(str(d1 / "file_1"))242 files.remove(str(d1 / "file_1"))243 dirdiff = DirDiff(244 str(snapshot_file1),245 str(snapshot_file2),246 filter_function=filter_function,247 )248 diff = dirdiff.diff()249 assert isinstance(diff, DirDiffResults)250 assert sorted(diff.removed) == removed251 assert sorted(diff.added) == added252 assert sorted(diff.modified) == modified253 # remove files we changed/removed from files list for comparing identical254 for f in removed + modified:255 files.remove(f)256 assert sorted(diff.identical) == sorted(files)257def test_dirdiff_filter_function_snapshot_dir(tmp_path: pathlib.Path):258 """Test DirDiff with filter function and snapshot and directory"""259 d1 = tmp_path / "dir1"260 d1.mkdir()261 files = populate_dir(d1)262 # snapshot 1263 snapshot_file1 = tmp_path / "1.snapshot"264 create_snapshot(str(d1), str(snapshot_file1))265 added, removed, modified = modify_files(d1)266 def filter_function(path):267 if path.name == "file_1":268 return False269 return True270 # remove file_1 as the filter function will ignore it271 modified.remove(str(d1 / "file_1"))272 files.remove(str(d1 / "file_1"))273 dirdiff = DirDiff(274 str(snapshot_file1),275 str(d1),276 filter_function=filter_function,277 )278 diff = dirdiff.diff()279 assert isinstance(diff, DirDiffResults)280 assert sorted(diff.removed) == removed281 assert sorted(diff.added) == added282 assert sorted(diff.modified) == modified283 # remove files we changed/removed from files list for comparing identical284 for f in removed + modified:285 files.remove(f)286 assert sorted(diff.identical) == sorted(files)287def test_snapshot_filter_function(tmp_path: pathlib.Path):288 """Test snapshot with filter function"""289 d1 = tmp_path / "dir1"290 d1.mkdir()291 files = populate_dir(d1)292 # snapshot 1293 snapshot_file1 = tmp_path / "1.snapshot"294 create_snapshot(295 str(d1), str(snapshot_file1), filter_function=lambda p: p.name != "file_1"296 )297 added, removed, 
modified = modify_files(d1)298 # snapshot 2299 snapshot_file2 = tmp_path / "2.snapshot"300 create_snapshot(301 str(d1), str(snapshot_file2), filter_function=lambda p: p.name != "file_1"302 )303 # remove file_1 as the filter function will ignore it304 modified.remove(str(d1 / "file_1"))305 files.remove(str(d1 / "file_1"))306 dirdiff = DirDiff(307 str(snapshot_file1),308 str(snapshot_file2),309 )310 diff = dirdiff.diff()311 assert isinstance(diff, DirDiffResults)312 assert sorted(diff.removed) == removed313 assert sorted(diff.added) == added314 assert sorted(diff.modified) == modified315 # remove files we changed/removed from files list for comparing identical316 for f in removed + modified:317 files.remove(f)318 assert sorted(diff.identical) == sorted(files)319def test_snapshot_no_walk(tmp_path: pathlib.Path):320 """Test snapshot with walk=False"""321 d1 = tmp_path / "dir1"322 d1.mkdir()323 files = populate_dir(d1)324 # snapshot 1325 snapshot_file1 = tmp_path / "1.snapshot"326 create_snapshot(str(d1), str(snapshot_file1), walk=False)327 added, removed, modified = modify_files(d1)328 # filter out files in subdirs since walk=False will filter these out329 added = [330 f for f in added if pathlib.Path(f).parent not in [d1 / "dir_0", d1 / "dir_1"]331 ]332 removed = [333 f for f in removed if pathlib.Path(f).parent not in [d1 / "dir_0", d1 / "dir_1"]334 ]335 modified = [336 f337 for f in modified338 if pathlib.Path(f).parent not in [d1 / "dir_0", d1 / "dir_1"]339 ]340 # snapshot 2341 snapshot_file2 = tmp_path / "2.snapshot"342 create_snapshot(str(d1), str(snapshot_file2), walk=False)343 dirdiff = DirDiff(344 str(snapshot_file1),345 str(snapshot_file2),346 )347 diff = dirdiff.diff()348 assert isinstance(diff, DirDiffResults)349 assert sorted(diff.removed) == removed350 assert sorted(diff.added) == added351 assert sorted(diff.modified) == modified352 # remove files we changed/removed from files list for comparing identical353 for f in removed + modified:354 
files.remove(f)355 files = [356 f for f in files if pathlib.Path(f).parent not in [d1 / "dir_0", d1 / "dir_1"]357 ]358 assert sorted(diff.identical) == sorted(files)359def test_dirdiff_compare_function(tmp_path: pathlib.Path, capsys):360 """Test DirDiff with custom compare function"""361 d1 = tmp_path / "dir1"362 d1.mkdir()363 files = populate_dir(d1)364 # snapshot 1365 snapshot_file1 = tmp_path / "1.snapshot"366 snapshot_1 = create_snapshot(str(d1), str(snapshot_file1), description="snapshot-1")367 assert snapshot_1.description == "snapshot-1"368 added, removed, _ = modify_files(d1)369 # snapshot 2370 snapshot_file2 = tmp_path / "2.snapshot"371 snapshot_2 = create_snapshot(str(d1), str(snapshot_file2), description="snapshot-2")372 assert snapshot_2.description == "snapshot-2"373 def compare_function(record_1: SnapshotRecord, record_2: SnapshotRecord):374 # files are equal if size is the same375 return record_1.size == record_2.size376 dirdiff = DirDiff(snapshot_1, snapshot_2)377 diff = dirdiff.diff(compare_function=compare_function)378 assert isinstance(diff, DirDiffResults)379 assert sorted(diff.removed) == removed380 assert sorted(diff.added) == added381 # only list files that are not equal size382 modified = sorted(383 [384 str(f)385 for f in [386 d1 / "file_1",387 d1 / "dir_0" / "file_0_0",388 d1 / "dir_0",389 d1 / "dir_1",390 ]391 ]392 )393 assert sorted(diff.modified) == modified394 # remove files we changed/removed from files list for comparing identical395 for f in removed + modified:396 files.remove(f)397 assert sorted(diff.identical) == sorted(files)398 # test json399 json_dict = json.loads(dirdiff.diff(compare_function=compare_function).json())400 assert sorted(json_dict["added"]) == sorted(diff.added)401 assert sorted(json_dict["removed"]) == sorted(diff.removed)402 assert sorted(json_dict["modified"]) == sorted(diff.modified)403 assert sorted(json_dict["identical"]) == sorted(diff.identical)404def test_dirdiff_valuerror(tmp_path: pathlib.Path):405 
"""Test DirDiff with invalid arguments"""406 d1 = tmp_path / "dir1"407 d1.mkdir()408 files = populate_dir(d1)409 # snapshot 1410 snapshot_file1 = tmp_path / "1.snapshot"411 create_snapshot(str(d1), str(snapshot_file1), walk=False)412 # snapshot 2413 snapshot_file2 = tmp_path / "2.snapshot"414 create_snapshot(str(d1), str(snapshot_file2), walk=False)415 with pytest.raises(ValueError):416 dirdiff = DirDiff(417 str(d1),418 str(snapshot_file2),419 )420 with pytest.raises(ValueError):421 dirdiff = DirDiff(str(snapshot_file1), str(d1 / "file_0"))422def test_snapshot_error(tmp_path: pathlib.Path):423 """Test snapshot with with existing file"""424 d1 = tmp_path / "dir1"425 d1.mkdir()426 files = populate_dir(d1)427 # snapshot 1428 snapshot_file1 = tmp_path / "1.snapshot"429 create_snapshot(str(d1), str(snapshot_file1), walk=False)430 # snapshot 2431 with pytest.raises(ValueError):432 create_snapshot(str(d1), str(snapshot_file1), walk=False)433def test_dirsnapshot(tmp_path: pathlib.Path):434 """Test DirSnapshot methods"""435 d1 = tmp_path / "dir1"436 d1.mkdir()437 files = populate_dir(d1)438 snapshot_db = tmp_path / "snapshot.db"439 snapshot = create_snapshot(440 str(d1), snapshot_db, walk=True, description="Test Snapshot"441 )442 assert is_snapshot_file(snapshot_db)443 assert snapshot.description == "Test Snapshot"444 assert snapshot.directory == str(d1)445 assert sorted(list(snapshot.files())) == sorted(files)446 assert type(snapshot.datetime) == datetime.datetime447 info = snapshot.info448 assert type(info) == SnapshotInfo449 assert info.description == "Test Snapshot"450 assert info.directory == str(d1)451 record = snapshot.record(str(d1 / "file_0"))452 assert type(record) == SnapshotRecord453 assert record.path == str(d1 / "file_0")454 assert record.is_file455 assert not record.is_dir456 record_dict = record.asdict()457 assert record_dict["path"] == str(d1 / "file_0")458 records = list(snapshot.records())459 assert len(records) == len(files)460def 
test_dirsnapshot_load(tmp_path: pathlib.Path):461 """Test DirSnapshot methods when loaded from disk"""462 d1 = tmp_path / "dir1"463 d1.mkdir()464 files = populate_dir(d1)465 snapshot_db = tmp_path / "snapshot.db"466 snapshot_ = create_snapshot(467 str(d1), snapshot_db, walk=True, description="Test Snapshot"468 )469 assert is_snapshot_file(snapshot_db)470 snapshot = load_snapshot(snapshot_db)471 assert snapshot.description == "Test Snapshot"472 assert snapshot.directory == str(d1)473 assert sorted(list(snapshot.files())) == sorted(files)474 assert type(snapshot.datetime) == datetime.datetime475 info = snapshot.info476 assert type(info) == SnapshotInfo477 assert info.description == "Test Snapshot"478 assert info.directory == str(d1)479 record = snapshot.record(str(d1 / "file_0"))480 assert type(record) == SnapshotRecord481 assert record.path == str(d1 / "file_0")482 assert record.is_file483 assert not record.is_dir484 record_dict = record.asdict()485 assert record_dict["path"] == str(d1 / "file_0")486 records = list(snapshot.records())487 assert len(records) == len(files)488def test_dirsnapshot_len(tmp_path: pathlib.Path):489 """Test DirSnapshot length"""490 d1 = tmp_path / "dir1"491 d1.mkdir()492 files = populate_dir(d1)493 snapshot_db = tmp_path / "snapshot.db"494 snapshot = create_snapshot(495 str(d1), snapshot_db, walk=True, description="Test Snapshot"496 )...
test_share_snapshots.py
Source:test_share_snapshots.py
...14class ShareSnapshotCLITest(base.OSCClientTestBase):15 """Functional tests for share snpshot."""16 def test_share_snapshot_create(self):17 share = self.create_share()18 snapshot = self.create_snapshot(19 share=share['id'],20 name='Snap',21 description='Description')22 self.assertEqual(share["id"], snapshot["share_id"])23 self.assertEqual('Snap', snapshot["name"])24 self.assertEqual('Description', snapshot["description"])25 self.assertEqual('available', snapshot["status"])26 snapshots_list = self.listing_result('share snapshot', 'list')27 self.assertIn(snapshot['id'],28 [item['ID'] for item in snapshots_list])29 def test_share_snapshot_delete(self):30 share = self.create_share()31 snapshot_1 = self.create_snapshot(32 share=share['id'], add_cleanup=False)33 snapshot_2 = self.create_snapshot(34 share=share['id'], add_cleanup=False)35 self.openstack(36 f'share snapshot delete {snapshot_1["id"]} {snapshot_2["id"]} '37 '--wait')38 self.check_object_deleted('share snapshot', snapshot_1["id"])39 self.check_object_deleted('share snapshot', snapshot_2["id"])40 snapshot_3 = self.create_snapshot(41 share=share['id'], add_cleanup=False)42 self.openstack(43 f'share snapshot set {snapshot_3["id"]} '44 '--status creating')45 self.openstack(46 f'share snapshot delete {snapshot_3["id"]} --wait --force')47 self.check_object_deleted('share snapshot', snapshot_3["id"])48 def test_share_snapshot_show(self):49 share = self.create_share()50 snapshot = self.create_snapshot(51 share=share['id'],52 name='Snap',53 description='Description')54 show_result = self.dict_result(55 'share snapshot', f'show {snapshot["id"]}')56 self.assertEqual(snapshot["id"], show_result["id"])57 self.assertEqual('Snap', show_result["name"])58 self.assertEqual('Description', show_result["description"])59 def test_share_snapshot_set(self):60 share = self.create_share()61 snapshot = self.create_snapshot(share=share['id'])62 self.openstack(63 f'share snapshot set {snapshot["id"]} '64 f'--name Snap --description 
Description')65 show_result = self.dict_result(66 'share snapshot ', f'show {snapshot["id"]}')67 self.assertEqual(snapshot['id'], show_result["id"])68 self.assertEqual('Snap', show_result["name"])69 self.assertEqual('Description', show_result["description"])70 def test_share_snapshot_unset(self):71 share = self.create_share()72 snapshot = self.create_snapshot(73 share=share['id'],74 name='Snap',75 description='Description')76 self.openstack(77 f'share snapshot unset {snapshot["id"]} --name --description')78 show_result = json.loads(self.openstack(79 f'share snapshot show -f json {snapshot["id"]}'))80 self.assertEqual(snapshot['id'], show_result["id"])81 self.assertIsNone(show_result["name"])82 self.assertIsNone(show_result["description"])83 def test_share_snapshot_list(self):84 share = self.create_share()85 snapshot_1 = self.create_snapshot(share=share['id'])86 snapshot_2 = self.create_snapshot(87 share=share['id'],88 description='Description')89 snapshots_list = self.listing_result(90 'share snapshot', f'list --name {snapshot_2["name"]} '91 f'--description {snapshot_2["description"]} --all-projects'92 )93 self.assertTableStruct(snapshots_list, [94 'ID',95 'Name',96 'Project ID'97 ])98 self.assertIn(snapshot_2["name"],99 [snap["Name"] for snap in snapshots_list])100 self.assertEqual(1, len(snapshots_list))101 snapshots_list = self.listing_result(102 'share snapshot', f'list --share {share["name"]}')103 self.assertTableStruct(snapshots_list, [104 'ID',105 'Name'106 ])107 id_list = [snap["ID"] for snap in snapshots_list]108 self.assertIn(snapshot_1["id"], id_list)109 self.assertIn(snapshot_2["id"], id_list)110 snapshots_list = self.listing_result(111 'share snapshot',112 f'list --name~ {snapshot_2["name"][-3:]} '113 '--description~ Des --detail')114 self.assertTableStruct(snapshots_list, [115 'ID',116 'Name',117 'Status',118 'Description',119 'Created At',120 'Size',121 'Share ID',122 'Share Proto',123 'Share Size',124 'User ID'125 ])126 
self.assertIn(snapshot_2["name"],127 [snap["Name"] for snap in snapshots_list])128 self.assertEqual(129 snapshot_2["description"], snapshots_list[0]['Description'])130 self.assertEqual(1, len(snapshots_list))131 def test_share_snapshot_export_location_list(self):132 share = self.create_share()133 snapshot = self.create_snapshot(share=share['id'])134 export_location_list = self.listing_result(135 'share snapshot export location', f' list {snapshot["id"]}')136 self.assertTableStruct(export_location_list, [137 'ID',138 'Path'139 ])140 def test_share_snapshot_export_location_show(self):141 share = self.create_share()142 snapshot = self.create_snapshot(share=share['id'])143 export_location_list = self.listing_result(144 'share snapshot export location', f'list {snapshot["id"]}')145 export_location = self.dict_result(146 'share snapshot export location',147 f'show {snapshot["id"]} {export_location_list[0]["ID"]}')148 self.assertIn('id', export_location)149 self.assertIn('created_at', export_location)150 self.assertIn('is_admin_only', export_location)151 self.assertIn('path', export_location)152 self.assertIn('share_snapshot_instance_id', export_location)153 self.assertIn('updated_at', export_location)154 def test_share_snapshot_abandon_adopt(self):155 share = self.create_share()156 snapshot = self.create_snapshot(157 share=share['id'], add_cleanup=False)158 self.openstack(159 f'share snapshot abandon {snapshot["id"]} --wait')160 snapshots_list = self.listing_result('share snapshot', 'list')161 self.assertNotIn(snapshot['id'],162 [item['ID'] for item in snapshots_list])163 snapshot = self.dict_result(164 'share snapshot',165 f'adopt {share["id"]} 10.0.0.1:/foo/path '166 f'--name Snap --description Zorilla --wait')167 snapshots_list = self.listing_result('share snapshot', 'list')168 self.assertIn(snapshot['id'],169 [item['ID'] for item in snapshots_list])170 self.addCleanup(...
Learn to execute automation testing from scratch with the LambdaTest Learning Hub, right from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. The LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, such as Selenium, Cypress, and TestNG.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 minutes of automation testing FREE!!