Skip to main content

04 - Protocol Adapters

Implementation guides for MQTT, OPC-UA, Modbus, and other industrial protocols


Table of Contents

  1. Protocol Overview
  2. MQTT Implementation
  3. OPC-UA Implementation
  4. Modbus Implementation
  5. Protocol Bridge
  6. Data Transformation

1. Protocol Overview

Protocol Comparison

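| Protocol   | Layer       | Use Case           | Latency  | Complexity |
| ---------- | ----------- | ------------------ | -------- | ---------- |
| MQTT       | Application | Cloud/Edge comm    | Low      | Low        |
| OPC-UA     | Application | Industrial systems | Medium   | High       |
| Modbus TCP | Application | PLCs, sensors      | Low      | Low        |
| Modbus RTU | Data Link   | Legacy devices     | Low      | Low        |
| HTTP/REST  | Application | Web services       | Medium   | Low        |
| WebSocket  | Application | Real-time UI       | Very Low | Medium     |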

Protocol Selection Flowchart

                        ┌─────────────────┐
                        │  What is your   │
                        │  data source?   │
                        └────────┬────────┘
                                 │
              ┌──────────────────┼──────────────────┐
              │                  │                  │
              ▼                  ▼                  ▼
         ┌──────────┐       ┌──────────┐       ┌──────────┐
         │  Legacy  │       │  Modern  │       │  Cloud   │
         │ Sensors  │       │ PLC/DCS  │       │ Services │
         └────┬─────┘       └────┬─────┘       └────┬─────┘
              │                  │                  │
              ▼                  ▼                  ▼
         ┌──────────┐       ┌──────────┐       ┌──────────┐
         │  Modbus  │       │  OPC-UA  │       │  REST/   │
         │ RTU/TCP  │       │          │       │   MQTT   │
         └──────────┘       └──────────┘       └──────────┘

2. MQTT Implementation

2.1 MQTT Client Service

// src/lib/iot/mqtt-client.ts

import mqtt, { MqttClient, IClientOptions } from "mqtt";
import { EventEmitter } from "events";

/**
 * A single sensor measurement, as carried by the "reading" events in this
 * guide.
 */
export interface SensorReading {
// Identifier of the reporting sensor.
sensorId: string;
// Name of the measured quantity, e.g. "pH" or "flow_rate".
parameter: string;
// Numeric measurement.
value: number;
// Unit label for `value`; may be an empty string when the source gives none.
unit: string;
// When the measurement was taken (or received, if the source gave no time).
timestamp: Date;
// Source-reported trustworthiness of the measurement.
quality: "good" | "uncertain" | "bad";
}

/** Connection settings consumed by MQTTService. */
export interface MQTTConfig {
// Broker URL handed to mqtt.connect, e.g. "mqtt://localhost:1883".
brokerUrl: string;
// Client identifier sent to the broker.
clientId: string;
// Optional credentials; omitted fields are passed through as undefined.
username?: string;
password?: string;
// Topic filters subscribed (QoS 1) every time the client connects.
topics: string[];
}

/**
 * MQTT client wrapper that subscribes to the configured topics and re-emits
 * every incoming message as a validated `SensorReading`.
 *
 * Events emitted:
 * - `"reading"` (SensorReading): a message was parsed and validated.
 * - `"parseError"` ({ topic, message, error }): a message was not valid JSON
 *   or failed validation.
 * - `"error"` (Error): client-level error. Only emitted while at least one
 *   listener is registered, so an unobserved error cannot throw out of the
 *   client callback.
 * - `"offline"`: the underlying client went offline.
 * - `"maxReconnectsReached"`: the reconnect limit was hit and the client was
 *   ended.
 */
export class MQTTService extends EventEmitter {
  private static readonly QUALITIES: ReadonlyArray<SensorReading["quality"]> = ["good", "uncertain", "bad"];

  private client: MqttClient | null = null;
  private config: MQTTConfig;
  private reconnectAttempts = 0;
  private maxReconnectAttempts = 10;

  constructor(config: MQTTConfig) {
    super();
    this.config = config;
  }

  /**
   * Opens the broker connection and subscribes to the configured topics.
   *
   * @returns A promise that resolves on the first successful connect and
   *   rejects on the first client error, or when the reconnect limit is
   *   reached before any connect succeeded. It settles exactly once.
   */
  async connect(): Promise<void> {
    const options: IClientOptions = {
      clientId: this.config.clientId,
      username: this.config.username,
      password: this.config.password,
      clean: true,
      reconnectPeriod: 5000,
      connectTimeout: 30000,
    };

    return new Promise((resolve, reject) => {
      // The client keeps firing events long after the promise has settled, so
      // every handler goes through this single-shot gate.
      let settled = false;
      const settle = (error?: Error): void => {
        if (settled) return;
        settled = true;
        if (error) reject(error);
        else resolve();
      };

      const client = mqtt.connect(this.config.brokerUrl, options);
      this.client = client;

      client.on("connect", () => {
        console.log("MQTT connected to:", this.config.brokerUrl);
        this.reconnectAttempts = 0;
        // Re-subscribe on every (re)connect: sessions are clean, so the broker
        // does not remember earlier subscriptions.
        this.subscribeToTopics();
        settle();
      });

      client.on("message", (topic, message) => {
        this.handleMessage(topic, message);
      });

      client.on("error", (error) => {
        console.error("MQTT error:", error);
        this.emitError(error);
        settle(error);
      });

      client.on("offline", () => {
        console.warn("MQTT client offline");
        this.emit("offline");
      });

      client.on("reconnect", () => {
        this.reconnectAttempts++;
        console.log(`MQTT reconnect attempt ${this.reconnectAttempts}`);
        if (this.reconnectAttempts >= this.maxReconnectAttempts) {
          client.end();
          this.emit("maxReconnectsReached");
          // Without this the caller of connect() would wait forever when no
          // "error" event preceded the final attempt.
          settle(new Error(`MQTT connection failed after ${this.maxReconnectAttempts} reconnect attempts`));
        }
      });
    });
  }

  /** Emits "error" only when someone listens; EventEmitter throws otherwise. */
  private emitError(error: Error): void {
    if (this.listenerCount("error") > 0) {
      this.emit("error", error);
    }
  }

  /** Subscribes to every configured topic at QoS 1, logging each outcome. */
  private subscribeToTopics(): void {
    const client = this.client;
    if (!client) return;

    for (const topic of this.config.topics) {
      client.subscribe(topic, { qos: 1 }, (err) => {
        if (err) {
          console.error(`Failed to subscribe to ${topic}:`, err);
        } else {
          console.log(`Subscribed to: ${topic}`);
        }
      });
    }
  }

  /**
   * Parses one raw message and emits either "reading" or "parseError".
   * Never throws: malformed input is reported through the event.
   */
  private handleMessage(topic: string, message: Buffer): void {
    try {
      const payload: unknown = JSON.parse(message.toString());
      const reading = this.parseReading(topic, payload);
      this.emit("reading", reading);
    } catch (error) {
      console.error("Failed to parse MQTT message:", error);
      this.emit("parseError", { topic, message: message.toString(), error });
    }
  }

  /**
   * Converts a decoded payload into a `SensorReading`.
   *
   * Missing `sensorId` / `parameter` fall back to topic segments 2 and 3 of
   * `sensors/{plantId}/{sensorId}/{parameter}`, then to "unknown".
   *
   * @throws Error when the payload is not a JSON object, when `value` is
   *   missing or not numeric, or when `timestamp` cannot be parsed.
   */
  private parseReading(topic: string, payload: unknown): SensorReading {
    if (typeof payload !== "object" || payload === null || Array.isArray(payload)) {
      throw new Error("MQTT payload must be a JSON object");
    }
    const fields = payload as Record<string, unknown>;

    // Parse topic: sensors/{plantId}/{sensorId}/{parameter}
    const parts = topic.split("/");

    const value = MQTTService.toFiniteNumber(fields.value);
    if (value === null) {
      // Number(null) is 0 and Number(undefined) is NaN; neither is a measurement.
      throw new Error(`MQTT payload has a missing or non-numeric value: ${JSON.stringify(fields.value)}`);
    }

    return {
      sensorId: MQTTService.toLabel(fields.sensorId) ?? (parts[2] || "unknown"),
      parameter: MQTTService.toLabel(fields.parameter) ?? (parts[3] || "unknown"),
      value,
      unit: typeof fields.unit === "string" ? fields.unit : "",
      timestamp: MQTTService.toTimestamp(fields.timestamp),
      quality: MQTTService.toQuality(fields.quality),
    };
  }

  /** Returns a finite number for numeric input or numeric strings, else null. */
  private static toFiniteNumber(raw: unknown): number | null {
    if (typeof raw === "number") {
      return Number.isFinite(raw) ? raw : null;
    }
    if (typeof raw === "string" && raw.trim() !== "") {
      const parsed = Number(raw);
      return Number.isFinite(parsed) ? parsed : null;
    }
    return null;
  }

  /** Returns a non-empty string label, or undefined when none is usable. */
  private static toLabel(raw: unknown): string | undefined {
    if (typeof raw === "string" && raw !== "") return raw;
    if (typeof raw === "number" && Number.isFinite(raw)) return String(raw);
    return undefined;
  }

  /** Absent timestamps mean "now"; present but unparseable ones are rejected. */
  private static toTimestamp(raw: unknown): Date {
    if (!raw) return new Date();
    if (typeof raw !== "string" && typeof raw !== "number") {
      throw new Error("MQTT payload timestamp must be a string or number");
    }
    const parsed = new Date(raw);
    if (Number.isNaN(parsed.getTime())) {
      throw new Error(`MQTT payload has an invalid timestamp: ${JSON.stringify(raw)}`);
    }
    return parsed;
  }

  /** Absent quality means "good"; an unrecognised label is downgraded to "uncertain". */
  private static toQuality(raw: unknown): SensorReading["quality"] {
    if (raw === undefined || raw === null || raw === "") return "good";
    const known = MQTTService.QUALITIES.find((quality) => quality === raw);
    return known ?? "uncertain";
  }

  /**
   * Publishes `message` as JSON on `topic` (QoS 1, not retained).
   *
   * @throws Error when the client is not connected; rejects with the client's
   *   error when the publish itself fails.
   */
  async publish(topic: string, message: object): Promise<void> {
    const client = this.client;
    if (!client?.connected) {
      throw new Error("MQTT client not connected");
    }

    return new Promise((resolve, reject) => {
      client.publish(topic, JSON.stringify(message), { qos: 1, retain: false }, (error) => {
        if (error) reject(error);
        else resolve();
      });
    });
  }

  /** Ends the client, if any, and forgets it. Safe to call repeatedly. */
  disconnect(): void {
    if (this.client) {
      this.client.end();
      this.client = null;
    }
  }
}

// Usage example
/**
 * Example factory: builds an MQTTService from the MQTT_* environment
 * variables, logs every reading, connects, and returns the connected service.
 */
export async function createMQTTClient(): Promise<MQTTService> {
  // One wildcard filter per monitored parameter, across all plants and sensors.
  const parameters = ["pH", "BOD", "COD", "TSS", "TDS", "flow_rate", "temperature"];

  const service = new MQTTService({
    brokerUrl: process.env.MQTT_BROKER_URL || "mqtt://localhost:1883",
    clientId: `Edubotx-${Date.now()}`,
    username: process.env.MQTT_USERNAME,
    password: process.env.MQTT_PASSWORD,
    topics: parameters.map((parameter) => `sensors/+/+/${parameter}`),
  });

  const logReading = (reading: SensorReading): void => {
    console.log("Sensor reading:", reading);
    // Forward to ML API or store in database
  };
  service.on("reading", logReading);

  await service.connect();
  return service;
}

2.2 MQTT Topic Structure

sensors/
├── {plantId}/
│   ├── {sensorId}/
│   │   ├── pH            # Real-time pH value
│   │   ├── BOD           # BOD reading
│   │   ├── COD           # COD reading
│   │   ├── TSS           # TSS reading
│   │   ├── TDS           # TDS reading
│   │   ├── flow_rate     # Flow rate
│   │   ├── temperature   # Temperature
│   │   └── status        # Sensor health status
│   └── alerts/           # Plant-level alerts
│       ├── threshold     # Threshold exceeded
│       └── anomaly       # Anomaly detected
└── system/
    ├── health            # System health
    └── config            # Configuration updates

2.3 Message Payload Schema

// Standard sensor message payload
/**
 * JSON shape of a sensor message on the wire. Unlike an in-memory reading,
 * `timestamp` is a string here.
 */
interface SensorMessage {
sensorId: string;
parameter: string;
value: number;
unit: string;
timestamp: string; // ISO 8601
quality: "good" | "uncertain" | "bad";
// Optional descriptive context about where the sensor is installed.
metadata?: {
plantId: string;
location: string;
calibrationDate?: string;
};
}

// Example message: a pH reading from the inlet of plant_01, with every
// optional metadata field filled in.
const exampleMessage: SensorMessage = {
sensorId: "pH_001",
parameter: "pH",
value: 7.2,
unit: "pH",
timestamp: "2024-12-08T10:30:00Z",
quality: "good",
metadata: {
plantId: "plant_01",
location: "inlet",
calibrationDate: "2024-11-01",
},
};

3. OPC-UA Implementation

3.1 OPC-UA Client Service

// src/lib/iot/opcua-client.ts

import {
OPCUAClient,
MessageSecurityMode,
SecurityPolicy,
AttributeIds,
ClientSession,
ClientSubscription,
TimestampsToReturn,
MonitoringParametersOptions,
ReadValueIdOptions,
DataValue,
} from "node-opcua";
import { EventEmitter } from "events";

/** Connection settings consumed by OPCUAService. */
export interface OPCUAConfig {
// Server endpoint, e.g. "opc.tcp://localhost:4840".
endpointUrl: string;
// Message security mode; any value other than "Sign" / "SignAndEncrypt" maps to None.
securityMode: "None" | "Sign" | "SignAndEncrypt";
// Security policy; any value other than "Basic256Sha256" maps to None.
securityPolicy: "None" | "Basic256Sha256";
// Credentials are used for the session only when BOTH are set.
username?: string;
password?: string;
}

/** Maps one OPC-UA node to the parameter name and unit used in emitted readings. */
export interface NodeMapping {
// Node to monitor, e.g. "ns=2;s=WaterQuality.pH"; also used as the reading's sensorId.
nodeId: string;
// Parameter name reported in the reading.
parameter: string;
// Unit label reported in the reading.
unit: string;
// Optional multiplier applied to the raw value; omitted or 0 leaves the value unscaled.
scaleFactor?: number;
}

/**
 * OPC-UA client wrapper: connects to a server, opens a session, monitors a
 * set of nodes through one subscription, and emits a "reading" event for
 * every data change.
 */
export class OPCUAService extends EventEmitter {
  private client: OPCUAClient | null = null;
  private session: ClientSession | null = null;
  private subscription: ClientSubscription | null = null;
  private config: OPCUAConfig;
  private nodeMappings: NodeMapping[] = [];

  constructor(config: OPCUAConfig) {
    super();
    this.config = config;
  }

  /**
   * Connects to the configured endpoint and creates a session, using
   * username/password only when both are configured.
   *
   * @throws Whatever the client throws on connect or session creation. When
   *   session creation fails, the already-open connection is closed first so
   *   it does not leak.
   */
  async connect(): Promise<void> {
    const client = OPCUAClient.create({
      applicationName: "Edubotx IoT Gateway",
      connectionStrategy: {
        initialDelay: 1000,
        maxRetry: 10,
        maxDelay: 10000,
      },
      securityMode: this.getSecurityMode(),
      securityPolicy: this.getSecurityPolicy(),
      endpointMustExist: false,
    });
    this.client = client;

    await client.connect(this.config.endpointUrl);
    console.log("OPC-UA connected to:", this.config.endpointUrl);

    // Create session
    try {
      if (this.config.username && this.config.password) {
        this.session = await client.createSession({
          userName: this.config.username,
          password: this.config.password,
        });
      } else {
        this.session = await client.createSession();
      }
    } catch (error) {
      this.client = null;
      try {
        await client.disconnect();
      } catch (closeError) {
        console.error("OPC-UA disconnect after failed session creation also failed:", closeError);
      }
      throw error;
    }

    console.log("OPC-UA session created");
  }

  /** Translates the configured security mode; unknown values mean None. */
  private getSecurityMode(): MessageSecurityMode {
    switch (this.config.securityMode) {
      case "Sign":
        return MessageSecurityMode.Sign;
      case "SignAndEncrypt":
        return MessageSecurityMode.SignAndEncrypt;
      default:
        return MessageSecurityMode.None;
    }
  }

  /** Translates the configured security policy; unknown values mean None. */
  private getSecurityPolicy(): SecurityPolicy {
    switch (this.config.securityPolicy) {
      case "Basic256Sha256":
        return SecurityPolicy.Basic256Sha256;
      default:
        return SecurityPolicy.None;
    }
  }

  /**
   * Creates a subscription (1 s publishing interval) and monitors every node
   * in `mappings`, one after another. A subscription left over from an
   * earlier call is terminated first.
   *
   * @throws Error when no session has been established.
   */
  async subscribeToNodes(mappings: NodeMapping[]): Promise<void> {
    if (!this.session) {
      throw new Error("OPC-UA session not established");
    }

    if (this.subscription) {
      // Replace rather than stack subscriptions; the old one would otherwise
      // keep publishing with no handle left to terminate it.
      const previous = this.subscription;
      this.subscription = null;
      await previous.terminate();
    }

    this.nodeMappings = mappings;

    // Create subscription
    const subscription = ClientSubscription.create(this.session, {
      requestedPublishingInterval: 1000,
      requestedLifetimeCount: 100,
      requestedMaxKeepAliveCount: 10,
      maxNotificationsPerPublish: 100,
      publishingEnabled: true,
      priority: 10,
    });
    this.subscription = subscription;

    subscription.on("started", () => {
      console.log("OPC-UA subscription started");
    });

    // Monitor each node
    for (const mapping of mappings) {
      await this.monitorNode(mapping);
    }
  }

  /** Adds one monitored item (1 s sampling, queue of 10) and wires its change handler. */
  private async monitorNode(mapping: NodeMapping): Promise<void> {
    if (!this.subscription || !this.session) return;

    const itemToMonitor: ReadValueIdOptions = {
      nodeId: mapping.nodeId,
      attributeId: AttributeIds.Value,
    };

    const parameters: MonitoringParametersOptions = {
      samplingInterval: 1000,
      discardOldest: true,
      queueSize: 10,
    };

    const monitoredItem = await this.subscription.monitor(itemToMonitor, parameters, TimestampsToReturn.Both);

    monitoredItem.on("changed", (dataValue: DataValue) => {
      this.handleDataChange(mapping, dataValue);
    });
  }

  /**
   * Converts a data-change notification into a reading and emits it.
   * The source timestamp is preferred; the local clock is the fallback.
   */
  private handleDataChange(mapping: NodeMapping, dataValue: DataValue): void {
    let value = dataValue.value.value as number;

    // Apply scale factor if defined (an omitted or zero factor leaves the value as is)
    if (mapping.scaleFactor) {
      value *= mapping.scaleFactor;
    }

    const reading = {
      sensorId: mapping.nodeId,
      parameter: mapping.parameter,
      value: value,
      unit: mapping.unit,
      timestamp: dataValue.sourceTimestamp || new Date(),
      quality: this.mapQuality(dataValue.statusCode.value),
    };

    this.emit("reading", reading);
  }

  /**
   * Maps an OPC-UA status code to a reading quality using its severity
   * field, the top two bits of the 32-bit code: 00 = Good, 01 = Uncertain,
   * 10 / 11 = Bad. Sub-codes in the lower bits do not change the class, so
   * e.g. Good_LocalOverride (0x00960000) is still "good".
   */
  private mapQuality(statusCode: number): "good" | "uncertain" | "bad" {
    // `>>>` coerces to unsigned 32-bit first, so codes with bit 31 set are
    // handled whether they arrive as positive or negative numbers.
    const severity = (statusCode >>> 30) & 0x3;
    if (severity === 0) return "good";
    if (severity === 1) return "uncertain";
    return "bad";
  }

  /**
   * Reads the current Value attribute of a node.
   *
   * @throws Error when no session has been established.
   */
  async readNode(nodeId: string): Promise<DataValue> {
    if (!this.session) {
      throw new Error("OPC-UA session not established");
    }

    return await this.session.read({
      nodeId: nodeId,
      attributeId: AttributeIds.Value,
    });
  }

  /**
   * Tears down subscription, session and connection, in that order. Every
   * step is attempted even if an earlier one fails, and all handles are
   * cleared so the service can be reconnected.
   *
   * @throws The first teardown error, after all steps have been attempted.
   */
  async disconnect(): Promise<void> {
    const subscription = this.subscription;
    const session = this.session;
    const client = this.client;
    this.subscription = null;
    this.session = null;
    this.client = null;

    const steps: Array<() => Promise<unknown>> = [];
    if (subscription) steps.push(() => subscription.terminate());
    if (session) steps.push(() => session.close());
    if (client) steps.push(() => client.disconnect());

    const failures: unknown[] = [];
    for (const step of steps) {
      try {
        await step();
      } catch (error) {
        failures.push(error);
      }
    }

    if (failures.length > 0) {
      throw failures[0];
    }
  }
}

// Usage example
/**
 * Example factory: connects to the endpoint in OPCUA_ENDPOINT (default
 * opc.tcp://localhost:4840) without security, logs every reading, and
 * subscribes to the water-quality nodes.
 */
export async function createOPCUAClient(): Promise<OPCUAService> {
  const opcua = new OPCUAService({
    endpointUrl: process.env.OPCUA_ENDPOINT || "opc.tcp://localhost:4840",
    securityMode: "None",
    securityPolicy: "None",
  });

  // Listen before subscribing: subscribeToNodes awaits node by node, so an
  // already-monitored node can report a change while later nodes are still
  // being added. A listener attached afterwards would miss those readings.
  opcua.on("reading", (reading) => {
    console.log("OPC-UA reading:", reading);
  });

  await opcua.connect();

  // Subscribe to water quality nodes
  await opcua.subscribeToNodes([
    { nodeId: "ns=2;s=WaterQuality.pH", parameter: "pH", unit: "pH" },
    { nodeId: "ns=2;s=WaterQuality.BOD", parameter: "BOD", unit: "mg/L" },
    { nodeId: "ns=2;s=WaterQuality.COD", parameter: "COD", unit: "mg/L" },
    { nodeId: "ns=2;s=WaterQuality.TSS", parameter: "TSS", unit: "mg/L" },
    { nodeId: "ns=2;s=WaterQuality.TDS", parameter: "TDS", unit: "mg/L" },
    { nodeId: "ns=2;s=WaterQuality.FlowRate", parameter: "flow_rate", unit: "m³/hr" },
    { nodeId: "ns=2;s=WaterQuality.Temperature", parameter: "temperature", unit: "°C" },
  ]);

  return opcua;
}

4. Modbus Implementation

4.1 Modbus TCP Client

// src/lib/iot/modbus-client.ts

import ModbusRTU from "modbus-serial";
import { EventEmitter } from "events";

/** Connection settings consumed by ModbusService. */
export interface ModbusConfig {
// Host and TCP port passed to connectTCP.
host: string;
port: number;
// Unit (slave) id set on the client after connecting.
unitId: number;
// Value passed to the client's setTimeout; presumably milliseconds (TODO confirm).
timeout: number;
}

/** Describes one holding-register value to poll and how to decode it. */
export interface ModbusRegister {
// Start address passed as-is to readHoldingRegisters; also used to build the sensorId.
address: number;
// Number of registers to read starting at `address`.
length: number;
// Parameter name and unit label reported in the reading.
parameter: string;
unit: string;
// How the returned bytes are decoded (big-endian, from the start of the buffer).
dataType: "int16" | "uint16" | "int32" | "uint32" | "float32";
// Multiplier applied to the decoded value.
scaleFactor: number;
}

/**
 * Modbus TCP poller: reads a configured list of holding registers on a fixed
 * interval and emits a "reading" event per register.
 *
 * Events emitted:
 * - `"reading"`: one decoded, scaled register value.
 * - `"error"` ({ register, error }): a register read failed. Only emitted
 *   while a listener is registered; failures are always logged.
 */
export class ModbusService extends EventEmitter {
  /** Bytes each data type needs from the response buffer. */
  private static readonly BYTE_WIDTH: Record<string, number> = {
    int16: 2,
    uint16: 2,
    int32: 4,
    uint32: 4,
    float32: 4,
  };

  private client: ModbusRTU;
  private config: ModbusConfig;
  private registers: ModbusRegister[] = [];
  private pollInterval: NodeJS.Timeout | null = null;
  private isConnected = false;
  // True while a poll cycle is running, so a slow cycle is never overlapped.
  private pollInFlight = false;

  constructor(config: ModbusConfig) {
    super();
    this.config = config;
    this.client = new ModbusRTU();
  }

  /**
   * Connects over TCP, then applies the unit id and timeout.
   *
   * @throws The connection error, after logging it.
   */
  async connect(): Promise<void> {
    try {
      await this.client.connectTCP(this.config.host, { port: this.config.port });
      this.client.setID(this.config.unitId);
      this.client.setTimeout(this.config.timeout);
      this.isConnected = true;
      console.log(`Modbus connected to ${this.config.host}:${this.config.port}`);
    } catch (error) {
      console.error("Modbus connection failed:", error);
      throw error;
    }
  }

  /** Replaces the list of registers read on every poll cycle. */
  setRegisters(registers: ModbusRegister[]): void {
    this.registers = registers;
  }

  /**
   * Starts (or restarts) polling every `intervalMs` milliseconds. A tick that
   * fires while the previous cycle is still reading is skipped, so requests
   * never pile up on the connection.
   */
  startPolling(intervalMs: number = 1000): void {
    this.stopPolling();

    this.pollInterval = setInterval(() => {
      // pollOnce handles its own failures; `void` marks the promise as
      // intentionally not awaited.
      void this.pollOnce();
    }, intervalMs);
  }

  /** Runs one poll cycle over all registers; never rejects on read failures. */
  private async pollOnce(): Promise<void> {
    if (!this.isConnected || this.pollInFlight) return;

    this.pollInFlight = true;
    try {
      for (const register of this.registers) {
        // disconnect() may have been called while an earlier read was pending.
        if (!this.isConnected) break;

        try {
          const value = await this.readRegister(register);
          const reading = {
            sensorId: `modbus_${register.address}`,
            parameter: register.parameter,
            value: value,
            unit: register.unit,
            timestamp: new Date(),
            quality: "good" as const,
          };
          this.emit("reading", reading);
        } catch (error) {
          console.error(`Failed to read register ${register.address}:`, error);
          // EventEmitter throws on an unobserved "error"; inside this async
          // method that would become an unhandled rejection.
          if (this.listenerCount("error") > 0) {
            this.emit("error", { register, error });
          }
        }
      }
    } finally {
      this.pollInFlight = false;
    }
  }

  /**
   * Reads one register block and decodes it big-endian according to
   * `dataType`, then applies the scale factor. Unknown data types fall back
   * to the first raw register word.
   *
   * @throws Error when the response holds fewer bytes than the data type needs.
   */
  private async readRegister(register: ModbusRegister): Promise<number> {
    const data = await this.client.readHoldingRegisters(register.address, register.length);

    const needed = ModbusService.BYTE_WIDTH[register.dataType] ?? 0;
    if (data.buffer.length < needed) {
      throw new Error(
        `Register ${register.address} returned ${data.buffer.length} byte(s) but ${register.dataType} needs ${needed}; check the configured length`,
      );
    }

    let value: number;

    switch (register.dataType) {
      case "int16":
        value = data.buffer.readInt16BE(0);
        break;
      case "uint16":
        value = data.buffer.readUInt16BE(0);
        break;
      case "int32":
        value = data.buffer.readInt32BE(0);
        break;
      case "uint32":
        value = data.buffer.readUInt32BE(0);
        break;
      case "float32":
        value = data.buffer.readFloatBE(0);
        break;
      default:
        value = data.data[0];
    }

    return value * register.scaleFactor;
  }

  /**
   * Writes a single register.
   *
   * @throws Error when the service is not connected.
   */
  async writeRegister(address: number, value: number): Promise<void> {
    if (!this.isConnected) {
      throw new Error("Modbus not connected");
    }
    await this.client.writeRegister(address, value);
  }

  /** Stops the polling timer, if one is running. */
  stopPolling(): void {
    if (this.pollInterval) {
      clearInterval(this.pollInterval);
      this.pollInterval = null;
    }
  }

  /** Stops polling and closes the connection. */
  disconnect(): void {
    this.stopPolling();
    this.client.close(() => {
      console.log("Modbus disconnected");
    });
    this.isConnected = false;
  }
}

// Usage example
/**
 * Example factory: connects to the device named by MODBUS_HOST / MODBUS_PORT
 * (defaults 192.168.1.100:502), registers the water-quality float32 values,
 * logs every reading, and starts polling once per second.
 */
export async function createModbusClient(): Promise<ModbusService> {
  const host = process.env.MODBUS_HOST || "192.168.1.100";
  const port = parseInt(process.env.MODBUS_PORT || "502");
  const service = new ModbusService({ host, port, unitId: 1, timeout: 5000 });

  await service.connect();

  // Configure registers: [address, parameter, unit]; each value spans two
  // registers and is decoded as float32 with no scaling.
  // NOTE(review): the 4xxxx numbers are passed to the client verbatim. Whether
  // the device expects them as-is or as zero-based offsets is not visible
  // here; confirm against the device's register map.
  const layout: Array<[number, string, string]> = [
    [40001, "pH", "pH"],
    [40003, "BOD", "mg/L"],
    [40005, "COD", "mg/L"],
    [40007, "TSS", "mg/L"],
    [40009, "TDS", "mg/L"],
    [40011, "flow_rate", "m³/hr"],
    [40013, "temperature", "°C"],
  ];
  service.setRegisters(
    layout.map(([address, parameter, unit]) => ({
      address,
      length: 2,
      parameter,
      unit,
      dataType: "float32" as const,
      scaleFactor: 1,
    })),
  );

  service.on("reading", (reading) => {
    console.log("Modbus reading:", reading);
  });

  service.startPolling(1000);

  return service;
}

5. Protocol Bridge

5.1 Unified IoT Gateway

// src/lib/iot/iot-gateway.ts

import { EventEmitter } from "events";
import { MQTTService, SensorReading } from "./mqtt-client";
import { OPCUAService } from "./opcua-client";
import { ModbusService } from "./modbus-client";

/** Identifies which protocol service a gateway reading came from. */
export type ProtocolType = "mqtt" | "opcua" | "modbus";

/**
 * Configuration for IoTGateway. Each protocol section is optional and is only
 * initialised when present with `enabled: true`.
 */
export interface GatewayConfig {
// MQTT broker and the topic filters to subscribe to.
mqtt?: {
enabled: boolean;
brokerUrl: string;
topics: string[];
};
// OPC-UA endpoint and the nodes to monitor.
opcua?: {
enabled: boolean;
endpointUrl: string;
nodes: Array<{ nodeId: string; parameter: string; unit: string }>;
};
// Modbus TCP device and the registers to poll.
modbus?: {
enabled: boolean;
host: string;
port: number;
registers: Array<{
address: number;
parameter: string;
unit: string;
// Free-form here; the gateway passes it to the Modbus service unchecked.
dataType: string;
}>;
};
}

/**
 * Unified gateway that starts the enabled protocol services and funnels
 * their readings into one event stream.
 *
 * Events emitted:
 * - `"reading"`: each reading, enriched with `source` and `receivedAt`.
 * - `"alert"`: a reading fell outside its configured threshold range.
 * - `"batch"`: the readings buffered since the last flush (every 5 s and on
 *   shutdown).
 * - `"error"` ({ source, error }): a protocol service reported an error.
 *   Only emitted while a listener is registered; otherwise it is logged.
 */
export class IoTGateway extends EventEmitter {
  /** Allowed range per parameter; parameters not listed never raise alerts. */
  private static readonly ALERT_THRESHOLDS: Record<string, { min: number; max: number }> = {
    pH: { min: 6.5, max: 8.5 },
    BOD: { min: 0, max: 30 },
    COD: { min: 0, max: 250 },
    TSS: { min: 0, max: 100 },
  };

  private mqttService: MQTTService | null = null;
  private opcuaService: OPCUAService | null = null;
  private modbusService: ModbusService | null = null;
  private config: GatewayConfig;
  private readingsBuffer: SensorReading[] = [];
  private bufferFlushInterval: NodeJS.Timeout | null = null;

  constructor(config: GatewayConfig) {
    super();
    this.config = config;
  }

  /**
   * Starts every enabled protocol service in parallel, then starts the 5 s
   * batch flush timer.
   *
   * @throws The first initialisation failure (in mqtt, opcua, modbus order).
   *   Services that did start are disconnected before the error is rethrown,
   *   so a failed initialise leaves nothing running.
   */
  async initialize(): Promise<void> {
    const initPromises: Promise<void>[] = [];

    if (this.config.mqtt?.enabled) {
      initPromises.push(this.initMQTT());
    }

    if (this.config.opcua?.enabled) {
      initPromises.push(this.initOPCUA());
    }

    if (this.config.modbus?.enabled) {
      initPromises.push(this.initModbus());
    }

    // allSettled rather than all: every service must finish starting (or
    // failing) before cleanup, otherwise a late starter would be left running.
    const outcomes = await Promise.allSettled(initPromises);
    const failed = outcomes.find(
      (outcome): outcome is PromiseRejectedResult => outcome.status === "rejected",
    );
    if (failed) {
      await this.closeServices();
      throw failed.reason;
    }

    // Start buffer flush interval
    this.bufferFlushInterval = setInterval(() => {
      this.flushBuffer();
    }, 5000);

    console.log("IoT Gateway initialized");
  }

  /** Forwards a service's "error" events so they are never left unobserved. */
  private forwardErrors(service: EventEmitter, source: ProtocolType): void {
    service.on("error", (error: unknown) => {
      if (this.listenerCount("error") > 0) {
        this.emit("error", { source, error });
      } else {
        console.error(`IoT Gateway ${source} error:`, error);
      }
    });
  }

  private async initMQTT(): Promise<void> {
    const service = new MQTTService({
      brokerUrl: this.config.mqtt!.brokerUrl,
      clientId: `gateway-${Date.now()}`,
      topics: this.config.mqtt!.topics,
    });
    this.mqttService = service;

    service.on("reading", (reading: SensorReading) => {
      this.handleReading("mqtt", reading);
    });
    this.forwardErrors(service, "mqtt");

    await service.connect();
  }

  private async initOPCUA(): Promise<void> {
    const service = new OPCUAService({
      endpointUrl: this.config.opcua!.endpointUrl,
      securityMode: "None",
      securityPolicy: "None",
    });
    this.opcuaService = service;

    // Listeners go on before subscribing so readings reported while nodes
    // are still being added are not lost.
    service.on("reading", (reading: SensorReading) => {
      this.handleReading("opcua", reading);
    });
    this.forwardErrors(service, "opcua");

    await service.connect();

    const mappings = this.config.opcua!.nodes.map((n) => ({
      nodeId: n.nodeId,
      parameter: n.parameter,
      unit: n.unit,
    }));

    await service.subscribeToNodes(mappings);
  }

  private async initModbus(): Promise<void> {
    type RegisterList = Parameters<ModbusService["setRegisters"]>[0];

    const service = new ModbusService({
      host: this.config.modbus!.host,
      port: this.config.modbus!.port,
      unitId: 1,
      timeout: 5000,
    });
    this.modbusService = service;

    service.on("reading", (reading: SensorReading) => {
      this.handleReading("modbus", reading);
    });
    this.forwardErrors(service, "modbus");

    await service.connect();

    // Every register is read as two words with no scaling; the configured
    // data type string is passed through unchecked.
    const registers: RegisterList = this.config.modbus!.registers.map((r) => ({
      address: r.address,
      length: 2,
      parameter: r.parameter,
      unit: r.unit,
      dataType: r.dataType as RegisterList[number]["dataType"],
      scaleFactor: 1,
    }));

    service.setRegisters(registers);
    service.startPolling(1000);
  }

  /** Enriches, buffers and re-emits one reading, then checks it for alerts. */
  private handleReading(source: ProtocolType, reading: SensorReading): void {
    // Add source metadata
    const enrichedReading = {
      ...reading,
      source,
      receivedAt: new Date(),
    };

    // Add to buffer
    this.readingsBuffer.push(enrichedReading);

    // Emit real-time event
    this.emit("reading", enrichedReading);

    // Check for alerts
    this.checkAlerts(enrichedReading);
  }

  /** Emits an "alert" when the reading is outside its parameter's range. */
  private checkAlerts(reading: SensorReading): void {
    const threshold = IoTGateway.ALERT_THRESHOLDS[reading.parameter];
    if (!threshold) return;

    if (reading.value < threshold.min || reading.value > threshold.max) {
      this.emit("alert", {
        type: "threshold_exceeded",
        reading,
        threshold,
        message: `${reading.parameter} value ${reading.value} out of range [${threshold.min}, ${threshold.max}]`,
      });
    }
  }

  /** Emits the buffered readings as one "batch" and empties the buffer. */
  private flushBuffer(): void {
    if (this.readingsBuffer.length === 0) return;

    const readings = [...this.readingsBuffer];
    this.readingsBuffer = [];

    this.emit("batch", readings);
  }

  /**
   * Disconnects every started service, attempting all of them even when one
   * fails, and clears the service handles.
   *
   * @returns The errors raised while disconnecting, in mqtt, opcua, modbus order.
   */
  private async closeServices(): Promise<unknown[]> {
    const mqttService = this.mqttService;
    const opcuaService = this.opcuaService;
    const modbusService = this.modbusService;
    this.mqttService = null;
    this.opcuaService = null;
    this.modbusService = null;

    const failures: unknown[] = [];
    const attempt = async (label: ProtocolType, close: () => void | Promise<void>): Promise<void> => {
      try {
        await close();
      } catch (error) {
        console.error(`IoT Gateway failed to disconnect ${label}:`, error);
        failures.push(error);
      }
    };

    if (mqttService) await attempt("mqtt", () => mqttService.disconnect());
    if (opcuaService) await attempt("opcua", () => opcuaService.disconnect());
    if (modbusService) await attempt("modbus", () => modbusService.disconnect());

    return failures;
  }

  /**
   * Stops the flush timer, emits any remaining buffered readings, and
   * disconnects all services.
   *
   * @throws The first disconnect error, but only after every service has
   *   been given the chance to disconnect.
   */
  async shutdown(): Promise<void> {
    if (this.bufferFlushInterval) {
      clearInterval(this.bufferFlushInterval);
      this.bufferFlushInterval = null;
    }

    this.flushBuffer();

    const failures = await this.closeServices();
    if (failures.length > 0) {
      throw failures[0];
    }

    console.log("IoT Gateway shutdown complete");
  }
}

6. Data Transformation

6.1 Transform to ML Model Input

// src/lib/iot/data-transformer.ts

import { SensorReading } from "./mqtt-client";
import { WastewaterInput, TreatmentInput } from "@/lib/ml-api";

/**
 * Keeps the most recent reading per parameter and assembles ML model inputs
 * from them once all required parameters have a fresh reading.
 */
export class DataTransformer {
  private latestReadings: Map<string, SensorReading> = new Map();
  private readingTimeout = 60000; // 1 minute

  /** Stores `reading` as the latest one for its parameter. */
  updateReading(reading: SensorReading): void {
    this.latestReadings.set(reading.parameter, reading);
  }

  /**
   * @returns true when every parameter in `requiredParams` has a reading
   *   whose timestamp is less than one minute old.
   */
  isDataComplete(requiredParams: string[]): boolean {
    const now = Date.now();
    return requiredParams.every((param) => {
      const reading = this.latestReadings.get(param);
      return reading !== undefined && this.isFresh(reading, now);
    });
  }

  /**
   * Builds the prediction model input.
   *
   * Flow rate, temperature and the five influent measurements are required.
   * Each influent measurement may be reported either under its `influent_*`
   * name or under the plain parameter name (`BOD`, `COD`, ...), with the
   * prefixed name taking precedence. The remaining process parameters use
   * the latest reading when one exists and a fixed default otherwise; a
   * reading of 0 is a valid value and is not replaced by the default.
   *
   * @returns The input, or null when a required parameter has no fresh,
   *   finite reading.
   */
  toPredictionInput(): WastewaterInput | null {
    const flowRate = this.freshValue(["flow_rate"]);
    const bod = this.freshValue(["influent_BOD", "BOD"]);
    const cod = this.freshValue(["influent_COD", "COD"]);
    const tss = this.freshValue(["influent_TSS", "TSS"]);
    const pH = this.freshValue(["influent_pH", "pH"]);
    const tds = this.freshValue(["influent_TDS", "TDS"]);
    const temperature = this.freshValue(["temperature"]);

    if (
      flowRate === undefined ||
      bod === undefined ||
      cod === undefined ||
      tss === undefined ||
      pH === undefined ||
      tds === undefined ||
      temperature === undefined
    ) {
      return null;
    }

    return {
      flow_rate: flowRate,
      influent_BOD: bod,
      influent_COD: cod,
      influent_TSS: tss,
      influent_pH: pH,
      influent_TDS: tds,
      aeration_rate: this.latestValue("aeration_rate") ?? 35,
      chemical_dose: this.latestValue("chemical_dose") ?? 12,
      sludge_recycle_rate: this.latestValue("sludge_recycle_rate") ?? 25,
      retention_time: this.latestValue("retention_time") ?? 6,
      temperature,
      effluent_BOD_lag1: this.latestValue("effluent_BOD_lag1") ?? 22,
    };
  }

  /**
   * Builds the treatment model input.
   *
   * pH, TSS, BOD, COD and TDS are required; the other fields use the latest
   * reading when one exists and a fixed default otherwise (0 is kept as 0).
   *
   * @returns The input, or null when a required parameter has no fresh,
   *   finite reading.
   */
  toTreatmentInput(): TreatmentInput | null {
    const pH = this.freshValue(["pH"]);
    const tss = this.freshValue(["TSS"]);
    const bod = this.freshValue(["BOD"]);
    const cod = this.freshValue(["COD"]);
    const tds = this.freshValue(["TDS"]);

    if (pH === undefined || tss === undefined || bod === undefined || cod === undefined || tds === undefined) {
      return null;
    }

    return {
      pH,
      TSS: tss,
      Turbidity: this.latestValue("Turbidity") ?? 45,
      BOD: bod,
      COD: cod,
      NH4_N: this.latestValue("NH4_N") ?? 25,
      Total_Nitrogen: this.latestValue("Total_Nitrogen") ?? 40,
      Phosphate: this.latestValue("Phosphate") ?? 8,
      Fecal_Coliform: this.latestValue("Fecal_Coliform") ?? 5000,
      Oil_Grease: this.latestValue("Oil_Grease") ?? 15,
      TDS: tds,
      Heavy_Metals: this.latestValue("Heavy_Metals") ?? 0.5,
    };
  }

  /** True when the reading's timestamp is within the freshness window. */
  private isFresh(reading: SensorReading, now: number): boolean {
    return now - reading.timestamp.getTime() < this.readingTimeout;
  }

  /**
   * Returns the value of the first name in `aliases` that has a fresh, finite
   * reading, or undefined when none does.
   */
  private freshValue(aliases: readonly string[]): number | undefined {
    const now = Date.now();
    for (const name of aliases) {
      const reading = this.latestReadings.get(name);
      if (reading !== undefined && this.isFresh(reading, now) && Number.isFinite(reading.value)) {
        return reading.value;
      }
    }
    return undefined;
  }

  /**
   * Returns the latest finite value for `param` regardless of age, or
   * undefined when there is none, so callers can apply a default with `??`.
   */
  private latestValue(param: string): number | undefined {
    const value = this.latestReadings.get(param)?.value;
    return value !== undefined && Number.isFinite(value) ? value : undefined;
  }
}

Next Steps


Last Updated: December 2024