Best Python code snippet using yandex-tank
feedreader.py
Source:feedreader.py
"""
Support for RSS/Atom feeds.

For more details about this component, please refer to the documentation at
https://home-assistant.io/components/feedreader/
"""
from datetime import datetime
from logging import getLogger
from os.path import exists
from threading import Lock
import pickle

import voluptuous as vol

from homeassistant.const import EVENT_HOMEASSISTANT_START
from homeassistant.helpers.event import track_utc_time_change
import homeassistant.helpers.config_validation as cv

REQUIREMENTS = ['feedparser==5.2.1']

_LOGGER = getLogger(__name__)

CONF_URLS = 'urls'

DOMAIN = 'feedreader'

EVENT_FEEDREADER = 'feedreader'

# Upper bound on the number of entries processed per fetch of one feed.
MAX_ENTRIES = 20

CONFIG_SCHEMA = vol.Schema({
    DOMAIN: {
        vol.Required(CONF_URLS): vol.All(cv.ensure_list, [cv.url]),
    }
}, extra=vol.ALLOW_EXTRA)


def setup(hass, config):
    """Set up the Feedreader component.

    Builds one FeedManager per configured URL; all managers share a single
    StoredData instance backed by ``<DOMAIN>.pickle`` in the config path.
    Returns True when at least one feed URL was configured.
    """
    urls = config.get(DOMAIN)[CONF_URLS]
    data_file = hass.config.path("{}.pickle".format(DOMAIN))
    storage = StoredData(data_file)
    # Each FeedManager registers its own listeners in __init__, so the list
    # is only needed to tell whether anything was configured.
    feeds = [FeedManager(url, hass, storage) for url in urls]
    return bool(feeds)


class FeedManager(object):
    """Abstraction over Feedparser module."""

    def __init__(self, url, hass, storage):
        """Initialize the FeedManager object, poll every hour.

        An update is scheduled once on EVENT_HOMEASSISTANT_START and then
        at minute 0, second 0 of every hour (UTC time change tracker).
        """
        self._url = url
        self._feed = None
        self._hass = hass
        self._firstrun = True
        self._storage = storage
        self._last_entry_timestamp = None
        self._has_published_parsed = False
        hass.bus.listen_once(
            EVENT_HOMEASSISTANT_START, lambda _: self._update())
        track_utc_time_change(
            hass, lambda now: self._update(), minute=0, second=0)

    def _log_no_entries(self):
        """Send no entries log at debug level."""
        _LOGGER.debug("No new entries to be published in feed %s", self._url)

    def _update(self):
        """Update the feed and publish new entries to the event bus."""
        # Imported lazily: feedparser is installed through REQUIREMENTS.
        import feedparser

        _LOGGER.info("Fetching new data from feed %s", self._url)
        # Pass the previous response's etag/modified (if any) so the server
        # can answer with an empty entries list when nothing changed.
        previous = self._feed
        self._feed = feedparser.parse(
            self._url,
            etag=previous.get('etag') if previous else None,
            modified=previous.get('modified') if previous else None)

        if not self._feed:
            _LOGGER.error("Error fetching feed data from %s", self._url)
        elif self._feed.bozo != 0:
            _LOGGER.error("Error parsing feed %s", self._url)
        # Using etag and modified, if there's no new data available,
        # the entries list will be empty
        elif self._feed.entries:
            _LOGGER.debug("%s entri(es) available in feed %s",
                          len(self._feed.entries), self._url)
            if len(self._feed.entries) > MAX_ENTRIES:
                _LOGGER.debug("Processing only the first %s entries "
                              "in feed %s", MAX_ENTRIES, self._url)
                self._feed.entries = self._feed.entries[:MAX_ENTRIES]
            self._publish_new_entries()
            # Only persist a timestamp when the entries actually carried one.
            if self._has_published_parsed:
                self._storage.put_timestamp(
                    self._url, self._last_entry_timestamp)
        else:
            self._log_no_entries()
        _LOGGER.info("Fetch from feed %s completed", self._url)

    def _update_and_fire_entry(self, entry):
        """Update last_entry_timestamp and fire entry."""
        # We are lucky, `published_parsed` data available, let's make use of
        # it to publish only new available entries since the last run
        if 'published_parsed' in entry.keys():
            self._has_published_parsed = True
            self._last_entry_timestamp = max(
                entry.published_parsed, self._last_entry_timestamp)
        else:
            self._has_published_parsed = False
            _LOGGER.debug("No published_parsed info available for entry %s",
                          entry.title)
        entry.update({'feed_url': self._url})
        self._hass.bus.fire(EVENT_FEEDREADER, entry)

    def _publish_new_entries(self):
        """Publish new entries to the event bus.

        On the first run without a stored timestamp every entry is fired;
        afterwards only entries whose `published_parsed` is newer than the
        stored timestamp are fired.
        """
        new_entries = False
        self._last_entry_timestamp = self._storage.get_timestamp(self._url)
        if self._last_entry_timestamp:
            self._firstrun = False
        else:
            # Set last entry timestamp as epoch time if not available
            self._last_entry_timestamp = \
                datetime.utcfromtimestamp(0).timetuple()
        for entry in self._feed.entries:
            if self._firstrun or (
                    'published_parsed' in entry.keys() and
                    entry.published_parsed > self._last_entry_timestamp):
                self._update_and_fire_entry(entry)
                new_entries = True
            else:
                _LOGGER.debug("Entry %s already processed", entry.title)
        if not new_entries:
            self._log_no_entries()
        self._firstrun = False


class StoredData(object):
    """Abstraction over pickle data storage."""

    def __init__(self, data_file):
        """Initialize pickle data storage."""
        self._data_file = data_file
        self._lock = Lock()
        self._cache_outdated = True
        self._data = {}
        self._fetch_data()

    def _fetch_data(self):
        """Fetch data stored into pickle file.

        Does nothing when the in-memory copy is current or the file does
        not exist. Load failures are logged and the previous in-memory data
        is kept.
        """
        if self._cache_outdated and exists(self._data_file):
            try:
                _LOGGER.debug("Fetching data from file %s", self._data_file)
                # NOTE(review): unpickling can execute arbitrary code; this
                # presumably relies on the file being written only by this
                # component -- confirm.
                with self._lock, open(self._data_file, 'rb') as myfile:
                    self._data = pickle.load(myfile) or {}
                    self._cache_outdated = False
            # Unpickling can raise many exception types, so stay broad, but
            # do not swallow KeyboardInterrupt/SystemExit like a bare except.
            # pylint: disable=broad-except
            except Exception:
                _LOGGER.exception("Error loading data from pickled file %s",
                                  self._data_file)

    def get_timestamp(self, url):
        """Return stored timestamp for given url."""
        self._fetch_data()
        return self._data.get(url)

    def put_timestamp(self, url, timestamp):
        """Update timestamp for given URL."""
        self._fetch_data()
        with self._lock, open(self._data_file, 'wb') as myfile:
            self._data.update({url: timestamp})
            _LOGGER.debug("Overwriting feed %s timestamp in storage file %s",
                          url, self._data_file)
            try:
                pickle.dump(self._data, myfile)
            # Broad on purpose (best-effort save), but no longer a bare
            # except that would also trap KeyboardInterrupt/SystemExit.
            # pylint: disable=broad-except
            except Exception:
                _LOGGER.exception(
                    "Error saving pickled data to %s", self._data_file)
# ... (listing is truncated at this point in the source page)
Feed.py
Source:Feed.py
1from __future__ import print_function2from datetime import datetime3import feedparser4import json5IGNORE_BOZO_EXCEPTIONS = [6 feedparser.exceptions.CharacterEncodingOverride,7]8class Feed:9 10 def __init__(self, url):11 self._feed = feedparser.parse(url)12 if self._feed.bozo:13 if type(self._feed.bozo_exception) not in IGNORE_BOZO_EXCEPTIONS:14 print('Feed error on line{}: \'{}\''.format(15 self._feed.bozo_exception.getLineNumber(),16 self._feed.bozo_exception.getMessage()),17 file=sys.stderr18 )19 self._feed['bozo_exception'] = None20 def all(self):21 return self._feed22 def entries(self):23 return self._feed['entries']24 def header(self):25 return self._feed['feed']26 27 def updated(self):28 updated = '?'29 if 'updated_parsed' in self._feed:30 updated = self.parseDate(self._feed['updated_parsed'])31 elif 'entries' in self._feed:32 updated = self.parseDate(self._feed['entries'][0]['published_parsed'])33 for entry in self._feed:34 if 'published_parsed' in entry:35 entry_date = self.parseDate(entry['published_parsed'])36 if entry_date > updated:37 updated = entry_date38 return updated39 def parseDate(self, published_parsed):40 return datetime(41 year=published_parsed.tm_year,42 month=published_parsed.tm_mon,43 day=published_parsed.tm_mday,44 hour=published_parsed.tm_hour,45 minute=published_parsed.tm_min,46 second=published_parsed.tm_sec47 )48 49 50 def __repr__(self):...
startjobs.py
Source:startjobs.py
from django.core.management.base import BaseCommand
import feedparser
from dateutil import parser
from feeder.models import Article


def save_new_article(feed):
    """Store every entry of *feed* whose guid is not yet in the database."""
    publication = feed.channel.title
    for entry in feed.entries:
        # Skip entries already stored (matched on their guid).
        if Article.objects.filter(puid=entry.guid).exists():
            continue
        Article(
            title=entry.title,
            description=entry.description,
            pub_date=parser.parse(entry.published),
            link=entry.link,
            publication_name=publication,
            puid=entry.guid,
        ).save()


def _fetch_and_save(url):
    """Parse the feed at *url* and save its new articles."""
    save_new_article(feedparser.parse(url))


def geeksforgeeks():
    """Import new articles from the GeeksforGeeks feed."""
    _fetch_and_save("https://www.cdn.geeksforgeeks.org/feed/")


def devto():
    """Import new articles from the dev.to feed."""
    _fetch_and_save("https://dev.to/feed/")


def tech_republic():
    """Import new articles from the TechRepublic feed."""
    _fetch_and_save("https://www.techrepublic.com/rssfeeds/articles/")


def freecodecamp():
    """Import new articles from the freeCodeCamp Medium feed."""
    _fetch_and_save("https://medium.com/feed/free-code-camp")


def medium():
    """Import new articles from the Better Programming Medium feed."""
    _fetch_and_save("https://medium.com/feed/better-programming")


def hackernoon():
    """Import new articles from the Hacker Noon feed."""
    _fetch_and_save("https://cdn.hackernoon.com/feed")


class Command(BaseCommand):
    """Management command that runs the feed import jobs."""

    def handle(self, *args, **options):
        """Run each feed import in turn."""
        for job in (geeksforgeeks, devto, tech_republic,
                    freecodecamp, medium):
            job()
        # ... (listing is truncated at this point in the source page;
        # hackernoon() is defined above but its call is not visible)
Learn to execute automation testing from scratch with LambdaTest Learning Hub, from setting up the prerequisites and running your first automation test to following best practices and diving deeper into advanced test scenarios. The LambdaTest Learning Hubs compile step-by-step guides to help you become proficient with different test automation frameworks, e.g. Selenium, Cypress, and TestNG.
You can also refer to the video tutorials on the LambdaTest YouTube channel for step-by-step demonstrations from industry experts.
Get 100 automation test minutes FREE!!