이번에는 kafka consumer app을 설정해보겠습니다.
import { Kafka, KafkaMessage, logLevel } from 'kafkajs'
/**
* kafka 인스턴스를 생성합니다.
* clientId: 원하는 문자열
* brokers: docker-compose.yml에서 kafka 설정에 작성한 PLAINTEXT_HOST://localhost:29092, 29093, 29094로 지정했던 것을 넣습니다.
* requestTimeout: kafka clinet가 broker로 데이터 전송 요청을 보낼때의 최대 소요 시간입니다. 이 시간이 초과하면 요청을 실패로 처리하고 에러를 반환합니다.
* connectionTimeout: client와 broker 간 연결을 맺을 때 소요되는 최대 시간입니다. 이 시간이 초과하면 연결을 실패로 처리하고, 다른 broker에 연결을 시도합니다.
* enforceRequestTimeout: true이면 requestTimeout이 지났을 때 그 요청에 대한 Promise가 reject됩니다.
* retry: {
* initialRetryTime: 첫 번째 재시도 간격이고, default는 100ms 입니다.
* maxRetryTime: client의 최대 재시도 시간이고, default는 30000ms 입니다.
* retries: client의 최대 재시도 횟수이고, default는 10입니다.
* }
* logLevel: log 레벨을 지정합니다. 옵션은 NOTHING, ERROR, WARN, INFO, DEBUG 입니다.
*/
const kafka = new Kafka({
clientId: 'my-app',
brokers: ['localhost:29092', 'localhost:29093', 'localhost:29094'],
requestTimeout: 25000,
connectionTimeout: 10000,
enforceRequestTimeout: false,
retry: {
initialRetryTime: 1000,
maxRetryTime: 30000,
retries: 10
},
logLevel: logLevel.INFO
})
// 인스턴스 생성 후 로그 재정의 가능
kafka.logger().setLogLevel(logLevel.INFO)
// consumer 객체를 생성합니다.
/**
* groupId: consumer group의 식별자입니다. 아래와 같이 작성하면 이 client app은 test-consumer-group에 속한 consumer가 되는 것입니다.
* 여러 consumer가 같은 group에 있으면 각 consumer마다 partition이 지정되어 데이터를 처리합니다.
* */
const consumer = kafka.consumer({ groupId: 'test-consumer-group' })
let messageCount = 0
// consumer 앱을 실행하기 위한 실행함수입니다. bootstrap()과 같은 역할입니다.
const initKafka = async () => {
console.log('start subscribe')
// kafka에 연결합니다.
await consumer.connect()
// kafka의 topic을 구독합니다. 현재는 test topic만 구독하고 있습니다.
// fromBeginning은 consumer가 topic을 처음 구독할 때 해당 파티션의 첫 메세지부터 consume 합니다.
await consumer.subscribe({ topics: ['test'], fromBeginning: true })
/**
* 일반적인 consume. batch x
*/
await consumer.run({
// partitionsConsumedConcurrently: consumer가 처리할 수 있는 최대 파티션 수 설정
partitionsConsumedConcurrently: 3, // default: 1
/**
* topic: consume하는 메세지의 topic 입니다.
* partition: consumer하는 파티션 번호 입니다.
* message: {
* key: 메세지의 key 값입니다. Buffer
* value: 메세지의 값 입니다. Buffer
* timestamp: 메세지가 생성된 시간입니다.
* attributes: 메세지의 속성값입니다. default는 0이며 압축 등의 추가 속성을 적용할 수 있습니다.
* offset: 메세지의 offset 값 입니다.
* size: 메세지의 크기(byte) 입니다.
* headers: 메시지의 헤더 정보입니다. default는 undefined 입니다.
* }
* heartbeat(): consumer가 broker와 연결이 잘 되고 있는지 보내는 메세지입니다. 지정 시간 내에 heartbeat가 전송되지 않으면 broker는 해당 consumer를 해제하고, 다른 consumer에게 해당 파티션을 rebalane힙니다.
* pause():
*/
eachMessage: async ({
topic,
partition,
message,
pause
}: {
topic: string
partition: number
message: KafkaMessage
pause(): () => void
}) => {
try {
console.log({
// 값들은 Buffer 이므로 string으로 parsing 해줍니다.
value: message.value?.toString(),
offest: message.offset,
key: message.key?.toString(),
timestamp: message.timestamp?.toString()
})
} catch (err) {
if (err instanceof Error) {
// 에러 발생시 consume 일시정지합니다.
consumer.pause([{ topic, partitions: [partition] }])
setTimeout(() => {
// 1초 후 consume 재개합니다.
consumer.resume([{ topic, partitions: [partition] }])
}, 1000)
}
}
}
})
/**
* consumer.run으로 메세지를 가져오고, seek이 실행되어 특정 offset으로 consume 위치를 이동합니다.
* eachMessage는 비동기 콜백함수이기 때문에, seek 함수 다음에 실행됩니다.
*/
consumer.seek({ topic: 'test', partition: 0, offset: '0' })
// consumer.pause()는 현재 일시정지된 토픽과 파티션 배열을 반환합니다.
const pausedTopicPartitions: TopicPartitions[] = consumer.paused()
// export type TopicPartitions = { topic: string; partitions: number[] }
for (const topicPartitions of pausedTopicPartitions) {
const { topic, partitions } = topicPartitions
console.log({ topic, partitions })
}
/**
* batch consume
*/
await consumer.run({
/**
* eachBatchAutoResolve
* true일 경우 자동으로 마지막 offset을 커밋합니다.
*/
eachBatchAutoResolve: true,
eachBatch: async ({
batch,
resolveOffset,
heartbeat,
commitOffsetsIfNecessary,
uncommittedOffsets,
isRunning,
isStale,
pause
}) => {
for (let message of batch.messages) {
/**
* isRunning(): conusmer가 consuming 중이면 true, 아니면 false를 반환합니다.
* isStale(): conusmer 또는 producer의 연결이 끊어졌는지 확인합니다. client app (producer 혹은 consumer)과 kafka의 연결 여부를 확인할 수 있습니다.
*/
if (!isRunning() || isStale()) {
break
}
console.log({
topic: batch.topic,
partition: batch.partition,
highWaterMark: batch.highWatermark, // 파티션에서 마지막으로 커밋된 오프셋 값입니다.
message: {
offset: message.offset,
key: message.key?.toString(),
value: message.value?.toString(),
headers: message.headers?.toString(),
uncommittedOffsets: uncommittedOffsets()
}
})
resolveOffset(message.offset) // consumeg한 메세지 일괄 커밋, 에러 발생하면 기존 오프셋으로 커밋합니다.
/**
* heartbeat: conumser가 broker에게 자신이 연결되어 있는지 알리는 keep-alive 메세지
* consumer가 beartbeat를 보내지 않는다면, broker는 해당 consumer가 죽은 것으로 간주하고, 다른 consumer에게 해당 파티션을 할당합니다.
* conusmer와 broker 사이의 연결을 유지하고, consumer에 대한 성능 통계를 수집할 수 있습니다.
*/
await heartbeat()
}
}
})
}
initKafka()
kafka 인스턴스를 생성할 때 옵션은 아래와 같습니다.
export interface KafkaConfig {
brokers: string[] | BrokersFunction
ssl?: tls.ConnectionOptions | boolean
sasl?: SASLOptions | Mechanism
clientId?: string
connectionTimeout?: number
authenticationTimeout?: number
reauthenticationThreshold?: number
requestTimeout?: number
enforceRequestTimeout?: boolean
retry?: RetryOptions
socketFactory?: ISocketFactory
logLevel?: logLevel
logCreator?: logCreator
}
brokersFunction은 콜백함수로 broker를 동적으로 지정할 수 있습니다.
export type BrokersFunction = () => string[] | Promise<string[]>
ssl과 sasl은 보안과 관련된 옵션이므로 생략하겠습니다.
RetryOptions는 다음과 같습니다.
export interface RetryOptions {
maxRetryTime?: number
initialRetryTime?: number
factor?: number
multiplier?: number
retries?: number
restartOnFailure?: (e: Error) => Promise<boolean>
}
consumer.run({})의 옵션은 다음과 같습니다.
export type ConsumerRunConfig = {
autoCommit?: boolean
autoCommitInterval?: number | null
autoCommitThreshold?: number | null
eachBatchAutoResolve?: boolean
partitionsConsumedConcurrently?: number
eachBatch?: EachBatchHandler
eachMessage?: EachMessageHandler
}
export type EachBatchHandler = (payload: EachBatchPayload) => Promise<void>
export type EachMessageHandler = (payload: EachMessagePayload) => Promise<void>
export interface EachMessagePayload {
topic: string
partition: number
message: KafkaMessage
heartbeat(): Promise<void>
pause(): () => void
}
export interface EachBatchPayload {
batch: Batch
resolveOffset(offset: string): void
heartbeat(): Promise<void>
pause(): () => void
commitOffsetsIfNecessary(offsets?: Offsets): Promise<void>
uncommittedOffsets(): OffsetsByTopicPartition
isRunning(): boolean
isStale(): boolean
}
export type KafkaMessage = MessageSetEntry | RecordBatchEntry
interface MessageSetEntry {
key: Buffer | null
value: Buffer | null
timestamp: string
attributes: number
offset: string
size: number
headers?: never
}
interface RecordBatchEntry {
key: Buffer | null
value: Buffer | null
timestamp: string
attributes: number
offset: string
headers: IHeaders
size?: never
}
export type Batch = {
topic: string
partition: number
highWatermark: string
messages: KafkaMessage[]
isEmpty(): boolean
firstOffset(): string | null
lastOffset(): string
offsetLag(): string
offsetLagLow(): string
}
이렇게 보니까 엄청 복잡한데, 중요한것은 메세지이므로 메세지 타입을 자세히 보시면 되겠습니다.
consumer의 내장함수는 다음과 같습니다.
export type Consumer = {
connect(): Promise<void>
disconnect(): Promise<void>
subscribe(subscription: ConsumerSubscribeTopics | ConsumerSubscribeTopic): Promise<void>
stop(): Promise<void>
run(config?: ConsumerRunConfig): Promise<void>
commitOffsets(topicPartitions: Array<TopicPartitionOffsetAndMetadata>): Promise<void>
seek(topicPartitionOffset: TopicPartitionOffset): void
describeGroup(): Promise<GroupDescription>
pause(topics: Array<{ topic: string; partitions?: number[] }>): void
paused(): TopicPartitions[]
resume(topics: Array<{ topic: string; partitions?: number[] }>): void
...
logger(): Logger
readonly events: ConsumerEvents
'MQ' 카테고리의 다른 글
[kafkajs] producer 설정 (0) | 2023.02.27 |
---|---|
[kafkajs] kafka에 대한 간단한 소개 (0) | 2023.02.26 |
[kafkajs] docker를 이용한 kafka clustering 셋팅 방법 (0) | 2023.02.26 |
kafka 글을 작성하기 전에 (0) | 2023.02.26 |