#!/usr/bin/env python
#-------------------------------------------------------------------------------
# Name: filter_option4.py
#
# Purpose: This software filters ionosonde data with Filter Option 4 specified in TN_TG_07_03_14_IS_datenFilterung.docx by Tatjana Gerzen
# This is a translation of the MATLAB program IS_QualityCheck/MainIonosonde_QualityCheck.m by Tatjana Gerzen
#
# Input: Please customize the input parameters and paths in config.py first
# Ionosonde data in the output format of DownloadClient
#
# Output: Text file containing all outliers
# Plots of hmF2/NmF2 values, median, lower and upper bounds
#
# HowToStart: python filter_option4.py
#
# Needs: python2.7.8, matplotlib 1.4.0, numpy 1.8.2, scipy 0.14.0
#
# Version: $Id: filter_option4.py
# $HeadURL: https://svn.dlr.de/GNSSsoft/trunk/UserUtils/CBojarraUtils/Filter/filter_option4.py $
#
# Author: Chris Bojarra
#
# Created: 03.11.2014
# Copyright: (c) Chris Bojarra 2014
# Mail: Chris.Bojarra@dlr.de
# Licence: DLR
#-------------------------------------------------------------------------------
import datetime
import matplotlib.pyplot as plt
import numpy
import os
import scipy.interpolate
import config
import reader_ionodata
[docs]class FilterOption4(object):
"""
Main class of filter_option4.py
"""
def __init__(self):
pass
def __del__(self):
pass
[docs] def filter4(self, param):
"""
Main method of filter_option4.py
:param string param: "NmF2"/"hmF2"
"""
print "Filter Option4 " + param
reader = reader_ionodata.Reader_IonoData()
if(param == "NmF2"):
IS = reader.convert_fof2_nmf2()
maxValue = config.maxNmF2
minValue = config.minNmF2
elif(param == "hmF2"):
IS = reader.read_iono_data("hmF2")
maxValue = config.maxhmF2
minValue = config.minhmF2
IS_excl = self.exclusion_outliers(IS, maxValue, minValue)
median = self.statistics_daily(IS_excl, "median")
std = self.statistics_daily(IS_excl, "std")
std_factor = self.calc_STD_factor_Kp(std, param)
bounds_lower = self.calc_bounds(median, std, std_factor, "lower")
bounds_upper = self.calc_bounds(median, std, std_factor, "upper")
dict_interp_lower = self.interpolate_datetime_to_minute(bounds_lower)
dict_interp_upper = self.interpolate_datetime_to_minute(bounds_upper)
self.output_outliers(IS_excl, dict_interp_lower, dict_interp_upper, param)
self.plot_data_median_bounds(IS_excl, median, dict_interp_lower, dict_interp_upper, param)
[docs] def exclusion_outliers(self, dict_IS, maxValue, minValue):
"""
This method returns a dictionary of those values of dict_input, which don't exceed the minValue-maxValue bound
:param dict dict_IS: dictionary of ionosonde data, format dict["station"]["YYYY-MM-DD hh:mm"] = float
:param float maxValue: upper bound
:param float minValue: lower bound
:return: dictionary of those values of dict_input, which don't exceed the minValue-maxValue bound, format dict["station"]["YYYY-MM-DD hh:mm"] = float
:rtype: dict
"""
print "Calculate extreme outliers"
dict_excl = {}
for station in dict_IS:
dict_excl[station] = {}
for data_time in [item for item in dict_IS[station] if item!="Meta"]:
value = dict_IS[station][data_time]
if(value < maxValue and value > minValue):
dict_excl[station][data_time] = value
return dict_excl
[docs] def statistics_daily(self, dict_IS, statistics):
"""
This method returns a dictionary with statistical values in interval of 15 minutes
:param dict dict_IS: dictionary of ionosonde data, format dict["station"]["YYYY-MM-DD hh:mm"] = float
:param string statistics: "std" (standard deviation)/"median"
:return: dictionary with statistical values for one day, format dict["station"][minute_of_day] = float
:rtype: dict
"""
print "Calculate " + statistics
dict_stat = {}
for station in dict_IS:
dict_stat[station] = {}
dict_temp = {}
for minute_of_day in range(0, 24*60, 15):
for doy in range(config.doy_start, config.doy_end + 1):
currentdate = datetime.datetime(config.year_start, 1, 1) + datetime.timedelta(days=doy-1)
time_min = max(0, minute_of_day - config.time_to_look)
time_max = min(24*60, minute_of_day + config.time_to_look)
for mins in range(time_min, time_max + 1):
currentdatetime = (currentdate + datetime.timedelta(minutes=mins)).strftime("%Y-%m-%d %H:%M")
if currentdatetime in dict_IS[station]:
value = dict_IS[station][currentdatetime]
if minute_of_day in dict_temp:
dict_temp[minute_of_day].append(value)
else:
dict_temp[minute_of_day] = [value]
if(minute_of_day==0):
for mins in range(24*60 - config.time_to_look, 24*60):
currentdatetime = (currentdate + datetime.timedelta(minutes=mins)).strftime("%Y-%m-%d %H:%M")
if currentdatetime in dict_IS[station]:
value = dict_IS[station][currentdatetime]
if minute_of_day in dict_temp:
dict_temp[minute_of_day].append(value)
else:
dict_temp[minute_of_day] = [value]
for minute_of_day in dict_temp:
if(statistics == "std"):
dict_stat[station][minute_of_day] = (0.0 if len(dict_temp[minute_of_day])==1 else numpy.std(dict_temp[minute_of_day], ddof=1)) # delta degrees of freedom, i.e. 1/(n-ddof) * ...
if(statistics == "median"):
dict_stat[station][minute_of_day] = numpy.median(dict_temp[minute_of_day])
else:
pass
return dict_stat
#
#
[docs] def calc_STD_factor_Kp(self, dict_std, param):
"""
This method calculates a STD factor for all days between the dates specified in config.py and all times in dict_std
STD factor is chosen from config.py depending on hourly Kp values and daily F10.7 values read with reader_ionodata.py
:param dict dict_std: dictionary of std values (return of statistics_daily(dict_IS, "std"))
:param string param: "NmF2"/"hmF2"
:return: dictionary with std factors, format dict["station"]["YYYY-MM-DD hh:mm"] = float
:rtype: dict
"""
print "Calculate std factor"
dict_factor = {}
reader = reader_ionodata.Reader_IonoData()
dict_f10p7 = reader.read_F10p7()
interp = reader.read_Kp_interp()
if(param=="NmF2"):
STD_factor_low = config.STD_factor_NmF2_low
STD_factor_mid = config.STD_factor_NmF2_mid
STD_factor_high = config.STD_factor_NmF2_high
elif(param=="hmF2"):
STD_factor_low = config.STD_factor_hmF2_low
STD_factor_mid = config.STD_factor_hmF2_mid
STD_factor_high = config.STD_factor_hmF2_high
date_start = datetime.datetime(config.year_start, 1, 1) + datetime.timedelta(config.doy_start - 1)
for station in dict_std:
dict_factor[station] = {}
for doy in range(config.doy_start, config.doy_end + 2):
currentdate = datetime.datetime(config.year_start, 1, 1) + datetime.timedelta(days=doy-1)
iso = currentdate.date().isoformat()
F10p7_d = dict_f10p7[iso]
# for every specified day + minimum of next day
for minute_of_day in ([min(dict_std[station])] if(doy == config.doy_end + 1) else dict_std[station]):
hour_of_year = doy*24 + int(minute_of_day/60)
Kp_h = interp(hour_of_year)
value_datetime = (currentdate + datetime.timedelta(minutes=minute_of_day)).strftime("%Y-%m-%d %H:%M")
if(Kp_h >= config.Kp_mid and F10p7_d >= config.F10p7_mid):
if(Kp_h >= config.Kp_high or F10p7_d >= config.F10p7_high):
dict_factor[station][value_datetime] = STD_factor_high
else:
dict_factor[station][value_datetime] = STD_factor_mid
else:
dict_factor[station][value_datetime] = STD_factor_low
return dict_factor
[docs] def calc_bounds(self, dict_median, dict_std, dict_factor, lowerupper):
"""
This method calculates the bounds from median, std and std factors for all days specified in config.py
:param dict dict_median: dictionary of median values (return of statistics_daily(dict_IS, "median"))
:param dict dict_std: dictionary of std values (return of statistics_daily(dict_IS, "std"))
:param dict dict_factor: dictionary of std factor values (return of calc_STD_factor_Kp(dict_std, param))
:param string lowerupper: "lower"/"upper"
:return: dictionary with lower or upper bounds, format dict["station"]["YYYY-MM-DD hh:mm"] = float
:rtype: dict
"""
print "Calculate " + lowerupper + " bounds"
dict_bounds = {}
for station in dict_factor:
dict_bounds[station] = {}
for value_datetime in dict_factor[station]:
value_time = value_datetime[11:16]
hh = int(value_datetime[11:13])
mm = int(value_datetime[14:16])
minute_of_day = hh*60 + mm
if(lowerupper == "lower"):
dict_bounds[station][value_datetime] = dict_median[station][minute_of_day] - dict_factor[station][value_datetime] * dict_std[station][minute_of_day]
elif(lowerupper == "upper"):
dict_bounds[station][value_datetime] = dict_median[station][minute_of_day] + dict_factor[station][value_datetime] * dict_std[station][minute_of_day]
else:
pass
return dict_bounds
#TODO: no empty dicts as input!
[docs] def interpolate_datetime_to_minute(self, dict_input):
"""
This method returns a dictionary of SciPy interpolation objects for each station
:param dict dict_input: dictionary, format dict["station"]["YYYY-MM-DD hh:mm"] = float
:return: dictionary, format dict["station"] = SciPy interpolation function with minute_of_year as input parameter
:rtype: dict
"""
dict_interp = {}
for station in dict_input:
dict_interp[station] = {}
dict_temp = {}
for value_datetime in dict_input[station]:
moy = self.datetime_to_moy(value_datetime)
dict_temp[moy] = dict_input[station][value_datetime]
interp = scipy.interpolate.interp1d(dict_temp.keys(), dict_temp.values(), assume_sorted=False)
dict_interp[station] = interp
return dict_interp
[docs] def datetime_to_moy(self, value_datetime):
"""
This method converts datetime string ("2014-12-31 23:59") to minute of year
:param string value_datetime: datetime string, format "YYYY-MM-DD hh:mm"
:return: minute_of_year
:rtype: int
"""
YYYY = int(value_datetime[0:4])
MM = int(value_datetime[5:7])
DD = int(value_datetime[8:10])
hh = int(value_datetime[11:13])
mm = int(value_datetime[14:16])
doy = datetime.date(YYYY, MM, DD).strftime("%j")
moy = int(doy)*24*60 + hh*60 + mm
return moy
[docs] def output_outliers(self, data, dict_interp_lower, dict_interp_upper, param):
"""
This method creates a file containing all outliers
:param dict data: dictionary of ionosonde data, format dict["station"]["YYYY-MM-DD hh:mm"] = float
:param dict dict_interp_lower: dictionary of lower bounds, format dict["station"] = SciPy interpolation function
:param dict dict_interp_upper: dictionary of upper bounds, format dict["station"] = SciPy interpolation function
:param string param: "NmF2"/"hmF2"
"""
print "Output outliers"
fw = open(os.path.join(config.path_output, "Outlier_" + param + ".txt"), "w")
fw.write("#URSI YYYY-MM-DD hh:mm value lower upper\n")
for station in sorted(data):
for value_datetime in sorted(data[station]):
data_value = data[station][value_datetime]
moy = self.datetime_to_moy(value_datetime)
interp_lower = dict_interp_lower[station]
interp_upper = dict_interp_upper[station]
data_bounds_lower = interp_lower(moy)
data_bounds_upper = interp_upper(moy)
dec_places = 8
str_value = str(round(data_value, dec_places))
str_lower = str(round(data_bounds_lower, dec_places))
str_upper = str(round(data_bounds_upper, dec_places))
if(data_bounds_upper < data_value or data_value < data_bounds_lower):
fw.write(station + " " + value_datetime + " " + str_value + " " + str_lower + " " + str_upper + "\n")
fw.close()
[docs]def main():
filter = FilterOption4()
filter.filter4("NmF2")
filter.filter4("hmF2")
if __name__ == "__main__":
main()