#!/usr/bin/env python
#-------------------------------------------------------------------------------
# Name: filter_scikit.py
#
# Purpose: This software filters ionosonde data using sklearn.covariance
# MCD_simple uses only one ionospheric parameter (NmF2/hmF2)
# MCD_correlation uses both (NmF2 and hmF2)
#
# Input: Please customize the input parameters and paths in config.py first
# Ionosonde data in the output format of DownloadClient
#
# Output: Text file containing all outliers
# Plots of hmF2/NmF2 values and outliers
#
# HowToStart: python filter_scikit.py
#
# Needs: python2.7.8, matplotlib 1.4.0, numpy 1.8.2, sklearn 0.15.1
#
# Version: $Id: filter_scikit.py
# $HeadURL: https://svn.dlr.de/GNSSsoft/trunk/UserUtils/CBojarraUtils/Filter/filter_scikit.py $
#
# Author: Chris Bojarra
#
# Created: 17.11.2014
# Copyright: (c) Chris Bojarra 2014
# Mail: Chris.Bojarra@dlr.de
# Licence: DLR
#-------------------------------------------------------------------------------
import datetime
import logging
import matplotlib.pyplot as plt
import numpy
import os
import sklearn.covariance
import config
import reader_ionodata
[docs]class FilterScikit(object):
"""
Main class of filter_scikit.py
"""
def __init__(self):
self.logger = logging.getLogger(__name__)
self.logger.setLevel(logging.INFO)
handler = logging.FileHandler(config.path_output_log)
self.logger.addHandler(handler)
def __del__(self):
pass
def filter_mcd_simple(self, param):
print "Filter MCD_simple " + param
reader = reader_ionodata.Reader_IonoData()
if(param == "NmF2"):
IS = reader.convert_fof2_nmf2()
maxValue = config.maxNmF2
minValue = config.minNmF2
elif(param == "hmF2"):
IS = reader.read_iono_data("hmF2")
maxValue = config.maxhmF2
minValue = config.minhmF2
if config.bool_exclude_extreme_outliers:
IS = self.exclusion_outliers(IS, maxValue, minValue)
dict_fit = self.fit_dict(IS)
dict_filter = self.calc_filter_mcd_simple(dict_fit)
self.output_outliers(dict_filter, param)
self.plot_data_outliers(dict_filter, param)
def filter_mcd_correlation(self):
print "Filter MCD_correlation"
reader = reader_ionodata.Reader_IonoData()
NmF2_IS = reader.convert_fof2_nmf2()
hmF2_IS = reader.read_iono_data("hmF2")
if config.bool_exclude_extreme_outliers:
NmF2_IS = self.exclusion_outliers(NmF2_IS, config.maxNmF2, config.minNmF2)
hmF2_IS = self.exclusion_outliers(hmF2_IS, config.maxhmF2, config.minhmF2)
NmF2_fit = self.fit_dict(NmF2_IS)
hmF2_fit = self.fit_dict(hmF2_IS)
dict_filter_NmF2_hmF2 = self.calc_filter_mcd_correlation(NmF2_fit, hmF2_fit)
dict_filter = self.reduce_dict_param(dict_filter_NmF2_hmF2, "NmF2")
self.output_outliers(dict_filter, "NmF2")
self.plot_data_outliers(dict_filter, "NmF2")
dict_filter = self.reduce_dict_param(dict_filter_NmF2_hmF2, "hmF2")
self.output_outliers(dict_filter, "hmF2")
self.plot_data_outliers(dict_filter, "hmF2")
[docs] def exclusion_outliers(self, dict_IS, maxValue, minValue):
"""
This method returns a dictionary of those values of dict_input, which don't exceed the minValue-maxValue bound
:param dict dict_IS: dictionary of ionosonde data, format dict["station"]["YYYY-MM-DD hh:mm"] = float
:param float maxValue: upper bound
:param float minValue: lower bound
:return: dictionary of those values of dict_input, which don't exceed the minValue-maxValue bound, format dict["station"]["YYYY-MM-DD hh:mm"] = float
:rtype: dict
"""
print "Calculate extreme outliers"
dict_excl = {}
for station in dict_IS:
dict_excl[station] = {}
for data_time in [item for item in dict_IS[station] if item!="Meta"]:
value = dict_IS[station][data_time]
if(value < maxValue and value > minValue):
dict_excl[station][data_time] = value
return dict_excl
[docs] def fit_dict(self, dict_IS):
"""
This method reshapes dictionaries to work with :py:func:`calc_filter_mcd_simple` and :py:func:`calc_filter_mcd_correlation`
:param dict dict_IS: dictionary of ionosonde data, format dict["station"]["YYYY-MM-DD hh:mm"] = float
:return: dictionary of ionosonde data, format dict["station"][minute_of_day (interval of 15 minutes)] = ["YYYY-MM-DD hh:mm", float]
:rtype: dict
"""
print "Reshape dictionary"
dict_output = {}
for station in dict_IS:
dict_output[station] = {}
for minute_of_day in range(0, 24*60, 15):
for doy in range(config.doy_start, config.doy_end + 1):
currentdate = datetime.datetime(config.year_start, 1, 1) + datetime.timedelta(days=doy-1)
time_min = max(0, minute_of_day - config.time_to_look)
time_max = min(24*60, minute_of_day + config.time_to_look)
for mins in range(time_min, time_max + 1):
currentdatetime = (currentdate + datetime.timedelta(minutes=mins)).strftime("%Y-%m-%d %H:%M")
if currentdatetime in dict_IS[station]:
value = dict_IS[station][currentdatetime]
if minute_of_day in dict_output[station]:
dict_output[station][minute_of_day].append([currentdatetime, value])
else:
dict_output[station][minute_of_day] = [[currentdatetime, value]]
if(minute_of_day==0):
for mins in range(24*60 - config.time_to_look, 24*60):
currentdatetime = (currentdate + datetime.timedelta(minutes=mins)).strftime("%Y-%m-%d %H:%M")
if currentdatetime in dict_IS[station]:
value = dict_IS[station][currentdatetime]
if minute_of_day in dict_output[station]:
dict_output[station][minute_of_day].append([currentdatetime, value])
else:
dict_output[station][minute_of_day] = [[currentdatetime, value]]
return dict_output
[docs] def calc_filter_mcd_simple(self, dict_fit):
"""
This method filters ionosonde data with sklearn.covariance, using only one ionosonde parameter
:param dict dict_fit: dictionary of ionosonde data, format dict["station"][minute_of_day (interval of 15 minutes)] = ["YYYY-MM-DD hh:mm", float] (output of :py:func:`fit_dict`)
:return: dictionary containing ionosonde data and boolean value ("Is this value an outlier?"), format dict["station"]["YYYY-MM-DD hh:mm"] = [float, bool_outlier]
:rtype: dict
"""
print "Calculate filter MCD_simple"
dict_filter = {}
for station in sorted(dict_fit):
print "Calculate filter " + station
dict_filter[station] = {}
for minute_of_day in sorted(dict_fit[station]):
hh = minute_of_day/60
mm = minute_of_day-(hh*60)
value_time = datetime.time(hh, mm).strftime("%H:%M")
list_dates = []
list_values = []
for currentdatetime, value in dict_fit[station][minute_of_day]:
list_dates.append(currentdatetime)
list_values.append(value)
try:
X = numpy.array([list_values])
X = numpy.swapaxes(X, 0, 1)
robust_cov = sklearn.covariance.MinCovDet().fit(X)
reweight_cov_rob = robust_cov.reweight_covariance(X)
outlier_rob = numpy.logical_not(reweight_cov_rob[2])
except ValueError:
outlier_rob = [False]*len(list_dates)
self.logger.info("ERR MinCovDet().fit(X): " + station + " " + value_time + " " + str(list_values))
for currentdatetime in list_dates:
i = list_dates.index(currentdatetime)
dict_filter[station][currentdatetime] = [list_values[i], outlier_rob[i]]
return dict_filter
[docs] def calc_filter_mcd_correlation(self, NmF2_fit, hmF2_fit):
"""
This method filters ionosonde data with sklearn.covariance, using both ionosonde parameters (NmF2 and hmF2)
:param dict NmF2_fit: dictionary of NmF2 data, format dict["station"][minute_of_day (interval of 15 minutes)] = ["YYYY-MM-DD hh:mm", float] (output of :py:func:`fit_dict`)
:param dict hmF2_fit: dictionary of hmF2 data, format dict["station"][minute_of_day (interval of 15 minutes)] = ["YYYY-MM-DD hh:mm", float] (output of :py:func:`fit_dict`)
:return: dictionary containing NmF2 and hmF2 and boolean value ("Is this value an outlier?"), format dict["station"]["YYYY-MM-DD hh:mm"] = [float, float, bool_outlier]
:rtype: dict
"""
print "Calculate filter MCD_correlation"
dict_filter = {}
set_intersect_stations = self.intersect(NmF2_fit.keys(), hmF2_fit.keys(), "Station")
for station in sorted(set_intersect_stations):
print "Calculate filter " + station
dict_filter[station] = {}
set_intersect_mod = self.intersect(NmF2_fit[station].keys(), hmF2_fit[station].keys(), station + " Minute of day: ")
for minute_of_day in sorted(set_intersect_mod):
hh = minute_of_day/60
mm = minute_of_day-(hh*60)
value_time = datetime.time(hh, mm).strftime("%H:%M")
list_NmF2_dates = []
list_NmF2_all_values = []
list_NmF2_values = []
list_hmF2_dates = []
list_hmF2_all_values = []
list_hmF2_values = []
for currentdatetime, value in NmF2_fit[station][minute_of_day]:
list_NmF2_dates.append(currentdatetime)
list_NmF2_all_values.append(value)
for currentdatetime, value in hmF2_fit[station][minute_of_day]:
list_hmF2_dates.append(currentdatetime)
list_hmF2_all_values.append(value)
set_intersect_dates = self.intersect(list_NmF2_dates, list_hmF2_dates, station)
list_intersect_dates = list(set_intersect_dates)
for currentdatetime in list_intersect_dates:
i_NmF2 = list_NmF2_dates.index(currentdatetime)
list_NmF2_values.append(list_NmF2_all_values[i_NmF2])
i_hmF2 = list_hmF2_dates.index(currentdatetime)
list_hmF2_values.append(list_hmF2_all_values[i_hmF2])
try:
X = numpy.array([list_NmF2_values, list_hmF2_values])
X = numpy.swapaxes(X, 0, 1)
robust_cov = sklearn.covariance.MinCovDet().fit(X)
reweight_cov_rob = robust_cov.reweight_covariance(X)
outlier_rob = numpy.logical_not(reweight_cov_rob[2])
except ValueError:
outlier_rob = [False]*len(list_intersect_dates)
self.logger.info("ERR MinCovDet().fit(X): " + station + " " + value_time + " " + str(list_NmF2_values) + " " + str(list_hmF2_values))
for currentdatetime in list_intersect_dates:
i = list_intersect_dates.index(currentdatetime)
dict_filter[station][currentdatetime] = [list_NmF2_values[i], list_hmF2_values[i], outlier_rob[i]]
return dict_filter
[docs] def intersect(self, data_NmF2, data_hmF2, log_message):
"""
This method returns the intersection of two objects and logs those elements, which are not in the intersection, meaning that they are no longer used
:param iterable data_NmF2: iterable object containing NmF2 data
:param iterable data_hmF2: iterable object containing hmF2 data
:param string log_message: additional text to output in the log file
:return: set of the intersection of data_NmF2 and data_hmF2
:rtype: set
"""
set_data_NmF2 = set(data_NmF2)
set_data_hmF2 = set(data_hmF2)
set_intersect = set_data_NmF2.intersection(set_data_hmF2)
set_only_data_NmF2 = set_data_NmF2 - set_intersect
set_only_data_hmF2 = set_data_hmF2 - set_intersect
if set_only_data_NmF2:
self.logger.info("Only in NmF2: " + log_message + " " + ", ".join(str(n) for n in set_only_data_NmF2))
if set_only_data_hmF2:
self.logger.info("Only in hmF2: " + log_message + " " + ", ".join(str(n) for n in set_only_data_hmF2))
return set_intersect
[docs] def reduce_dict_param(self, dict_filter_NmF2_hmF2, param):
"""
This method extract NmF2/hmF2 values from the dictionary returned with :py:func:`calc_filter_mcd_correlation`
:param dict dict_filter_NmF2_hmF2: dictionary containing NmF2, hmF2 and boolean values (return of :py:func:`calc_filter_mcd_correlation`)
:param string param: "NmF2"/"hmF2"
:return: dictionary containing NmF2 or hmF2 values and boolean values, format dict["station"]["YYYY-MM-DD hh:mm"] = [float, bool_outlier]
:rtype: dict
"""
dict_filter = {}
for station in dict_filter_NmF2_hmF2:
dict_filter[station] = {}
for currentdatetime in dict_filter_NmF2_hmF2[station]:
value = dict_filter_NmF2_hmF2[station][currentdatetime]
if(param == "NmF2"):
dict_filter[station][currentdatetime] = [value[0], value[2]]
elif(param == "hmF2"):
dict_filter[station][currentdatetime] = [value[1], value[2]]
return dict_filter
[docs] def output_outliers(self, dict_filter, param):
"""
This method creates a file containing all outliers
:param dict data_filter: dictionary containing ionosonde data and boolean, format dict["station"]["YYYY-MM-DD hh:mm"] = [float, bool_outlier] (output of :py:func:`calc_filter_mcd_simple` or :py:func:`calc_filter_mcd_correlation`)
:param string param: "NmF2"/"hmF2"
"""
print "Output outliers"
fw = open(os.path.join(config.path_output, "Outlier_" + param + ".txt"), "w")
fw.write("#URSI YYYY-MM-DD hh:mm value\n")
for station in sorted(dict_filter):
for value_datetime in sorted(dict_filter[station]):
value, bool_outlier = dict_filter[station][value_datetime]
dec_places = 8
str_value = str(round(value, dec_places))
if bool_outlier:
fw.write(station + " " + value_datetime + " " + str_value + "\n")
fw.close()
[docs] def plot_data_outliers(self, dict_filter, param):
"""
This method creates png plots of hmF2/NmF2 values and outliers
:param dict data_filter: dictionary containing ionosonde data and boolean, format dict["station"]["YYYY-MM-DD hh:mm"] = [float, bool_outlier] (output of :py:func:`calc_filter_mcd_simple` or :py:func:`calc_filter_mcd_correlation`)
:param string param: "NmF2"/"hmF2"
"""
print "Output plots"
for station in sorted(dict_filter):
print "Plot - " + param + " - " + station
list_value = []
list_value_time = []
list_outlier = []
list_outlier_time = []
for value_datetime in sorted(dict_filter[station]):
value, bool_outlier = dict_filter[station][value_datetime]
YYYY = int(value_datetime[0:4])
MM = int(value_datetime[5:7])
DD = int(value_datetime[8:10])
hh = int(value_datetime[11:13])
mm = int(value_datetime[14:16])
time = datetime.datetime(YYYY, MM, DD, hh, mm)
if bool_outlier:
list_outlier_time.append(time)
list_outlier.append(value)
else:
list_value_time.append(time)
list_value.append(value)
plt.figure(figsize=(30, 10), dpi=100)
param_unit = ("$\mathregular{10^{12} el/m^3}$" if param=="NmF2" else "km")
plt.ylabel(param + " / " + param_unit)
plt.plot(list_value_time, list_value, label="value", linewidth=0.5, marker=".", color="r")
plt.plot(list_outlier_time, list_outlier, label="outlier", linestyle="None", marker=".", color="b")
plt.grid(True)
plt.legend(framealpha=0.5)
plt.title(station)
plt.savefig(os.path.join(config.path_output, param + "_" + station + ".png"))
plt.close()
def main():
filter = FilterScikit()
#filter.filter_mcd_simple("NmF2")
#filter.filter_mcd_simple("hmF2")
filter.filter_mcd_correlation()
if __name__ == "__main__":
main()