import logging
import os
import re
from typing import Optional
import ibis
import numpy as np
import pandas as pd
from etdmap.data_model import cumulative_columns
from etdmap.index_helpers import read_index, update_meenemen
import etdtransform
from etdtransform.calculated_columns import add_calculated_columns_imputed_data
from etdtransform.impute import process_and_impute
"""
Aggregating the data for a given time interval
Example intervals:
1 hour: '1h'
15 min: '15min'
5 min: '5min'
"""
def read_hh_data(interval="default", metadata_columns=None):
"""
Read household data from a parquet file and optionally add index columns to it.
Parameters
----------
interval : str, optional
The time interval of the data to read, by default "default"
metadata_columns : list, optional
Additional columns to include from the index, by default None
Returns
-------
pd.DataFrame
The household data with optional index columns added
Notes
-----
This function reads parquet files from a predefined folder path.
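Examples
--------
A minimal sketch; the interval and metadata column are illustrative and the file is read from the configured aggregate folder:
>>> df = read_hh_data(interval="60min", metadata_columns=["Weerstation"])  # doctest: +SKIP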
"""
if not metadata_columns:
metadata_columns = []
df = pd.read_parquet(
os.path.join(etdtransform.options.aggregate_folder_path, f"household_{interval}.parquet"),
)
return add_index_columns(df, columns=metadata_columns)
def add_index_columns(df: pd.DataFrame, columns: Optional[list] = None) -> pd.DataFrame:
"""
Add index columns to the given DataFrame.
Parameters
----------
df : pd.DataFrame
The input DataFrame
columns : list, optional
Additional columns to include from the index, by default None
Returns
-------
pd.DataFrame
The DataFrame with added index columns
Notes
-----
This function merges the input DataFrame with an index DataFrame based on 'HuisIdBSV' and 'ProjectIdBSV'.
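Examples
--------
A minimal sketch; df is assumed to already contain the HuisIdBSV and ProjectIdBSV key columns used for the left merge:
>>> df = add_index_columns(df, columns=["Weerstation"])  # doctest: +SKIP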
"""
if columns:
index_df, index_path = read_index()
columns_to_select = ["HuisIdBSV", "ProjectIdBSV", *columns]
columns_to_select = list(set(columns_to_select))
index_df = index_df[columns_to_select]
df = df.merge(index_df, on=["HuisIdBSV", "ProjectIdBSV"], how="left")
return df
else:
return df
def aggregate_hh_data_5min():
"""
Aggregate household data into 5-minute intervals.
Notes
-----
This function reads individual household parquet files, concatenates them,
and saves the result as a single parquet file.
"""
logging.info("Starting to aggregate household data.")
index_df = update_meenemen()
data_frames = []
index_df = index_df[index_df["Meenemen"]]
for _, row in index_df.iterrows():
huis_id_bsv = row["HuisIdBSV"]
project_code = row["ProjectIdBSV"]
file_name = f"household_{huis_id_bsv}_table.parquet"
file_path = os.path.join(etdtransform.options.mapped_folder_path, file_name)
household_df = pd.read_parquet(file_path)
household_df["ProjectIdBSV"] = project_code
household_df["HuisIdBSV"] = huis_id_bsv
data_frames.append(household_df)
logging.info(f"Added {file_name}")
logging.info("Concatenate all HH dataframes.")
df = pd.concat(data_frames, ignore_index=True)
logging.info("Saving HH data to parquet file.")
df.to_parquet(
os.path.join(etdtransform.options.aggregate_folder_path, "household_default.parquet"),
engine="pyarrow",
)
def impute_hh_data_5min(
df,
cum_cols=cumulative_columns,
sorted=False,
diffs_calculated=False,
optimized=False,
):
"""
Impute missing values in household data and save results.
Parameters
----------
df : pd.DataFrame, optional
The input DataFrame; if None, it is read from a file
cum_cols : list, optional
List of cumulative columns to process, by default cumulative_columns
sorted : bool, optional
Whether the data is already sorted, by default False
diffs_calculated : bool, optional
Whether differences are already calculated, by default False
optimized : bool, optional
Whether to use optimized processing, by default False
Returns
-------
pd.DataFrame
The imputed household data
Notes
-----
This function performs imputation, calculates differences, and saves various summary statistics.
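Examples
--------
A minimal sketch; passing None lets the function load the default household data itself before imputing and writing the summary files:
>>> df_imputed = impute_hh_data_5min(None, optimized=False)  # doctest: +SKIP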
"""
logging.info("Loading HH data from parquet file.")
if df is None:
df = read_hh_data(interval="default", metadata_columns=["ProjectIdBSV"])
# Call the imputation function
logging.info("Starting the imputation.")
# df = apply_rolling_iqr_imputation(
# df=df,
# time_col="ReadingDate",
# variable_names=cum_cols,
# group_vars=["HuisIdBSV"],
# iqr_factor=1.5,
# window_weeks=4,
# min_valid_ratio=.4
# )
(
df,
imputation_summary_house,
imputation_summary_project,
imputation_reading_date_stats_df,
) = process_and_impute(
df=df,
project_id_column="ProjectIdBSV",
cumulative_columns=cum_cols,
sorted=sorted,
diffs_calculated=diffs_calculated,
optimized=optimized,
)
diff_columns = [col + "Diff" for col in cum_cols]
logging.info("Averaging all diffs by project and reading date.")
aggregated_diff = (
df.groupby(["ProjectIdBSV", "ReadingDate"])[diff_columns].mean().reset_index()
)
logging.info("Saving results")
# Save the results
modified_household_dfs = []
for _huis_code, household_df in df.groupby("HuisIdBSV"):
for col in cum_cols:
household_df[col + "Original"] = household_df[col]  # keep the original cumulative values
household_df[col] = household_df[col + "Diff"].cumsum()
household_df[col + "Check"] = (
household_df[col] - household_df[col + "Original"]
).diff()
modified_household_dfs.append(household_df)
df = pd.concat(modified_household_dfs, ignore_index=True)
logging.info("Re-arranging columns.")
# df = rearrange_model_columns(household_df=df)
# df.drop(columns=diff_columns)
if optimized:
optimized_label = "_optimized"
else:
optimized_label = ""
logging.info("Saving files.")
df.to_parquet(
os.path.join(
etdtransform.options.aggregate_folder_path,
f"household_imputed{optimized_label}.parquet",
),
engine="pyarrow",
)
aggregated_diff.to_parquet(
os.path.join(
etdtransform.options.aggregate_folder_path,
f"household_aggregated_diff{optimized_label}.parquet",
),
engine="pyarrow",
)
imputation_summary_house.to_parquet(
os.path.join(
etdtransform.options.aggregate_folder_path,
f"impute_summary_household{optimized_label}.parquet",
),
engine="pyarrow",
)
imputation_summary_project.to_parquet(
os.path.join(
etdtransform.options.aggregate_folder_path,
f"impute_summary_project{optimized_label}.parquet",
),
engine="pyarrow",
)
if imputation_reading_date_stats_df is not None:
imputation_reading_date_stats_df.to_parquet(
os.path.join(
etdtransform.options.aggregate_folder_path,
f"impute_summary_reading_date{optimized_label}.parquet",
),
engine="pyarrow",
)
logging.info("Done")
return df
def add_calculated_columns_to_hh_data(df):
"""
Add calculated columns to household data and save the result.
Parameters
----------
df : pd.DataFrame, optional
The input DataFrame; if None, it is read from a file
Returns
-------
pd.DataFrame
The DataFrame with added calculated columns
Notes
-----
This function adds calculated columns to the household data and saves the result as a parquet file.
"""
logging.info("Loading imputed data from parquet file.")
if df is None:
df = read_hh_data(interval="imputed")
logging.info("Calculating: ")
df = add_calculated_columns_imputed_data(df)
logging.info("Saving calculated columns to file: household_calculated.parquet")
df.to_parquet(
os.path.join(etdtransform.options.aggregate_folder_path, "household_calculated.parquet"),
engine="pyarrow",
)
return df
def read_aggregate(name, interval):
"""
Read an aggregate parquet file.
Parameters
----------
name : str
The name of the aggregate
interval : str
The time interval of the aggregate
Returns
-------
pd.DataFrame
The aggregate data
Notes
-----
This function reads a parquet file based on the provided name and interval.
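Examples
--------
A minimal sketch; the name is lower-cased and non-word characters are replaced by underscores before the file is looked up:
>>> project_hourly = read_aggregate("project", "60min")  # doctest: +SKIP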
"""
safe_name = re.sub(r"\W+", "_", name.lower())
return pd.read_parquet(
os.path.join(etdtransform.options.aggregate_folder_path, f"{safe_name}_{interval}.parquet"),
)
def get_aggregate_table(name, interval):
"""
Get an aggregate table as an ibis table.
Parameters
----------
name : str
The name of the aggregate
interval : str
The time interval of the aggregate
Returns
-------
ibis.Table
The aggregate data as an ibis table
Notes
-----
This function reads a parquet file and returns it as an ibis table.
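Examples
--------
A minimal sketch returning a lazy ibis table instead of a DataFrame:
>>> tbl = get_aggregate_table("project", "60min")  # doctest: +SKIP
>>> tbl.head().execute()  # doctest: +SKIP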
"""
safe_name = re.sub(r"\W+", "_", name.lower())
parquet_path = os.path.join(
etdtransform.options.aggregate_folder_path,
f"{safe_name}_{interval}.parquet",
)
return ibis.read_parquet(parquet_path)
def resample_hh_data(df=None, intervals=("60min", "15min", "5min")):
"""
Resample household data to different time intervals.
Parameters
----------
df : pd.DataFrame, optional
The input DataFrame; if None, it is read from a file
intervals : tuple, optional
The time intervals to resample to, by default ("60min", "15min", "5min")
Notes
-----
This function resamples household data to specified time intervals and saves the results.
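Examples
--------
A minimal sketch; pass a copy because the DataFrame may be modified in place:
>>> resample_hh_data(df.copy(), intervals=("15min",))  # doctest: +SKIP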
"""
group_column = ["ProjectIdBSV", "HuisIdBSV"]
if df is None:
logging.info("Loading data with calculated columns to resample hh data")
df = read_hh_data(interval="calculated")
else:
logging.warning(
"If passing a dataframe to resample_hh_data() be sure to use a copy as it may be modified in place.",
)
for interval in intervals:
logging.info(f"-- Starting household resampling with {interval} intervals --")
if interval == "5min":
logging.info(
"-- 5min interval - applying shortcut without transformation --",
)
columns_to_copy = [
"ReadingDate",
*group_column,
*list(aggregation_variables.keys()),
]
for _var, config in aggregation_variables.items():
validator_column = config.get("validator_column")
if validator_column:
columns_to_copy.append(validator_column)
df = df[columns_to_copy]
logging.info(
f"{interval}min interval - removing variables that do not pass filters"
)
for var, config in aggregation_variables.items():
validator_column = config.get("validator_column")
if validator_column:
df.loc[df[validator_column].eq(False).fillna(False), var] = pd.NA
logging.info(
f"-- {interval}-min interval - saving file household_5min.parquet --"
)
df.to_parquet(
os.path.join(etdtransform.options.aggregate_folder_path, "household_5min.parquet"),
engine="pyarrow",
)
else:
resample_and_save(df, group_column, interval=interval, alt_name="household")
def aggregate_project_data(intervals=("5min", "15min", "60min")):
"""
Aggregate project data for different time intervals.
Parameters
----------
intervals : tuple, optional
The time intervals to aggregate, by default ("5min", "15min", "60min")
Notes
-----
This function aggregates project data for specified time intervals and saves the results.
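Examples
--------
A minimal sketch aggregating only the hourly household data:
>>> aggregate_project_data(intervals=("60min",))  # doctest: +SKIP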
"""
group_column = ["ProjectIdBSV"]
for interval in intervals:
logging.info(
f"-- Starting {group_column} aggregation with {interval} intervals --",
)
df = read_hh_data(interval=interval)
aggregate_and_save(df, group_column, interval=interval, alt_name="project")
# def aggregate_weerstation_data(index_df):
# group_column = ['Weerstation']
# intervals = ['5min', '15min', '60min']
# for interval in intervals:
# logging.info(f'-- Starting {group_column} aggregation with {interval} intervals --')
# df = read_hh_data(interval = interval, metadata_columns = ['Weerstation'])
# aggregate_and_save(df, group_column, interval=interval)
def aggregate_and_save(
df,
group_column=("ProjectIdBSV"),
interval="5min",
alt_name=None,
):
"""
Aggregate data and save the result.
Parameters
----------
df : pd.DataFrame
The input DataFrame
group_column : tuple, optional
The column(s) to group by, by default ("ProjectIdBSV")
interval : str, optional
The time interval for aggregation, by default "5min"
alt_name : str, optional
An alternative name for the output file, by default None
Notes
-----
This function aggregates data, merges with size information, and saves the result as a parquet file.
"""
df_grouped = df.groupby(["ReadingDate", *list(group_column)])
df_size = df_grouped.size().reset_index(name="n")
if alt_name is None:
alt_name = "_".join(group_column)
df = aggregate_by_columns(df, group_column=group_column, size=df_size)
df = df.merge(df_size, on=["ReadingDate", *list(group_column)], how="left")
safe_name = re.sub(r"\W+", "_", alt_name.lower())
df.to_parquet(
os.path.join(etdtransform.options.aggregate_folder_path, f"{safe_name}_{interval}.parquet"),
engine="pyarrow",
)
def aggregate_by_columns(df, group_column, size):
"""
Aggregate data by columns.
Parameters
----------
df : pd.DataFrame
The input DataFrame
group_column : list
The column(s) to group by
size : pd.DataFrame
DataFrame containing size information
Returns
-------
pd.DataFrame
The aggregated DataFrame
Notes
-----
This function aggregates data for each variable defined in aggregation_variables.
"""
first = True
combined_results = None
for var, config in aggregation_variables.items():
logging.info(f"In loop for to aggregate by column {var}")
method = config["aggregate_method"]
if (
method == "diff_cumsum"
and not first
and var + "Diff" in combined_results.columns
):
result = aggregate_diff_cumsum(
df,
var,
group_column,
size,
combined_results=combined_results,
)
else:
result = aggregate_variable(df, var, config, group_column, size)
if first:
combined_results = result
first = False
else:
combined_results = combined_results.merge(
result,
on=["ReadingDate", *group_column],
how="outer",
)
logging.info(f"Combining aggregated dataset grouped by: {group_column}")
return combined_results.reset_index()
def aggregate_variable(df_grouped, var, config, group_column, size):
"""
Aggregate a single variable.
Parameters
----------
df_grouped : pd.DataFrame
The input DataFrame (despite the name, grouping happens inside the aggregation helpers)
var : str
The variable to aggregate
config : dict
Configuration for the aggregation
group_column : list
The column(s) to group by
size : pd.DataFrame
DataFrame containing size information
Returns
-------
pd.DataFrame
The aggregated DataFrame for the variable
Notes
-----
This function aggregates a single variable based on the specified method in the config.
"""
logging.info(f"{group_column} : column {var}")
method = config["aggregate_method"]
# not including validator columns as they are not aggregated in the household data atm
# validator_column = config.get('validator_column')
columns_to_select = ["ReadingDate", *group_column, var]
if method == "diff_cumsum":
columns_to_select = [*columns_to_select, var + "Diff"]
# if validator_column:
# columns_to_copy.append(validator_column)
df_copy = df_grouped[columns_to_select]
# if validator_column:
# df_copy.loc[df_copy[validator_column] != True, var] = pd.NA
if method == "sum":
return aggregate_sum(df_copy, var, ["ReadingDate", *group_column], size)
elif method == "max":
return aggregate_max(df_copy, var, ["ReadingDate", *group_column], size)
elif method == "avg":
return aggregate_avg(df_copy, var, ["ReadingDate", *group_column], size)
elif method == "diff_cumsum":
# ReadingDate left out here to allow cumsum to proceed per project with pre-sorted rows
return aggregate_diff_cumsum(df_copy, var, group_column, size)
# would be smarter to do these variables with method diff_sum only after calculating the average Diff columns
def aggregate_diff_cumsum(df, column, group_column, size, combined_results=None):
"""
Aggregate cumulative sum of differences.
Parameters
----------
df : pd.DataFrame
The input DataFrame
column : str
The column to aggregate
group_column : list
The column(s) to group by
size : pd.DataFrame
DataFrame containing size information
combined_results : pd.DataFrame, optional
Previously combined results, by default None
Returns
-------
pd.DataFrame
The aggregated DataFrame
Notes
-----
This function calculates the cumulative sum of differences for the specified column.
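Examples
--------
A minimal sketch; df, size and combined stand in for the frames built in aggregate_and_save and aggregate_by_columns:
>>> out = aggregate_diff_cumsum(df, "Gasgebruik", ["ProjectIdBSV"], size, combined_results=combined)  # doctest: +SKIP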
"""
diff_column = column + "Diff"
logging.info(
f"Aggregate cumsum of diff column: {group_column} / {column} / {diff_column}",
)
if combined_results is None:
logging.info("Calculating Diff as not included.")
aggregated = aggregate_avg(
df,
diff_column,
["ReadingDate", *group_column],
size,
)
else:
logging.info("Diff precalculated. No need to recalculate. Making a copy.")
aggregated = combined_results[
["ReadingDate", *group_column, diff_column]
].copy()
logging.info(
f"Transform average diff to calculate cumsum: {group_column} / {column} / {column}Diff",
)
aggregated[column] = aggregated.groupby(group_column)[diff_column].transform(
pd.Series.cumsum,
)
logging.info("Add missing values")
aggregated[aggregated[diff_column].isna()][column] = pd.NA
logging.info("Drop column")
aggregated = aggregated.drop(columns=[diff_column])
logging.info("Finished")
return aggregated
def aggregate_sum(df, column, group_column, size):
"""
Aggregate sum of a column.
Parameters
----------
df : pd.DataFrame
The input DataFrame
column : str
The column to aggregate
group_column : list
The column(s) to group by
size : pd.DataFrame
DataFrame containing size information
Returns
-------
pd.DataFrame
The aggregated DataFrame
Notes
-----
This function calculates the sum of the specified column, requiring at least 60% of values to be present.
"""
logging.info(f"aggregate sum: {group_column} / {column}")
grouped = df.groupby(group_column)
aggregated = grouped[column].agg(sum, min_count=size["n"] * 0.6).reset_index()
return aggregated
def aggregate_max(df, column, group_column, size):
"""
Aggregate maximum of a column.
Parameters
----------
df : pd.DataFrame
The input DataFrame
column : str
The column to aggregate
group_column : list
The column(s) to group by
size : pd.DataFrame
DataFrame containing size information
Returns
-------
pd.DataFrame
The aggregated DataFrame
Notes
-----
This function calculates the maximum of the specified column, requiring at least 60% of values to be present.
"""
logging.info(f"aggregate sum: {group_column} / {column}")
grouped = df.groupby(group_column)
aggregated = grouped[column].agg(max, min_count=size["n"] * 0.6).reset_index()
return aggregated
def aggregate_avg(df, column, group_column, size):
"""
Aggregate average of a column.
Parameters
----------
df : pd.DataFrame
The input DataFrame
column : str
The column to aggregate
group_column : list
The column(s) to group by
size : pd.DataFrame
DataFrame containing size information
Returns
-------
pd.DataFrame
The aggregated DataFrame
Notes
-----
This function calculates the average of the specified column, requiring at least 60% of values to be present.
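For example, if size["n"] is 10 households at a given ReadingDate, the average is reported only when at least 10 * 0.6 = 6 values are present; otherwise the result is pd.NA.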
"""
logging.info(f"aggregate avg: {group_column} / {column}")
# Group by the specified column
grouped = df.groupby(group_column)
# Aggregate with sum and count
aggregated = grouped.agg(
sum_agg=(column, "sum"),
count_agg=(column, "count"),
).reset_index()
aggregated[column] = np.where(
aggregated["count_agg"] >= size["n"] * 0.6,
aggregated["sum_agg"] / aggregated["count_agg"],
pd.NA,
)
aggregated = aggregated.drop(columns=["sum_agg", "count_agg"])
return aggregated
def resample_and_save(
df,
group_column=("ProjectIdBSV", "HuisIdBSV"),
interval="5min",
alt_name=None,
):
"""
Resample data and save the result.
Parameters
----------
df : pd.DataFrame
The input DataFrame
group_column : tuple, optional
The column(s) to group by, by default ("ProjectIdBSV", "HuisIdBSV")
interval : str, optional
The time interval for resampling, by default "5min"
alt_name : str, optional
An alternative name for the output file, by default None
Notes
-----
This function resamples data and saves the result as a parquet file.
"""
if alt_name is None:
alt_name = "_".join(group_column)
df = df.set_index("ReadingDate")
df = resample_by_columns(df, group_column=group_column, interval=interval)
df.reset_index(inplace=True)
safe_name = re.sub(r"\W+", "_", alt_name.lower())
df.to_parquet(
os.path.join(etdtransform.options.aggregate_folder_path, f"{safe_name}_{interval}.parquet"),
engine="pyarrow",
)
def resample_by_columns(
df,
group_column=None,
interval="15min",
):
"""
Resample data by columns.
Parameters
----------
df : pd.DataFrame
The input DataFrame
group_column : list, optional
The column(s) to group by, by default None
interval : str, optional
The time interval for resampling, by default "15min"
Returns
-------
pd.DataFrame
The resampled DataFrame
Notes
-----
This function resamples data for each variable defined in aggregation_variables.
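The min_count thresholds correspond to the number of 5-minute source readings per bucket: 15 / 5 = 3 for "15min", 60 / 5 = 12 for "60min", 72 for "6h" and 288 for "24h".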
"""
# resampled_dfs = []
if group_column is None:
group_column = ["ProjectIdBSV", "HuisIdBSV"]
if interval == "5min":
min_count = 1
elif interval == "15min":
min_count = 3
elif interval == "60min":
min_count = 12
elif interval == "6h":
min_count = 72
elif interval == "24h":
min_count = 288
else:
raise Exception(f'Unknown interval "{interval}"')
# Generate the initial dataset with only group_column and ReadingDate
df_copy = df[group_column].copy()
combined_results = (
df_copy.groupby(group_column)
.resample(interval)
.size()
.reset_index()
.drop(columns=0)
)
for var, config in aggregation_variables.items():
logging.info(f"in loop for {var}")
result = resample_variable(df, var, config, interval, group_column, min_count)
combined_results = combined_results.merge(
result,
on=["ReadingDate", *group_column],
how="outer",
)
logging.info(f"Combining dataset: {interval} / {group_column}")
combined_results.reset_index(inplace=True)
return combined_results
def resample_variable(df, var, config, interval, group_column, min_count):
"""
Resample a single variable.
Parameters
----------
df : pd.DataFrame
The input DataFrame
var : str
The variable to resample
config : dict
Configuration for the resampling
interval : str
The time interval for resampling
group_column : list
The column(s) to group by
min_count : int
The minimum count required for resampling
Returns
-------
pd.DataFrame
The resampled DataFrame for the variable
Notes
-----
This function resamples a single variable based on the specified method in the config.
"""
logging.info(f"{group_column} / {interval}: column {var}")
method = config["resample_method"]
validator_column = config.get("validator_column")
columns_to_copy = [*group_column, var]
if validator_column:
columns_to_copy.append(validator_column)
df_copy = df[columns_to_copy].copy()
# Filter by validator column if specified
if validator_column:
df_copy.loc[df_copy[validator_column].eq(False).fillna(False), var] = pd.NA
if method == "sum":
return resample_sum(df_copy, var, interval, group_column, min_count)
elif method == "max":
return resample_max(df_copy, var, interval, group_column, min_count)
elif method == "avg":
return resample_avg(df_copy, var, interval, group_column, min_count)
def resample_max(df, column, interval, group_column, min_count):
"""
Resample maximum of a column.
Parameters
----------
df : pd.DataFrame
The input DataFrame
column : str
The column to resample
interval : str
The time interval for resampling
group_column : list
The column(s) to group by
min_count : int
The minimum count required for resampling
Returns
-------
pd.DataFrame
The resampled DataFrame
Notes
-----
This function resamples the maximum of the specified column.
"""
logging.info(f"resample max: {group_column} / {interval}: {column}")
resampled = (
df.groupby(group_column)[column]
.resample(interval)
.max(min_count=min_count)
.reset_index()
)
return resampled
def resample_sum(df, column, interval, group_column, min_count):
"""
Resample sum of a column.
Parameters
----------
df : pd.DataFrame
The input DataFrame
column : str
The column to resample
interval : str
The time interval for resampling
group_column : list
The column(s) to group by
min_count : int
The minimum count required for resampling
Returns
-------
pd.DataFrame
The resampled DataFrame
Notes
-----
This function resamples the sum of the specified column.
"""
logging.info(f"resample sum: {group_column} / {interval}: {column}")
resampled = (
df.groupby(group_column)[column]
.resample(interval)
.sum(min_count=min_count)
.reset_index()
)
# resampled = df.groupby(group_column)[column].resample(interval).apply(
# lambda x: pd.NA if x.isnull().any() else x.sum()
# ).reset_index()
# resampled = resampled.groupby('ReadingDate')[column].apply(
# lambda x: pd.NA if x.isnull().any() else x.sum()
# ).reset_index()
return resampled
def resample_avg(df, column, interval, group_column, min_count):
"""
Resample average of a column.
Parameters
----------
df : pd.DataFrame
The input DataFrame.
column : str
The column to resample.
interval : str
The time interval for resampling.
group_column : list
The column(s) to group by.
min_count : int
The minimum count required for resampling.
Returns
-------
pd.DataFrame
The resampled DataFrame.
Notes
-----
This function resamples the average of the specified column, requiring at least `min_count` values to be present.
"""
logging.info(f"resample avg: {group_column} / {interval}: {column}")
resampled = (
df.groupby(group_column)
.resample(interval)[column]
.agg(["sum", "count"])
.reset_index()
)
resampled[column] = np.where(
resampled["count"] >= min_count,
resampled["sum"] / resampled["count"],
pd.NA,
)
resampled = resampled.drop(columns=["sum", "count"])
# resampled = df.groupby(group_column)[column].resample(interval).apply(
# lambda x: pd.NA if x.isnull().any() else x.mean()
# ).reset_index()
# resampled = resampled.groupby('ReadingDate')[column].apply(
# lambda x: pd.NA if x.isnull().any() else x.mean()
# ).reset_index()
return resampled
# List of variables with their corresponding aggregation methods. Lines marked with ## need a check of the methods; consider using 'last_value' for some instantaneous variables.
aggregation_variables = {
"ElektriciteitNetgebruikHoogDiff": {
"resample_method": "sum",
"aggregate_method": "avg",
},
#'ElektriciteitNetgebruikHoog': {'resample_method': 'max', 'aggregate_method': 'diff_cumsum'},
"ElektriciteitNetgebruikLaagDiff": {
"resample_method": "sum",
"aggregate_method": "avg",
},
#'ElektriciteitNetgebruikLaag': {'resample_method': 'max', 'aggregate_method': 'diff_cumsum'},
"ElektriciteitTerugleveringHoogDiff": {
"resample_method": "sum",
"aggregate_method": "avg",
},
#'ElektriciteitTerugleveringHoog': {'resample_method': 'max', 'aggregate_method': 'diff_cumsum'},
"ElektriciteitTerugleveringLaagDiff": {
"resample_method": "sum",
"aggregate_method": "avg",
},
#'ElektriciteitTerugleveringLaag': {'resample_method': 'max', 'aggregate_method': 'diff_cumsum'},
## 'ElektriciteitVermogen': {'resample_method': 'avg', 'aggregate_method': 'avg', 'validator_column': 'validate_elektriciteit_vermogen'},
## 'Gasgebruik': {'resample_method': 'max', 'aggregate_method': 'diff_cumsum'},
"ElektriciteitsgebruikWTWDiff": {
"resample_method": "sum",
"aggregate_method": "avg",
},
#'ElektriciteitsgebruikWTW': {'resample_method': 'max', 'aggregate_method': 'diff_cumsum'},
"ElektriciteitsgebruikWarmtepompDiff": {
"resample_method": "sum",
"aggregate_method": "avg",
},
#'ElektriciteitsgebruikWarmtepomp': {'resample_method': 'max', 'aggregate_method': 'diff_cumsum'},
"ElektriciteitsgebruikBoosterDiff": {
"resample_method": "sum",
"aggregate_method": "avg",
},
#'ElektriciteitsgebruikBooster': {'resample_method': 'max', 'aggregate_method': 'diff_cumsum'},
"ElektriciteitsgebruikBoilervatDiff": {
"resample_method": "sum",
"aggregate_method": "avg",
},
#'ElektriciteitsgebruikBoilervat': {'resample_method': 'max', 'aggregate_method': 'diff_cumsum'},
"ElektriciteitsgebruikRadiatorDiff": {
"resample_method": "sum",
"aggregate_method": "avg",
},
#'ElektriciteitsgebruikRadiator': {'resample_method': 'max', 'aggregate_method': 'diff_cumsum'},
## 'TemperatuurWarmTapwater': {'resample_method': 'avg', 'aggregate_method': 'avg', 'validator_column': 'validate_temperatuur_warm_tapwater'},
## 'TemperatuurWoonkamer': {'resample_method': 'avg', 'aggregate_method': 'avg', 'validator_column': 'validate_temperatuur_woonkamer'},
## 'TemperatuurSetpointWoonkamer': {'resample_method': 'avg', 'aggregate_method': 'avg', 'validator_column': 'validate_temperatuur_setpoint_woonkamer'},
## 'WarmteproductieWarmtepomp': {'resample_method': 'max', 'aggregate_method': 'avg'},
## 'WatergebruikWarmTapwater': {'resample_method': 'max', 'aggregate_method': 'avg'},
## 'Zon-opwekMomentaan': {'resample_method': 'avg', 'aggregate_method': 'avg', 'validator_column': 'validate_zon_opwek_momentaan'},
"ZonopwekBruto": {"resample_method": "sum", "aggregate_method": "avg"},
#'Zon-opwekTotaal': {'resample_method': 'max', 'aggregate_method': 'diff_cumsum'},
## 'CO2': {'resample_method': 'avg', 'aggregate_method': 'avg', 'validator_column': 'validate_co2'},
## 'Luchtvochtigheid': {'resample_method': 'avg', 'aggregate_method': 'avg', 'validator_column': 'validate_luchtvochtigheid'},
## 'Ventilatiedebiet': {'resample_method': 'avg', 'aggregate_method': 'avg', 'validator_column': 'validate_ventilatiedebiet'},
"TerugleveringTotaalNetto": {"resample_method": "sum", "aggregate_method": "avg"},
"ElektriciteitsgebruikTotaalNetto": {
"resample_method": "sum",
"aggregate_method": "avg",
},
"Netuitwisseling": {"resample_method": "sum", "aggregate_method": "avg"},
"ElektriciteitsgebruikTotaalWarmtepomp": {
"resample_method": "sum",
"aggregate_method": "avg",
},
"ElektriciteitsgebruikTotaalGebouwgebonden": {
"resample_method": "sum",
"aggregate_method": "avg",
},
"ElektriciteitsgebruikTotaalHuishoudelijk": {
"resample_method": "sum",
"aggregate_method": "avg",
},
"Zelfgebruik": {"resample_method": "sum", "aggregate_method": "avg"},
"ElektriciteitsgebruikTotaalBruto": {
"resample_method": "sum",
"aggregate_method": "avg",
},
}
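# Illustrative only (not part of the shipped configuration): a new cumulative
# meter would typically be aggregated via its per-interval Diff column, e.g.
# aggregation_variables["WarmteproductieWarmtepompDiff"] = {
#     "resample_method": "sum",
#     "aggregate_method": "avg",
# }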