Best Python code snippet using Testify_python
test_storage.py
Source: test_storage.py
1# -*- coding: utf-8 -*-2from __future__ import absolute_import3from __future__ import division4from __future__ import print_function5from __future__ import unicode_literals6import os7import io8import six9import json10import pytest11import logging12import datetime13import unittest14import tableschema15import savReaderWriter16from decimal import Decimal17from tableschema_spss import Storage18log = logging.getLogger(__name__)19# Resources20ARTICLES = {21 'schema': {22 'fields': [23 {'name': 'id', 'type': 'integer', 'constraints': {'required': True}},24 {'name': 'parent', 'type': 'integer'},25 {'name': 'name', 'type': 'string'},26 {'name': 'current', 'type': 'boolean'},27 {'name': 'rating', 'type': 'number'},28 ],29 # 'primaryKey': 'id',30 # 'foreignKeys': [31 # {'fields': 'parent', 'reference': {'resource': '', 'fields': 'id'}},32 # ],33 },34 'data': [35 ['1', '', 'Taxes', 'True', '9.5'],36 ['2', '1', 'ä¸å½äºº', 'False', '7'],37 ],38}39COMMENTS = {40 'schema': {41 'fields': [42 {'name': 'entry_id', 'type': 'integer', 'constraints': {'required': True}},43 {'name': 'comment', 'type': 'string'},44 {'name': 'note', 'type': 'any'},45 ],46 # 'primaryKey': 'entry_id',47 # 'foreignKeys': [48 # {'fields': 'entry_id', 'reference': {'resource': 'articles', 'fields': 'id'}},49 # ],50 },51 'data': [52 ['1', 'good', 'note1'],53 ['2', 'bad', 'note2'],54 ],55}56TEMPORAL = {57 'schema': {58 'fields': [59 {'name': 'date', 'type': 'date'},60 {'name': 'date_year', 'type': 'date', 'format': '%Y'},61 {'name': 'datetime', 'type': 'datetime'},62 {'name': 'duration', 'type': 'duration'},63 {'name': 'time', 'type': 'time'},64 {'name': 'year', 'type': 'year'},65 {'name': 'yearmonth', 'type': 'yearmonth'},66 ],67 },68 'data': [69 ['2015-01-01', '2015', '2015-01-01T03:00:00Z', 'P1Y1M', '03:00:00', '2015', '2015-01'],70 ['2015-12-31', '2015', '2015-12-31T15:45:33Z', 'P2Y2M', '15:45:33', '2015', '2015-01'],71 ],72}73LOCATION = {74 'schema': {75 'fields': [76 {'name': 'location', 'type': 
'geojson'},77 {'name': 'geopoint', 'type': 'geopoint'},78 ],79 },80 'data': [81 ['{"type": "Point","coordinates":[33.33,33.33]}', '30,75'],82 ['{"type": "Point","coordinates":[50.00,50.00]}', '90,45'],83 ],84}85COMPOUND = {86 'schema': {87 'fields': [88 {'name': 'stats', 'type': 'object'},89 {'name': 'persons', 'type': 'array'},90 ],91 },92 'data': [93 ['{"chars":560}', '["Mike", "John"]'],94 ['{"chars":970}', '["Paul", "Alex"]'],95 ],96}97# Tests98# TODO: activate if possible99# https://github.com/frictionlessdata/tableschema-spss-py/issues/5100@pytest.mark.skip('not possible for now')101def test_storage(tmpdir):102 # Create storage103 storage = Storage(base_path=str(tmpdir))104 # Delete buckets105 storage.delete()106 # Create buckets107 storage.create(['articles', 'comments'], [ARTICLES['schema'], COMMENTS['schema']])108 storage.create('comments', COMMENTS['schema'], force=True)109 storage.create('temporal', TEMPORAL['schema'])110 storage.create('location', LOCATION['schema'])111 storage.create('compound', COMPOUND['schema'])112 # Write data113 storage.write('articles', ARTICLES['data'])114 storage.write('comments', COMMENTS['data'])115 storage.write('temporal', TEMPORAL['data'])116 storage.write('location', LOCATION['data'])117 storage.write('compound', COMPOUND['data'])118 # Create new storage to use reflection only119 storage = Storage(base_path=str(tmpdir))120 # Create existent bucket121 with pytest.raises(tableschema.exceptions.StorageError):122 storage.create('articles.sav', ARTICLES['schema'])123 # Assert buckets124 assert storage.buckets == ['articles', 'compound', 'location', 'temporal', 'comments']125 # Assert schemas126 assert storage.describe('articles') == ARTICLES['schema']127 assert storage.describe('comments') == {128 'fields': [129 {'name': 'entry_id', 'type': 'integer', 'constraints': {'required': True}},130 {'name': 'comment', 'type': 'string'},131 {'name': 'note', 'type': 'string'}, # type downgrade132 ],133 }134 assert 
storage.describe('temporal') == {135 'fields': [136 {'name': 'date', 'type': 'date'},137 {'name': 'date_year', 'type': 'date'}, # format removal138 {'name': 'datetime', 'type': 'datetime'},139 {'name': 'duration', 'type': 'string'}, # type fallback140 {'name': 'time', 'type': 'time'},141 {'name': 'year', 'type': 'integer'}, # type downgrade142 {'name': 'yearmonth', 'type': 'string'}, # type fallback143 ],144 }145 assert storage.describe('location') == {146 'fields': [147 {'name': 'location', 'type': 'object'}, # type downgrade148 {'name': 'geopoint', 'type': 'string'}, # type fallback149 ],150 }151 assert storage.describe('compound') == {152 'fields': [153 {'name': 'stats', 'type': 'object'},154 {'name': 'persons', 'type': 'string'}, # type fallback155 ],156 }157 # Assert data158 assert storage.read('articles') == cast(ARTICLES)['data']159 assert storage.read('comments') == cast(COMMENTS)['data']160 assert storage.read('temporal') == cast(TEMPORAL, skip=['duration', 'yearmonth'])['data']161 assert storage.read('location') == cast(LOCATION, skip=['geopoint'])['data']162 assert storage.read('compound') == cast(COMPOUND, skip=['array'])['data']163 # Assert data with forced schema164 storage.describe('compound.sav', COMPOUND['schema'])165 assert storage.read('compound.sav') == cast(COMPOUND)['data']166 # Delete non existent bucket167 with pytest.raises(tableschema.exceptions.StorageError):168 storage.delete('non_existent')169 # Delete buckets170 storage.delete()171class BaseTestClass(unittest.TestCase):172 @classmethod173 def get_base_path(cls):174 '''Making this a readonly property prevents it being overridden, and subsequently175 deleted!'''176 return 'data/delete_tests'177 def tearDown(self):178 # Remove all files under test path179 files = os.listdir(self.get_base_path())180 for f in files:181 os.remove(os.path.join(self.get_base_path(), f))182 @classmethod183 def setUpClass(cls):184 # Create directory for writing test files185 if not 
os.path.exists(cls.get_base_path()):186 os.mkdir(cls.get_base_path())187 @classmethod188 def tearDownClass(cls):189 # Remove test directory190 if os.path.exists(cls.get_base_path()):191 os.rmdir(cls.get_base_path())192class TestBasePath(unittest.TestCase):193 def test_base_path_exists(self):194 '''Storage with existing base_path doesn't raise exception.'''195 try:196 Storage(base_path='data')197 except Exception as e:198 self.fail("Storage() raised Exception")199 def test_base_path_not_exists(self):200 '''base_path arg must exist.'''201 with self.assertRaises(tableschema.exceptions.StorageError):202 Storage(base_path='not_here')203 def test_base_path_must_not_be_file(self):204 '''base_path arg must be a directory.'''205 with self.assertRaises(tableschema.exceptions.StorageError):206 Storage(base_path='data/simple.json')207 def test_base_path_no_path_defined(self):208 '''No base_path defined shouldn't raise exception'''209 try:210 Storage()211 except Exception as e:212 print(e)213 self.fail("Storage() raised Exception")214class TestStorageCreate(BaseTestClass):215 TEST_BASE_PATH = BaseTestClass.get_base_path()216 TEST_FILE_NAME = 'test_simple.sav'217 TEST_FILE_PATH = os.path.join(TEST_BASE_PATH, TEST_FILE_NAME)218 def test_storage_create_creates_file(self):219 '''Storage .sav file created by create()'''220 simple_descriptor = json.load(io.open('data/simple.json', encoding='utf-8'))221 storage = Storage(base_path=self.TEST_BASE_PATH)222 storage.create(self.TEST_FILE_NAME, simple_descriptor)223 assert os.path.exists(self.TEST_FILE_PATH)224 self.assertEqual(storage.buckets, ['test_simple.sav'])225 def test_storage_create_creates_file_no_base_path(self):226 '''Storage .sav file created by create() with no base_path'''227 simple_descriptor = json.load(io.open('data/simple.json', encoding='utf-8'))228 storage = Storage()229 storage.create(self.TEST_FILE_PATH, simple_descriptor)230 assert os.path.exists(self.TEST_FILE_PATH)231 # storage.buckets not maintained for storage 
with no base_path232 self.assertEqual(storage.buckets, None)233 def test_storage_create_protect_existing(self):234 '''create() called twice raises exception preventing overwrite of existing235 file.'''236 simple_descriptor = json.load(io.open('data/simple.json', encoding='utf-8'))237 storage = Storage(base_path=self.TEST_BASE_PATH)238 storage.create(self.TEST_FILE_NAME, simple_descriptor)239 with self.assertRaises(tableschema.exceptions.StorageError):240 storage.create(self.TEST_FILE_NAME, simple_descriptor)241 def test_storage_create_protect_existing_no_base_path(self):242 '''create() called twice raises exception preventing overwrite of existing243 file.'''244 simple_descriptor = json.load(io.open('data/simple.json', encoding='utf-8'))245 storage = Storage()246 storage.create(self.TEST_FILE_PATH, simple_descriptor)247 with self.assertRaises(tableschema.exceptions.StorageError):248 storage.create(self.TEST_FILE_PATH, simple_descriptor)249 def test_storage_create_force_overwrite(self):250 '''create() called twice with force=True allows file overwrite.'''251 simple_descriptor = json.load(io.open('data/simple.json', encoding='utf-8'))252 storage = Storage(base_path=self.TEST_BASE_PATH)253 storage.create(self.TEST_FILE_NAME, simple_descriptor)254 try:255 storage.create(self.TEST_FILE_NAME, simple_descriptor, force=True)256 except RuntimeError:257 self.fail("create() raised RuntimeError")258 assert os.path.exists(self.TEST_FILE_PATH)259 def test_storage_create_force_overwrite_no_base_path(self):260 '''create() called twice with force=True allows file overwrite.'''261 simple_descriptor = json.load(io.open('data/simple.json', encoding='utf-8'))262 storage = Storage()263 storage.create(self.TEST_FILE_PATH, simple_descriptor)264 try:265 storage.create(self.TEST_FILE_PATH, simple_descriptor, force=True)266 except RuntimeError:267 self.fail("create() raised RuntimeError")268 assert os.path.exists(self.TEST_FILE_PATH)269 def test_storage_create_metadata_headers(self):270 
'''created .sav file has expected metadata headers'''271 simple_descriptor = json.load(io.open('data/simple.json', encoding='utf-8'))272 storage = Storage(base_path=self.TEST_BASE_PATH)273 storage.create(self.TEST_FILE_NAME, simple_descriptor)274 with savReaderWriter.SavHeaderReader(self.TEST_FILE_PATH, ioUtf8=True) as header:275 metadata = header.all()276 self.assertEqual(metadata.varNames, ['person_id', 'name', 'salary', 'bdate',277 'var_datetime', 'var_time'])278 self.assertEqual(metadata.formats, {'name': 'A10', 'person_id': 'F8',279 'salary': 'DOLLAR8', 'bdate': 'ADATE10',280 'var_datetime': 'DATETIME19',281 'var_time': 'TIME10'})282 self.assertEqual(metadata.varTypes, {'name': 10, 'person_id': 0, 'bdate': 0,283 'salary': 0, 'var_datetime': 0,284 'var_time': 0})285class TestStorageWrite(BaseTestClass):286 TEST_BASE_PATH = BaseTestClass.get_base_path()287 TEST_FILE_NAME = 'test_simple.sav'288 TEST_FILE_PATH = os.path.join(TEST_BASE_PATH, TEST_FILE_NAME)289 def test_write_file_exists(self):290 simple_descriptor = json.load(io.open('data/simple.json', encoding='utf-8'))291 storage = Storage(base_path=self.TEST_BASE_PATH)292 # create file with no rows293 storage.create(self.TEST_FILE_NAME, simple_descriptor)294 self.assertEqual(storage.read(self.TEST_FILE_NAME), [])295 rows = [[1, 'fred', Decimal('57000'), datetime.date(1952, 2, 3),296 datetime.datetime(2010, 8, 11, 0, 0, 0), datetime.time(0, 0)]]297 storage.write(self.TEST_FILE_NAME, rows)298 self.assertEqual(storage.read(self.TEST_FILE_NAME), rows)299 def test_write_file_exists_no_base_path(self):300 simple_descriptor = json.load(io.open('data/simple.json', encoding='utf-8'))301 storage = Storage()302 # create file with no rows303 storage.create(self.TEST_FILE_PATH, simple_descriptor)304 self.assertEqual(storage.read(self.TEST_FILE_PATH), [])305 rows = [[1, 'fred', Decimal('57000'), datetime.date(1952, 2, 3),306 datetime.datetime(2010, 8, 11, 0, 0, 0), datetime.time(0, 0)]]307 storage.write(self.TEST_FILE_PATH, 
rows)308 self.assertEqual(storage.read(self.TEST_FILE_PATH), rows)309 def test_write_file_doesnot_exist(self):310 '''Trying to write to a file that does not exist raises exception.'''311 simple_descriptor = json.load(io.open('data/simple.json', encoding='utf-8'))312 storage = Storage(base_path=self.TEST_BASE_PATH)313 # create file with no rows314 storage.create(self.TEST_FILE_NAME, simple_descriptor)315 self.assertEqual(storage.read(self.TEST_FILE_NAME), [])316 # remove the file317 os.remove(self.TEST_FILE_PATH)318 rows = [[1, 'fred', Decimal('57000'), datetime.date(1952, 2, 3),319 datetime.datetime(2010, 8, 11, 0, 0, 0), datetime.time(0, 0)]]320 # try to write to it321 with self.assertRaises(tableschema.exceptions.StorageError):322 storage.write(self.TEST_FILE_NAME, rows)323 def test_write_file_doesnot_exist_no_base_path(self):324 '''Trying to write to a file that does not exist raises exception.'''325 simple_descriptor = json.load(io.open('data/simple.json', encoding='utf-8'))326 storage = Storage()327 # create file with no rows328 storage.create(self.TEST_FILE_PATH, simple_descriptor)329 self.assertEqual(storage.read(self.TEST_FILE_PATH), [])330 # remove the file331 os.remove(self.TEST_FILE_PATH)332 rows = [[1, 'fred', Decimal('57000'), datetime.date(1952, 2, 3),333 datetime.datetime(2010, 8, 11, 0, 0, 0), datetime.time(0, 0)]]334 # try to write to it335 with self.assertRaises(tableschema.exceptions.StorageError):336 storage.write(self.TEST_FILE_PATH, rows)337class TestStorageDescribe(BaseTestClass):338 def test_describe(self):339 '''Return the expected schema descriptor.'''340 expected_schema = json.load(io.open('data/Employee_expected_descriptor.json',341 encoding='utf-8'))342 storage = Storage(base_path='data')343 schema = storage.describe('Employee data.sav')344 self.assertEqual(expected_schema, schema)345 def test_describe_no_base_path(self):346 '''Return the expected schema descriptor, with storage with no base_path.'''347 expected_schema = 
json.load(io.open('data/Employee_expected_descriptor.json',348 encoding='utf-8'))349 storage = Storage()350 # pass file path351 schema = storage.describe('data/Employee data.sav')352 self.assertEqual(expected_schema, schema)353 def test_describe_no_base_path_invalid(self):354 '''Attempting to describe an invalid file raises exception.'''355 storage = Storage()356 # pass file path357 with self.assertRaises(tableschema.exceptions.StorageError):358 storage.describe('data/no-file-here.sav')359class TestStorageIter_Read(BaseTestClass):360 READ_TEST_BASE_PATH = 'data'361 EXPECTED_DATA = [362 [1, 'm', datetime.date(1952, 2, 3), 15, 3, Decimal('57000'), Decimal('27000'),363 98, 144, 0],364 [2, 'm', datetime.date(1958, 5, 23), 16, 1, Decimal('40200'), Decimal('18750'),365 98, 36, 0],366 [3, 'f', datetime.date(1929, 7, 26), 12, 1, Decimal('21450'), Decimal('12000'),367 98, 381, 0],368 [4, 'f', datetime.date(1947, 4, 15), 8, 1, Decimal('21900'), Decimal('13200'),369 98, 190, 0],370 [5, 'm', datetime.date(1955, 2, 9), 15, 1, Decimal('45000'), Decimal('21000'),371 98, 138, 0],372 [6, 'm', datetime.date(1958, 8, 22), 15, 1, Decimal('32100'), Decimal('13500'),373 98, 67, 0],374 [7, 'm', datetime.date(1956, 4, 26), 15, 1, Decimal('36000'), Decimal('18750'),375 98, 114, 0],376 [8, 'f', datetime.date(1966, 5, 6), 12, 1, Decimal('21900'), Decimal('9750'),377 98, 0, 0],378 [9, 'f', datetime.date(1946, 1, 23), 15, 1, Decimal('27900'), Decimal('12750'),379 98, 115, 0],380 [10, 'f', datetime.date(1946, 2, 13), 12, 1, Decimal('24000'), Decimal('13500'),381 98, 244, 0]382 ]383 def _assert_rows(self, rows):384 for i, row in enumerate(rows):385 # Test the first 10 rows against the expected data.386 self.assertEqual(self.EXPECTED_DATA[i], row)387 if i == 9:388 break389 def test_iter(self):390 storage = Storage(base_path=self.READ_TEST_BASE_PATH)391 self._assert_rows(storage.iter('Employee data.sav'))392 def test_iter_no_base_path(self):393 storage = Storage()394 
self._assert_rows(storage.iter('data/Employee data.sav'))395 def test_read(self):396 storage = Storage(base_path=self.READ_TEST_BASE_PATH)397 self._assert_rows(storage.read('Employee data.sav'))398 def test_read_no_base_path(self):399 storage = Storage()400 self._assert_rows(storage.read('data/Employee data.sav'))401 def test_read_no_base_path_invalid(self):402 storage = Storage()403 with self.assertRaises(tableschema.exceptions.StorageError):404 storage.read('data/no-file-here.sav')405class TestStorageRead_Dates(BaseTestClass):406 READ_TEST_BASE_PATH = 'data'407 EXPECTED_FIRST_ROW = \408 [datetime.datetime(2010, 8, 11, 0, 0), u'32 WK 2010', datetime.date(2010, 8, 11),409 u'3 Q 2010', datetime.date(2010, 8, 11), datetime.date(2010, 8, 11),410 u'156260 00:00:00', datetime.date(2010, 8, 11), u'August', u'August 2010',411 datetime.time(0, 0), datetime.date(2010, 8, 11), u'Wednesday']412 # TODO: activate413 @pytest.mark.skip('locale problem')414 def test_read_date_file(self):415 '''Test various date formated fields from test file'''416 storage = Storage(base_path=self.READ_TEST_BASE_PATH)417 row = six.next(storage.iter('test_dates.sav'))418 self.assertEqual(row, self.EXPECTED_FIRST_ROW)419 def test_read_time_with_no_decimal(self):420 '''Test file containing time field with no decimals.'''421 storage = Storage(base_path=self.READ_TEST_BASE_PATH)422 expected_time = datetime.time(16, 0)423 row = six.next(storage.iter('test_time_no_decimal.sav'))424 self.assertEqual(row[2], expected_time)425class TestStorageDelete(BaseTestClass):426 TEST_BASE_PATH = BaseTestClass.get_base_path()427 TEST_FILE_PATH = os.path.join(TEST_BASE_PATH, 'delme.sav')428 SIMPLE_DESCRIPTOR = json.load(io.open('data/simple.json', encoding='utf-8'))429 def test_delete_file(self):430 storage = Storage(base_path=self.TEST_BASE_PATH)431 storage.create('delme.sav', self.SIMPLE_DESCRIPTOR)432 # File was created433 self.assertTrue(os.path.exists(self.TEST_FILE_PATH))434 self.assertEqual(storage.buckets, 
['delme.sav'])435 storage.delete('delme.sav')436 # File was deleted437 self.assertFalse(os.path.exists(self.TEST_FILE_PATH))438 self.assertEqual(storage.buckets, [])439 def test_delete_file_no_base_path(self):440 storage = Storage()441 storage.create(self.TEST_FILE_PATH, self.SIMPLE_DESCRIPTOR)442 # File was created443 self.assertTrue(os.path.exists(self.TEST_FILE_PATH))444 self.assertEqual(storage.buckets, None)445 storage.delete(self.TEST_FILE_PATH)446 # File was deleted447 self.assertFalse(os.path.exists(self.TEST_FILE_PATH))448 self.assertEqual(storage.buckets, None)449 def test_delete_file_doesnot_exist(self):450 storage = Storage(base_path=self.TEST_BASE_PATH)451 storage.create('delme.sav', self.SIMPLE_DESCRIPTOR)452 # File was created453 self.assertTrue(os.path.exists(self.TEST_FILE_PATH))454 # File is removed externally455 os.remove(self.TEST_FILE_PATH)456 with self.assertRaises(tableschema.exceptions.StorageError):457 storage.delete('delme.sav')458 def test_delete_bucket_doesnot_exist(self):459 storage = Storage(base_path=self.TEST_BASE_PATH)460 storage.create('delme.sav', self.SIMPLE_DESCRIPTOR)461 # File was created462 self.assertTrue(os.path.exists(self.TEST_FILE_PATH))463 # File is removed externally464 os.remove(self.TEST_FILE_PATH)465 with self.assertRaises(tableschema.exceptions.StorageError):466 storage.delete('no-file-here.sav')467 def test_delete_file_doesnot_exist_no_base_path(self):468 storage = Storage()469 storage.create(self.TEST_FILE_PATH, self.SIMPLE_DESCRIPTOR)470 # File was created471 self.assertTrue(os.path.exists(self.TEST_FILE_PATH))472 # File is removed externally473 os.remove(self.TEST_FILE_PATH)474 with self.assertRaises(tableschema.exceptions.StorageError):475 storage.delete(self.TEST_FILE_PATH)476 def test_delete_file_doesnot_exist_ignore(self):477 storage = Storage(base_path=self.TEST_BASE_PATH)478 storage.create('delme.sav', self.SIMPLE_DESCRIPTOR)479 # File was created480 self.assertTrue(os.path.exists(self.TEST_FILE_PATH))481 
# File is removed externally482 os.remove(self.TEST_FILE_PATH)483 # File was deleted484 self.assertFalse(os.path.exists(self.TEST_FILE_PATH))485 # Delete and ignore missing486 try:487 storage.delete('delme.sav', ignore=True)488 except(tableschema.exceptions.StorageError):489 self.fail('delete() shouldn\'t raise exception')490 self.assertEqual(storage.buckets, [])491 def test_delete_file_doesnot_exist_ignore_no_base_path(self):492 storage = Storage()493 storage.create(self.TEST_FILE_PATH, self.SIMPLE_DESCRIPTOR)494 # File was created495 self.assertTrue(os.path.exists(self.TEST_FILE_PATH))496 # File is removed externally497 os.remove(self.TEST_FILE_PATH)498 # File was deleted499 self.assertFalse(os.path.exists(self.TEST_FILE_PATH))500 # Delete and ignore missing501 try:502 storage.delete(self.TEST_FILE_PATH, ignore=True)503 except(tableschema.exceptions.StorageError):504 self.fail('delete() shouldn\'t raise exception')505 self.assertEqual(storage.buckets, None)506 def test_delete_all_files(self):507 storage = Storage(base_path=self.TEST_BASE_PATH)508 storage.create('delme.sav', self.SIMPLE_DESCRIPTOR)509 storage.create('delme_too.sav', self.SIMPLE_DESCRIPTOR)510 storage.create('delme_also.sav', self.SIMPLE_DESCRIPTOR)511 expected_buckets = ['delme.sav', 'delme_also.sav', 'delme_too.sav']512 self.assertEqual(sorted(os.listdir(self.TEST_BASE_PATH)), expected_buckets)513 self.assertEqual(storage.buckets, expected_buckets)514 # no specified bucket, delete everything!515 storage.delete()516 self.assertEqual(os.listdir(self.TEST_BASE_PATH), [])517 self.assertEqual(storage.buckets, [])518class TestSafeFilePath(BaseTestClass):519 TEST_BASE_PATH = BaseTestClass.get_base_path()520 SIMPLE_DESCRIPTOR = json.load(io.open('data/simple.json', encoding='utf-8'))521 def test_valid_bucket_name(self):522 storage = Storage(base_path=self.TEST_BASE_PATH)523 try:524 storage.create('delme.sav', self.SIMPLE_DESCRIPTOR)525 storage.create('delme_too', self.SIMPLE_DESCRIPTOR)526 except 
tableschema.exceptions.StorageError:527 self.fail('Creating with a valid bucket name shouldn\'t fail')528 def test_invalid_bucket_name(self):529 storage = Storage(base_path=self.TEST_BASE_PATH)530 with self.assertRaises(tableschema.exceptions.StorageError):531 storage.create('../delme.sav', self.SIMPLE_DESCRIPTOR)532 with self.assertRaises(tableschema.exceptions.StorageError):533 storage.create('/delme.sav', self.SIMPLE_DESCRIPTOR)534 with self.assertRaises(tableschema.exceptions.StorageError):535 storage.create('/delme', self.SIMPLE_DESCRIPTOR)536 with self.assertRaises(tableschema.exceptions.StorageError):537 storage.create('../../delme', self.SIMPLE_DESCRIPTOR)538# Helpers539def cast(resource, skip=[]):540 resource = deepcopy(resource)541 schema = tableschema.Schema(resource['schema'])542 for row in resource['data']:543 for index, field in enumerate(schema.fields):544 if field.type not in skip:545 row[index] = field.cast_value(row[index])...
tests.py
Source: tests.py
1# -*- coding: utf-8 -*-2# _3# /\ | |4# / \ _ __ ___| |__ ___ _ __ _ _5# / /\ \ | '__/ __| '_ \ / _ \ '__| | | |6# / ____ \| | | (__| | | | __/ | | |_| |7# /_/ \_\_| \___|_| |_|\___|_| \__, |8# __/ |9# |___/10# Copyright (C) 2017 Anand Tiwari11#12# Email: anandtiwarics@gmail.com13# Twitter: @anandtiwarics14#15# This file is part of ArcherySec Project.16import logging17import requests18from django.contrib import messages19from django.core.files.uploadedfile import SimpleUploadedFile20from django.test import Client, TestCase21from authentication.tests import UserCreationTest22from compliance.models import *23from networkscanners.models import *24from projects.models import *25from staticscanners.models import *26from webscanners.models import *27logging.disable(logging.CRITICAL)28class UploadTest(TestCase):29 fixtures = [30 "fixtures/default_user_roles.json",31 "fixtures/default_organization.json",32 ]33 auth_test = UserCreationTest()34 def setUp(self):35 """36 This is the class which runs at the start before running test case.37 This method updates password of admin user38 """39 # Creating Admin user40 UserProfile.objects.create_user(41 name=self.auth_test.admin.get("name"),42 email=self.auth_test.admin.get("email"),43 password=self.auth_test.admin.get("password"),44 role=1,45 organization=1,46 )47 # Creating analyst User48 UserProfile.objects.create_user(49 name=self.auth_test.analyst.get("name"),50 email=self.auth_test.analyst.get("email"),51 password=self.auth_test.analyst.get("password"),52 role=2,53 organization=1,54 )55 # Create viewer user56 UserProfile.objects.create_user(57 name=self.auth_test.viewer.get("name"),58 email=self.auth_test.viewer.get("email"),59 password=self.auth_test.viewer.get("password"),60 role=3,61 organization=1,62 )63 # Test user profile page64 def test_file_upload_page(self):65 client = Client()66 # from admin users67 client.login(68 username=self.auth_test.admin.get("email"),69 password=self.auth_test.admin.get("password"),70 )71 
response = client.get("/report-upload/upload/")72 self.assertEqual(response.status_code, 200)73 self.assertTemplateUsed(response, "report_upload/upload.html")74 # from analyst users75 client.login(76 username=self.auth_test.analyst.get("email"),77 password=self.auth_test.analyst.get("password"),78 )79 response = client.get("/report-upload/upload/")80 self.assertEqual(response.status_code, 200)81 self.assertTemplateUsed(response, "report_upload/upload.html")82 # from viewers users83 client.login(84 username=self.auth_test.viewer.get("email"),85 password=self.auth_test.viewer.get("password"),86 )87 response = client.get("/report-upload/upload/")88 self.assertEqual(response.status_code, 403)89 def upload_report_file(90 self, scanner, file_path, error_message, test_file_path, redirect_to, file_type91 ):92 client = Client()93 # from admin users94 client.login(95 username=self.auth_test.admin.get("email"),96 password=self.auth_test.admin.get("password"),97 )98 # Create project99 client.post(100 "/projects/project_create/",101 data={"project_name": "name", "project_disc": "disc"},102 )103 project_id = (104 ProjectDb.objects.filter(project_name="name").values("uu_id").get()["uu_id"]105 )106 response = requests.get(file_path)107 file_name = ""108 if file_type == "xml":109 file_name = "test.xml"110 if file_type == "json":111 file_name = "test.json"112 file_n = SimpleUploadedFile(113 name=file_name,114 content=response.text.encode(),115 content_type="multipart/form-data",116 )117 data = {118 "scanner": scanner,119 "file": file_n,120 "target": "http://test.com",121 "project_id": str(project_id),122 }123 # upload one sample report124 response = client.post("/report-upload/upload/", data=data)125 self.assertEqual(response.status_code, 302)126 self.assertRedirects(response, redirect_to)127 # test from analyst users128 response = requests.get(file_path)129 file_name = ""130 if file_type == "xml":131 file_name = "test.xml"132 if file_type == "json":133 file_name = "test.json"134 
file_n = SimpleUploadedFile(135 name=file_name,136 content=response.text.encode(),137 content_type="multipart/form-data",138 )139 data = {140 "scanner": scanner,141 "file": file_n,142 "target": "http://test.com",143 "project_id": str(project_id),144 }145 client.login(146 username=self.auth_test.analyst.get("email"),147 password=self.auth_test.analyst.get("password"),148 )149 response = client.post("/report-upload/upload/", data=data)150 self.assertEqual(response.status_code, 302)151 self.assertRedirects(response, redirect_to)152 # test from viewers users153 response = requests.get(file_path)154 file_n = SimpleUploadedFile(155 name=file_name,156 content=response.text.encode(),157 content_type="multipart/form-data",158 )159 data = {160 "scanner": scanner,161 "file": file_n,162 "target": "http://test.com",163 "project_id": str(project_id),164 }165 client.login(166 username=self.auth_test.viewer.get("email"),167 password=self.auth_test.viewer.get("password"),168 )169 response = client.post("/report-upload/upload/", data=data)170 self.assertEqual(response.status_code, 403)171 # test for json file172 client.login(173 username=self.auth_test.analyst.get("email"),174 password=self.auth_test.analyst.get("password"),175 )176 response = requests.get(test_file_path)177 if file_type == "xml":178 file_name = "test.json"179 if file_type == "json":180 file_name = "test.xml"181 file_n = SimpleUploadedFile(182 name=file_name,183 content=response.text.encode(),184 content_type="multipart/form-data",185 )186 data = {187 "scanner": scanner,188 "file": file_n,189 "target": "http://test.com",190 "project_id": str(project_id),191 }192 # upload one sample report193 response = client.post("/report-upload/upload/", data=data, follow=True)194 # get message from context and check that expected text is there195 message = list(response.context.get("messages"))[0]196 self.assertEqual(message.tags, "alert-danger")197 self.assertTrue(error_message in message.message)198 def 
test_upload_zap_report_files(self):199 file_path = "https://raw.githubusercontent.com/archerysec/report-sample/main/OWASP-ZAP/OWASP-ZAP-v2.11.1.xml"200 test_file_path = "https://raw.githubusercontent.com/archerysec/report-sample/main/Bandit/bandit_report.json"201 scanner = "zap_scan"202 error_message = "ZAP Scanner Only XML file Support"203 redirect_to = "/webscanners/list_scans/"204 file_type = "xml"205 self.upload_report_file(206 scanner, file_path, error_message, test_file_path, redirect_to, file_type207 )208 def test_upload_burp_report_files(self):209 file_path = "https://raw.githubusercontent.com/archerysec/report-sample/main/Burp/Burp_Report.xml"210 test_file_path = "https://raw.githubusercontent.com/archerysec/report-sample/main/Bandit/bandit_report.json"211 scanner = "burp_scan"212 error_message = "Burp Scan Only XML file Support"213 redirect_to = "/webscanners/list_scans/"214 file_type = "xml"215 self.upload_report_file(216 scanner, file_path, error_message, test_file_path, redirect_to, file_type217 )218 def test_upload_arachni_report_files(self):219 file_path = "https://raw.githubusercontent.com/archerysec/report-sample/main/Arachni/Arachni_v1.3.xml"220 test_file_path = "https://raw.githubusercontent.com/archerysec/report-sample/main/Bandit/bandit_report.json"221 scanner = "arachni"222 error_message = "Arachni Only XML file Support"223 redirect_to = "/webscanners/list_scans/"224 file_type = "xml"225 self.upload_report_file(226 scanner, file_path, error_message, test_file_path, redirect_to, file_type227 )228 def test_upload_netsparker_report_files(self):229 file_path = "https://raw.githubusercontent.com/archerysec/report-sample/main/Netsparker/Netsparker_report.xml"230 test_file_path = "https://raw.githubusercontent.com/archerysec/report-sample/main/Bandit/bandit_report.json"231 scanner = "netsparker"232 error_message = "Netsparker Only XML file Support"233 redirect_to = "/webscanners/list_scans/"234 file_type = "xml"235 self.upload_report_file(236 
scanner, file_path, error_message, test_file_path, redirect_to, file_type237 )238 def test_upload_webinspect_report_files(self):239 file_path = "https://raw.githubusercontent.com/archerysec/report-sample/main/Webinspect/Webinspect_v18.20.xml"240 test_file_path = "https://raw.githubusercontent.com/archerysec/report-sample/main/Bandit/bandit_report.json"241 scanner = "webinspect"242 error_message = "Webinspect Only XML file Support"243 redirect_to = "/webscanners/list_scans/"244 file_type = "xml"245 self.upload_report_file(246 scanner, file_path, error_message, test_file_path, redirect_to, file_type247 )248 def test_upload_acunetix_report_files(self):249 file_path = "https://raw.githubusercontent.com/archerysec/report-sample/main/Acunetix/Acunetix_report_sample.xml"250 test_file_path = "https://raw.githubusercontent.com/archerysec/report-sample/main/Bandit/bandit_report.json"251 scanner = "acunetix"252 error_message = "Acunetix Only XML file Support"253 redirect_to = "/webscanners/list_scans/"254 file_type = "xml"255 self.upload_report_file(256 scanner, file_path, error_message, test_file_path, redirect_to, file_type257 )258 def test_upload_dependencycheck_report_files(self):259 file_path = "https://raw.githubusercontent.com/archerysec/report-sample/main/Dependency-check/dependency-check-report_v5.2.1.xml"260 test_file_path = "https://raw.githubusercontent.com/archerysec/report-sample/main/Bandit/bandit_report.json"261 scanner = "dependencycheck"262 error_message = "Dependencycheck Only XML file Support"263 redirect_to = "/staticscanners/list_scans/"264 file_type = "xml"265 self.upload_report_file(266 scanner, file_path, error_message, test_file_path, redirect_to, file_type267 )268 def test_upload_checkmarx_report_files(self):269 file_path = "https://raw.githubusercontent.com/archerysec/report-sample/main/Checkmarx/Checkmarx_v8.9.0.210.xml"270 test_file_path = "https://raw.githubusercontent.com/archerysec/report-sample/main/Bandit/bandit_report.json"271 scanner = 
"checkmarx"272 error_message = "Checkmarx Only XML file Support"273 redirect_to = "/staticscanners/list_scans/"274 file_type = "xml"275 self.upload_report_file(276 scanner, file_path, error_message, test_file_path, redirect_to, file_type277 )278 def test_upload_findbugs_report_files(self):279 file_path = "https://raw.githubusercontent.com/archerysec/report-sample/main/Findbugs/findbugs_report_v3.1.5.xml"280 test_file_path = "https://raw.githubusercontent.com/archerysec/report-sample/main/Bandit/bandit_report.json"281 scanner = "findbugs"282 error_message = "Findbugs Only XML file Support"283 redirect_to = "/staticscanners/list_scans/"284 file_type = "xml"285 self.upload_report_file(286 scanner, file_path, error_message, test_file_path, redirect_to, file_type287 )288 def test_upload_bandit_scan_report_files(self):289 file_path = "https://raw.githubusercontent.com/archerysec/report-sample/main/Bandit/bandit_report.json"290 test_file_path = "https://raw.githubusercontent.com/archerysec/report-sample/main/Findbugs/findbugs_report_v3.1.5.xml"291 scanner = "bandit_scan"292 error_message = "Bandit Only JSON file Supported"293 redirect_to = "/staticscanners/list_scans/"294 file_type = "json"295 self.upload_report_file(296 scanner, file_path, error_message, test_file_path, redirect_to, file_type297 )298 def test_upload_clair_scan_report_files(self):299 file_path = "https://raw.githubusercontent.com/archerysec/report-sample/main/Clair/clair_output.json"300 test_file_path = "https://raw.githubusercontent.com/archerysec/report-sample/main/Findbugs/findbugs_report_v3.1.5.xml"301 scanner = "clair_scan"302 error_message = "Clair Only JSON file Supported"303 redirect_to = "/staticscanners/list_scans/"304 file_type = "json"305 self.upload_report_file(306 scanner, file_path, error_message, test_file_path, redirect_to, file_type307 )308 def test_upload_trivy_scan_report_files(self):309 file_path = 
"https://raw.githubusercontent.com/archerysec/report-sample/main/Trivy/trivy_results.json"310 test_file_path = "https://raw.githubusercontent.com/archerysec/report-sample/main/Findbugs/findbugs_report_v3.1.5.xml"311 scanner = "trivy_scan"312 error_message = "Trivy Only JSON file Supported"313 redirect_to = "/staticscanners/list_scans/"314 file_type = "json"315 self.upload_report_file(316 scanner, file_path, error_message, test_file_path, redirect_to, file_type317 )318 def test_upload_npmaudit_scan_report_files(self):319 file_path = "https://raw.githubusercontent.com/archerysec/report-sample/main/Npmaudit/npm_audit_report.json"320 test_file_path = "https://raw.githubusercontent.com/archerysec/report-sample/main/Findbugs/findbugs_report_v3.1.5.xml"321 scanner = "npmaudit_scan"322 error_message = "NPM Audit Only JSON file Supported"323 redirect_to = "/staticscanners/list_scans/"324 file_type = "json"325 self.upload_report_file(326 scanner, file_path, error_message, test_file_path, redirect_to, file_type327 )328 def test_upload_nodejsscan_scan_report_files(self):329 file_path = "https://raw.githubusercontent.com/archerysec/report-sample/main/Nodejsscan/nodejsscan_report.json"330 test_file_path = "https://raw.githubusercontent.com/archerysec/report-sample/main/Findbugs/findbugs_report_v3.1.5.xml"331 scanner = "nodejsscan_scan"332 error_message = "Nodejs scan Only JSON file Supported"333 redirect_to = "/staticscanners/list_scans/"334 file_type = "json"335 self.upload_report_file(336 scanner, file_path, error_message, test_file_path, redirect_to, file_type337 )338 def test_upload_semgrepscan_scan_report_files(self):339 file_path = "https://raw.githubusercontent.com/archerysec/report-sample/main/Semgrep/semgrep-WebGoat.json"340 test_file_path = "https://raw.githubusercontent.com/archerysec/report-sample/main/Findbugs/findbugs_report_v3.1.5.xml"341 scanner = "semgrepscan_scan"342 error_message = "Semgrep scan Only JSON file Supported"343 redirect_to = 
"/staticscanners/list_scans/"344 file_type = "json"345 self.upload_report_file(346 scanner, file_path, error_message, test_file_path, redirect_to, file_type347 )348 def test_upload_tfsec_scan_report_files(self):349 file_path = "https://raw.githubusercontent.com/archerysec/report-sample/main/tfsec/tfsec_report.json"350 test_file_path = "https://raw.githubusercontent.com/archerysec/report-sample/main/Findbugs/findbugs_report_v3.1.5.xml"351 scanner = "tfsec_scan"352 error_message = "Tfsec Only JSON file Supported"353 redirect_to = "/staticscanners/list_scans/"354 file_type = "json"355 self.upload_report_file(356 scanner, file_path, error_message, test_file_path, redirect_to, file_type357 )358 def test_upload_whitesource_scan_report_files(self):359 file_path = "https://raw.githubusercontent.com/archerysec/report-sample/main/Whitesource/whitesource-report.json"360 test_file_path = "https://raw.githubusercontent.com/archerysec/report-sample/main/Findbugs/findbugs_report_v3.1.5.xml"361 scanner = "whitesource_scan"362 error_message = "Whitesource Only JSON file Supported"363 redirect_to = "/staticscanners/list_scans/"364 file_type = "json"365 self.upload_report_file(366 scanner, file_path, error_message, test_file_path, redirect_to, file_type367 )368 def test_upload_inspec_scan_report_files(self):369 file_path = "https://raw.githubusercontent.com/archerysec/report-sample/main/Inspec/inspec_report.json"370 test_file_path = "https://raw.githubusercontent.com/archerysec/report-sample/main/Findbugs/findbugs_report_v3.1.5.xml"371 scanner = "inspec_scan"372 error_message = "Inspec Only JSON file Supported"373 redirect_to = "/inspec/inspec_list/"374 file_type = "json"375 self.upload_report_file(376 scanner, file_path, error_message, test_file_path, redirect_to, file_type377 )378 def test_upload_dockle_scan_report_files(self):379 file_path = "https://raw.githubusercontent.com/archerysec/report-sample/main/Dockle/dockle_report.json"380 test_file_path = 
"https://raw.githubusercontent.com/archerysec/report-sample/main/Findbugs/findbugs_report_v3.1.5.xml"381 scanner = "dockle_scan"382 error_message = "Dockle Only JSON file Supported"383 redirect_to = "/dockle/dockle_list/"384 file_type = "json"385 self.upload_report_file(386 scanner, file_path, error_message, test_file_path, redirect_to, file_type387 )388 def test_upload_gitlabsast_scan_report_files(self):389 file_path = "https://raw.githubusercontent.com/archerysec/report-sample/main/Gitlab/gl-sast-report.json"390 test_file_path = "https://raw.githubusercontent.com/archerysec/report-sample/main/Findbugs/findbugs_report_v3.1.5.xml"391 scanner = "gitlabsast_scan"392 error_message = "Gitlabsast Only JSON file Supported"393 redirect_to = "/staticscanners/list_scans/"394 file_type = "json"395 self.upload_report_file(396 scanner, file_path, error_message, test_file_path, redirect_to, file_type397 )398 def test_upload_gitlabcontainerscan_scan_report_files(self):399 file_path = "https://raw.githubusercontent.com/archerysec/report-sample/main/Gitlab/gl-container-scanning-report.json"400 test_file_path = "https://raw.githubusercontent.com/archerysec/report-sample/main/Findbugs/findbugs_report_v3.1.5.xml"401 scanner = "gitlabcontainerscan_scan"402 error_message = "Gitlabcontainerscan Only JSON file Supported"403 redirect_to = "/staticscanners/list_scans/"404 file_type = "json"405 self.upload_report_file(406 scanner, file_path, error_message, test_file_path, redirect_to, file_type407 )408 def test_upload_gitlabsca_scan_report_files(self):409 file_path = "https://raw.githubusercontent.com/archerysec/report-sample/main/Gitlab/gl-dependency-scanning-report.json"410 test_file_path = "https://raw.githubusercontent.com/archerysec/report-sample/main/Findbugs/findbugs_report_v3.1.5.xml"411 scanner = "gitlabsca_scan"412 error_message = "Gitlabsca Only JSON file Supported"413 redirect_to = "/staticscanners/list_scans/"414 file_type = "json"415 self.upload_report_file(416 scanner, 
file_path, error_message, test_file_path, redirect_to, file_type417 )418 def test_upload_twistlock_scan_report_files(self):419 file_path = "https://raw.githubusercontent.com/archerysec/report-sample/main/Twistlock/twistlock.json"420 test_file_path = "https://raw.githubusercontent.com/archerysec/report-sample/main/Findbugs/findbugs_report_v3.1.5.xml"421 scanner = "twistlock_scan"422 error_message = "Twistlock Only JSON file Supported"423 redirect_to = "/staticscanners/list_scans/"424 file_type = "json"425 self.upload_report_file(426 scanner, file_path, error_message, test_file_path, redirect_to, file_type427 )428 def test_upload_brakeman_scan_report_files(self):429 file_path = "https://raw.githubusercontent.com/archerysec/report-sample/main/Brakeman/brakeman_output.json"430 test_file_path = "https://raw.githubusercontent.com/archerysec/report-sample/main/Findbugs/findbugs_report_v3.1.5.xml"431 scanner = "brakeman_scan"432 error_message = "Brakeman Only JSON file Supported"433 redirect_to = "/staticscanners/list_scans/"434 file_type = "json"435 self.upload_report_file(436 scanner, file_path, error_message, test_file_path, redirect_to, file_type437 )438 def test_upload_openvas_report_files(self):439 file_path = "https://raw.githubusercontent.com/archerysec/report-sample/main/Openvas/openvas.xml"440 test_file_path = "https://raw.githubusercontent.com/archerysec/report-sample/main/Brakeman/brakeman_output.json"441 scanner = "openvas"442 error_message = "Openvas Only XML file Supported"443 redirect_to = "/networkscanners/list_scans/"444 file_type = "xml"445 self.upload_report_file(446 scanner, file_path, error_message, test_file_path, redirect_to, file_type...
classifier.py
Source:classifier.py
#coding:utf-8
import os
import config
from sklearn.datasets import load_svmlight_file
from sklearn import svm, tree
from sklearn.ensemble import AdaBoostClassifier
from sklearn.ensemble import RandomForestClassifier
from sklearn.ensemble import VotingClassifier
from sklearn.linear_model import LogisticRegression
from sklearn.naive_bayes import GaussianNB
from sklearn.neighbors import KNeighborsClassifier


# Abstract base class: every concrete classifier strategy derives from it.
class Strategy(object):
    def train_model(self, train_file_path, model_path):
        return None

    def test_model(self, test_file_path, model_path, result_file_path):
        return None


# Front-end "context" class: delegates to whichever strategy it was given.
class Classifier(object):
    def __init__(self, strategy):
        self.strategy = strategy

    def train_model(self, train_file_path, model_path):
        self.strategy.train_model(train_file_path, model_path)

    def test_model(self, test_file_path, model_path, result_file_path):
        self.strategy.test_model(test_file_path, model_path, result_file_path)


''' scikit-learn strategies

Every model has two data members and two methods:
trainer    : display name, printed when the strategy is instantiated
clf        : the underlying scikit-learn estimator
train_model: (1) load the svmlight feature file, (2) fit the estimator
test_model : (1) load the svmlight feature file, (2) predict,
             (3) write the integer predictions, one per line, to the
             result file
All strategies share this flow, so it lives in _SvmlightStrategy below.
'''


class _SvmlightStrategy(Strategy):
    """Shared train/test flow for all sklearn-backed strategies.

    Subclasses set ``self.trainer`` and ``self.clf`` in ``__init__`` and
    may set ``dense = True`` when the estimator cannot handle sparse input.
    """

    # Convert the sparse svmlight matrix to a dense array first?
    dense = False

    def train_model(self, train_file_path, model_path):
        # model_path is accepted for interface compatibility but unused:
        # the fitted estimator is kept in memory, not persisted.
        train_X, train_y = load_svmlight_file(train_file_path)
        if self.dense:
            train_X = train_X.toarray()
        print("==> Train the model ...")
        self.clf.fit(train_X, train_y)

    def test_model(self, test_file_path, model_path, result_file_path):
        print("==> Test the model ...")
        test_X, test_y = load_svmlight_file(test_file_path)
        if self.dense:
            test_X = test_X.toarray()
        pred_y = self.clf.predict(test_X)
        # write prediction to file, one integer label per line
        with open(result_file_path, 'w') as fout:
            fout.write("\n".join(map(str, map(int, pred_y))))


# Model 1: decision tree
class skLearn_DecisionTree(_SvmlightStrategy):
    def __init__(self):
        self.trainer = "skLearn decisionTree"
        self.clf = tree.DecisionTreeClassifier()
        print("Using %s Classifier" % (self.trainer))


# Model 2: naive Bayes (GaussianNB requires a dense matrix)
class skLearn_NaiveBayes(_SvmlightStrategy):
    dense = True

    def __init__(self):
        self.trainer = "skLearn NaiveBayes"
        self.clf = GaussianNB()
        print("Using %s Classifier" % (self.trainer))


# Model 3: support vector machine
class skLearn_svm(_SvmlightStrategy):
    def __init__(self):
        self.trainer = "skLearn svm"
        self.clf = svm.LinearSVC()
        print("Using %s Classifier" % (self.trainer))


# Model 4: logistic regression
class skLearn_lr(_SvmlightStrategy):
    def __init__(self):
        self.trainer = "skLearn LogisticRegression"
        self.clf = LogisticRegression()
        print("Using %s Classifier" % (self.trainer))


# Model 5: k-nearest neighbours
# NOTE(review): the original comment claimed "default k = 3" but the code
# uses n_neighbors=10; kept at 10 to preserve behaviour.
class skLearn_KNN(_SvmlightStrategy):
    def __init__(self):
        self.trainer = "skLearn KNN"
        self.clf = KNeighborsClassifier(n_neighbors=10)
        print("Using %s Classifier" % (self.trainer))


# Model 6: AdaBoost ensemble
class skLearn_AdaBoostClassifier(_SvmlightStrategy):
    def __init__(self):
        self.trainer = "skLearn AdaBoostClassifier"
        self.clf = AdaBoostClassifier()
        print("Using %s Classifier" % (self.trainer))


# Model 7: random forest
class sklearn_RandomForestClassifier(_SvmlightStrategy):
    def __init__(self):
        self.trainer = "skLearn RandomForestClassifier"
        self.clf = RandomForestClassifier()
        print("Using %s Classifier" % (self.trainer))


# Model 8: hard (majority-rule) voting over the other estimators
class sklearn_VotingClassifier(_SvmlightStrategy):
    def __init__(self):
        self.trainer = "skLearn VotingClassifier"
        clf1 = tree.DecisionTreeClassifier()
        # NOTE(review): the original also built an unused GaussianNB (clf2)
        # that was never added to the ensemble; it is dropped here.
        clf3 = LogisticRegression()
        clf4 = svm.LinearSVC()
        clf5 = KNeighborsClassifier(n_neighbors=3)
        clf6 = AdaBoostClassifier()
        clf7 = RandomForestClassifier()
        self.clf = VotingClassifier(
            estimators=[('dtc', clf1), ('lr', clf3), ('svm', clf4),
                        ('knc', clf5), ('ada', clf6), ('rfc', clf7)],
            voting='hard')
        print("Using %s Classifier" % (self.trainer))


if __name__ == "__main__":
    # NOTE(review): the __main__ body was truncated in this excerpt;
    # left as a no-op rather than guessing at the missing driver code.
    pass
Learn to execute automation testing from scratch with the LambdaTest Learning Hub — right from setting up the prerequisites to run your first automation test, through following best practices and diving deeper into advanced test scenarios. The LambdaTest Learning Hub compiles a list of step-by-step guides to help you become proficient with different test automation frameworks, such as Selenium, Cypress, and TestNG.
You can also refer to the video tutorials on the LambdaTest YouTube channel to get step-by-step demonstrations from industry experts.
Get 100 automation testing minutes FREE!