# Source code for tessif.identify.timevarying

# src/tessif/identify/timevarying.py
"""Identify submodule holding identification tools for timevarying results."""
import functools
import pandas as pd

from tessif.identify.core import (
    cluster,
    Identificier,
)

from tessif.identify.auxilliary import (
    filter_mutually_inclusive_columns,
    list_mutually_inclusive_columns,
    list_not_mutually_inclusive_columns,
    parse_reference_df,
)

from tessif.identify.calculate import (
    calc_evs,
    calc_corrs,
    calc_ardiffs,
)

from tessif.identify.timeframes import (
    significant_differences
)


class TimevaryingIdentificier(Identificier):
    """Identify components of which the flow results differ between softwares.

    Flows are identified using following logic:

        1. High interest / significantly different:

            - CORR < :paramref:`~Identificier.corr` (e.g. ``CORR < 0.7``)
            - ERROR_VALUE >= :paramref:`~Identificier.error_value`
              (e.g. ``ERROR_VALUE >= 0.1``)

        2. Medium interest / borderline significantly different:

            - CORR < :paramref:`~Identificier.corr` (e.g. ``CORR < 0.7``)
            - ERROR_VALUE < :paramref:`~Identificier.error_value`
              (e.g. ``ERROR_VALUE < 0.1``)

           And:

            - CORR >= :paramref:`~Identificier.corr` (e.g. ``CORR >= 0.7``)
            - ERROR_VALUE >= :paramref:`~Identificier.error_value`
              (e.g. ``ERROR_VALUE >= 0.1``)

        3. Low interest / not significantly different:

            - CORR >= :paramref:`~Identificier.corr` (e.g. ``CORR >= 0.7``)
            - ERROR_VALUE < :paramref:`~Identificier.error_value`
              (e.g. ``ERROR_VALUE < 0.1``)

    Parameters
    ----------
    data: dict
        Dictionary of multiindexed dataframes of all timevarying results of
        all components, keyed inside the dict by the respective
        :attr:`software specifier <tessif.frused.defaults.registered_models>`.

        Top-level index represents the individual components, while the
        second-level index represents the respective outflow targets.
        Meaning for an energy system like::

            A -> B -> C
                 |
                 v
                 D

        The dataframe would look something like::

               A   B
               B   C   D
            0  10  8   2
            1  0   0   0
            2  20  2   18

        Usually returned by something like
        :attr:`tessif.analyze.ComparativeResultier.all_loads`.

    error_value: str
        String abbreviating the error value calculated. Currently supported
        are:

            - ``nmae`` for ``Normalized Mean Average Error`` (default)
            - ``nmbe`` for ``Normalized Mean Biased Error``
            - ``nrmse`` for ``Normalized Root Mean Square Error``

    error_value_threshold: float, default = 0.1
        Threshold value used for clustering data interest. Clustering is done
        according to :attr:`cluster_conditions`.

    correlation : {'pearson', 'kendall', 'spearman'} or callable
        Method of correlation:

            - pearson : standard correlation coefficient
            - kendall : Kendall Tau correlation coefficient
            - spearman : Spearman rank correlation
            - callable: callable with input two 1d ndarrays and returning
              a float.

    correlation_threshold: float, default = 0.7
        Threshold value used for clustering data interest. Clustering is done
        according to :attr:`cluster_conditions`.

    conditions_dict: dict, None, default=None
        Dictionary keying
        :class:`container(s) <collections.abc.Container>` of dicts by the
        respective cluster labels "high", "medium" and "low".

        The dictionaries inside the tuples need to have following keywords:

            - ``thres`` specifying the threshold used
            - ``oprt`` specifying the :mod:`operator` used.

        If ``None`` is used, following default conditions are applied::

            conditions = {
                "high": (
                    {"oprt": "lt", "thres": correlation_threshold},
                    {"oprt": "ge", "thres": error_value_threshold},
                ),
                "medium1": (
                    {"oprt": "ge", "thres": correlation_threshold},
                    {"oprt": "ge", "thres": error_value_threshold},
                ),
                "medium2": (
                    {"oprt": "lt", "thres": correlation_threshold},
                    {"oprt": "lt", "thres": error_value_threshold},
                ),
                "low": (
                    {"oprt": "ge", "thres": correlation_threshold},
                    {"oprt": "lt", "thres": error_value_threshold},
                ),
            }

        ``medium1`` and ``medium2`` are merged into ``medium`` by
        :meth:`cluster_interest`.

    reference: str, None, default=None
        Defines the reference results to be used for calculating the
        statistical error values and correlation coefficients.

        In case ``None`` is used (default), the dataframes average is used as
        returned by :func:`average_timevarying_dataframe_results`.
    """

    # Interest categories every result mapping is built for.
    _INTEREST_LABELS = ("high", "medium", "low")

    def __init__(
        self,
        data,
        error_value="nmae",
        error_value_threshold=0.1,
        correlation="pearson",
        correlation_threshold=0.7,
        conditions_dict=None,
        reference=None,
    ):
        # init condition-related timevarying specific members
        self._corr_thres = correlation_threshold
        self._ev_thres = error_value_threshold

        if conditions_dict is None:
            # each tuple is (correlation condition, error value condition)
            conditions_dict = {
                "high": (
                    {"oprt": "lt", "thres": self._corr_thres},
                    {"oprt": "ge", "thres": self._ev_thres},
                ),
                "medium1": (
                    {"oprt": "ge", "thres": self._corr_thres},
                    {"oprt": "ge", "thres": self._ev_thres},
                ),
                "medium2": (
                    {"oprt": "lt", "thres": self._corr_thres},
                    {"oprt": "lt", "thres": self._ev_thres},
                ),
                "low": (
                    {"oprt": "ge", "thres": self._corr_thres},
                    {"oprt": "lt", "thres": self._ev_thres},
                ),
            }

        # init not-condition-related timevarying specific members
        self._ref_arg = reference
        self._softwares = tuple(data.keys())
        self._error_value = error_value
        self._corr = correlation

        # identify / separate the result data
        self._mutual_components = list_mutually_inclusive_columns(
            data.values())
        self._non_mutual_components = list_not_mutually_inclusive_columns(
            data.values())

        # narrow down data set by only including mutually inclusive components
        self._inspected_loads = dict(
            zip(
                data.keys(),
                filter_mutually_inclusive_columns(data.values()),
            )
        )

        # calculate error values
        self._error_values = calc_evs(
            dataframes=self._inspected_loads.values(),
            labels=self._inspected_loads.keys(),
            reference=reference,
            error=error_value,
        )

        # calculate correlations
        self._corrs = calc_corrs(
            dataframes=self._inspected_loads.values(),
            method=self._corr,
            labels=self._inspected_loads.keys(),
            reference=reference,
            fillna=0,
        )

        # super class handles clustering and mapping interest; it must run
        # after the error values and correlations above are available and
        # before the two mappings below, which read the mapped results
        super().__init__(
            data=data,
            conditions_dict=conditions_dict,
            reference=reference,
        )

        # only those timeframes where delta > threshold
        self._map_interest_timeframes()
        self._map_interest_averaged_results()

    @property
    def mutuals(self):
        """Mutual components between softwares."""
        return self._mutual_components

    @property
    def non_mutuals(self):
        """Non mutual components between softwares."""
        return self._non_mutual_components

    @property
    def error_value(self):
        """String specifying the error value used."""
        return self._error_value

    @property
    def correlation(self):
        """Correlation method used (string specifier or callable)."""
        return self._corr

    @property
    def error_value_threshold(self):
        """Error value threshold used."""
        return self._ev_thres

    @property
    def correlation_coefficient_threshold(self):
        """Correlation value threshold used."""
        return self._corr_thres

    @property
    def error_values(self):
        """Error values calculated for the chosen :attr:`error_value`."""
        return self._error_values

    @property
    def corrs(self):
        """Correlation coefficients calculated using :attr:`correlation`."""
        return self._corrs

    @property
    def high_interest_averaged_results(self):
        """Reference results of high interest flows, keyed by flow.

        Entries deviating more than the error value threshold carry the
        original result instead of the reference value.
        """
        return self._high_interest_averaged_results

    @property
    def medium_interest_averaged_results(self):
        """Reference results of medium interest flows, keyed by flow.

        Entries deviating more than the error value threshold carry the
        original result instead of the reference value.
        """
        return self._medium_interest_averaged_results

    @property
    def low_interest_averaged_results(self):
        """Reference results of low interest flows, keyed by flow.

        Entries deviating more than the error value threshold carry the
        original result instead of the reference value.
        """
        return self._low_interest_averaged_results

    @property
    def high_interest_timeframes(self):
        """Significant results identified as highly interesting."""
        return self._high_interest_timeframes

    @property
    def medium_interest_timeframes(self):
        """Significant results identified as medium interesting."""
        return self._medium_interest_timeframes

    @property
    def low_interest_timeframes(self):
        """Significant results identified as not very interesting."""
        return self._low_interest_timeframes

    def cluster_interest(self):
        """Cluster inter component results by interest.

        Returns
        -------
        pandas.DataFrame
            Frame shaped like :attr:`corrs` holding the interest label of
            each entry, with ``"medium1"`` and ``"medium2"`` merged into
            ``"medium"``.
        """
        corrs = self.corrs
        error_values = self.error_values

        # aggregate correlation coefficient results df and
        # error value results df into one single df of tuples
        paired = pd.DataFrame(
            {
                col: list(zip(corrs[col], error_values[col]))
                for col in corrs.columns
            },
            index=corrs.index,
        )

        # use functools partial to provide additional params to "cluster"
        tagger = functools.partial(
            cluster,
            conditions_dict=self.cluster_conditions,
        )

        # DataFrame.applymap is deprecated since pandas 2.1 in favour of
        # DataFrame.map. The check is done on the class, because attribute
        # access on an instance could resolve to a column labelled "map".
        if hasattr(pd.DataFrame, "map"):
            clustered_df = paired.map(tagger)
        else:
            clustered_df = paired.applymap(tagger)

        # merge medium1 and medium2 tags to "medium"
        clustered_df = clustered_df.replace(
            to_replace=["medium1", "medium2"],
            value=["medium", "medium"],
        )

        return clustered_df

    def map_interest_results(self, data):
        """Map data to identified interest categories.

        For each interest category a dict is stored as
        ``_<category>_interest_results``, keying one dataframe per flow.
        Each dataframe holds the flow results of every software side by
        side, using the software specifiers as column labels.

        Parameters
        ----------
        data: dict
            Result dataframes keyed by software specifier.
        """
        for label in self._INTEREST_LABELS:
            clustered_results = {
                flow: pd.concat(
                    [data[software][flow] for software in self._softwares],
                    keys=self._softwares,
                    axis="columns",
                )
                for flow in tuple(getattr(self, label).index)
            }

            setattr(self, f"_{label}_interest_results", clustered_results)

    def _map_interest_timeframes(self):
        """Store the significantly differing timeframes per category.

        Reads ``_<category>_interest_results`` as written by
        :meth:`map_interest_results` and stores the outcome of
        ``significant_differences`` per flow as
        ``_<category>_interest_timeframes``.
        """
        for label in self._INTEREST_LABELS:
            clustered_results = {}

            for flow in tuple(getattr(self, label).index):
                flow_results = getattr(
                    self, f"_{label}_interest_results")[flow].copy()

                clustered_results[flow] = significant_differences(
                    data=flow_results,
                    reference=self._ref_arg,
                    neighs=True,
                    threshold=self._ev_thres,
                )

            setattr(self, f"_{label}_interest_timeframes", clustered_results)

    def _map_interest_averaged_results(self):
        """Store reference results patched with significant deviations.

        For every flow of every interest category a dataframe is built in
        which each column holds the reference results. Entries of which the
        relative deviation exceeds the error value threshold are overwritten
        with the original result. The outcome is stored per flow as
        ``_<category>_interest_averaged_results``.
        """
        for label in self._INTEREST_LABELS:
            clustered_results = {}

            for flow in tuple(getattr(self, label).index):
                data = getattr(
                    self, f"_{label}_interest_results")[flow].copy()

                # parse reference
                ref = parse_reference_df(data, self._ref_arg)

                # calculate relative deviations
                relative_deviations = calc_ardiffs(data, ref)

                # identify only those significant; all others become NaN
                relative_significant_differences = relative_deviations[
                    relative_deviations > self._ev_thres]

                # construct an all averaged dataframe
                all_averaged = pd.DataFrame(
                    data=[ref] * len(data.columns),
                    columns=data.index,
                    index=data.columns,
                ).transpose()

                # update the all averaged dataframe with those from the
                # original data where significant differences were detected.
                # NaN entries are dropped explicitly, since the newer stack
                # implementation of pandas no longer drops them implicitly.
                stacked = relative_significant_differences.stack().dropna()
                for idx, col in stacked.index:
                    all_averaged.at[idx, col] = data.at[idx, col]

                clustered_results[flow] = all_averaged

            setattr(self, f"_{label}_interest_averaged_results",
                    clustered_results)