# Author: Kailash Subramaniyam
# Progress: Working...
# https://www.canva.com/design/DAGgPJ4skjI/OzsKYLfwSWmsQcI3FpiB0g/view?embed
# I built an end-to-end machine learning pipeline to detect anomalies in electrical power usage, inspired by measurable.energy's mission to eliminate energy waste.
# The project starts by generating synthetic time-series data representing power consumption, which I stored in AWS RDS (PostgreSQL) and visualized using pgAdmin and a local Python script. I then performed ETL locally to process the data and uploaded it to AWS S3. Using SageMaker, I trained an anomaly detection model and saved it to S3. Although my attempt to deploy a SageMaker endpoint failed because of a scikit-learn version mismatch, I worked around it by running inference locally and logging metrics such as anomaly counts to AWS CloudWatch for monitoring.
"""
This Python script simulates an energy monitoring system. It:
# Connects to a PostgreSQL database hosted on Amazon RDS.
# Creates tables for devices (e.g., computers, lights) and for power readings (e.g., power usage over time).
# Populates the database with sample devices if none exist.
# Generates realistic power consumption data for these devices based on their type and time of day.
# Inserts this data into the database in batches.
# Runs continuously, generating a day's worth of data every 6 hours.
The script uses realistic behavior patterns (e.g., a coffee machine uses more power in the morning) and simulates 15-minute intervals of energy usage, which is common in energy monitoring systems.
"""
import os
import random
from datetime import datetime, timedelta
from urllib.parse import quote_plus

import numpy as np
import pandas as pd
from sqlalchemy import create_engine
from sqlalchemy.sql import text
# RDS connection parameters.
# Every value can be overridden through an environment variable of the same
# name; the literals are only fallbacks so the script keeps working unchanged
# when nothing is set.
# NOTE(review): a database password is committed here in plain text - rotate
# it and drop the fallback once DB_PASSWORD is supplied by the environment.
DB_USER = os.environ.get("DB_USER", "postgres")
DB_PASSWORD = os.environ.get("DB_PASSWORD", "vAlentina96")
DB_HOST = os.environ.get(
    "DB_HOST", "energy-monitoring-db.c65cw0qa0ur4.us-east-1.rds.amazonaws.com"
)
DB_PORT = os.environ.get("DB_PORT", "5432")
DB_NAME = os.environ.get("DB_NAME", "energy_monitoring")
# Percent-encode the credentials so characters such as '@', ':' or '/' in a
# user name or password cannot corrupt the URL. For the fallback values the
# resulting string is identical to the previous hand-built one.
db_connection = (
    f"postgresql://{quote_plus(DB_USER)}:{quote_plus(DB_PASSWORD)}"
    f"@{DB_HOST}:{DB_PORT}/{DB_NAME}"
)
engine = create_engine(db_connection)
def create_smart_plug_table():
    """Drop and recreate the ``smart_plug_data`` table.

    Any existing ``smart_plug_data`` table is dropped first, so every call
    starts from an empty table. Errors are reported on stdout and not
    re-raised, so the caller continues either way.
    """
    ddl = text("""
        DROP TABLE IF EXISTS smart_plug_data;
        CREATE TABLE smart_plug_data (
            record_id SERIAL PRIMARY KEY,
            appliance_id INTEGER,
            appliance_type VARCHAR(255),
            location VARCHAR(255),
            timestamp TIMESTAMP,
            power_watts FLOAT,
            duration_minutes FLOAT,
            voltage FLOAT,
            current FLOAT,
            is_on BOOLEAN,
            day_of_week INTEGER,
            hour_of_day INTEGER
        )
    """)
    try:
        # engine.begin() wraps the statements in a transaction that is
        # committed when the block exits normally and rolled back if it
        # raises, instead of committing through the raw DBAPI connection
        # behind SQLAlchemy's back.
        with engine.begin() as connection:
            connection.execute(ddl)
        print("Table 'smart_plug_data' created successfully.")
    except Exception as e:
        print(f"Error creating table: {e}")
def _is_peak_hour(hour, peak_hours):
    """Return True when *hour* falls inside the inclusive [start, end] window."""
    start, end = peak_hours
    if start <= end:
        return start <= hour <= end
    # A window whose start is after its end wraps past midnight (e.g. [22, 5]).
    return hour >= start or hour <= end


def generate_dummy_data(num_records=5000):
    """Generate synthetic smart-plug readings and append them to the database.

    Each record picks a random appliance, a timestamp within the 30 calendar
    days before today (hours inside the appliance's peak window are twice as
    likely), an on/off state (80% on inside the peak window, 30% outside) and
    matching power, duration, voltage and current values.

    Args:
        num_records (int): Number of readings to generate.

    Returns:
        pandas.DataFrame: The generated readings (empty when ``num_records``
        is not positive). Insert failures are printed, not raised, so the
        frame is returned even when the database write fails.
    """
    # Define appliances with realistic power ranges and usage patterns.
    # "peak_hours" is an inclusive [start_hour, end_hour] window; the
    # refrigerator's [0, 23] therefore covers the whole day ("always on").
    appliances = [
        {"type": "Toaster", "location": "Kitchen", "min_power": 800, "max_power": 1200, "peak_hours": [6, 9], "avg_duration": 5},
        {"type": "TV", "location": "Living Room", "min_power": 50, "max_power": 200, "peak_hours": [18, 23], "avg_duration": 120},
        {"type": "Washing Machine", "location": "Laundry", "min_power": 300, "max_power": 1000, "peak_hours": [9, 15], "avg_duration": 60},
        {"type": "Refrigerator", "location": "Kitchen", "min_power": 100, "max_power": 300, "peak_hours": [0, 23], "avg_duration": 1440},  # Always on
        {"type": "Microwave", "location": "Kitchen", "min_power": 600, "max_power": 1200, "peak_hours": [12, 20], "avg_duration": 3},
        {"type": "Laptop", "location": "Bedroom", "min_power": 30, "max_power": 90, "peak_hours": [8, 22], "avg_duration": 240},
        {"type": "Lamp", "location": "Living Room", "min_power": 10, "max_power": 60, "peak_hours": [17, 22], "avg_duration": 180},
    ]

    hours = range(24)
    # Per-appliance lookups are loop-invariant, so build them once:
    # (unique 1-based id, appliance spec, peak flag per hour, sampling weights).
    catalogue = []
    for appliance_id, appliance in enumerate(appliances, start=1):
        peak_flags = [_is_peak_hour(h, appliance["peak_hours"]) for h in hours]
        hour_weights = [2 if flag else 1 for flag in peak_flags]
        catalogue.append((appliance_id, appliance, peak_flags, hour_weights))

    # Anchor the 30-day window at midnight. Without this the current time of
    # day would be added on top of the sampled hour, so the stored timestamp
    # (and hour_of_day) would not match the hour used for the peak decision.
    start_time = (datetime.now() - timedelta(days=30)).replace(
        hour=0, minute=0, second=0, microsecond=0
    )

    data = []
    for _ in range(num_records):
        appliance_id, appliance, peak_flags, hour_weights = random.choice(catalogue)

        # Generate timestamp with bias toward peak hours
        hour = random.choices(hours, weights=hour_weights, k=1)[0]
        timestamp = start_time + timedelta(
            days=random.randint(0, 29),
            hours=hour,
            minutes=random.randint(0, 59),
            seconds=random.randint(0, 59),
        )

        # Power, state and duration (an appliance that is off draws nothing)
        is_on = random.random() < (0.8 if peak_flags[hour] else 0.3)
        if is_on:
            power_watts = random.uniform(appliance["min_power"], appliance["max_power"])
            # The Gaussian draw can dip below zero; clamp it.
            duration_minutes = max(
                0.0,
                random.gauss(appliance["avg_duration"], appliance["avg_duration"] * 0.2),
            )
        else:
            power_watts = 0.0
            duration_minutes = 0.0

        # Electrical metrics
        voltage = random.uniform(115, 125)
        current = power_watts / voltage if power_watts > 0 else 0.0

        data.append({
            "appliance_id": appliance_id,
            "appliance_type": appliance["type"],
            "location": appliance["location"],
            "timestamp": timestamp,
            "power_watts": power_watts,
            "duration_minutes": duration_minutes,
            "voltage": voltage,
            "current": current,
            "is_on": is_on,
            "day_of_week": timestamp.weekday(),  # 0=Mon, 6=Sun
            "hour_of_day": timestamp.hour,
        })

    # Convert to DataFrame and insert
    df = pd.DataFrame(data)
    if df.empty:
        # Nothing was generated, so there is nothing to send to the database.
        print("No records generated; nothing to insert.")
        return df
    try:
        df.to_sql('smart_plug_data', engine, if_exists='append', index=False)
        print(f"Inserted {len(df)} records into 'smart_plug_data'.")
    except Exception as e:
        print(f"Error inserting data: {e}")
    return df
# Execute only when run as a script: create_smart_plug_table() drops the
# existing table, which must not happen as a side effect of importing this
# module.
if __name__ == "__main__":
    try:
        print("Starting process...")
        create_smart_plug_table()
        generate_dummy_data(num_records=500)  # Adjust number of records as needed
        print("Dummy data generation complete.")
    except Exception as e:
        print(f"Process failed: {e}")
# AWS RDS
# pgAdmin 4
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
import plotly.express as px
import plotly.graph_objs as go
class EnergyDataAnalyzer:
    """Summaries and charts for the power readings held in ``self.df``."""

    def __init__(self, data_path):
        """
        Load the readings and parse their timestamps.

        Args:
            data_path (str): CSV location handed to ``pandas.read_csv``
        """
        readings = pd.read_csv(data_path)
        # Work with real datetimes so the ``.dt`` accessors below are usable
        readings['timestamp'] = pd.to_datetime(readings['timestamp'])
        self.df = readings

    def appliance_energy_breakdown(self):
        """
        Summarise ``power_watts`` per appliance type

        Returns:
            pandas.DataFrame: One row per appliance type with the sum, mean
            and number of its power readings
        """
        watts_per_appliance = self.df.groupby('appliance_type')['power_watts']
        summary = watts_per_appliance.agg(['sum', 'mean', 'count']).reset_index()
        summary.columns = ['Appliance', 'Total Watts', 'Average Watts', 'Usage Count']
        return summary

    def time_based_analysis(self):
        """
        Average ``power_watts`` over several time groupings

        Returns:
            dict: Mean power keyed by hour of day, by day of month and by
            weekday name
        """
        stamp_parts = self.df['timestamp'].dt
        groupings = {
            'hourly_consumption': stamp_parts.hour,
            'daily_consumption': stamp_parts.day,
            'day_of_week_consumption': stamp_parts.day_name(),
        }
        return {
            label: self.df.groupby(key)['power_watts'].mean()
            for label, key in groupings.items()
        }

    def visualize_energy_consumption(self):
        """
        Render the consumption charts to disk

        Returns:
            dict: File name of each chart that was written
        """
        pie_file = 'appliance_energy_pie.png'
        hourly_file = 'hourly_consumption.png'
        interactive_file = 'daily_energy_consumption.html'

        # 1. Share of total watts per appliance (pie chart)
        breakdown = self.appliance_energy_breakdown()
        plt.figure(figsize=(10, 6))
        plt.pie(
            breakdown['Total Watts'],
            labels=breakdown['Appliance'],
            autopct='%1.1f%%',
        )
        plt.title('Energy Consumption by Appliance')
        plt.tight_layout()
        plt.savefig(pie_file)
        plt.close()

        # 2. Mean power per hour of day (line plot)
        hourly_mean = self.time_based_analysis()['hourly_consumption']
        plt.figure(figsize=(12, 6))
        hourly_mean.plot(kind='line', marker='o')
        plt.title('Average Energy Consumption by Hour of Day')
        plt.xlabel('Hour of Day')
        plt.ylabel('Average Power Watts')
        plt.tight_layout()
        plt.savefig(hourly_file)
        plt.close()

        # 3. Interactive Plotly chart of summed power per calendar day and appliance
        calendar_day = self.df['timestamp'].dt.date
        grouped_watts = self.df.groupby([calendar_day, 'appliance_type'])['power_watts']
        daily_totals = grouped_watts.sum().reset_index()
        line_chart = px.line(
            daily_totals,
            x='timestamp',
            y='power_watts',
            color='appliance_type',
            title='Daily Energy Consumption by Appliance',
        )
        line_chart.write_html(interactive_file)

        return {
            'appliance_pie': pie_file,
            'hourly_consumption': hourly_file,
            'interactive_plot': interactive_file,
        }

    def detect_energy_waste(self, standby_threshold=10):
        """
        Find readings that draw a little power while flagged as off

        Args:
            standby_threshold (float): Upper bound (inclusive) in watts for a
                reading to count as standby draw

        Returns:
            pandas.DataFrame: Count and mean of the matching readings per
            appliance type
        """
        watts = self.df['power_watts']
        draws_power = watts > 0
        within_threshold = watts <= standby_threshold
        flagged_off = self.df['is_on'].eq(False)

        standby_rows = self.df[draws_power & within_threshold & flagged_off]
        per_appliance = standby_rows.groupby('appliance_type')
        return per_appliance.agg({'power_watts': ['count', 'mean']}).reset_index()
# Example usage: run the analyses against a local CSV export
if __name__ == '__main__':
    energy_report = EnergyDataAnalyzer('smart_plug_data.csv')

    # Per-appliance totals, averages and reading counts
    breakdown_table = energy_report.appliance_energy_breakdown()
    print(breakdown_table)

    # Write the PNG and HTML charts to the working directory
    energy_report.visualize_energy_consumption()

    # Readings that draw standby power while flagged as off
    standby_table = energy_report.detect_energy_waste()
    print(standby_table)