[KAFKA] 대용량 센서데이터 저장 처리 - FastAPI Kafka Consumer

2025. 5. 27. 14:58Python

데이터를 전송할 에뮬레이터가 만들어졌기 때문에

이제 Kafka에 저장된 데이터를 가져와서 처리하는 Consumer를 생성한다.

Consumer의 역할은

1. kafka topic로 부터 메시지를 읽어온다.
2. Offset를 관리한다.
    메시지를 읽어온 위치를 기억하는 역할을 담당한다.
    Offset 방식은 자동커밋과 수동커밋방식으로 구분된다.
3. Consumer Group의 하나로 다중 인스턴스로 병렬처리가 가능하다.
4. 부하 분산 및 재조정이 가능하다.
    Consumer Group에 속해 있기 때문에 하나의 Consumer이 죽어도 다른 Consumer에 재 분배되어 동작한다.
5. 메시지 처리 후족 작업을 수행한다.
    DB저장, 알림 전송 등 비지니스 로직을 수행한다.

 

피자가게 예시로 들어보면

Topic : 피자 주문서 저장소
Kafka 메시지 : 각 주문서
Consumer : 직원(파자 굽는 사람)
Consumer Group :  피자 조리 팀

이런 역할이라고 보면 이해가 쉬울라나..??

 

FastAPI, uvicorn, confluent-kafka를 설치하도록 한다.

프로젝트폴더> pip install fastapi uvicorn confluent-kafka

FastAPI : Python 기반의 웹 프레임워크(비동기 웹 프레임워크)

uvicorn : FastAPI를 실행하기 위한 서버(ASGI 서버)

confluent-kafka : 고성능 Kafka Python 클라이언트

 

그럼 이제 Consumer 작성을 해 보겠다.

폴더 구조이다.

kafka-test/
├── docker-compose.yml    # Kafka & Zookeeper 구성
├── emulator    # 센서 데이터 에뮬레이터
        └── sensor_emulator.py     # 센서 여러 개, 초당 3200건 이상치 포함 전송
├── consumer    # FastAPI 기반 Kafka Consumer 서버
        └── kafka_consumer_server.py  # Kafka 메시지 소비 및 FastAPI 백엔드

 

kafka_consumer_server.py

from fastapi import FastAPI
from confluent_kafka import Consumer, KafkaException
import threading

app = FastAPI() # FastAPI 인스턴스 생성

config = {
    "bootstrap.servers": "localhost:9092",  # Kafka 브로커 주소
    "group.id": "sensor-group",  # 고정 그룹으로 커밋 유지
    "auto.offset.reset": "latest",  # 최신 메시지부터 읽기
    "enable.auto.commit": True  # 자동 커밋
}

consumer = Consumer(config)  # Kafka Consumer 인스턴스 생성
consumer.subscribe(["sensor_data"])  # Kafka 토픽 구독

def consume_messages():
    """
    Kafka 메시지를 소비하는 함수
    """
    print("🛰 Kafka Consumer 스레드 시작됨...")

    try:
        while True:
            message = consumer.poll(1.0)  # 1초 대기
            if message is None:
                continue  # 메시지가 없으면 계속 대기
            if message.error():
                raise KafkaException(message.error())  # 에러 발생 시 예외 처리

            # 메시지 처리 로직
            data = message.value().decode('utf-8')  # 메시지 디코딩
            print(f"수신된 메시지: {data}")  # 수신된 메시지 출력

    except KeyboardInterrupt:
        print("Consumer 중단됨")
        pass  # 키보드 인터럽트 시 종료
    finally:
        consumer.close()  # Consumer 종료

@app.on_event("startup")
def startup_event():
    """
    FastAPI 애플리케이션 시작 시 Consumer 스레드 시작
    """
    consumer_thread = threading.Thread(target=consume_messages, daemon=True)
    consumer_thread.start()
    print("Kafka Consumer 스레드가 시작되었습니다.")

@app.get("/")
def read_root():
    """
    기본 엔드포인트
    """
    return {"message": "Kafka Consumer 서버가 실행 중입니다!"}

이제 해당 server를 실행한다.

어디서는 아래와 같이 실행하면 된다고 한다.

프로젝트폴더> uvicorn consumer.kafka_consumer_server:app --reload

그런데 나 같은 경우에는 

uvicorn : 'uvicorn' 용어가 cmdlet, 함수, 스크립트 파일 또는 실행할 수 있는 프로그램 이름으로 인식되지 않습니다. 이름이 정 확한지 확인하고 경로가 포함된 경우 경로가 올바른지 검증한 다음 다시 시도하십시오.

이런 오류가 발생해서

프로젝트폴더> python -m uvicorn consumer.kafka_consumer_server:app --reload

이렇게 실행을 해야 했다.

실행 후에는 이러한 메시지가 보이게 된다.

그리고 브라우저에서 127.0.0.1:8000 을 입력하면

{"message":"Kafka Consumer 서버가 실행 중입니다!"}

이렇게 메시지가 보인다.

그리고 이제 kafka에서 메시지를 소비했나 확인한다.

[appuser@0f028ee6d2ce ~]$ kafka-consumer-groups --bootstrap-server localhost:9092 --group sensor-group --describe

그런데.. 서버를 실행했음에도 아래와 같이 하나도 소비하지 않았다.

 

이유는 

config = {
    "bootstrap.servers": "localhost:9092",  # Kafka 브로커 주소
    "group.id": "sensor-group",  # 고정 그룹으로 커밋 유지
    "auto.offset.reset": "latest",  # 최신 메시지부터 읽기
    "enable.auto.commit": True  # 자동 커밋
}

설정 부분에서 

"auto.offset.reset": "latest",  # 최신 메시지부터 읽기

이 부분이 문제가 되었다.

auto.offset.reset은 두가지 3가지 값이 있다.

earliest : 과거 메시지까지 전부 확인한다.
latest : 기존 메시지는 무시하고 실행된 시점부터 새로온 것만 확인한다.
none : 엄격한 제어가 필요한 경우 사용한다.(이건 언제 사용하지..ㅋ)

 

지금 소스에서는 latest로 설정을 해 놔서 이전건 무시하고 있어서 소비를 못하고 있는 상황이었다.

그래서 다시 수정하고 수행!!!

"auto.offset.reset": "earliest",  # 가장 오래된 메시지부터 읽기

그러면 수신된 메시지가 Terminal에 출력이 되고 kafka를 다시 확인하면

위와 같이 LAG가 0으로 모두 소비한게 확인 된다.

여기까지 하고 다음은 MongoDB에 저장하는거 진행하려 한다!!