Adding a New Data Source
This guide walks you through adding a new water data API as an AquaScope collector. Contributions from all countries and regions are welcome.
Step 1: Add the DataSource Enum
Edit aquascope/schemas/water_data.py and add your source to the DataSource enum:
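The enum body is not shown in this guide, so the following is only a sketch of what the new entry might look like. It assumes DataSource is a string-valued Enum. EXISTING_SOURCE is a hypothetical placeholder for the members already defined, and the value "your_source" is an assumed naming choice that matches the collector's name attribute in Step 2.

```python
from enum import Enum


class DataSource(str, Enum):
    """Stand-in for the enum in aquascope/schemas/water_data.py (assumed shape)."""

    # Hypothetical placeholder for the sources that already exist.
    EXISTING_SOURCE = "existing_source"
    # The new entry: a member name in upper case and a lower-case string value.
    YOUR_SOURCE = "your_source"
```

In the real file, only the YOUR_SOURCE line would be added; the existing members stay untouched.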
Step 2: Create the Collector Module
Create aquascope/collectors/your_source.py:
"""
Collector for [Your Source Name].

API docs: https://...
"""
from __future__ import annotations

import logging
from collections.abc import Sequence

from aquascope.collectors.base import BaseCollector
from aquascope.schemas.water_data import (
    DataSource,
    GeoLocation,
    WaterQualitySample,
)
from aquascope.utils.http_client import CachedHTTPClient, RateLimiter

logger = logging.getLogger(__name__)

BASE_URL = "https://api.example.org/v1"


class YourSourceCollector(BaseCollector):
    """Collect water data from [Your Source]."""

    name = "your_source"

    def __init__(self, api_key: str = "", client: CachedHTTPClient | None = None):
        # Fall back to a default client when the caller does not inject one
        # (injecting a client is useful in tests).
        super().__init__(
            client or CachedHTTPClient(
                base_url=BASE_URL,
                rate_limiter=RateLimiter(max_calls=10, period_seconds=60),
                cache_ttl_seconds=3600,
            )
        )
        self.api_key = api_key

    def fetch_raw(self, **kwargs) -> list[dict]:
        """Fetch raw data from the API."""
        params = {"key": self.api_key}
        # Add your API-specific parameters
        data = self.client.get_json("/endpoint", params=params)
        return data.get("records", [])

    def normalise(self, raw: list[dict]) -> Sequence[WaterQualitySample]:
        """Transform raw API records into WaterQualitySample objects."""
        samples = []
        for row in raw:
            try:
                # When the API returns coordinates, also build a GeoLocation here.
                samples.append(
                    WaterQualitySample(
                        source=DataSource.YOUR_SOURCE,
                        station_id=row["station_id"],
                        station_name=row.get("name"),
                        sample_datetime=row["datetime"],
                        parameter=row["parameter"],
                        value=float(row["value"]),
                        unit=row.get("unit", ""),
                    )
                )
            except (ValueError, KeyError, TypeError) as exc:
                # TypeError covers a null "value" field, since float(None) raises it.
                logger.debug("Skipping row: %s", exc)
        return samples
Step 3: Register the Collector
Edit aquascope/collectors/__init__.py:
from aquascope.collectors.your_source import YourSourceCollector

__all__ = [
    # ... existing exports ...
    "YourSourceCollector",
]
Step 4: Add CLI Support
Edit aquascope/cli.py, adding your source to:
1. The collector_map dictionary in cmd_collect()
2. The --source choices list
3. The source_info dictionary in cmd_list_sources()
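The three places listed above could look roughly like the sketch below. The contents of aquascope/cli.py are not shown in this guide, so everything here is an assumption: that the CLI is built on argparse, that the subcommands are called collect and list-sources, and that source_info maps a source key to a short description. The two collector classes are hypothetical stand-ins so the sketch runs without AquaScope installed.

```python
import argparse


# Hypothetical stand-ins for the real collector classes.
class ExistingCollector:
    name = "existing_source"


class YourSourceCollector:
    name = "your_source"


# 1. Map the CLI source key to the collector class.
collector_map = {
    "existing_source": ExistingCollector,
    "your_source": YourSourceCollector,
}

# 3. Human-readable description shown when listing sources.
source_info = {
    "existing_source": "Placeholder description",
    "your_source": "[Your Source Name]: one-line description",
}


def build_parser() -> argparse.ArgumentParser:
    parser = argparse.ArgumentParser(prog="aquascope")
    sub = parser.add_subparsers(dest="command", required=True)
    collect = sub.add_parser("collect")
    # 2. Allow the new key as a --source choice.
    collect.add_argument("--source", choices=sorted(collector_map), required=True)
    sub.add_parser("list-sources")
    return parser


args = build_parser().parse_args(["collect", "--source", "your_source"])
collector_cls = collector_map[args.source]
```

Deriving the choices from collector_map, as done here, is one way to keep the first two places in sync; the real CLI may list the choices by hand.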
Step 5: Write Tests
Create tests/test_collectors/test_your_source.py with:
- Sample raw API response data (mocked)
- Test normalise() produces correct records
- Test edge cases (missing fields, invalid values)
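A minimal sketch of such a test module follows. So that it runs on its own, it uses a hypothetical stand-in normalise() function that returns plain dicts and mirrors the skip-on-error pattern from Step 2. In the real test file you would instead build YourSourceCollector and call its normalise() method on the same mocked records. The field values are invented sample data.

```python
# Mocked raw API response: one valid row and two broken ones.
RAW_RECORDS = [
    {
        "station_id": "ST-1",
        "name": "River A",
        "datetime": "2024-01-01T00:00:00",
        "parameter": "pH",
        "value": "7.2",
        "unit": "",
    },
    # Invalid value: cannot be converted to float.
    {
        "station_id": "ST-2",
        "datetime": "2024-01-01T00:00:00",
        "parameter": "pH",
        "value": "n/a",
    },
    # Missing required field: no station_id.
    {"datetime": "2024-01-01T00:00:00", "parameter": "pH", "value": "6.9"},
]


def normalise(raw: list) -> list:
    """Stand-in for YourSourceCollector.normalise (hypothetical, returns dicts)."""
    samples = []
    for row in raw:
        try:
            samples.append(
                {
                    "station_id": row["station_id"],
                    "station_name": row.get("name"),
                    "sample_datetime": row["datetime"],
                    "parameter": row["parameter"],
                    "value": float(row["value"]),
                    "unit": row.get("unit", ""),
                }
            )
        except (KeyError, ValueError):
            continue  # skip the bad row instead of crashing
    return samples


def test_normalise_produces_correct_record():
    samples = normalise(RAW_RECORDS[:1])
    assert len(samples) == 1
    assert samples[0]["station_id"] == "ST-1"
    assert samples[0]["value"] == 7.2


def test_normalise_skips_invalid_value():
    assert normalise([RAW_RECORDS[1]]) == []


def test_normalise_skips_missing_field():
    assert normalise([RAW_RECORDS[2]]) == []
```

Because the Step 2 constructor accepts an injected client, the real tests can pass a fake client and never touch the network.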
Step 6: Update Documentation
- Add your source to the table in README.md
- Update docs/guides/architecture.md if needed
Guidelines
- Use CachedHTTPClient: it provides caching and rate limiting out of the box
- Handle errors gracefully: skip invalid records with logger.debug(), don't crash
- Include geographic data: set GeoLocation when lat/lon are available
- Respect rate limits: configure RateLimiter based on the API's actual limits
- Document the API: include the API docs URL and any key requirements
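For the geographic-data guideline, one way to decide whether coordinates are "available" is a small helper like the sketch below. The helper name parse_coordinates and the row keys "lat" and "lon" are assumptions for illustration; your API may use different field names. The GeoLocation constructor is not shown in this guide, so the helper returns a plain tuple that you would then pass to GeoLocation yourself.

```python
from __future__ import annotations


def parse_coordinates(row: dict) -> tuple[float, float] | None:
    """Return (lat, lon) when the row holds a valid coordinate pair, else None."""
    try:
        lat = float(row["lat"])
        lon = float(row["lon"])
    except (KeyError, TypeError, ValueError):
        # Missing key, null value, or non-numeric text: treat as unavailable.
        return None
    # Reject values outside the valid latitude/longitude ranges (this also rejects NaN).
    if not (-90.0 <= lat <= 90.0 and -180.0 <= lon <= 180.0):
        return None
    return lat, lon
```

Returning None rather than raising keeps a record with bad coordinates usable: the sample can still be stored, just without a location.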