🔌 WebSocket サービスの作成
このガイドでは、Nodeblocks SDK を使用して完全な WebSocket サービスを作成する手順を説明します。INCOMING(クライアント → サーバー)、OUTGOING(サーバー → クライアント)、BIDIRECTIONAL(クライアント ↔ サーバー)の3つの異なる WebSocket パターンを構築します。
📦 必要なパッケージ: この例では複数のパッケージを使用します。MongoDB は変更ストリーム用にレプリカセットが必要です。必ずインストールしてください:
npm install express rxjs ws ramda uuid dotenv cors @nodeblocks/backend-sdk
npm install @types/ws @types/uuid --save-dev
🏗️ サービスアーキテクチャ
WebSocket サービスを構築するために、以下のコアコンポーネントを実装します:
- WebSocket ルート - 異なるフローパターンを持つ WebSocket エンドポイントを定義
- サービス - すべてをまとめるファクトリー関数
- サーバーセットアップ - WebSocket サーバー設定
1️⃣ WebSocket ルートの作成
まず、src/routes ディレクトリ内に websocket.ts ファイルを作成します。
ここでは、すべての可能な通信フローを示す3つの異なる WebSocket ルートを定義します。
src/routes/websocket.ts
import { Subject, interval, timer } from 'rxjs';
import { filter, take } from 'rxjs/operators';
import { primitives, types } from '@nodeblocks/backend-sdk';
import { v4 as uuidv4 } from 'uuid';
const { compose, withRoute, withSchema, notFromEmitter } = primitives;
// 📥 INCOMING ONLY: クライアントがサーバーにメッセージを送信
export const incomingMessagesFeature = compose(
withSchema({
type: 'object',
properties: {
name: { type: 'string' },
message: { type: 'string' },
},
required: ['name'],
}),
withRoute({
handler: () => {
const subject = new Subject();
// INCOMING: クライアントからのメッセージをリッスン
subject.subscribe((data) => {
console.log('クライアントからメッセージを受信:', data);
// 受信メッセージを処理(DB に保存など)
// 注意: subject.next() で何も送信しない
});
return subject;
},
path: '/api/messages',
protocol: 'ws',
})
);
// 📤 OUTGOING ONLY: サーバーがクライアントにデータをブロードキャスト
export const outgoingNotificationsFeature = compose(
withRoute({
handler: () => {
const subject = new Subject();
// OUTGOING: クライアントに定期的な通知を送信
interval(5000).subscribe(() => {
subject.next({
type: 'server_notification',
message: 'Server heartbeat',
timestamp: Date.now()
});
});
// 注意: 受信メッセージをサブスクライブしない
return subject;
},
path: '/api/notifications',
protocol: 'ws',
})
);
// 📤 OUTGOING ONLY: データベース変更ストリーム
export const databaseChangesFeature = compose(
withRoute({
handler: (request: primitives.WsRouteHandlerPayload) => {
const subject = new Subject();
// users コレクションの変更を監視
const changeStream = request.context.db.users.watch({
filter: {
age: { $gt: 18 } // 18歳以上のユーザーのみ監視
},
});
changeStream.on('change', (data: any) => {
console.log('変更ストリームデータ', data);
// OUTGOING: 接続されているすべてのクライアントにデータを送信
subject.next({
timestamp: Date.now(),
type: 'user_change',
data
});
});
changeStream.on('error', (error: any) => {
console.error('変更ストリームエラー:', error);
subject.error(error);
});
// WebSocket が閉じたときにクリーンアップ
let subscription;
const cleanup = () => {
changeStream.close();
if (subscription) subscription.unsubscribe();
};
subscription = subject.subscribe({
complete: cleanup,
error: cleanup
});
return subject;
},
path: '/api/database-changes',
protocol: 'ws',
})
);
// 🔄 BIDIRECTIONAL: チャットシステム(受信と送信の両方)
export const chatFeature = compose(
withSchema({
type: 'object',
properties: {
username: { type: 'string' },
message: { type: 'string' },
roomId: { type: 'string' },
},
required: ['username', 'message'],
}),
withRoute({
handler: (request: primitives.WsRouteHandlerPayload) => {
const uuid = uuidv4();
const subject = new Subject();
// INCOMING: このクライアントからのメッセージをリッスン
subject.pipe(filter(notFromEmitter(uuid))).subscribe((data) => {
console.log('チャットメッセージを受信:', data);
// OUTGOING: 接続されているすべてのクライアントにメッセージをブロードキャスト
subject.next({
emitterId: uuid,
type: 'chat_message',
username: data.username,
message: data.message,
timestamp: Date.now()
});
});
// OUTGOING: クライアントが接続したときにウェルカムメッセージを送信
timer(1000).pipe(take(1)).subscribe(() => {
subject.next({
emitterId: uuid,
type: 'welcome',
message: 'Welcome to the chat!',
timestamp: Date.now()
});
});
return subject;
},
path: '/api/chat',
protocol: 'ws',
})
);
2️⃣ サービスの作成
src/services ディレクトリ内に websocket.ts ファイルを作成します。
サービスはルートを直接構成して、完全な WebSocket サービスにまとめます。
src/services/websocket.ts
import { partial } from 'ramda';
import { primitives } from '@nodeblocks/backend-sdk';
import {
incomingMessagesFeature,
outgoingNotificationsFeature,
databaseChangesFeature,
chatFeature,
} from '../routes/websocket';
const { compose, defService } = primitives;
export const websocketService: primitives.Service = (dataStores, config, wss) =>
defService((partial(compose(
incomingMessagesFeature, // 📥 INCOMING: クライアントメッセージ
outgoingNotificationsFeature, // 📤 OUTGOING: サーバー通知
databaseChangesFeature, // 📤 OUTGOING: データベース変更
chatFeature // 🔄 BIDIRECTIONAL: チャットシステム
), [{dataStores: dataStores}])), wss);
3️⃣ サーバーのセットアップ
src ディレクトリ内の index.ts ファイルを作成または更新します。
src/index.ts
import { createServer } from 'http';
import 'dotenv/config';
import express from 'express';
import { WebSocketServer } from 'ws';
import { middlewares, drivers, services } from '@nodeblocks/backend-sdk';
import cors from 'cors';
import { websocketService } from './services/websocket';
const { withMongo } = drivers;
const { nodeBlocksErrorMiddleware } = middlewares;
const { authService, userService } = services;
// Express と WebSocket サーバーをセットアップ
const app = express();
const server = createServer(app);
const wss = new WebSocketServer({ server });
// MongoDB に接続
const connectToDatabase = withMongo(process.env.MONGO_URI!, process.env.MONGO_DB_NAME!, 'user', 'password');
// CORS を設定
app.use(cors({
origin: '*',
methods: ['GET', 'POST', 'PUT', 'DELETE', 'OPTIONS', 'PATCH'],
allowedHeaders: ['*']
}));
// WebSocket サービスを追加
app.use(
websocketService(
await connectToDatabase('users'),
},
{
authSecrets: {
authEncSecret: 'your-encryption-secret',
authSignSecret: 'your-signing-secret',
},
},
wss
)
);
// 他のサービスを追加
const authSecrets = {
authEncSecret: 'your-encryption-secret',
authSignSecret: 'your-signing-secret',
};
app.use(
authService(
{
...(await connectToDatabase('identities')),
...(await connectToDatabase('onetimetokens')),
},
{
authSecrets,
maxFailedLoginAttempts: 5,
accessTokenExpireTime: '2h',
refreshTokenExpireTime: '2d',
identity: {
typeIds: {
admin: '100',
guest: '000',
regular: '010',
},
},
},
{
mailService: {
sendMail: mailData => {
console.log('Auth email would be sent:', mailData);
return Promise.resolve(true);
},
},
}
)
);
app.use(
userService(
{
...(await connectToDatabase('users')),
...(await connectToDatabase('identities')),
...(await connectToDatabase('organizations')),
...(await connectToDatabase('products')),
},
{
authSecrets,
identity: {
typeIds: {
admin: '100',
guest: '000',
regular: '010',
},
},
}
)
);
// エラーハンドリング(最後に配置する必要があります)
app.use(nodeBlocksErrorMiddleware());
// サーバーを起動
const PORT = 8089;
server.listen(PORT, () => {
console.log(`🚀 Server running on port ${PORT}`);
console.log(`📡 WebSocket endpoints:`);
console.log(` 📥 ws://localhost:${PORT}/api/messages (INCOMING: メッセージを送信)`);
console.log(` 📤 ws://localhost:${PORT}/api/notifications (OUTGOING: 通知を受信)`);
console.log(` 📤 ws://localhost:${PORT}/api/database-changes (OUTGOING: DB 変更を受信)`);
console.log(` 🔄 ws://localhost:${PORT}/api/chat (BIDIRECTIONAL: チャットシステム)`);
});
4️⃣ 環境設定
プロジェクトルートに .env ファイルを作成します:
.env
MONGO_URI=mongodb://localhost:27017
MONGO_DB_NAME=your_app_database
🧪 サービスのテスト
コマンドラインでテスト
# コマンドラインテスト用に wscat をインストール
npm install -g wscat
# 📥 INCOMING: サーバーにメッセージを送信(送信側、サーバーが受信)
wscat -c ws://localhost:8089/api/messages
> {"name": "John", "message": "Hello from client!"}
# 📤 OUTGOING: サーバーから通知を受信(サーバーが送信、受信側)
wscat -c ws://localhost:8089/api/notifications
# 定期的なサーバーハートビートメッセージが表示されます
# 📤 OUTGOING: データベース変更を受信(サーバーが DB 変更を送信)
wscat -c ws://localhost:8089/api/database-changes
# データベース変更通知が表示されます
# 🔄 BIDIRECTIONAL: チャット(送信と受信の両方)
wscat -c ws://localhost:8089/api/chat
> {"username": "CLIUser", "message": "Hello chat!", "emitterId": "..."}
# メッセージがエコーされ、ウェルカムメッセージが表示されます
WebSocket クライアントでテスト
test-client.js
import WebSocket from 'ws';
// 📥 INCOMING: サーバーにメッセージを送信
const messageWs = new WebSocket('ws://localhost:8089/api/messages');
messageWs.on('open', () => {
console.log('📥 メッセージ送信用に接続');
messageWs.send(JSON.stringify({
name: 'NodeJS Client',
message: 'Hello from Node.js!'
}));
});
// 📤 OUTGOING: サーバーから通知を受信
const notificationWs = new WebSocket('ws://localhost:8089/api/notifications');
notificationWs.on('open', () => {
console.log('📤 通知受信用に接続');
});
notificationWs.on('message', (data) => {
console.log('📤 通知を受信:', JSON.parse(data.toString()));
});
// 📤 OUTGOING: データベース変更を受信
const dbWs = new WebSocket('ws://localhost:8089/api/database-changes');
dbWs.on('open', () => {
console.log('📤 データベース変更受信用に接続');
});
dbWs.on('message', (data) => {
console.log('📤 データベース変更:', JSON.parse(data.toString()));
});
// 🔄 BIDIRECTIONAL: チャットシステム
const chatWs = new WebSocket('ws://localhost:8089/api/chat');
chatWs.on('open', () => {
console.log('🔄 チャットに接続');
chatWs.send(JSON.stringify({
username: 'NodeJS User',
message: 'Hello chat!'
}));
});
chatWs.on('message', (data) => {
console.log('🔄 チャットメッセージ:', JSON.parse(data.toString()));
});
➡️ 次のステップ
WebSocket サービスにさらに機能を追加して練習できます:
- 認証 - WebSocket 接続を保護するために認証バリデーターを追加
- レート制限 - 接続制限で WebSocket の悪用を防止
- メッセージ永続化 - WebSocket メッセージをデータベースに保存
- ブロードキャスト - 特定のクライアントグループにメッセージを送信
🔗 関連ドキュメント
- カスタムサービスの作成 - 基本的なサービス作成パターンを学習
- Route Component - ルート設定オプションを理解
- Authentication Service - WebSocket にユーザー認証を追加