2025. 5. 27. 17:13ㆍPython
이번엔 지금까지 받은 데이터를 MongoDB에 저장하도록 한다.
Docker에 MongoDB 설치
프로젝트폴더> docker run -d --name mongodb -p 27017:27017 mongo
현재 Docker에서는 mongodb와 kafka관련 항목이 실행 중이어야한다.
이제 이전에 작성했던 kafka_consumer_server.py 파일을 수정한다.
kafka_consumer_server.py 일부 수정
...
app = FastAPI() # FastAPI 인스턴스 생성
mongo_client = MongClient("mongodb://localhost:27017/") # MongoDB 클라이언트 생성
mongo_db = mongo_client["sensor_database"] # MongoDB 데이터베이스 설정
mongo_collection = mongo_db["sensor_data"] # MongoDB 컬렉션 설정
...
def consume_messages():
...
# 메시지 처리 로직
data = json.loads( message.value().decode('utf-8')) # 메시지 디코딩
# MongoDB에 데이터 저장
mongo_collection.insert_one(data)
...
위의 부분이 추가된 부분이다.
MongoDB 접속 정보를 설정하고 저장하는 부분이 추가 되었다.
MongoDB에서 데이터 수가 제대로 들어왔는지 확인한다.
그러기 위해 우선 Docker에 접근해서 kafka에 등록되어 있는 모든 메시지를 삭제하고 다시 생성한다.
[appuser@0f028ee6d2ce ~]$ kafka-topics --bootstrap-server localhost:9092 --delete --topic sensor_data
[appuser@0f028ee6d2ce ~]$ kafka-topics --bootstrap-server localhost:9092 --create --topic sensor_data --partitions 3 --replication-factor 1
후에 에뮬레이터와 서버를 실행한다.
그리고 일정시간이 지나고 에뮬레이터를 중지 시키고 kafka에서 처리한게 얼마나 되나보려고 했더니.
시간이 많이 지났음에도 불구하고 아직도 처리하고 있었다.
최종 다 등록되는데 시간차가 있겠지만 나는... 12분이 넘게 걸렸다.
그래서 어디가 문제인지 찾아야 했다.
첫번째 Consumer이 느린건가?
그래서 Consumer 병렬처리방식으로 늘려 보기로 했다.
파일명을 바꿔서 작업했다.
kafka_consumer_multi.py
# 📌 kafka_consumer_multi.py
import json
import multiprocessing
from confluent_kafka import Consumer, TopicPartition
from pymongo import MongoClient
from datetime import datetime
def process_partition(partition_id):
consumer = Consumer({
"bootstrap.servers": "localhost:9092",
"group.id": f"sensor-group-{partition_id}", # 프로세스별 고유 그룹
"auto.offset.reset": "earliest",
"enable.auto.commit": False
})
consumer.assign([TopicPartition("sensor_data", partition_id)])
print(f"✅ Consumer 프로세스 시작 (파티션 {partition_id})")
mongo_client = MongoClient("mongodb://localhost:27017/")
collection = mongo_client["sensor_database"]["sensor_data"]
buffer = []
try:
while True:
msg = consumer.poll(0.1)
if msg is None:
continue
if msg.error():
print(f"❌ Kafka Error: {msg.error()}")
continue
try:
data = json.loads(msg.value().decode('utf-8'))
data["timestamp"] = datetime.fromisoformat(data["timestamp"])
buffer.append(data)
if len(buffer) >= 500:
collection.insert_many(buffer)
buffer.clear()
consumer.commit(msg)
except Exception as e:
print(f"🚨 메시지 처리 오류: {e}")
finally:
consumer.close()
mongo_client.close()
if __name__ == "__main__":
processes = []
for partition in range(10): # 파티션 수만큼 프로세스 생성
p = multiprocessing.Process(target=process_partition, args=(partition,))
p.start()
processes.append(p)
for p in processes:
p.join()
FastAPI를 사용하지 않고 날 것의 데이터 처리 로직으로 변경하였다.
여기서 보면 consumer 설정에서 아래 부분이 바뀌었다. 자동으로 하는것이 아닌 DB에 데이터를 저장하고 커밋하는 방식으로.
"enable.auto.commit": False
commit 하는 부분은 이 부분이다.
consumer.commit(msg) # 메시지 커밋
또한, Thread 방식에서 Process 방식으로 변경하였고, DB 입력도 insert_one에서 insert_many로 변경하였다.
이렇게 하면 10개의 Consumer이 돌아가면서 일을 하기 시작한다.
이렇게 수정하고 다시 실행.
결과적으로 kafka 에서 소비하는데 걸린 시간은 39초가 걸렸다.
그리고 DB 데이터를 1초 단위로 검색을 했더니 카운트 수가 25000건 언저리였다.
10개의 센서면 32000 건이 있어야 하는데.
두번째, 토픽 파티션 늘려 보기
Doker kafka에 접속해서 이전것은 지우고 다시 생성했다.
[appuser@0f028ee6d2ce ~]$ kafka-topics --bootstrap-server localhost:9092 --create --topic sensor-data --partitions 10 --replication-factor 1
--partitions 10 이부분이 3개에서 10개로 늘렸다.
이렇게 하니까 kafka에서는 정상적으로 빠르게 소모를 해서 없애 버렸다.
그런데 DB에 들어간 Data는 아직 32000개가 안되고 아까와 비슷한 수준이었다.
세번째, MongoDB 쓰기 속도에서 문제 발생?
GPT에 물어 보니까 Queue를 사용하란다.
그래서 다시 consumer 수정...ㅜㅜ
이름을 바꿔서
kafka_consumer_queue.py
import json
import multiprocessing
from confluent_kafka import Consumer, TopicPartition
from datetime import datetime
from pymongo import MongoClient
# MongoDB Writer 프로세스
def mongo_writer(write_queue: multiprocessing.Queue):
mongo_client = MongoClient("mongodb://localhost:27017/")
collection = mongo_client["sensor_database"]["sensor_data"]
print("📝 MongoDB Writer 프로세스 시작됨")
buffer = []
while True:
try:
# 데이터가 queue에 들어올 때까지 기다림 (timeout 가능)
try:
data = write_queue.get()
except queue.Full:
print("⚠️ 큐가 가득 찼습니다. 잠시 대기합니다...", flush=True)
# 종료 신호
if data == "STOP":
break
buffer.append(data)
# 일정량 쌓이면 한 번에 저장
if len(buffer) >= 1000:
collection.insert_many(buffer)
print(f"📝 MongoDB 저장 완료: {len(buffer)}건", flush=True)
buffer.clear()
except Exception as e:
print(f"🚨 MongoDB 저장 오류: {e}", flush=True)
# 종료 시 남은 버퍼 정리
if buffer:
collection.insert_many(buffer)
print("🧹 MongoDB Writer 종료")
# ✅ Kafka Consumer 프로세스
def kafka_consumer_worker(partition_id: int, write_queue: multiprocessing.Queue):
print(f"🛰 Kafka Consumer 프로세스 시작: 파티션 {partition_id}")
# Kafka Consumer 설정
conf = {
"bootstrap.servers": "localhost:9092", # Kafka 브로커 주소
"group.id": f"sensor-group-{partition_id}", # Consumer group ID (파티션별 유니크)
"auto.offset.reset": "earliest", # 오프셋 없을 경우 가장 처음부터 읽기
"enable.auto.commit": False # 수동 커밋 (정확한 메시지 처리 보장)
}
consumer = Consumer(conf)
consumer.assign([TopicPartition("sensor_data", partition_id)]) # 파티션 지정하여 구독
try:
while True:
msg = consumer.poll(0.1)
if msg is None:
continue
if msg.error():
print(f"❌ Kafka 에러: {msg.error()}")
continue
try:
data = json.loads(msg.value().decode("utf-8")) # Kafka 메시지를 JSON 파싱
data["timestamp"] = datetime.fromisoformat(data["timestamp"]) # timestamp 필드를 문자열에서 datetime으로 변환
write_queue.put(data) # ✅ 큐에 데이터 넣기
consumer.commit(msg) # 메시지 커밋
except Exception as e:
print(f"🚨 메시지 처리 실패: {e}")
finally:
consumer.close()
# ✅ 메인
if __name__ == "__main__":
process_count = 10 # Kafka 파티션 수 = Consumer 수
write_queue = multiprocessing.Queue(maxsize=10000) # MongoDB에 쓸 데이터 큐 (최대 10,000건)
# MongoDB 쓰기 전용 프로세스 시작
writer_proc = multiprocessing.Process(target=mongo_writer, args=(write_queue,))
writer_proc.start()
# Kafka Consumer 프로세스 시작
consumers = []
for pid in range(process_count):
proc = multiprocessing.Process(target=kafka_consumer_worker, args=(pid, write_queue))
proc.start()
consumers.append(proc)
try:
# 메인 프로세스가 종료될 때까지 대기
for proc in consumers:
proc.join()
except KeyboardInterrupt:
print("🛑 종료 신호 감지됨")
# ✅ Writer에게 종료 신호 보내기
write_queue.put("STOP")
writer_proc.join()
print("✅ 전체 종료 완료")
그래도 아직... 정상적으로 32000건이 등록이 안됨...
다 수정했는데..
마지막으로, 에뮬레이터를 다시 수정해 보기로 했다.
Thread 방식이 문제가 된다고 해서 Process 방식으로 변경
sensor_emulator_process.py
import json
import time
import random
from kafka import KafkaProducer
from datetime import datetime
from multiprocessing import Process
sensor_ids = [f"sensor_{i}" for i in range(10)] # 10개의 센서 ID 생성
records_per_sec = 3200 # 초당 전송할 레코드 수
topic_name = "sensor_data" # Kafka 토픽 이름
def generate_sensor_data(sensor_id):
"""
센서 데이터를 주기적으로 생성하고 Kafka로 전송
"""
# ✅ KafkaProducer를 한 번만 생성
producer = KafkaProducer(
bootstrap_servers="localhost:9092", # Kafka 브로커 주소
value_serializer=lambda v: json.dumps(v).encode("utf-8"), # JSON 직렬화
linger_ms=10, # 메시지 버퍼링 시간 (10ms)
batch_size=32 * 1024, # 배치 크기 (32KB)
compression_type='gzip', # 압축 방식
acks=1, # 리더가 메시지를 받았을 때만 응답
retries=3, # 재시도 횟수
)
print(f"센서 {sensor_id} 시작 (초당 {records_per_sec} 건 전송)")
total_sent = 0
while True:
start_time = time.time() # 초당 전송 시간 측정
total_sent = 0
for _ in range(records_per_sec):
# 이상치 여부 결정 (5% 확률)
is_anomaly = random.random() < 0.05
if is_anomaly:
value = random.choice([
random.uniform(500, 10000), # 너무 높은 값
random.uniform(-10000, -500) # 너무 낮은 값
])
else:
value = round(random.uniform(10.0, 100.0), 2)
# 센서 데이터 생성
data = {
"sensor_id": sensor_id,
"timestamp": datetime.now().isoformat(),
"value": round(value, 2)
}
# Kafka에 데이터 전송 (value 파라미터 명시!)
producer.send(topic_name, value=data)
total_sent += 1
# 🧹 1초에 한 번 flush (버퍼 전송)
producer.flush()
print(f"센서 {sensor_id} 전송 건수: {total_sent}")
# 초당 records_per_sec 건을 맞추기 위한 대기 시간
elapsed = time.time() - start_time
print(f"[{sensor_id}] 전송 시간: {elapsed:.3f}초, {records_per_sec}건 전송 완료", flush=True)
time.sleep(max(0, 1 - elapsed))
def main():
# 센서별 스레드 실행
processes = []
for sensor_id in sensor_ids:
p = Process(target=generate_sensor_data, args=(sensor_id,))
p.start()
processes.append(p)
# 💤 메인 스레드는 종료 방지용
try:
while True:
time.sleep(1)
except KeyboardInterrupt:
print("🛑 프로그램 종료 요청")
producer.close()
if __name__ == "__main__":
main()
에구... 잘 돌아간다.. 힘들다..
결론적으로는 Thread는 한계가 있다는 결론이다.
GPT에다 물어 봤다. 왜 Thread는 안되고 Process는 되는지.
🧠 핵심 차이 요약
항목 | multiprocessing 버전 | threading 버전 |
병렬 방식 | 진짜 병렬 (CPU 멀티코어 활용) | 가짜 병렬 (GIL 제약) |
KafkaProducer 인스턴스 | 각 프로세스마다 독립 | 하나의 공유 인스턴스 |
Kafka 처리 성능 | 안정적, 병렬 전송 | 불안정, 병목 발생 |
전송량 | 32,000건/초 이상 가능 | 15,000~17,000건/초로 제한 |
Kafka 내부 Queue 사용 | 독립 | 동시 접근 경쟁 발생 가능 |
🔥 왜 threading 방식은 메시지가 유실되는가?
① Python GIL(Global Interpreter Lock) 때문
Python은 threading.Thread를 쓰더라도 하나의 쓰레드만 CPU에서 실행 가능함.
→ 결국 10개 센서가 있어도 실제로는 번갈아 가며 실행됨 → 1초 내 3200건 전송 어려움
② KafkaProducer는 Thread-safe지만, flush는 병목
- KafkaProducer는 내부적으로 비동기 큐에 넣고 전송함
- flush()는 모든 메시지 전송 대기 → 동시 flush가 겹치면 지연 + 블로킹 발생
→ 다수의 쓰레드에서 한 인스턴스에 send() → flush()가 몰리면
→ Kafka 내부 버퍼가 overflow / 지연 / 타임아웃 / 누락 발생 가능성 있음
③ send() 후 응답 기다리지 않고 무조건 밀어넣는 구조
→ 너무 빠르게 호출하면 KafkaProducer 내부 큐가 가득 차거나, 타이밍 맞지 않아 메시지 손실 가능
이렇단다... 힘들다...ㅋ
여튼 32000건이 제대로 등록되는거 까지 확인했다.
이제 다음으로는 GUI처리 및 이상값 부분 처리할 계획이나... 힘들다..ㅋ
최종적으로 지금까지 한거 튜닝해달라고 GPT한테 이야기 했을 때
kafka_consumer_queue.py는 수정할게 없는 듯하고
sensor_emulator_process.py 파일은 변경했다.
# sensor_emulator_process.py
import json, time, random
from kafka import KafkaProducer
from datetime import datetime
from multiprocessing import Process
sensor_ids = [f"sensor_{i}" for i in range(10)]
records_per_sec = 3200
topic_name = "sensor_data"
def generate_sensor_data(sensor_id):
# 안정성 강화된 KafkaProducer 설정
producer = KafkaProducer(
bootstrap_servers="localhost:9092",
value_serializer=lambda v: json.dumps(v).encode("utf-8"),
linger_ms=10,
batch_size=32 * 1024,
compression_type='gzip',
acks='all',
enable_idempotence=True,
retries=5,
max_in_flight_requests_per_connection=1,
)
print(f"센서 {sensor_id} 시작 (초당 {records_per_sec} 건 전송)")
while True:
start_time = time.time()
total_sent = 0
for _ in range(records_per_sec):
is_anomaly = random.random() < 0.05
value = (
random.choice([random.uniform(500, 10000), random.uniform(-10000, -500)])
if is_anomaly else round(random.uniform(10.0, 100.0), 2)
)
data = {
"sensor_id": sensor_id,
"timestamp": datetime.now().isoformat(),
"value": round(value, 2)
}
producer.send(topic_name, value=data)
total_sent += 1
print(f"[{sensor_id}] 전송 시간: {time.time() - start_time:.3f}s / {total_sent}건", flush=True)
time.sleep(max(0, 1 - (time.time() - start_time)))
def main():
processes = [Process(target=generate_sensor_data, args=(sid,)) for sid in sensor_ids]
for p in processes: p.start()
try:
while True: time.sleep(1)
except KeyboardInterrupt:
print("🛑 종료 요청 감지됨")
if __name__ == "__main__":
main()
변경된 부분은 다음과 같다.
producer = KafkaProducer(
bootstrap_servers="localhost:9092",
value_serializer=lambda v: json.dumps(v).encode("utf-8"),
linger_ms=10,
batch_size=32 * 1024,
compression_type='gzip',
acks='all',
enable_idempotence=True,
retries=5,
max_in_flight_requests_per_connection=1,
)
- acks='all', enable_idempotence=True 로 유실 방지
- flush() 제거로 불필요한 block 방지
- max_in_flight_requests_per_connection=5 → 순서 보장
이부분에서 5로 하니까 문제가 발생해서 1로 했다.
이유는 "enable_idempotence=True 옵션을 사용할 때는 Kafka 내부에서 메시지 중복 없이 정확히 한 번 처리(Exactly-once)를 보장하기 위해 메시지 전송 순서를 철저히 지켜야 해." 이거라는데 몬말인지 모르겠다. 그래서 우선 1로 변경
그리고 기존에 있던
# 1초에 한 번 flush (버퍼 전송)
producer.flush()
이부분은 삭제해도 된다고 한다. 대신에 linger_ms 값으로 조절한다고 한다.
✅ flush()와 linger_ms 차이점 핵심 요약
항목 | flush() | linger_ms |
역할 | 버퍼에 쌓인 메시지를 즉시 강제로 전송 | KafkaProducer가 버퍼를 잠시 기다렸다가 전송하도록 하는 설정 |
호출 시점 | 개발자가 명시적으로 호출해야 함 | KafkaProducer 내부에서 자동으로 동작 |
성능 | ❌ 자주 호출하면 성능 심각하게 저하됨 | ✅ 자연스러운 전송 타이밍 조절 → 성능 향상 |
전송 타이밍 | 즉시 | 버퍼가 차거나 linger_ms 타이머 만료 시 |
🧠 비유로 설명
KafkaProducer는 택배기사라고 생각해보자.
- send()는 물건을 박스에 넣는 작업
- flush()는 박스에 뭐가 들어있든 무조건 지금 택배 보내!
- linger_ms=10은 박스에 몇 개쯤 모일 때까지 10ms 기다렸다가 보내도 돼라는 유예 시간
즉,
- flush()는 성급한 사람: "아 몰라 그냥 지금 보내!"
- linger_ms는 합리적인 택배 기사: "조금만 더 모이면 같이 보내서 비용 아끼자"
🔥 왜 flush()를 피해야 하냐?
1. 동기 호출이기 때문
- flush()는 모든 메시지 전송이 완료될 때까지 block
- 초당 수천~수만 건을 전송하는 구조에서는 완전히 병목
2. KafkaProducer가 알아서 잘함
- linger_ms, batch_size 설정만 적절하면, flush 없이도 Kafka가 스스로 버퍼 관리
- 성능과 안정성 모두 챙김
'Python' 카테고리의 다른 글
[KAFKA] 대용량 센서데이터 저장 처리 - FastAPI Kafka Consumer (0) | 2025.05.27 |
---|---|
[KAFKA] 대용량 센서데이터 저장 처리 - 에뮬레이터 생성 (0) | 2025.05.27 |
[KAFKA] 대용량 센서데이터 저장 처리 - 환경설정 (0) | 2025.05.27 |