05 - API Integration
REST and WebSocket endpoints for IoT data ingestion and real-time communication
Table of Contents
1. API Architecture
Endpoint Overview
/api/iot/
├── ingest POST - Single sensor reading
├── batch POST - Batch sensor readings
├── status GET - Gateway health check
├── sensors GET - List registered sensors
├── sensors/:id GET - Sensor details
├── sensors/:id/history GET - Sensor historical data
├── predict POST - Trigger ML prediction
├── alerts GET - Active alerts
└── ws WS - Real-time data stream
2. REST Endpoints
2.1 Ingest Single Reading
File: src/app/api/iot/ingest/route.ts
import { NextRequest, NextResponse } from "next/server";
import { z } from "zod";
/** Parameter names accepted by the single-reading ingest endpoint. */
const SENSOR_PARAMETERS = ["pH", "BOD", "COD", "TSS", "TDS", "flow_rate", "temperature", "DO"] as const;

/** Quality flags a reading may carry. */
const QUALITY_FLAGS = ["good", "uncertain", "bad"] as const;

/** Optional descriptive context attached to a reading. */
const ReadingMetadataSchema = z.object({
  plantId: z.string().optional(),
  location: z.string().optional(),
});

/**
 * Validation schema for the JSON body of a single sensor reading.
 *
 * `timestamp` is optional and must be an ISO datetime string when present;
 * `quality` falls back to "good" when omitted.
 */
const SensorReadingSchema = z.object({
  sensorId: z.string().min(1),
  parameter: z.enum(SENSOR_PARAMETERS),
  value: z.number(),
  unit: z.string(),
  timestamp: z.string().datetime().optional(),
  quality: z.enum(QUALITY_FLAGS).default("good"),
  metadata: ReadingMetadataSchema.optional(),
});
/**
 * POST /api/iot/ingest — validates, stores and broadcasts a single sensor reading.
 *
 * @param request Incoming request whose JSON body must satisfy `SensorReadingSchema`.
 * @returns 201 on success; 400 when the body is not valid JSON or fails
 *          validation (the validation issues are returned in `details`);
 *          500 when storing or broadcasting the reading throws.
 */
export async function POST(request: NextRequest) {
  // A body that cannot be parsed is a client error, so report it as 400
  // instead of letting it fall through to the generic 500 handler.
  let body: unknown;
  try {
    body = await request.json();
  } catch {
    return NextResponse.json({ success: false, error: "Invalid JSON body" }, { status: 400 });
  }

  const parsed = SensorReadingSchema.safeParse(body);
  if (!parsed.success) {
    return NextResponse.json(
      {
        success: false,
        error: "Validation failed",
        details: parsed.error.issues,
      },
      { status: 400 }
    );
  }

  const reading = parsed.data;
  // Readings that carry no timestamp are stamped with the time they were received.
  const timestamp = reading.timestamp ? new Date(reading.timestamp) : new Date();

  try {
    // Store in database
    await writeReading({
      measurement: "sensor_readings",
      tags: {
        sensor_id: reading.sensorId,
        parameter: reading.parameter,
        // `||` (not `??`) on purpose: an empty plantId also falls back to "default".
        plant_id: reading.metadata?.plantId || "default",
      },
      fields: { value: reading.value, quality: reading.quality },
      timestamp,
    });

    // Broadcast to WebSocket clients
    broadcastReading({ ...reading, timestamp: timestamp.toISOString() });

    return NextResponse.json(
      {
        success: true,
        message: "Reading ingested successfully",
      },
      { status: 201 }
    );
  } catch (error) {
    // Keep the cause in the server log; the client only gets a generic message.
    console.error("Failed to ingest sensor reading:", error);
    return NextResponse.json({ success: false, error: "Internal error" }, { status: 500 });
  }
}
2.2 Batch Ingest
File: src/app/api/iot/batch/route.ts
import { NextRequest, NextResponse } from "next/server";
import { z } from "zod";
/**
 * Validation schema for the batch ingest body: between 1 and 1000 readings
 * plus an optional `plantId` that applies to the whole batch.
 *
 * `sensorId` and `parameter` must be non-empty because both are written as
 * tag values by the batch handler. `parameter` is a free-form string here,
 * unlike the single-reading endpoint, which restricts it to a fixed enum.
 * NOTE(review): confirm whether the batch endpoint should share that enum.
 */
const BatchSchema = z.object({
  readings: z
    .array(
      z.object({
        sensorId: z.string().min(1),
        parameter: z.string().min(1),
        value: z.number(),
        unit: z.string(),
        timestamp: z.string().datetime().optional(),
        quality: z.enum(["good", "uncertain", "bad"]).default("good"),
      })
    )
    .min(1)
    .max(1000),
  plantId: z.string().optional(),
});
/**
 * POST /api/iot/batch — validates and stores a batch of sensor readings.
 *
 * @param request Incoming request whose JSON body must satisfy `BatchSchema`.
 * @returns 201 with the number of readings ingested; 400 when the body is not
 *          valid JSON or fails validation (issues are returned in `details`);
 *          500 when writing the batch throws.
 */
export async function POST(request: NextRequest) {
  // Malformed JSON and schema violations are client errors and are answered
  // with 400, matching the single-reading ingest endpoint.
  let body: unknown;
  try {
    body = await request.json();
  } catch {
    return NextResponse.json({ success: false, error: "Invalid JSON body" }, { status: 400 });
  }

  const parsed = BatchSchema.safeParse(body);
  if (!parsed.success) {
    return NextResponse.json(
      {
        success: false,
        error: "Validation failed",
        details: parsed.error.issues,
      },
      { status: 400 }
    );
  }

  const { readings, plantId } = parsed.data;

  try {
    const points = readings.map((r) => ({
      measurement: "sensor_readings",
      tags: { sensor_id: r.sensorId, parameter: r.parameter, plant_id: plantId || "default" },
      fields: { value: r.value, quality: r.quality },
      // Readings that carry no timestamp are stamped with the time they were processed.
      timestamp: r.timestamp ? new Date(r.timestamp) : new Date(),
    }));

    await writeBatchReadings(points);

    return NextResponse.json(
      {
        success: true,
        message: `${readings.length} readings ingested`,
      },
      { status: 201 }
    );
  } catch (error) {
    // Keep the cause in the server log; the client only gets a generic message.
    console.error("Failed to ingest sensor reading batch:", error);
    return NextResponse.json({ success: false, error: "Failed" }, { status: 500 });
  }
}
2.3 Trigger ML Prediction from Sensors
File: src/app/api/iot/predict/route.ts
import { NextRequest, NextResponse } from "next/server";
/** ML service path for each supported `model` value of the predict endpoint. */
const ML_MODEL_PATHS = new Map<string, string>([
  ["prediction", "/api/predict/reusability"],
  ["treatment", "/api/predict/treatment"],
]);

/** Builds the 400 response used for every rejected predict request. */
function predictBadRequest(message: string) {
  return NextResponse.json({ success: false, error: message }, { status: 400 });
}

/**
 * POST /api/iot/predict — runs an ML model on the latest sensor readings of a plant.
 *
 * Expects a JSON body `{ plantId?: string, model: "prediction" | "treatment" }`.
 * A missing or empty `plantId` falls back to "default", the same fallback the
 * ingest routes use when tagging readings.
 *
 * @returns 200 with the model output in `data`; 400 when the body is not valid
 *          JSON, `plantId` is not a string, or `model` is not supported;
 *          500 when reading sensors, calling the ML service, or storing the
 *          result fails.
 */
export async function POST(request: NextRequest) {
  let body: unknown;
  try {
    body = await request.json();
  } catch {
    return predictBadRequest("Invalid JSON body");
  }

  const payload = typeof body === "object" && body !== null ? (body as Record<string, unknown>) : {};

  // Reject unknown models up front; otherwise an undefined result would be
  // stored, broadcast and reported as a success.
  const model = payload.model;
  if (typeof model !== "string") {
    return predictBadRequest("Unsupported model");
  }
  const modelPath = ML_MODEL_PATHS.get(model);
  if (modelPath === undefined) {
    return predictBadRequest("Unsupported model");
  }

  const rawPlantId = payload.plantId;
  if (rawPlantId != null && typeof rawPlantId !== "string") {
    return predictBadRequest("plantId must be a string");
  }
  const plantId = typeof rawPlantId === "string" && rawPlantId !== "" ? rawPlantId : "default";

  try {
    // Get latest sensor readings
    const readings = await getLatestReadings(plantId);

    // Transform to ML input format
    const mlInput = transformToMLInput(readings);

    // Call appropriate ML model
    const response = await fetch(`${process.env.NEXT_PUBLIC_ML_API_URL}${modelPath}`, {
      method: "POST",
      headers: { "Content-Type": "application/json" },
      body: JSON.stringify(mlInput),
    });
    // An error response from the ML service must not be persisted as a prediction.
    if (!response.ok) {
      throw new Error(`ML service responded with status ${response.status}`);
    }
    const result: unknown = await response.json();

    // Store prediction result
    await storePrediction(plantId, model, result);

    // Broadcast to dashboard
    broadcastPrediction({ plantId, model, result, timestamp: new Date().toISOString() });

    return NextResponse.json({ success: true, data: result });
  } catch (error) {
    // Keep the cause in the server log; the client only gets a generic message.
    console.error("Prediction request failed:", error);
    return NextResponse.json({ success: false, error: "Prediction failed" }, { status: 500 });
  }
}
/**
 * Builds the ML request payload from the latest readings, keyed by parameter name.
 *
 * Each sensor-backed field uses the reading when it is a finite number and a
 * fixed fallback otherwise. A reading of `0` is a real measurement and is
 * passed through; only missing, NaN or infinite values use the fallback.
 * `sludge_recycle_rate`, `retention_time` and `effluent_BOD_lag1` are not read
 * from `readings` and are always the constants below.
 *
 * @param readings Latest value per parameter name (e.g. "pH", "BOD").
 * @returns The payload object sent to the ML service.
 */
function transformToMLInput(readings: Map<string, number>) {
  // `??`/finite check instead of `||`, so a legitimate 0 is not replaced.
  const valueOr = (parameter: string, fallback: number): number => {
    const value = readings.get(parameter);
    return value !== undefined && Number.isFinite(value) ? value : fallback;
  };

  return {
    flow_rate: valueOr("flow_rate", 150),
    influent_BOD: valueOr("BOD", 180),
    influent_COD: valueOr("COD", 350),
    influent_TSS: valueOr("TSS", 120),
    influent_pH: valueOr("pH", 7.5),
    influent_TDS: valueOr("TDS", 600),
    aeration_rate: valueOr("aeration_rate", 35),
    chemical_dose: valueOr("chemical_dose", 12),
    sludge_recycle_rate: 25,
    retention_time: 6,
    temperature: valueOr("temperature", 25),
    effluent_BOD_lag1: 22,
  };
}
2.4 Gateway Status
File: src/app/api/iot/status/route.ts
import { NextResponse } from "next/server";
/**
 * GET /api/iot/status — reports gateway health.
 *
 * The database and MQTT probes are independent, so they run concurrently.
 * `status` is "healthy" whenever both probes complete without throwing; the
 * values they return are passed through under `services` without inspection.
 * NOTE(review): if the probes signal failure through their return value
 * rather than by throwing, derive `status` from those values instead.
 *
 * @returns 200 with the health report, or 503 with `status: "unhealthy"`
 *          when a probe throws.
 */
export async function GET() {
  const timestamp = new Date().toISOString();

  try {
    const [database, mqtt] = await Promise.all([checkDatabaseHealth(), checkMQTTHealth()]);

    return NextResponse.json({
      status: "healthy",
      timestamp,
      services: {
        database,
        mqtt,
        websocket: getWebSocketStats(),
      },
      uptime: process.uptime(),
    });
  } catch (error) {
    // A failing probe should yield a readable "unhealthy" answer, not an unhandled error.
    console.error("IoT gateway health check failed:", error);
    return NextResponse.json(
      {
        status: "unhealthy",
        timestamp,
        uptime: process.uptime(),
      },
      { status: 503 }
    );
  }
}
3. WebSocket Server
3.1 WebSocket Service
File: src/lib/iot/websocket-server.ts
import { Server as SocketIOServer } from "socket.io";
import { Server as HTTPServer } from "http";
/** The Socket.IO server shared by the broadcast helpers; null until initialised. */
let io: SocketIOServer | null = null;

/** Returns true when a client-supplied plant id is a non-empty string. */
function isValidPlantId(plantId: unknown): plantId is string {
  return typeof plantId === "string" && plantId.length > 0;
}

/**
 * Attaches the Socket.IO server to `server` on path `/api/iot/ws` and wires up
 * per-plant room subscriptions.
 *
 * Calling this more than once returns the server created by the first call,
 * so the broadcast helpers never lose track of already-connected clients.
 *
 * @param server HTTP server the Socket.IO server attaches to.
 * @returns The Socket.IO server instance.
 */
export function initWebSocket(server: HTTPServer) {
  if (io) {
    return io;
  }

  const wsServer = new SocketIOServer(server, {
    cors: {
      origin: process.env.NEXT_PUBLIC_APP_URL,
      methods: ["GET", "POST"],
    },
    path: "/api/iot/ws",
  });

  wsServer.on("connection", (socket) => {
    console.log("Client connected:", socket.id);

    // Join plant-specific room. The payload comes from the client, so anything
    // that is not a non-empty string is ignored instead of being turned into
    // a room name.
    socket.on("subscribe", (plantId: unknown) => {
      if (!isValidPlantId(plantId)) {
        return;
      }
      socket.join(`plant:${plantId}`);
      console.log(`${socket.id} subscribed to plant:${plantId}`);
    });

    // Leave room
    socket.on("unsubscribe", (plantId: unknown) => {
      if (!isValidPlantId(plantId)) {
        return;
      }
      socket.leave(`plant:${plantId}`);
    });

    socket.on("disconnect", () => {
      console.log("Client disconnected:", socket.id);
    });
  });

  io = wsServer;
  return wsServer;
}
/**
 * Emits a reading to the room of its plant (`sensor:reading`) and to every
 * connected client (`sensor:reading:global`). Does nothing until the
 * WebSocket server has been initialised.
 *
 * @param reading Reading payload; `metadata.plantId` selects the room and
 *                falls back to "default" when absent or empty.
 */
export function broadcastReading(reading: any) {
  if (io == null) {
    return;
  }

  const room = `plant:${reading.metadata?.plantId || "default"}`;
  io.to(room).emit("sensor:reading", reading);
  io.emit("sensor:reading:global", reading);
}
/**
 * Emits an `alert` event to every connected client. Does nothing until the
 * WebSocket server has been initialised.
 *
 * @param alert Alert payload forwarded unchanged.
 */
export function broadcastAlert(alert: any) {
  io?.emit("alert", alert);
}
/**
 * Emits a `prediction` event to the room of the prediction's plant. Does
 * nothing until the WebSocket server has been initialised.
 *
 * @param prediction Prediction payload; `plantId` selects the room.
 */
export function broadcastPrediction(prediction: any) {
  // Optional chaining short-circuits the whole call, so `prediction` is not
  // touched while there is no server.
  io?.to(`plant:${prediction.plantId}`).emit("prediction", prediction);
}
/**
 * Reports how many clients are currently connected.
 *
 * @returns `{ connections }`, which is 0 while the WebSocket server has not
 *          been initialised.
 */
export function getWebSocketStats() {
  const connections = io ? io.engine.clientsCount : 0;
  return { connections };
}
3.2 Client-Side Hook
File: src/hooks/useIoTSocket.ts
import { useEffect, useState, useCallback } from "react";
import { io, Socket } from "socket.io-client";
/** Fields of a `sensor:reading` event that this hook exposes to components. */
interface SensorReading {
  sensorId: string;
  parameter: string;
  value: number;
  timestamp: string;
}

/** Number of most recent readings kept in the rolling buffer. */
const MAX_BUFFERED_READINGS = 100;

/**
 * React hook that opens a Socket.IO connection to `/api/iot/ws` and collects
 * the `sensor:reading` events of one plant.
 *
 * The connection is re-created whenever `plantId` changes. Buffered readings
 * are cleared at that point so values from the previous plant are never shown
 * under the new one.
 *
 * @param plantId Plant to subscribe to once connected; when omitted, no
 *                subscription is sent.
 * @returns `isConnected`, the rolling `readings` buffer (newest last),
 *          `latestReading`, and `subscribe` for joining an additional plant.
 */
export function useIoTSocket(plantId?: string) {
  const [socket, setSocket] = useState<Socket | null>(null);
  const [isConnected, setIsConnected] = useState(false);
  const [readings, setReadings] = useState<SensorReading[]>([]);
  const [latestReading, setLatestReading] = useState<SensorReading | null>(null);

  useEffect(() => {
    const options = { path: "/api/iot/ws" };
    // Without a configured URL, fall back to the client's default target
    // rather than asserting the variable is set.
    const appUrl = process.env.NEXT_PUBLIC_APP_URL;
    const socketInstance = appUrl ? io(appUrl, options) : io(options);

    socketInstance.on("connect", () => {
      setIsConnected(true);
      // Runs on every (re)connect, so the subscription is restored after a drop.
      if (plantId) {
        socketInstance.emit("subscribe", plantId);
      }
    });

    socketInstance.on("disconnect", () => {
      setIsConnected(false);
    });

    socketInstance.on("sensor:reading", (reading: SensorReading) => {
      setLatestReading(reading);
      setReadings((prev) => [...prev.slice(-(MAX_BUFFERED_READINGS - 1)), reading]);
    });

    setSocket(socketInstance);

    return () => {
      socketInstance.disconnect();
      // Drop everything tied to the old connection so `subscribe` cannot use a
      // closed socket and stale readings do not carry over to the next plant.
      setSocket(null);
      setIsConnected(false);
      setReadings([]);
      setLatestReading(null);
    };
  }, [plantId]);

  const subscribe = useCallback(
    (newPlantId: string) => {
      socket?.emit("subscribe", newPlantId);
    },
    [socket]
  );

  return {
    isConnected,
    readings,
    latestReading,
    subscribe,
  };
}
4. Authentication
4.1 API Key Authentication
File: src/middleware/iot-auth.ts
import { NextRequest, NextResponse } from "next/server";
/**
 * Checks the `x-api-key` request header against the comma-separated keys in
 * the `IOT_API_KEYS` environment variable.
 *
 * Configured keys are trimmed and empty entries are dropped. This keeps an
 * unset, empty or trailing-comma `IOT_API_KEYS` from producing `""` as a valid
 * key, which would let a request with an empty header through.
 *
 * @param request Incoming request.
 * @returns true only when the header is present, non-empty and equal to one
 *          of the configured keys.
 */
export function validateApiKey(request: NextRequest): boolean {
  const apiKey = request.headers.get("x-api-key");
  if (!apiKey) {
    return false;
  }

  const validKeys = (process.env.IOT_API_KEYS ?? "")
    .split(",")
    .map((key) => key.trim())
    .filter((key) => key.length > 0);

  return validKeys.includes(apiKey);
}
/**
 * Wraps a route handler so it only runs for requests that pass `validateApiKey`.
 *
 * @param handler Route handler invoked with the request and route context
 *                once the API key has been accepted.
 * @returns A handler that answers 401 with `{ success: false, error }` for an
 *          invalid key and otherwise returns whatever `handler` returns.
 */
export function withIoTAuth(
  // `context` stays `any`: its shape is route-specific and not known here.
  // eslint-disable-next-line @typescript-eslint/no-explicit-any
  handler: (request: NextRequest, context: any) => Response | Promise<Response>
) {
  // eslint-disable-next-line @typescript-eslint/no-explicit-any
  return async (request: NextRequest, context: any): Promise<Response> => {
    if (!validateApiKey(request)) {
      return NextResponse.json({ success: false, error: "Invalid API key" }, { status: 401 });
    }
    return handler(request, context);
  };
}
4.2 Environment Variables
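# .env.local
IOT_API_KEYS=key1,key2,key3
MQTT_BROKER_URL=mqtt://localhost:1883
INFLUXDB_URL=http://localhost:8086
INFLUXDB_TOKEN=your-token
INFLUXDB_ORG=Edubotx
INFLUXDB_BUCKET=sensors
# Also read by the code in this guide (example values, adjust to your setup):
# WebSocket CORS origin and client connection URL
NEXT_PUBLIC_APP_URL=http://localhost:3000
# Base URL of the ML service called by /api/iot/predict
NEXT_PUBLIC_ML_API_URL=https://your-ml-api-host
# Required by Redis.fromEnv() in the rate limiter
UPSTASH_REDIS_REST_URL=your-upstash-url
UPSTASH_REDIS_REST_TOKEN=your-upstash-token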
5. Rate Limiting
5.1 Rate Limiter Middleware
import { Ratelimit } from "@upstash/ratelimit";
import { Redis } from "@upstash/redis";
/** Requests allowed per identifier within one window. */
const MAX_REQUESTS_PER_WINDOW = 100;

/** Length of the sliding window. */
const RATE_LIMIT_WINDOW = "1 m";

/** Sliding-window limiter backed by the Redis instance configured via environment variables. */
const ratelimit = new Ratelimit({
  redis: Redis.fromEnv(),
  limiter: Ratelimit.slidingWindow(MAX_REQUESTS_PER_WINDOW, RATE_LIMIT_WINDOW), // 100 requests per minute
});

/**
 * Records one request for `identifier` and reports the limiter's verdict.
 *
 * @param identifier Key the limit is tracked under.
 * @returns The `success`, `limit`, `remaining` and `reset` fields of the
 *          limiter's response.
 */
export async function checkRateLimit(identifier: string) {
  const outcome = await ratelimit.limit(identifier);
  return {
    success: outcome.success,
    limit: outcome.limit,
    remaining: outcome.remaining,
    reset: outcome.reset,
  };
}
Next Steps
- Proceed to 06-EDGE-DEPLOYMENT.md for edge computing setup
- Review 07-DATA-PIPELINE.md for streaming architecture
Last Updated: December 2024