How to use is_stream_ready method in localstack

Best Python code snippet using localstack_python

nicolive.py

Source:nicolive.py Github

copy

Full Screen

1import json2import logging3import re4from streamlink.utils import websocket5import threading6import time7from streamlink.plugin import Plugin, PluginArguments, PluginArgument8from streamlink.plugin.api import useragents9from streamlink.stream import HLSStream10from streamlink.compat import urlparse, unquote_plus11_log = logging.getLogger(__name__)12_url_re = re.compile(13 r"^https?://(?P<domain>live[0-9]*\.nicovideo\.jp)/watch/lv[0-9]*")14_login_url = "https://account.nicovideo.jp/login/redirector"15_login_url_params = {16 "show_button_twitter": 1,17 "show_button_facebook": 1,18 "next_url": "/"}19class NicoLive(Plugin):20 arguments = PluginArguments(21 PluginArgument(22 "email",23 argument_name="niconico-email",24 sensitive=True,25 metavar="EMAIL",26 help="The email or phone number associated with your "27 "Niconico account"),28 PluginArgument(29 "password",30 argument_name="niconico-password",31 sensitive=True,32 metavar="PASSWORD",33 help="The password of your Niconico account"),34 PluginArgument(35 "user-session",36 argument_name="niconico-user-session",37 sensitive=True,38 metavar="VALUE",39 help="Value of the user-session token \n(can be used in "40 "case you do not want to put your password here)"))41 is_stream_ready = False42 is_stream_ended = False43 watching_interval = 3044 watching_interval_worker_thread = None45 stream_reader = None46 _ws = None47 frontend_id = None48 @classmethod49 def can_handle_url(cls, url):50 return _url_re.match(url) is not None51 def _get_streams(self):52 self.url = self.url.split("?")[0]53 self.session.http.headers.update({54 "User-Agent": useragents.CHROME,55 })56 if not self.get_wss_api_url():57 _log.debug("Coundn't extract wss_api_url. Attempting login...")58 if not self.niconico_web_login():59 return None60 if not self.get_wss_api_url():61 _log.error("Failed to get wss_api_url.")62 _log.error(63 "Please check if the URL is correct, "64 "and make sure your account has access to the video.")65 return None66 self.api_connect(self.wss_api_url)67 i = 068 while not self.is_stream_ready:69 if i % 10 == 0:70 _log.debug("Waiting for permit...")71 if i == 600:72 _log.error("Waiting for permit timed out.")73 return None74 if self.is_stream_ended:75 return None76 time.sleep(0.1)77 i += 178 streams = HLSStream.parse_variant_playlist(79 self.session, self.hls_stream_url)80 nico_streams = {}81 for s in streams:82 nico_stream = NicoHLSStream(streams[s], self)83 nico_streams[s] = nico_stream84 return nico_streams85 def get_wss_api_url(self):86 _log.debug("Getting video page: {0}".format(self.url))87 resp = self.session.http.get(self.url)88 try:89 self.wss_api_url = extract_text(90 resp.text, "&quot;webSocketUrl&quot;:&quot;", "&quot;")91 if not self.wss_api_url:92 return False93 except Exception as e:94 _log.debug(e)95 _log.debug("Failed to extract wss api url")96 return False97 try:98 self.frontend_id = extract_text(99 resp.text, "&quot;frontendId&quot;:", ",&quot;")100 except Exception as e:101 _log.debug(e)102 _log.warning("Failed to extract frontend id")103 self.wss_api_url = "{0}&frontend_id={1}".format(self.wss_api_url, self.frontend_id)104 _log.debug("Video page response code: {0}".format(resp.status_code))105 _log.trace(u"Video page response body: {0}".format(resp.text))106 _log.debug("Got wss_api_url: {0}".format(self.wss_api_url))107 _log.debug("Got frontend_id: {0}".format(self.frontend_id))108 return self.wss_api_url.startswith("wss://")109 def api_on_open(self):110 self.send_playerversion()111 require_new_stream = not self.is_stream_ready112 self.send_getpermit(require_new_stream=require_new_stream)113 def api_on_error(self, ws, error=None):114 if error:115 _log.warning(error)116 _log.warning("wss api disconnected.")117 _log.warning("Attempting to reconnect in 5 secs...")118 time.sleep(5)119 self.api_connect(self.wss_api_url)120 def api_connect(self, url):121 # Proxy support adapted from the UStreamTV plugin (ustreamtv.py)122 proxy_url = self.session.get_option("https-proxy")123 if proxy_url is None:124 proxy_url = self.session.get_option("http-proxy")125 proxy_options = parse_proxy_url(proxy_url)126 if proxy_options.get('http_proxy_host'):127 _log.debug("Using proxy ({0}://{1}:{2})".format(128 proxy_options.get('proxy_type') or "http",129 proxy_options.get('http_proxy_host'),130 proxy_options.get('http_proxy_port') or 80))131 _log.debug("Connecting: {0}".format(url))132 self._ws = websocket.WebSocketApp(133 url,134 header=["User-Agent: {0}".format(useragents.CHROME)],135 on_open=self.api_on_open,136 on_message=self.handle_api_message,137 on_error=self.api_on_error)138 self.ws_worker_thread = threading.Thread(139 target=self._ws.run_forever,140 args=proxy_options)141 self.ws_worker_thread.daemon = True142 self.ws_worker_thread.start()143 def send_message(self, type_, body):144 msg = {"type": type_, "body": body}145 msg_json = json.dumps(msg)146 _log.debug(u"Sending: {0}".format(msg_json))147 if self._ws and self._ws.sock.connected:148 self._ws.send(msg_json)149 else:150 _log.warning("wss api is not connected.")151 def send_no_body_message(self, type_):152 msg = {"type": type_}153 msg_json = json.dumps(msg)154 _log.debug(u"Sending: {0}".format(msg_json))155 if self._ws and self._ws.sock.connected:156 self._ws.send(msg_json)157 else:158 _log.warning("wss api is not connected.")159 def send_custom_message(self, msg):160 msg_json = json.dumps(msg)161 _log.debug(u"Sending: {0}".format(msg_json))162 if self._ws and self._ws.sock.connected:163 self._ws.send(msg_json)164 else:165 _log.warning("wss api is not connected.")166 def send_playerversion(self):167 body = {168 "type": "startWatching",169 "data": {170 "stream": {171 "quality": "abr",172 "protocol": "hls",173 "latency": "high",174 "chasePlay": False175 },176 "room": {177 "protocol": "webSocket",178 "commentable": True179 },180 "reconnect": False181 }182 }183 self.send_custom_message(body)184 def send_getpermit(self, require_new_stream=True):185 body = {186 "type": "getAkashic",187 "data": {188 "chasePlay": False189 }190 }191 self.send_custom_message(body)192 def send_watching(self):193 body = {194 "command": "watching",195 "params": [self.broadcast_id, "-1", "0"]196 }197 self.send_message("watch", body)198 def send_pong(self):199 self.send_no_body_message("pong")200 self.send_no_body_message("keepSeat")201 def handle_api_message(self, message):202 _log.debug(u"Received: {0}".format(message))203 message_parsed = json.loads(message)204 if message_parsed["type"] == "stream":205 data = message_parsed["data"]206 self.hls_stream_url = data["uri"]207 self.is_stream_ready = True208 if message_parsed["type"] == "watch":209 body = message_parsed["body"]210 command = body["command"]211 if command == "currentstream":212 current_stream = body["currentStream"]213 self.hls_stream_url = current_stream["uri"]214 self.is_stream_ready = True215 elif command == "watchinginterval":216 self.watching_interval = int(body["params"][0])217 _log.debug("Got watching_interval: {0}".format(218 self.watching_interval))219 if self.watching_interval_worker_thread is None:220 _log.debug("send_watching_scheduler starting.")221 self.watching_interval_worker_thread = threading.Thread(222 target=self.send_watching_scheduler)223 self.watching_interval_worker_thread.daemon = True224 self.watching_interval_worker_thread.start()225 else:226 _log.debug("send_watching_scheduler already running.")227 elif command == "disconnect":228 _log.info("Websocket API closed.")229 _log.info("Stream ended.")230 self.is_stream_ended = True231 if self.stream_reader is not None:232 self.stream_reader.close()233 _log.info("Stream reader closed.")234 elif message_parsed["type"] == "ping":235 self.send_pong()236 def send_watching_scheduler(self):237 """238 Periodically send "watching" command to the API.239 This is necessary to keep the session alive.240 """241 while not self.is_stream_ended:242 self.send_watching()243 time.sleep(self.watching_interval)244 def niconico_web_login(self):245 user_session = self.get_option("user-session")246 email = self.get_option("email")247 password = self.get_option("password")248 if user_session is not None:249 _log.info("User session cookie is provided. Using it.")250 self.session.http.cookies.set(251 "user_session",252 user_session,253 path="/",254 domain="nicovideo.jp")255 self.save_cookies()256 return True257 elif email is not None and password is not None:258 _log.info("Email and password are provided. Attemping login.")259 payload = {"mail_tel": email, "password": password}260 resp = self.session.http.post(_login_url, data=payload,261 params=_login_url_params)262 _log.debug("Login response code: {0}".format(resp.status_code))263 _log.trace(u"Login response body: {0}".format(resp.text))264 _log.debug("Cookies: {0}".format(265 self.session.http.cookies.get_dict()))266 if self.session.http.cookies.get("user_session") is None:267 try:268 msg = extract_text(269 resp.text, '<p class="notice__text">', "</p>")270 except Exception as e:271 _log.debug(e)272 msg = "unknown reason"273 _log.warn("Login failed. {0}".format(msg))274 return False275 else:276 _log.info("Logged in.")277 self.save_cookies()278 return True279 else:280 _log.warn(281 "Neither a email and password combination nor a user session "282 "token is provided. Cannot attempt login.")283 return False284class NicoHLSStream(HLSStream):285 def __init__(self, hls_stream, nicolive_plugin):286 super(NicoHLSStream, self).__init__(287 hls_stream.session,288 force_restart=hls_stream.force_restart,289 start_offset=hls_stream.start_offset,290 duration=hls_stream.duration,291 **hls_stream.args)292 # url is already in hls_stream.args293 self.nicolive_plugin = nicolive_plugin294 def open(self):295 reader = super(NicoHLSStream, self).open()296 self.nicolive_plugin.stream_reader = reader297 return reader298def extract_text(text, left, right):299 """Extract text from HTML"""300 result = re.findall("{0}(.*?){1}".format(left, right), text)301 if len(result) != 1:302 raise Exception("Failed to extract string. "303 "Expected 1, found {0}".format(len(result)))304 return result[0]305def parse_proxy_url(purl):306 """Adapted from UStreamTV plugin (ustreamtv.py)"""307 proxy_options = {}308 if purl:309 p = urlparse(purl)310 proxy_options['proxy_type'] = p.scheme311 proxy_options['http_proxy_host'] = p.hostname312 if p.port:313 proxy_options['http_proxy_port'] = p.port314 if p.username:315 proxy_options['http_proxy_auth'] = \316 (unquote_plus(p.username), unquote_plus(p.password or ""))317 return proxy_options...

Full Screen

Full Screen

client.py

Source:client.py Github

copy

Full Screen

1import grpc2# classes generated by grpc3from .generated import eval_server_pb24from .generated import eval_server_pb2_grpc5# other imports6import numpy as np 7import time8from time import perf_counter9from datetime import datetime10# multiprocessing 11import multiprocessing as mp12from multiprocessing import shared_memory, resource_tracker13from threading import Thread14# suppress shared memory warnings15from .utils import remove_shm_from_resource_tracker16remove_shm_from_resource_tracker()17# receive input fidx streamed by server, store them in a list18def receive_stream(seq, latest_fidx, fid_ptr_dict, is_stream_ready, stream_start_time, config, verbose=False):19 if verbose:20 print("EvalClient (", datetime.now(), "): ", "Requesting stream for sequence ", seq)21 channel = grpc.insecure_channel(config['loopback_ip'] + ":" + str(config['image_service_port']))22 stub = eval_server_pb2_grpc.ImageServiceStub(channel)23 stream_request = eval_server_pb2.String(value=seq)24 send_times = []25 # receive input stream26 for i, response in enumerate(stub.GetImageStream(stream_request)):27 if i == 0:28 stream_start_time.value = perf_counter()29 if verbose:30 print("EvalClient (", datetime.now(), "): ", "Receiving stream for sequence ", seq) 31 if response.end_marker:32 latest_fidx.value = -133 break34 is_stream_ready.clear()35 latest_fidx.value = response.fid36 fid_ptr_dict[response.fid] = (response.start_ptr, response.end_ptr)37 if response.fid >= 0:38 is_stream_ready.set()39 send_times.append(perf_counter() - response.timestamp)40 # if verbose:41 # print("EvalClient (", datetime.now(), "): ", "Mean sending time = ", np.mean(send_times), "s, stdev = ", np.std(send_times), "Max = ", np.max(send_times), "Min = ", np.min(send_times))42 channel.close()43class EvalClient:44 def __init__(self, config, state=None, verbose=False):45 self.img_width, self.img_height = 1920, 120046 if state is None:47 mp.set_start_method('spawn')48 self.latest_fidx = mp.Value('i', -1, lock=True)49 self.is_stream_ready = mp.Event()50 self.fid_ptr_dict = mp.Manager().dict()51 self.stream_start_time = mp.Value('d', 0.0, lock=True)52 else:53 self.latest_fidx = state[0]54 self.is_stream_ready = state[1]55 self.fid_ptr_dict = state[2]56 57 self.verbose = verbose58 # create image receiver stub59 self.channel = grpc.insecure_channel(config['loopback_ip'] + ":" + str(config['image_service_port']))60 self.config = config61 self.stub = eval_server_pb2_grpc.ImageServiceStub(self.channel)62 response = self.stub.GetShm(eval_server_pb2.Empty())63 self.existing_shm = shared_memory.SharedMemory(name=response.value)64 self.channel.close()65 # create result sender stub66 self.result_channel = grpc.insecure_channel(config['loopback_ip'] + ":" + str(config['result_service_port']))67 self.result_stub = eval_server_pb2_grpc.ResultServiceStub(self.result_channel)68 response = self.result_stub.GetShm(eval_server_pb2.Empty())69 self.results_shm = shared_memory.SharedMemory(name=response.value)70 self.results_np = np.ndarray((100, 6), dtype=np.float32, buffer=self.results_shm.buf)71 self.is_stream_ready.clear()72 self.stream_process = None73 self.result_thread = None74 def get_state(self):75 return (self.latest_fidx, self.is_stream_ready, self.fid_ptr_dict)76 def close(self, results_file='results.json'):77 self.result_stub.GenResults(eval_server_pb2.String(value=results_file))78 self.result_channel.close()79 self.existing_shm.close()80 self.results_shm.close()81 def stop_stream(self):82 self.stream_process.join()83 self.result_stub.FinishSequence(eval_server_pb2.Empty())84 self.is_stream_ready.clear()85 self.stream_process = None86 def request_stream(self, seq):87 # receiver as separate processs88 self.stream_process = mp.Process(target=receive_stream, args=(seq, self.latest_fidx, self.fid_ptr_dict, self.is_stream_ready, self.stream_start_time, self.config, self.verbose))89 self.stream_process.start()90 def get_latest_fidx(self):91 self.is_stream_ready.wait()92 return self.latest_fidx.value93 def get_frame(self, fid=None, ptr=False):94 if fid is not None and fid < 0:95 raise TypeError(f"fid must be non-negative")96 if fid is None:97 fid = self.get_latest_fidx()98 if fid == -1:99 return None, None100 elif fid not in self.fid_ptr_dict:101 raise KeyError(f"frame not available yet")102 start_ptr, end_ptr = self.fid_ptr_dict[fid]103 if ptr:104 return fid, int(start_ptr/(self.img_height*self.img_width*3))105 return fid, np.ndarray((self.img_height, self.img_width, 3), dtype=np.uint8, buffer=self.existing_shm.buf[start_ptr:end_ptr])106 def send_result_shm(self, bboxes, bbox_scores, labels, timestamp):107 num_detections = min(len(bboxes), 100)108 self.results_np[:num_detections, :4] = bboxes[:num_detections]109 self.results_np[:num_detections, 4] = bbox_scores[:num_detections]110 self.results_np[:num_detections, 5] = labels[:num_detections]111 self.result_stub.SignalResultsReady(eval_server_pb2.Result(timestamp=timestamp, num_bboxes=num_detections))112 def send_result_to_server(self, bboxes, bbox_scores, labels):113 timestamp = perf_counter()114 if self.result_thread:115 self.result_thread.join()116 self.result_thread = Thread(target=self.send_result_shm, args=(bboxes, bbox_scores, labels, timestamp))117 self.result_thread.start()118 def get_frame_buf(self):119 return self.existing_shm120 def get_stream_start_time(self):121 self.is_stream_ready.wait()...

Full Screen

Full Screen

Automation Testing Tutorials

Learn to execute automation testing from scratch with LambdaTest Learning Hub. Right from setting up the prerequisites to run your first automation test, to following best practices and diving deeper into advanced test scenarios. LambdaTest Learning Hubs compile a list of step-by-step guides to help you be proficient with different test automation frameworks i.e. Selenium, Cypress, TestNG etc.

LambdaTest Learning Hubs:

YouTube

You could also refer to video tutorials over LambdaTest YouTube channel to get step by step demonstration from industry experts.

Run localstack automation tests on LambdaTest cloud grid

Perform automation testing on 3000+ real desktop and mobile devices online.

Try LambdaTest Now !!

Get 100 minutes of automation test minutes FREE!!

Next-Gen App & Browser Testing Cloud

Was this article helpful?

Helpful

NotHelpful