백엔드/NestJS

[NestJS]내장된 기능 활용해 마이크로서비스 구현하기 - 3. MQTT

SparkIT 2024. 11. 4. 12:15
[NestJS]내장된 기능 활용해 마이크로서비스 구현하기 - 1. 개요
[NestJS]내장된 기능 활용해 마이크로서비스 구현하기 - 2. TCP
[NestJS]내장된기능활용해마이크로서비스구현하기 - 3. MQTT
[NestJS]내장된 기능 활용해 마이크로서비스 구현하기 - 4. Kafka

 

 

Documentation | NestJS - A progressive Node.js framework

Nest is a framework for building efficient, scalable Node.js server-side applications. It uses progressive JavaScript, is built with TypeScript and combines elements of OOP (Object Oriented Programming), FP (Functional Programming), and FRP (Functional Rea

docs.nestjs.com

(기본적으로 저는 도커 컨테이너로 각 마이크로서비스를 구현하고 이들을 도커 컴포즈로 묶어서 실행하여 실습을 진행했습니다.)

전체 구조

실습은 위와 같은 구조를 이용했습니다. 먼저 HTTP 요청을 받아서 다른 마이크로서비스에 MQTT 요청을 보낼 수 있는 A 서버를 만들었습니다. 또 A 서버로부터 MQTT 요청을 받고 응답을 반환할 B 서버도 만들었습니다.

 

❓MQTT가 무슨 프로토콜인지 모르겠다면? --> https://sparkit.tistory.com/16

 

기본 설치

mqtt 프로토콜을 사용하기 위해 해당 패키지를 설치해야합니다.

npm i --save mqtt

 

 


MQTT 브로커 서버

# docker-compose.yml 파일 중 mqtt 브로커 설정 부분

	mqtt-broker:
        image: eclipse-mosquitto
        container_name: mqtt-broker
        ports:
            - "1883:1883"

위와 같이 간단한 mqtt 브로커 컨테이너를 실행하도록 설정했습니다. 또한 여기서 외부(다른 도커 컨테이너)에서 해당 컨테이너에 접속할 수 있게 하기 위해서는 해당 컨테이너에 존재하는 mosquitto.conf에 다음과 같은 코드를 추가했습니다.

listener 1883
allow_anonymous true

 

 


A 마이크로서비스 서버 (Client)

# main.ts

import { NestFactory } from '@nestjs/core';
import { AppModule } from './app.module';
import { MicroserviceOptions, Transport } from '@nestjs/microservices';

async function bootstrap() {
  const app = await NestFactory.create(AppModule);
  await app.listen(3000);
}
bootstrap();

HTTP 프로토콜 요청을 받기 위해 위와 같이 설정합니다.

 

# app.module.ts

import { Module } from '@nestjs/common';
import { AppController } from './app.controller';
import { AppService } from './app.service';
import { ClientsModule, Transport } from '@nestjs/microservices';

@Module({
  imports: [
    ClientsModule.register([
      {
        name: 'TEST_SERVICE',
        transport: Transport.MQTT,
        options: {
          url: 'mqtt://mqtt-broker:1883'
        },
      },
    ]),
  ],
  controllers: [AppController],
  providers: [AppService],
})
export class AppModule { }

저는 각 마이크로서비스를 도커 컨테이너로 띄웠기 때문에, options에 들어가는 url(mqtt 브로커) 정보는 위와 같이 mqtt 브로커 컨테이너명(mqtt-broker)을 활용해 구성했습니다.

 

# app.controller.ts

import { Controller, Get, Query } from '@nestjs/common';
import { AppService } from './app.service';
import { Observable } from 'rxjs';

@Controller()
export class AppController {
  constructor(private readonly appService: AppService) { }

  @Get('test')
  test() {
    return this.appService.test();
  }
}

A 서버에서는 사용자로부터 HTTP 프로토콜 GET 요청을 받도록 설정했습니다.

 

# app.service.ts

import { Inject, Injectable } from '@nestjs/common';
import { ClientProxy } from '@nestjs/microservices';
import { Observable } from 'rxjs';

@Injectable()
export class AppService {
  constructor(
    @Inject('TEST_SERVICE') private client: ClientProxy,
  ) { }

  test() {
    const pattern = 'test pattern';
    const payload = 'test payload';
    return this.client.send(pattern, payload);
  }
}

send 명령어를 이용해 mqtt 메시지를 전송하게 했습니다. 토픽은 pattern에 해당하는 'test pattern'이고 메시지 내용은 payload에 해당하는 'test payload'입니다.

 

 


B 마이크로서비스 서버

# main.ts

import { NestFactory } from '@nestjs/core';
import { AppModule } from './app.module';
import { MicroserviceOptions, Transport } from '@nestjs/microservices';

async function bootstrap() {
  const app = await NestFactory.createMicroservice<MicroserviceOptions>(AppModule, {
    transport: Transport.MQTT,
    options: {
      url: 'mqtt://mqtt-broker:1883',
      subscribeOptions: {
        qos: 2
      },
    },
  });
  await app.listen();
}
bootstrap();

B 서버는 MQTT 프로토콜만 사용해 통신할 것이기 때문에 위와 같이 설정했습니다. 마찬가지로 도커 컨테이너로 띄우져있는 MQTT 브로커에 연결하기 위해 도커 컨테이너명을 활용해 url 옵션을 설정했습니다. 또한 QoS 수준은 2로 설정해보았습니다. (QoS가 무엇인지 궁금하다면? --> https://sparkit.tistory.com/16)

 

# app.controller.ts

import { AppService } from './app.service';
import { MessagePattern } from '@nestjs/microservices';

@Controller()
export class AppController {
  constructor(private readonly appService: AppService) {}

  @MessagePattern('test pattern')
    test() {
      return 'good ~';
  }
}

간단하게 테스트하기 위해 request-response 스타일을 활용해 @MessagePattern 데코레이터를 이용했습니다. 그리고 구독할 MQTT 토픽은 'test pattern' 으로 설정했습니다. 즉, MQTT 브로커에서 'test pattern'이라는 토픽을 구독한 것입니다.

 

 


실행 결과

A 서버에서 /test GET 요청을 보냈더니, 결과적으로 'good ~'이라는 문자열을 반환받을 수 있었습니다. 이는 A 서버에 들어간 HTTP(GET)요청이 처리되고 이후에 A 서버가 MQTT 브로커로 'test pattern'이라는 토픽에 'test payload'라는 메시지를 포함시켜 발행합니다. 그리고 MQTT 브로커는 이 토픽(test pattern)을 구독 중인 B 서버로 메시지를 전달합니다. 해당 전달을 통해 B 서버에서 return 'good ~';  코드가 실행됩니다.

이를 통해 MQTT 프로토콜과 request-response 스타일의 마이크로서비스 동작 방식에 대해 이해할 수 있었습니다.

 

 


'요청-응답' 대신 '이벤트 기반' 테스트해보기

TCP를 활용한 이전 글과 이번 MQTT 글에서는 모두 @MessagePattern을 활용한 request-response 스타일의 동작을 구현해보았습니다. 하지만 더 빠른 동작을 위해서는 응답을 기다리지 않는 비동기적 방식을 사용해야합니다. MQTT 프로토콜 사용 시 이런 비동기적 동작도 짧게 다뤄보겠습니다.

수정할 부분은 두 부분입니다.

# A 서버 app.service.ts

import { Inject, Injectable } from '@nestjs/common';
import { ClientProxy } from '@nestjs/microservices';
import { Observable } from 'rxjs';

@Injectable()
export class AppService {
  constructor(
    @Inject('TEST_SERVICE') private client: ClientProxy,
  ) { }

  test() {
    const pattern = 'test pattern';
    const payload = 'test payload';
    // return this.client.send(pattern, payload);
    return this.client.emit(pattern, payload);
  }
}

다음과 같이 A 서버에서 B 서버에 요청을 보낼 때 send 메서드 대신 emit 메서드를 사용합니다.

 

# B 서버 app.controller.ts

import { AppService } from './app.service';
import { MessagePattern } from '@nestjs/microservices';

@Controller()
export class AppController {
  constructor(private readonly appService: AppService) {}

  @EventPattern('test pattern')
    test() {
      return 'good ~';
  }
}

A 서버의 요청을 받는 B 서버에서는 controller 부분의 데코레이터를 @MessagePattern --> @EventPattern으로 수정해줍니다. 이후 테스트를 진행하면 어떻게 될까요?

 

실행 결과

만약 똑같이 HTTP 프로토콜 GET 요청을 이용했다면 아무런 응답도 받지 못했을 것입니다. 그럼 잘못된 것일까요? 아닙니다. 실제 요청이 들어오는지 확인하기 위해 @EventPattern 로직에 브레이크포인트를 걸어서 디버깅을 진행하면 해당 부분까지 요청이 들어오는 모습을 확인할 수 있습니다. 그러면 왜 응답값이 없을까요? 바로 비동기적이기 때문입니다. client.emit으로 요청하면 이는 비동기적인 요청으로 응답을 기대하지 않습니다. 즉, 연결을 바로 끊어버리죠. 이로 인해 아무리 @EventPattern 로직에서 응답을 해줘도 응답을 받을 곳이 사라졌기 때문에 이런 현상이 발생한 것입니다.

즉, 이벤트 기반 아키텍처는 요청 이후 응답을 받아야 하는 구조에서는 사용하지 않고 요청 이후 응답없이 별개의 작업을 진행할 때 많이 사용되게 됩니다.