[KAFKA] 대용량 센서데이터 저장 처리 - 에뮬레이터 생성

2025. 5. 27. 13:58Python

앞서서는 환경설정이 끝났고

이제 센서에서 초당 3200개의 데이터를 발생하는 에뮬레이터를 생성한다.

우선 이번 페이지의 폴더구조는 다음과 같다.

kafka-test
├── docker-compose.yml
├── emulator
        └── sensor_emulator.py

 

sensor_emulator.py

from kafka import KafkaProducer
import json
import time
import random
import threading
from datetime import datetime

# kafka producer 설정
producer = KafkaProducer(
    bootstrap_servers="localhost:9092",  # Kafka 브로커 주소
    value_serializer=lambda v: json.dumps(v).encode("utf-8"),  # JSON 직렬화
)

sensor_ids = [f"sensor_{i}" for i in range(10)]  # 10개의 센서 ID 생성
records_per_sec = 3200  # 초당 전송할 레코드 수
topic_name = "sensor_data"  # Kafka 토픽 이름

def generate_sensor_data(sensor_id):
    """
    센서 데이터를 주기적으로 생성하고 Kafka로 전송
    """
    print(f"센서 {sensor_id} 시작 (초당 {records_per_sec} 건 전송)")

    while True:
        start_time = time.time()  # 초당 전송 시간 측정

        for _ in range(records_per_sec):
           
            # 센서 데이터
            value = round(random.uniform(10.0, 100.0), 2) # (10.0, 100.0) 범위의 값 생성

            data = {
                "sensor_id": sensor_id, # 센서 ID
                "timestamp": datetime.now().isoformat(), # 현재 시간 ISO 포맷
                "value": value # 센서 값
            }

            # Kafka에 데이터 전송 (value 파라미터 명시!)
            producer.send(topic_name, value=data)

        # 🧹 1초에 한 번 flush (버퍼 전송)
        producer.flush()

        # 초당 records_per_sec 건을 맞추기 위한 대기 시간
        elapsed = time.time() - start_time
        time.sleep(max(0, 1 - elapsed))

threads = []

# 각 센서 ID에 대해 스레드 생성 및 시작
for sensor_id in sensor_ids:
    thread = threading.Thread(target=generate_sensor_data, args=(sensor_id,), daemon=True)  # 데몬 스레드로 설정    
    thread.start()
    threads.append(thread)

# 메인 스레드가 종료되지 않도록 대기
try:
    while True:
        time.sleep(1)  # 메인 스레드가 계속 실행되도록 대기
except KeyboardInterrupt:
    print("에뮬레이터종료!!!")
    producer.close()  # Kafka 프로듀서 종료

최대한 주석을 달아서 별도의 설명은 하지 않겠다.

해당 에뮬레이터는 단순하다.

위에 말한 것 처럼 하나의 센서가 초당 3200개의 데이터를 생성하는 프로세스 이다.

파일을 실행한다.

프로젝트폴더> python .\emulator\sensor_emulator.py

그러면 다음과 같이 Terminal에 출력이 된다.

이렇게 되면 기본 에뮬레이터는 완성이 되었다.

그럼 kafka에 재대로 전송이 되었는지 확인한다.

kafka docker로 진입해서

kafka-run-class kafka.tools.GetOffsetShell \       # Kafka 내부 유틸리티 클래스 실행 (GetOffsetShell 도구 사용)
  --broker-list localhost:9092 \                   # Kafka 브로커 주소 지정
  --topic sensor_data \                            # 오프셋을 조회할 대상 토픽 이름 지정
  --time -1                                        # 최신 오프셋(latest offset) 조회 (-1은 가장 마지막 메시지 위치 의미)

이렇게 나오면 우선은 성공!!!

 


 

그런데 센서 값을 받아 오는 이유는 이상값을 체크하기 위해 가져오는것이기 때문에 데이터를 가져올 때 5% 확율로 이상값이 들어가도록 처리한다.

sensor_emulator.py 일부 수정

def generate_sensor_data(sensor_id):
    """
    센서 데이터를 주기적으로 생성하고 Kafka로 전송
    """
    print(f"센서 {sensor_id} 시작 (초당 {records_per_sec} 건 전송)")

    while True:
        start_time = time.time()  # 초당 전송 시간 측정

        for _ in range(records_per_sec):
           
            is_anomaly = random.random() < 0.05  # 이상치 여부 결정 (5% 확률)
            
            if is_anomaly:
                # 이상치 생성: 너무 높은 값 또는 너무 낮은 값
                value = random.choice([
                    random.uniform(500, 10000),   # 너무 높은 값
                    random.uniform(-10000, -500)  # 너무 낮은 값
                ])
            else:
                value = round(random.uniform(10.0, 100.0), 2) # (10.0, 100.0) 범위의 값 생성

            data = {
                "sensor_id": sensor_id, # 센서 ID
                "timestamp": datetime.now().isoformat(), # 현재 시간 ISO 포맷
                "value": value # 센서 값
            }

            # Kafka에 데이터 전송 (value 파라미터 명시!)
            producer.send(topic_name, value=data)

        # 🧹 1초에 한 번 flush (버퍼 전송)
        producer.flush()

        # 초당 records_per_sec 건을 맞추기 위한 대기 시간
        elapsed = time.time() - start_time
        time.sleep(max(0, 1 - elapsed))

센서 데이터를 생성할 때 이상값 주기를 줘서 문제가 될만한 값을 생성하게 하였다.