🔌 Creating a WebSocket Service
This guide walks you through creating a complete WebSocket service using the Nodeblocks SDK. We'll build three distinct WebSocket patterns: INCOMING (client → server), OUTGOING (server → client), and BIDIRECTIONAL (client ↔ server).
📦 Required Packages: This example uses several packages; install them with the commands below. MongoDB must be running as a replica set, because change streams require one.
npm install express rxjs ws ramda uuid dotenv cors @nodeblocks/backend-sdk
npm install @types/ws @types/uuid --save-dev
🏗️ Service Architecture
In order to build the WebSocket service, we will implement the following core components:
- WebSocket Routes - Define WebSocket endpoints with different flow patterns
- Service - Factory function that wires everything together
- Server Setup - WebSocket server configuration
1️⃣ Create WebSocket Routes
First, create a `websocket.ts` file inside the `src/routes` directory.
Here we define four WebSocket routes that together demonstrate the three communication flows: incoming-only, outgoing-only (a periodic heartbeat and a database change stream), and bidirectional.
import { Subject, interval, timer } from 'rxjs';
import { filter, take } from 'rxjs/operators';
import { primitives, types } from '@nodeblocks/backend-sdk';
import { v4 as uuidv4 } from 'uuid';
// `WsRouteHandlerPayload` is only used as a parameter annotation by the route
// handlers below, so it is aliased as a type. A destructured `const` is a
// runtime value and cannot be used in a type position.
type WsRouteHandlerPayload = types.WsRouteHandlerPayload;
const { compose, withRoute, withSchema, notFromEmitter } = primitives;
/**
 * 📥 INCOMING ONLY: clients send messages to the server.
 *
 * The route is composed with a schema describing an object with string `name`
 * (required) and `message` fields, and is served over the `ws` protocol at
 * `/api/messages`.
 */
export const incomingMessagesFeature = compose(
  withSchema({
    type: 'object',
    required: ['name'],
    properties: {
      name: { type: 'string' },
      message: { type: 'string' },
    },
  }),
  withRoute({
    protocol: 'ws',
    path: '/api/messages',
    handler: () => {
      const inbound$ = new Subject();
      // INCOMING: log every value that arrives on the subject. This handler
      // never calls `inbound$.next()` itself, so it sends nothing back.
      inbound$.subscribe({
        next: (payload) => {
          console.log('Message received from client:', payload);
          // Process the incoming message here (save to DB, etc.).
        },
      });
      return inbound$;
    },
  })
);
/**
 * 📤 OUTGOING ONLY: the server pushes a heartbeat notification every 5 seconds.
 *
 * Served over the `ws` protocol at `/api/notifications`. The handler returns
 * the subject that carries the notifications.
 */
export const outgoingNotificationsFeature = compose(
  withRoute({
    handler: () => {
      const subject = new Subject();
      // OUTGOING: Send periodic notifications to clients
      const heartbeat = interval(5000).subscribe(() => {
        subject.next({
          type: 'server_notification',
          message: 'Server heartbeat',
          timestamp: Date.now(),
        });
      });
      // Stop the timer once the subject terminates; otherwise the interval
      // keeps firing forever. Assumes the SDK completes or errors the subject
      // when the socket goes away, as the database-changes route also relies on.
      const stopHeartbeat = () => heartbeat.unsubscribe();
      subject.subscribe({ complete: stopHeartbeat, error: stopHeartbeat });
      // Note: values arriving on the subject are not processed here.
      return subject;
    },
    path: '/api/notifications',
    protocol: 'ws',
  })
);
/**
 * 📤 OUTGOING ONLY: forwards change-stream events from the `users` data store.
 *
 * Served over the `ws` protocol at `/api/database-changes`. Each change event
 * is wrapped as `{ timestamp, type: 'user_change', data }` and pushed onto the
 * returned subject; a change-stream error is forwarded as a subject error.
 */
export const databaseChangesFeature = compose(
  withRoute({
    handler: (request: WsRouteHandlerPayload) => {
      const subject = new Subject();
      // Watch for changes in the users collection.
      // NOTE(review): if `db.users` is a raw MongoDB driver collection,
      // `watch()` expects an aggregation pipeline array (e.g.
      // `[{ $match: { 'fullDocument.age': { $gt: 18 } } }]`) — confirm that
      // the SDK's data store accepts this `{ filter }` form.
      const changeStream = request.context.db.users.watch({
        filter: {
          age: { $gt: 18 }, // Only watch users older than 18
        },
      });
      changeStream.on('change', (data: unknown) => {
        console.log('Change stream data', data);
        // OUTGOING: push the change onto the subject
        subject.next({
          timestamp: Date.now(),
          type: 'user_change',
          data,
        });
      });
      changeStream.on('error', (error: unknown) => {
        console.error('Change stream error:', error);
        subject.error(error);
      });
      // Cleanup when the subject terminates (e.g. the WebSocket closes).
      // The observer is released automatically on complete/error, so only the
      // change stream needs to be closed explicitly.
      const closeChangeStream = () => {
        changeStream.close();
      };
      subject.subscribe({
        complete: closeChangeStream,
        error: closeChangeStream,
      });
      return subject;
    },
    path: '/api/database-changes',
    protocol: 'ws',
  })
);
/**
 * Shape of the values flowing through the chat subject in either direction.
 * Client messages carry `username`/`message` (and optionally `roomId`);
 * server-originated events additionally carry `emitterId`, `type` and
 * `timestamp`.
 */
interface ChatEvent {
  emitterId?: string;
  type?: 'chat_message' | 'welcome';
  username?: string;
  message: string;
  roomId?: string;
  timestamp?: number;
}

/**
 * 🔄 BIDIRECTIONAL: chat system (both incoming and outgoing).
 *
 * Served over the `ws` protocol at `/api/chat`, composed with a schema that
 * requires string `username` and `message` fields and allows `roomId`.
 */
export const chatFeature = compose(
  withSchema({
    type: 'object',
    properties: {
      username: { type: 'string' },
      message: { type: 'string' },
      roomId: { type: 'string' },
    },
    required: ['username', 'message'],
  }),
  withRoute({
    // The handler does not read the request payload, so it takes no parameter.
    handler: () => {
      const uuid = uuidv4();
      // Typed so that `data.username` / `data.message` below type-check; an
      // untyped `new Subject()` would yield `unknown` values.
      const subject = new Subject<ChatEvent>();
      // INCOMING: react only to values that this handler did not emit itself.
      // Everything emitted below is tagged with `emitterId: uuid`, so the
      // filter prevents the re-broadcast from feeding back into this observer.
      subject.pipe(filter(notFromEmitter(uuid))).subscribe((data) => {
        console.log('Chat message received:', data);
        // OUTGOING: Broadcast message to all connected clients
        subject.next({
          emitterId: uuid,
          type: 'chat_message',
          username: data.username,
          message: data.message,
          timestamp: Date.now(),
        });
      });
      // OUTGOING: Send welcome message one second after the handler runs
      timer(1000).pipe(take(1)).subscribe(() => {
        subject.next({
          emitterId: uuid,
          type: 'welcome',
          message: 'Welcome to the chat!',
          timestamp: Date.now(),
        });
      });
      return subject;
    },
    path: '/api/chat',
    protocol: 'ws',
  })
);
2️⃣ Create Service
Create a `websocket.ts` file inside the `src/services` directory.
The service directly composes the routes into a complete WebSocket service.
import { partial } from 'ramda';
import { primitives } from '@nodeblocks/backend-sdk';
import {
incomingMessagesFeature,
outgoingNotificationsFeature,
databaseChangesFeature,
chatFeature,
} from '../routes/websocket';
const { compose, defService } = primitives;

/**
 * Service factory that wires the four WebSocket route features together.
 *
 * The composed routes are partially applied with `{ dataStores }` and passed
 * to `defService` along with the WebSocket server. `config` is accepted to
 * satisfy the `primitives.Service` signature but is not read here.
 */
export const websocketService: primitives.Service = (dataStores, config, wss) => {
  const routes = compose(
    incomingMessagesFeature, // 📥 INCOMING: Client messages
    outgoingNotificationsFeature, // 📤 OUTGOING: Server notifications
    databaseChangesFeature, // 📤 OUTGOING: Database changes
    chatFeature // 🔄 BIDIRECTIONAL: Chat system
  );
  const routesWithContext = partial(routes, [{ dataStores }]);
  return defService(routesWithContext, wss);
};
3️⃣ Set Up Server
Create or update your `index.ts` file in the `src` directory.
import { createServer } from 'http';
import 'dotenv/config';
import express from 'express';
import { WebSocketServer } from 'ws';
import { middlewares, drivers, services } from '@nodeblocks/backend-sdk';
import cors from 'cors';
import { websocketService } from './services/websocket';
const { getMongoClient } = drivers;
const { nodeBlocksErrorMiddleware } = middlewares;
const { authService, userService } = services;

/**
 * Reads a required environment variable.
 *
 * @param name - Name of the variable to read from `process.env`.
 * @returns The variable's value.
 * @throws Error when the variable is unset or empty, so a missing `.env`
 *   entry fails at startup with a clear message.
 */
const requireEnv = (name: string): string => {
  const value = process.env[name];
  if (value === undefined || value === '') {
    throw new Error(`Missing required environment variable: ${name}`);
  }
  return value;
};

// Set up Express and WebSocket servers. The WebSocket server shares the HTTP
// server created for the Express app.
const app = express();
const server = createServer(app);
const wss = new WebSocketServer({ server });
// Connect to MongoDB
const client = getMongoClient(requireEnv('MONGO_URI'), requireEnv('MONGO_DB_NAME'));
// Configure CORS: wildcard origin, the listed HTTP methods, wildcard headers.
const corsOptions = {
  origin: '*',
  methods: ['GET', 'POST', 'PUT', 'DELETE', 'OPTIONS', 'PATCH'],
  allowedHeaders: ['*'],
};
app.use(cors(corsOptions));
// Shared token secrets, declared once so that every service mounted in this
// file receives the same values. These are placeholder strings — replace them
// with real secrets before deploying.
const authSecrets = {
  authEncSecret: 'your-encryption-secret',
  authSignSecret: 'your-signing-secret',
};
// Add WebSocket service
app.use(
  websocketService(
    {
      users: client.collection('users'),
    },
    {
      authSecrets,
    },
    wss
  )
);
// Add other services
app.use(
  authService(
    {
      identities: client.collection('identity'),
      onetimetokens: client.collection('onetimetokens'),
    },
    {
      authSecrets,
      maxFailedLoginAttempts: 5,
      accessTokenExpireTime: '2h',
      refreshTokenExpireTime: '2d',
    },
    {
      // Stub mail service: logs the mail payload instead of sending it.
      mailService: {
        sendMail: mailData => {
          console.log('Auth email would be sent:', mailData);
          return Promise.resolve(true);
        },
      },
    }
  )
);
// User service: backed by the `users` and `identity` collections and
// configured with the shared auth secrets.
const userDataStores = {
  users: client.collection('users'),
  identities: client.collection('identity'),
};
app.use(userService(userDataStores, { authSecrets }));
// Error handling (must be registered after every other middleware)
app.use(nodeBlocksErrorMiddleware());
// Start the server
const PORT = 8089;
server.listen(PORT, () => {
  // Startup banner: one log line per entry, in order.
  const banner = [
    `🚀 Server running on port ${PORT}`,
    `📡 WebSocket endpoints:`,
    ` 📥 ws://localhost:${PORT}/api/messages (INCOMING: send messages)`,
    ` 📤 ws://localhost:${PORT}/api/notifications (OUTGOING: receive notifications)`,
    ` 📤 ws://localhost:${PORT}/api/database-changes (OUTGOING: receive DB changes)`,
    ` 🔄 ws://localhost:${PORT}/api/chat (BIDIRECTIONAL: chat system)`,
  ];
  for (const line of banner) {
    console.log(line);
  }
});
4️⃣ Environment Setup
Create a `.env` file in your project root:
MONGO_URI=mongodb://localhost:27017
MONGO_DB_NAME=your_app_database
🧪 Testing the Service
Test with Command Line
# Install wscat for command line testing
npm install -g wscat
# 📥 INCOMING: Send messages to server (you send, server receives)
wscat -c ws://localhost:8089/api/messages
> {"name": "John", "message": "Hello from client!"}
# 📤 OUTGOING: Receive notifications from server (server sends, you receive)
wscat -c ws://localhost:8089/api/notifications
# You'll see a server heartbeat message every 5 seconds
# 📤 OUTGOING: Receive database changes (server sends DB changes)
wscat -c ws://localhost:8089/api/database-changes
# You'll see database change notifications
# 🔄 BIDIRECTIONAL: Chat (both send and receive)
wscat -c ws://localhost:8089/api/chat
> {"username": "CLIUser", "message": "Hello chat!", "emitterId": "..."}
# You'll receive the welcome message and see your own message echoed back
Test with WebSocket Client
import WebSocket from 'ws';
// 📥 INCOMING: Send messages to server
const messageWs = new WebSocket('ws://localhost:8089/api/messages');
// Without an 'error' listener, a failed connection is thrown as an unhandled
// 'error' event and crashes the script.
messageWs.on('error', (error) => {
  console.error('📥 Message socket error:', error);
});
messageWs.on('open', () => {
  console.log('📥 Connected for sending messages');
  messageWs.send(JSON.stringify({
    name: 'NodeJS Client',
    message: 'Hello from Node.js!'
  }));
});
// 📤 OUTGOING: Receive notifications from server
const notificationWs = new WebSocket('ws://localhost:8089/api/notifications');
// Without an 'error' listener, a failed connection is thrown as an unhandled
// 'error' event and crashes the script.
notificationWs.on('error', (error) => {
  console.error('📤 Notification socket error:', error);
});
notificationWs.on('open', () => {
  console.log('📤 Connected to receive notifications');
});
notificationWs.on('message', (data) => {
  console.log('📤 Notification received:', JSON.parse(data.toString()));
});
// 📤 OUTGOING: Receive database changes
const dbWs = new WebSocket('ws://localhost:8089/api/database-changes');
// Without an 'error' listener, a failed connection is thrown as an unhandled
// 'error' event and crashes the script.
dbWs.on('error', (error) => {
  console.error('📤 Database-changes socket error:', error);
});
dbWs.on('open', () => {
  console.log('📤 Connected to receive database changes');
});
dbWs.on('message', (data) => {
  console.log('📤 Database change:', JSON.parse(data.toString()));
});
// 🔄 BIDIRECTIONAL: Chat system
const chatWs = new WebSocket('ws://localhost:8089/api/chat');
// Without an 'error' listener, a failed connection is thrown as an unhandled
// 'error' event and crashes the script.
chatWs.on('error', (error) => {
  console.error('🔄 Chat socket error:', error);
});
chatWs.on('open', () => {
  console.log('🔄 Connected to chat');
  chatWs.send(JSON.stringify({
    username: 'NodeJS User',
    message: 'Hello chat!'
  }));
});
chatWs.on('message', (data) => {
  console.log('🔄 Chat message:', JSON.parse(data.toString()));
});
➡️ Next Steps
Now you can practice by adding more functionality to your WebSocket service:
- Authentication - Add authentication validators to secure WebSocket connections
- Rate Limiting - Prevent WebSocket abuse with connection limits
- Message Persistence - Store WebSocket messages in the database
- Broadcasting - Send messages to specific groups of clients
🔗 Related Documentation
- Creating a Custom Service - Learn the basic service creation patterns
- Route Component - Understand route configuration options
- Authentication Service - Add user authentication to WebSockets