Source code for datasources.data_source


import xml.etree.ElementTree as ET
import glob, os, sys, shutil
from datetime import datetime, timedelta
try:
    from urllib2 import urlopen, URLError
    from urllib import quote
    from urlparse import urlparse, urlunsplit
except ImportError:
    from urllib.request import urlopen
    from urllib.error import URLError
    from urllib.parse import quote, urlparse, urlunsplit

import certifi 
import platform, subprocess, re
import imp
import socket
import logging
import traceback

import sharppy.io.decoder as decoder
from sharppy.io.csv import loadCSV
import sutils.frozenutils as frozenutils

HOME_DIR = os.path.join(os.path.expanduser("~"), ".sharppy", "datasources")
if not os.path.exists(HOME_DIR):
    os.makedirs(HOME_DIR)

if frozenutils.isFrozen():
    from . import available
else:
    avail_loc = os.path.join(HOME_DIR, 'available.py')
    if not os.path.exists(avail_loc):
        pkg_avail_loc = os.path.join(os.path.dirname(__file__), 'available.py')
        shutil.copy(pkg_avail_loc, avail_loc)

    available = imp.load_source('available', avail_loc)

# TAS: Comment this file and available.py

class DataSourceError(Exception):
    pass

def loadDataSources(ds_dir=HOME_DIR):
    """
    Load the data source information from the XML files.
    Returns a dictionary associating data source names to DataSource objects.
    """
    files = glob.glob(os.path.join(ds_dir, '*.xml'))
    if len(files) == 0:
        if frozenutils.isFrozen():
            frozen_path = frozenutils.frozenPath()
            files = glob.glob(os.path.join(frozen_path, 'sharppy', 'datasources', '*.xml')) + \
                    glob.glob(os.path.join(frozen_path, 'sharppy', 'datasources', '*.csv'))
        else:
            pkg_path = os.path.dirname(__file__)
            files = glob.glob(os.path.join(pkg_path, '*.xml')) + glob.glob(os.path.join(pkg_path, '*.csv'))

        for file_name in files:
            shutil.copy(file_name, ds_dir)

        files = glob.glob(os.path.join(ds_dir, '*.xml'))

    ds = {}
    for ds_file in files:
        root = ET.parse(ds_file).getroot()
        for src in root:
            name = src.get('name')
            try:
                ds[name] = DataSource(src)
            except Exception as e:
                traceback.print_exc()
                print("Exception: ", e)
                print("Unable to process the %s file." % os.path.basename(ds_file))
                print("This data source will not be loaded.")
    return ds

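# Example usage (an illustrative sketch; the source names returned depend on
# which XML files are installed in ~/.sharppy/datasources):
#
#     sources = loadDataSources()
#     for name, src in sources.items():
#         print(name, src.isObserved(), src.isEnsemble())
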
def _pingURL(hostname, timeout=1):
    try:
        urlopen(hostname, timeout=timeout, cafile=certifi.where())
    except (URLError, socket.timeout):
        return False
    return True

def pingURLs(ds_dict):
    """
    Try to ping all the URLs in any XML file.
    Takes a dictionary associating data source names to DataSource objects.
    Returns a dictionary associating URLs with a boolean specifying whether or
    not they were reachable.
    """
    urls = {}
    for ds in list(ds_dict.values()):
        ds_urls = ds.getURLList()
        for url in ds_urls:
            urlp = urlparse(url)
            base_url = urlunsplit((urlp.scheme, urlp.netloc, '', '', ''))
            urls[base_url] = None

    for url in urls.keys():
        urls[url] = _pingURL(url)

        if urls[url]:
            # A bit of a hack: since we're only using this to check for an
            # Internet connection, we don't need to check any more URLs once
            # we find a reachable one.
            break

    return urls

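# A minimal connectivity check built on pingURLs() might look like this
# (a sketch; the URLs actually pinged come from the installed XML files, and
# unchecked URLs are left as None, which any() treats as False):
#
#     reachable = pingURLs(loadDataSources())
#     has_connection = any(reachable.values())
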
class Outlet(object):
    """
    An Outlet object contains all the information for a data outlet (for
    example, the observed sounding feed from SPC, which is distinct from the
    observed sounding feed from, say, Europe).
    """
    def __init__(self, ds_name, config):
        self._ds_name = ds_name
        self._name = config.get('name')
        self._url = config.get('url')
        self._format = config.get('format')
        self._time = config.find('time')
        point_csv = config.find('points')
        #self.start, self.end = self.getTimeSpan()

        self._csv_fields, self._points = loadCSV(os.path.join(HOME_DIR, point_csv.get("csv")))

        for idx in range(len(self._points)):
            self._points[idx]['lat'] = float(self._points[idx]['lat'])
            self._points[idx]['lon'] = float(self._points[idx]['lon'])
            self._points[idx]['elev'] = int(self._points[idx]['elev'])

        self._custom_avail = self._name.lower() in available.available and \
                             self._ds_name.lower() in available.available[self._name.lower()]
        self._is_available = True

    def getTimeSpan(self):
        """
        Read in the XML data to determine the dates/times this outlet spans.
        """
        try:
            start = self._time.get('start')
        except AttributeError:
            start = '-'
        try:
            end = self._time.get('end')
        except AttributeError:
            end = '-'

        if start == '-' or start is None:
            s = None
        elif start == 'now':
            s = datetime.utcnow()
        else:
            s = datetime.strptime(start, '%Y/%m/%d')

        if end == '-' or end is None:
            e = None
        elif end == 'now':
            e = datetime.utcnow()
        else:
            e = datetime.strptime(end, '%Y/%m/%d')

        return [s, e]

    def getForecastHours(self):
        """
        Return a list of forecast hours that this outlet serves.
        If the 'delta' attribute is set to 0, then the data source only has
        the 0-hour.
        """
        times = []
        t = self._time
        f_start = int(t.get('first'))
        f_range = int(t.get('range'))
        f_delta = int(t.get('delta'))
        if f_delta > 0:
            times.extend(list(range(f_start, f_range + f_delta, f_delta)))
        else:
            times.append(0)
        return times

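    # For example, a <time> tag with first="0", range="384", delta="6" yields
    # [0, 6, 12, ..., 384], while delta="0" yields just [0].
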
    def getCycles(self):
        """
        Return a list of data cycles (UTC hours) that this outlet serves.
        """
        t = self._time
        c_length = int(t.get('cycle'))
        c_offset = int(t.get('offset'))
        return [ hr + c_offset for hr in range(0, 24, c_length) ]

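    # For example, cycle="6" with offset="0" yields [0, 6, 12, 18], and
    # cycle="12" with offset="6" yields [6, 18].
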
    def getDelay(self):
        return int(self._time.get('delay'))

    def getMostRecentCycle(self, dt=None):
        custom_failed = False
        if self._custom_avail:
            try:
                try:
                    times = available.available[self._name.lower()][self._ds_name.lower()](dt)
                except TypeError:
                    times = available.available[self._name.lower()][self._ds_name.lower()]()
                recent = max(times)
                self._is_available = True
            except URLError:
                custom_failed = True
                self._is_available = False

        if not self._custom_avail or custom_failed:
            now = datetime.utcnow()
            cycles = self.getCycles()
            delay = self.getDelay()

            avail = [ now.replace(hour=hr, minute=0, second=0, microsecond=0) + timedelta(hours=delay) for hr in cycles ]
            avail = [ run - timedelta(days=1) if run > now else run for run in avail ]
            recent = max(avail) - timedelta(hours=delay)

        return recent

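    # Sketch of the fallback computation above: with cycles [0, 12] and a
    # 6-hour delay, at 1400 UTC the candidate availability times are 0600 and
    # 1800 UTC today; 1800 UTC is still in the future, so it rolls back a day,
    # and max(0600 today, 1800 yesterday) - 6 h = 0000 UTC today is the most
    # recent cycle.
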
    def getArchivedCycles(self, **kwargs):
        max_cycles = kwargs.get('max_cycles', 100)
        start = kwargs.get('start', None)
        if start is None:
            start = self.getMostRecentCycle()

        daily_cycles = self.getCycles()
        time_counter = daily_cycles.index(start.hour)
        archive_len = self.getArchiveLen()

        # TODO: Potentially include the option to specify the beginning date
        # of the archive, if the data source is static?
        cycles = []
        cur_time = start
        while cur_time > start - timedelta(hours=archive_len):
            cycles.append(cur_time)
            if len(cycles) >= max_cycles:
                break

            time_counter = (time_counter - 1) % len(daily_cycles)
            cycle = daily_cycles[time_counter]
            cur_time = cur_time.replace(hour=cycle)
            if cycle == daily_cycles[-1]:
                cur_time -= timedelta(days=1)

        return cycles

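    # For example, with daily cycles [0, 6, 12, 18], a 24-hour archive, and
    # start=2023-03-01 12Z, the walk above yields
    # [03-01 12Z, 03-01 06Z, 03-01 00Z, 02-28 18Z].
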
    def getAvailableAtTime(self, **kwargs):
        dt = kwargs.get('dt', None)
        logging.debug("Calling getAvailableAtTime() " + str(dt) + " " + self._ds_name.lower() + " " + self._name.lower())
        if dt is None:
            dt = self.getMostRecentCycle(dt)
        elif dt == datetime(1700, 1, 1, 0, 0, 0):
            return []

        stns_avail = self.getPoints()
        if self._name.lower() in available.availableat and \
           self._ds_name.lower() in available.availableat[self._name.lower()]:
            try:
                avail = available.availableat[self._name.lower()][self._ds_name.lower()](dt)
                stns_avail = []
                points = self.getPoints()
                srcids = [ p['srcid'] for p in points ]
                for stn in avail:
                    try:
                        idx = srcids.index(stn)
                        stns_avail.append(points[idx])
                    except ValueError:
                        pass
                self._is_available = True
            except URLError as err:
                logging.exception(err)
                stns_avail = []
                self._is_available = False

        logging.debug("_is_available: " + str(self._is_available))
        logging.debug("Number of stations found: " + str(len(stns_avail)))
        return stns_avail

    def getAvailableTimes(self, max_cycles=100, **kwargs):
        # This is where we could potentially pass an argument to filter out
        # which RAOBs or models are available.
        custom_failed = False
        dt = kwargs.get('dt', None)  # Used to help search for times
        logging.debug("Called outlet.getAvailableTimes(): dt=" + str(dt) +
                      " self._name=" + str(self._name) + " self._ds_name=" + str(self._ds_name))
        if self._custom_avail:
            try:
                if self._name.lower() == "local":
                    times = available.available[self._name.lower()][self._ds_name.lower()](kwargs.get("filename"))
                else:
                    try:
                        times = available.available[self._name.lower()][self._ds_name.lower()](dt)
                    except TypeError:
                        times = available.available[self._name.lower()][self._ds_name.lower()]()

                    if len(times) == 1:
                        times = self.getArchivedCycles(start=times[0], max_cycles=max_cycles)
                self._is_available = True
            except URLError as err:
                logging.exception(err)
                custom_failed = True
                self._is_available = False

        if not self._custom_avail or custom_failed:
            times = self.getArchivedCycles(max_cycles=max_cycles)

        logging.debug("outlet.getAvailableTimes() times found: " + str(times[-max_cycles:]))
        return times[-max_cycles:]

    def getArchiveLen(self):
        return int(self._time.get('archive'))

    def getURL(self):
        return self._url

    def getDecoder(self):
        return decoder.getDecoder(self._format)

    def hasProfile(self, point, cycle):
        logging.debug("Calling outlet.hasProfile()")
        times = self.getAvailableTimes(dt=cycle)
        has_prof = cycle in times

        if has_prof:
            stns = self.getAvailableAtTime(dt=cycle)

            if point['icao'] != '':
                has_prof = point['icao'] in [ s['icao'] for s in stns ]
            elif point['iata'] != '':
                has_prof = point['iata'] in [ s['iata'] for s in stns ]
            else:
                has_prof = point['synop'] in [ s['synop'] for s in stns ]

        return has_prof

    def getPoints(self):
        return self._points

    def getFields(self):
        return self._csv_fields

    def isAvailable(self):
        return self._is_available

class DataSource(object):
    def __init__(self, config):
        self._name = config.get('name')
        self._ensemble = config.get('ensemble').lower() == "true"
        self._observed = config.get('observed').lower() == "true"

        self._outlets = dict( (c.get('name'), Outlet(self._name, c)) for c in config )

    def _get(self, name, outlet_num=None, flatten=True, **kwargs):
        prop = None
        if outlet_num is None:
            prop = []
            for o in self._outlets.values():
                func = getattr(o, name)
                prop.append(func(**kwargs))

            if flatten:
                prop = [ p for plist in prop for p in plist ]
                prop = sorted(set(prop))
        else:
            func = getattr(list(self._outlets.values())[outlet_num], name)
            # Pass the keyword arguments along in the single-outlet case, too,
            # so options such as 'dt' are not silently dropped.
            prop = func(**kwargs)
        return prop

    def _getOutletWithProfile(self, stn, cycle_dt, outlet_num=0):
        logging.debug("_getOutletWithProfile: " + str(stn) + ' ' + str(cycle_dt))
        use_outlets = [ out for out, cfg in self._outlets.items() if cfg.hasProfile(stn, cycle_dt) ]
        try:
            outlet = use_outlets[outlet_num]
        except IndexError:
            raise DataSourceError()
        return outlet

    def updateTimeSpan(self):
        return [ outlet.getTimeSpan() for outlet in self._outlets.values() ]

    def getForecastHours(self, outlet_num=None, flatten=True):
        return self._get('getForecastHours', outlet_num=outlet_num, flatten=flatten)

    def getDailyCycles(self, outlet_num=None, flatten=True):
        return self._get('getCycles', outlet_num=outlet_num, flatten=flatten)

    def getDelays(self, outlet_num=None):
        return self._get('getDelay', outlet_num=outlet_num, flatten=False)

    def getArchiveLens(self, outlet_num=None):
        return self._get('getArchiveLen', outlet_num=outlet_num, flatten=False)

    def getMostRecentCycle(self, outlet_num=None):
        cycles = self._get('getMostRecentCycle', outlet_num=outlet_num, flatten=False)
        return max(cycles)

    def getAvailableTimes(self, filename=None, outlet_num=None, max_cycles=100, dt=None):
        logging.debug("data_source.getAvailableTimes() filename=" + str(filename) +
                      " outlet_num=" + str(outlet_num) + " dt=" + str(dt))
        cycles = self._get('getAvailableTimes', outlet_num=outlet_num, filename=filename,
                           max_cycles=max_cycles, dt=dt)
        return cycles[-max_cycles:]

    def getAvailableAtTime(self, dt, outlet_num=None):
        points = self._get('getAvailableAtTime', outlet_num=outlet_num, flatten=False, dt=dt)

        flatten_pts = []
        flatten_coords = []
        for pt_list in points:
            for pt in pt_list:
                if (pt['lat'], pt['lon']) not in flatten_coords:
                    flatten_coords.append((pt['lat'], pt['lon']))
                    flatten_pts.append(pt)

        return flatten_pts

    def getDecoder(self, stn, cycle_dt, outlet_num=0):
        outlet = self._getOutletWithProfile(stn, cycle_dt, outlet_num=outlet_num)
        return self._outlets[outlet].getDecoder()

    def getURL(self, stn, cycle_dt, outlet_num=0, outlet=None):
        if outlet is None:
            outlet = self._getOutletWithProfile(stn, cycle_dt, outlet_num=outlet_num)

        url_base = self._outlets[outlet].getURL()
        logging.debug("URL: " + url_base)
        fmt = {
            'srcid': quote(stn['srcid']),
            'cycle': "%02d" % cycle_dt.hour,
            'date': cycle_dt.strftime("%y%m%d"),
            'year': cycle_dt.strftime("%Y"),
            'month': cycle_dt.strftime("%m"),
            'day': cycle_dt.strftime("%d"),
        }

        url = url_base.format(**fmt)
        logging.debug("Formatted URL: " + url)
        return url

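    # For example, a url attribute such as
    #     "https://example.com/{year}{month}{day}/{srcid}_{cycle}.txt"
    # (a hypothetical template) formatted for srcid "OUN" at 2023-03-01 12Z
    # produces "https://example.com/20230301/OUN_12.txt".
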
    def getDecoderAndURL(self, stn, cycle_dt, outlet_num=0):
        logging.debug("Getting the decoder and the URL to the data ...")
        outlet = self._getOutletWithProfile(stn, cycle_dt, outlet_num=outlet_num)

        decoder = self._outlets[outlet].getDecoder()
        url = self.getURL(stn, cycle_dt, outlet_num, outlet=outlet)
        return decoder, url

    def getURLList(self, outlet_num=None):
        return self._get('getURL', outlet_num=outlet_num, flatten=False)

    def getName(self):
        return self._name

    def isEnsemble(self):
        return self._ensemble

    def isObserved(self):
        return self._observed

    def getPoint(self, stn):
        for outlet in self._outlets.values():
            for pnt in outlet.getPoints():
                if stn in pnt['icao'] or stn in pnt['iata'] or stn in pnt['srcid']:
                    return pnt

if __name__ == "__main__":
    ds = loadDataSources('./')
    ds = dict( (n, ds[n]) for n in ['Observed', 'GFS', 'HRRR', 'NAM'] )
    print(ds['GFS'].updateTimeSpan())
    print(ds['HRRR'].updateTimeSpan())

#   for n, d in ds.items():
#       print(n, d.getMostRecentCycle())
#       times = d.getAvailableTimes()
#       for t in times:
#           print(n, t, [ s['srcid'] for s in d.getAvailableAtTime(t) ])