
07 - Data Pipeline

Real-time data streaming architecture and processing pipelines


Table of Contents

  1. Pipeline Architecture
  2. Stream Processing
  3. Data Storage
  4. Event Processing
  5. Monitoring

1. Pipeline Architecture

Data Flow Overview

                        DATA PIPELINE ARCHITECTURE

  INGESTION             PROCESSING                            STORAGE      SERVING
  ─────────             ──────────                            ───────      ───────

  Sensors ──────▶ MQTT Broker ──┐
                                │
  Edge Gateway ─────────────────┼──▶ Kafka Streams ──▶ InfluxDB ──▶ Grafana
                                │          │
  REST API ─────────────────────┘          ▼
                                       ML Models
                                           │
                                           ▼
                                  Predictions + Alerts
                                           │
                                           ▼
                                  WebSocket Dashboard

Pipeline Stages

Stage        Technology      Latency   Throughput
Ingestion    MQTT/REST       < 10ms    10K msg/s
Buffering    Kafka           < 5ms     100K msg/s
Processing   Kafka Streams   < 50ms    50K msg/s
Storage      InfluxDB        < 20ms    500K writes/s
Serving      WebSocket       < 10ms    Real-time

2. Stream Processing

2.1 Kafka Topics Structure

Topic names use dot separators, matching the producer code below:

sensors.*
├── sensors.raw.<plant-id>    # Raw sensor data (sensors.raw.plant-01, sensors.raw.plant-02, ...)
├── sensors.validated         # Validated readings
├── sensors.aggregated        # Aggregated metrics
├── sensors.alerts            # Alert events
└── sensors.predictions       # ML predictions
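
The snippets in the rest of this document share a handful of types: SensorReading, Alert, AggregatedMetrics, and MLPrediction. Their definitions are not part of this page, so the sketch below reconstructs them from how the code uses them; the module path and the optional markers are assumptions:

// src/types/pipeline.ts (hypothetical module; shapes inferred from the snippets below)
export interface SensorReading {
  sensorId: string;
  parameter: string; // e.g. "pH", "BOD", "COD"
  value: number;
  unit: string;
  quality: string;
  plantId?: string;
  location?: string;
  timestamp?: Date;
}

export interface Alert {
  level: "warning" | "critical";
  parameter: string;
  value: number;
  threshold: number;
  sensorId: string;
  plantId?: string;
  message?: string;
  timestamp: Date;
}

export interface ParameterStats {
  min: number;
  max: number;
  avg: number;
  count: number;
  latest: number;
}

export interface AggregatedMetrics {
  plantId: string;
  timestamp: Date;
  parameters: Record<string, ParameterStats>;
}

export interface MLPrediction {
  plantId: string;
  model: string;
  result: { prediction: string; confidence: number };
  input?: Record<string, number>;
}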

2.2 Kafka Producer (Node.js)

// src/lib/kafka/producer.ts
import { Kafka, Producer, CompressionTypes } from "kafkajs";
import type { SensorReading } from "@/types/pipeline"; // shared types (path assumed, see §2.1)

const kafka = new Kafka({
  clientId: "Edubotx-iot",
  brokers: (process.env.KAFKA_BROKERS || "localhost:9092").split(","),
});

let producer: Producer | null = null;

export async function getProducer(): Promise<Producer> {
  if (!producer) {
    producer = kafka.producer();
    await producer.connect();
  }
  return producer;
}

export async function publishReading(reading: SensorReading) {
  const prod = await getProducer();

  await prod.send({
    topic: `sensors.raw.${reading.plantId || "default"}`,
    compression: CompressionTypes.GZIP,
    messages: [
      {
        key: reading.sensorId,
        value: JSON.stringify(reading),
        timestamp: String(Date.now()),
        headers: {
          "content-type": "application/json",
          source: "iot-gateway",
        },
      },
    ],
  });
}

export async function publishBatch(readings: SensorReading[]) {
  const prod = await getProducer();

  // Group readings by plant so each group goes to its plant's topic
  const grouped = readings.reduce((acc, r) => {
    const key = r.plantId || "default";
    if (!acc[key]) acc[key] = [];
    acc[key].push(r);
    return acc;
  }, {} as Record<string, SensorReading[]>);

  // Send each group to its respective topic in a single batch request
  await prod.sendBatch({
    topicMessages: Object.entries(grouped).map(([plantId, msgs]) => ({
      topic: `sensors.raw.${plantId}`,
      messages: msgs.map((r) => ({
        key: r.sensorId,
        value: JSON.stringify(r),
      })),
    })),
  });
}
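
For context, the ingestion side can bridge MQTT into this producer with a few lines. The sketch below assumes the mqtt npm package and a MQTT_URL environment variable; note that MQTT topics use slash separators while the Kafka topic naming is handled inside publishReading:

// Hypothetical MQTT-to-Kafka bridge (mqtt package; MQTT_URL is an assumption)
import mqtt from "mqtt";
import { publishReading } from "./producer";

const client = mqtt.connect(process.env.MQTT_URL || "mqtt://localhost:1883");

client.on("connect", () => {
  client.subscribe("sensors/#"); // MQTT topic filter, slash-separated
});

client.on("message", async (_topic, payload) => {
  try {
    await publishReading(JSON.parse(payload.toString()));
  } catch (error) {
    console.error("Bridge error:", error);
  }
});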

2.3 Stream Processor

// src/lib/kafka/stream-processor.ts
import { Kafka, Consumer, EachMessagePayload } from "kafkajs";
import type { SensorReading, Alert } from "@/types/pipeline"; // shared types (path assumed, see §2.1)

const kafka = new Kafka({
  clientId: "Edubotx-processor",
  brokers: (process.env.KAFKA_BROKERS || "localhost:9092").split(","),
});

export class StreamProcessor {
  private consumer: Consumer;
  private handlers: Map<string, (data: any) => Promise<void>> = new Map();

  constructor(groupId: string) {
    this.consumer = kafka.consumer({ groupId });
  }

  async start(topics: string[]) {
    await this.consumer.connect();
    await this.consumer.subscribe({ topics, fromBeginning: false });

    await this.consumer.run({
      eachMessage: async (payload: EachMessagePayload) => {
        await this.processMessage(payload);
      },
    });
  }

  private async processMessage({ topic, message }: EachMessagePayload) {
    try {
      const data = JSON.parse(message.value?.toString() || "{}");

      // Validate reading
      const validated = this.validateReading(data);
      if (!validated.isValid) {
        console.warn("Invalid reading:", validated.errors);
        return;
      }

      // Check thresholds
      const alerts = this.checkThresholds(data);
      if (alerts.length > 0) {
        await this.publishAlerts(alerts);
      }

      // Aggregate for ML
      await this.updateAggregation(data);

      // Publish validated reading
      await this.publishValidated(data);
    } catch (error) {
      console.error("Processing error:", error);
    }
  }

  private validateReading(data: any) {
    const errors: string[] = [];

    if (!data.sensorId) errors.push("Missing sensorId");
    if (typeof data.value !== "number") errors.push("Invalid value");
    if (Number.isNaN(data.value)) errors.push("Value is NaN");

    // Physical range validation per parameter
    const ranges: Record<string, [number, number]> = {
      pH: [0, 14],
      BOD: [0, 2000],
      COD: [0, 5000],
      TSS: [0, 10000],
      TDS: [0, 50000],
      temperature: [-10, 100],
    };

    const range = ranges[data.parameter];
    if (range && (data.value < range[0] || data.value > range[1])) {
      errors.push(`Value out of range for ${data.parameter}`);
    }

    return { isValid: errors.length === 0, errors };
  }

  private checkThresholds(data: SensorReading): Alert[] {
    const thresholds: Record<string, { warning: number; critical: number }> = {
      BOD: { warning: 30, critical: 100 },
      COD: { warning: 250, critical: 500 },
      TSS: { warning: 100, critical: 300 },
    };

    const alerts: Alert[] = [];
    const threshold = thresholds[data.parameter];

    if (threshold) {
      if (data.value > threshold.critical) {
        alerts.push({
          level: "critical",
          parameter: data.parameter,
          value: data.value,
          threshold: threshold.critical,
          sensorId: data.sensorId,
          plantId: data.plantId, // carried so the alert handler (§4.1) can report the plant
          timestamp: new Date(),
        });
      } else if (data.value > threshold.warning) {
        alerts.push({
          level: "warning",
          parameter: data.parameter,
          value: data.value,
          threshold: threshold.warning,
          sensorId: data.sensorId,
          plantId: data.plantId,
          timestamp: new Date(),
        });
      }
    }

    return alerts;
  }

  // Forwarders to the sensors.alerts / sensors.aggregated / sensors.validated
  // topics (bodies omitted from this excerpt; they publish via the producer in §2.2)
  private async publishAlerts(alerts: Alert[]) {}
  private async updateAggregation(reading: SensorReading) {}
  private async publishValidated(reading: SensorReading) {}

  async stop() {
    await this.consumer.disconnect();
  }
}
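
Starting a processor is a matter of picking a consumer group and subscribing to the raw topics. A minimal wiring sketch (topic names follow §2.1; the group id is an assumption):

// Example wiring, e.g. in a worker entry point
const processor = new StreamProcessor("edubotx-stream-processor");

processor.start(["sensors.raw.plant-01", "sensors.raw.plant-02"]).catch(console.error);

// Disconnect cleanly on shutdown
process.on("SIGTERM", () => {
  processor.stop().catch(console.error);
});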

2.4 Aggregation Service

// src/lib/kafka/aggregator.ts
import type { SensorReading, AggregatedMetrics } from "@/types/pipeline"; // shared types (path assumed, see §2.1)

interface AggregationWindow {
  plantId: string;
  windowStart: Date;
  windowEnd: Date;
  readings: Map<string, number[]>;
}

export class AggregationService {
  private windows: Map<string, AggregationWindow> = new Map();
  private windowDuration = 60000; // 1 minute

  addReading(reading: SensorReading) {
    const windowKey = this.getWindowKey(reading.plantId);
    let window = this.windows.get(windowKey);

    if (!window || Date.now() > window.windowEnd.getTime()) {
      // Start a new window
      window = {
        plantId: reading.plantId || "default",
        windowStart: new Date(),
        windowEnd: new Date(Date.now() + this.windowDuration),
        readings: new Map(),
      };
      this.windows.set(windowKey, window);
    }

    // Append the reading to this window's series for the parameter
    const values = window.readings.get(reading.parameter) || [];
    values.push(reading.value);
    window.readings.set(reading.parameter, values);
  }

  getAggregatedMetrics(plantId: string): AggregatedMetrics | null {
    const windowKey = this.getWindowKey(plantId);
    const window = this.windows.get(windowKey);

    if (!window) return null;

    const metrics: AggregatedMetrics = {
      plantId,
      timestamp: new Date(),
      parameters: {},
    };

    window.readings.forEach((values, param) => {
      metrics.parameters[param] = {
        min: Math.min(...values),
        max: Math.max(...values),
        avg: values.reduce((a, b) => a + b, 0) / values.length,
        count: values.length,
        latest: values[values.length - 1],
      };
    });

    return metrics;
  }

  // Keys are bucketed by time, so a new key (and window) starts each minute
  private getWindowKey(plantId?: string): string {
    return `${plantId || "default"}_${Math.floor(Date.now() / this.windowDuration)}`;
  }
}
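
One caveat: windows gains a new entry per plant every minute and nothing removes old ones. A pruning method along these lines (a sketch, not part of the original service) keeps the map bounded if called periodically:

// Hypothetical addition inside class AggregationService: evict windows that
// ended more than one duration ago
prune() {
  const cutoff = Date.now() - this.windowDuration;
  this.windows.forEach((window, key) => {
    if (window.windowEnd.getTime() < cutoff) {
      this.windows.delete(key);
    }
  });
}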

3. Data Storage

3.1 InfluxDB Schema

// src/lib/influxdb/client.ts
import { InfluxDB, Point, WriteApi, QueryApi } from "@influxdata/influxdb-client";
import type { SensorReading, MLPrediction } from "@/types/pipeline"; // shared types (path assumed, see §2.1)

const influx = new InfluxDB({
  url: process.env.INFLUXDB_URL || "http://localhost:8086",
  token: process.env.INFLUXDB_TOKEN,
});

// Writes are buffered by the client and flushed in the background ("ms" = millisecond precision)
const writeApi: WriteApi = influx.getWriteApi(
  process.env.INFLUXDB_ORG || "Edubotx",
  process.env.INFLUXDB_BUCKET || "sensors",
  "ms"
);

const queryApi: QueryApi = influx.getQueryApi(process.env.INFLUXDB_ORG || "Edubotx");

export async function writeReading(reading: SensorReading) {
  const point = new Point("water_quality")
    .tag("sensor_id", reading.sensorId)
    .tag("parameter", reading.parameter)
    .tag("plant_id", reading.plantId || "default")
    .tag("location", reading.location || "unknown")
    .floatField("value", reading.value)
    .stringField("quality", reading.quality)
    .stringField("unit", reading.unit)
    .timestamp(reading.timestamp || new Date());

  // writePoint buffers; the client flushes on its own schedule
  writeApi.writePoint(point);
}

export async function writePrediction(prediction: MLPrediction) {
  const point = new Point("predictions")
    .tag("plant_id", prediction.plantId)
    .tag("model", prediction.model)
    .stringField("prediction", prediction.result.prediction)
    .floatField("confidence", prediction.result.confidence)
    .timestamp(new Date());

  writeApi.writePoint(point);
}

export async function queryLatestReadings(plantId: string, duration = "-5m") {
  const query = `
    from(bucket: "${process.env.INFLUXDB_BUCKET}")
      |> range(start: ${duration})
      |> filter(fn: (r) => r.plant_id == "${plantId}")
      |> filter(fn: (r) => r._measurement == "water_quality")
      |> last()
      |> pivot(rowKey: ["_time"], columnKey: ["parameter"], valueColumn: "_value")
  `;

  const results: any[] = [];

  await new Promise((resolve, reject) => {
    queryApi.queryRows(query, {
      next: (row, tableMeta) => {
        results.push(tableMeta.toObject(row));
      },
      error: reject,
      complete: resolve,
    });
  });

  return results;
}

export async function queryHistory(plantId: string, parameter: string, start: string, end: string, aggregateWindow = "1m") {
  const query = `
    from(bucket: "${process.env.INFLUXDB_BUCKET}")
      |> range(start: ${start}, stop: ${end})
      |> filter(fn: (r) => r.plant_id == "${plantId}")
      |> filter(fn: (r) => r.parameter == "${parameter}")
      |> aggregateWindow(every: ${aggregateWindow}, fn: mean, createEmpty: false)
  `;

  const results: any[] = [];

  await new Promise((resolve, reject) => {
    queryApi.queryRows(query, {
      next: (row, tableMeta) => {
        results.push(tableMeta.toObject(row));
      },
      error: reject,
      complete: resolve,
    });
  });

  return results;
}

// Flush buffered writes on shutdown (note: "beforeExit" does not fire on signals,
// so production deployments may also want a SIGTERM/SIGINT handler)
process.on("beforeExit", async () => {
  await writeApi.close();
});
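
Both query helpers wrap queryRows in the same Promise boilerplate. The client library also provides QueryApi.collectRows, which buffers all rows into an array for you; an equivalent helper:

// Equivalent to the manual Promise wrapper above
async function runQuery(flux: string) {
  return queryApi.collectRows<Record<string, unknown>>(flux);
}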

3.2 Retention Policies

// InfluxDB 2.x sets retention per bucket; downsampling runs as scheduled Flux tasks.
// Raw data (bucket "sensors"): 7-day retention
option task = {name: "downsample_raw", every: 1h}

from(bucket: "sensors")
  |> range(start: -1h)
  |> aggregateWindow(every: 1m, fn: mean)
  |> to(bucket: "sensors_1m", org: "Edubotx")

// Downsampled tiers:
//   1-minute aggregates (bucket "sensors_1m"): 30-day retention
//   1-hour aggregates: 1-year retention
//   Daily aggregates: kept forever

4. Event Processing

4.1 Alert Event Handler

// src/lib/events/alert-handler.ts
import type { Alert } from "@/types/pipeline"; // shared types (path assumed, see §2.1)

export class AlertEventHandler {
  private alertHistory: Alert[] = [];
  private subscribers: Set<(alert: Alert) => void> = new Set();

  async handleAlert(alert: Alert) {
    // Keep an in-memory history (unbounded here; cap it in production)
    this.alertHistory.push(alert);

    // Persist to database
    await this.persistAlert(alert);

    // Notify in-process subscribers (e.g. the WebSocket layer)
    this.subscribers.forEach((callback) => callback(alert));

    // Send notifications based on severity
    if (alert.level === "critical") {
      await this.sendCriticalNotification(alert);
    } else if (alert.level === "warning") {
      await this.sendWarningNotification(alert);
    }
  }

  private async sendCriticalNotification(alert: Alert) {
    // Email (sendEmail and sendSMS are the project's notification helpers, not shown here)
    await sendEmail({
      to: process.env.ALERT_EMAIL,
      subject: `🚨 CRITICAL: ${alert.parameter} Alert`,
      body: `Critical alert at ${alert.plantId}: ${alert.message}`,
    });

    // SMS (via Twilio)
    await sendSMS({
      to: process.env.ALERT_PHONE,
      message: `CRITICAL: ${alert.parameter} = ${alert.value} at ${alert.plantId}`,
    });

    // Webhook (skipped when no URL is configured)
    if (process.env.ALERT_WEBHOOK_URL) {
      await fetch(process.env.ALERT_WEBHOOK_URL, {
        method: "POST",
        headers: { "Content-Type": "application/json" },
        body: JSON.stringify(alert),
      });
    }
  }

  // persistAlert and sendWarningNotification are omitted from this excerpt
  private async persistAlert(alert: Alert) {}
  private async sendWarningNotification(alert: Alert) {}

  subscribe(callback: (alert: Alert) => void) {
    this.subscribers.add(callback);
    return () => this.subscribers.delete(callback);
  }
}
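
The subscribe hook is how the WebSocket layer receives alerts without polling. A usage sketch (broadcastToClients is a hypothetical helper for the dashboard connection pool):

// Forward every alert to connected dashboard clients
const alertHandler = new AlertEventHandler();

const unsubscribe = alertHandler.subscribe((alert) => {
  broadcastToClients("alert", alert); // hypothetical WebSocket broadcast helper
});

// Call the returned function to stop receiving alerts, e.g. on shutdown:
// unsubscribe();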

4.2 ML Trigger Events

// src/lib/events/ml-trigger.ts
import type { AggregatedMetrics } from "@/types/pipeline"; // shared types (path assumed, see §2.1)
import { AggregationService } from "@/lib/kafka/aggregator";
import { writePrediction } from "@/lib/influxdb/client";
// broadcastPrediction is the WebSocket layer's broadcast helper (not shown in this document)

export class MLTriggerService {
  private aggregator: AggregationService;
  private lastPrediction: Map<string, Date> = new Map();
  private predictionInterval = 60000; // 1 minute

  constructor(aggregator: AggregationService) {
    this.aggregator = aggregator;
  }

  async checkAndTrigger(plantId: string) {
    const lastTime = this.lastPrediction.get(plantId);

    if (lastTime && Date.now() - lastTime.getTime() < this.predictionInterval) {
      return; // Too soon since the last prediction
    }

    const metrics = this.aggregator.getAggregatedMetrics(plantId);
    if (!metrics) return;

    // Only trigger once every required parameter has at least one reading
    const requiredParams = ["pH", "BOD", "COD", "TSS", "TDS"];
    const hasAllParams = requiredParams.every((p) => metrics.parameters[p]?.count > 0);

    if (!hasAllParams) return;

    // Trigger prediction
    await this.runPrediction(plantId, metrics);
    this.lastPrediction.set(plantId, new Date());
  }

  private async runPrediction(plantId: string, metrics: AggregatedMetrics) {
    // Fall back to typical influent values for any missing parameter
    const input = {
      flow_rate: metrics.parameters.flow_rate?.latest || 150,
      influent_BOD: metrics.parameters.BOD?.latest || 180,
      influent_COD: metrics.parameters.COD?.latest || 350,
      influent_TSS: metrics.parameters.TSS?.latest || 120,
      influent_pH: metrics.parameters.pH?.latest || 7.5,
      influent_TDS: metrics.parameters.TDS?.latest || 600,
      temperature: metrics.parameters.temperature?.latest || 25,
    };

    const response = await fetch(`${process.env.ML_API_URL}/api/predict/reusability`, {
      method: "POST",
      headers: { "Content-Type": "application/json" },
      body: JSON.stringify(input),
    });

    if (!response.ok) {
      throw new Error(`Prediction request failed: ${response.status}`);
    }

    const result = await response.json();

    // Store and broadcast
    await writePrediction({ plantId, model: "reusability", result, input });
    broadcastPrediction({ plantId, result, timestamp: new Date() });
  }
}
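
The service only rate-limits itself; something still has to call checkAndTrigger. A simple scheduling sketch (the poll interval and plant list are assumptions):

// Poll each plant a few times per prediction window
const trigger = new MLTriggerService(aggregator);

setInterval(() => {
  for (const plantId of ["plant-01", "plant-02"]) {
    trigger.checkAndTrigger(plantId).catch(console.error);
  }
}, 10000);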

5. Monitoring

5.1 Pipeline Metrics

// src/lib/monitoring/metrics.ts
import { Counter, Histogram, Gauge, Registry } from "prom-client";

const register = new Registry();

// Counters
export const messagesReceived = new Counter({
  name: "iot_messages_received_total",
  help: "Total messages received",
  labelNames: ["source", "plant_id"],
  registers: [register],
});

export const messagesProcessed = new Counter({
  name: "iot_messages_processed_total",
  help: "Total messages processed",
  labelNames: ["status", "plant_id"],
  registers: [register],
});

export const alertsTriggered = new Counter({
  name: "iot_alerts_triggered_total",
  help: "Total alerts triggered",
  labelNames: ["level", "parameter"],
  registers: [register],
});

// Histograms
export const processingLatency = new Histogram({
  name: "iot_processing_latency_seconds",
  help: "Message processing latency",
  labelNames: ["stage"],
  buckets: [0.001, 0.005, 0.01, 0.05, 0.1, 0.5, 1],
  registers: [register],
});

// Gauges
export const activeConnections = new Gauge({
  name: "iot_active_connections",
  help: "Active WebSocket connections",
  registers: [register],
});

export const bufferSize = new Gauge({
  name: "iot_buffer_size",
  help: "Current buffer size",
  labelNames: ["plant_id"],
  registers: [register],
});

// Metrics endpoint payload (Prometheus text exposition format)
export async function getMetrics() {
  return register.metrics();
}
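
To populate the histogram, wrap each pipeline stage in a timer; prom-client's startTimer returns a function that records the elapsed seconds when called. A sketch of instrumenting the stream processor's message handler (where it slots in, and the plantId extraction, are assumptions):

// Inside StreamProcessor.processMessage (sketch)
const endTimer = processingLatency.startTimer({ stage: "processing" });
try {
  // ... validate, check thresholds, aggregate, publish ...
  messagesProcessed.inc({ status: "ok", plant_id: plantId });
} finally {
  endTimer(); // records elapsed seconds into the histogram
}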

5.2 Health Check Endpoint

// src/app/api/health/route.ts
import { NextResponse } from "next/server";

// checkKafka / checkInfluxDB / checkRedis / checkMQTT each probe one
// dependency and return { name, status }; one is sketched below.
export async function GET() {
  const checks = await Promise.all([checkKafka(), checkInfluxDB(), checkRedis(), checkMQTT()]);

  const isHealthy = checks.every((c) => c.status === "healthy");

  return NextResponse.json(
    {
      status: isHealthy ? "healthy" : "unhealthy",
      timestamp: new Date().toISOString(),
      checks: checks.reduce((acc, c) => ({ ...acc, [c.name]: c }), {}),
    },
    { status: isHealthy ? 200 : 503 }
  );
}
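
A sketch of the Kafka check using kafkajs's admin client, where kafka is the shared client instance from §2.2 (the check's shape matches how GET consumes it; the implementation itself is an assumption):

// Hypothetical implementation of one dependency check
async function checkKafka(): Promise<{ name: string; status: "healthy" | "unhealthy" }> {
  try {
    const admin = kafka.admin();
    await admin.connect();
    await admin.listTopics(); // cheap round-trip to the broker
    await admin.disconnect();
    return { name: "kafka", status: "healthy" };
  } catch {
    return { name: "kafka", status: "unhealthy" };
  }
}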

Last Updated: December 2024