Source code for filter_option4

#!/usr/bin/env python
#-------------------------------------------------------------------------------
# Name:         filter_option4.py
#
# Purpose:      This software filters ionosonde data with Filter Option 4 specified in TN_TG_07_03_14_IS_datenFilterung.docx by Tatjana Gerzen
#               This is a translation of the MATLAB program IS_QualityCheck/MainIonosonde_QualityCheck.m by Tatjana Gerzen
#
# Input:        Please customize the input parameters and paths in config.py first
#               Ionosonde data in the output format of DownloadClient
#
# Output:       Text file containing all outliers
#               Plots of hmF2/NmF2 values, median, lower and upper bounds
#
# HowToStart:   python filter_option4.py
#
# Needs:        python2.7.8, matplotlib 1.4.0, numpy 1.8.2, scipy 0.14.0
#
# Version:      $Id: filter_option4.py
#               $HeadURL: https://svn.dlr.de/GNSSsoft/trunk/UserUtils/CBojarraUtils/Filter/filter_option4.py $
#
# Author:       Chris Bojarra
#
# Created:      03.11.2014
# Copyright:    (c) Chris Bojarra 2014
# Mail:         Chris.Bojarra@dlr.de
# Licence:      DLR
#-------------------------------------------------------------------------------

import datetime
import matplotlib.pyplot as plt
import numpy
import os
import scipy.interpolate

import config
import reader_ionodata

[docs]class FilterOption4(object): """ Main class of filter_option4.py """ def __init__(self): pass def __del__(self): pass
[docs] def filter4(self, param): """ Main method of filter_option4.py :param string param: "NmF2"/"hmF2" """ print "Filter Option4 " + param reader = reader_ionodata.Reader_IonoData() if(param == "NmF2"): IS = reader.convert_fof2_nmf2() maxValue = config.maxNmF2 minValue = config.minNmF2 elif(param == "hmF2"): IS = reader.read_iono_data("hmF2") maxValue = config.maxhmF2 minValue = config.minhmF2 IS_excl = self.exclusion_outliers(IS, maxValue, minValue) median = self.statistics_daily(IS_excl, "median") std = self.statistics_daily(IS_excl, "std") std_factor = self.calc_STD_factor_Kp(std, param) bounds_lower = self.calc_bounds(median, std, std_factor, "lower") bounds_upper = self.calc_bounds(median, std, std_factor, "upper") dict_interp_lower = self.interpolate_datetime_to_minute(bounds_lower) dict_interp_upper = self.interpolate_datetime_to_minute(bounds_upper) self.output_outliers(IS_excl, dict_interp_lower, dict_interp_upper, param) self.plot_data_median_bounds(IS_excl, median, dict_interp_lower, dict_interp_upper, param)
[docs] def exclusion_outliers(self, dict_IS, maxValue, minValue): """ This method returns a dictionary of those values of dict_input, which don't exceed the minValue-maxValue bound :param dict dict_IS: dictionary of ionosonde data, format dict["station"]["YYYY-MM-DD hh:mm"] = float :param float maxValue: upper bound :param float minValue: lower bound :return: dictionary of those values of dict_input, which don't exceed the minValue-maxValue bound, format dict["station"]["YYYY-MM-DD hh:mm"] = float :rtype: dict """ print "Calculate extreme outliers" dict_excl = {} for station in dict_IS: dict_excl[station] = {} for data_time in [item for item in dict_IS[station] if item!="Meta"]: value = dict_IS[station][data_time] if(value < maxValue and value > minValue): dict_excl[station][data_time] = value return dict_excl
[docs] def statistics_daily(self, dict_IS, statistics): """ This method returns a dictionary with statistical values in interval of 15 minutes :param dict dict_IS: dictionary of ionosonde data, format dict["station"]["YYYY-MM-DD hh:mm"] = float :param string statistics: "std" (standard deviation)/"median" :return: dictionary with statistical values for one day, format dict["station"][minute_of_day] = float :rtype: dict """ print "Calculate " + statistics dict_stat = {} for station in dict_IS: dict_stat[station] = {} dict_temp = {} for minute_of_day in range(0, 24*60, 15): for doy in range(config.doy_start, config.doy_end + 1): currentdate = datetime.datetime(config.year_start, 1, 1) + datetime.timedelta(days=doy-1) time_min = max(0, minute_of_day - config.time_to_look) time_max = min(24*60, minute_of_day + config.time_to_look) for mins in range(time_min, time_max + 1): currentdatetime = (currentdate + datetime.timedelta(minutes=mins)).strftime("%Y-%m-%d %H:%M") if currentdatetime in dict_IS[station]: value = dict_IS[station][currentdatetime] if minute_of_day in dict_temp: dict_temp[minute_of_day].append(value) else: dict_temp[minute_of_day] = [value] if(minute_of_day==0): for mins in range(24*60 - config.time_to_look, 24*60): currentdatetime = (currentdate + datetime.timedelta(minutes=mins)).strftime("%Y-%m-%d %H:%M") if currentdatetime in dict_IS[station]: value = dict_IS[station][currentdatetime] if minute_of_day in dict_temp: dict_temp[minute_of_day].append(value) else: dict_temp[minute_of_day] = [value] for minute_of_day in dict_temp: if(statistics == "std"): dict_stat[station][minute_of_day] = (0.0 if len(dict_temp[minute_of_day])==1 else numpy.std(dict_temp[minute_of_day], ddof=1)) # delta degrees of freedom, i.e. 1/(n-ddof) * ... if(statistics == "median"): dict_stat[station][minute_of_day] = numpy.median(dict_temp[minute_of_day]) else: pass return dict_stat # #
[docs] def calc_STD_factor_Kp(self, dict_std, param): """ This method calculates a STD factor for all days between the dates specified in config.py and all times in dict_std STD factor is chosen from config.py depending on hourly Kp values and daily F10.7 values read with reader_ionodata.py :param dict dict_std: dictionary of std values (return of statistics_daily(dict_IS, "std")) :param string param: "NmF2"/"hmF2" :return: dictionary with std factors, format dict["station"]["YYYY-MM-DD hh:mm"] = float :rtype: dict """ print "Calculate std factor" dict_factor = {} reader = reader_ionodata.Reader_IonoData() dict_f10p7 = reader.read_F10p7() interp = reader.read_Kp_interp() if(param=="NmF2"): STD_factor_low = config.STD_factor_NmF2_low STD_factor_mid = config.STD_factor_NmF2_mid STD_factor_high = config.STD_factor_NmF2_high elif(param=="hmF2"): STD_factor_low = config.STD_factor_hmF2_low STD_factor_mid = config.STD_factor_hmF2_mid STD_factor_high = config.STD_factor_hmF2_high date_start = datetime.datetime(config.year_start, 1, 1) + datetime.timedelta(config.doy_start - 1) for station in dict_std: dict_factor[station] = {} for doy in range(config.doy_start, config.doy_end + 2): currentdate = datetime.datetime(config.year_start, 1, 1) + datetime.timedelta(days=doy-1) iso = currentdate.date().isoformat() F10p7_d = dict_f10p7[iso] # for every specified day + minimum of next day for minute_of_day in ([min(dict_std[station])] if(doy == config.doy_end + 1) else dict_std[station]): hour_of_year = doy*24 + int(minute_of_day/60) Kp_h = interp(hour_of_year) value_datetime = (currentdate + datetime.timedelta(minutes=minute_of_day)).strftime("%Y-%m-%d %H:%M") if(Kp_h >= config.Kp_mid and F10p7_d >= config.F10p7_mid): if(Kp_h >= config.Kp_high or F10p7_d >= config.F10p7_high): dict_factor[station][value_datetime] = STD_factor_high else: dict_factor[station][value_datetime] = STD_factor_mid else: dict_factor[station][value_datetime] = STD_factor_low return dict_factor
[docs] def calc_bounds(self, dict_median, dict_std, dict_factor, lowerupper): """ This method calculates the bounds from median, std and std factors for all days specified in config.py :param dict dict_median: dictionary of median values (return of statistics_daily(dict_IS, "median")) :param dict dict_std: dictionary of std values (return of statistics_daily(dict_IS, "std")) :param dict dict_factor: dictionary of std factor values (return of calc_STD_factor_Kp(dict_std, param)) :param string lowerupper: "lower"/"upper" :return: dictionary with lower or upper bounds, format dict["station"]["YYYY-MM-DD hh:mm"] = float :rtype: dict """ print "Calculate " + lowerupper + " bounds" dict_bounds = {} for station in dict_factor: dict_bounds[station] = {} for value_datetime in dict_factor[station]: value_time = value_datetime[11:16] hh = int(value_datetime[11:13]) mm = int(value_datetime[14:16]) minute_of_day = hh*60 + mm if(lowerupper == "lower"): dict_bounds[station][value_datetime] = dict_median[station][minute_of_day] - dict_factor[station][value_datetime] * dict_std[station][minute_of_day] elif(lowerupper == "upper"): dict_bounds[station][value_datetime] = dict_median[station][minute_of_day] + dict_factor[station][value_datetime] * dict_std[station][minute_of_day] else: pass return dict_bounds #TODO: no empty dicts as input!
[docs] def interpolate_datetime_to_minute(self, dict_input): """ This method returns a dictionary of SciPy interpolation objects for each station :param dict dict_input: dictionary, format dict["station"]["YYYY-MM-DD hh:mm"] = float :return: dictionary, format dict["station"] = SciPy interpolation function with minute_of_year as input parameter :rtype: dict """ dict_interp = {} for station in dict_input: dict_interp[station] = {} dict_temp = {} for value_datetime in dict_input[station]: moy = self.datetime_to_moy(value_datetime) dict_temp[moy] = dict_input[station][value_datetime] interp = scipy.interpolate.interp1d(dict_temp.keys(), dict_temp.values(), assume_sorted=False) dict_interp[station] = interp return dict_interp
[docs] def datetime_to_moy(self, value_datetime): """ This method converts datetime string ("2014-12-31 23:59") to minute of year :param string value_datetime: datetime string, format "YYYY-MM-DD hh:mm" :return: minute_of_year :rtype: int """ YYYY = int(value_datetime[0:4]) MM = int(value_datetime[5:7]) DD = int(value_datetime[8:10]) hh = int(value_datetime[11:13]) mm = int(value_datetime[14:16]) doy = datetime.date(YYYY, MM, DD).strftime("%j") moy = int(doy)*24*60 + hh*60 + mm return moy
[docs] def output_outliers(self, data, dict_interp_lower, dict_interp_upper, param): """ This method creates a file containing all outliers :param dict data: dictionary of ionosonde data, format dict["station"]["YYYY-MM-DD hh:mm"] = float :param dict dict_interp_lower: dictionary of lower bounds, format dict["station"] = SciPy interpolation function :param dict dict_interp_upper: dictionary of upper bounds, format dict["station"] = SciPy interpolation function :param string param: "NmF2"/"hmF2" """ print "Output outliers" fw = open(os.path.join(config.path_output, "Outlier_" + param + ".txt"), "w") fw.write("#URSI YYYY-MM-DD hh:mm value lower upper\n") for station in sorted(data): for value_datetime in sorted(data[station]): data_value = data[station][value_datetime] moy = self.datetime_to_moy(value_datetime) interp_lower = dict_interp_lower[station] interp_upper = dict_interp_upper[station] data_bounds_lower = interp_lower(moy) data_bounds_upper = interp_upper(moy) dec_places = 8 str_value = str(round(data_value, dec_places)) str_lower = str(round(data_bounds_lower, dec_places)) str_upper = str(round(data_bounds_upper, dec_places)) if(data_bounds_upper < data_value or data_value < data_bounds_lower): fw.write(station + " " + value_datetime + " " + str_value + " " + str_lower + " " + str_upper + "\n") fw.close()
[docs] def plot_data_median_bounds(self, dict_data, dict_median, dict_interp_lower, dict_interp_upper, param): """ This method creates png plots of hmF2/NmF2 values, median, lower and upper bounds :param dict dict_data: dictionary of ionosonde data, format dict["station"]["YYYY-MM-DD hh:mm"] = float :param dict dict_median: dictionary of ionosonde data, format dict["station"]["YYYY-MM-DD hh:mm"] = float :param dict dict_interp_lower: dictionary of lower bounds, format dict["station"] = SciPy interpolation function :param dict dict_interp_upper: dictionary of upper bounds, format dict["station"] = SciPy interpolation function :param string param: "NmF2"/"hmF2" """ print "Output plots" for station in sorted(dict_data): print "Plot - " + param + " - " + station list_value_time = [] list_value = [] list_median_time = [] list_median = [] list_bounds_lower = [] list_bounds_upper = [] for value_datetime in sorted(dict_data[station]): # value YYYY = int(value_datetime[0:4]) MM = int(value_datetime[5:7]) DD = int(value_datetime[8:10]) hh = int(value_datetime[11:13]) mm = int(value_datetime[14:16]) time = datetime.datetime(YYYY, MM, DD, hh, mm) data_value = float(dict_data[station][value_datetime]) list_value_time.append(time) list_value.append(data_value) # bounds moy = self.datetime_to_moy(value_datetime) interp_lower = dict_interp_lower[station] interp_upper = dict_interp_upper[station] data_bounds_lower = interp_lower(moy) data_bounds_upper = interp_upper(moy) list_bounds_lower.append(data_bounds_lower) list_bounds_upper.append(data_bounds_upper) for doy in range(config.doy_start, config.doy_end + 1): current_date = datetime.datetime(config.year_start, 1, 1) + datetime.timedelta(days=doy-1) for minute_of_day in sorted(dict_median[station]): current_datetime = current_date + datetime.timedelta(minutes=minute_of_day) list_median_time.append(current_datetime) data_median = float(dict_median[station][minute_of_day]) list_median.append(data_median) plt.figure(figsize=(30, 10), dpi=100) param_unit = ("$\mathregular{10^{12} el/m^3}$" if param=="NmF2" else "km") plt.ylabel(param + " / " + param_unit) plt.plot(list_value_time, list_value, label="value", linewidth=0.5, marker=".", color="r") plt.plot(list_median_time, list_median, label="median", linewidth=0.5, marker=".", color="b") plt.plot(list_value_time, list_bounds_lower, label="lower bounds", linewidth=0.5, color="m", marker=",", markerfacecolor="k") plt.plot(list_value_time, list_bounds_upper, label="upper bounds", linewidth=0.5, color="g", marker=",", markerfacecolor="k") plt.grid(True) plt.legend(framealpha=0.5) plt.title(station) plt.savefig(os.path.join(config.path_output, param + "_" + station + ".png")) plt.close()
[docs]def main(): filter = FilterOption4() filter.filter4("NmF2") filter.filter4("hmF2")
if __name__ == "__main__": main()