etdtransform package

Submodules

etdtransform.aggregate module

etdtransform.aggregate.read_hh_data(interval='default', metadata_columns=None)[source][source]

Read household data from a parquet file and optionally add index columns to.

Parameters:
  • interval (str, optional) – The time interval of the data to read, by default “default”

  • metadata_columns (list, optional) – Additional columns to include from the index, by default None

Returns:

The household data with optional index columns added

Return type:

pd.DataFrame

Notes

This function reads parquet files from a predefined folder path.

etdtransform.aggregate.add_index_columns(df: DataFrame, columns: list | None = None) DataFrame[source][source]

Add index columns to the given DataFrame.

Parameters:
  • df (pd.DataFrame) – The input DataFrame

  • columns (list, optional) – Additional columns to include from the index, by default None

Returns:

The DataFrame with added index columns

Return type:

pd.DataFrame

Notes

This function merges the input DataFrame with an index DataFrame based on ‘HuisIdBSV’ and ‘ProjectIdBSV’.

etdtransform.aggregate.aggregate_hh_data_5min()[source][source]

Aggregate household data into 5-minute intervals.

Notes

This function reads individual household parquet files, concatenates them, and saves the result as a single parquet file.

etdtransform.aggregate.impute_hh_data_5min(df, cum_cols=['ElektriciteitNetgebruikHoog', 'ElektriciteitNetgebruikLaag', 'ElektriciteitTerugleveringHoog', 'ElektriciteitTerugleveringLaag', 'Gasgebruik', 'ElektriciteitsgebruikWTW', 'ElektriciteitsgebruikWarmtepomp', 'ElektriciteitsgebruikBooster', 'ElektriciteitsgebruikBoilervat', 'ElektriciteitsgebruikRadiator', 'WarmteproductieWarmtepomp', 'WatergebruikWarmTapwater', 'Zon-opwekTotaal'], sorted=False, diffs_calculated=False, optimized=False)[source][source]

Impute missing values in household data and save results.

Parameters:
  • df (pd.DataFrame, optional) – The input DataFrame, if None it will be read from a file

  • cum_cols (list, optional) – List of cumulative columns to process, by default cumulative_columns

  • sorted (bool, optional) – Whether the data is already sorted, by default False

  • diffs_calculated (bool, optional) – Whether differences are already calculated, by default False

  • optimized (bool, optional) – Whether to use optimized processing, by default False

Returns:

The imputed household data

Return type:

pd.DataFrame

Notes

This function performs imputation, calculates differences, and saves various summary statistics.

etdtransform.aggregate.add_calculated_columns_to_hh_data(df)[source][source]

Add calculated columns to household data and save the result.

Parameters:

df (pd.DataFrame, optional) – The input DataFrame, if None it will be read from a file

Returns:

The DataFrame with added calculated columns

Return type:

pd.DataFrame

Notes

This function adds calculated columns to the household data and saves the result as a parquet file.

etdtransform.aggregate.read_aggregate(name, interval)[source][source]

Read an aggregate parquet file.

Parameters:
  • name (str) – The name of the aggregate

  • interval (str) – The time interval of the aggregate

Returns:

The aggregate data

Return type:

pd.DataFrame

Notes

This function reads a parquet file based on the provided name and interval.

etdtransform.aggregate.get_aggregate_table(name, interval)[source][source]

Get an aggregate table as an ibis table.

Parameters:
  • name (str) – The name of the aggregate

  • interval (str) – The time interval of the aggregate

Returns:

The aggregate data as an ibis table

Return type:

ibis.Table

Notes

This function reads a parquet file and returns it as an ibis table.

etdtransform.aggregate.resample_hh_data(df=None, intervals=('60min', '15min', '5min'))[source][source]

Resample household data to different time intervals.

Parameters:
  • df (pd.DataFrame, optional) – The input DataFrame, if None it will be read from a file

  • intervals (tuple, optional) – The time intervals to resample to, by default (“60min”, “15min”, “5min”)

Notes

This function resamples household data to specified time intervals and saves the results.

etdtransform.aggregate.aggregate_project_data(intervals=('5min', '15min', '60min'))[source][source]

Aggregate project data for different time intervals.

Parameters:

intervals (tuple, optional) – The time intervals to aggregate, by default (“5min”, “15min”, “60min”)

Notes

This function aggregates project data for specified time intervals and saves the results.

etdtransform.aggregate.aggregate_and_save(df, group_column='ProjectIdBSV', interval='5min', alt_name=None)[source][source]

Aggregate data and save the result.

Parameters:
  • df (pd.DataFrame) – The input DataFrame

  • group_column (tuple, optional) – The column(s) to group by, by default (“ProjectIdBSV”)

  • interval (str, optional) – The time interval for aggregation, by default “5min”

  • alt_name (str, optional) – An alternative name for the output file, by default None

Notes

This function aggregates data, merges with size information, and saves the result as a parquet file.

etdtransform.aggregate.aggregate_by_columns(df, group_column, size)[source][source]

Aggregate data by columns.

Parameters:
  • df (pd.DataFrame) – The input DataFrame

  • group_column (list) – The column(s) to group by

  • size (pd.DataFrame) – DataFrame containing size information

Returns:

The aggregated DataFrame

Return type:

pd.DataFrame

Notes

This function aggregates data for each variable defined in aggregation_variables.

etdtransform.aggregate.aggregate_variable(df_grouped, var, config, group_column, size)[source][source]

Aggregate a single variable.

Parameters:
  • df_grouped (pd.DataFrame) – The grouped DataFrame

  • var (str) – The variable to aggregate

  • config (dict) – Configuration for the aggregation

  • group_column (list) – The column(s) to group by

  • size (pd.DataFrame) – DataFrame containing size information

Returns:

The aggregated DataFrame for the variable

Return type:

pd.DataFrame

Notes

This function aggregates a single variable based on the specified method in the config.

etdtransform.aggregate.aggregate_diff_cumsum(df, column, group_column, size, combined_results=None)[source][source]

Aggregate cumulative sum of differences.

Parameters:
  • df (pd.DataFrame) – The input DataFrame

  • column (str) – The column to aggregate

  • group_column (list) – The column(s) to group by

  • size (pd.DataFrame) – DataFrame containing size information

  • combined_results (pd.DataFrame, optional) – Previously combined results, by default None

Returns:

The aggregated DataFrame

Return type:

pd.DataFrame

Notes

This function calculates the cumulative sum of differences for the specified column.

etdtransform.aggregate.aggregate_sum(df, column, group_column, size)[source][source]

Aggregate sum of a column.

Parameters:
  • df (pd.DataFrame) – The input DataFrame

  • column (str) – The column to aggregate

  • group_column (list) – The column(s) to group by

  • size (pd.DataFrame) – DataFrame containing size information

Returns:

The aggregated DataFrame

Return type:

pd.DataFrame

Notes

This function calculates the sum of the specified column, requiring at least 60% of values to be present.

etdtransform.aggregate.aggregate_max(df, column, group_column, size)[source][source]

Aggregate maximum of a column.

Parameters:
  • df (pd.DataFrame) – The input DataFrame

  • column (str) – The column to aggregate

  • group_column (list) – The column(s) to group by

  • size (pd.DataFrame) – DataFrame containing size information

Returns:

The aggregated DataFrame

Return type:

pd.DataFrame

Notes

This function calculates the maximum of the specified column, requiring at least 60% of values to be present.

etdtransform.aggregate.aggregate_avg(df, column, group_column, size)[source][source]

Aggregate average of a column.

Parameters:
  • df (pd.DataFrame) – The input DataFrame

  • column (str) – The column to aggregate

  • group_column (list) – The column(s) to group by

  • size (pd.DataFrame) – DataFrame containing size information

Returns:

The aggregated DataFrame

Return type:

pd.DataFrame

Notes

This function calculates the average of the specified column, requiring at least 60% of values to be present.

etdtransform.aggregate.resample_and_save(df, group_column=('ProjectIdBSV', 'HuisIdBSV'), interval='5min', alt_name=None)[source][source]

Resample data and save the result.

Parameters:
  • df (pd.DataFrame) – The input DataFrame

  • group_column (tuple, optional) – The column(s) to group by, by default (“ProjectIdBSV”, “HuisIdBSV”)

  • interval (str, optional) – The time interval for resampling, by default “5min”

  • alt_name (str, optional) – An alternative name for the output file, by default None

Notes

This function resamples data and saves the result as a parquet file.

etdtransform.aggregate.resample_by_columns(df, group_column=None, interval='15min')[source][source]

Resample data by columns.

Parameters:
  • df (pd.DataFrame) – The input DataFrame

  • group_column (list, optional) – The column(s) to group by, by default None

  • interval (str, optional) – The time interval for resampling, by default “15min”

Returns:

The resampled DataFrame

Return type:

pd.DataFrame

Notes

This function resamples data for each variable defined in aggregation_variables.

etdtransform.aggregate.resample_variable(df, var, config, interval, group_column, min_count)[source][source]

Resample a single variable.

Parameters:
  • df (pd.DataFrame) – The input DataFrame

  • var (str) – The variable to resample

  • config (dict) – Configuration for the resampling

  • interval (str) – The time interval for resampling

  • group_column (list) – The column(s) to group by

  • min_count (int) – The minimum count required for resampling

Returns:

The resampled DataFrame for the variable

Return type:

pd.DataFrame

Notes

This function resamples a single variable based on the specified method in the config.

etdtransform.aggregate.resample_max(df, column, interval, group_column, min_count)[source][source]

Resample maximum of a column.

Parameters:
  • df (pd.DataFrame) – The input DataFrame

  • column (str) – The column to resample

  • interval (str) – The time interval for resampling

  • group_column (list) – The column(s) to group by

  • min_count (int) – The minimum count required for resampling

Returns:

The resampled DataFrame

Return type:

pd.DataFrame

Notes

This function resamples the maximum of the specified column.

etdtransform.aggregate.resample_sum(df, column, interval, group_column, min_count)[source][source]

Resample sum of a column.

Parameters:
  • df (pd.DataFrame) – The input DataFrame

  • column (str) – The column to resample

  • interval (str) – The time interval for resampling

  • group_column (list) – The column(s) to group by

  • min_count (int) – The minimum count required for resampling

Returns:

The resampled DataFrame

Return type:

pd.DataFrame

Notes

This function resamples the sum of the specified column.

etdtransform.aggregate.resample_avg(df, column, interval, group_column, min_count)[source][source]

Resample average of a column.

Parameters:
  • df (pd.DataFrame) – The input DataFrame.

  • column (str) – The column to resample.

  • interval (str) – The time interval for resampling.

  • group_column (list) – The column(s) to group by.

  • min_count (int) – The minimum count required for resampling.

Returns:

The resampled DataFrame.

Return type:

pd.DataFrame

Notes

This function resamples the average of the specified column, requiring at least min_count values to be present.

etdtransform.calculated_columns module

etdtransform.calculated_columns.add_calculated_columns_imputed_data(df, fillna=True)[source][source]

Add calculated columns to the input DataFrame based on existing data.

Parameters:
  • df (pd.DataFrame) – The input DataFrame containing energy usage and production data.

  • fillna (bool, optional) – Whether to fill missing values with 0 before performing calculations. Default is True.

Returns:

The modified DataFrame with additional calculated columns.

Return type:

pd.DataFrame

Notes

  • This function assumes that the input DataFrame contains the necessary columns such as ‘ElektriciteitTerugleveringLaagDiff’, ‘ElektriciteitTerugleveringHoogDiff’, ‘ElektriciteitNetgebruikLaagDiff’, ‘ElektriciteitNetgebruikHoogDiff’, ‘ElektriciteitsgebruikWarmtepompDiff’, ‘ElektriciteitsgebruikBoosterDiff’, ‘ElektriciteitsgebruikBoilervatDiff’, ‘ElektriciteitsgebruikWTWDiff’, ‘ElektriciteitsgebruikRadiatorDiff’, and ‘Zon-opwekTotaalDiff’.

  • The function assumes that missing values can be treated a 0s, typically after data cleaning and imputation.

  • The function fills missing values in each column with 0 before performing calculations to ensure that the operations do not fail due to missing data.

etdtransform.calculated_columns.add_rolling_avg(group, var='ElektriciteitsgebruikTotaalNetto', days=14, avg_var='RollingAverage')[source][source]

Add a rolling average column to each group in the DataFrame.

Parameters:
  • group (pd.DataFrame) – The DataFrame group on which to perform the operation. This should be a subset of a larger DataFrame that has been grouped by some key, for example, df.groupby(‘some_column’).apply(add_rolling_avg).

  • var (str, optional) – The name of the column in ‘group’ for which the rolling average will be calculated. Default is ‘ElektriciteitsgebruikTotaalNetto’.

  • days (int, optional) – The number of days over which to calculate the rolling average. Default is 14 days.

  • avg_var (str, optional) – The name of the new column in ‘group’ that will store the calculated rolling average values. Default is ‘RollingAverage’.

Returns:

A DataFrame with an additional column containing the rolling averages.

Return type:

pd.DataFrame

Notes

  • This function assumes that the ‘ReadingDate’ column exists in the input DataFrame and is sorted in ascending order. The ‘ReadingDate’ column should be of datetime type.

  • The function calculates a rolling average using a window size determined by the number of days specified and the frequency of the data points in the group. It uses a forward-looking window, meaning that for each date, it computes the average of the next days worth of data points.

  • To handle cases where there are missing dates or irregular sampling intervals, the function first calculates the time difference between consecutive readings to determine how many timesteps correspond to the specified number of days. It then uses this calculated window size for the rolling average computation.

  • The min_periods parameter in the rolling method is set to half of the window size, ensuring that partial windows at the beginning and end of the group are still computed if they have sufficient data points.

etdtransform.calculated_columns.get_highest_avg_period(group, avg_var='RollingAverage', days=14)[source][source]

Retrieve the start time, end time, and highest rolling average for each group in the DataFrame.

Parameters:
  • group (pd.DataFrame) – The DataFrame group on which to perform the operation. This should be a subset of a larger DataFrame that has been grouped by some key, for example, df.groupby(‘some_column’).apply(get_highest_avg_period).

  • avg_var (str, optional) – The name of the column in ‘group’ containing the rolling averages. Default is ‘RollingAverage’.

  • days (int, optional) – The number of days over which the rolling average was calculated. Default is 14 days.

Returns:

A DataFrame with columns for each group variable (if applicable), start time, end time, and highest rolling average.

Return type:

pd.DataFrame

Notes

  • This function assumes that the ‘ReadingDate’ column exists in the input DataFrame and is sorted in ascending order. The ‘ReadingDate’ column should be of datetime type.

  • The function identifies rows with the highest value in the avg_var column and calculates the corresponding start and end times based on the number of days specified.

  • To handle cases where there are missing dates or irregular sampling intervals, the function first calculates the time difference between consecutive readings to determine how many timesteps correspond to the specified number of days. It then uses this calculated window size for determining the start and end times.

  • If the calculated start index is out of bounds (greater than or equal to the length of the group), it sets the start index to the last valid index in the group.

etdtransform.calculated_columns.gelijktijdigheid(df, df_5min, rolling_average='RollingAverage', group_var=None)[source][source]

Calculate the ratio of the highest rolling average of the given rolling average column between daily and 5-minute interval data.

Parameters:
  • df (pd.DataFrame) – The input daily DataFrame containing the data.

  • df_5min (pd.DataFrame) – The input 5-minute interval DataFrame containing the data.

  • rolling_average (str, optional) – The column name of the rolling average. Default is “RollingAverage”.

  • group_var (str, optional) – The column name to group by. Default is None.

Returns:

A DataFrame with the group variable (if applicable) and the ratio of the highest rolling average value.

Return type:

pd.DataFrame

etdtransform.calculated_columns.get_lowest_avg_period(group, avg_var='RollingAvg_Temperatuur', days=14)[source][source]

Retrieve the start time, end time, and lowest rolling average for each group in the DataFrame.

Parameters:
  • group (pd.DataFrame) – The DataFrame group on which to perform the operation. This should be a subset of a larger DataFrame that has been grouped by some key, for example, df.groupby(‘some_column’).apply(get_lowest_avg_period).

  • avg_var (str, optional) – The name of the column in ‘group’ containing the rolling averages. Default is ‘RollingAvg_Temperatuur’.

  • days (int, optional) – The number of days over which the rolling average was calculated. Default is 14 days.

Returns:

A DataFrame with columns for each group variable (if applicable), start time, end time, and lowest rolling average.

Return type:

pd.DataFrame

Notes

  • This function assumes that the ‘ReadingDate’ column exists in the input DataFrame and is sorted in ascending order. The ‘ReadingDate’ column should be of datetime type.

  • The function identifies rows with the lowest value in the avg_var column and calculates the corresponding start and end times based on the number of days specified.

  • To handle cases where there are missing dates or irregular sampling intervals, the function first calculates the time difference between consecutive readings to determine how many timesteps correspond to the specified number of days. It then uses this calculated window size for determining the start and end times.

  • If the calculated start index is out of bounds (greater than or equal to the length of the group), it sets the start index to the last valid index in the group.

etdtransform.calculated_columns.mark_coldest_two_weeks(group, avg_var='TemperatuurRA', days=14)[source][source]

Marks the coldest two-week period for each group in the DataFrame.

Parameters:
  • group (pd.DataFrame) – The DataFrame group.

  • avg_var (str) – The variable containing the rolling averages.

  • days (int) – The number of days over which the rolling average was calculated.

Returns:

A boolean Series indicating whether each row is within the coldest two-week period.

Return type:

pd.Series

etdtransform.calculated_columns.mark_highest_peak(group, var='ElektriciteitsgebruikTotaalNetto', days=6)[source][source]

Marks the one-week period for each group in the DataFrame around the highest peak.

Parameters:
  • group (pd.DataFrame) – The DataFrame group.

  • var (str) – The variable containing the peak energy use.

  • days (int) – The number of days to include.

Returns:

A boolean Series indicating whether each row is within the one-week period around the highest peak.

Return type:

pd.Series

etdtransform.calculated_columns.switch_multiplier(interval_choice)[source][source]

Returns the multiplier for the switches in the calculation of the calculated columns.

Parameters:

interval_choice (str) – The interval over which the data is aggregated.

Returns:

The multiplier to use in unit conversions.

Return type:

int

etdtransform.calculated_columns.add_normalized_datetime(x, reference_date=Timestamp('2023-01-02 00:00:00'), datetime_column='ReadingDate')[source][source]

Adds a normalized datetime column to the DataFrame or Ibis Table. Used to do analyses that depend on the time of day rather than date.

Parameters:
  • x (pd.DataFrame or ibis.expr.types.TableExpr) – The DataFrame or Table.

  • reference_date (datetime.datetime, optional) – The date used as reference for the normalization. Default is ‘2023-01-02’.

  • datetime_column (str, optional) – The name of the column containing the datetime. Default is ‘ReadingDate’.

Returns:

The DataFrame or Table with a new column ‘normalized_datetime’.

Return type:

pd.DataFrame or ibis.expr.types.TableExpr

etdtransform.impute module

etdtransform.impute.calculate_average_diff(df: DataFrame, project_id_column: str, diff_columns: list[str]) DataFrame[source][source]

Calculate average differences for specified columns grouped by project and reading date.

This function computes the average differences for the specified columns, excluding outliers based on a 95th percentile threshold. It’s used to prepare data for imputation of missing values.

Parameters:
  • df (pd.DataFrame) – The input DataFrame containing the data.

  • project_id_column (str) – The name of the column containing project IDs.

  • diff_columns (list[str]) – A list of column names for which to calculate average differences.

Returns:

A dictionary where keys are column names and values are dictionaries containing: - ‘avg_diff’: DataFrame with average differences - ‘upper_bounds’: DataFrame with upper bounds for outlier exclusion - ‘household_max_with_bounds’: DataFrame with household maximum values and bounds

Return type:

dict

Notes

This function uses a 95th percentile threshold to exclude outliers when calculating averages. The threshold is doubled to create an upper bound for inclusion in the average calculation.

Warning

  • Negative difference values will raise a ValueError.

  • Missing values in the resulting average columns will be logged as errors.

etdtransform.impute.concatenate_household_max_with_bounds(avg_diff_dict, project_id_column)[source][source]

Concatenate household maximum values and bounds for all columns.

This function combines the household maximum values and upper bounds for all columns in the avg_diff_dict into a single DataFrame.

Parameters:
  • avg_diff_dict (dict) – A dictionary containing average difference data for each column.

  • project_id_column (str) – The name of the column containing project IDs.

Returns:

A DataFrame containing concatenated household maximum values and bounds for all columns.

Return type:

pd.DataFrame

Notes

This function assumes that the ‘household_max_with_bounds’ key exists in each dictionary within avg_diff_dict and contains the columns ‘ProjectIdBSV’ (or other specified project id column), ‘HuisIdBSV’, ‘{col}_huis_max’, and ‘{col}_upper_bound’.

etdtransform.impute.concatenate_avg_diff_columns(avg_diff_dict, project_id_column)[source][source]

Concatenate average difference columns for all variables.

This function combines the average difference columns for all variables in the avg_diff_dict into a single DataFrame.

Parameters:
  • avg_diff_dict (dict) – A dictionary containing average difference data for each column.

  • project_id_column (str) – The name of the column containing project IDs.

Returns:

A DataFrame containing concatenated average difference columns for all variables.

Return type:

pd.DataFrame

Notes

This function assumes that the ‘avg_diff’ key exists in each dictionary within avg_diff_dict and contains the columns ‘ProjectIdBSV’ or specified project_id_column, ‘ReadingDate’, and ‘{col}_avg’.

etdtransform.impute.equal_sig_fig(a, b, sig_figs)[source][source]

Compare two numbers for equality up to a specified number of significant figures.

This function rounds both numbers to the specified number of significant figures and then compares them for equality using a relative tolerance that scales with the magnitude of the numbers.

Parameters:
  • a (float) – The first number to compare.

  • b (float) – The second number to compare.

  • sig_figs (int) – The number of significant figures to consider for comparison.

Returns:

True if the numbers are equal up to the specified number of significant figures, False otherwise.

Return type:

bool

Notes

This function uses the isclose function from the math module to compare the rounded numbers with a relative tolerance based on the number of significant figures.

etdtransform.impute.validate_household_column(household_df, cum_col, huis_code)[source][source]

Validate a household column for data quality and completeness.

This function checks a specific column in a household DataFrame for missing values, zero values, and lack of change. It logs warnings and information about the data quality.

Parameters:
  • household_df (pd.DataFrame) – The DataFrame containing household data.

  • cum_col (str) – The name of the cumulative column to validate.

  • huis_code (str) – The unique identifier for the household.

Returns:

True if the column passes all checks, False otherwise.

Return type:

bool

Notes

This function is currently unused in the main processing pipeline.

Warning

  • Logs a warning if more than 40% of values in the column are missing.

  • Logs information about the number of missing values, zero values, and lack of change.

etdtransform.impute.get_reading_date_imputation_stats(df, project_id_column, cumulative_columns)[source][source]

Calculate imputation statistics for each reading date and cumulative column.

This function computes various statistics related to imputation for each reading date and cumulative column, including the number of imputed values, missing values, and original values.

Parameters:
  • df (pd.DataFrame) – The DataFrame containing the data.

  • project_id_column (str) – The name of the column containing project IDs.

  • cumulative_columns (list) – A list of cumulative column names to analyze.

Returns:

A DataFrame containing imputation statistics for each reading date and column.

Return type:

pd.DataFrame

Notes

This function is currently unused but can be applied as a sense check to ensure not too many values are missing.

The resulting DataFrame includes the following columns: - project_id_column - ReadingDate - column - imputed - na - total_records - original - percent_imputed - percent_na - percent_original

etdtransform.impute.sort_for_impute(df: DataFrame, project_id_column: str)[source][source]

Sort the DataFrame to prepare for imputation.

This function sorts the input DataFrame by project ID, household ID, and reading date. Sorting is necessary to ensure correct imputation of missing values.

Parameters:
  • df (pd.DataFrame) – The input DataFrame to be sorted.

  • project_id_column (str) – The name of the column containing project IDs.

Returns:

The sorted DataFrame.

Return type:

pd.DataFrame

Notes

The sorting order is: project ID, household ID (HuisIdBSV), and reading date (ReadingDate). This order is crucial for the imputation process to work correctly.

etdtransform.impute.get_diff_columns(cumulative_columns: list)[source][source]

Generate difference column names from cumulative column names.

This function takes a list of cumulative column names and returns a list of corresponding difference column names by appending ‘Diff’ to each name.

Parameters:

cumulative_columns (list) – A list of cumulative column names.

Returns:

A list of difference column names.

Return type:

list

Notes

This function is used to create names for columns that will store the differences between consecutive cumulative values.

etdtransform.impute.prepare_diffs_for_impute(df: DataFrame, project_id_column: str, cumulative_columns: list, sorted=False)[source][source]

Prepare difference columns for imputation.

This function calculates average differences, combines them, and prepares household maximum and bound information for imputation.

Parameters:
  • df (pd.DataFrame) – The input DataFrame containing the data.

  • project_id_column (str) – The name of the column containing project IDs.

  • cumulative_columns (list) – A list of cumulative column names.

  • sorted (bool, optional) – Whether the DataFrame is already sorted. Default is False.

Returns:

A tuple containing: - diff_columns: list of difference column names - diffs: DataFrame with average differences - max_bound: DataFrame with household maximum and bound information

Return type:

tuple

Notes

This function performs the following steps: 1. Sorts the DataFrame if not already sorted. 2. Calculates average differences for each cumulative column. 3. Combines average differences and household maximum/bound information. 4. Saves the results to parquet files for later use.

The resulting files are saved in the directory specified by etdtransform.options.aggregate_folder_path.

etdtransform.impute.read_diffs()[source][source]

Read average differences from a parquet file.

This function reads the average differences data from a parquet file located in the aggregate folder specified in the etdtransform options.

Returns:

A DataFrame containing the average differences data.

Return type:

pd.DataFrame

Notes

The function assumes that the ‘avg_diffs.parquet’ file exists in the aggregate folder path specified in etdtransform.options.aggregate_folder_path.

This function is typically used to load pre-calculated average differences for use in imputation processes.

etdtransform.impute.process_and_impute(df: DataFrame, project_id_column: str, cumulative_columns: list, sorted=False, diffs_calculated=False, optimized=False)[source][source]

Process and impute missing values in the dataset.

This function performs data processing and imputation on the input DataFrame. It can either calculate differences or load pre-calculated differences, and then applies imputation methods to fill missing values.

Parameters:
  • df (pd.DataFrame) – The input DataFrame containing the data to be processed and imputed.

  • project_id_column (str) – The name of the column containing project IDs.

  • cumulative_columns (list) – A list of cumulative column names to be processed.

  • sorted (bool, optional) – Whether the DataFrame is already sorted. Default is False.

  • diffs_calculated (bool, optional) – Whether differences have already been calculated. Default is False.

  • optimized (bool, optional) – Whether to use optimized imputation methods. Default is False.

Returns:

A tuple containing: - df: The processed and imputed DataFrame - imputation_summary_house: Summary of imputation statistics per house - imputation_summary_project: Summary of imputation statistics per project - imputation_reading_date_stats_df: Statistics of imputation by reading date

Return type:

tuple

Notes

This function performs the following steps: 1. Sorts the DataFrame if not already sorted. 2. Loads or calculates differences. 3. Merges average differences into the household DataFrame. 4. Applies imputation methods (either optimized or standard). 5. Calculates and saves imputation statistics. 6. Provides warnings for high imputation percentages.

The function saves various statistics and summary files in the aggregate folder specified in etdtransform.options.aggregate_folder_path.

Warning

  • Logs warnings if any house or project has more than 40% imputed values.

  • Logs warnings if any reading date has more than 40% imputed values.

etdtransform.knmi module

etdtransform.knmi.get_project_weather_station_data()[source][source]

Load and process project weather station data.

Returns:

DataFrame containing project weather station data with uppercase station names.

Return type:

pandas.DataFrame

etdtransform.knmi.get_weather_data()[source][source]

Load and process weather data from CSV files.

Returns:

DataFrame containing combined weather data from all CSV files.

Return type:

pandas.DataFrame

etdtransform.knmi.load_knmi_weather_data(folder_path)[source][source]

Load and process KNMI weather data from text files in a specified folder.

Parameters:

folder_path (str) – Path to the folder containing KNMI weather data files.

Returns:

DataFrame containing combined and processed weather data from all files.

Return type:

pandas.DataFrame

etdtransform.load_data module

etdtransform.load_data.get_household_tables(include_weather: bool = True) dict[str, Expr][source][source]

Reads household data tables for different intervals and joins them with an index table. Optionally integrates weather data.

Parameters:

include_weather (bool, optional) – If True, includes weather data in the returned tables (default is True).

Returns:

A dictionary where keys are interval names (e.g., ‘hourly’, ‘daily’) and values are the corresponding Ibis tables.

Return type:

dict[str, ibis.Expr]

etdtransform.load_data.join_index_table(tbl: Expr, index_table: Expr | None = None, index_join_columns: List[str] = ['HuisIdBSV', 'ProjectIdBSV']) Expr[source][source]

Joins a given table with an index table on specified columns.

Parameters:
  • tbl (ibis.Expr) – The table to join.

  • index_table (Optional[ibis.Expr], optional) – The index table. If None, reads from default parquet file (default is None).

  • index_join_columns (List[str], optional) – Columns to use for the join (default is [“HuisCode”, “ProjectIdBSV”]).

Returns:

The table joined with the index table.

Return type:

ibis.Expr

etdtransform.load_data.get_weather_data_table() Expr[source][source]

Processes and transforms weather data into an Ibis table with additional calculated columns.

The transformations include: - Rolling 14-day averages of temperature and perceived temperature. - Identifying the coldest two weeks based on rolling averages. - Adding ISO week, day of week, and weekly summary calculations.

Returns:

An Ibis table containing transformed weather data with additional calculated columns.

Return type:

ibis.Expr

Notes

The weather data is grouped by station (‘STN’) and aggregated weekly.

etdtransform.load_data.get_weather_station_table() Expr[source][source]

Retrieves weather station data as an Ibis table.

Returns:

An Ibis table containing weather station data.

Return type:

ibis.Expr

etdtransform.load_data.join_weather_data(tbl: Expr, weather_station_table: Expr | None = None, weather_table: Expr | None = None) Expr[source][source]

Joins weather data with a given table.

Parameters:
  • tbl (ibis.Expr) – The table to join with weather data.

  • weather_station_table (Optional[ibis.Expr], optional) – The weather station mapping table (default is None, meaning it will be retrieved).

  • weather_table (Optional[ibis.Expr], optional) – The table containing weather data (default is None, meaning it will be retrieved).

Returns:

The input table with joined weather data.

Return type:

ibis.Expr

etdtransform.load_data.get_project_tables(include_weather=True) dict[str, Expr][source][source]

Retrieves aggregate project data tables from Parquet files, integrates weather data, and returns a dictionary of Ibis tables for each interval.

Returns:

A dictionary where keys are intervals (and additional metadata like ‘project_weather’) and values are the corresponding Ibis tables with integrated weather data.

Return type:

dict[str, ibis.Expr]

etdtransform.load_data.get_dfs()[source][source]

Reads the aggregate data from the parquet files and adds weather data for analysis. It returns a dictionary of DataFrames for each interval.

etdtransform.vectorized_impute module

class etdtransform.vectorized_impute.ImputeType(value)[source][source]

Bases: IntFlag

Enumeration of imputation types used in the vectorized imputation process.

This class defines the different types of imputation methods applied during the vectorized imputation process for handling missing or problematic data in time series.

NONE

Represents no imputation.

Type:

int

NEGATIVE_GAP_JUMP

Represents a negative gap jump. Fills with zeros (potentially a meter reset)

Type:

int

NEAR_ZERO_GAP_JUMP

Represents a gap jump near zero. Fills with zeros (no change).

Type:

int

LINEAR_FILL

Represents a linear fill for positive gaps with near-zero impute jumps based on average.

Type:

int

SCALED_FILL

Represents a scaled fill for positive gaps with positive impute jumps based on average.

Type:

int

ZERO_END_VALUE

Represents imputation when end value is zero and there is no start value. Fills with zeros.

Type:

int

POSITIVE_END_VALUE

Represents imputation when end value is positive but there is no start value. Fills with averages.

Type:

int

NO_END_VALUE

Represents imputation when there is no end value. Fills with averages.

Type:

int

THRESHOLD_ADJUSTED

Represents values adjusted due to threshold violations. This happens after imputation and could be triggered by imputed values.

Type:

int

Notes

The THRESHOLD_ADJUSTED flag can be combined with other imputation types.

etdtransform.vectorized_impute.apply_thresholds(df, lower_bound, upper_bound, diff_col, avg_col, impute_type_col, is_imputed_col)[source][source]

Apply thresholds to difference column and update imputation flags.

This function applies lower and upper bounds to a difference column in the DataFrame. Values outside these bounds are replaced with average values, and corresponding imputation flags are updated.

Parameters:
  • df (pandas.DataFrame) – The input DataFrame containing the data.

  • lower_bound (float) – The lower threshold for the difference column.

  • upper_bound (float) – The upper threshold for the difference column.

  • diff_col (str) – The name of the difference column to apply thresholds to.

  • avg_col (str) – The name of the column containing average values to use for imputation.

  • impute_type_col (str) – The name of the column indicating the imputation type.

  • is_imputed_col (str) – The name of the column indicating whether a value is imputed.

Returns:

The DataFrame with thresholds applied and imputation flags updated.

Return type:

pandas.DataFrame

Notes

This function modifies the input DataFrame in-place and also returns it. Values outside the thresholds are replaced with average values and marked as imputed.

etdtransform.vectorized_impute.drop_temp_cols(df, temp_cols=None, logLeftoverError=False)[source][source]

Drop temporary columns from the DataFrame.

This function removes specified temporary columns from the DataFrame. If no columns are specified, it drops a predefined set of temporary columns.

Parameters:
  • df (pandas.DataFrame) – The DataFrame from which to drop columns.

  • temp_cols (list, optional) – A list of column names to drop. If None, a default set of temporary columns will be used.

  • logLeftoverError (bool, optional) – If True, log an error message for any leftover columns to be removed.

Notes

This function modifies the DataFrame in-place.

The default set of temporary columns includes various intermediate calculation columns used in the imputation process.

Warning

  • If logLeftoverError is True and there are columns to be dropped, an error message will be logged, which might indicate unintended remnants in the data processing pipeline.

etdtransform.vectorized_impute.impute_and_normalize(df: DataFrame, cumulative_columns: list, project_id_column: str, max_bound: DataFrame)[source][source]

Perform vectorized imputation and normalization on cumulative columns

This function applies imputation techniques to fill missing values in cumulative columns and normalizes the data. It uses vectorized operations for improved performance.

Parameters:
  • df (pandas.DataFrame) – The input DataFrame containing the data to be imputed and normalized.

  • cumulative_columns (list) – A list of column names representing cumulative variables to be processed.

  • project_id_column (str) – The name of the column containing project identifiers.

  • max_bound (pandas.DataFrame) – A DataFrame containing maximum bounds for each variable.

Returns:

A tuple containing three elements:

  • dfpandas.DataFrame

    The imputed and normalized DataFrame.

  • imputation_gap_stats_dfpandas.DataFrame

    Statistics about the imputation process for each gap.

  • imputation_reading_date_stats_dfNone or pandas.DataFrame

    Statistics about imputation by reading date (if calculated).

Return type:

tuple

Notes

This function applies various imputation methods based on the nature of the missing data and the available information. It handles different scenarios such as gaps in data, zero jumps, and negative jumps.

The function also calculates and returns statistics about the imputation process, which can be useful for quality assessment.

Warning

  • The function may modify the input DataFrame in-place.

  • Imputation methods may introduce bias or affect the variance of the data.

  • Large amounts of imputed data may significantly affect analysis results.

etdtransform.vectorized_impute.methods_to_bitwise(methods_column)[source][source]

Convert methods to bitwise representation.

This function takes a column of methods and converts each method to a bitwise representation. Each method is represented by a bit in the resulting integer.

Parameters:

methods_column (array-like) – A column containing lists of method numbers.

Returns:

An array of integers where each integer represents the bitwise representation of the methods for that row.

Return type:

numpy.ndarray

Notes

The function assumes that method numbers start from 1 and correspond to bit positions (method 1 = bit 0, method 2 = bit 1, etc.).

This vectorized version is optimized for performance with NumPy.

etdtransform.vectorized_impute.process_gap_and_cumulative_groups(df, diff_col, cum_col)[source][source]

Process gap and cumulative value groups in the DataFrame.

This function identifies gaps in the data, creates gap groups, and establishes cumulative value groups based on the presence of NA values and transitions between households.

Parameters:
  • df (pandas.DataFrame) – The DataFrame to process.

  • diff_col (str) – The name of the difference column to analyze for gaps.

  • cum_col (str) – The name of the cumulative column to use for value grouping.

Returns:

The processed DataFrame with added columns for gap and cumulative value grouping.

Return type:

pandas.DataFrame

Notes

This function adds several temporary columns to the DataFrame: - ‘gap_start’: Identifies the start of a diff column gap or transition between households. - ‘gap_group’: Groups consecutive NA values in diff columns. - ‘cum_value_encountered’: Marks where a non-NA value is encountered in cumulative column. - ‘cumulative_value_group’: Groups gaps based on cumulative values. - ‘gap_length’: The length of each gap group.

These columns are crucial for the subsequent imputation process. It returns only for further imputation: - ‘cumulative_value_group’ - ‘gap_length’

Warning

  • This function modifies the input DataFrame in-place.

  • The added columns should be handled carefully in subsequent processing steps.

etdtransform.vectorized_impute.process_imputation_vectorized(df, diff_col, cum_col, avg_col, impute_type_col, is_imputed_col)[source][source]

Perform vectorized imputation on the DataFrame.

This function applies various imputation methods to fill missing values in the difference column based on cumulative values and average differences.

Parameters:
  • df (pandas.DataFrame) – The DataFrame to impute.

  • diff_col (str) – The name of the difference column to impute.

  • cum_col (str) – The name of the cumulative column used for imputation.

  • avg_col (str) – The name of the column containing average differences.

  • impute_type_col (str) – The name of the column to store imputation type.

  • is_imputed_col (str) – The name of the column to indicate whether a value is imputed.

Returns:

The DataFrame with imputed values and additional columns indicating imputation types and statistics.

Return type:

pandas.DataFrame

Notes

This function applies several imputation methods for diff columns: - Filling with zeros for flat or near-zero gaps - Linear filling for positive gaps with near-zero impute jumps - Scaled impute value filling for positive gaps with positive impute jumps - Handling cases with no gap jump (e.g., at the start or end of the dataset)

The function also applies thresholds to remove physically impossible outliers.

Warning

  • This function modifies the input DataFrame in-place.

  • The imputation process may introduce bias, especially in cases with large gaps or when a significant portion of the data is imputed.

  • The function assumes that the input data has been properly prepared and sorted.

Module contents