백엔드/NestJS

[NestJS]내장된 기능 활용해 마이크로서비스 구현하기 - 4. Kafka

SparkIT 2024. 11. 4. 21:15
[NestJS]내장된 기능 활용해 마이크로서비스 구현하기 - 1. 개요
[NestJS]내장된 기능 활용해 마이크로서비스 구현하기 - 2. TCP
[NestJS]내장된기능활용해마이크로서비스구현하기 - 3. MQTT
[NestJS]내장된 기능 활용해 마이크로서비스 구현하기 - 4. Kafka

 

 

Documentation | NestJS - A progressive Node.js framework

Nest is a framework for building efficient, scalable Node.js server-side applications. It uses progressive JavaScript, is built with TypeScript and combines elements of OOP (Object Oriented Programming), FP (Functional Programming), and FRP (Functional Rea

docs.nestjs.com

(기본적으로 저는 도커 컨테이너로 각 마이크로서비스를 구현하고 이들을 도커 컴포즈로 묶어서 실행하여 실습을 진행했습니다.)

개인적으로는 해당 방법이 마이크로서비스 간 통신을 구현하기에 가장 좋다고 생각했습니다. 주요한 이유는 다음과 같습니다.

  1. 대용량 데이터 처리 용이
    메시지 큐 시스템 중 많은 데이터를 가장 빠르게 처리할 수 있는 것으로 알려져 있습니다.
  2. 메시지 안정성 수준 설정 가능
    메시지 송신 및 수신이 유실이 치명적인 시스템에서는 더 안정적인 시스템(ex. RabbitMQ)을 사용하는 것이 좋다고 생각했습니다. 하지만 kafka도 3.x 버전 이후에는 송수신을 보장하기 위한 옵션이 추가되어 충분히 안정적인 시스템에도 사용할 수 있다고 생각했습니다.
  3. 확장성
    새로운 브로커를 추가하여 클러스터를 쉽게 확장할 수 있어 고가용성 및 성능 극대화에 최적화되어 있습니다.

 

❓Kafka가 궁금하다면? --> https://sparkit.tistory.com/19

 

 


request-response  🆚  event-based

NestJS 카프카 마이크로서비스에서 사용되는 요청-응답 스타일과 이벤트 기반 메시지 스타일 차이에 대해 설명드리겠습니다.

요청-응답 메시지 스타일

  • @MessagePattern
    클라이언트가 특정 요청을 보낼 때, 서버가 그 요청을 처리하고 응답을 반환하는 구조입니다.
  • (ClientKafka) send 메서드
    이 메서드는 특정 패턴에 대한 메시지를 전송하고, 응답을 기다리는 데 사용됩니다. 요청을 보내고 나면, 서버가 해당 요청을 처리하여 응답을 반환합니다.
  • 응답 토픽 구독 필요
    요청-응답 스타일에서는 클라이언트가 특정 요청을 보내고, 해당 요청에 대한 응답을 기다리기 때문에, 클라이언트는 그 응답을 받기 위해 응답 주제에 구독해야 합니다. 예를 들어, 클라이언트는 요청에 대한 응답을 받기 위해 this.client.subscribeToResponseOf('응답패턴')를 사용하여 해당 응답 패턴에 구독해야 합니다. 이 작업은 클라이언트가 서버의 응답을 받을 수 있도록 보장합니다.

이벤트 기반 메시지 스타일

  • @EventPattern
    클라이언트가 특정 이벤트를 발생시키고, 서버가 그 이벤트에 반응하여 처리하는 구조입니다.
  • (ClientKafka) emit 메서드
    이 메서드는 특정 이벤트를 발생시키며, 일반적으로 응답이 필요 없는 경우에 사용됩니다. 서버는 이벤트를 수신하고 처리하지만, 클라이언트는 응답을 기다리지 않습니다.
  • 응답 토픽 구독 불필요
    이벤트 기반 통신에서는 클라이언트가 응답을 기다리지 않기 때문에, 응답 주제에 구독할 필요가 없습니다. 클라이언트는 이벤트를 발생시키기만 하고, 서버는 그 이벤트를 수신하여 처리합니다.

 

간단하게 생각하면 특정 요청에 대한 후처리가 필요한 경우 @MessagePattern 데코레이터와 send 메서드를 사용해 request-response 스타일로 구현하고, 그렇지 않고 단순 요청 이후 해당 요청이 어떻게 처리되던 상관하지 않을 경우 @EventPattern 데코레이터와 emit 메서드를 사용해 event-based 스타일로 구현하면 될 것 같습니다.

 

 


요청-응답 메시지 스타일

 

기본 설치

npm i --save kafkajs

우선 kafka를 사용하기 위해 해당 패키지를 설치해야 합니다.

 

# A서버 main.ts

import { NestFactory } from '@nestjs/core';
import { AppModule } from './app.module';

async function bootstrap() {
  const app = await NestFactory.create(AppModule);
  await app.listen(3000);
}
bootstrap();

HTTP 프로토콜 요청을 받기 위해 위와 같이 설정합니다. 포트는 3000번 포트를 열어줬습니다.

 

A 서버

# A서버 app.module.ts

import { Module } from '@nestjs/common';
import { AppController } from './app.controller';
import { AppService } from './app.service';
import { ClientsModule, Transport } from '@nestjs/microservices';

@Module({
  imports: [
    ClientsModule.register([
      {
        name: 'TEST_SERVICE',
        transport: Transport.KAFKA,
        options: {
          client: {
            clientId: 'test-client',
            brokers: ['kafka:9092'],
          },
          consumer: {
            groupId: 'test-consumer'
          }
        },
      },
    ]),
  ],
  controllers: [AppController],
  providers: [AppService],
})
export class AppModule { }

저는 각 마이크로서비스를 도커 컨테이너로 띄웠기 때문에, options에 들어가는 client의 brokers는 kafka 도커 컨테이너명('kafka')을 이용해 구성했습니다.

 

# A서버 app.controller.ts

import { Controller, Get, Query } from '@nestjs/common';
import { AppService } from './app.service';

@Controller()
export class AppController {
  constructor(private readonly appService: AppService) { }

  @Get('test')
  test() {
    return this.appService.test();
  }
}
# A서버 app.service.ts

import { Inject, Injectable } from '@nestjs/common';
import { ClientKafka } from '@nestjs/microservices';
import { firstValueFrom } from 'rxjs';

@Injectable()
export class AppService {
  constructor(
    @Inject('TEST_SERVICE') private client: ClientKafka,
  ) { }

  async onModuleInit() {
    // 특정 이벤트에 대한 응답을 구독
    this.client.subscribeToResponseOf('test.pattern');
    await this.client.connect();
  }

  async test() {
    const pattern = 'test.pattern';
    const payload = 'test.payload';
    const kafkaResponse = await firstValueFrom(this.client.send(pattern, payload));
    return '카프카반환결과 : ' + kafkaResponse;
  }
}

서비스 코드에서 확인할 부분이 있습니다. 지금 코드는 요청-응답 스타일을 실습하는 것이므로 ClientKafka의 send 메서드를 사용했습니다. 그리고 이때 응답에 대한 토픽을 구독해야 응답을 받을 수 있기 때문에 subscribeToResponseOf 메서드를 사용했습니다. 또한 요청에 사용할 토픽은 'test.pattern'입니다.

 

 

B-1 서버

# B-1서버 main.ts

import { NestFactory } from '@nestjs/core';
import { AppModule } from './app.module';
import { MicroserviceOptions, Transport } from '@nestjs/microservices';

async function bootstrap() {
  const app = await NestFactory.createMicroservice<MicroserviceOptions>(AppModule, {
    transport: Transport.KAFKA,
    options: {
      client: {
        clientId: 'test-client',
        brokers: ['kafka:9092'],
      },
      consumer: {
        groupId: 'test-consumer'
      }
    },
  });
  await app.listen();
}
bootstrap();

이때 clientId와 groupId가 A 서버와 동일하게 작성되어 있습니다. 그래서 A 서버와 같은 클라이언트 아이디와 같은 컨슈머 그룹 아이디를 가진다고 생각할 수 있지만, 사실은 아닙니다. A 서버처럼 ClientsModule을 사용하면 클라이언트 아이디와 컨슈머 그룹 아이디 뒤에 '-client'라는 이름이 디폴트로 붙습니다. 또한 B-1 서버처럼 마이크로서비스로 생성할 때는 클라이언트 아이디와 컨슈머 그룹 아이디 뒤에 '-server'라는 이름이 디폴트로 붙습니다. 그래서 두 코드가 동일해 보여도 실제 이름은 다르기 때문에 문제가 없습니다.

참고 : https://docs.nestjs.com/microservices/kafka#naming-conventions

 

# B-1서버 app.controller.ts

import { Controller } from '@nestjs/common';
import { TestService } from './test.service';
import { EventPattern, MessagePattern } from '@nestjs/microservices';

@Controller('test')
export class TestController {
    constructor(private readonly testService: TestService) { }

    @MessagePattern('test.pattern')
    test() {
        console.log('1번째 ms입니다');
        return '1번째 ms입니다';
    }
}

간단한 실습을 위해 서비스 코드를 구현하지 않고 컨트롤러에서 바로 응답하게 설정했습니다. 이때 지금은 요청-응답 스타일을 사용하기로 했으니 여기서도 @MessagePattern 데코레이터를 사용하고 구독할 토픽은 'test.pattern'으로 설정합니다.

 

B-2 서버

# B-2서버 main.ts

import { NestFactory } from '@nestjs/core';
import { AppModule } from './app.module';
import { MicroserviceOptions, Transport } from '@nestjs/microservices';

async function bootstrap() {
  const app = await NestFactory.createMicroservice<MicroserviceOptions>(AppModule, {
    transport: Transport.KAFKA,
    options: {
      client: {
        clientId: 'test-client-2',
        brokers: ['kafka:9092'],
      },
      consumer: {
        groupId: 'test-consumer'
      }
    },
  });
  await app.listen();
}
bootstrap();
# B-2서버 app.controller.ts

import { Controller } from '@nestjs/common';
import { TestService } from './test.service';
import { EventPattern, MessagePattern } from '@nestjs/microservices';

@Controller('test')
export class TestController {
    constructor(private readonly testService: TestService) { }

    @MessagePattern('test.pattern')
    test() {
        console.log('2번째 ms입니다');
        return '2번째 ms입니다';
    }
}

B-2 서버가 B-1 서버와 달라진 것은 main.ts에서 설정한 clientId값과 해당 토픽 수신 시 return 할 값 총 2가지입니다. cliendId값을 다르게 한 것은 누가 해당 토픽을 보냈는지를 구별하기 위해서는 개별적인 아이디값을 사용해야 하기 때문입니다. 그리고 여기서 컨슈머 그룹 아이디값은 동일한 값을 사용합니다. 왜냐하면 현재 구성이 A 서버에서 보낸 요청을 B-1, B-2, B-3 서버가 동시에 받는 것이 아니라 부하 분산을 위해 돌아가면서 한 서버 씩 요청을 받도록 한 것이기 때문입니다.(예를 들어 요청 4번 보내면 첫 번째는 B-1, 두 번째는 B-2, 세 번째는 B-3, 네 번째는 다시 B-1...) 마지막으로 return값은 그냥 어떤 서버가 응답했는지 눈으로 확인하기 위해 바꿨습니다.

 

B-3 서버

B-3 서버도 B-2 서버와 동일한 방식으로 수정했습니다. (clientId 수정, consumer group id은 공유, return에는 3번째 ms입니다 출력하게)

 

 

❗️kafka 파티션 설정

여기서 중요한 문제가 있습니다. 위와 같이 A서버에서 보낸 토픽을 B-1, B-2, B-3 서버가 라운드 로빈 방식으로 돌아가면서 수신하게 하려면 파티션 자체가 3개 이상이어야 합니다. 파티션은 토픽 안에 존재하는 데이터 저장소라고 생각할 수 있고, 같은 소비자 그룹은 파티션마다 지정되어 자신의 파티션에 토픽이 들어와야만 해당 토픽을 수신합니다. 즉, 자신이 구독한 토픽이 발행되어도 자신이 지정된 파티션에 기록되지 않으면 해당 데이터를 가져가지 않습니다.

이를 위해 파티션을 3개로 수정했습니다. 일단 저는 도커 컨테이너 이미지 'confluentinc/cp-kafka'이미지를 활용해 카프카 컨테이너를 띄웠기 때문에 아래 명령어를 사용했습니다.(사용 이미지별 명령어는 상이할 수 있습니다)

파티션 확인하기

kafka-topics --describe --topic test.pattern --bootstrap-server localhost:9092

# 결과
Topic: test.pattern     TopicId: uUMOQKIwQqKZ7HJqjvOHnA PartitionCount: 1       ReplicationFactor: 1    Configs: 
        Topic: test.pattern     Partition: 0    Leader: 1       Replicas: 1     Isr: 1

카프카 컨테이너에 접속 후 해당 컨테이너 9092 포트에서 실행 중인 카프카의 토픽 'test.pattern'에 해당하는 파티션을 확인했습니다. 위 결과를 통해 파티션이 하나밖에 없음을 확인할 수 있습니다.

 

파티션 개수 3개로 수정하기

kafka-topics --alter --topic test.pattern --partitions 3 --bootstrap-server localhost:9092

 

 

요청-응답 결과

A 서버에서 설정된 '/test'로 GET 요청을 계속해서 보내니 B-1, B-2, B-3 순서대로 계속 응답을 반환함을 확인할 수 있었습니다.

 

 

 


이벤트 기반 메시지 스타일

이벤트 기반으로 구성하기 위해서는 위에서 구성한 요청-응답 스타일에서 몇 가지만 수정하면 됩니다.

# A서버 app.service.ts

import { Inject, Injectable } from '@nestjs/common';
import { ClientKafka } from '@nestjs/microservices';
import { firstValueFrom } from 'rxjs';

@Injectable()
export class AppService {
  constructor(
    @Inject('TEST_SERVICE') private client: ClientKafka,
  ) { }

  async test() {
    const pattern = 'test.pattern';
    const payload = 'test.payload';
    const kafkaResponse = await firstValueFrom(this.client.emit(pattern, payload));
    return '카프카반환결과 : ' + kafkaResponse;
  }
}

일단 A 서버에서는 응답을 받을 필요가 없으므로 subscribeToResponseOf 메서드 부분 구현이 필요 없습니다. 그리고 이벤트 기반이므로 ClientKafka의 send 메서드가 아닌 emit 메서드를 사용합니다.

 

# B-1,B-2,B-3서버 app.controller.ts

import { Controller } from '@nestjs/common';
import { TestService } from './test.service';
import { EventPattern, MessagePattern } from '@nestjs/microservices';

@Controller('test')
export class TestController {
    constructor(private readonly testService: TestService) { }

    @EventPattern('test.pattern')
    test() {
        console.log('?번째 ms입니다');
        return '?번째 ms입니다';
    }
}

그리고 B-1, B-2, B-3 서버에서는 @MessagePattern 데코레이터 대신 @EventPattern 데코레이터를 사용합니다.

 

이를 통해 이벤트 기반 스타일도 정상적으로 동작함을 확인하실 수 있을 것입니다.

 

 


마무리

kafka에 대한 개념이 살짝 부족해서 구현하는데 어려웠는데 나름 의미 있는 실습이었던 것 같습니다. 현재 NestJS에서 kafka를 활용한 마이크로서비스를 구현하려는 분들에게 조금이나마 도움이 되었으면 좋겠습니다.