Author : Kailash Subramaniyam

Progress : Working...

https://www.canva.com/design/DAGgPJ4skjI/OzsKYLfwSWmsQcI3FpiB0g/view?embed

I built an end-to-end machine learning pipeline to detect anomalies in electrical power usage, inspired by measurable.energy’s mission to eliminate energy waste.

The project starts with generating synthetic time-series data representing power consumption, which I stored in AWS RDS (PostgreSQL) and visualized using pgAdmin and a local Python script. I then performed ETL locally to process the data and uploaded it to AWS S3. Using SageMaker, I trained an anomaly detection model and saved it to S3. Although my attempt to deploy a SageMaker endpoint failed due to a scikit-learn version mismatch, I worked around it by running inference locally and logging metrics such as anomaly counts to AWS CloudWatch for monitoring.

1. Smart Plug - Data Simulation - AWS RDS - PostgreSQL

"""
This Python script simulates smart-plug energy readings. It:

    # Connects to a PostgreSQL database hosted on Amazon RDS.
    # Drops and recreates a single table, smart_plug_data, that holds one row per power reading.
    # Generates random readings for seven household appliances (e.g., toaster, TV, refrigerator) spread over the previous 30 days.
    # Biases each reading's hour and on/off state toward the appliance's configured peak hours.
    # Inserts all generated readings into the database with a single pandas to_sql call.
    # Runs once per invocation; the call at the bottom of the script requests 500 readings.

Power, duration, voltage and current are drawn from per-appliance ranges (e.g., a toaster is most likely to be on in the morning), and every row also stores the day of week and hour of day so they can be used as features later in the pipeline.
"""

import os
import random
from datetime import datetime, timedelta

import numpy as np
import pandas as pd
from sqlalchemy import create_engine
from sqlalchemy.sql import text

# RDS connection parameters.
# Every setting can be overridden with an environment variable of the same
# name, so credentials no longer have to be edited into (or committed with)
# this file. The literals are kept only as backward-compatible fallbacks.
# SECURITY NOTE(review): the fallback password below is a real secret that has
# been committed to source control - rotate it and remove the fallback once
# DB_PASSWORD is supplied through the environment.
DB_USER = os.environ.get("DB_USER", "postgres")
DB_PASSWORD = os.environ.get("DB_PASSWORD", "vAlentina96")
DB_HOST = os.environ.get(
    "DB_HOST", "energy-monitoring-db.c65cw0qa0ur4.us-east-1.rds.amazonaws.com"
)
DB_PORT = os.environ.get("DB_PORT", "5432")
DB_NAME = os.environ.get("DB_NAME", "energy_monitoring")

# SQLAlchemy connection URL and the shared engine used by the functions below.
db_connection = f"postgresql://{DB_USER}:{DB_PASSWORD}@{DB_HOST}:{DB_PORT}/{DB_NAME}"
engine = create_engine(db_connection)

def create_smart_plug_table():
    """Drop and recreate the ``smart_plug_data`` table.

    Any existing ``smart_plug_data`` table, including its rows, is discarded
    and an empty table with the reading schema is created in its place. Both
    statements run inside one transaction on the module-level ``engine``.

    Raises:
        Exception: Whatever the database layer raised. The error is printed
            and then re-raised so callers do not go on to insert rows into a
            table that was never created.
    """
    drop_statement = text("DROP TABLE IF EXISTS smart_plug_data")
    create_statement = text("""
        CREATE TABLE smart_plug_data (
            record_id SERIAL PRIMARY KEY,
            appliance_id INTEGER,
            appliance_type VARCHAR(255),
            location VARCHAR(255),
            timestamp TIMESTAMP,
            power_watts FLOAT,
            duration_minutes FLOAT,
            voltage FLOAT,
            current FLOAT,
            is_on BOOLEAN,
            day_of_week INTEGER,
            hour_of_day INTEGER
        )
    """)
    try:
        # engine.begin() commits when the block exits cleanly and rolls back on
        # error, so there is no need to reach into the raw DBAPI connection.
        with engine.begin() as connection:
            connection.execute(drop_statement)
            connection.execute(create_statement)
    except Exception as e:
        print(f"Error creating table: {e}")
        raise
    print("Table 'smart_plug_data' created successfully.")

def _is_peak_hour(hour, peak_hours):
    """Return True when ``hour`` lies inside the inclusive [start, end] window."""
    window_start, window_end = peak_hours
    return window_start <= hour <= window_end


def _simulate_reading(appliance_id, appliance, hour_weights, first_day):
    """Build one synthetic reading as a dict keyed by ``smart_plug_data`` column."""
    # Pick the hour first (biased toward the peak window), then place it on a
    # random day of the 30-day window that starts at midnight of first_day.
    hour = random.choices(range(24), weights=hour_weights, k=1)[0]
    timestamp = first_day + timedelta(
        days=random.randint(0, 29),
        hours=hour,
        minutes=random.randint(0, 59),
        seconds=random.randint(0, 59),
    )

    # Appliances are far more likely to be switched on during their peak window.
    on_probability = 0.8 if _is_peak_hour(hour, appliance["peak_hours"]) else 0.3
    is_on = random.random() < on_probability

    if is_on:
        power_watts = random.uniform(appliance["min_power"], appliance["max_power"])
        # Gaussian spread of 20% around the typical duration, clamped at zero.
        duration_minutes = max(
            0.0,
            random.gauss(appliance["avg_duration"], appliance["avg_duration"] * 0.2),
        )
    else:
        power_watts = 0.0
        duration_minutes = 0.0

    voltage = random.uniform(115, 125)
    current = power_watts / voltage if power_watts > 0 else 0.0

    return {
        "appliance_id": appliance_id,
        "appliance_type": appliance["type"],
        "location": appliance["location"],
        "timestamp": timestamp,
        "power_watts": power_watts,
        "duration_minutes": duration_minutes,
        "voltage": voltage,
        "current": current,
        "is_on": is_on,
        "day_of_week": timestamp.weekday(),  # 0=Mon, 6=Sun
        "hour_of_day": timestamp.hour,
    }


def generate_dummy_data(num_records=5000):
    """Generate random smart-plug readings and append them to ``smart_plug_data``.

    Readings are spread over the 30 days before today. Each appliance's
    ``peak_hours`` entry is an inclusive ``[start_hour, end_hour]`` window:
    hours inside it are twice as likely to be sampled and the appliance is on
    with probability 0.8 there (0.3 elsewhere).

    Args:
        num_records (int): Number of readings to generate.

    Raises:
        Exception: Whatever ``DataFrame.to_sql`` raised. The error is printed
            and then re-raised so a failed insert is not reported as success
            by the caller.
    """
    # Define appliances with realistic power ranges and usage patterns
    appliances = [
        {"type": "Toaster", "location": "Kitchen", "min_power": 800, "max_power": 1200, "peak_hours": [6, 9], "avg_duration": 5},
        {"type": "TV", "location": "Living Room", "min_power": 50, "max_power": 200, "peak_hours": [18, 23], "avg_duration": 120},
        {"type": "Washing Machine", "location": "Laundry", "min_power": 300, "max_power": 1000, "peak_hours": [9, 15], "avg_duration": 60},
        {"type": "Refrigerator", "location": "Kitchen", "min_power": 100, "max_power": 300, "peak_hours": [0, 23], "avg_duration": 1440},  # Always on
        {"type": "Microwave", "location": "Kitchen", "min_power": 600, "max_power": 1200, "peak_hours": [12, 20], "avg_duration": 3},
        {"type": "Laptop", "location": "Bedroom", "min_power": 30, "max_power": 90, "peak_hours": [8, 22], "avg_duration": 240},
        {"type": "Lamp", "location": "Living Room", "min_power": 10, "max_power": 60, "peak_hours": [17, 22], "avg_duration": 180},
    ]

    # Anchor the window at midnight so the sampled hour is the hour that ends
    # up in the timestamp (and therefore in hour_of_day).
    first_day = (datetime.now() - timedelta(days=30)).replace(
        hour=0, minute=0, second=0, microsecond=0
    )

    # The hour weights depend only on the appliance, so compute them once.
    hour_weights = [
        [2 if _is_peak_hour(h, appliance["peak_hours"]) else 1 for h in range(24)]
        for appliance in appliances
    ]

    data = []
    for _ in range(num_records):
        position = random.randrange(len(appliances))
        # Appliance IDs are 1-based positions in the list above.
        data.append(
            _simulate_reading(
                position + 1, appliances[position], hour_weights[position], first_day
            )
        )

    if not data:
        # Nothing to insert; avoid handing pandas a column-less frame.
        print("Inserted 0 records into 'smart_plug_data'.")
        return

    # Convert to DataFrame and insert
    df = pd.DataFrame(data)
    try:
        df.to_sql('smart_plug_data', engine, if_exists='append', index=False)
    except Exception as e:
        print(f"Error inserting data: {e}")
        raise
    print(f"Inserted {len(df)} records into 'smart_plug_data'.")

# Execute
try:
    print("Starting process...")
    # Step 1: reset the target table.
    create_smart_plug_table()
    # Step 2: fill it with synthetic readings (adjust number of records as needed).
    generate_dummy_data(num_records=500)
    print("Dummy data generation complete.")
except Exception as err:
    # Report any failure that escapes the steps above.
    print(f"Process failed: {err}")

AWS RDS

Screenshot 2025-02-26 at 9.27.09 PM.png

pgAdmin4

Screenshot 2025-02-26 at 9.26.15 PM.png

2. Visualization

import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
import plotly.express as px
import plotly.graph_objs as go

class EnergyDataAnalyzer:
    """Summarise and plot smart-plug power readings loaded from a CSV file."""

    def __init__(self, data_path):
        """
        Load the readings and convert their timestamps to datetimes

        Args:
            data_path (str): Location of the CSV file, handed to pandas.read_csv
        """
        readings = pd.read_csv(data_path)

        # Parse the 'timestamp' column so the .dt accessors work later on
        readings['timestamp'] = pd.to_datetime(readings['timestamp'])

        self.df = readings

    def appliance_energy_breakdown(self):
        """
        Aggregate the power readings per appliance type

        Returns:
            pandas.DataFrame: One row per appliance with the columns
                'Appliance', 'Total Watts', 'Average Watts' and 'Usage Count'
        """
        watts_by_appliance = self.df.groupby('appliance_type')['power_watts']
        summary = watts_by_appliance.agg(['sum', 'mean', 'count']).reset_index()
        summary.columns = ['Appliance', 'Total Watts', 'Average Watts', 'Usage Count']
        return summary

    def time_based_analysis(self):
        """
        Average the power readings over several time groupings

        Returns:
            dict: Series of mean power keyed by 'hourly_consumption' (hour of
                day), 'daily_consumption' (day of month) and
                'day_of_week_consumption' (weekday name)
        """
        when = self.df['timestamp'].dt
        watts = self.df['power_watts']

        return {
            # Mean power for each hour of the day
            'hourly_consumption': watts.groupby(when.hour).mean(),
            # Mean power for each day-of-month number
            'daily_consumption': watts.groupby(when.day).mean(),
            # Mean power for each weekday name
            'day_of_week_consumption': watts.groupby(when.day_name()).mean(),
        }

    def visualize_energy_consumption(self):
        """
        Render the charts for the loaded readings

        Writes two PNG files and one HTML file to the working directory and
        returns a dict mapping each chart's key to the file name it was saved as
        """
        pie_file = 'appliance_energy_pie.png'
        hourly_file = 'hourly_consumption.png'
        html_file = 'daily_energy_consumption.html'

        # 1. Pie chart: share of summed power per appliance
        breakdown = self.appliance_energy_breakdown()
        plt.figure(figsize=(10, 6))
        plt.pie(
            breakdown['Total Watts'],
            labels=breakdown['Appliance'],
            autopct='%1.1f%%',
        )
        plt.title('Energy Consumption by Appliance')
        plt.tight_layout()
        plt.savefig(pie_file)
        plt.close()

        # 2. Line plot: mean power per hour of day
        hourly = self.time_based_analysis()['hourly_consumption']
        plt.figure(figsize=(12, 6))
        hourly.plot(kind='line', marker='o')
        plt.title('Average Energy Consumption by Hour of Day')
        plt.xlabel('Hour of Day')
        plt.ylabel('Average Power Watts')
        plt.tight_layout()
        plt.savefig(hourly_file)
        plt.close()

        # 3. Interactive Plotly chart: summed power per calendar date and appliance
        grouping_keys = [self.df['timestamp'].dt.date, 'appliance_type']
        per_day = self.df.groupby(grouping_keys)['power_watts'].sum()
        appliance_daily = per_day.reset_index()

        fig = px.line(
            appliance_daily,
            x='timestamp',
            y='power_watts',
            color='appliance_type',
            title='Daily Energy Consumption by Appliance',
        )
        fig.write_html(html_file)

        return {
            'appliance_pie': pie_file,
            'hourly_consumption': hourly_file,
            'interactive_plot': html_file,
        }

    def detect_energy_waste(self, standby_threshold=10):
        """
        Find readings that draw a little power while flagged as off

        Args:
            standby_threshold (float): Upper bound, in watts, for a reading to
                count as standby draw

        Returns:
            pandas.DataFrame: Per appliance type, the count and mean of
                'power_watts' over the matching readings
        """
        watts = self.df['power_watts']
        draws_power = watts > 0
        within_threshold = watts <= standby_threshold
        flagged_off = self.df['is_on'] == False

        standby_rows = self.df[draws_power & within_threshold & flagged_off]

        stats = standby_rows.groupby('appliance_type').agg({
            'power_watts': ['count', 'mean']
        })
        return stats.reset_index()

# Example usage
if __name__ == '__main__':
    analyzer = EnergyDataAnalyzer('smart_plug_data.csv')

    # Per-appliance sum, mean and number of readings
    breakdown = analyzer.appliance_energy_breakdown()
    print(breakdown)

    # Write the PNG and HTML charts to the working directory
    analyzer.visualize_energy_consumption()

    # Low-power readings recorded while the plug was flagged as off
    standby_report = analyzer.detect_energy_waste()
    print(standby_report)

appliance_energy_pie.png

hourly_consumption.png

Screenshot 2025-02-26 at 9.15.54 PM.png

3. ETL - AWS S3