MQ

[kafkajs] consumer

꼰딩 2023. 3. 1. 00:04

이번에는 kafka consumer app을 설정해보겠습니다.

import { Kafka, KafkaMessage, logLevel } from 'kafkajs'

/**
 * kafka 인스턴스를 생성합니다.
 * clientId: 원하는 문자열
 * brokers: docker-compose.yml에서 kafka 설정에 작성한 PLAINTEXT_HOST://localhost:29092, 29093, 29094로 지정했던 것을 넣습니다.
 * requestTimeout: kafka clinet가 broker로 데이터 전송 요청을 보낼때의 최대 소요 시간입니다. 이 시간이 초과하면 요청을 실패로 처리하고 에러를 반환합니다.
 * connectionTimeout: client와 broker 간 연결을 맺을 때 소요되는 최대 시간입니다. 이 시간이 초과하면 연결을 실패로 처리하고, 다른 broker에 연결을 시도합니다.
 * enforceRequestTimeout: true이면 requestTimeout이 지났을 때 그 요청에 대한 Promise가 reject됩니다.
 * retry: {
 *  initialRetryTime: 첫 번째 재시도 간격이고, default는 100ms 입니다.
 *  maxRetryTime: client의 최대 재시도 시간이고, default는 30000ms 입니다.
 *  retries: client의 최대 재시도 횟수이고, default는 10입니다.
 * }
 * logLevel: log 레벨을 지정합니다. 옵션은 NOTHING, ERROR, WARN, INFO, DEBUG 입니다.

 */
const kafka = new Kafka({
  clientId: 'my-app',
  brokers: ['localhost:29092', 'localhost:29093', 'localhost:29094'],
  requestTimeout: 25000,
  connectionTimeout: 10000,
  enforceRequestTimeout: false,
  retry: {
    initialRetryTime: 1000,
    maxRetryTime: 30000,
    retries: 10
  },
  logLevel: logLevel.INFO
})

// 인스턴스 생성 후 로그 재정의 가능
kafka.logger().setLogLevel(logLevel.INFO)

// consumer 객체를 생성합니다.
/**
 * groupId: consumer group의 식별자입니다. 아래와 같이 작성하면 이 client app은 test-consumer-group에 속한 consumer가 되는 것입니다.
 * 여러 consumer가 같은 group에 있으면 각 consumer마다 partition이 지정되어 데이터를 처리합니다.
 * */
const consumer = kafka.consumer({ groupId: 'test-consumer-group' })

let messageCount = 0

// consumer 앱을 실행하기 위한 실행함수입니다. bootstrap()과 같은 역할입니다.
const initKafka = async () => {
  console.log('start subscribe')

  // kafka에 연결합니다.
  await consumer.connect()
  // kafka의 topic을 구독합니다. 현재는 test topic만 구독하고 있습니다.
  // fromBeginning은 consumer가 topic을 처음 구독할 때 해당 파티션의 첫 메세지부터 consume 합니다.
  await consumer.subscribe({ topics: ['test'], fromBeginning: true })

  /**
   * 일반적인 consume. batch x
   */
  await consumer.run({
    // partitionsConsumedConcurrently: consumer가 처리할 수 있는 최대 파티션 수 설정
    partitionsConsumedConcurrently: 3, // default: 1
    /**
     *   topic: consume하는 메세지의 topic 입니다.
     *   partition: consumer하는 파티션 번호 입니다.
     *   message: {
     *     key: 메세지의 key 값입니다. Buffer
     *     value: 메세지의 값 입니다. Buffer
     *     timestamp: 메세지가 생성된 시간입니다.
     *     attributes: 메세지의 속성값입니다. default는 0이며 압축 등의 추가 속성을 적용할 수 있습니다.
     *     offset: 메세지의 offset 값 입니다.
     *     size: 메세지의 크기(byte) 입니다.
     *     headers: 메시지의 헤더 정보입니다. default는 undefined 입니다.
     *   }
     *   heartbeat(): consumer가 broker와 연결이 잘 되고 있는지 보내는 메세지입니다. 지정 시간 내에 heartbeat가 전송되지 않으면 broker는 해당 consumer를 해제하고, 다른 consumer에게 해당 파티션을 rebalane힙니다.
     *   pause():
     */
    eachMessage: async ({
      topic,
      partition,
      message,
      pause
    }: {
      topic: string
      partition: number
      message: KafkaMessage
      pause(): () => void
    }) => {
      try {
        console.log({
          // 값들은 Buffer 이므로 string으로 parsing 해줍니다.
          value: message.value?.toString(),
          offest: message.offset,
          key: message.key?.toString(),
          timestamp: message.timestamp?.toString()
        })
      } catch (err) {
        if (err instanceof Error) {
          // 에러 발생시 consume 일시정지합니다.
          consumer.pause([{ topic, partitions: [partition] }])
          setTimeout(() => {
            // 1초 후 consume 재개합니다.
            consumer.resume([{ topic, partitions: [partition] }])
          }, 1000)
        }
      }
    }
  })

  /**
   * consumer.run으로 메세지를 가져오고, seek이 실행되어 특정 offset으로 consume 위치를 이동합니다.
   * eachMessage는 비동기 콜백함수이기 때문에, seek 함수 다음에 실행됩니다.
   */
  consumer.seek({ topic: 'test', partition: 0, offset: '0' })

  // consumer.pause()는 현재 일시정지된 토픽과 파티션 배열을 반환합니다.
  const pausedTopicPartitions: TopicPartitions[] = consumer.paused()
  // export type TopicPartitions = { topic: string; partitions: number[] }
  for (const topicPartitions of pausedTopicPartitions) {
    const { topic, partitions } = topicPartitions
    console.log({ topic, partitions })
  }

  /**
   * batch consume
   */
  await consumer.run({
    /**
     * eachBatchAutoResolve
     * true일 경우 자동으로 마지막 offset을 커밋합니다.
     */
    eachBatchAutoResolve: true,
    eachBatch: async ({
      batch,
      resolveOffset,
      heartbeat,
      commitOffsetsIfNecessary,
      uncommittedOffsets,
      isRunning,
      isStale,
      pause
    }) => {
      for (let message of batch.messages) {
        /**
         * isRunning(): conusmer가 consuming 중이면 true, 아니면 false를 반환합니다.
         * isStale(): conusmer 또는 producer의 연결이 끊어졌는지 확인합니다. client app (producer 혹은 consumer)과 kafka의 연결 여부를 확인할 수 있습니다.
         */
        if (!isRunning() || isStale()) {
          break
        }

        console.log({
          topic: batch.topic,
          partition: batch.partition,
          highWaterMark: batch.highWatermark, // 파티션에서 마지막으로 커밋된 오프셋 값입니다.
          message: {
            offset: message.offset,
            key: message.key?.toString(),
            value: message.value?.toString(),
            headers: message.headers?.toString(),
            uncommittedOffsets: uncommittedOffsets()
          }
        })

        resolveOffset(message.offset) // consumeg한 메세지 일괄 커밋, 에러 발생하면 기존 오프셋으로 커밋합니다.
        /**
         * heartbeat: conumser가 broker에게 자신이 연결되어 있는지 알리는 keep-alive 메세지
         * consumer가 beartbeat를 보내지 않는다면, broker는 해당 consumer가 죽은 것으로 간주하고, 다른 consumer에게 해당 파티션을 할당합니다.
         * conusmer와 broker 사이의 연결을 유지하고, consumer에 대한 성능 통계를 수집할 수 있습니다.
         */
        await heartbeat()
      }
    }
  })
}

initKafka()

kafka 인스턴스를 생성할 때 옵션은 아래와 같습니다.


export interface KafkaConfig {
  brokers: string[] | BrokersFunction
  ssl?: tls.ConnectionOptions | boolean
  sasl?: SASLOptions | Mechanism
  clientId?: string
  connectionTimeout?: number
  authenticationTimeout?: number
  reauthenticationThreshold?: number
  requestTimeout?: number
  enforceRequestTimeout?: boolean
  retry?: RetryOptions
  socketFactory?: ISocketFactory
  logLevel?: logLevel
  logCreator?: logCreator
}

brokersFunction은 콜백함수로 broker를 동적으로 지정할 수 있습니다.


export type BrokersFunction = () => string[] | Promise<string[]>

ssl과 sasl은 보안과 관련된 옵션이므로 생략하겠습니다.


RetryOptions는 다음과 같습니다.


export interface RetryOptions {
  maxRetryTime?: number
  initialRetryTime?: number
  factor?: number
  multiplier?: number
  retries?: number
  restartOnFailure?: (e: Error) => Promise<boolean>
}

consumer.run({})의 옵션은 다음과 같습니다.


export type ConsumerRunConfig = {
  autoCommit?: boolean
  autoCommitInterval?: number | null
  autoCommitThreshold?: number | null
  eachBatchAutoResolve?: boolean
  partitionsConsumedConcurrently?: number
  eachBatch?: EachBatchHandler
  eachMessage?: EachMessageHandler
}

export type EachBatchHandler = (payload: EachBatchPayload) => Promise<void>
export type EachMessageHandler = (payload: EachMessagePayload) => Promise<void>

export interface EachMessagePayload {
  topic: string
  partition: number
  message: KafkaMessage
  heartbeat(): Promise<void>
  pause(): () => void
}

export interface EachBatchPayload {
  batch: Batch
  resolveOffset(offset: string): void
  heartbeat(): Promise<void>
  pause(): () => void
  commitOffsetsIfNecessary(offsets?: Offsets): Promise<void>
  uncommittedOffsets(): OffsetsByTopicPartition
  isRunning(): boolean
  isStale(): boolean
}

export type KafkaMessage = MessageSetEntry | RecordBatchEntry

interface MessageSetEntry {
  key: Buffer | null
  value: Buffer | null
  timestamp: string
  attributes: number
  offset: string
  size: number
  headers?: never
}

interface RecordBatchEntry {
  key: Buffer | null
  value: Buffer | null
  timestamp: string
  attributes: number
  offset: string
  headers: IHeaders
  size?: never
}

export type Batch = {
  topic: string
  partition: number
  highWatermark: string
  messages: KafkaMessage[]
  isEmpty(): boolean
  firstOffset(): string | null
  lastOffset(): string
  offsetLag(): string
  offsetLagLow(): string
}

이렇게 보니까 엄청 복잡한데, 중요한것은 메세지이므로 메세지 타입을 자세히 보시면 되겠습니다.


consumer의 내장함수는 다음과 같습니다.


export type Consumer = {
  connect(): Promise<void>
  disconnect(): Promise<void>
  subscribe(subscription: ConsumerSubscribeTopics | ConsumerSubscribeTopic): Promise<void>
  stop(): Promise<void>
  run(config?: ConsumerRunConfig): Promise<void>
  commitOffsets(topicPartitions: Array<TopicPartitionOffsetAndMetadata>): Promise<void>
  seek(topicPartitionOffset: TopicPartitionOffset): void
  describeGroup(): Promise<GroupDescription>
  pause(topics: Array<{ topic: string; partitions?: number[] }>): void
  paused(): TopicPartitions[]
  resume(topics: Array<{ topic: string; partitions?: number[] }>): void
  ...
  logger(): Logger
  readonly events: ConsumerEvents