etdtransform package
Submodules
etdtransform.aggregate module
- etdtransform.aggregate.read_hh_data(interval='default', metadata_columns=None)[source]
Read household data from a parquet file and optionally add index columns.
- Parameters:
interval (str, optional) – The time interval of the data to read, by default “default”
metadata_columns (list, optional) – Additional columns to include from the index, by default None
- Returns:
The household data with optional index columns added
- Return type:
pd.DataFrame
Notes
This function reads parquet files from a predefined folder path.
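Example

A minimal sketch; “5min” is one of the interval names used elsewhere in this module and should match the interval files actually present in the data folder:

import etdtransform.aggregate as agg

hh_df = agg.read_hh_data(interval="5min")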
- etdtransform.aggregate.add_index_columns(df: DataFrame, columns: list | None = None) → DataFrame [source]
Add index columns to the given DataFrame.
- Parameters:
df (pd.DataFrame) – The input DataFrame
columns (list, optional) – Additional columns to include from the index, by default None
- Returns:
The DataFrame with added index columns
- Return type:
pd.DataFrame
Notes
This function merges the input DataFrame with an index DataFrame based on ‘HuisIdBSV’ and ‘ProjectIdBSV’.
- etdtransform.aggregate.aggregate_hh_data_5min()[source]
Aggregate household data into 5-minute intervals.
Notes
This function reads individual household parquet files, concatenates them, and saves the result as a single parquet file.
- etdtransform.aggregate.impute_hh_data_5min(df, cum_cols=['ElektriciteitNetgebruikHoog', 'ElektriciteitNetgebruikLaag', 'ElektriciteitTerugleveringHoog', 'ElektriciteitTerugleveringLaag', 'Gasgebruik', 'ElektriciteitsgebruikWTW', 'ElektriciteitsgebruikWarmtepomp', 'ElektriciteitsgebruikBooster', 'ElektriciteitsgebruikBoilervat', 'ElektriciteitsgebruikRadiator', 'WarmteproductieWarmtepomp', 'WatergebruikWarmTapwater', 'Zon-opwekTotaal'], sorted=False, diffs_calculated=False, optimized=False)[source]
Impute missing values in household data and save results.
- Parameters:
df (pd.DataFrame, optional) – The input DataFrame; if None, it is read from a file
cum_cols (list, optional) – List of cumulative columns to process, by default cumulative_columns
sorted (bool, optional) – Whether the data is already sorted, by default False
diffs_calculated (bool, optional) – Whether differences are already calculated, by default False
optimized (bool, optional) – Whether to use optimized processing, by default False
- Returns:
The imputed household data
- Return type:
pd.DataFrame
Notes
This function performs imputation, calculates differences, and saves various summary statistics.
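Example

An illustrative pipeline call using the default cumulative columns; the flag values are assumptions about the state of the data:

import etdtransform.aggregate as agg

hh_df = agg.read_hh_data(interval="default")
imputed_df = agg.impute_hh_data_5min(
    hh_df,
    sorted=False,            # data has not been pre-sorted
    diffs_calculated=False,  # differences still need to be computed
    optimized=True,          # use the vectorized imputation path
)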
- etdtransform.aggregate.add_calculated_columns_to_hh_data(df)[source]
Add calculated columns to household data and save the result.
- Parameters:
df (pd.DataFrame, optional) – The input DataFrame; if None, it is read from a file
- Returns:
The DataFrame with added calculated columns
- Return type:
pd.DataFrame
Notes
This function adds calculated columns to the household data and saves the result as a parquet file.
- etdtransform.aggregate.read_aggregate(name, interval)[source]
Read an aggregate parquet file.
- Parameters:
name (str) – The name of the aggregate
interval (str) – The time interval of the aggregate
- Returns:
The aggregate data
- Return type:
pd.DataFrame
Notes
This function reads a parquet file based on the provided name and interval.
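Example

The name and interval values below are illustrative:

import etdtransform.aggregate as agg

project_hourly = agg.read_aggregate(name="project", interval="60min")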
- etdtransform.aggregate.get_aggregate_table(name, interval)[source]
Get an aggregate table as an ibis table.
- Parameters:
name (str) – The name of the aggregate
interval (str) – The time interval of the aggregate
- Returns:
The aggregate data as an ibis table
- Return type:
ibis.Table
Notes
This function reads a parquet file and returns it as an ibis table.
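Example

Ibis allows filtering the aggregate lazily before materializing it; the argument and column values below are illustrative:

import etdtransform.aggregate as agg

tbl = agg.get_aggregate_table(name="project", interval="5min")
df = tbl.filter(tbl.ProjectIdBSV == 1).to_pandas()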
- etdtransform.aggregate.resample_hh_data(df=None, intervals=('60min', '15min', '5min'))[source]
Resample household data to different time intervals.
- Parameters:
df (pd.DataFrame, optional) – The input DataFrame; if None, it is read from a file
intervals (tuple, optional) – The time intervals to resample to, by default (“60min”, “15min”, “5min”)
Notes
This function resamples household data to specified time intervals and saves the results.
- etdtransform.aggregate.aggregate_project_data(intervals=('5min', '15min', '60min'))[source]
Aggregate project data for different time intervals.
- Parameters:
intervals (tuple, optional) – The time intervals to aggregate, by default (“5min”, “15min”, “60min”)
Notes
This function aggregates project data for specified time intervals and saves the results.
- etdtransform.aggregate.aggregate_and_save(df, group_column='ProjectIdBSV', interval='5min', alt_name=None)[source]
Aggregate data and save the result.
- Parameters:
df (pd.DataFrame) – The input DataFrame
group_column (str or list, optional) – The column(s) to group by, by default “ProjectIdBSV”
interval (str, optional) – The time interval for aggregation, by default “5min”
alt_name (str, optional) – An alternative name for the output file, by default None
Notes
This function aggregates data, merges with size information, and saves the result as a parquet file.
- etdtransform.aggregate.aggregate_by_columns(df, group_column, size)[source]
Aggregate data by columns.
- Parameters:
df (pd.DataFrame) – The input DataFrame
group_column (list) – The column(s) to group by
size (pd.DataFrame) – DataFrame containing size information
- Returns:
The aggregated DataFrame
- Return type:
pd.DataFrame
Notes
This function aggregates data for each variable defined in aggregation_variables.
- etdtransform.aggregate.aggregate_variable(df_grouped, var, config, group_column, size)[source]
Aggregate a single variable.
- Parameters:
df_grouped (pd.DataFrame) – The grouped DataFrame
var (str) – The variable to aggregate
config (dict) – Configuration for the aggregation
group_column (list) – The column(s) to group by
size (pd.DataFrame) – DataFrame containing size information
- Returns:
The aggregated DataFrame for the variable
- Return type:
pd.DataFrame
Notes
This function aggregates a single variable based on the specified method in the config.
- etdtransform.aggregate.aggregate_diff_cumsum(df, column, group_column, size, combined_results=None)[source]
Aggregate cumulative sum of differences.
- Parameters:
df (pd.DataFrame) – The input DataFrame
column (str) – The column to aggregate
group_column (list) – The column(s) to group by
size (pd.DataFrame) – DataFrame containing size information
combined_results (pd.DataFrame, optional) – Previously combined results, by default None
- Returns:
The aggregated DataFrame
- Return type:
pd.DataFrame
Notes
This function calculates the cumulative sum of differences for the specified column.
- etdtransform.aggregate.aggregate_sum(df, column, group_column, size)[source]
Aggregate sum of a column.
- Parameters:
df (pd.DataFrame) – The input DataFrame
column (str) – The column to aggregate
group_column (list) – The column(s) to group by
size (pd.DataFrame) – DataFrame containing size information
- Returns:
The aggregated DataFrame
- Return type:
pd.DataFrame
Notes
This function calculates the sum of the specified column, requiring at least 60% of values to be present.
- etdtransform.aggregate.aggregate_max(df, column, group_column, size)[source]
Aggregate maximum of a column.
- Parameters:
df (pd.DataFrame) – The input DataFrame
column (str) – The column to aggregate
group_column (list) – The column(s) to group by
size (pd.DataFrame) – DataFrame containing size information
- Returns:
The aggregated DataFrame
- Return type:
pd.DataFrame
Notes
This function calculates the maximum of the specified column, requiring at least 60% of values to be present.
- etdtransform.aggregate.aggregate_avg(df, column, group_column, size)[source]
Aggregate average of a column.
- Parameters:
df (pd.DataFrame) – The input DataFrame
column (str) – The column to aggregate
group_column (list) – The column(s) to group by
size (pd.DataFrame) – DataFrame containing size information
- Returns:
The aggregated DataFrame
- Return type:
pd.DataFrame
Notes
This function calculates the average of the specified column, requiring at least 60% of values to be present.
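The 60% completeness rule shared by aggregate_sum, aggregate_max, and aggregate_avg can be sketched in plain pandas as follows; the layout of the size DataFrame and the placement of the 0.6 factor are assumptions based on the notes above, not the package’s exact internals:

import pandas as pd

def aggregate_with_coverage(df, column, group_column, size, how="mean"):
    # Aggregate and count non-missing values per group.
    out = df.groupby(group_column)[column].agg(value=how, n="count").reset_index()
    # `size` is assumed to hold the expected record count per group
    # in a column named "size".
    out = out.merge(size, on=group_column)
    # Mask aggregates for groups with less than 60% of values present.
    out.loc[out["n"] < 0.6 * out["size"], "value"] = pd.NA
    return out[group_column + ["value"]]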
- etdtransform.aggregate.resample_and_save(df, group_column=('ProjectIdBSV', 'HuisIdBSV'), interval='5min', alt_name=None)[source]
Resample data and save the result.
- Parameters:
df (pd.DataFrame) – The input DataFrame
group_column (tuple, optional) – The column(s) to group by, by default (“ProjectIdBSV”, “HuisIdBSV”)
interval (str, optional) – The time interval for resampling, by default “5min”
alt_name (str, optional) – An alternative name for the output file, by default None
Notes
This function resamples data and saves the result as a parquet file.
- etdtransform.aggregate.resample_by_columns(df, group_column=None, interval='15min')[source]
Resample data by columns.
- Parameters:
df (pd.DataFrame) – The input DataFrame
group_column (list, optional) – The column(s) to group by, by default None
interval (str, optional) – The time interval for resampling, by default “15min”
- Returns:
The resampled DataFrame
- Return type:
pd.DataFrame
Notes
This function resamples data for each variable defined in aggregation_variables.
- etdtransform.aggregate.resample_variable(df, var, config, interval, group_column, min_count)[source]
Resample a single variable.
- Parameters:
df (pd.DataFrame) – The input DataFrame
var (str) – The variable to resample
config (dict) – Configuration for the resampling
interval (str) – The time interval for resampling
group_column (list) – The column(s) to group by
min_count (int) – The minimum count required for resampling
- Returns:
The resampled DataFrame for the variable
- Return type:
pd.DataFrame
Notes
This function resamples a single variable based on the specified method in the config.
- etdtransform.aggregate.resample_max(df, column, interval, group_column, min_count)[source]
Resample maximum of a column.
- Parameters:
df (pd.DataFrame) – The input DataFrame
column (str) – The column to resample
interval (str) – The time interval for resampling
group_column (list) – The column(s) to group by
min_count (int) – The minimum count required for resampling
- Returns:
The resampled DataFrame
- Return type:
pd.DataFrame
Notes
This function resamples the maximum of the specified column.
- etdtransform.aggregate.resample_sum(df, column, interval, group_column, min_count)[source]
Resample sum of a column.
- Parameters:
df (pd.DataFrame) – The input DataFrame
column (str) – The column to resample
interval (str) – The time interval for resampling
group_column (list) – The column(s) to group by
min_count (int) – The minimum count required for resampling
- Returns:
The resampled DataFrame
- Return type:
pd.DataFrame
Notes
This function resamples the sum of the specified column.
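A minimal pandas sketch of this behavior; pandas’ built-in min_count argument implements the minimum-count rule for sums (the column names are assumptions):

resampled = (
    df.set_index("ReadingDate")
      .groupby(["ProjectIdBSV", "HuisIdBSV"])["ElektriciteitsgebruikWarmtepompDiff"]
      .resample("15min")
      .sum(min_count=min_count)
      .reset_index()
)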
- etdtransform.aggregate.resample_avg(df, column, interval, group_column, min_count)[source]
Resample average of a column.
- Parameters:
df (pd.DataFrame) – The input DataFrame.
column (str) – The column to resample.
interval (str) – The time interval for resampling.
group_column (list) – The column(s) to group by.
min_count (int) – The minimum count required for resampling.
- Returns:
The resampled DataFrame.
- Return type:
pd.DataFrame
Notes
This function resamples the average of the specified column, requiring at least min_count values to be present.
etdtransform.calculated_columns module
- etdtransform.calculated_columns.add_calculated_columns_imputed_data(df, fillna=True)[source]
Add calculated columns to the input DataFrame based on existing data.
- Parameters:
df (pd.DataFrame) – The input DataFrame containing energy usage and production data.
fillna (bool, optional) – Whether to fill missing values with 0 before performing calculations. Default is True.
- Returns:
The modified DataFrame with additional calculated columns.
- Return type:
pd.DataFrame
Notes
This function assumes that the input DataFrame contains the necessary columns such as ‘ElektriciteitTerugleveringLaagDiff’, ‘ElektriciteitTerugleveringHoogDiff’, ‘ElektriciteitNetgebruikLaagDiff’, ‘ElektriciteitNetgebruikHoogDiff’, ‘ElektriciteitsgebruikWarmtepompDiff’, ‘ElektriciteitsgebruikBoosterDiff’, ‘ElektriciteitsgebruikBoilervatDiff’, ‘ElektriciteitsgebruikWTWDiff’, ‘ElektriciteitsgebruikRadiatorDiff’, and ‘Zon-opwekTotaalDiff’.
The function assumes that missing values can be treated as 0s, typically after data cleaning and imputation.
The function fills missing values in each column with 0 before performing calculations to ensure that the operations do not fail due to missing data.
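For illustration only, a derived net-usage column of the kind this function adds could look like the sketch below; the actual formulas are defined inside the package and may differ:

df["ElektriciteitsgebruikTotaalNetto"] = (
    df["ElektriciteitNetgebruikHoogDiff"].fillna(0)
    + df["ElektriciteitNetgebruikLaagDiff"].fillna(0)
    - df["ElektriciteitTerugleveringHoogDiff"].fillna(0)
    - df["ElektriciteitTerugleveringLaagDiff"].fillna(0)
)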
- etdtransform.calculated_columns.add_rolling_avg(group, var='ElektriciteitsgebruikTotaalNetto', days=14, avg_var='RollingAverage')[source]
Add a rolling average column to each group in the DataFrame.
- Parameters:
group (pd.DataFrame) – The DataFrame group on which to perform the operation. This should be a subset of a larger DataFrame that has been grouped by some key, for example, df.groupby(‘some_column’).apply(add_rolling_avg).
var (str, optional) – The name of the column in ‘group’ for which the rolling average will be calculated. Default is ‘ElektriciteitsgebruikTotaalNetto’.
days (int, optional) – The number of days over which to calculate the rolling average. Default is 14 days.
avg_var (str, optional) – The name of the new column in ‘group’ that will store the calculated rolling average values. Default is ‘RollingAverage’.
- Returns:
A DataFrame with an additional column containing the rolling averages.
- Return type:
pd.DataFrame
Notes
This function assumes that the ‘ReadingDate’ column exists in the input DataFrame and is sorted in ascending order. The ‘ReadingDate’ column should be of datetime type.
The function calculates a rolling average using a window size determined by the number of days specified and the frequency of the data points in the group. It uses a forward-looking window, meaning that for each date it computes the average of the next days’ worth of data points.
To handle cases where there are missing dates or irregular sampling intervals, the function first calculates the time difference between consecutive readings to determine how many timesteps correspond to the specified number of days. It then uses this calculated window size for the rolling average computation.
The min_periods parameter in the rolling method is set to half of the window size, ensuring that partial windows at the beginning and end of the group are still computed if they have sufficient data points.
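A condensed sketch of the windowing logic described above (not the package’s exact implementation):

import pandas as pd

def add_rolling_avg_sketch(group, var="ElektriciteitsgebruikTotaalNetto",
                           days=14, avg_var="RollingAverage"):
    # Infer how many rows correspond to `days` from the sampling interval.
    step = group["ReadingDate"].diff().median()
    window = int(pd.Timedelta(days=days) / step)
    # Forward-looking window: reverse the series, roll, reverse back.
    group[avg_var] = (
        group[var].iloc[::-1]
        .rolling(window=window, min_periods=window // 2)
        .mean()
        .iloc[::-1]
    )
    return group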
- etdtransform.calculated_columns.get_highest_avg_period(group, avg_var='RollingAverage', days=14)[source]
Retrieve the start time, end time, and highest rolling average for each group in the DataFrame.
- Parameters:
group (pd.DataFrame) – The DataFrame group on which to perform the operation. This should be a subset of a larger DataFrame that has been grouped by some key, for example, df.groupby(‘some_column’).apply(get_highest_avg_period).
avg_var (str, optional) – The name of the column in ‘group’ containing the rolling averages. Default is ‘RollingAverage’.
days (int, optional) – The number of days over which the rolling average was calculated. Default is 14 days.
- Returns:
A DataFrame with columns for each group variable (if applicable), start time, end time, and highest rolling average.
- Return type:
pd.DataFrame
Notes
This function assumes that the ‘ReadingDate’ column exists in the input DataFrame and is sorted in ascending order. The ‘ReadingDate’ column should be of datetime type.
The function identifies rows with the highest value in the avg_var column and calculates the corresponding start and end times based on the number of days specified.
To handle cases where there are missing dates or irregular sampling intervals, the function first calculates the time difference between consecutive readings to determine how many timesteps correspond to the specified number of days. It then uses this calculated window size for determining the start and end times.
If the calculated start index is out of bounds (greater than or equal to the length of the group), it sets the start index to the last valid index in the group.
- etdtransform.calculated_columns.gelijktijdigheid(df, df_5min, rolling_average='RollingAverage', group_var=None)[source]
Calculate the ratio of the highest rolling average in the daily data to the highest rolling average in the 5-minute interval data, for the given rolling-average column.
- Parameters:
df (pd.DataFrame) – The input daily DataFrame containing the data.
df_5min (pd.DataFrame) – The input 5-minute interval DataFrame containing the data.
rolling_average (str, optional) – The column name of the rolling average. Default is “RollingAverage”.
group_var (str, optional) – The column name to group by. Default is None.
- Returns:
A DataFrame with the group variable (if applicable) and the ratio of the highest rolling average value.
- Return type:
pd.DataFrame
- etdtransform.calculated_columns.get_lowest_avg_period(group, avg_var='RollingAvg_Temperatuur', days=14)[source]
Retrieve the start time, end time, and lowest rolling average for each group in the DataFrame.
- Parameters:
group (pd.DataFrame) – The DataFrame group on which to perform the operation. This should be a subset of a larger DataFrame that has been grouped by some key, for example, df.groupby(‘some_column’).apply(get_lowest_avg_period).
avg_var (str, optional) – The name of the column in ‘group’ containing the rolling averages. Default is ‘RollingAvg_Temperatuur’.
days (int, optional) – The number of days over which the rolling average was calculated. Default is 14 days.
- Returns:
A DataFrame with columns for each group variable (if applicable), start time, end time, and lowest rolling average.
- Return type:
pd.DataFrame
Notes
This function assumes that the ‘ReadingDate’ column exists in the input DataFrame and is sorted in ascending order. The ‘ReadingDate’ column should be of datetime type.
The function identifies rows with the lowest value in the avg_var column and calculates the corresponding start and end times based on the number of days specified.
To handle cases where there are missing dates or irregular sampling intervals, the function first calculates the time difference between consecutive readings to determine how many timesteps correspond to the specified number of days. It then uses this calculated window size for determining the start and end times.
If the calculated start index is out of bounds (greater than or equal to the length of the group), it sets the start index to the last valid index in the group.
- etdtransform.calculated_columns.mark_coldest_two_weeks(group, avg_var='TemperatuurRA', days=14)[source]
Marks the coldest two-week period for each group in the DataFrame.
- Parameters:
group (pd.DataFrame) – The DataFrame group.
avg_var (str) – The variable containing the rolling averages.
days (int) – The number of days over which the rolling average was calculated.
- Returns:
A boolean Series indicating whether each row is within the coldest two-week period.
- Return type:
pd.Series
- etdtransform.calculated_columns.mark_highest_peak(group, var='ElektriciteitsgebruikTotaalNetto', days=6)[source]
Marks the one-week period for each group in the DataFrame around the highest peak.
- Parameters:
group (pd.DataFrame) – The DataFrame group.
var (str) – The variable containing the peak energy use.
days (int) – The number of days to include.
- Returns:
A boolean Series indicating whether each row is within the one-week period around the highest peak.
- Return type:
pd.Series
- etdtransform.calculated_columns.switch_multiplier(interval_choice)[source]
Return the interval-dependent multiplier used for unit conversions when computing the calculated columns.
- Parameters:
interval_choice (str) – The interval over which the data is aggregated.
- Returns:
The multiplier to use in unit conversions.
- Return type:
int
- etdtransform.calculated_columns.add_normalized_datetime(x, reference_date=Timestamp('2023-01-02 00:00:00'), datetime_column='ReadingDate')[source]
Adds a normalized datetime column to the DataFrame or Ibis Table. Used for analyses that depend on the time of day rather than the date.
- Parameters:
x (pd.DataFrame or ibis.expr.types.TableExpr) – The DataFrame or Table.
reference_date (datetime.datetime, optional) – The date used as reference for the normalization. Default is ‘2023-01-02’.
datetime_column (str, optional) – The name of the column containing the datetime. Default is ‘ReadingDate’.
- Returns:
The DataFrame or Table with a new column ‘normalized_datetime’.
- Return type:
pd.DataFrame or ibis.expr.types.TableExpr
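For the pandas case, the normalization amounts to something like the sketch below (assumed semantics: keep the time of day, move every reading onto the reference date):

import pandas as pd

reference_date = pd.Timestamp("2023-01-02")
df["normalized_datetime"] = reference_date + (
    df["ReadingDate"] - df["ReadingDate"].dt.normalize()
)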
etdtransform.impute module
- etdtransform.impute.calculate_average_diff(df: DataFrame, project_id_column: str, diff_columns: list[str]) → DataFrame [source]
Calculate average differences for specified columns grouped by project and reading date.
This function computes the average differences for the specified columns, excluding outliers based on a 95th percentile threshold. It’s used to prepare data for imputation of missing values.
- Parameters:
df (pd.DataFrame) – The input DataFrame containing the data.
project_id_column (str) – The name of the column containing project IDs.
diff_columns (list[str]) – A list of column names for which to calculate average differences.
- Returns:
A dictionary where keys are column names and values are dictionaries containing:
- ‘avg_diff’: DataFrame with average differences
- ‘upper_bounds’: DataFrame with upper bounds for outlier exclusion
- ‘household_max_with_bounds’: DataFrame with household maximum values and bounds
- Return type:
dict
Notes
This function uses a 95th percentile threshold to exclude outliers when calculating averages. The threshold is doubled to create an upper bound for inclusion in the average calculation.
Warning
Negative difference values will raise a ValueError.
Missing values in the resulting average columns will be logged as errors.
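A pandas sketch of the outlier rule for a single diff column; the column name and the grouping level of the percentile are assumptions and may differ from the implementation:

q95 = df.groupby(project_id_column)["GasgebruikDiff"].transform(lambda s: s.quantile(0.95))
upper_bound = 2 * q95  # threshold doubled, per the note above
mask = df["GasgebruikDiff"] <= upper_bound
avg_diff = (
    df[mask]
    .groupby([project_id_column, "ReadingDate"])["GasgebruikDiff"]
    .mean()
    .reset_index()
)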
- etdtransform.impute.concatenate_household_max_with_bounds(avg_diff_dict, project_id_column)[source]
Concatenate household maximum values and bounds for all columns.
This function combines the household maximum values and upper bounds for all columns in the avg_diff_dict into a single DataFrame.
- Parameters:
avg_diff_dict (dict) – A dictionary containing average difference data for each column.
project_id_column (str) – The name of the column containing project IDs.
- Returns:
A DataFrame containing concatenated household maximum values and bounds for all columns.
- Return type:
pd.DataFrame
Notes
This function assumes that the ‘household_max_with_bounds’ key exists in each dictionary within avg_diff_dict and contains the columns ‘ProjectIdBSV’ (or other specified project id column), ‘HuisIdBSV’, ‘{col}_huis_max’, and ‘{col}_upper_bound’.
- etdtransform.impute.concatenate_avg_diff_columns(avg_diff_dict, project_id_column)[source]
Concatenate average difference columns for all variables.
This function combines the average difference columns for all variables in the avg_diff_dict into a single DataFrame.
- Parameters:
avg_diff_dict (dict) – A dictionary containing average difference data for each column.
project_id_column (str) – The name of the column containing project IDs.
- Returns:
A DataFrame containing concatenated average difference columns for all variables.
- Return type:
pd.DataFrame
Notes
This function assumes that the ‘avg_diff’ key exists in each dictionary within avg_diff_dict and contains the columns ‘ProjectIdBSV’ or specified project_id_column, ‘ReadingDate’, and ‘{col}_avg’.
- etdtransform.impute.equal_sig_fig(a, b, sig_figs)[source]
Compare two numbers for equality up to a specified number of significant figures.
This function rounds both numbers to the specified number of significant figures and then compares them for equality using a relative tolerance that scales with the magnitude of the numbers.
- Parameters:
a (float) – The first number to compare.
b (float) – The second number to compare.
sig_figs (int) – The number of significant figures to consider for comparison.
- Returns:
True if the numbers are equal up to the specified number of significant figures, False otherwise.
- Return type:
bool
Notes
This function uses the isclose function from the math module to compare the rounded numbers with a relative tolerance based on the number of significant figures.
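A plausible implementation matching this description (a sketch, not necessarily the package’s exact code):

from math import floor, isclose, log10

def equal_sig_fig_sketch(a: float, b: float, sig_figs: int) -> bool:
    def round_sig(x: float) -> float:
        # Round x to `sig_figs` significant figures.
        return 0.0 if x == 0 else round(x, sig_figs - 1 - floor(log10(abs(x))))
    return isclose(round_sig(a), round_sig(b), rel_tol=10 ** (1 - sig_figs))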
- etdtransform.impute.validate_household_column(household_df, cum_col, huis_code)[source]
Validate a household column for data quality and completeness.
This function checks a specific column in a household DataFrame for missing values, zero values, and lack of change. It logs warnings and information about the data quality.
- Parameters:
household_df (pd.DataFrame) – The DataFrame containing household data.
cum_col (str) – The name of the cumulative column to validate.
huis_code (str) – The unique identifier for the household.
- Returns:
True if the column passes all checks, False otherwise.
- Return type:
bool
Notes
This function is currently unused in the main processing pipeline.
Warning
Logs a warning if more than 40% of values in the column are missing.
Logs information about the number of missing values, zero values, and lack of change.
- etdtransform.impute.get_reading_date_imputation_stats(df, project_id_column, cumulative_columns)[source]
Calculate imputation statistics for each reading date and cumulative column.
This function computes various statistics related to imputation for each reading date and cumulative column, including the number of imputed values, missing values, and original values.
- Parameters:
df (pd.DataFrame) – The DataFrame containing the data.
project_id_column (str) – The name of the column containing project IDs.
cumulative_columns (list) – A list of cumulative column names to analyze.
- Returns:
A DataFrame containing imputation statistics for each reading date and column.
- Return type:
pd.DataFrame
Notes
This function is currently unused but can be applied as a sense check to ensure not too many values are missing.
The resulting DataFrame includes the following columns:
- project_id_column
- ReadingDate
- column
- imputed
- na
- total_records
- original
- percent_imputed
- percent_na
- percent_original
- etdtransform.impute.sort_for_impute(df: DataFrame, project_id_column: str)[source]
Sort the DataFrame to prepare for imputation.
This function sorts the input DataFrame by project ID, household ID, and reading date. Sorting is necessary to ensure correct imputation of missing values.
- Parameters:
df (pd.DataFrame) – The input DataFrame to be sorted.
project_id_column (str) – The name of the column containing project IDs.
- Returns:
The sorted DataFrame.
- Return type:
pd.DataFrame
Notes
The sorting order is: project ID, household ID (HuisIdBSV), and reading date (ReadingDate). This order is crucial for the imputation process to work correctly.
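The documented ordering corresponds to a sort like:

df = df.sort_values(by=[project_id_column, "HuisIdBSV", "ReadingDate"])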
- etdtransform.impute.get_diff_columns(cumulative_columns: list)[source]
Generate difference column names from cumulative column names.
This function takes a list of cumulative column names and returns a list of corresponding difference column names by appending ‘Diff’ to each name.
- Parameters:
cumulative_columns (list) – A list of cumulative column names.
- Returns:
A list of difference column names.
- Return type:
list
Notes
This function is used to create names for columns that will store the differences between consecutive cumulative values.
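Example

>>> get_diff_columns(["Gasgebruik", "Zon-opwekTotaal"])
['GasgebruikDiff', 'Zon-opwekTotaalDiff']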
- etdtransform.impute.prepare_diffs_for_impute(df: DataFrame, project_id_column: str, cumulative_columns: list, sorted=False)[source]
Prepare difference columns for imputation.
This function calculates average differences, combines them, and prepares household maximum and bound information for imputation.
- Parameters:
df (pd.DataFrame) – The input DataFrame containing the data.
project_id_column (str) – The name of the column containing project IDs.
cumulative_columns (list) – A list of cumulative column names.
sorted (bool, optional) – Whether the DataFrame is already sorted. Default is False.
- Returns:
A tuple containing:
- diff_columns: list of difference column names
- diffs: DataFrame with average differences
- max_bound: DataFrame with household maximum and bound information
- Return type:
tuple
Notes
This function performs the following steps:
1. Sorts the DataFrame if not already sorted.
2. Calculates average differences for each cumulative column.
3. Combines average differences and household maximum/bound information.
4. Saves the results to parquet files for later use.
The resulting files are saved in the directory specified by etdtransform.options.aggregate_folder_path.
- etdtransform.impute.read_diffs()[source]
Read average differences from a parquet file.
This function reads the average differences data from a parquet file located in the aggregate folder specified in the etdtransform options.
- Returns:
A DataFrame containing the average differences data.
- Return type:
pd.DataFrame
Notes
The function assumes that the ‘avg_diffs.parquet’ file exists in the aggregate folder path specified in etdtransform.options.aggregate_folder_path.
This function is typically used to load pre-calculated average differences for use in imputation processes.
- etdtransform.impute.process_and_impute(df: DataFrame, project_id_column: str, cumulative_columns: list, sorted=False, diffs_calculated=False, optimized=False)[source]
Process and impute missing values in the dataset.
This function performs data processing and imputation on the input DataFrame. It can either calculate differences or load pre-calculated differences, and then applies imputation methods to fill missing values.
- Parameters:
df (pd.DataFrame) – The input DataFrame containing the data to be processed and imputed.
project_id_column (str) – The name of the column containing project IDs.
cumulative_columns (list) – A list of cumulative column names to be processed.
sorted (bool, optional) – Whether the DataFrame is already sorted. Default is False.
diffs_calculated (bool, optional) – Whether differences have already been calculated. Default is False.
optimized (bool, optional) – Whether to use optimized imputation methods. Default is False.
- Returns:
A tuple containing:
- df: The processed and imputed DataFrame
- imputation_summary_house: Summary of imputation statistics per house
- imputation_summary_project: Summary of imputation statistics per project
- imputation_reading_date_stats_df: Statistics of imputation by reading date
- Return type:
tuple
Notes
This function performs the following steps:
1. Sorts the DataFrame if not already sorted.
2. Loads or calculates differences.
3. Merges average differences into the household DataFrame.
4. Applies imputation methods (either optimized or standard).
5. Calculates and saves imputation statistics.
6. Provides warnings for high imputation percentages.
The function saves various statistics and summary files in the aggregate folder specified in etdtransform.options.aggregate_folder_path.
Warning
Logs warnings if any house or project has more than 40% imputed values.
Logs warnings if any reading date has more than 40% imputed values.
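Example

An illustrative call with a shortened cumulative column list:

df_imputed, house_summary, project_summary, date_stats = process_and_impute(
    df,
    project_id_column="ProjectIdBSV",
    cumulative_columns=["Gasgebruik", "Zon-opwekTotaal"],
    sorted=True,
    diffs_calculated=False,
    optimized=True,
)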
etdtransform.knmi module
- etdtransform.knmi.get_project_weather_station_data()[source]
Load and process project weather station data.
- Returns:
DataFrame containing project weather station data with uppercase station names.
- Return type:
pandas.DataFrame
- etdtransform.knmi.get_weather_data()[source]
Load and process weather data from CSV files.
- Returns:
DataFrame containing combined weather data from all CSV files.
- Return type:
pandas.DataFrame
- etdtransform.knmi.load_knmi_weather_data(folder_path)[source]
Load and process KNMI weather data from text files in a specified folder.
- Parameters:
folder_path (str) – Path to the folder containing KNMI weather data files.
- Returns:
DataFrame containing combined and processed weather data from all files.
- Return type:
pandas.DataFrame
etdtransform.load_data module
- etdtransform.load_data.get_household_tables(include_weather: bool = True) → dict[str, Expr] [source]
Reads household data tables for different intervals and joins them with an index table. Optionally integrates weather data.
- Parameters:
include_weather (bool, optional) – If True, includes weather data in the returned tables (default is True).
- Returns:
A dictionary where keys are interval names (e.g., ‘hourly’, ‘daily’) and values are the corresponding Ibis tables.
- Return type:
dict[str, ibis.Expr]
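Example

The “hourly” key follows the docstring’s illustration and may differ from the actual interval names:

tables = get_household_tables(include_weather=False)
hourly_df = tables["hourly"].to_pandas()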
- etdtransform.load_data.join_index_table(tbl: Expr, index_table: Expr | None = None, index_join_columns: List[str] = ['HuisIdBSV', 'ProjectIdBSV']) → Expr [source]
Joins a given table with an index table on specified columns.
- Parameters:
tbl (ibis.Expr) – The table to join.
index_table (Optional[ibis.Expr], optional) – The index table. If None, reads from default parquet file (default is None).
index_join_columns (List[str], optional) – Columns to use for the join (default is [“HuisIdBSV”, “ProjectIdBSV”]).
- Returns:
The table joined with the index table.
- Return type:
ibis.Expr
- etdtransform.load_data.get_weather_data_table() → Expr [source]
Processes and transforms weather data into an Ibis table with additional calculated columns.
The transformations include:
- Rolling 14-day averages of temperature and perceived temperature.
- Identifying the coldest two weeks based on rolling averages.
- Adding ISO week, day of week, and weekly summary calculations.
- Returns:
An Ibis table containing transformed weather data with additional calculated columns.
- Return type:
ibis.Expr
Notes
The weather data is grouped by station (‘STN’) and aggregated weekly.
- etdtransform.load_data.get_weather_station_table() → Expr [source]
Retrieves weather station data as an Ibis table.
- Returns:
An Ibis table containing weather station data.
- Return type:
ibis.Expr
- etdtransform.load_data.join_weather_data(tbl: Expr, weather_station_table: Expr | None = None, weather_table: Expr | None = None) → Expr [source]
Joins weather data with a given table.
- Parameters:
tbl (ibis.Expr) – The table to join with weather data.
weather_station_table (Optional[ibis.Expr], optional) – The weather station mapping table (default is None, meaning it will be retrieved).
weather_table (Optional[ibis.Expr], optional) – The table containing weather data (default is None, meaning it will be retrieved).
- Returns:
The input table with joined weather data.
- Return type:
ibis.Expr
- etdtransform.load_data.get_project_tables(include_weather=True) → dict[str, Expr] [source]
Retrieves aggregate project data tables from Parquet files, integrates weather data, and returns a dictionary of Ibis tables for each interval.
- Parameters:
include_weather (bool, optional) – If True, includes weather data in the returned tables (default is True).
- Returns:
A dictionary where keys are intervals (and additional metadata like ‘project_weather’) and values are the corresponding Ibis tables with integrated weather data.
- Return type:
dict[str, ibis.Expr]
etdtransform.vectorized_impute module
- class etdtransform.vectorized_impute.ImputeType(value)[source]
Bases:
IntFlag
Enumeration of imputation types used in the vectorized imputation process.
This class defines the different types of imputation methods applied during the vectorized imputation process for handling missing or problematic data in time series.
- NONE
Represents no imputation.
- Type:
int
- NEGATIVE_GAP_JUMP
Represents a negative gap jump. Fills with zeros (potentially a meter reset).
- Type:
int
- NEAR_ZERO_GAP_JUMP
Represents a gap jump near zero. Fills with zeros (no change).
- Type:
int
- LINEAR_FILL
Represents a linear fill for positive gaps with near-zero impute jumps based on average.
- Type:
int
- SCALED_FILL
Represents a scaled fill for positive gaps with positive impute jumps based on average.
- Type:
int
- ZERO_END_VALUE
Represents imputation when end value is zero and there is no start value. Fills with zeros.
- Type:
int
- POSITIVE_END_VALUE
Represents imputation when end value is positive but there is no start value. Fills with averages.
- Type:
int
- NO_END_VALUE
Represents imputation when there is no end value. Fills with averages.
- Type:
int
- THRESHOLD_ADJUSTED
Represents values adjusted due to threshold violations. This happens after imputation and could be triggered by imputed values.
- Type:
int
Notes
The THRESHOLD_ADJUSTED flag can be combined with other imputation types.
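Because ImputeType is an IntFlag, imputation types combine and test with bitwise operators:

from etdtransform.vectorized_impute import ImputeType

flags = ImputeType.SCALED_FILL | ImputeType.THRESHOLD_ADJUSTED
assert flags & ImputeType.THRESHOLD_ADJUSTED  # the fill method is still recorded too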
- etdtransform.vectorized_impute.apply_thresholds(df, lower_bound, upper_bound, diff_col, avg_col, impute_type_col, is_imputed_col)[source]
Apply thresholds to difference column and update imputation flags.
This function applies lower and upper bounds to a difference column in the DataFrame. Values outside these bounds are replaced with average values, and corresponding imputation flags are updated.
- Parameters:
df (pandas.DataFrame) – The input DataFrame containing the data.
lower_bound (float) – The lower threshold for the difference column.
upper_bound (float) – The upper threshold for the difference column.
diff_col (str) – The name of the difference column to apply thresholds to.
avg_col (str) – The name of the column containing average values to use for imputation.
impute_type_col (str) – The name of the column indicating the imputation type.
is_imputed_col (str) – The name of the column indicating whether a value is imputed.
- Returns:
The DataFrame with thresholds applied and imputation flags updated.
- Return type:
pandas.DataFrame
Notes
This function modifies the input DataFrame in-place and also returns it. Values outside the thresholds are replaced with average values and marked as imputed.
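The core of the documented behavior, sketched with the function’s own parameter names (the exact implementation may differ):

out_of_bounds = (df[diff_col] < lower_bound) | (df[diff_col] > upper_bound)
df.loc[out_of_bounds, diff_col] = df.loc[out_of_bounds, avg_col]
df.loc[out_of_bounds, is_imputed_col] = True
df.loc[out_of_bounds, impute_type_col] |= ImputeType.THRESHOLD_ADJUSTED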
- etdtransform.vectorized_impute.drop_temp_cols(df, temp_cols=None, logLeftoverError=False)[source]
Drop temporary columns from the DataFrame.
This function removes specified temporary columns from the DataFrame. If no columns are specified, it drops a predefined set of temporary columns.
- Parameters:
df (pandas.DataFrame) – The DataFrame from which to drop columns.
temp_cols (list, optional) – A list of column names to drop. If None, a default set of temporary columns will be used.
logLeftoverError (bool, optional) – If True, log an error message for any leftover columns to be removed.
Notes
This function modifies the DataFrame in-place.
The default set of temporary columns includes various intermediate calculation columns used in the imputation process.
Warning
If logLeftoverError is True and there are columns to be dropped, an error message will be logged, which might indicate unintended remnants in the data processing pipeline.
- etdtransform.vectorized_impute.impute_and_normalize(df: DataFrame, cumulative_columns: list, project_id_column: str, max_bound: DataFrame)[source]
Perform vectorized imputation and normalization on cumulative columns.
This function applies imputation techniques to fill missing values in cumulative columns and normalizes the data. It uses vectorized operations for improved performance.
- Parameters:
df (pandas.DataFrame) – The input DataFrame containing the data to be imputed and normalized.
cumulative_columns (list) – A list of column names representing cumulative variables to be processed.
project_id_column (str) – The name of the column containing project identifiers.
max_bound (pandas.DataFrame) – A DataFrame containing maximum bounds for each variable.
- Returns:
A tuple containing three elements:
- df (pandas.DataFrame) – The imputed and normalized DataFrame.
- imputation_gap_stats_df (pandas.DataFrame) – Statistics about the imputation process for each gap.
- imputation_reading_date_stats_df (None or pandas.DataFrame) – Statistics about imputation by reading date (if calculated).
- Return type:
tuple
Notes
This function applies various imputation methods based on the nature of the missing data and the available information. It handles different scenarios such as gaps in data, zero jumps, and negative jumps.
The function also calculates and returns statistics about the imputation process, which can be useful for quality assessment.
Warning
The function may modify the input DataFrame in-place.
Imputation methods may introduce bias or affect the variance of the data.
Large amounts of imputed data may significantly affect analysis results.
- etdtransform.vectorized_impute.methods_to_bitwise(methods_column)[source]
Convert methods to bitwise representation.
This function takes a column of methods and converts each method to a bitwise representation. Each method is represented by a bit in the resulting integer.
- Parameters:
methods_column (array-like) – A column containing lists of method numbers.
- Returns:
An array of integers where each integer represents the bitwise representation of the methods for that row.
- Return type:
numpy.ndarray
Notes
The function assumes that method numbers start from 1 and correspond to bit positions (method 1 = bit 0, method 2 = bit 1, etc.).
This vectorized version is optimized for performance with NumPy.
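A readable (non-vectorized) equivalent of the packing described above:

import numpy as np

def methods_to_bitwise_sketch(methods_column):
    # Method 1 -> bit 0, method 2 -> bit 1, and so on.
    return np.array(
        [sum(1 << (m - 1) for m in methods) for methods in methods_column],
        dtype=np.int64,
    )

methods_to_bitwise_sketch([[1, 3], [2]])  # -> array([5, 2])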
- etdtransform.vectorized_impute.process_gap_and_cumulative_groups(df, diff_col, cum_col)[source]
Process gap and cumulative value groups in the DataFrame.
This function identifies gaps in the data, creates gap groups, and establishes cumulative value groups based on the presence of NA values and transitions between households.
- Parameters:
df (pandas.DataFrame) – The DataFrame to process.
diff_col (str) – The name of the difference column to analyze for gaps.
cum_col (str) – The name of the cumulative column to use for value grouping.
- Returns:
The processed DataFrame with added columns for gap and cumulative value grouping.
- Return type:
pandas.DataFrame
Notes
This function adds several temporary columns to the DataFrame:
- ‘gap_start’: Identifies the start of a diff column gap or a transition between households.
- ‘gap_group’: Groups consecutive NA values in diff columns.
- ‘cum_value_encountered’: Marks where a non-NA value is encountered in the cumulative column.
- ‘cumulative_value_group’: Groups gaps based on cumulative values.
- ‘gap_length’: The length of each gap group.
These columns are crucial for the subsequent imputation process. Only the following columns are returned for further imputation:
- ‘cumulative_value_group’
- ‘gap_length’
Warning
This function modifies the input DataFrame in-place.
The added columns should be handled carefully in subsequent processing steps.
- etdtransform.vectorized_impute.process_imputation_vectorized(df, diff_col, cum_col, avg_col, impute_type_col, is_imputed_col)[source]
Perform vectorized imputation on the DataFrame.
This function applies various imputation methods to fill missing values in the difference column based on cumulative values and average differences.
- Parameters:
df (pandas.DataFrame) – The DataFrame to impute.
diff_col (str) – The name of the difference column to impute.
cum_col (str) – The name of the cumulative column used for imputation.
avg_col (str) – The name of the column containing average differences.
impute_type_col (str) – The name of the column to store imputation type.
is_imputed_col (str) – The name of the column to indicate whether a value is imputed.
- Returns:
The DataFrame with imputed values and additional columns indicating imputation types and statistics.
- Return type:
pandas.DataFrame
Notes
This function applies several imputation methods for diff columns:
- Filling with zeros for flat or near-zero gaps
- Linear filling for positive gaps with near-zero impute jumps
- Scaled impute value filling for positive gaps with positive impute jumps
- Handling cases with no gap jump (e.g., at the start or end of the dataset)
The function also applies thresholds to remove physically impossible outliers.
Warning
This function modifies the input DataFrame in-place.
The imputation process may introduce bias, especially in cases with large gaps or when a significant portion of the data is imputed.
The function assumes that the input data has been properly prepared and sorted.