Skip to main content

05 - API Integration

REST and WebSocket endpoints for IoT data ingestion and real-time communication


Table of Contents

  1. API Architecture
  2. REST Endpoints
  3. WebSocket Server
  4. Authentication
  5. Rate Limiting

1. API Architecture

Endpoint Overview

/api/iot/
├── ingest               POST - Single sensor reading
├── batch                POST - Batch sensor readings
├── status               GET  - Gateway health check
├── sensors              GET  - List registered sensors
├── sensors/:id          GET  - Sensor details
├── sensors/:id/history  GET  - Sensor historical data
├── predict              POST - Trigger ML prediction
├── alerts               GET  - Active alerts
└── ws                   WS   - Real-time data stream

2. REST Endpoints

2.1 Ingest Single Reading

File: src/app/api/iot/ingest/route.ts

import { NextRequest, NextResponse } from "next/server";
import { z } from "zod";

/** Parameter names accepted by the single-reading ingest endpoint. */
const SENSOR_PARAMETERS = [
  "pH",
  "BOD",
  "COD",
  "TSS",
  "TDS",
  "flow_rate",
  "temperature",
  "DO",
] as const;

/** Quality flags a reading may carry. */
const QUALITY_FLAGS = ["good", "uncertain", "bad"] as const;

/** Optional context attached to a reading; both fields may be omitted. */
const ReadingMetadataSchema = z.object({
  plantId: z.string().optional(),
  location: z.string().optional(),
});

/**
 * Validation schema for one sensor reading posted to the ingest endpoint.
 *
 * `sensorId` must be non-empty, `timestamp` is optional, and `quality`
 * falls back to "good" when the client does not send it.
 */
const SensorReadingSchema = z.object({
  sensorId: z.string().min(1),
  parameter: z.enum(SENSOR_PARAMETERS),
  value: z.number(),
  unit: z.string(),
  timestamp: z.string().datetime().optional(),
  quality: z.enum(QUALITY_FLAGS).default("good"),
  metadata: ReadingMetadataSchema.optional(),
});

/**
 * POST /api/iot/ingest — validates a single sensor reading, stores it and
 * broadcasts it to WebSocket clients.
 *
 * Responses:
 * - 201 when the reading was stored.
 * - 400 when the body is not valid JSON or fails `SensorReadingSchema`
 *   (validation issues are returned in `details`).
 * - 500 when storing or broadcasting fails.
 *
 * @param request Incoming request whose JSON body is the reading.
 */
export async function POST(request: NextRequest) {
  let body: unknown;
  try {
    body = await request.json();
  } catch {
    // An unparseable body is a client error, not a server fault.
    return NextResponse.json({ success: false, error: "Invalid JSON body" }, { status: 400 });
  }

  const parsed = SensorReadingSchema.safeParse(body);
  if (!parsed.success) {
    return NextResponse.json(
      {
        success: false,
        error: "Validation failed",
        details: parsed.error.issues,
      },
      { status: 400 }
    );
  }

  const reading = parsed.data;
  // Fall back to the server clock when the client did not supply a timestamp.
  const timestamp = reading.timestamp ? new Date(reading.timestamp) : new Date();

  try {
    // Store in database
    await writeReading({
      measurement: "sensor_readings",
      tags: {
        sensor_id: reading.sensorId,
        parameter: reading.parameter,
        plant_id: reading.metadata?.plantId || "default",
      },
      fields: { value: reading.value, quality: reading.quality },
      timestamp,
    });

    // Broadcast to WebSocket clients
    broadcastReading({ ...reading, timestamp: timestamp.toISOString() });

    return NextResponse.json(
      {
        success: true,
        message: "Reading ingested successfully",
      },
      { status: 201 }
    );
  } catch (error) {
    // Keep the cause in the server log; the client only gets a generic message.
    console.error("Failed to ingest sensor reading:", error);
    return NextResponse.json({ success: false, error: "Internal error" }, { status: 500 });
  }
}

2.2 Batch Ingest

File: src/app/api/iot/batch/route.ts

import { NextRequest, NextResponse } from "next/server";
import { z } from "zod";

/**
 * Validation schema for the batch ingest endpoint.
 *
 * A batch holds between 1 and 1000 readings plus an optional `plantId`
 * applied to all of them. `sensorId` and `parameter` must be non-empty
 * because both are written as storage tags; the single-reading ingest
 * schema already rejects an empty `sensorId`, and the batch endpoint now
 * matches it. `parameter` stays a free-form string, so the batch endpoint
 * keeps accepting parameter names beyond the single-reading enum.
 */
const BatchSchema = z.object({
  readings: z
    .array(
      z.object({
        sensorId: z.string().min(1),
        parameter: z.string().min(1),
        value: z.number(),
        unit: z.string(),
        timestamp: z.string().datetime().optional(),
        quality: z.enum(["good", "uncertain", "bad"]).default("good"),
      })
    )
    .min(1)
    .max(1000),
  plantId: z.string().optional(),
});

/**
 * POST /api/iot/batch — validates and stores a batch of sensor readings.
 *
 * Responses:
 * - 201 when every reading in the batch was written.
 * - 400 when the body is not valid JSON or fails `BatchSchema`
 *   (validation issues are returned in `details`).
 * - 500 when the write itself fails.
 *
 * @param request Incoming request whose JSON body is `{ readings, plantId? }`.
 */
export async function POST(request: NextRequest) {
  let body: unknown;
  try {
    body = await request.json();
  } catch {
    // An unparseable body is a client error, not a server fault.
    return NextResponse.json({ success: false, error: "Invalid JSON body" }, { status: 400 });
  }

  const parsed = BatchSchema.safeParse(body);
  if (!parsed.success) {
    return NextResponse.json(
      {
        success: false,
        error: "Validation failed",
        details: parsed.error.issues,
      },
      { status: 400 }
    );
  }

  const { readings, plantId } = parsed.data;

  try {
    const points = readings.map((r) => ({
      measurement: "sensor_readings",
      tags: { sensor_id: r.sensorId, parameter: r.parameter, plant_id: plantId || "default" },
      fields: { value: r.value, quality: r.quality },
      // Readings without their own timestamp are stamped with the server clock.
      timestamp: r.timestamp ? new Date(r.timestamp) : new Date(),
    }));

    await writeBatchReadings(points);

    return NextResponse.json(
      {
        success: true,
        message: `${readings.length} readings ingested`,
      },
      { status: 201 }
    );
  } catch (error) {
    // Keep the cause in the server log; the client only gets a generic message.
    console.error("Failed to ingest sensor reading batch:", error);
    return NextResponse.json({ success: false, error: "Failed" }, { status: 500 });
  }
}

2.3 Trigger ML Prediction from Sensors

File: src/app/api/iot/predict/route.ts

import { NextRequest, NextResponse } from "next/server";

/** ML API path for each supported `model` value. */
const ML_MODEL_ENDPOINTS = new Map<string, string>([
  ["prediction", "/api/predict/reusability"],
  ["treatment", "/api/predict/treatment"],
]);

/**
 * POST /api/iot/predict — runs an ML model on the latest sensor readings of
 * a plant, stores the result and broadcasts it to the dashboard.
 *
 * Responses:
 * - 200 with `{ success: true, data }` when the prediction succeeded.
 * - 400 when `model` is not one of the supported models. Previously an
 *   unknown model stored and broadcast an `undefined` result.
 * - 500 when reading sensors, calling the ML API (including a non-2xx
 *   answer), or storing the result fails.
 *
 * @param request Incoming request whose JSON body is `{ plantId, model }`.
 */
export async function POST(request: NextRequest) {
  try {
    const { plantId, model } = await request.json();

    // Reject unsupported models before doing any work.
    const endpoint = typeof model === "string" ? ML_MODEL_ENDPOINTS.get(model) : undefined;
    if (endpoint === undefined) {
      return NextResponse.json(
        { success: false, error: "Unknown model" },
        { status: 400 }
      );
    }

    // Get latest sensor readings
    const readings = await getLatestReadings(plantId);

    // Transform to ML input format
    const mlInput = transformToMLInput(readings);

    // Call appropriate ML model
    const response = await fetch(`${process.env.NEXT_PUBLIC_ML_API_URL}${endpoint}`, {
      method: "POST",
      headers: { "Content-Type": "application/json" },
      body: JSON.stringify(mlInput),
    });
    if (!response.ok) {
      // Do not store or broadcast an upstream error payload as a prediction.
      throw new Error(`ML API responded with status ${response.status}`);
    }
    const result = await response.json();

    // Store prediction result
    await storePrediction(plantId, model, result);

    // Broadcast to dashboard
    broadcastPrediction({ plantId, model, result, timestamp: new Date().toISOString() });

    return NextResponse.json({ success: true, data: result });
  } catch (error) {
    // Keep the cause in the server log; the client only gets a generic message.
    console.error("Prediction failed:", error);
    return NextResponse.json({ success: false, error: "Prediction failed" }, { status: 500 });
  }
}

/**
 * Builds the ML model input from the latest sensor readings.
 *
 * Each sensor-backed field uses the reading stored under its parameter name
 * and falls back to a fixed default when the reading is missing or not a
 * finite number. A reading of `0` is a valid measurement and is kept (the
 * previous `||` fallback replaced it with the default).
 *
 * `sludge_recycle_rate`, `retention_time` and `effluent_BOD_lag1` are fixed
 * values and are not read from `readings`.
 *
 * @param readings Latest value per parameter name.
 * @returns Plain object in the shape sent to the ML API.
 */
function transformToMLInput(readings: Map<string, number>) {
  const latest = (parameter: string, fallback: number): number => {
    const value = readings.get(parameter);
    return value !== undefined && Number.isFinite(value) ? value : fallback;
  };

  return {
    flow_rate: latest("flow_rate", 150),
    influent_BOD: latest("BOD", 180),
    influent_COD: latest("COD", 350),
    influent_TSS: latest("TSS", 120),
    influent_pH: latest("pH", 7.5),
    influent_TDS: latest("TDS", 600),
    aeration_rate: latest("aeration_rate", 35),
    chemical_dose: latest("chemical_dose", 12),
    sludge_recycle_rate: 25,
    retention_time: 6,
    temperature: latest("temperature", 25),
    effluent_BOD_lag1: 22,
  };
}

2.4 Gateway Status

File: src/app/api/iot/status/route.ts

import { NextResponse } from "next/server";

/**
 * GET /api/iot/status — reports gateway health.
 *
 * Collects the database check, the MQTT check and the WebSocket statistics
 * (in that order) together with the process uptime. The top-level `status`
 * field is always "healthy"; it is not derived from the individual checks.
 */
export async function GET() {
  // Captured before the checks run, matching the original evaluation order.
  const timestamp = new Date().toISOString();

  const database = await checkDatabaseHealth();
  const mqtt = await checkMQTTHealth();
  const websocket = getWebSocketStats();
  const uptime = process.uptime();

  return NextResponse.json({
    status: "healthy",
    timestamp,
    services: { database, mqtt, websocket },
    uptime,
  });
}

3. WebSocket Server

3.1 WebSocket Service

File: src/lib/iot/websocket-server.ts

import { Server as SocketIOServer } from "socket.io";
import { Server as HTTPServer } from "http";

/** Socket.IO server shared by the broadcast helpers; `null` until `initWebSocket` runs. */
let io: SocketIOServer | null = null;

/** True when a client-supplied payload is usable as a plant id (non-empty string). */
function isPlantId(value: unknown): value is string {
  return typeof value === "string" && value.length > 0;
}

/**
 * Creates the Socket.IO server on top of the given HTTP server and wires up
 * the per-connection handlers.
 *
 * Clients emit `subscribe` / `unsubscribe` with a plant id to join or leave
 * the `plant:<id>` room. The payload comes from the client, so anything that
 * is not a non-empty string is ignored rather than interpolated into a room
 * name.
 *
 * @param server HTTP server to attach to.
 * @returns The created Socket.IO server (also stored in the module-level `io`).
 */
export function initWebSocket(server: HTTPServer) {
  io = new SocketIOServer(server, {
    cors: {
      origin: process.env.NEXT_PUBLIC_APP_URL,
      methods: ["GET", "POST"],
    },
    path: "/api/iot/ws",
  });

  io.on("connection", (socket) => {
    console.log("Client connected:", socket.id);

    // Join plant-specific room
    socket.on("subscribe", (plantId: unknown) => {
      if (!isPlantId(plantId)) return;
      socket.join(`plant:${plantId}`);
      console.log(`${socket.id} subscribed to plant:${plantId}`);
    });

    // Leave room
    socket.on("unsubscribe", (plantId: unknown) => {
      if (!isPlantId(plantId)) return;
      socket.leave(`plant:${plantId}`);
    });

    socket.on("disconnect", () => {
      console.log("Client disconnected:", socket.id);
    });
  });

  return io;
}

/**
 * Sends a sensor reading to WebSocket clients.
 *
 * The reading is emitted as `sensor:reading` to the room of its plant
 * (`reading.metadata.plantId`, or "default" when absent or empty) and as
 * `sensor:reading:global` to every connected client. Does nothing when the
 * WebSocket server has not been initialised.
 */
export function broadcastReading(reading: any) {
  const server = io;
  if (server === null) {
    return;
  }

  const room = `plant:${reading.metadata?.plantId || "default"}`;
  server.to(room).emit("sensor:reading", reading);
  server.emit("sensor:reading:global", reading);
}

/**
 * Emits an `alert` event to every connected client. Does nothing when the
 * WebSocket server has not been initialised.
 */
export function broadcastAlert(alert: any) {
  io?.emit("alert", alert);
}

/**
 * Emits a `prediction` event to the room of the plant named by
 * `prediction.plantId`. Does nothing when the WebSocket server has not been
 * initialised.
 */
export function broadcastPrediction(prediction: any) {
  if (io === null) {
    return;
  }

  const room = `plant:${prediction.plantId}`;
  io.to(room).emit("prediction", prediction);
}

/**
 * Reports the number of connected WebSocket clients, or 0 when the server
 * has not been initialised.
 */
export function getWebSocketStats() {
  const connections = io ? io.engine.clientsCount : 0;
  return { connections };
}

3.2 Client-Side Hook

File: src/hooks/useIoTSocket.ts

import { useEffect, useState, useCallback } from "react";
import { io, Socket } from "socket.io-client";

/**
 * Shape this hook expects for the payload of a `sensor:reading` event.
 * Only the fields the client reads are declared here.
 */
interface SensorReading {
// Identifier of the sensor that produced the reading.
sensorId: string;
// Name of the measured parameter — presumably one of the names the ingest API accepts (e.g. "pH"); confirm against the server.
parameter: string;
// Measured value; the unit is not part of this client-side type.
value: number;
// Time of the reading as a string — the ingest route broadcasts `timestamp.toISOString()`.
timestamp: string;
}

/** Maximum number of readings kept in the hook's rolling buffer. */
const MAX_BUFFERED_READINGS = 100;

/**
 * React hook that connects to the IoT WebSocket endpoint and exposes the
 * readings received on the `sensor:reading` event.
 *
 * A new connection is opened whenever `plantId` changes and the previous one
 * is closed. Buffered readings are cleared at that point, because a reading
 * carries no plant id and data from the previous plant could otherwise not
 * be told apart from the new plant's.
 *
 * @param plantId Plant to subscribe to on every (re)connect; when omitted no
 *   subscription is made automatically.
 * @returns `isConnected` — current connection state; `readings` — the most
 *   recent readings, oldest first, capped at 100; `latestReading` — the last
 *   reading received or `null`; `subscribe` — joins an additional plant room
 *   on the current socket.
 */
export function useIoTSocket(plantId?: string) {
  const [socket, setSocket] = useState<Socket | null>(null);
  const [isConnected, setIsConnected] = useState(false);
  const [readings, setReadings] = useState<SensorReading[]>([]);
  const [latestReading, setLatestReading] = useState<SensorReading | null>(null);

  useEffect(() => {
    // Start from a clean buffer; keep the same array when it is already empty
    // so the initial mount does not trigger an extra render.
    setLatestReading(null);
    setReadings((prev) => (prev.length === 0 ? prev : []));

    const socketInstance = io(process.env.NEXT_PUBLIC_APP_URL!, {
      path: "/api/iot/ws",
    });

    socketInstance.on("connect", () => {
      setIsConnected(true);
      // Runs on every reconnect as well, so the room membership is restored.
      if (plantId) {
        socketInstance.emit("subscribe", plantId);
      }
    });

    socketInstance.on("disconnect", () => {
      setIsConnected(false);
    });

    socketInstance.on("sensor:reading", (reading: SensorReading) => {
      setLatestReading(reading);
      // Keep the newest MAX_BUFFERED_READINGS entries, including this one.
      setReadings((prev) => [...prev.slice(-(MAX_BUFFERED_READINGS - 1)), reading]);
    });

    setSocket(socketInstance);

    return () => {
      socketInstance.disconnect();
      // Do not leave a closed socket in state for `subscribe` to emit on.
      setSocket((current) => (current === socketInstance ? null : current));
    };
  }, [plantId]);

  const subscribe = useCallback(
    (newPlantId: string) => {
      socket?.emit("subscribe", newPlantId);
    },
    [socket]
  );

  return {
    isConnected,
    readings,
    latestReading,
    subscribe,
  };
}

4. Authentication

4.1 API Key Authentication

File: src/middleware/iot-auth.ts

import { NextRequest, NextResponse } from "next/server";

/**
 * Checks the `x-api-key` request header against the comma-separated list in
 * the `IOT_API_KEYS` environment variable.
 *
 * Entries are trimmed and empty entries are dropped. Without that, an empty
 * variable or a trailing comma yields an `""` entry, which a request with an
 * empty `x-api-key` header would match. A missing or empty header is always
 * rejected.
 *
 * @param request Incoming request.
 * @returns `true` when the header matches one of the configured keys.
 */
export function validateApiKey(request: NextRequest): boolean {
  const apiKey = request.headers.get("x-api-key");
  if (apiKey === null || apiKey === "") {
    return false;
  }

  const validKeys = (process.env.IOT_API_KEYS ?? "")
    .split(",")
    .map((key) => key.trim())
    .filter((key) => key.length > 0);

  // NOTE(review): `includes` is not a constant-time comparison; consider a
  // timing-safe check if the target runtime provides one.
  return validKeys.includes(apiKey);
}

/**
 * Wraps a route handler so that it only runs for requests carrying a valid
 * API key (see `validateApiKey`); other requests get a 401 JSON response.
 *
 * The handler is typed as a callable taking the request and the route
 * context instead of the unchecked `Function` type, so passing a
 * non-callable or a handler with the wrong request type is a compile error.
 *
 * @param handler Route handler to protect.
 * @returns A handler with the same `(request, context)` signature.
 */
export function withIoTAuth(
  // The context shape differs per route, so it stays `any` as before.
  // eslint-disable-next-line @typescript-eslint/no-explicit-any
  handler: (request: NextRequest, context: any) => Response | Promise<Response>
) {
  // eslint-disable-next-line @typescript-eslint/no-explicit-any
  return async (request: NextRequest, context: any): Promise<Response> => {
    if (!validateApiKey(request)) {
      return NextResponse.json({ success: false, error: "Invalid API key" }, { status: 401 });
    }
    return handler(request, context);
  };
}

4.2 Environment Variables

# .env.local
IOT_API_KEYS=key1,key2,key3
MQTT_BROKER_URL=mqtt://localhost:1883
INFLUXDB_URL=http://localhost:8086
INFLUXDB_TOKEN=your-token
INFLUXDB_ORG=Edubotx
INFLUXDB_BUCKET=sensors

5. Rate Limiting

5.1 Rate Limiter Middleware

import { Ratelimit } from "@upstash/ratelimit";
import { Redis } from "@upstash/redis";

// Redis client configured from environment variables.
const redis = Redis.fromEnv();

// Sliding window: 100 requests per minute
const limiter = Ratelimit.slidingWindow(100, "1 m");

/** Shared rate limiter used by `checkRateLimit`. */
const ratelimit = new Ratelimit({ redis, limiter });

/**
 * Records one request for `identifier` against the shared limiter and
 * returns the outcome.
 *
 * @param identifier Key the request is counted under.
 * @returns The `success`, `limit`, `remaining` and `reset` fields of the
 *   limiter's response.
 */
export async function checkRateLimit(identifier: string) {
  const outcome = await ratelimit.limit(identifier);

  return {
    success: outcome.success,
    limit: outcome.limit,
    remaining: outcome.remaining,
    reset: outcome.reset,
  };
}

Next Steps


Last Updated: December 2024