How to use get_container_info method in avocado

Best Python code snippet using avocado_python

test_middleware.py

Source:test_middleware.py Github

copy

Full Screen

1import unittest2from mock import patch3from swift.common import swob4from data_locker import middleware as md5class FakeApp(object):6 def __call__(self, env, start_response):7 return swob.Response('Fake Test App')(env, start_response)8class DataLockerTestCase(unittest.TestCase):9 def setUp(self):10 self.app = md.DataLocker(FakeApp(), {})11 @patch('data_locker.middleware.DataLocker._get_req_lockers')12 def test_get_request(self, mock):13 """ GET never get blocked and return immediatly """14 response = swob.Request.blank(15 '/v1/a/c/o',16 environ={'REQUEST_METHOD': 'GET'}17 ).get_response(self.app)18 self.assertEqual(response.status, '200 OK')19 mock.assert_not_called()20 @patch('data_locker.middleware.DataLocker._get_req_lockers')21 def test_head_request(self, mock):22 """ HEAD never get blocked and return immediatly """23 response = swob.Request.blank(24 '/v1/a/c/o',25 environ={'REQUEST_METHOD': 'HEAD'}26 ).get_response(self.app)27 self.assertEqual(response.status, '200 OK')28 mock.assert_not_called()29 @patch('data_locker.middleware.DataLocker._get_req_lockers')30 def test_options_request(self, mock):31 """ OPTIONS never get blocked and return immediatly """32 response = swob.Request.blank(33 '/v1/a/c/o',34 environ={'REQUEST_METHOD': 'OPTIONS'}35 ).get_response(self.app)36 self.assertEqual(response.status, '200 OK')37 mock.assert_not_called()38 @patch('data_locker.middleware.get_container_info')39 def test_invalid_header_doesnt_break(self, mock):40 mock.return_value = {41 'meta': {42 md.META_DATA_LOCKER: 'invalid'43 }44 }45 response = swob.Request.blank(46 '/v1/a/c/o',47 environ={'REQUEST_METHOD': 'GET'}48 ).get_response(self.app)49 self.assertEqual(response.status, '200 OK')50 @patch('data_locker.middleware.get_container_info')51 def test_container_request_never_blocked(self, mock):52 mock.return_value = {53 'meta': {54 md.META_DATA_LOCKER: 'delete, create'55 }56 }57 response = swob.Request.blank(58 '/v1/a/c',59 environ={'REQUEST_METHOD': 'DELETE'}60 ).get_response(self.app)61 self.assertEqual(response.status, '200 OK')62 response = swob.Request.blank(63 '/v1/a/c',64 environ={'REQUEST_METHOD': 'PUT'}65 ).get_response(self.app)66 self.assertEqual(response.status, '200 OK')67 response = swob.Request.blank(68 '/v1/a/c',69 environ={'REQUEST_METHOD': 'POST'}70 ).get_response(self.app)71 self.assertEqual(response.status, '200 OK')72 @patch('data_locker.middleware.get_account_info')73 def test_account_request_never_blocked(self, mock):74 mock.return_value = {75 'meta': {76 md.META_DATA_LOCKER: 'delete, create'77 }78 }79 response = swob.Request.blank(80 '/v1/a/c',81 environ={'REQUEST_METHOD': 'DELETE'}82 ).get_response(self.app)83 self.assertEqual(response.status, '200 OK')84 response = swob.Request.blank(85 '/v1/a/c',86 environ={'REQUEST_METHOD': 'PUT'}87 ).get_response(self.app)88 self.assertEqual(response.status, '200 OK')89 response = swob.Request.blank(90 '/v1/a/c',91 environ={'REQUEST_METHOD': 'POST'}92 ).get_response(self.app)93 self.assertEqual(response.status, '200 OK')94 def test_unlocked_delete_request(self):95 response = swob.Request.blank(96 '/v1/a/c/o',97 environ={'REQUEST_METHOD': 'DELETE'}98 ).get_response(self.app)99 self.assertEqual(response.status, '200 OK')100 @patch('data_locker.middleware.get_container_info')101 def test_delete_req_with_container_locked(self, mock):102 mock.return_value = {103 'meta': {104 md.META_DATA_LOCKER: 'delete'105 }106 }107 response = swob.Request.blank(108 '/v1/a/c/o',109 environ={'REQUEST_METHOD': 'DELETE'}110 ).get_response(self.app)111 self.assertEqual(response.status, '403 Forbidden')112 @patch('data_locker.middleware.get_account_info')113 def test_delete_req_with_account_locked(self, mock):114 mock.return_value = {115 'meta': {116 md.META_DATA_LOCKER: 'delete'117 }118 }119 response = swob.Request.blank(120 '/v1/a/c/o',121 environ={'REQUEST_METHOD': 'DELETE'}122 ).get_response(self.app)123 self.assertEqual(response.status, '403 Forbidden')124 @patch('data_locker.middleware.get_container_info')125 def test_put_req_with_container_locked_for_delete(self, mock):126 """127 Test PUT request in a container locked for delete. It should work128 properly129 """130 mock.return_value = {131 'meta': {132 md.META_DATA_LOCKER: 'delete'133 }134 }135 response = swob.Request.blank(136 '/v1/a/c/o',137 environ={'REQUEST_METHOD': 'PUT'}138 ).get_response(self.app)139 self.assertEqual(response.status, '200 OK')140 @patch('data_locker.middleware.get_account_info')141 def test_put_req_with_account_locked_for_delete(self, mock):142 """143 Test PUT request in a account locked for delete. It should work144 properly145 """146 mock.return_value = {147 'meta': {148 md.META_DATA_LOCKER: 'delete'149 }150 }151 response = swob.Request.blank(152 '/v1/a/c/o',153 environ={'REQUEST_METHOD': 'PUT'}154 ).get_response(self.app)155 self.assertEqual(response.status, '200 OK')156 @patch('data_locker.middleware.get_container_info')157 def test_put_post_req_with_container_locked(self, mock):158 """ Test PUT request in a container locked. It should be forbidden """159 mock.return_value = {160 'meta': {161 md.META_DATA_LOCKER: 'create'162 }163 }164 response = swob.Request.blank(165 '/v1/a/c/o',166 environ={'REQUEST_METHOD': 'PUT'}167 ).get_response(self.app)168 self.assertEqual(response.status, '403 Forbidden')169 response = swob.Request.blank(170 '/v1/a/c/o',171 environ={'REQUEST_METHOD': 'POST'}172 ).get_response(self.app)173 self.assertEqual(response.status, '403 Forbidden')174 @patch('data_locker.middleware.get_account_info')175 def test_put_post_req_with_account_locked(self, mock):176 """ Test PUT request in a account locked. It should be forbidden """177 mock.return_value = {178 'meta': {179 md.META_DATA_LOCKER: 'create'180 }181 }182 response = swob.Request.blank(183 '/v1/a/c/o',184 environ={'REQUEST_METHOD': 'PUT'}185 ).get_response(self.app)186 self.assertEqual(response.status, '403 Forbidden')187 response = swob.Request.blank(188 '/v1/a/c/o',189 environ={'REQUEST_METHOD': 'POST'}190 ).get_response(self.app)191 self.assertEqual(response.status, '403 Forbidden')192class GetLockersTestCase(unittest.TestCase):193 """ Testacase for the _get_req_lockers method """194 def setUp(self):195 self.app = md.DataLocker(FakeApp(), {})196 @patch('data_locker.middleware.get_container_info')197 @patch('data_locker.middleware.get_account_info')198 def test_no_lockers_set(self, mock_acc, mock_con):199 req = swob.Request.blank('/v1/a/c/o')200 lockers = self.app._get_req_lockers(req)201 self.assertEqual(lockers, [])202 @patch('data_locker.middleware.get_container_info')203 @patch('data_locker.middleware.get_account_info')204 def test_delete_on_container(self, mock_acc, mock_con):205 """ Delete set on container """206 mock_con.return_value = {207 'meta': {208 md.META_DATA_LOCKER: 'delete'209 }210 }211 req = swob.Request.blank('/v1/a/c/o')212 computed = self.app._get_req_lockers(req)213 expected = md.METHODS['delete']214 self.assertEqual(computed, expected)215 @patch('data_locker.middleware.get_container_info')216 @patch('data_locker.middleware.get_account_info')217 def test_delete_on_account(self, mock_acc, mock_con):218 """ Delete set on account """219 mock_acc.return_value = {220 'meta': {221 md.META_DATA_LOCKER: 'delete'222 }223 }224 req = swob.Request.blank('/v1/a/c/o')225 computed = self.app._get_req_lockers(req)226 expected = md.METHODS['delete']227 self.assertEqual(computed, expected)228 @patch('data_locker.middleware.get_container_info')229 @patch('data_locker.middleware.get_account_info')230 def test_delete_on_both(self, mock_acc, mock_con):231 """ Delete set on account and container """232 mock_acc.return_value = {233 'meta': {234 md.META_DATA_LOCKER: 'delete'235 }236 }237 mock_con.return_value = {238 'meta': {239 md.META_DATA_LOCKER: 'delete'240 }241 }242 req = swob.Request.blank('/v1/a/c/o')243 computed = self.app._get_req_lockers(req)244 expected = md.METHODS['delete']245 self.assertEqual(computed, expected)246 @patch('data_locker.middleware.get_container_info')247 @patch('data_locker.middleware.get_account_info')248 def test_delete_on_account_create_on_container(self, mock_acc, mock_con):249 """ Delete set on account, create on container """250 mock_acc.return_value = {251 'meta': {252 md.META_DATA_LOCKER: 'delete'253 }254 }255 mock_con.return_value = {256 'meta': {257 md.META_DATA_LOCKER: 'create'258 }259 }260 req = swob.Request.blank('/v1/a/c/o')261 computed = self.app._get_req_lockers(req)262 expected = md.METHODS['delete'] + md.METHODS['create']263 self.assertEqual(sorted(computed), sorted(expected))264 @patch('data_locker.middleware.get_container_info')265 @patch('data_locker.middleware.get_account_info')266 def test_create_on_account_delete_on_container(self, mock_acc, mock_con):267 """ Delete set on account, create on container """268 mock_acc.return_value = {269 'meta': {270 md.META_DATA_LOCKER: 'create'271 }272 }273 mock_con.return_value = {274 'meta': {275 md.META_DATA_LOCKER: 'delete'276 }277 }278 req = swob.Request.blank('/v1/a/c/o')279 computed = self.app._get_req_lockers(req)280 expected = md.METHODS['delete'] + md.METHODS['create']281 self.assertEqual(sorted(computed), sorted(expected))282 @patch('data_locker.middleware.get_container_info')283 @patch('data_locker.middleware.get_account_info')284 def test_create_on_both(self, mock_acc, mock_con):285 """ Delete set on account and container """286 mock_acc.return_value = {287 'meta': {288 md.META_DATA_LOCKER: 'create'289 }290 }291 mock_con.return_value = {292 'meta': {293 md.META_DATA_LOCKER: 'create'294 }295 }296 req = swob.Request.blank('/v1/a/c/o')297 computed = self.app._get_req_lockers(req)298 expected = md.METHODS['create']299 self.assertEqual(sorted(computed), sorted(expected))300 @patch('data_locker.middleware.get_container_info')301 @patch('data_locker.middleware.get_account_info')302 def test_create_on_container(self, mock_acc, mock_con):303 """ Delete set on container """304 mock_con.return_value = {305 'meta': {306 md.META_DATA_LOCKER: 'create'307 }308 }309 req = swob.Request.blank('/v1/a/c/o')310 computed = self.app._get_req_lockers(req)311 expected = md.METHODS['create']312 self.assertEqual(sorted(computed), sorted(expected))313 @patch('data_locker.middleware.get_container_info')314 @patch('data_locker.middleware.get_account_info')315 def test_create_on_account(self, mock_acc, mock_con):316 """ Delete set on container """317 mock_acc.return_value = {318 'meta': {319 md.META_DATA_LOCKER: 'create'320 }321 }322 req = swob.Request.blank('/v1/a/c/o')323 computed = self.app._get_req_lockers(req)324 expected = md.METHODS['create']325 self.assertEqual(sorted(computed), sorted(expected))326 @patch('data_locker.middleware.get_container_info')327 @patch('data_locker.middleware.get_account_info')328 def test_create_and_delete_on_container(self, mock_acc, mock_con):329 """ Delete set on container """330 mock_con.return_value = {331 'meta': {332 md.META_DATA_LOCKER: 'create, delete'333 }334 }335 req = swob.Request.blank('/v1/a/c/o')336 computed = self.app._get_req_lockers(req)337 expected = md.METHODS['delete'] + md.METHODS['create']338 self.assertEqual(sorted(computed), sorted(expected))339 @patch('data_locker.middleware.get_container_info')340 @patch('data_locker.middleware.get_account_info')341 def test_create_and_delete_on_account(self, mock_acc, mock_con):342 """ Delete set on container """343 mock_acc.return_value = {344 'meta': {345 md.META_DATA_LOCKER: 'delete, create'346 }347 }348 req = swob.Request.blank('/v1/a/c/o')349 computed = self.app._get_req_lockers(req)350 expected = md.METHODS['create'] + md.METHODS['delete']351 self.assertEqual(sorted(computed), sorted(expected))352if __name__ == '__main__':...

Full Screen

Full Screen

test_watchdog.py

Source:test_watchdog.py Github

copy

Full Screen

...50 mock.patch('watchdog.get_config', get_config):51 watchdog.send_email('test', None)5253 def test_initial(self):54 def get_container_info():55 started_at = self.base_date56 running = True57 return started_at, running5859 def load_state():60 return {}6162 def save_state(state):63 self.assertEqual(state['status'], 'running')64 self.assertEqual(state['started_at'], self.base_date)6566 def _send(msg):67 raise AssertionError("This routine should not be running")6869 with mock.patch('watchdog.get_container_info', get_container_info), \70 mock.patch('watchdog.load_state', load_state), \71 mock.patch('watchdog.save_state', save_state), \72 mock.patch('watchdog.get_config', get_config), \73 mock.patch('watchdog._send', _send):74 watchdog.check_docker()7576 def test_running_running(self):77 def get_container_info():78 started_at = self.base_date79 running = True80 return started_at, running8182 def load_state():83 return {84 'status': 'running',85 'started_at': self.base_date86 }8788 def save_state(state):89 self.assertEqual(state['status'], 'running')90 self.assertEqual(91 state['started_at'], self.base_date)9293 def _send(msg):94 raise AssertionError("This routine should not be running")9596 with mock.patch('watchdog.get_container_info', get_container_info), \97 mock.patch('watchdog.load_state', load_state), \98 mock.patch('watchdog.save_state', save_state), \99 mock.patch('watchdog.get_config', get_config), \100 mock.patch('watchdog._send', _send):101 watchdog.check_docker()102103 def test_running_crashed(self):104 def gethostname():105 return "Foo"106107 def get_container_info():108 started_at = self.base_date + datetime.timedelta(days=1)109 started_at = started_at.replace(tzinfo = pytz.utc)110 running = True111 return started_at, running112113 def load_state():114 return {115 'status': 'running',116 'started_at': self.base_date117 }118119 def save_state(state):120 self.assertEqual(state['status'], 'crashed')121 self.assertEqual(122 state['started_at'], self.base_date + datetime.timedelta(days=1))123124 def _send(msg):125 self.assertEqual(msg['Subject'], "CRASH message")126 self.assertEqual(msg.get_payload(), "Server Foo crashed")127128 with mock.patch('watchdog.get_container_info', get_container_info), \129 mock.patch('watchdog.load_state', load_state), \130 mock.patch('watchdog.save_state', save_state), \131 mock.patch('watchdog.get_config', get_config), \132 mock.patch('watchdog._send', _send), \133 mock.patch('socket.gethostname', gethostname):134 watchdog.check_docker()135136 def test_crashed_crashed_starting_up(self):137 def get_container_info():138 started_at = self.base_date + datetime.timedelta(seconds=10)139 running = True140 return started_at, running141142 def load_state():143 return {144 'status': 'crashed',145 'started_at': self.base_date146 }147148 def save_state(state):149 self.assertEqual(state['status'], 'crashed')150 self.assertEqual(state['started_at'], self.base_date)151152153 def get_utcnow():154 return self.base_date + datetime.timedelta(seconds=10)155156 def _send(msg):157 raise AssertionError("This routine should not be running")158159 with mock.patch('watchdog.get_container_info', get_container_info), \160 mock.patch('watchdog.load_state', load_state), \161 mock.patch('watchdog.save_state', save_state), \162 mock.patch('watchdog.get_config', get_config), \163 mock.patch('watchdog.get_utcnow', get_utcnow), \164 mock.patch('watchdog._send', _send):165 watchdog.check_docker()166167 def test_crashed_crashed_still_crashing(self):168 def get_container_info():169 started_at = self.base_date + datetime.timedelta(days=1)170 running = False171 return started_at, running172173 def load_state():174 return {175 'status': 'crashed',176 'started_at': self.base_date177 }178179 def get_utcnow():180 return self.base_date + datetime.timedelta(seconds=10)181182 def save_state(state):183 self.assertEqual(state['status'], 'crashed')184 self.assertEqual(state['started_at'], self.base_date)185186 def _send(msg):187 raise AssertionError("This routine should not be running")188189 with mock.patch('watchdog.get_container_info', get_container_info), \190 mock.patch('watchdog.load_state', load_state), \191 mock.patch('watchdog.save_state', save_state), \192 mock.patch('watchdog.get_config', get_config), \193 mock.patch('watchdog.get_utcnow', get_utcnow), \194 mock.patch('watchdog._send', _send):195 watchdog.check_docker()196197 def test_crashed_running(self):198 def gethostname():199 return "Foo"200201 def get_container_info():202 started_at = self.base_date + datetime.timedelta(minutes=2)203 running = True204 return started_at, running205206 def load_state():207 return {208 'status': 'crashed',209 'started_at': self.base_date210 }211212 def get_utcnow():213 return self.base_date + datetime.timedelta(seconds=130)214215 def save_state(state): ...

Full Screen

Full Screen

monitor.py

Source:monitor.py Github

copy

Full Screen

...4import types5import json6container = {"install_registry_1", "install_registry_collector_auth_1", "install_logstash_1", "install_elasticsearch_1"}7registry_ui = "install_registryui_1"8def get_container_info(container):9 msg = commands.getoutput('docker inspect '+container)10 data = json.loads(msg)11 return data[0]12def get_container_status(data):13 return data['State']['Running']14def stop_container_ui(container_ui_name):15 return commands.getoutput('docker stop '+container_ui_name)16def start_container_ui(container_ui_name):17 return commands.getoutput('docker start '+container_ui_name)18if get_container_status(get_container_info(registry_ui)):19 for i in container:20 if not get_container_status(get_container_info(i)):...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.

LambdaTest Learning Hubs:

YouTube

You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.

Run avocado automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 minutes of automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

NotHelpful