Source code for etdtransform.aggregate

import logging
import os
import re
from typing import Optional

import ibis
import numpy as np
import pandas as pd
from etdmap.data_model import cumulative_columns
from etdmap.index_helpers import read_index, update_meenemen

import etdtransform
from etdtransform.calculated_columns import add_calculated_columns_imputed_data
from etdtransform.impute import process_and_impute

"""
Aggregating the data for a given time interval
Example intervals:
1 hour: '60min'
15 min: '15min'
5 min: '5min'
"""


def read_hh_data(interval="default", metadata_columns=None):
    """
    Read household data from a parquet file and optionally add index columns to it.

    Parameters
    ----------
    interval : str, optional
        The time interval of the data to read, by default "default"
    metadata_columns : list, optional
        Additional columns to include from the index, by default None

    Returns
    -------
    pd.DataFrame
        The household data with optional index columns added

    Notes
    -----
    This function reads parquet files from a predefined folder path.
    """
    if not metadata_columns:
        metadata_columns = []
    df = pd.read_parquet(
        os.path.join(etdtransform.options.aggregate_folder_path, f"household_{interval}.parquet"),
    )
    return add_index_columns(df, columns=metadata_columns)

def add_index_columns(df: pd.DataFrame, columns: Optional[list] = None) -> pd.DataFrame:
    """
    Add index columns to the given DataFrame.

    Parameters
    ----------
    df : pd.DataFrame
        The input DataFrame
    columns : list, optional
        Additional columns to include from the index, by default None

    Returns
    -------
    pd.DataFrame
        The DataFrame with added index columns

    Notes
    -----
    This function merges the input DataFrame with an index DataFrame
    based on 'HuisIdBSV' and 'ProjectIdBSV'.
    """
    if columns:
        index_df, index_path = read_index()
        columns_to_select = ["HuisIdBSV", "ProjectIdBSV", *columns]
        columns_to_select = list(set(columns_to_select))
        index_df = index_df[columns_to_select]
        df = df.merge(index_df, on=["HuisIdBSV", "ProjectIdBSV"], how="left")
    return df

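
# Illustrative sketch: pulling extra metadata from the index while reading
# household data ('Weerstation' is a hypothetical index column, as in the
# commented-out weather-station aggregation further below):
#
#     df = read_hh_data(interval="default", metadata_columns=["Weerstation"])
#     # equivalent to reading household_default.parquet and then calling
#     # add_index_columns(df, columns=["Weerstation"])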
def aggregate_hh_data_5min():
    """
    Aggregate household data into 5-minute intervals.

    Notes
    -----
    This function reads individual household parquet files, concatenates them,
    and saves the result as a single parquet file.
    """
    logging.info("Starting to aggregate household data.")
    index_df = update_meenemen()
    data_frames = []

    index_df = index_df[index_df["Meenemen"]]

    for _, row in index_df.iterrows():
        huis_id_bsv = row["HuisIdBSV"]
        project_code = row["ProjectIdBSV"]
        file_name = f"household_{huis_id_bsv}_table.parquet"
        file_path = os.path.join(etdtransform.options.mapped_folder_path, file_name)
        household_df = pd.read_parquet(file_path)
        household_df["ProjectIdBSV"] = project_code
        household_df["HuisIdBSV"] = huis_id_bsv
        data_frames.append(household_df)
        logging.info(f"Added {file_name}")

    logging.info("Concatenate all HH dataframes.")
    df = pd.concat(data_frames, ignore_index=True)

    logging.info("Saving HH data to parquet file.")
    df.to_parquet(
        os.path.join(etdtransform.options.aggregate_folder_path, "household_default.parquet"),
        engine="pyarrow",
    )

def impute_hh_data_5min(
    df=None,
    cum_cols=cumulative_columns,
    sorted=False,
    diffs_calculated=False,
    optimized=False,
):
    """
    Impute missing values in household data and save results.

    Parameters
    ----------
    df : pd.DataFrame, optional
        The input DataFrame; if None it will be read from a file
    cum_cols : list, optional
        List of cumulative columns to process, by default cumulative_columns
    sorted : bool, optional
        Whether the data is already sorted, by default False
    diffs_calculated : bool, optional
        Whether differences are already calculated, by default False
    optimized : bool, optional
        Whether to use optimized processing, by default False

    Returns
    -------
    pd.DataFrame
        The imputed household data

    Notes
    -----
    This function performs imputation, calculates differences, and saves
    various summary statistics.
    """
    logging.info("Loading HH data from parquet file.")
    if df is None:
        df = read_hh_data(interval="default", metadata_columns=["ProjectIdBSV"])

    # Call the imputation function
    logging.info("Starting the imputation.")
    # df = apply_rolling_iqr_imputation(
    #     df=df,
    #     time_col="ReadingDate",
    #     variable_names=cum_cols,
    #     group_vars=["HuisIdBSV"],
    #     iqr_factor=1.5,
    #     window_weeks=4,
    #     min_valid_ratio=.4
    # )
    (
        df,
        imputation_summary_house,
        imputation_summary_project,
        imputation_reading_date_stats_df,
    ) = process_and_impute(
        df=df,
        project_id_column="ProjectIdBSV",
        cumulative_columns=cum_cols,
        sorted=sorted,
        diffs_calculated=diffs_calculated,
        optimized=optimized,
    )

    diff_columns = [col + "Diff" for col in cum_cols]

    logging.info("Averaging all diffs by project and reading date.")
    aggregated_diff = (
        df.groupby(["ProjectIdBSV", "ReadingDate"])[diff_columns].mean().reset_index()
    )

    logging.info("Saving results")

    # Save the results
    modified_household_dfs = []
    for _huis_code, household_df in df.groupby("HuisIdBSV"):
        for col in cum_cols:
            household_df[col + "Original"] = household_df[col]
            # replace the cumulative column with the cumulative sum of imputed diffs
            household_df[col] = household_df[col + "Diff"].cumsum()
            household_df[col + "Check"] = (
                household_df[col] - household_df[col + "Original"]
            ).diff()
        modified_household_dfs.append(household_df)

    df = pd.concat(modified_household_dfs, ignore_index=True)

    logging.info("Re-arranging columns.")
    # df = rearrange_model_columns(household_df=df)
    # df.drop(columns=diff_columns)

    if optimized:
        optimized_label = "_optimized"
    else:
        optimized_label = ""

    logging.info("Saving files.")
    df.to_parquet(
        os.path.join(
            etdtransform.options.aggregate_folder_path,
            f"household_imputed{optimized_label}.parquet",
        ),
        engine="pyarrow",
    )
    aggregated_diff.to_parquet(
        os.path.join(
            etdtransform.options.aggregate_folder_path,
            f"household_aggregated_diff{optimized_label}.parquet",
        ),
        engine="pyarrow",
    )
    imputation_summary_house.to_parquet(
        os.path.join(
            etdtransform.options.aggregate_folder_path,
            f"impute_summary_household{optimized_label}.parquet",
        ),
        engine="pyarrow",
    )
    imputation_summary_project.to_parquet(
        os.path.join(
            etdtransform.options.aggregate_folder_path,
            f"impute_summary_project{optimized_label}.parquet",
        ),
        engine="pyarrow",
    )
    # a bare DataFrame in an `if` raises "truth value is ambiguous"; check for None
    if imputation_reading_date_stats_df is not None:
        imputation_reading_date_stats_df.to_parquet(
            os.path.join(
                etdtransform.options.aggregate_folder_path,
                f"impute_summary_reading_date{optimized_label}.parquet",
            ),
            engine="pyarrow",
        )

    logging.info("Done")
    return df

def add_calculated_columns_to_hh_data(df=None):
    """
    Add calculated columns to household data and save the result.

    Parameters
    ----------
    df : pd.DataFrame, optional
        The input DataFrame; if None it will be read from a file

    Returns
    -------
    pd.DataFrame
        The DataFrame with added calculated columns

    Notes
    -----
    This function adds calculated columns to the household data and saves
    the result as a parquet file.
    """
    logging.info("Loading imputed data from parquet file.")
    if df is None:
        df = read_hh_data(interval="imputed")

    logging.info("Calculating columns.")
    df = add_calculated_columns_imputed_data(df)

    logging.info("Saving calculated columns to file: household_calculated.parquet")
    df.to_parquet(
        os.path.join(etdtransform.options.aggregate_folder_path, "household_calculated.parquet"),
        engine="pyarrow",
    )
    return df

def read_aggregate(name, interval):
    """
    Read an aggregate parquet file.

    Parameters
    ----------
    name : str
        The name of the aggregate
    interval : str
        The time interval of the aggregate

    Returns
    -------
    pd.DataFrame
        The aggregate data

    Notes
    -----
    This function reads a parquet file based on the provided name and interval.
    """
    safe_name = re.sub(r"\W+", "_", name.lower())
    return pd.read_parquet(
        os.path.join(etdtransform.options.aggregate_folder_path, f"{safe_name}_{interval}.parquet"),
    )

def get_aggregate_table(name, interval):
    """
    Get an aggregate table as an ibis table.

    Parameters
    ----------
    name : str
        The name of the aggregate
    interval : str
        The time interval of the aggregate

    Returns
    -------
    ibis.Table
        The aggregate data as an ibis table

    Notes
    -----
    This function reads a parquet file and returns it as an ibis table.
    """
    safe_name = re.sub(r"\W+", "_", name.lower())
    parquet_path = os.path.join(
        etdtransform.options.aggregate_folder_path,
        f"{safe_name}_{interval}.parquet",
    )
    return ibis.read_parquet(parquet_path)

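
# Usage sketch: the returned ibis table is lazily backed by the parquet file,
# so previews avoid loading the full dataset. File names mirror those written
# by aggregate_and_save / resample_and_save (e.g. project_60min.parquet):
#
#     tbl = get_aggregate_table("project", "60min")
#     preview = tbl.head(10).execute()  # small pandas DataFrame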
def resample_hh_data(df=None, intervals=("60min", "15min", "5min")):
    """
    Resample household data to different time intervals.

    Parameters
    ----------
    df : pd.DataFrame, optional
        The input DataFrame, if None it will be read from a file
    intervals : tuple, optional
        The time intervals to resample to, by default ("60min", "15min", "5min")

    Notes
    -----
    This function resamples household data to specified time intervals
    and saves the results.
    """
    group_column = ["ProjectIdBSV", "HuisIdBSV"]

    if df is None:
        logging.info("Loading data with calculated columns to resample hh data")
        df = read_hh_data(interval="calculated")
    else:
        logging.warning(
            "If passing a dataframe to resample_hh_data(), be sure to use a copy as it may be modified in place.",
        )

    for interval in intervals:
        logging.info(f"-- Starting household resampling with {interval} intervals --")
        if interval == "5min":
            logging.info(
                "-- 5min interval - applying shortcut without transformation --",
            )
            columns_to_copy = [
                "ReadingDate",
                *group_column,
                *list(aggregation_variables.keys()),
            ]
            for _var, config in aggregation_variables.items():
                validator_column = config.get("validator_column")
                if validator_column:
                    columns_to_copy.append(validator_column)

            df = df[columns_to_copy]

            logging.info(
                f"-- {interval} interval - removing variables that do not pass filters --"
            )
            for var, config in aggregation_variables.items():
                validator_column = config.get("validator_column")
                if validator_column:
                    # `is False` compared object identity and never matched;
                    # mask rows whose validator is False or missing instead
                    failed = ~df[validator_column].fillna(False).astype(bool)
                    df.loc[failed, var] = pd.NA

            logging.info(
                f"-- {interval} interval - saving file household_5min.parquet --"
            )
            df.to_parquet(
                os.path.join(etdtransform.options.aggregate_folder_path, "household_5min.parquet"),
                engine="pyarrow",
            )
        else:
            resample_and_save(df, group_column, interval=interval, alt_name="household")

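
# Minimal illustration of the validator masking above, with hypothetical
# column names and a nullable-boolean validator (missing validators are
# treated as failing):
#
#     d = pd.DataFrame({
#         "v": [1.0, 2.0, 3.0],
#         "validate_v": pd.array([True, False, pd.NA], dtype="boolean"),
#     })
#     d.loc[~d["validate_v"].fillna(False).astype(bool), "v"] = pd.NA
#     # -> v is 1.0, <NA>, <NA>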
def aggregate_project_data(intervals=("5min", "15min", "60min")):
    """
    Aggregate project data for different time intervals.

    Parameters
    ----------
    intervals : tuple, optional
        The time intervals to aggregate, by default ("5min", "15min", "60min")

    Notes
    -----
    This function aggregates project data for specified time intervals
    and saves the results.
    """
    group_column = ["ProjectIdBSV"]

    for interval in intervals:
        logging.info(
            f"-- Starting {group_column} aggregation with {interval} intervals --",
        )
        df = read_hh_data(interval=interval)
        aggregate_and_save(df, group_column, interval=interval, alt_name="project")

# def aggregate_weerstation_data(index_df):
#     group_column = ['Weerstation']
#     intervals = ['5min', '15min', '60min']
#     for interval in intervals:
#         logging.info(f'-- Starting {group_column} aggregation with {interval} intervals --')
#         df = read_hh_data(interval=interval, metadata_columns=['Weerstation'])
#         aggregate_and_save(df, group_column, interval=interval)

def aggregate_and_save(
    df,
    group_column=("ProjectIdBSV",),
    interval="5min",
    alt_name=None,
):
    """
    Aggregate data and save the result.

    Parameters
    ----------
    df : pd.DataFrame
        The input DataFrame
    group_column : tuple, optional
        The column(s) to group by, by default ("ProjectIdBSV",)
    interval : str, optional
        The time interval for aggregation, by default "5min"
    alt_name : str, optional
        An alternative name for the output file, by default None

    Notes
    -----
    This function aggregates data, merges with size information, and saves
    the result as a parquet file.
    """
    df_grouped = df.groupby(["ReadingDate", *list(group_column)])
    df_size = df_grouped.size().reset_index(name="n")

    if alt_name is None:
        # the default was a bare string ("ProjectIdBSV"), which made
        # list(group_column) iterate over characters; join the tuple instead
        alt_name = "_".join(group_column)

    df = aggregate_by_columns(df, group_column=group_column, size=df_size)
    df = df.merge(df_size, on=["ReadingDate", *list(group_column)], how="left")

    safe_name = re.sub(r"\W+", "_", alt_name.lower())
    df.to_parquet(
        os.path.join(etdtransform.options.aggregate_folder_path, f"{safe_name}_{interval}.parquet"),
        engine="pyarrow",
    )

def aggregate_by_columns(df, group_column, size):
    """
    Aggregate data by columns.

    Parameters
    ----------
    df : pd.DataFrame
        The input DataFrame
    group_column : list
        The column(s) to group by
    size : pd.DataFrame
        DataFrame containing size information

    Returns
    -------
    pd.DataFrame
        The aggregated DataFrame

    Notes
    -----
    This function aggregates data for each variable defined in
    aggregation_variables.
    """
    first = True
    combined_results = None
    for var, config in aggregation_variables.items():
        logging.info(f"In loop to aggregate by column {var}")
        method = config["aggregate_method"]
        if (
            method == "diff_cumsum"
            and not first
            and var + "Diff" in combined_results.columns
        ):
            result = aggregate_diff_cumsum(
                df,
                var,
                group_column,
                size,
                combined_results=combined_results,
            )
        else:
            result = aggregate_variable(df, var, config, group_column, size)

        if first:
            combined_results = result
            first = False
        else:
            combined_results = combined_results.merge(
                result,
                on=["ReadingDate", *group_column],
                how="outer",
            )

    logging.info(f"Combining aggregated dataset grouped by: {group_column}")
    return combined_results.reset_index()

def aggregate_variable(df_grouped, var, config, group_column, size):
    """
    Aggregate a single variable.

    Parameters
    ----------
    df_grouped : pd.DataFrame
        The grouped DataFrame
    var : str
        The variable to aggregate
    config : dict
        Configuration for the aggregation
    group_column : list
        The column(s) to group by
    size : pd.DataFrame
        DataFrame containing size information

    Returns
    -------
    pd.DataFrame
        The aggregated DataFrame for the variable

    Notes
    -----
    This function aggregates a single variable based on the specified
    method in the config.
    """
    logging.info(f"{group_column} : column {var}")
    method = config["aggregate_method"]
    # not including validator columns as they are not aggregated in the household data atm
    # validator_column = config.get('validator_column')
    columns_to_select = ["ReadingDate", *group_column, var]
    if method == "diff_cumsum":
        columns_to_select = [*columns_to_select, var + "Diff"]
    # if validator_column:
    #     columns_to_select.append(validator_column)

    df_copy = df_grouped[columns_to_select]

    # if validator_column:
    #     df_copy.loc[df_copy[validator_column] != True, var] = pd.NA

    if method == "sum":
        return aggregate_sum(df_copy, var, ["ReadingDate", *group_column], size)
    elif method == "max":
        return aggregate_max(df_copy, var, ["ReadingDate", *group_column], size)
    elif method == "avg":
        return aggregate_avg(df_copy, var, ["ReadingDate", *group_column], size)
    elif method == "diff_cumsum":
        # ReadingDate left out here to allow cumsum to proceed per project
        # with pre-sorted rows
        return aggregate_diff_cumsum(df_copy, var, group_column, size)
    else:
        # previously fell through and returned None silently
        raise ValueError(f'Unknown aggregate_method "{method}" for column "{var}"')

# It would be smarter to handle these variables with a diff_sum method only
# after calculating the average Diff columns.
def aggregate_diff_cumsum(df, column, group_column, size, combined_results=None):
    """
    Aggregate cumulative sum of differences.

    Parameters
    ----------
    df : pd.DataFrame
        The input DataFrame
    column : str
        The column to aggregate
    group_column : list
        The column(s) to group by
    size : pd.DataFrame
        DataFrame containing size information
    combined_results : pd.DataFrame, optional
        Previously combined results, by default None

    Returns
    -------
    pd.DataFrame
        The aggregated DataFrame

    Notes
    -----
    This function calculates the cumulative sum of differences for the
    specified column.
    """
    diff_column = column + "Diff"
    logging.info(
        f"Aggregate cumsum of diff column: {group_column} / {column} / {diff_column}",
    )
    if combined_results is None:
        logging.info("Calculating Diff as not included.")
        aggregated = aggregate_avg(
            df,
            diff_column,
            ["ReadingDate", *group_column],
            size,
        )
    else:
        logging.info("Diff precalculated. No need to recalculate. Making a copy.")
        aggregated = combined_results[
            ["ReadingDate", *group_column, diff_column]
        ].copy()

    logging.info(
        f"Transform average diff to calculate cumsum: {group_column} / {column} / {diff_column}",
    )
    aggregated[column] = aggregated.groupby(group_column)[diff_column].transform(
        pd.Series.cumsum,
    )

    logging.info("Add missing values")
    # chained indexing assigned to a temporary copy and had no effect; use .loc
    aggregated.loc[aggregated[diff_column].isna(), column] = pd.NA

    logging.info("Drop column")
    aggregated = aggregated.drop(columns=[diff_column])

    logging.info("Finished")
    return aggregated

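
# Worked micro-example of the diff -> cumsum reconstruction (hypothetical
# numbers): per-interval diffs averaged over households are cumulatively
# summed per project to form a "virtual meter" series.
#
#     d = pd.DataFrame({
#         "ProjectIdBSV": [1, 1, 1],
#         "ReadingDate": pd.date_range("2024-01-01", periods=3, freq="5min"),
#         "MeterDiff": [0.2, 0.3, 0.1],
#     })
#     d["Meter"] = d.groupby("ProjectIdBSV")["MeterDiff"].transform(pd.Series.cumsum)
#     # Meter -> 0.2, 0.5, 0.6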
def aggregate_sum(df, column, group_column, size):
    """
    Aggregate sum of a column.

    Parameters
    ----------
    df : pd.DataFrame
        The input DataFrame
    column : str
        The column to aggregate
    group_column : list
        The column(s) to group by
    size : pd.DataFrame
        DataFrame containing size information

    Returns
    -------
    pd.DataFrame
        The aggregated DataFrame

    Notes
    -----
    This function calculates the sum of the specified column, requiring at
    least 60% of values to be present.
    """
    logging.info(f"aggregate sum: {group_column} / {column}")
    grouped = df.groupby(group_column)

    # `min_count` must be a scalar, so apply the per-group 60% threshold
    # explicitly, mirroring aggregate_avg below
    aggregated = grouped.agg(
        sum_agg=(column, "sum"),
        count_agg=(column, "count"),
    ).reset_index()

    aggregated[column] = np.where(
        aggregated["count_agg"] >= size["n"] * 0.6,
        aggregated["sum_agg"],
        pd.NA,
    )
    aggregated = aggregated.drop(columns=["sum_agg", "count_agg"])
    return aggregated

def aggregate_max(df, column, group_column, size):
    """
    Aggregate maximum of a column.

    Parameters
    ----------
    df : pd.DataFrame
        The input DataFrame
    column : str
        The column to aggregate
    group_column : list
        The column(s) to group by
    size : pd.DataFrame
        DataFrame containing size information

    Returns
    -------
    pd.DataFrame
        The aggregated DataFrame

    Notes
    -----
    This function calculates the maximum of the specified column, requiring
    at least 60% of values to be present.
    """
    logging.info(f"aggregate max: {group_column} / {column}")
    grouped = df.groupby(group_column)

    # as in aggregate_sum, apply the per-group 60% threshold explicitly
    aggregated = grouped.agg(
        max_agg=(column, "max"),
        count_agg=(column, "count"),
    ).reset_index()

    aggregated[column] = np.where(
        aggregated["count_agg"] >= size["n"] * 0.6,
        aggregated["max_agg"],
        pd.NA,
    )
    aggregated = aggregated.drop(columns=["max_agg", "count_agg"])
    return aggregated

def aggregate_avg(df, column, group_column, size):
    """
    Aggregate average of a column.

    Parameters
    ----------
    df : pd.DataFrame
        The input DataFrame
    column : str
        The column to aggregate
    group_column : list
        The column(s) to group by
    size : pd.DataFrame
        DataFrame containing size information

    Returns
    -------
    pd.DataFrame
        The aggregated DataFrame

    Notes
    -----
    This function calculates the average of the specified column, requiring
    at least 60% of values to be present.
    """
    logging.info(f"aggregate avg: {group_column} / {column}")

    # Group by the specified column
    grouped = df.groupby(group_column)

    # Aggregate with sum and count
    aggregated = grouped.agg(
        sum_agg=(column, "sum"),
        count_agg=(column, "count"),
    ).reset_index()

    aggregated[column] = np.where(
        aggregated["count_agg"] >= size["n"] * 0.6,
        aggregated["sum_agg"] / aggregated["count_agg"],
        pd.NA,
    )

    aggregated = aggregated.drop(columns=["sum_agg", "count_agg"])
    return aggregated

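
# Worked micro-example of the 60% completeness rule (hypothetical numbers):
# with n = 10 households in a group, a timestamp needs at least 6 non-missing
# readings.
#
#     g = pd.DataFrame({"sum_agg": [14.0, 8.0], "count_agg": [7, 5]})
#     g["avg"] = np.where(g["count_agg"] >= 10 * 0.6, g["sum_agg"] / g["count_agg"], pd.NA)
#     # -> 2.0 (7 of 10 present), <NA> (only 5 of 10 present)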
def resample_and_save(
    df,
    group_column=("ProjectIdBSV", "HuisIdBSV"),
    interval="5min",
    alt_name=None,
):
    """
    Resample data and save the result.

    Parameters
    ----------
    df : pd.DataFrame
        The input DataFrame
    group_column : tuple, optional
        The column(s) to group by, by default ("ProjectIdBSV", "HuisIdBSV")
    interval : str, optional
        The time interval for resampling, by default "5min"
    alt_name : str, optional
        An alternative name for the output file, by default None

    Notes
    -----
    This function resamples data and saves the result as a parquet file.
    """
    if alt_name is None:
        alt_name = "_".join(group_column)

    df = df.set_index("ReadingDate")
    df = resample_by_columns(df, group_column=group_column, interval=interval)
    df.reset_index(inplace=True)

    safe_name = re.sub(r"\W+", "_", alt_name.lower())
    df.to_parquet(
        os.path.join(etdtransform.options.aggregate_folder_path, f"{safe_name}_{interval}.parquet"),
        engine="pyarrow",
    )

def resample_by_columns(
    df,
    group_column=None,
    interval="15min",
):
    """
    Resample data by columns.

    Parameters
    ----------
    df : pd.DataFrame
        The input DataFrame
    group_column : list, optional
        The column(s) to group by, by default None
    interval : str, optional
        The time interval for resampling, by default "15min"

    Returns
    -------
    pd.DataFrame
        The resampled DataFrame

    Notes
    -----
    This function resamples data for each variable defined in
    aggregation_variables.
    """
    if group_column is None:
        group_column = ["ProjectIdBSV", "HuisIdBSV"]
    # normalize to a list so tuple defaults from callers also work
    group_column = list(group_column)

    if interval == "5min":
        min_count = 1
    elif interval == "15min":
        min_count = 3
    elif interval == "60min":
        min_count = 12
    elif interval == "6h":
        min_count = 72
    elif interval == "24h":
        min_count = 288
    else:
        raise ValueError(f'Unknown interval "{interval}"')

    # Generate the initial dataset with only group_column and ReadingDate
    df_copy = df[group_column].copy()
    combined_results = (
        df_copy.groupby(group_column)
        .resample(interval)
        .size()
        .reset_index()
        .drop(columns=0)
    )

    for var, config in aggregation_variables.items():
        logging.info(f"in loop for {var}")
        result = resample_variable(df, var, config, interval, group_column, min_count)
        combined_results = combined_results.merge(
            result,
            on=["ReadingDate", *group_column],
            how="outer",
        )

    logging.info(f"Combining dataset: {interval} / {group_column}")
    combined_results.reset_index(inplace=True)
    return combined_results

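
# Note: each min_count above equals the number of 5-minute base readings in
# the target bucket, so a bucket is only kept when it is complete:
#
#     assert pd.Timedelta("15min") // pd.Timedelta("5min") == 3
#     assert pd.Timedelta("60min") // pd.Timedelta("5min") == 12
#     assert pd.Timedelta("24h") // pd.Timedelta("5min") == 288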
def resample_variable(df, var, config, interval, group_column, min_count):
    """
    Resample a single variable.

    Parameters
    ----------
    df : pd.DataFrame
        The input DataFrame
    var : str
        The variable to resample
    config : dict
        Configuration for the resampling
    interval : str
        The time interval for resampling
    group_column : list
        The column(s) to group by
    min_count : int
        The minimum count required for resampling

    Returns
    -------
    pd.DataFrame
        The resampled DataFrame for the variable

    Notes
    -----
    This function resamples a single variable based on the specified
    method in the config.
    """
    logging.info(f"{group_column} / {interval}: column {var}")
    method = config["resample_method"]
    validator_column = config.get("validator_column")
    columns_to_copy = [*group_column, var]
    if validator_column:
        columns_to_copy.append(validator_column)

    df_copy = df[columns_to_copy].copy()

    # Filter by validator column if specified; `is False` compared object
    # identity and never matched, so mask rows whose validator is False or
    # missing instead
    if validator_column:
        failed = ~df_copy[validator_column].fillna(False).astype(bool)
        df_copy.loc[failed, var] = pd.NA

    if method == "sum":
        return resample_sum(df_copy, var, interval, group_column, min_count)
    elif method == "max":
        return resample_max(df_copy, var, interval, group_column, min_count)
    elif method == "avg":
        return resample_avg(df_copy, var, interval, group_column, min_count)
    else:
        # previously fell through and returned None silently
        raise ValueError(f'Unknown resample_method "{method}" for column "{var}"')

def resample_max(df, column, interval, group_column, min_count):
    """
    Resample maximum of a column.

    Parameters
    ----------
    df : pd.DataFrame
        The input DataFrame
    column : str
        The column to resample
    interval : str
        The time interval for resampling
    group_column : list
        The column(s) to group by
    min_count : int
        The minimum count required for resampling

    Returns
    -------
    pd.DataFrame
        The resampled DataFrame

    Notes
    -----
    This function resamples the maximum of the specified column.
    """
    logging.info(f"resample max: {group_column} / {interval}: {column}")
    resampled = (
        df.groupby(group_column)[column]
        .resample(interval)
        .max(min_count=min_count)
        .reset_index()
    )
    return resampled

def resample_sum(df, column, interval, group_column, min_count):
    """
    Resample sum of a column.

    Parameters
    ----------
    df : pd.DataFrame
        The input DataFrame
    column : str
        The column to resample
    interval : str
        The time interval for resampling
    group_column : list
        The column(s) to group by
    min_count : int
        The minimum count required for resampling

    Returns
    -------
    pd.DataFrame
        The resampled DataFrame

    Notes
    -----
    This function resamples the sum of the specified column.
    """
    logging.info(f"resample sum: {group_column} / {interval}: {column}")
    resampled = (
        df.groupby(group_column)[column]
        .resample(interval)
        .sum(min_count=min_count)
        .reset_index()
    )
    # resampled = df.groupby(group_column)[column].resample(interval).apply(
    #     lambda x: pd.NA if x.isnull().any() else x.sum()
    # ).reset_index()
    # resampled = resampled.groupby('ReadingDate')[column].apply(
    #     lambda x: pd.NA if x.isnull().any() else x.sum()
    # ).reset_index()
    return resampled

def resample_avg(df, column, interval, group_column, min_count):
    """
    Resample average of a column.

    Parameters
    ----------
    df : pd.DataFrame
        The input DataFrame.
    column : str
        The column to resample.
    interval : str
        The time interval for resampling.
    group_column : list
        The column(s) to group by.
    min_count : int
        The minimum count required for resampling.

    Returns
    -------
    pd.DataFrame
        The resampled DataFrame.

    Notes
    -----
    This function resamples the average of the specified column, requiring
    at least `min_count` values to be present.
    """
    logging.info(f"resample avg: {group_column} / {interval}: {column}")
    resampled = (
        df.groupby(group_column)
        .resample(interval)[column]
        .agg(["sum", "count"])
        .reset_index()
    )
    resampled[column] = np.where(
        resampled["count"] >= min_count,
        resampled["sum"] / resampled["count"],
        pd.NA,
    )
    resampled = resampled.drop(columns=["sum", "count"])
    # resampled = df.groupby(group_column)[column].resample(interval).apply(
    #     lambda x: pd.NA if x.isnull().any() else x.mean()
    # ).reset_index()
    # resampled = resampled.groupby('ReadingDate')[column].apply(
    #     lambda x: pd.NA if x.isnull().any() else x.mean()
    # ).reset_index()
    return resampled

# List of variables with their corresponding aggregation methods.
# Entries marked with ## need a check of the methods - consider for some
# using 'last_value' for instantaneous variables.
aggregation_variables = {
    "ElektriciteitNetgebruikHoogDiff": {
        "resample_method": "sum",
        "aggregate_method": "avg",
    },
    # 'ElektriciteitNetgebruikHoog': {'resample_method': 'max', 'aggregate_method': 'diff_cumsum'},
    "ElektriciteitNetgebruikLaagDiff": {
        "resample_method": "sum",
        "aggregate_method": "avg",
    },
    # 'ElektriciteitNetgebruikLaag': {'resample_method': 'max', 'aggregate_method': 'diff_cumsum'},
    "ElektriciteitTerugleveringHoogDiff": {
        "resample_method": "sum",
        "aggregate_method": "avg",
    },
    # 'ElektriciteitTerugleveringHoog': {'resample_method': 'max', 'aggregate_method': 'diff_cumsum'},
    "ElektriciteitTerugleveringLaagDiff": {
        "resample_method": "sum",
        "aggregate_method": "avg",
    },
    # 'ElektriciteitTerugleveringLaag': {'resample_method': 'max', 'aggregate_method': 'diff_cumsum'},
    ## 'ElektriciteitVermogen': {'resample_method': 'avg', 'aggregate_method': 'avg', 'validator_column': 'validate_elektriciteit_vermogen'},
    ## 'Gasgebruik': {'resample_method': 'max', 'aggregate_method': 'diff_cumsum'},
    "ElektriciteitsgebruikWTWDiff": {
        "resample_method": "sum",
        "aggregate_method": "avg",
    },
    # 'ElektriciteitsgebruikWTW': {'resample_method': 'max', 'aggregate_method': 'diff_cumsum'},
    "ElektriciteitsgebruikWarmtepompDiff": {
        "resample_method": "sum",
        "aggregate_method": "avg",
    },
    # 'ElektriciteitsgebruikWarmtepomp': {'resample_method': 'max', 'aggregate_method': 'diff_cumsum'},
    "ElektriciteitsgebruikBoosterDiff": {
        "resample_method": "sum",
        "aggregate_method": "avg",
    },
    # 'ElektriciteitsgebruikBooster': {'resample_method': 'max', 'aggregate_method': 'diff_cumsum'},
    "ElektriciteitsgebruikBoilervatDiff": {
        "resample_method": "sum",
        "aggregate_method": "avg",
    },
    # 'ElektriciteitsgebruikBoilervat': {'resample_method': 'max', 'aggregate_method': 'diff_cumsum'},
    "ElektriciteitsgebruikRadiatorDiff": {
        "resample_method": "sum",
        "aggregate_method": "avg",
    },
    # 'ElektriciteitsgebruikRadiator': {'resample_method': 'max', 'aggregate_method': 'diff_cumsum'},
    ## 'TemperatuurWarmTapwater': {'resample_method': 'avg', 'aggregate_method': 'avg', 'validator_column': 'validate_temperatuur_warm_tapwater'},
    ## 'TemperatuurWoonkamer': {'resample_method': 'avg', 'aggregate_method': 'avg', 'validator_column': 'validate_temperatuur_woonkamer'},
    ## 'TemperatuurSetpointWoonkamer': {'resample_method': 'avg', 'aggregate_method': 'avg', 'validator_column': 'validate_temperatuur_setpoint_woonkamer'},
    ## 'WarmteproductieWarmtepomp': {'resample_method': 'max', 'aggregate_method': 'avg'},
    ## 'WatergebruikWarmTapwater': {'resample_method': 'max', 'aggregate_method': 'avg'},
    ## 'Zon-opwekMomentaan': {'resample_method': 'avg', 'aggregate_method': 'avg', 'validator_column': 'validate_zon_opwek_momentaan'},
    "ZonopwekBruto": {"resample_method": "sum", "aggregate_method": "avg"},
    # 'Zon-opwekTotaal': {'resample_method': 'max', 'aggregate_method': 'diff_cumsum'},
    ## 'CO2': {'resample_method': 'avg', 'aggregate_method': 'avg', 'validator_column': 'validate_co2'},
    ## 'Luchtvochtigheid': {'resample_method': 'avg', 'aggregate_method': 'avg', 'validator_column': 'validate_luchtvochtigheid'},
    ## 'Ventilatiedebiet': {'resample_method': 'avg', 'aggregate_method': 'avg', 'validator_column': 'validate_ventilatiedebiet'},
    "TerugleveringTotaalNetto": {"resample_method": "sum", "aggregate_method": "avg"},
    "ElektriciteitsgebruikTotaalNetto": {
        "resample_method": "sum",
        "aggregate_method": "avg",
    },
    "Netuitwisseling": {"resample_method": "sum", "aggregate_method": "avg"},
    "ElektriciteitsgebruikTotaalWarmtepomp": {
        "resample_method": "sum",
        "aggregate_method": "avg",
    },
    "ElektriciteitsgebruikTotaalGebouwgebonden": {
        "resample_method": "sum",
        "aggregate_method": "avg",
    },
    "ElektriciteitsgebruikTotaalHuishoudelijk": {
        "resample_method": "sum",
        "aggregate_method": "avg",
    },
    "Zelfgebruik": {"resample_method": "sum", "aggregate_method": "avg"},
    "ElektriciteitsgebruikTotaalBruto": {
        "resample_method": "sum",
        "aggregate_method": "avg",
    },
}