Best Python code snippet using slash
ftp.py
Source:ftp.py
1from . import constants2from . import errors3from . import decorators4import ftplib5import os6import os.path7import socket8"""9Imports:10 .constants11 .errors12 .decorators13 ftplib14 os15 os.path16 socket17Contains:18 <Ftp>19"""20class Ftp(object):21 """22 Indirect wrapper for ftplib.FTP (https://docs.python.org/3/library/ftplib.html#ftplib.FTP)23 """24 def __init__ \25 (26 self,27 host,28 port = constants.DEFAULT_PORT,29 username = constants.DEFAULT_USERNAME,30 password = constants.DEFAULT_PASSWORD,31 ):32 """33 Initialises self34 Initialises class attributes35 Connects, and logs into the ftp session (https://docs.python.org/3/library/ftplib.html#module-ftplib)36 Default username: constants.DEFAULT_USERNAME37 Default password: constants.DEFAULT_PASSWORD38 """39 self._init_attrs()40 self.FTP = ftplib.FTP()41 try:42 self.FTP.connect(host, port, timeout=constants.TIMEOUT)43 except (TimeoutError, socket.timeout) as error_timeout:44 raise errors.TimeoutError \45 (46 'A connection attempt failed because the connected '47 'party did not properly respond after a period of '48 'time, or established connection failed because '49 'connected host has failed to respond'50 )51 except ConnectionRefusedError as error_refused:52 raise errors.ConnectionRefusedError \53 (54 'No connection could be made because the target '55 'machine actively refused it'56 )57 self.FTP.login(username, password)58 self.host = host59 self.username = username60 self.password = password61 self.port = port62 self.login()63 def __repr__(self):64 """65 Returns a string representation of the object66 ... in the form <class_name(username:password@ip:port)>67 """68 return '<{class_name}({username}:{password}@{ip}:{port})>'.format \69 (70 class_name = self.__class__.__name__,71 username = self.username,72 password = self.password,73 ip = self.host,74 port = self.port,75 )76 def login(self, *args, **kwargs):77 """78 Login to the ftp session (https://docs.python.org/3/library/ftplib.html#module-ftplib)79 If login failed, raises <errors.LoginFailed>80 """81 resp = self.FTP.login(*args, **kwargs)82 if not resp.strip().startswith(str(constants.STATUS_CODE_OPERATION_SUCCESSFUL)):83 raise errors.LoginFailed('Invalid username or password')84 return resp85 @decorators.keep_alive86 def pwd(self, *args, **kwargs):87 """88 Wrapper for ftplib.FTP.pwd implementing89 ... the decorator @decorators.keep_alive90 """91 return self.FTP.pwd(*args, **kwargs)92 @decorators.keep_alive93 @decorators.callback_store94 def ls(self, directory = '', **kwargs):95 """96 Wrapper for ftplib.FTP.retrlines('LIST')97 ... implementing the decorators:98 ... @decorators.keep_alive,99 ... @decorators.callback_store100 """101 comm_ext = '' if not directory else ' ' + directory102 return self.FTP.retrlines(constants.COMMAND_LIST + comm_ext, **kwargs)103 @decorators.keep_alive104 def exit(self, *args, **kwargs):105 """106 Wrapper for ftplib.FTP.quit implementing107 ... the decorator @decorators.keep_alive108 """109 return self.FTP.quit(*args, **kwargs)110 @decorators.keep_alive111 def rm(self, *args, **kwargs):112 """113 Wrapper for ftplib.FTP.delete implementing114 ... the decorator @decorators.keep_alive115 """116 return self.FTP.delete(*args, **kwargs)117 @decorators.keep_alive118 def cd(self, *args, **kwargs):119 """120 Wrapper for ftplib.FTP.cwd implementing121 ... the decorator @decorators.keep_alive122 """123 return self.FTP.cwd(*args, **kwargs)124 @decorators.keep_alive125 def mkdir(self, *args, **kwargs):126 """127 Wrapper for ftplib.FTP.mkd implementing128 ... the decorator @decorators.keep_alive129 """130 return self.FTP.mkd(*args, **kwargs)131 @decorators.keep_alive132 def rmdir(self, *args, **kwargs):133 """134 Wrapper for ftplib.FTP.rmd implementing135 ... the decorator @decorators.keep_alive136 """137 return self.FTP.rmd(*args, **kwargs)138 @decorators.keep_alive139 def get(self, file_in, file_out=None):140 """141 Wrapper for ftplib.FTP.retrbinary implementing142 ... the decorator @decorators.keep_alive143 Requests the contents of 'file_in' from the server144 ... then writes it into 'file_out'145 'file_out' should have an absolute path. The local146 path can be found in: self.local_directory147 If 'file_out' is None, it takes on the file name148 ... of 'file_in'149 """150 if file_out is None:151 file_out = self.local_directory + os.path.basename(file_in)152 with open(file_out, 'wb') as file:153 self.FTP.retrbinary(constants.COMMAND_RETR + ' ' + file_in, callback = file.write)154 @decorators.keep_alive155 def help(self):156 """157 Wrapper for ftplib.FTP.sendcmd('HELP') implementing158 ... the decorator @decorators.keep_alive159 """160 return self.FTP.sendcmd(constants.COMMAND_HELP)161 @decorators.keep_alive162 def put(self, file_in, file_out=None):163 """164 Wrapper for ftplib.FTP.storbinary implementing165 ... the decorator @decorators.keep_alive166 Sends the contents of 'file_in' to the server167 ... to be written to 'file_out'168 'file_in' should be an absolute path. The local169 ... directory can be found in self.local_directory170 ... of 'file_in'171 If 'file_in' does not exist, it is created as an empty file172 ... similair to Linux's 'touch'173 """174 if file_out is None:175 file_out = os.path.basename(file_in)176 if not os.path.isfile(file_in):177 with open(file_in, 'w') as file:178 pass179 with open(file_in, 'rb') as file:180 self.FTP.storbinary(constants.COMMAND_STOR + ' ' + file_out, file)181 def _init_attrs(self):182 """183 Initialises the class attributes184 It creates them, then fills them with a default value185 """186 self.host = None187 self.username = ''188 self.password = ''189 self.port = None...
page.js
Source:page.js
1import router from '@/router'2import { sessionSet, sessionDel } from "@/util/storage"3export default {4 namespaced: true,5 state: {6 keep_alive: [], // éè¦ç¶æä¿æç页é¢7 current: "" // å½åactiveç页é¢8 },9 mutations: {10 // 设置éè¦ç¶æä¿æç页é¢11 SET_KEEP_ALIVE(state, data, session = true) {12 state.keep_alive = data;13 session && sessionSet('keep_alive', data);14 },15 // æ·»å ä¸ä¸ªæ°çéè¦ç¶æä¿æç页é¢16 ADD_KEEP_ALIVE(state, data) {17 let _had = state.keep_alive.some(i => i.url === data.url);18 if (!_had) {19 state.keep_alive.push(data);20 sessionSet('keep_alive', state.keep_alive);21 }22 },23 // å
³ééè¦ç¶æä¿æç页é¢24 CLOSE_KEEP_ALIVE(state, data) {25 // å½ååªæä¸ä¸ªé¡µé¢ æ¸
空26 if (state.keep_alive.length === 1) {27 state.keep_alive = [];28 state.current = '/index';29 sessionSet('current', '/index');30 sessionDel('keep_alive');31 router.push('/index');32 return;33 }34 // å½åå
³éactive页é¢ï¼é¡µé¢æå¤ä¸ªï¼ååä¸ä¸ªæä¸ä¸ä¸ª35 if (data === state.current) {36 for (let i = 0, len = state.keep_alive.length; i < len; i++) {37 if (state.keep_alive[i].url === data) {38 let _next_index = i < len - 1 ? i + 1 : i - 1;39 state.current = state.keep_alive[_next_index].url;40 sessionSet('current', state.current);41 router.push(state.current);42 break;43 }44 }45 }46 state.keep_alive = state.keep_alive.filter(i => i.url !== data);47 sessionSet('keep_alive', state.keep_alive);48 },49 // å
³éå³ä¾§éè¦ç¶æä¿æç页é¢50 CLOSE_RIGHT_KEEP_ALIVE(state, data) {51 let _target_index = -1; // å³é®ç¼è¾çèåç´¢å¼52 let _active_index = 0; // å½åçèåç´¢å¼53 state.keep_alive.forEach((i, idx) => {54 if (i.url === data) {55 _target_index = idx;56 }57 if (i.url === state.current) {58 _active_index = idx;59 }60 })61 if (_target_index !== -1) {62 state.keep_alive = this._vm._.take(state.keep_alive, [_target_index + 1]);63 if (_target_index < _active_index) {64 state.current = data;65 sessionSet('current', state.current);66 router.push(data);67 }68 sessionSet('keep_alive', state.keep_alive);69 }70 },71 // å
³é左侧éè¦ç¶æä¿æç页é¢72 CLOSE_LEFT_KEEP_ALIVE(state, data) {73 let _target_index = -1; // å³é®ç¼è¾çèåç´¢å¼74 let _active_index = 0; // å½åçèåç´¢å¼75 state.keep_alive.forEach((i, idx) => {76 if (i.url === data) {77 _target_index = idx;78 }79 if (i.url === state.current) {80 _active_index = idx;81 }82 })83 if (_target_index !== -1) {84 state.keep_alive = this._vm._.drop(state.keep_alive, [_target_index]);85 if (_active_index < _target_index) {86 state.current = data;87 sessionSet('current', state.current);88 router.push(data);89 }90 sessionSet('keep_alive', state.keep_alive);91 }92 },93 // å
³éå
¶ä»éè¦ç¶æä¿æç页é¢94 CLOSE_OTHER_KEEP_ALIVE(state, data) {95 state.keep_alive = state.keep_alive.filter(i => i.url === data);96 state.current = data;97 router.push(data);98 sessionSet('keep_alive', state.keep_alive);99 sessionSet('current', state.current);100 },101 // å
³éå
¨é¨éè¦ç¶æä¿æç页é¢102 CLOSE_ALL_KEEP_ALIVE(state) {103 state.keep_alive = [];104 state.current = '/index';105 sessionSet('current', '/index');106 sessionDel('keep_alive');107 router.push('/index');108 },109 // 设置å½å页é¢active110 SET_CURRENT(state, data) {111 state.current = data;112 sessionSet('current', state.current);113 },114 },115 actions: {116 // 设置æ¯å¦ç¶æä¿æ117 setKeepPage({ commit }, data, session) {118 if (Array.isArray(data)) {119 commit('SET_KEEP_ALIVE', data, session)120 }121 },122 // æ·»å ä¸ä¸ªæ°çéè¦ç¶æä¿æç页é¢123 addKeepPage({ commit }, data) {124 commit('ADD_KEEP_ALIVE', data)125 },126 // å
³éä¸ä¸ªç¶æä¿æå
ç页é¢127 closeKeepPage({ commit }, data) {128 commit('CLOSE_KEEP_ALIVE', data)129 },130 // å
³éå³ä¾§ç¶æä¿æå
ç页é¢131 closeRightKeepPage({ commit }, data) {132 commit('CLOSE_RIGHT_KEEP_ALIVE', data)133 },134 // å
³é左侧ç¶æä¿æå
ç页é¢135 closeLeftKeepPage({ commit }, data) {136 commit('CLOSE_LEFT_KEEP_ALIVE', data)137 },138 // å
³éå
¶ä»ç¶æä¿æå
ç页é¢139 closeOtherKeepPage({ commit }, data) {140 commit('CLOSE_OTHER_KEEP_ALIVE', data)141 },142 // å
³éå
¨é¨ç¶æä¿æå
ç页é¢143 closeAllKeepPage({ commit }) {144 commit('CLOSE_ALL_KEEP_ALIVE')145 },146 // 设置å½å页é¢active147 setCurrentPage({ commit }, data) {148 commit('SET_CURRENT', data)149 }150 }...
benchmark.py
Source:benchmark.py
1#!/usr/bin/python2import sys3import json4import commands5import time6try:7 import matplotlib.pyplot as plt8except ImportError:9 plt = None10def clearstderrline():11 sys.stderr.write('\033[2K')12def weighttp(url, n_threads, n_connections, n_requests, keep_alive):13 keep_alive = '-k' if keep_alive else ''14 command = 'weighttp %(keep_alive)s ' \15 '-t %(n_threads)d ' \16 '-c %(n_connections)d ' \17 '-n %(n_requests)d ' \18 '-j ' \19 '%(url)s 2> /dev/null' % locals()20 clearstderrline()21 sys.stderr.write('*** %s\r' % command)22 output = commands.getoutput(command)23 return json.loads(output)24def weighttp_has_json_output():25 output = commands.getoutput('weighttp -j')26 return not 'unknown option: -j' in output27def steprange(initial, final, steps=10):28 step = (final - initial) / steps29 while initial <= final:30 yield initial31 initial += step32def sleepwithstatus(msg, period):33 slept = 034 spinner = 035 while slept <= period:36 clearstderrline()37 sys.stderr.write('\r%s: %s' % (msg, '/|\\-'[spinner % 4]))38 time.sleep(0.1)39 slept += 0.140 spinner += 141 sys.stderr.write('\r')42 clearstderrline()43def cmdlineboolarg(arg):44 has_arg = False45 if arg in sys.argv:46 has_arg = True47 sys.argv.remove(arg)48 return has_arg49def cmdlineintarg(arg, default=0):50 value = default51 if arg in sys.argv:52 index = sys.argv.index(arg)53 del sys.argv[index]54 try:55 value = int(sys.argv[index])56 except ValueError:57 print 'Argument is of invalid type for argument %s, assuming default (%d)' % (arg, default)58 finally:59 del sys.argv[index]60 return value61class CSVOutput:62 def header(self):63 print 'keep_alive,n_connections,rps,kbps,2xx,3xx,4xx,5xx'64 def footer(self):65 clearstderrline()66 def log(self, keep_alive, n_connections, rps, kbps, _2xx, _3xx, _4xx, _5xx):67 clearstderrline()68 print ','.join(str(token) for token in69 (int(keep_alive), n_connections, rps, kbps, _2xx, _3xx, _4xx, _5xx))70class MatplotlibOutput:71 def __init__(self, xkcd=False):72 self.xkcd = xkcd73 def header(self):74 self.n_connections = []75 self.rps = {'keep-alive': [], 'close': []}76 def _plot(self):77 plt.xlabel('# connections')78 plt.ylabel('Requests/s')79 n_connections = self.n_connections[:len(self.rps['close'])]80 plt.plot(n_connections, self.rps['keep-alive'], label='Keep-Alive')81 plt.plot(n_connections, self.rps['close'], label='Close',82 marker='o', linestyle='--', color='r')83 plt.title('Web Server Benchmark')84 plt.legend()85 plt.show()86 def footer(self):87 if self.xkcd:88 with plt.xkcd():89 self._plot()90 else:91 self._plot()92 def log(self, keep_alive, n_connections, rps, kbps, _2xx, _3xx, _4xx, _5xx):93 self.n_connections.append(n_connections)94 if keep_alive:95 self.rps['keep-alive'].append(rps)96 else:97 self.rps['close'].append(rps)98if __name__ == '__main__':99 if not weighttp_has_json_output():100 print 'This script requires a special version of weighttp which supports JSON'101 print 'output. Get it at http://github.com/lpereira/weighttp'102 sys.exit(1)103 plot = cmdlineboolarg('--plot')104 xkcd = cmdlineboolarg('--xkcd')105 n_threads = cmdlineintarg('--threads', 2)106 n_requests = cmdlineintarg('--request', 1000000)107 keep_alive_timeout = cmdlineintarg('--keep-alive-timeout', 5)108 n_conn_start = cmdlineintarg('--start-conn', 100)109 n_conn_end = cmdlineintarg('--end-conn', 60000)110 n_conn_step = cmdlineintarg('--conn-step', 10)111 url = sys.argv[-1] if len(sys.argv) > 1 else 'http://localhost:8080/100.html'112 if plt is None:113 if plot:114 print 'Matplotlib not installed!'115 sys.exit(1)116 output = CSVOutput()117 elif plot:118 output = MatplotlibOutput(xkcd)119 else:120 output = CSVOutput()121 output.header()122 for keep_alive in (True, False):123 for n_connections in steprange(n_conn_start, n_conn_end, n_conn_step):124 results = weighttp(url, n_threads, n_connections, n_requests, keep_alive)125 status = results['status_codes']126 output.log(keep_alive, n_connections, results['reqs_per_sec'],127 results['kbyte_per_sec'], status['2xx'], status['3xx'],128 status['4xx'], status['5xx'])129 sleepwithstatus('Waiting for keepalive connection timeout', keep_alive_timeout * 1.1)...
wbrunner.py
Source:wbrunner.py
1# -*- coding: utf-8 -*-2# @Time : 2020/11/30 19:24 3# @Author : youding4# @File : wbrunner.py5# @Software: PyCharm Community Edition6from selenium import webdriver7from selenium.webdriver.common.desired_capabilities import DesiredCapabilities8from runner.baserunner import BaseRunner9class WbRunner(BaseRunner):10 def __init__(self, name="Webdriver Runner"):11 super().__init__(name=name)12 def remote_driver(self, command_executor='127.0.0.1:444/wd/hub',13 desired_capabilities=None, browser_profile=None, proxy=None,14 keep_alive=False, file_detector=None, options=None):15 return webdriver.Remote(command_executor=command_executor, desired_capabilities=desired_capabilities,16 browser_profile=browser_profile, proxy=proxy, keep_alive=keep_alive,17 file_detector=file_detector, options=options)18 def chrome_driver(self, executable_path="chromedriver", port=0, options=None, service_args=None,19 desired_capabilities=None, service_log_path=None, chrome_options=None, kee_alive=True):20 return webdriver.Chrome(executable_path=executable_path, port=port, options=options,21 service_args=service_args, desired_capabilities=desired_capabilities,22 service_log_path=service_log_path, chrome_options=chrome_options,23 keep_alive=kee_alive)24 def firefox_driver(self, firefox_profile=None, firefox_binary=None, timeout=30,25 capabilities=None, proxy=None, executable_path="geckodriver", options=None,26 service_log_path="geckodriver.log", firefox_options=None,27 service_args=None, desired_capabilities=None, log_path=None, keep_alive=True):28 return webdriver.Firefox(firefox_profile=firefox_profile, firefox_binary=firefox_binary,29 timeout=timeout, capabilities=capabilities, proxy=proxy,30 executable_path=executable_path, options=options, service_log_path=service_log_path,31 firefox_options=firefox_options, service_args=service_args, desired_capabilities=desired_capabilities,32 log_path=log_path, keep_alive=keep_alive)33 def ie_driver(self, executable_path="IEDriverServer.exe", capabilities=None,34 port=0, timeout=30, host=None, log_lever=None, service_log_path=None,35 options=None, ie_options=None, desired_capabilities=None, log_file=None,36 keep_alive=False):37 return webdriver.Ie(executable_path=executable_path,capabilities=capabilities,38 port=port, timeout=timeout, host=host, log_level=log_lever, options=options,39 service_log_path=service_log_path, ie_options=ie_options, desired_capabilities=desired_capabilities,40 log_file=log_file, keep_alive=keep_alive)41 def edge_driver(self, executable_path="MicrosoftWebDriver.exe", capabilities=None,42 port=0, verbose=False, service_log_path=None, log_path=None, keep_alive=False):43 return webdriver.Edge(executable_path=executable_path, capabilities=capabilities, port=port, verbose=verbose, service_log_path=service_log_path,44 log_path=log_path, keep_alive=keep_alive)45 def safafi_driver(self, executable_path="/usr/bin/safaridriver", reuse_service=False,46 desired_capabilities=DesiredCapabilities.SAFARI,quiet=False,47 keep_alive=True, service_args=None):48 return webdriver.Safari(executable_path=executable_path, reuse_service=reuse_service, desired_capabilities=desired_capabilities, quiet=quiet, keep_alive=keep_alive,...
Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.
You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.
Get 100 minutes of automation test minutes FREE!!