Source code for filter_scikit

#!/usr/bin/env python
#-------------------------------------------------------------------------------
# Name:         filter_scikit.py
#
# Purpose:      This software filters ionosonde data using sklearn.covariance
#               MCD_simple uses only one ionospheric parameter (NmF2/hmF2)
#               MCD_correlation uses both (NmF2 and hmF2)
#
# Input:        Please customize the input parameters and paths in config.py first
#               Ionosonde data in the output format of DownloadClient
#
# Output:       Text file containing all outliers
#               Plots of hmF2/NmF2 values and outliers
#
# HowToStart:   python filter_scikit.py
#
# Needs:        python2.7.8, matplotlib 1.4.0, numpy 1.8.2, sklearn 0.15.1
#
# Version:      $Id: filter_scikit.py
#               $HeadURL: https://svn.dlr.de/GNSSsoft/trunk/UserUtils/CBojarraUtils/Filter/filter_scikit.py $
#
# Author:       Chris Bojarra
#
# Created:      17.11.2014
# Copyright:    (c) Chris Bojarra 2014
# Mail:         Chris.Bojarra@dlr.de
# Licence:      DLR
#-------------------------------------------------------------------------------

import datetime
import logging
import matplotlib.pyplot as plt
import numpy
import os
import sklearn.covariance

import config
import reader_ionodata

[docs]class FilterScikit(object): """ Main class of filter_scikit.py """ def __init__(self): self.logger = logging.getLogger(__name__) self.logger.setLevel(logging.INFO) handler = logging.FileHandler(config.path_output_log) self.logger.addHandler(handler) def __del__(self): pass def filter_mcd_simple(self, param): print "Filter MCD_simple " + param reader = reader_ionodata.Reader_IonoData() if(param == "NmF2"): IS = reader.convert_fof2_nmf2() maxValue = config.maxNmF2 minValue = config.minNmF2 elif(param == "hmF2"): IS = reader.read_iono_data("hmF2") maxValue = config.maxhmF2 minValue = config.minhmF2 if config.bool_exclude_extreme_outliers: IS = self.exclusion_outliers(IS, maxValue, minValue) dict_fit = self.fit_dict(IS) dict_filter = self.calc_filter_mcd_simple(dict_fit) self.output_outliers(dict_filter, param) self.plot_data_outliers(dict_filter, param) def filter_mcd_correlation(self): print "Filter MCD_correlation" reader = reader_ionodata.Reader_IonoData() NmF2_IS = reader.convert_fof2_nmf2() hmF2_IS = reader.read_iono_data("hmF2") if config.bool_exclude_extreme_outliers: NmF2_IS = self.exclusion_outliers(NmF2_IS, config.maxNmF2, config.minNmF2) hmF2_IS = self.exclusion_outliers(hmF2_IS, config.maxhmF2, config.minhmF2) NmF2_fit = self.fit_dict(NmF2_IS) hmF2_fit = self.fit_dict(hmF2_IS) dict_filter_NmF2_hmF2 = self.calc_filter_mcd_correlation(NmF2_fit, hmF2_fit) dict_filter = self.reduce_dict_param(dict_filter_NmF2_hmF2, "NmF2") self.output_outliers(dict_filter, "NmF2") self.plot_data_outliers(dict_filter, "NmF2") dict_filter = self.reduce_dict_param(dict_filter_NmF2_hmF2, "hmF2") self.output_outliers(dict_filter, "hmF2") self.plot_data_outliers(dict_filter, "hmF2")
[docs] def exclusion_outliers(self, dict_IS, maxValue, minValue): """ This method returns a dictionary of those values of dict_input, which don't exceed the minValue-maxValue bound :param dict dict_IS: dictionary of ionosonde data, format dict["station"]["YYYY-MM-DD hh:mm"] = float :param float maxValue: upper bound :param float minValue: lower bound :return: dictionary of those values of dict_input, which don't exceed the minValue-maxValue bound, format dict["station"]["YYYY-MM-DD hh:mm"] = float :rtype: dict """ print "Calculate extreme outliers" dict_excl = {} for station in dict_IS: dict_excl[station] = {} for data_time in [item for item in dict_IS[station] if item!="Meta"]: value = dict_IS[station][data_time] if(value < maxValue and value > minValue): dict_excl[station][data_time] = value return dict_excl
[docs] def fit_dict(self, dict_IS): """ This method reshapes dictionaries to work with :py:func:`calc_filter_mcd_simple` and :py:func:`calc_filter_mcd_correlation` :param dict dict_IS: dictionary of ionosonde data, format dict["station"]["YYYY-MM-DD hh:mm"] = float :return: dictionary of ionosonde data, format dict["station"][minute_of_day (interval of 15 minutes)] = ["YYYY-MM-DD hh:mm", float] :rtype: dict """ print "Reshape dictionary" dict_output = {} for station in dict_IS: dict_output[station] = {} for minute_of_day in range(0, 24*60, 15): for doy in range(config.doy_start, config.doy_end + 1): currentdate = datetime.datetime(config.year_start, 1, 1) + datetime.timedelta(days=doy-1) time_min = max(0, minute_of_day - config.time_to_look) time_max = min(24*60, minute_of_day + config.time_to_look) for mins in range(time_min, time_max + 1): currentdatetime = (currentdate + datetime.timedelta(minutes=mins)).strftime("%Y-%m-%d %H:%M") if currentdatetime in dict_IS[station]: value = dict_IS[station][currentdatetime] if minute_of_day in dict_output[station]: dict_output[station][minute_of_day].append([currentdatetime, value]) else: dict_output[station][minute_of_day] = [[currentdatetime, value]] if(minute_of_day==0): for mins in range(24*60 - config.time_to_look, 24*60): currentdatetime = (currentdate + datetime.timedelta(minutes=mins)).strftime("%Y-%m-%d %H:%M") if currentdatetime in dict_IS[station]: value = dict_IS[station][currentdatetime] if minute_of_day in dict_output[station]: dict_output[station][minute_of_day].append([currentdatetime, value]) else: dict_output[station][minute_of_day] = [[currentdatetime, value]] return dict_output
[docs] def calc_filter_mcd_simple(self, dict_fit): """ This method filters ionosonde data with sklearn.covariance, using only one ionosonde parameter :param dict dict_fit: dictionary of ionosonde data, format dict["station"][minute_of_day (interval of 15 minutes)] = ["YYYY-MM-DD hh:mm", float] (output of :py:func:`fit_dict`) :return: dictionary containing ionosonde data and boolean value ("Is this value an outlier?"), format dict["station"]["YYYY-MM-DD hh:mm"] = [float, bool_outlier] :rtype: dict """ print "Calculate filter MCD_simple" dict_filter = {} for station in sorted(dict_fit): print "Calculate filter " + station dict_filter[station] = {} for minute_of_day in sorted(dict_fit[station]): hh = minute_of_day/60 mm = minute_of_day-(hh*60) value_time = datetime.time(hh, mm).strftime("%H:%M") list_dates = [] list_values = [] for currentdatetime, value in dict_fit[station][minute_of_day]: list_dates.append(currentdatetime) list_values.append(value) try: X = numpy.array([list_values]) X = numpy.swapaxes(X, 0, 1) robust_cov = sklearn.covariance.MinCovDet().fit(X) reweight_cov_rob = robust_cov.reweight_covariance(X) outlier_rob = numpy.logical_not(reweight_cov_rob[2]) except ValueError: outlier_rob = [False]*len(list_dates) self.logger.info("ERR MinCovDet().fit(X): " + station + " " + value_time + " " + str(list_values)) for currentdatetime in list_dates: i = list_dates.index(currentdatetime) dict_filter[station][currentdatetime] = [list_values[i], outlier_rob[i]] return dict_filter
[docs] def calc_filter_mcd_correlation(self, NmF2_fit, hmF2_fit): """ This method filters ionosonde data with sklearn.covariance, using both ionosonde parameters (NmF2 and hmF2) :param dict NmF2_fit: dictionary of NmF2 data, format dict["station"][minute_of_day (interval of 15 minutes)] = ["YYYY-MM-DD hh:mm", float] (output of :py:func:`fit_dict`) :param dict hmF2_fit: dictionary of hmF2 data, format dict["station"][minute_of_day (interval of 15 minutes)] = ["YYYY-MM-DD hh:mm", float] (output of :py:func:`fit_dict`) :return: dictionary containing NmF2 and hmF2 and boolean value ("Is this value an outlier?"), format dict["station"]["YYYY-MM-DD hh:mm"] = [float, float, bool_outlier] :rtype: dict """ print "Calculate filter MCD_correlation" dict_filter = {} set_intersect_stations = self.intersect(NmF2_fit.keys(), hmF2_fit.keys(), "Station") for station in sorted(set_intersect_stations): print "Calculate filter " + station dict_filter[station] = {} set_intersect_mod = self.intersect(NmF2_fit[station].keys(), hmF2_fit[station].keys(), station + " Minute of day: ") for minute_of_day in sorted(set_intersect_mod): hh = minute_of_day/60 mm = minute_of_day-(hh*60) value_time = datetime.time(hh, mm).strftime("%H:%M") list_NmF2_dates = [] list_NmF2_all_values = [] list_NmF2_values = [] list_hmF2_dates = [] list_hmF2_all_values = [] list_hmF2_values = [] for currentdatetime, value in NmF2_fit[station][minute_of_day]: list_NmF2_dates.append(currentdatetime) list_NmF2_all_values.append(value) for currentdatetime, value in hmF2_fit[station][minute_of_day]: list_hmF2_dates.append(currentdatetime) list_hmF2_all_values.append(value) set_intersect_dates = self.intersect(list_NmF2_dates, list_hmF2_dates, station) list_intersect_dates = list(set_intersect_dates) for currentdatetime in list_intersect_dates: i_NmF2 = list_NmF2_dates.index(currentdatetime) list_NmF2_values.append(list_NmF2_all_values[i_NmF2]) i_hmF2 = list_hmF2_dates.index(currentdatetime) list_hmF2_values.append(list_hmF2_all_values[i_hmF2]) try: X = numpy.array([list_NmF2_values, list_hmF2_values]) X = numpy.swapaxes(X, 0, 1) robust_cov = sklearn.covariance.MinCovDet().fit(X) reweight_cov_rob = robust_cov.reweight_covariance(X) outlier_rob = numpy.logical_not(reweight_cov_rob[2]) except ValueError: outlier_rob = [False]*len(list_intersect_dates) self.logger.info("ERR MinCovDet().fit(X): " + station + " " + value_time + " " + str(list_NmF2_values) + " " + str(list_hmF2_values)) for currentdatetime in list_intersect_dates: i = list_intersect_dates.index(currentdatetime) dict_filter[station][currentdatetime] = [list_NmF2_values[i], list_hmF2_values[i], outlier_rob[i]] return dict_filter
[docs] def intersect(self, data_NmF2, data_hmF2, log_message): """ This method returns the intersection of two objects and logs those elements, which are not in the intersection, meaning that they are no longer used :param iterable data_NmF2: iterable object containing NmF2 data :param iterable data_hmF2: iterable object containing hmF2 data :param string log_message: additional text to output in the log file :return: set of the intersection of data_NmF2 and data_hmF2 :rtype: set """ set_data_NmF2 = set(data_NmF2) set_data_hmF2 = set(data_hmF2) set_intersect = set_data_NmF2.intersection(set_data_hmF2) set_only_data_NmF2 = set_data_NmF2 - set_intersect set_only_data_hmF2 = set_data_hmF2 - set_intersect if set_only_data_NmF2: self.logger.info("Only in NmF2: " + log_message + " " + ", ".join(str(n) for n in set_only_data_NmF2)) if set_only_data_hmF2: self.logger.info("Only in hmF2: " + log_message + " " + ", ".join(str(n) for n in set_only_data_hmF2)) return set_intersect
[docs] def reduce_dict_param(self, dict_filter_NmF2_hmF2, param): """ This method extract NmF2/hmF2 values from the dictionary returned with :py:func:`calc_filter_mcd_correlation` :param dict dict_filter_NmF2_hmF2: dictionary containing NmF2, hmF2 and boolean values (return of :py:func:`calc_filter_mcd_correlation`) :param string param: "NmF2"/"hmF2" :return: dictionary containing NmF2 or hmF2 values and boolean values, format dict["station"]["YYYY-MM-DD hh:mm"] = [float, bool_outlier] :rtype: dict """ dict_filter = {} for station in dict_filter_NmF2_hmF2: dict_filter[station] = {} for currentdatetime in dict_filter_NmF2_hmF2[station]: value = dict_filter_NmF2_hmF2[station][currentdatetime] if(param == "NmF2"): dict_filter[station][currentdatetime] = [value[0], value[2]] elif(param == "hmF2"): dict_filter[station][currentdatetime] = [value[1], value[2]] return dict_filter
[docs] def output_outliers(self, dict_filter, param): """ This method creates a file containing all outliers :param dict data_filter: dictionary containing ionosonde data and boolean, format dict["station"]["YYYY-MM-DD hh:mm"] = [float, bool_outlier] (output of :py:func:`calc_filter_mcd_simple` or :py:func:`calc_filter_mcd_correlation`) :param string param: "NmF2"/"hmF2" """ print "Output outliers" fw = open(os.path.join(config.path_output, "Outlier_" + param + ".txt"), "w") fw.write("#URSI YYYY-MM-DD hh:mm value\n") for station in sorted(dict_filter): for value_datetime in sorted(dict_filter[station]): value, bool_outlier = dict_filter[station][value_datetime] dec_places = 8 str_value = str(round(value, dec_places)) if bool_outlier: fw.write(station + " " + value_datetime + " " + str_value + "\n") fw.close()
[docs] def plot_data_outliers(self, dict_filter, param): """ This method creates png plots of hmF2/NmF2 values and outliers :param dict data_filter: dictionary containing ionosonde data and boolean, format dict["station"]["YYYY-MM-DD hh:mm"] = [float, bool_outlier] (output of :py:func:`calc_filter_mcd_simple` or :py:func:`calc_filter_mcd_correlation`) :param string param: "NmF2"/"hmF2" """ print "Output plots" for station in sorted(dict_filter): print "Plot - " + param + " - " + station list_value = [] list_value_time = [] list_outlier = [] list_outlier_time = [] for value_datetime in sorted(dict_filter[station]): value, bool_outlier = dict_filter[station][value_datetime] YYYY = int(value_datetime[0:4]) MM = int(value_datetime[5:7]) DD = int(value_datetime[8:10]) hh = int(value_datetime[11:13]) mm = int(value_datetime[14:16]) time = datetime.datetime(YYYY, MM, DD, hh, mm) if bool_outlier: list_outlier_time.append(time) list_outlier.append(value) else: list_value_time.append(time) list_value.append(value) plt.figure(figsize=(30, 10), dpi=100) param_unit = ("$\mathregular{10^{12} el/m^3}$" if param=="NmF2" else "km") plt.ylabel(param + " / " + param_unit) plt.plot(list_value_time, list_value, label="value", linewidth=0.5, marker=".", color="r") plt.plot(list_outlier_time, list_outlier, label="outlier", linestyle="None", marker=".", color="b") plt.grid(True) plt.legend(framealpha=0.5) plt.title(station) plt.savefig(os.path.join(config.path_output, param + "_" + station + ".png")) plt.close()
def main(): filter = FilterScikit() #filter.filter_mcd_simple("NmF2") #filter.filter_mcd_simple("hmF2") filter.filter_mcd_correlation() if __name__ == "__main__": main()