Python
[KAFKA] 대용량 센서데이터 저장 처리 - 에뮬레이터 생성
최강깜시
2025. 5. 27. 13:58
앞서서는 환경설정이 끝났고
이제 센서에서 초당 3200개의 데이터를 발생하는 에뮬레이터를 생성한다.
우선 이번 페이지의 폴더구조는 다음과 같다.
kafka-test
├── docker-compose.yml
├── emulator
└── sensor_emulator.py
sensor_emulator.py
from kafka import KafkaProducer
import json
import time
import random
import threading
from datetime import datetime
# kafka producer 설정
producer = KafkaProducer(
bootstrap_servers="localhost:9092", # Kafka 브로커 주소
value_serializer=lambda v: json.dumps(v).encode("utf-8"), # JSON 직렬화
)
sensor_ids = [f"sensor_{i}" for i in range(10)] # 10개의 센서 ID 생성
records_per_sec = 3200 # 초당 전송할 레코드 수
topic_name = "sensor_data" # Kafka 토픽 이름
def generate_sensor_data(sensor_id):
"""
센서 데이터를 주기적으로 생성하고 Kafka로 전송
"""
print(f"센서 {sensor_id} 시작 (초당 {records_per_sec} 건 전송)")
while True:
start_time = time.time() # 초당 전송 시간 측정
for _ in range(records_per_sec):
# 센서 데이터
value = round(random.uniform(10.0, 100.0), 2) # (10.0, 100.0) 범위의 값 생성
data = {
"sensor_id": sensor_id, # 센서 ID
"timestamp": datetime.now().isoformat(), # 현재 시간 ISO 포맷
"value": value # 센서 값
}
# Kafka에 데이터 전송 (value 파라미터 명시!)
producer.send(topic_name, value=data)
# 🧹 1초에 한 번 flush (버퍼 전송)
producer.flush()
# 초당 records_per_sec 건을 맞추기 위한 대기 시간
elapsed = time.time() - start_time
time.sleep(max(0, 1 - elapsed))
threads = []
# 각 센서 ID에 대해 스레드 생성 및 시작
for sensor_id in sensor_ids:
thread = threading.Thread(target=generate_sensor_data, args=(sensor_id,), daemon=True) # 데몬 스레드로 설정
thread.start()
threads.append(thread)
# 메인 스레드가 종료되지 않도록 대기
try:
while True:
time.sleep(1) # 메인 스레드가 계속 실행되도록 대기
except KeyboardInterrupt:
print("에뮬레이터종료!!!")
producer.close() # Kafka 프로듀서 종료
최대한 주석을 달아서 별도의 설명은 하지 않겠다.
해당 에뮬레이터는 단순하다.
위에 말한 것 처럼 하나의 센서가 초당 3200개의 데이터를 생성하는 프로세스 이다.
파일을 실행한다.
프로젝트폴더> python .\emulator\sensor_emulator.py
그러면 다음과 같이 Terminal에 출력이 된다.
이렇게 되면 기본 에뮬레이터는 완성이 되었다.
그럼 kafka에 재대로 전송이 되었는지 확인한다.
kafka docker로 진입해서
kafka-run-class kafka.tools.GetOffsetShell \ # Kafka 내부 유틸리티 클래스 실행 (GetOffsetShell 도구 사용)
--broker-list localhost:9092 \ # Kafka 브로커 주소 지정
--topic sensor_data \ # 오프셋을 조회할 대상 토픽 이름 지정
--time -1 # 최신 오프셋(latest offset) 조회 (-1은 가장 마지막 메시지 위치 의미)
이렇게 나오면 우선은 성공!!!
그런데 센서 값을 받아 오는 이유는 이상값을 체크하기 위해 가져오는것이기 때문에 데이터를 가져올 때 5% 확율로 이상값이 들어가도록 처리한다.
sensor_emulator.py 일부 수정
def generate_sensor_data(sensor_id):
"""
센서 데이터를 주기적으로 생성하고 Kafka로 전송
"""
print(f"센서 {sensor_id} 시작 (초당 {records_per_sec} 건 전송)")
while True:
start_time = time.time() # 초당 전송 시간 측정
for _ in range(records_per_sec):
is_anomaly = random.random() < 0.05 # 이상치 여부 결정 (5% 확률)
if is_anomaly:
# 이상치 생성: 너무 높은 값 또는 너무 낮은 값
value = random.choice([
random.uniform(500, 10000), # 너무 높은 값
random.uniform(-10000, -500) # 너무 낮은 값
])
else:
value = round(random.uniform(10.0, 100.0), 2) # (10.0, 100.0) 범위의 값 생성
data = {
"sensor_id": sensor_id, # 센서 ID
"timestamp": datetime.now().isoformat(), # 현재 시간 ISO 포맷
"value": value # 센서 값
}
# Kafka에 데이터 전송 (value 파라미터 명시!)
producer.send(topic_name, value=data)
# 🧹 1초에 한 번 flush (버퍼 전송)
producer.flush()
# 초당 records_per_sec 건을 맞추기 위한 대기 시간
elapsed = time.time() - start_time
time.sleep(max(0, 1 - elapsed))
센서 데이터를 생성할 때 이상값 주기를 줘서 문제가 될만한 값을 생성하게 하였다.