·3 min read·AgTech
스마트팜 데이터 파이프라인 설계하기
IoT 센서 데이터를 수집하고 분석하는 스마트팜 데이터 파이프라인의 설계와 구현 경험을 공유합니다.
스마트팜의 데이터 흐름
스마트팜에서는 다양한 센서가 실시간으로 데이터를 생성합니다:
- 환경 센서: 온도, 습도, CO₂, 광량
- 토양 센서: 수분, EC, pH
- 영상 센서: 작물 생육 모니터링
이 데이터를 효율적으로 수집하고 분석하려면 잘 설계된 파이프라인이 필요합니다.
아키텍처 개요
센서 → MQTT Broker → Stream Processor → Time-series DB → Dashboard
↓
Alert Engine → 알림 (Slack/SMS)
기술 스택
| 계층 | 기술 | 역할 |
|---|---|---|
| 수집 | MQTT (Mosquitto) | 센서 데이터 브로커 |
| 처리 | Apache Kafka | 스트림 프로세싱 |
| 저장 | TimescaleDB | 시계열 데이터 저장 |
| 시각화 | Grafana | 대시보드 |
| 알림 | Custom Python | 임계값 기반 알림 |
센서 데이터 모델
from dataclasses import dataclass
from datetime import datetime
@dataclass
class SensorReading:
sensor_id: str
farm_id: str
reading_type: str # temperature, humidity, etc.
value: float
unit: str
timestamp: datetime
def to_dict(self) -> dict:
return {
"sensor_id": self.sensor_id,
"farm_id": self.farm_id,
"type": self.reading_type,
"value": self.value,
"unit": self.unit,
"ts": self.timestamp.isoformat(),
}MQTT 수집기
import paho.mqtt.client as mqtt
import json
class SensorCollector:
def __init__(self, broker_host: str, topic: str):
self.client = mqtt.Client()
self.client.on_message = self._on_message
self.client.connect(broker_host)
self.client.subscribe(topic)
def _on_message(self, client, userdata, msg):
payload = json.loads(msg.payload)
reading = SensorReading(**payload)
self._process(reading)
def _process(self, reading: SensorReading):
# Kafka로 전송 또는 직접 DB 저장
if reading.value > THRESHOLDS.get(reading.reading_type, float('inf')):
self._send_alert(reading)
def start(self):
self.client.loop_forever()데이터 분석 인사이트
수집된 데이터로 다음과 같은 분석이 가능합니다:
- 환경-수확량 상관관계: 온도·습도 패턴과 수확량의 관계
- 이상 감지: 센서 값의 급격한 변화 자동 탐지
- 예측 모델: 환경 데이터 기반 수확 시기 예측
다음 단계
- Edge computing 도입으로 지연 시간 감소
- ML 기반 자동 환경 제어
- 멀티 농장 데이터 통합 대시보드
스마트팜 데이터 파이프라인에 대해 더 궁금한 점이 있다면 댓글로 남겨주세요.