Anomaly Detection
This guide demonstrates the anomaly detection capabilities of Time Series RAG.
Synthetic Data Generation
Anomaly Detection Examples
"""Anomaly Detection Examples for Time Series RAG.
This script demonstrates anomaly detection capabilities using Time Series RAG
and the analytics module.
"""
import numpy as np
Create synthetic sensor data with anomalies:
def create_synthetic_sensor_data():
"""Create synthetic sensor data with anomalies.
Returns:
tuple: Three time series (temperature, vibration, pressure) with anomalies
"""
# Time points
t = np.linspace(0, 10, 1000)
# Temperature with daily cycle and anomalies
temp_base = 20 + 5 * np.sin(2 * np.pi * t / 1) # Daily pattern
temp_anomalies = np.zeros_like(t)
temp_anomalies[300:320] = 10 # Sudden spike
temp_anomalies[600:700] = 5 # Sustained deviation
temperature = temp_base + temp_anomalies + np.random.normal(0, 0.5, size=len(t))
# Vibration with intermittent spikes
vibration = np.random.normal(0, 0.1, size=len(t))
spike_locations = np.random.choice(len(t), size=10, replace=False)
vibration[spike_locations] += np.random.uniform(1, 2, size=len(spike_locations))
# Pressure with gradual drift and sudden drop
pressure = 100 + 0.1 * t + np.random.normal(0, 0.1, size=len(t))
pressure[800:] -= 5 # Sudden drop
return temperature, vibration, pressure, t
Anomaly Detection
Detect anomalies in sensor data:
def detect_anomalies_example():
"""Example of detecting anomalies in sensor data.
This example demonstrates:
1. Creating synthetic sensor data with known anomalies
2. Using TimeSeriesAnalytics to detect anomalies
3. Visualizing the results
"""
# Get synthetic data
temperature, vibration, pressure, t = create_synthetic_sensor_data()
# Initialize analytics for each sensor
temp_analytics = TimeSeriesAnalytics(temperature)
vib_analytics = TimeSeriesAnalytics(vibration)
press_analytics = TimeSeriesAnalytics(pressure)
# Detect anomalies
temp_anomalies = temp_analytics.detect_anomalies(window_size=50)
vib_anomalies = vib_analytics.detect_anomalies(window_size=20)
press_anomalies = press_analytics.detect_anomalies(window_size=100)
# Visualize results
plt.figure(figsize=(15, 10))
# Temperature
plt.subplot(3, 1, 1)
plt.plot(t, temperature, label='Temperature')
if temp_anomalies:
anomaly_idx = [x[0] for x in temp_anomalies]
plt.scatter(t[anomaly_idx], temperature[anomaly_idx],
color='red', label='Anomalies')
plt.title('Temperature Sensor')
plt.legend()
# Vibration
plt.subplot(3, 1, 2)
plt.plot(t, vibration, label='Vibration')
if vib_anomalies:
anomaly_idx = [x[0] for x in vib_anomalies]
plt.scatter(t[anomaly_idx], vibration[anomaly_idx],
color='red', label='Anomalies')
plt.title('Vibration Sensor')
plt.legend()
# Pressure
plt.subplot(3, 1, 3)
plt.plot(t, pressure, label='Pressure')
if press_anomalies:
anomaly_idx = [x[0] for x in press_anomalies]
plt.scatter(t[anomaly_idx], pressure[anomaly_idx],
color='red', label='Anomalies')
plt.title('Pressure Sensor')
plt.legend()
plt.tight_layout()
plt.show()
return temp_anomalies, vib_anomalies, press_anomalies
Anomaly Pattern Analysis
Analyze patterns in detected anomalies:
def analyze_anomaly_patterns(anomalies, data, window_size=50):
"""Extract and analyze patterns around anomalies.
Args:
anomalies: List of (index, value) tuples indicating anomalies
data: Original time series data
window_size: Size of window around anomalies to analyze
Returns:
List of windows around anomalies
"""
windows = []
for idx, _ in anomalies:
start = max(0, idx - window_size//2)
end = min(len(data), idx + window_size//2)
windows.append(data[start:end])
return windows
Complete analysis example:
def anomaly_pattern_analysis_example():
"""Example of analyzing patterns in anomalies.
This example demonstrates:
1. Detecting anomalies in multiple sensors
2. Extracting patterns around anomalies
3. Using RAG to find similar anomaly patterns
"""
# Get data and detect anomalies
temperature, vibration, pressure, t = create_synthetic_sensor_data()
temp_anomalies, vib_anomalies, press_anomalies = detect_anomalies_example()
# Initialize RAG system
embedder = TimeSeriesEmbedder()
rag = TimeSeriesRAG()
# Extract and store anomaly patterns
for sensor_name, anomalies, data in [
('temperature', temp_anomalies, temperature),
('vibration', vib_anomalies, vibration),
('pressure', press_anomalies, pressure)
]:
windows = analyze_anomaly_patterns(anomalies, data)
for i, window in enumerate(windows):
embedding = embedder.embed(window)
doc = TimeSeriesDocument(
id=f'{sensor_name}_anomaly_{i}',
data=window,
metadata={'sensor': sensor_name, 'type': 'anomaly'},
embedding=embedding
)
rag.add_document(doc)
# Find similar anomaly patterns
if temp_anomalies: # Use first temperature anomaly as query
query_window = analyze_anomaly_patterns(
[temp_anomalies[0]], temperature
)[0]
query_embedding = embedder.embed(query_window)
results = rag.search(query_embedding, k=3)
# Visualize similar anomalies
plt.figure(figsize=(12, 4))
plt.subplot(1, 2, 1)
plt.plot(query_window)
plt.title('Query Anomaly Pattern')
plt.xlabel('Time')
plt.ylabel('Value')
plt.subplot(1, 2, 2)
for result in results:
plt.plot(result['data'],
label=f"{result['metadata']['sensor']}\n"
f"Distance: {result['distance']:.2f}")
plt.title('Similar Anomaly Patterns')
plt.xlabel('Time')
plt.ylabel('Value')
plt.legend()
plt.tight_layout()
plt.show()
Running the Examples
Execute all examples with:
python anomaly_detection.py