[REDIS] ๐ Node.js ์์ Pub/Sub ๊ธฐ๋ฅ ๊ตฌํํ๊ธฐ
Node-Redis Publish / Subscribe ๊ตฌํ (v4 )
Node.js์์ Redis ๋ชจ๋์ ์ด์ฉํ์ฌ publish/subscribe ํ๋ ๋ฐฉ๋ฒ์ ๋ํด ํฌ์คํ ํด๋ณธ๋ค.
์ฐ์ redis์ pub/sub ๊ธฐ๋ณธ ๊ธฐ๋ฅ๊ณผ ๋๋ถ์ด ๋ ธ๋์ redis ๋ชจ๋ ์ค์น์ ์ฌ์ฉ๋ฒ์ ๋ํด์ ๋ชจ๋ฅด๋ฉด ๋ค์ ๊ธ์ ๋จผ์ ์ ๋ ํ๊ณ ์ค๊ธธ ๊ถํ๋ค.
[REDIS] ๐ PUB/SUB ๊ธฐ๋ฅ (์ฑํ / ๊ตฌ๋ ์๋ฆผ)
Redis Publish / Subscribe Publish / Subscribe ๋ ํน์ ํ ์ฃผ์ (topic)์ ๋ํ์ฌ ํด๋น topic์ ๊ตฌ๋ ํ ๋ชจ๋์๊ฒ ๋ฉ์์ง๋ฅผ ๋ฐํํ๋ ํต์ ๋ฐฉ๋ฒ์ผ๋ก ์ฑ๋์ ๊ตฌ๋ ํ ์์ ์(ํด๋ผ์ด์ธํธ) ๋ชจ๋์๊ฒ ๋ฉ์ธ์ง๋ฅผ ์ ์ก ํ.
inpa.tistory.com
Pub/Sub ํด๋์ค ๊ตฌํ
๊ตฌ๋ ์(๊ฐ์ฒด) ๊ตฌ์กฐ ์ด๊ธฐ ๋๋ฌธ์ ํด๋์ค๋ก pub/sub์ ๊ตฌํํด๋ณด์๋ค.
ํด๋น Redis ๊ตฌํ ์ฝ๋๋ ์ต์ ๋ฒ์ ผ v4๋ฅผ ๊ธฐ์ค์ผ๋ก ์์ฑ ๋์๋ค.
v3์ ์ฐ๊ฑฐ๋ legacymode ์ต์ ์ ํ์ฑํํ ์ฌ์ฉ์๋ค์ ๋์๋์ง ์๋๋ค.
const express = require('express');
const redis = require('redis');
class Redis {
constructor() {
this.redisClient = redis.createClient();
this.redisClient.on('connect', () => {
console.info('Redis PubSub connected!');
});
this.redisClient.on('error', (err) => {
console.error('Redis PubSub Client Error', err);
});
this.redisClient.connect().then(); // redis v4 ์ฐ๊ฒฐ (๋น๋๊ธฐ)
}
// ์ด๋ฐ์ ๋ช
๋ น์ด ...
}
class PubSub extends Redis {
constructor() {
super();
}
async subscribe(channel) {
await this.redisClient.subscribe(channel, (message) => {
console.log('message : ', message);
});
console.log('์ฑ๋ ์ฐ๊ฒฐ ์๋ฃ');
}
async unsubscribe(channel) {
await this.redisClient.unsubscribe(channel);
}
async pSubscribe(channel) {
await this.redisClient.pSubscribe(channel, (message, channel) => {
console.log('channel : %s , message : %s', channel, message);
});
console.log('์ฑ๋(ํจํด) ์ฐ๊ฒฐ ์๋ฃ');
}
async pUnsubscribe(channel) {
await this.redisClient.pUnsubscribe(channel);
}
async publish(channel, message) {
await this.redisClient.publish(channel, message);
}
}
const router = express.Router();
const subscriber = new PubSub(); // ๊ตฌ๋
์
const publisher = new PubSub(); // ๋ฐํ์
router.get('/sub', async (req, res, next) => {
await subscriber.subscribe('me'); // ์ฑ๋ ์์ฑ & ๊ตฌ๋
res.end();
});
router.get('/unsub', async (req, res, next) => {
await subscriber.unsubscribe('me'); // ๊ตฌ๋
ํด์
res.end();
});
router.get('/psub', async (req, res, next) => {
await subscriber.pSubscribe('m*'); // ํจํด ์ฑ๋ ์์ฑ & ๊ตฌ๋
res.end();
});
router.get('/punsub', async (req, res, next) => {
await subscriber.pUnsubscribe('m*'); // ํจํด ๊ตฌ๋
ํด์
res.end();
});
router.get('/pub', async (req, res, next) => {
await publisher.publish('me', 'hello world'); // ์ฑ๋์ ๋ฉ์ธ์ง ์ก์
res.end();
});
module.exports = router;
Redis Cli ํฐ๋ฏธ๋์์ pub/sub์ ํ๊ธฐ์ํด์๋ subscriber ์ ์ฉ ํฐ๋ฏธ๋๊ณผ publisher ์ ์ฉ ํฐ๋ฏธ๋ 2๊ฐ์ด์์ด ํ์ํ์๋ค.
ํฐ๋ฏธ๋์ ์ฌ๋ฌ๊ฐ ๋์ ๋ฏ์ด ์๋ฒ ์ฝ๋์์๋ subscriber ์ ์ฉ ํด๋ผ์ด์ธํธ ์ฑ๋๊ณผ publisher ์ ์ฉ ์ฑ๋์ด ๊ฐ๊ฐ ํ์ํ๋ค.
๊ทธ๋์ OOP(๊ฐ์ฒด์งํฅ) ๊ตฌ์กฐ๋ก ์งฐ๊ณ , ๊ฐ ๊ฐ์ฒด๋ง๋ค ํด๋์ค ์์ฑ์๋ก ๋ ๋ฆฝ๋ ๋ ๋์ค ์ฐ๊ฒฐ์ ํด์ฃผ๊ณ ๊ตฌ๋ (sub) ํด์ค๋ค.
๊ทธ๋ฆฌ๊ณ ๋์, publisher ๊ฐ์ฒด ์ฑ๋์์ publish api๋ก me ์ฑ๋์ ๋ฉ์ธ์ง๋ฅผ ๋ณด๋ด๋ฉด, subscriber์ me ์ฑ๋์ ์ฝ์ ๋ฉ์ธ์ง๊ฐ ์ฐํ ๊ฒ์ด๋ค.
[REST API ์์]
- localhost:80801/sub
- localhost:80801/psub
- localhost:80801/pub
me ์ฑ๋๊ณผ m* ํจํด ์ฑ๋์ ์ฐ๊ฒฐํ์ผ๋, me ์ฑ๋์ ๋ฉ์ธ์ง๋ฅผ publish ํ๋ฉด ๊ฐ ๋๊ฐ์ ๊ตฌ๋ ์ฑ๋์์ ๋ฉ์ธ์ง๋ฅผ ์์ ๋ฐ๋ ๊ฑธ ๋ณผ ์ ์๋ค.