DBMS/Redis

[REDIS] ๐Ÿ“š Node.js ์—์„œ Pub/Sub ๊ธฐ๋Šฅ ๊ตฌํ˜„ํ•˜๊ธฐ

์ธํŒŒ_ 2022. 7. 13. 10:23

node-redis-pub-sub

Node-Redis Publish / Subscribe ๊ตฌํ˜„ (v4 )

Node.js์—์„œ Redis ๋ชจ๋“ˆ์„ ์ด์šฉํ•˜์—ฌ publish/subscribe ํ•˜๋Š” ๋ฐฉ๋ฒ•์— ๋Œ€ํ•ด ํฌ์ŠคํŒ… ํ•ด๋ณธ๋‹ค.

์šฐ์„  redis์˜ pub/sub ๊ธฐ๋ณธ ๊ธฐ๋Šฅ๊ณผ ๋”๋ถˆ์–ด ๋…ธ๋“œ์— redis ๋ชจ๋“ˆ ์„ค์น˜์™€ ์‚ฌ์šฉ๋ฒ•์— ๋Œ€ํ•ด์„œ ๋ชจ๋ฅด๋ฉด ๋‹ค์Œ ๊ธ€์„ ๋จผ์ € ์ •๋…ํ•˜๊ณ  ์˜ค๊ธธ ๊ถŒํ•œ๋‹ค.

 

[REDIS] ๐Ÿ“š PUB/SUB ๊ธฐ๋Šฅ (์ฑ„ํŒ… / ๊ตฌ๋… ์•Œ๋ฆผ)

Redis Publish / Subscribe Publish / Subscribe ๋ž€ ํŠน์ •ํ•œ ์ฃผ์ œ(topic)์— ๋Œ€ํ•˜์—ฌ ํ•ด๋‹น topic์„ ๊ตฌ๋…ํ•œ ๋ชจ๋‘์—๊ฒŒ ๋ฉ”์‹œ์ง€๋ฅผ ๋ฐœํ–‰ํ•˜๋Š” ํ†ต์‹  ๋ฐฉ๋ฒ•์œผ๋กœ ์ฑ„๋„์„ ๊ตฌ๋…ํ•œ ์ˆ˜์‹ ์ž(ํด๋ผ์ด์–ธํŠธ) ๋ชจ๋‘์—๊ฒŒ ๋ฉ”์„ธ์ง€๋ฅผ ์ „์†ก ํ•˜.

inpa.tistory.com

node-redis-pub-sub


Pub/Sub ํด๋ž˜์Šค ๊ตฌํ˜„

๊ตฌ๋…์ž(๊ฐ์ฒด) ๊ตฌ์กฐ ์ด๊ธฐ ๋•Œ๋ฌธ์— ํด๋ž˜์Šค๋กœ pub/sub์„ ๊ตฌํ˜„ํ•ด๋ณด์•˜๋‹ค.

ํ•ด๋‹น Redis ๊ตฌํ˜„ ์ฝ”๋“œ๋Š” ์ตœ์‹  ๋ฒ„์ ผ v4๋ฅผ ๊ธฐ์ค€์œผ๋กœ ์ž‘์„ฑ ๋˜์—ˆ๋‹ค.
v3์„ ์“ฐ๊ฑฐ๋‚˜ legacymode ์˜ต์…˜์„ ํ™œ์„ฑํ™”ํ•œ ์‚ฌ์šฉ์ž๋“ค์€ ๋™์ž‘๋˜์ง€ ์•Š๋Š”๋‹ค.
const express = require('express');
const redis = require('redis');

class Redis {
   constructor() {
      this.redisClient = redis.createClient();
      this.redisClient.on('connect', () => {
         console.info('Redis PubSub connected!');
      });
      this.redisClient.on('error', (err) => {
         console.error('Redis PubSub Client Error', err);
      });
      this.redisClient.connect().then(); // redis v4 ์—ฐ๊ฒฐ (๋น„๋™๊ธฐ)
   }
   
   // ์ด๋ฐ–์˜ ๋ช…๋ น์–ด ...
}

class PubSub extends Redis {
   constructor() {
      super();
   }
   
   async subscribe(channel) {
      await this.redisClient.subscribe(channel, (message) => {
         console.log('message : ', message);
      });
      console.log('์ฑ„๋„ ์—ฐ๊ฒฐ ์™„๋ฃŒ');
   }
   
   async unsubscribe(channel) {
      await this.redisClient.unsubscribe(channel);
   }
   
   async pSubscribe(channel) {
      await this.redisClient.pSubscribe(channel, (message, channel) => {
         console.log('channel : %s , message : %s', channel, message);
      });
      console.log('์ฑ„๋„(ํŒจํ„ด) ์—ฐ๊ฒฐ ์™„๋ฃŒ');
   }
   
   async pUnsubscribe(channel) {
      await this.redisClient.pUnsubscribe(channel);
   }
   
   async publish(channel, message) {
      await this.redisClient.publish(channel, message);
   }
}

const router = express.Router();
const subscriber = new PubSub(); // ๊ตฌ๋…์ž
const publisher = new PubSub(); // ๋ฐœํ–‰์ž

router.get('/sub', async (req, res, next) => {
   await subscriber.subscribe('me'); // ์ฑ„๋„ ์ƒ์„ฑ & ๊ตฌ๋…
   res.end();
});

router.get('/unsub', async (req, res, next) => {
   await subscriber.unsubscribe('me'); // ๊ตฌ๋… ํ•ด์ œ
   res.end();
});

router.get('/psub', async (req, res, next) => {
   await subscriber.pSubscribe('m*'); // ํŒจํ„ด ์ฑ„๋„ ์ƒ์„ฑ & ๊ตฌ๋…
   res.end();
});

router.get('/punsub', async (req, res, next) => {
   await subscriber.pUnsubscribe('m*'); // ํŒจํ„ด ๊ตฌ๋… ํ•ด์ œ
   res.end();
});

router.get('/pub', async (req, res, next) => {
   await publisher.publish('me', 'hello world'); // ์ฑ„๋„์— ๋ฉ”์„ธ์ง€ ์†ก์‹ 
   res.end();
});

module.exports = router;

Redis Cli ํ„ฐ๋ฏธ๋„์—์„œ pub/sub์„ ํ•˜๊ธฐ์œ„ํ•ด์„œ๋Š” subscriber ์ „์šฉ ํ„ฐ๋ฏธ๋„๊ณผ publisher ์ „์šฉ ํ„ฐ๋ฏธ๋„ 2๊ฐœ์ด์ƒ์ด ํ•„์š”ํ–ˆ์—ˆ๋‹ค.

ํ„ฐ๋ฏธ๋„์„ ์—ฌ๋Ÿฌ๊ฐœ ๋„์› ๋“ฏ์ด ์„œ๋ฒ„ ์ฝ”๋“œ์—์„œ๋„ subscriber ์ „์šฉ ํด๋ผ์ด์–ธํŠธ ์ฑ„๋„๊ณผ publisher ์ „์šฉ ์ฑ„๋„์ด ๊ฐ๊ฐ ํ•„์š”ํ•˜๋‹ค.

๊ทธ๋ž˜์„œ OOP(๊ฐ์ฒด์ง€ํ–ฅ) ๊ตฌ์กฐ๋กœ ์งฐ๊ณ , ๊ฐ ๊ฐ์ฒด๋งˆ๋‹ค ํด๋ž˜์Šค ์ƒ์„ฑ์ž๋กœ ๋…๋ฆฝ๋œ ๋ ˆ๋””์Šค ์—ฐ๊ฒฐ์„ ํ•ด์ฃผ๊ณ  ๊ตฌ๋…(sub) ํ•ด์ค€๋‹ค.

๊ทธ๋ฆฌ๊ณ  ๋‚˜์„œ, publisher ๊ฐ์ฒด ์ฑ„๋„์—์„œ publish api๋กœ me ์ฑ„๋„์— ๋ฉ”์„ธ์ง€๋ฅผ ๋ณด๋‚ด๋ฉด, subscriber์˜ me ์ฑ„๋„์— ์ฝ˜์†” ๋ฉ”์„ธ์ง€๊ฐ€ ์ฐํž ๊ฒƒ์ด๋‹ค.

 

[REST API ์ˆœ์„œ]

  1. localhost:80801/sub
  2. localhost:80801/psub
  3. localhost:80801/pub

node-redis-pub-sub

me ์ฑ„๋„๊ณผ m* ํŒจํ„ด ์ฑ„๋„์— ์—ฐ๊ฒฐํ–ˆ์œผ๋‹ˆ, me ์ฑ„๋„์— ๋ฉ”์„ธ์ง€๋ฅผ publish ํ•˜๋ฉด ๊ฐ ๋‘๊ฐœ์˜ ๊ตฌ๋… ์ฑ„๋„์—์„œ ๋ฉ”์„ธ์ง€๋ฅผ ์ˆ˜์‹  ๋ฐ›๋Š” ๊ฑธ ๋ณผ ์ˆ˜ ์žˆ๋‹ค.