Mapping raw household sensor data to the Energy Transition Dataset¶

This notebook demonstrates the code required to map raw household sensor data to the Energy Transition Dataset Data Model.

This notebook should be helpful both to potential data suppliers, such as social housing corporations, building companies and monitoring companies, and to data processors who would like to understand the impact of these installations at the neighborhood level. After reading this notebook, you should be ready to submit data.

This diagram provides an overview of the process. Most steps require some level of manual intervention. Where there is code that supports the process, the function is also provided. There are two main actors:

  • The data supplier
  • The data processor, in this example Stroomversnelling (BSV)
%%{init: {"flowchart": {"htmlLabels": false}} }%%
sequenceDiagram
    participant Supplier
    participant DataProcessorBSV

    Supplier->Supplier: Acquire raw data & prepare metadata (Use Excel Template)
    Supplier->Supplier: Validate & check required columns
    Supplier->Supplier: Perform initial data transformations (conversion)
    Supplier->DataProcessorBSV: Submit data for processing
    DataProcessorBSV->DataProcessorBSV: Validate data against ETD model `load_etdmodel()`
    DataProcessorBSV->DataProcessorBSV: Initial statistical column check `collect_column_stats()`
    DataProcessorBSV->DataProcessorBSV: Inspect data completeness (Missing Values & Intervals)
    DataProcessorBSV->DataProcessorBSV: Calculate differences `add_diff_columns()`
    DataProcessorBSV->DataProcessorBSV: Validate cumulative data `validate_cumulative_variables()`
    DataProcessorBSV->DataProcessorBSV: Final statistical column check `get_mapped_data_stats()`
    Note right of DataProcessorBSV: Send feedback if needed
    DataProcessorBSV-->Supplier: Feedback to revise data
    Supplier->Supplier: Revise data & resubmit
    Supplier->DataProcessorBSV: Submit revised data
    DataProcessorBSV->DataProcessorBSV: Update BSV metadata `update_index()`, `add_supplier_metadata_to_index()`
    DataProcessorBSV->DataProcessorBSV: Manage data index & aggregate data
    DataProcessorBSV-->Supplier: Process complete

Prepping the required environment¶

In order to prepare raw data, we will first check the Energy Transition Dataset Model. For this, and for most steps in this notebook, we will use the etdmap package, which is documented in full detail at stroomversnelling.github.io/etdmap. Other ETD packages can be found at stroomversnelling.github.io/ but are not required for this demonstration.

First we can install the requirements using pip:

pip install git+https://github.com/Stroomversnelling/etdmap.git

Configuration¶

We will need to configure some variables in order to run this example. First of all, we need to provide the folder that contains the raw data.

import etdmap

etdmap.options.mapped_folder_path = "../../demodata/mapped" # change to path to folder where you would like to store mapped data
etdmap.options.bsv_metadata_file = "../../demodata/processor_metadata.xlsx" # change to path to the Excel file with the data processor metadata (BSV in this case)
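As a quick sanity check (plain Python, not part of etdmap), we can make sure the configured paths are usable before continuing:

import os

# Create the output folder for mapped data if it does not exist yet
os.makedirs(etdmap.options.mapped_folder_path, exist_ok=True)

# The metadata Excel file must already exist
if not os.path.isfile(etdmap.options.bsv_metadata_file):
    print(f"Metadata file not found: {etdmap.options.bsv_metadata_file}")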

Prepping raw data¶

  • The data supplier is always asked to prepare an Excel sheet with the units/households that are included in the provided data.
  • The example Excel sheet can be found in the same folder as this notebook.
  • In addition, the data supplier will prepare a single data file (parquet format) per household (see the sketch after this list).
  • We will use the example of the data received from Watch-E.
  • The columns did not match the data model, so we will prepare a dictionary to rename the columns later.
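For illustration, here is a minimal sketch of how a supplier might split a combined export into one parquet file per household. The input file and its column names (raw_export.csv, HuisIdLeverancier) are hypothetical; adapt this to your own export format:

import pandas as pd

# Hypothetical combined export with one row per reading per household
raw = pd.read_csv("raw_export.csv", parse_dates=["ReadingDate"])

# Write one parquet file per household, sorted by reading date
for huis_id, df in raw.groupby("HuisIdLeverancier"):
    df = df.sort_values("ReadingDate").drop(columns=["HuisIdLeverancier"])
    df.to_parquet(f"{huis_id}.parquet", engine="pyarrow", index=False)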

Prepping the data processor metadata¶

The data processor must also create a metadata file. Initially it will have no records; see the example. The first row in the example can be deleted before starting.
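If you need to create such an empty metadata file from scratch, a minimal sketch could look like the following. The column names are inferred from the index shown later in this notebook; check the example file for the authoritative layout:

import pandas as pd

# Columns as they appear in the index later in this notebook
# (assumption: the metadata template uses the same names)
columns = [
    "HuisIdBSV", "HuisIdLeverancier", "ProjectIdBSV", "ProjectIdLeverancier",
    "Dataleverancier", "Meenemen", "Notities",
]

pd.DataFrame(columns=columns).to_excel(
    "../../demodata/processor_metadata.xlsx", index=False
)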

In [3]:
import etdmap

watch_e_raw_data_folder_path = "../../demodata/watch_e" # this is our demo dataset we will be mapping
watch_e_metadata_file = "../../demodata/watch_e_metadata.xlsx"

# Set the standard etdmap options
etdmap.options.mapped_folder_path = "../../demodata/mapped" # change to path to folder where you would like to store mapped data
etdmap.options.bsv_metadata_file = "../../demodata/bsv_metadata.xlsx" # change to path to the Excel file with the data processor metadata (BSV is the data processor in this case)

Load the data model and map to raw data columns¶

First we will compare the columns in the raw data with the columns in the data model. Based on that, we'll create a dictionary mapping the raw data columns to the columns of our model. In this case, we do not need to do any unit conversions. We can review the required model columns using the load_etdmodel() function in the data_model module.

%%{init: {"flowchart": {"htmlLabels": false}} }%%
sequenceDiagram
    participant Supplier
    participant DataProcessorBSV

    Supplier->Supplier: Acquire raw data & prepare metadata (Use Excel Template)
    Supplier->Supplier: Validate & check required columns
    Supplier->Supplier: Perform initial data transformations (conversion)
    Supplier->DataProcessorBSV: Submit data for processing
    DataProcessorBSV->DataProcessorBSV: Validate data against ETD model `load_etdmodel()`
    DataProcessorBSV->DataProcessorBSV: Initial statistical column check `collect_column_stats()`
In [2]:
from etdmap.data_model import load_etdmodel

load_etdmodel().head()
Out[2]:
| | Entiteit | Variabele | Key | Type variabele | Vereist | Resolutie | Wie vult? | Bron | Definitie | AVG gevoelig |
|---|---|---|---|---|---|---|---|---|---|---|
| 0 | Metadata | ProjectIdBSV | nee | integer | nee | vaste waarde | Stroomversnelling | Stroomversnelling | code toegekend door Stroomversnelling | nee |
| 1 | Metadata | ProjectIdLeverancier | nee | string | ja | vaste waarde | Dataleverancier | Dataleverancier | code toegekend door dataleverancier | ja |
| 2 | Metadata | HuisIdBSV | nee | integer | nee | vaste waarde | Stroomversnelling | Stroomversnelling | code toegekend door Stroomversnelling | nee |
| 3 | Metadata | HuisIdLeverancier | ja | string | ja | vaste waarde | Dataleverancier | Dataleverancier | code toegekend door dataleverancier | ja |
| 4 | Metadata | Weerstation | nee | string | ja | vaste waarde | Dataleverancier | Dataleverancier | dichtstbijzijnde KNMI weerstation | nee |
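Since the Vereist column indicates whether a variable is required ("ja"/"nee"), we can filter the model to list only the required variables (a small sketch using the columns shown above):

from etdmap.data_model import load_etdmodel

model = load_etdmodel()

# Keep only the variables marked as required ("Vereist" == "ja")
required = model.loc[model["Vereist"] == "ja", ["Entiteit", "Variabele"]]
print(required)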
In [3]:
# Define a dictionary to map the raw data columns to the mapped data

watch_e_mapping_dict = {
    "ReadingDate": "ReadingDate",
    "Elektriciteit netgebruik (hoog) [kWh]": "ElektriciteitNetgebruikHoog",
    "Elektriciteit netgebruik (laag) [kWh]": "ElektriciteitNetgebruikLaag",
    "Elektriciteit teruglevering (hoog) [kWh]": "ElektriciteitTerugleveringHoog",
    "Elektriciteit teruglevering (laag) [kWh]": "ElektriciteitTerugleveringLaag",
    "Elektriciteitsgebruik WTW [kWh]": "ElektriciteitsgebruikWTW",
    # There are separate variables for the heat pump inside and outside, which we will combine in the script:
    "Elektriciteitsgebruik warmtepomp binnen [kWh]": "ElektriciteitsgebruikWarmtepomp_binnen", 
    "Elektriciteitsgebruik warmtepomp buiten [kWh]": "ElektriciteitsgebruikWarmtepomp_buiten",
    "Elektriciteitsgebruik warmtepomp [kWh]": "ElektriciteitsgebruikWarmtepomp",
    "Elektriciteitsgebruik E.EL.Warmtepomp [kWh]": "ElektriciteitsgebruikBooster",
    "Elektriciteitsgebruik boiler [kWh]": "ElektriciteitsgebruikBoilervat",
    "Elektriciteitsgebruik Radiator [kWh]": "ElektriciteitsgebruikRadiator",
    "Temperatuur warm tapwater [°C]": "TemperatuurWarmTapwater",
    "Temperatuur woonkamer [°C]": "TemperatuurWoonkamer",
    "Warmteproductie warmtepomp [GJ]": "WarmteproductieWarmtepomp",
    "Watergebruik warm [liter]": "WatergebruikWarmTapwater",
    "Zon-opwek Momentaan [kW]": "Zon-opwekMomentaan",
    "Zon-opwek Totaal [kWh]": "Zon-opwekTotaal",
    "CDR-RH CO2 [ppm]": "CO2",
    "CDR-RH 1 CO2 [ppm]": "CO2",
    "CDR-RH Luchtvochtigheid [%]": "Luchtvochtigheid",
    "CDR-RH Temperatuur [grC]": "CDR-RH Temperatuur [grC]",
    "Gasgebruik [m3]": "Gasgebruik",
    "WarmteProductie CV Warm water [GJ]": "WarmteproductieWarmTapwater",
    "Luchtvochtigheid woonkamer [%]": "Luchtvochtigheid",
    "CDR-RH 1 Luchtvochtigheid": "Luchtvochtigheid",
    "Warmteproductie CV [GJ]": "WarmteproductieCV",
}
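Before relying on this dictionary, it can help to check a sample raw file for columns that are not covered by the mapping. This is a small ad-hoc sketch (plain pandas, not an etdmap function):

import os
import pandas as pd

# Read one raw file and list columns without an entry in the mapping dictionary
sample_file = next(f for f in os.listdir(watch_e_raw_data_folder_path) if f.endswith(".parquet"))
sample_df = pd.read_parquet(os.path.join(watch_e_raw_data_folder_path, sample_file))

unmapped = [col for col in sample_df.columns if col not in watch_e_mapping_dict]
print("Columns without a mapping:", unmapped)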

Examine the raw data variable statistics¶

We will now examine the raw variables by looping through the raw data files and collecting statistics.

In [4]:
from etdmap import mapping_helpers
import pandas as pd

df_raw_stats = mapping_helpers.get_raw_data_stats(
    raw_data_folder_path = watch_e_raw_data_folder_path, 
    multi = True, 
    max_workers = 10
)

df_raw_stats.head()
Out[4]:
Identifier column type count missing errors min max mean median iqr quantile_25 quantile_75 top5
0 024b35f544c0de4c78fbfa4b401126dc.parquet ReadingDate datetime64[ns] 105093 0 0 2023-01-01 00:00:00 2024-01-01 00:00:00 NaN NaN NaN NaN NaN None
1 024b35f544c0de4c78fbfa4b401126dc.parquet Elektriciteit netgebruik (hoog) [kWh] float64 105093 0 0 822.16 1364.201 1077.002484 1067.561 131.921 1007.248 1139.169 None
2 024b35f544c0de4c78fbfa4b401126dc.parquet Elektriciteit netgebruik (laag) [kWh] float64 105093 0 0 1157.82 1985.089 1558.287115 1564.534 299.327 1407.841 1707.168 None
3 024b35f544c0de4c78fbfa4b401126dc.parquet Elektriciteit teruglevering (hoog) [kWh] float64 105093 0 0 3397.66 5960.516 4755.762583 4830.055 2118.687 3686.697 5805.384 None
4 024b35f544c0de4c78fbfa4b401126dc.parquet Elektriciteit teruglevering (laag) [kWh] float64 105093 0 0 2268.15 3382.208 2862.596044 2949.556 935.412 2382.013 3317.425 None
In [5]:
df_raw_stats[df_raw_stats['missing']>0].head()
Out[5]:
Identifier column type count missing errors min max mean median iqr quantile_25 quantile_75 top5
5 024b35f544c0de4c78fbfa4b401126dc.parquet Elektriciteitsgebruik WTW [kWh] float64 8759 96334 96334 74.68 118.83 96.308056 97.870 24.850 83.85 108.700 None
10 024b35f544c0de4c78fbfa4b401126dc.parquet Watergebruik warm [liter] float64 8759 96334 96334 6519.0 11829.001 9149.792177 9184.001 2336.001 8004.00 10340.001 None
19 0290e5ebef786f7d3937c6ea53f9ef56.parquet Elektriciteitsgebruik boiler [kWh] float64 105107 2 2 264.99 601.1 433.210482 436.770 165.060 351.10 516.160 None
22 0290e5ebef786f7d3937c6ea53f9ef56.parquet Luchtvochtigheid woonkamer [%] float64 105005 104 104 41.3 79.2 59.553509 59.100 11.300 54.30 65.600 None
25 0290e5ebef786f7d3937c6ea53f9ef56.parquet Temperatuur warm tapwater [°C] float64 104968 141 141 19.16 64.07 31.340442 27.280 13.770 23.43 37.200 None
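To prioritize feedback to the supplier, we can summarize the share of missing values per raw column across all files (a sketch that assumes df_raw_stats from above, where count + missing equals the total number of records):

# Average share of missing values per raw column across all household files
missing_share = (
    df_raw_stats
    .assign(missing_pct=lambda d: 100 * d["missing"] / (d["count"] + d["missing"]))
    .groupby("column")["missing_pct"]
    .mean()
    .sort_values(ascending=False)
)
print(missing_share.head(10))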

Mapping the raw data to the data model¶

Here we will go through the following steps:

%%{init: {"flowchart": {"htmlLabels": false}} }%%
sequenceDiagram
    participant Supplier
    participant DataProcessorBSV

    DataProcessorBSV->DataProcessorBSV: Calculate differences `add_diff_columns()`
    DataProcessorBSV->DataProcessorBSV: Validate cumulative data `validate_cumulative_variables()`
In [6]:
import os
import logging

import watch_e_helpers

from etdmap.record_validators import record_flag_conditions

from etdmap.index_helpers import get_household_id_pairs, read_index, update_index
from etdmap.mapping_helpers import (
    rearrange_model_columns,
    add_diff_columns,
    model_column_type,
    ensure_intervals,
    fill_down_infrequent_devices,
)


# load the index of existing mapped data files
index_df, index_path = read_index()

# Generate pairs of new household ids (`HuisIdBSV`) and file names for our index,
# mapped to the list of files in our raw data folder (one file per household)
household_id_pairs = get_household_id_pairs(
    index_df, watch_e_raw_data_folder_path, "Watch-E", watch_e_helpers.list_files_watch_e
)

# Loop through the files and load the raw data
for huis_code, file_name in household_id_pairs:

    # Set the file paths for the raw data files and mapped data files to be generated
    file_path = os.path.join(watch_e_raw_data_folder_path, file_name)
    new_file_path = os.path.join(
        etdmap.options.mapped_folder_path, f"household_{huis_code}_table.parquet"
    )

    # Load the raw data
    watche_df = pd.read_parquet(file_path)

    # Rename columns using the Watch-E dictionary
    watche_df.rename(columns=watch_e_mapping_dict, inplace=True)

    # Correct a specific issue in Watch-E data during the summer time switch
    watche_df = watch_e_helpers.correct_summer_time_with_timezone_check_watch_e(
        watche_df, "ReadingDate"
    )

    # Ensure that the DataFrame has a consistent number of records and expected time intervals
    watche_df = ensure_intervals(watche_df)

    # Ensure all expected columns are present and combine the interior and exterior heatpump columns
    watche_df = watch_e_helpers.combine_inside_outside_heatpumps(watche_df, watch_e_mapping_dict, model_column_type, huis_code, file_name)

    # Sort our columns to match the data model and add empty columns when the column is 
    # not provided
    watche_df = rearrange_model_columns(
        household_df=watche_df, add_columns=True, context=f"{huis_code}/{file_name}"
    )

    # Certain devices do not report cumulative data when not operational and need to be 
    # filled down with some assumptions
    watche_df = fill_down_infrequent_devices(
        df=watche_df,
        columns=(
            'ElektriciteitsgebruikBoilervat',
            'ElektriciteitsgebruikRadiator',
            'ElektriciteitsgebruikBooster',
        )
    )

    # Add new difference columns to show the incremental change in cumulative columns in
    # each time period
    watche_df = add_diff_columns(watche_df, context=f"{huis_code}/{file_name}")

    # Add validation columns to the household to check thresholds and other issues
    for flag, condition in record_flag_conditions.items():
        try:
            watche_df[flag] = condition(watche_df)
        except Exception as e:
            logging.error(
                f"Error validating with {flag} for household {huis_code} / {file_name}: {e}",
                exc_info=True,
            )
            watche_df[flag] = pd.NA

    # Save the mapped data file for the household
    watche_df.to_parquet(new_file_path, engine="pyarrow")

    # Update the index with the new household
    index_df = update_index(
        index_df, 
        new_entry={"HuisIdLeverancier": file_name.replace(".parquet", ""), "HuisIdBSV": huis_code}, 
        data_provider="Watch-E"
    )

Various logging outputs will be provided during this mapping process, and the logs are useful to examine in order to understand which data may have issues. Examples are:

WARNING:root:8/1392bfdbbbc3870c65e405fb28641570.parquet: Group has a gap of 3 days 10:15:00 > allowed (0 days 01:00:00) starting at 2023-01-05 21:30:00 (1672954200.0) in 'ReadingDate' for column 'Zon-opwekTotaal'.
ERROR:root:9/13bdb9310675508db0e411a38d5e7824.parquet has no heat pump columns.
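To keep these logs for later review, the standard library logging configuration can write them to a file. Run this before the mapping loop; this is plain stdlib usage, not an etdmap feature:

import logging

# Send log records to a file; force=True replaces any handlers the
# notebook may have already installed (Python 3.8+)
logging.basicConfig(
    filename="watch_e_mapping.log",
    level=logging.INFO,
    format="%(levelname)s:%(name)s:%(message)s",
    force=True,
)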

Check the mapped data index and statistics¶

We can now load the mapped data and check the statistics. This operation can be sped up with multi-processing by setting the number of workers to the number of CPU cores; in this example we run it in a single process (multi = False).

In [18]:
index_df, index_path = etdmap.index_helpers.read_index()

index_df.head()
Out[18]:
HuisIdLeverancier HuisIdBSV ProjectIdLeverancier ProjectIdBSV Dataleverancier Meenemen Notities validate_monitoring_data_counts validate_energiegebruik_warmteopwekker validate_approximately_one_year_of_records ... validate_ElektriciteitsgebruikBoilervatDiff validate_ElektriciteitsgebruikRadiator validate_ElektriciteitsgebruikRadiatorDiff validate_WarmteproductieWarmtepomp validate_WarmteproductieWarmtepompDiff validate_WatergebruikWarmTapwater validate_WatergebruikWarmTapwaterDiff validate_Zon-opwekTotaal validate_Zon-opwekTotaalDiff validate_cumulative_diff_ok
0 024b35f544c0de4c78fbfa4b401126dc 1 <NA> <NA> Watch-E <NA> <NA> True True True ... True <NA> True True True True True <NA> True True
1 0290e5ebef786f7d3937c6ea53f9ef56 2 <NA> <NA> Watch-E <NA> <NA> True True True ... True <NA> True True True True True <NA> True True
2 068690b7864a3d0feff39bea2e5556df 3 <NA> <NA> Watch-E <NA> <NA> True True True ... True <NA> True True True True True <NA> True True
3 08552726673ae0cb8ec73be7e4a75f9d 4 <NA> <NA> Watch-E <NA> <NA> True True True ... True <NA> True True True True True <NA> True True
4 0bf7095bf31ef4edc72e1e8353e68dbf 5 <NA> <NA> Watch-E <NA> <NA> True True True ... True <NA> True True False True True <NA> True False

5 rows × 39 columns

We can see that a few columns are not yet defined, such as the project ids and whether we want to include the data in our aggregate datasets (column Meenemen). These metadata will be added manually in a later step.
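The validation flags in the index can also be summarized to see which checks fail most often (a sketch assuming index_df from above):

# Count, per validation flag, how many households explicitly fail it
validation_cols = [c for c in index_df.columns if c.startswith("validate_")]
failed_counts = index_df[validation_cols].eq(False).sum().sort_values(ascending=False)
print(failed_counts.head())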

In [ ]:
mapped_data_summary_df = mapping_helpers.get_mapped_data_stats(multi = False)
In [13]:
mapped_data_summary_df.head()
Out[13]:
HuisIdBSV column type count missing errors min max mean median ... validate_ElektriciteitsgebruikBoilervatDiff validate_ElektriciteitsgebruikRadiator validate_ElektriciteitsgebruikRadiatorDiff validate_WarmteproductieWarmtepomp validate_WarmteproductieWarmtepompDiff validate_WatergebruikWarmTapwater validate_WatergebruikWarmTapwaterDiff validate_Zon-opwekTotaal validate_Zon-opwekTotaalDiff validate_cumulative_diff_ok
0 1 ReadingDate datetime64[ns] 105121 0 0 2023-01-01 00:00:00 2024-01-01 00:00:00 NaN NaN ... True <NA> True True True True True <NA> True True
1 1 ElektriciteitNetgebruikHoog Float64 105093 28 28 822.16 1364.201 1077.002484 1067.561 ... True <NA> True True True True True <NA> True True
2 1 ElektriciteitNetgebruikLaag Float64 105093 28 28 1157.82 1985.089 1558.287115 1564.534 ... True <NA> True True True True True <NA> True True
3 1 ElektriciteitTerugleveringHoog Float64 105093 28 28 3397.66 5960.516 4755.762583 4830.055 ... True <NA> True True True True True <NA> True True
4 1 ElektriciteitTerugleveringLaag Float64 105093 28 28 2268.15 3382.208 2862.596044 2949.556 ... True <NA> True True True True True <NA> True True

5 rows × 52 columns

In [14]:
mapped_data_summary_df[mapped_data_summary_df['missing']>0].head()
Out[14]:
HuisIdBSV column type count missing errors min max mean median ... validate_ElektriciteitsgebruikBoilervatDiff validate_ElektriciteitsgebruikRadiator validate_ElektriciteitsgebruikRadiatorDiff validate_WarmteproductieWarmtepomp validate_WarmteproductieWarmtepompDiff validate_WatergebruikWarmTapwater validate_WatergebruikWarmTapwaterDiff validate_Zon-opwekTotaal validate_Zon-opwekTotaalDiff validate_cumulative_diff_ok
1 1 ElektriciteitNetgebruikHoog Float64 105093 28 28 822.16 1364.201 1077.002484 1067.561 ... True <NA> True True True True True <NA> True True
2 1 ElektriciteitNetgebruikLaag Float64 105093 28 28 1157.82 1985.089 1558.287115 1564.534 ... True <NA> True True True True True <NA> True True
3 1 ElektriciteitTerugleveringHoog Float64 105093 28 28 3397.66 5960.516 4755.762583 4830.055 ... True <NA> True True True True True <NA> True True
4 1 ElektriciteitTerugleveringLaag Float64 105093 28 28 2268.15 3382.208 2862.596044 2949.556 ... True <NA> True True True True True <NA> True True
5 1 ElektriciteitVermogen float64 0 105121 105121 None None NaN NaN ... True <NA> True True True True True <NA> True True

5 rows × 52 columns
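Rows with a count of 0, such as ElektriciteitVermogen above, indicate columns that were never provided for a household. A small sketch to list them (assumes mapped_data_summary_df from above):

# Columns that contain no data at all for a given household
all_missing = mapped_data_summary_df[mapped_data_summary_df["count"] == 0]
print(all_missing[["HuisIdBSV", "column"]].head())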


Add data processor metadata¶

At this point, we will want to decide which households can be included in our final mapped dataset and group them by project. To do this, we must manually create the data processor metadata, which should also help to anonymize the household data by not using the data supplier project id.

First, we will change our configuration to point to the newly filled-in metadata file.

%%{init: {"flowchart": {"htmlLabels": false}} }%%
sequenceDiagram
    participant Supplier
    participant DataProcessorBSV

    DataProcessorBSV->DataProcessorBSV: Final statistical column check `get_mapped_data_stats()`
    Note right of DataProcessorBSV: Send feedback if needed
    DataProcessorBSV-->Supplier: Feedback to revise data
    Supplier->Supplier: Revise data & resubmit
    Supplier->DataProcessorBSV: Submit revised data
    DataProcessorBSV->DataProcessorBSV: Update BSV metadata `update_index()`, `add_supplier_metadata_to_index()`
    DataProcessorBSV->DataProcessorBSV: Manage data index & aggregate data
    DataProcessorBSV-->Supplier: Process complete
In [6]:
etdmap.options.bsv_metadata_file = "../../demodata/bsv_metadata_after_mapping.xlsx" # change to path to the Excel file with the data processor metadata (BSV is the data processor in this case)
In [19]:
import pandas as pd

bsv_metadata_df = pd.read_excel(etdmap.options.bsv_metadata_file)
bsv_metadata_df.head()

bsv_metadata_df[bsv_metadata_df["Meenemen"] == False][['HuisIdBSV', 'ProjectIdBSV', 'Notities']].head()
Out[19]:
| | HuisIdBSV | ProjectIdBSV | Notities |
|---|---|---|---|
| 38 | 45 | 6 | Missing the first two months of the year (in r... |
| 45 | 52 | 6 | Missing the first two months of the year (in r... |
| 67 | 76 | 6 | Missing the first two months of the year (in r... |
| 119 | 133 | 6 | Missing the first two months of the year (in r... |
| 152 | 170 | 6 | Missing the first two months of the year (in r... |

Now that we've decided which files to include, we can simply update the index to contain the metadata we've added to the Excel sheet.

In [20]:
index_df = etdmap.index_helpers.update_meenemen()

Let's examine the updated index.

In [21]:
index_df.head()
Out[21]:
HuisIdBSV HuisIdLeverancier ProjectIdLeverancier ProjectIdBSV Dataleverancier Meenemen Notities validate_monitoring_data_counts validate_energiegebruik_warmteopwekker validate_approximately_one_year_of_records ... validate_ElektriciteitsgebruikBoilervatDiff validate_ElektriciteitsgebruikRadiator validate_ElektriciteitsgebruikRadiatorDiff validate_WarmteproductieWarmtepomp validate_WarmteproductieWarmtepompDiff validate_WatergebruikWarmTapwater validate_WatergebruikWarmTapwaterDiff validate_Zon-opwekTotaal validate_Zon-opwekTotaalDiff validate_cumulative_diff_ok
0 1 024b35f544c0de4c78fbfa4b401126dc E 4 Watch-E True <NA> True True True ... True <NA> True True True True True <NA> True True
1 2 0290e5ebef786f7d3937c6ea53f9ef56 C 7 Watch-E True <NA> True True True ... True <NA> True True True True True <NA> True True
2 3 068690b7864a3d0feff39bea2e5556df E 4 Watch-E True <NA> True True True ... True <NA> True True True True True <NA> True True
3 4 08552726673ae0cb8ec73be7e4a75f9d F 3 Watch-E True <NA> True True True ... True <NA> True True True True True <NA> True True
4 5 0bf7095bf31ef4edc72e1e8353e68dbf C 7 Watch-E True <NA> True True True ... True <NA> True True False True True <NA> True False

5 rows × 39 columns

At this point, we are finished with the mapping. The next step is to aggregate the data we have chosen to include, which is demonstrated in the transformation notebook.
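As a preview of that step, the households selected for aggregation can be taken from the index using the Meenemen column (a sketch assuming index_df from above):

# Select only the households marked for inclusion in the aggregate datasets
included = index_df[index_df["Meenemen"].eq(True)]
print(f"{len(included)} of {len(index_df)} households will be aggregated")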