Skip to content

Weather

tangram_weather

router module-attribute

# FastAPI router for this plugin: every endpoint below is mounted under the
# /weather prefix and tagged "weather" in the generated OpenAPI docs.
# Unmatched paths under the prefix yield the declared 404 response.
router = APIRouter(
    prefix="/weather",
    tags=["weather"],
    responses={404: {"description": "Not found"}},
)

plugin module-attribute

# Plugin registration object consumed by the host application.
# NOTE(review): presumably the host serves the built frontend assets from
# "dist-frontend" and mounts the routers listed here — confirm against the
# Plugin class definition.
plugin = Plugin(
    frontend_path="dist-frontend", routers=[router]
)

get_weather async

get_weather() -> dict[str, str]

An example endpoint that returns some data.

Source code in packages/tangram_weather/src/tangram_weather/__init__.py
17
18
19
20
@router.get("/")
async def get_weather() -> dict[str, str]:
    """An example endpoint that returns some data."""
    payload = {"message": "This is the weather plugin response"}
    return payload

wind async

wind(isobaric: int = 300) -> ORJSONResponse
Source code in packages/tangram_weather/src/tangram_weather/__init__.py
23
24
25
26
27
28
29
30
31
@router.get("/wind")
async def wind(isobaric: int = 300) -> ORJSONResponse:
    """Return u/v wind components at the requested isobaric level.

    Parameters:
        isobaric: pressure level in hPa (must be one of the levels present
            in the dataset, e.g. those in ``DEFAULT_LEVELS_37``).

    Returns:
        ORJSONResponse with the selected ``u``/``v`` fields serialized via
        ``Dataset.to_dict()``.
    """
    # Use logging instead of a bare print() so output goes through the
    # application's configured handlers.
    import logging

    logging.getLogger(__name__).info("Fetching wind data")

    now = pd.Timestamp.now(tz="UTC").floor("1h")
    # The data fetch does blocking HTTP + GRIB decoding; run it off the
    # event loop so other requests are not stalled.
    ds = await asyncio.to_thread(latest_arpege_data, now)
    # The dataset's time coordinate is tz-naive, hence tz_convert(None).
    res = ds.sel(isobaricInhPa=isobaric, time=now.tz_convert(None))[["u", "v"]]

    return ORJSONResponse(content=res.to_dict())

arpege

bare_url module-attribute

# Base URL of the Météo-France NWP ("pnt") open-data bucket hosted on
# object.data.gouv.fr; run/model/package path segments are appended by
# latest_data().
bare_url = (
    "https://object.data.gouv.fr/meteofrance-pnt/pnt/"
)

DEFAULT_LEVELS_37 module-attribute

# Isobaric pressure levels (hPa) used as the cfgrib "level" filter when
# opening the GRIB2 file in latest_data(). Despite the _37 suffix, only the
# 22 levels listed here are requested.
DEFAULT_LEVELS_37 = [
    100,
    125,
    150,
    175,
    200,
    225,
    250,
    300,
    350,
    400,
    450,
    500,
    550,
    600,
    650,
    700,
    750,
    800,
    850,
    900,
    950,
    1000,
]

DEFAULT_IP1_FEATURES module-attribute

# GRIB short names expected in the IP1 package — presumably u/v wind
# components, temperature and relative humidity (TODO confirm); not
# referenced by the code visible in this module.
DEFAULT_IP1_FEATURES = ['u', 'v', 't', 'r']

tempdir module-attribute

# System temp directory; downloaded GRIB2 files are cached here so repeated
# calls to latest_data() can reuse them across requests.
tempdir = Path(gettempdir())

download_with_progress

download_with_progress(url: str, file: Path) -> None
Source code in packages/tangram_weather/src/tangram_weather/arpege.py
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
def download_with_progress(url: str, file: Path) -> None:
    """Stream *url* into *file*, displaying a tqdm progress bar.

    Raises:
        httpx.HTTPStatusError: if the server responds with an error status.
        RuntimeError: if the body starts with an XML document, which the
            object store uses for error responses.
    """
    with httpx.stream("GET", url) as r:
        # Fail fast on HTTP errors instead of silently writing an error
        # page to disk (the caller unlinks the file on failure).
        r.raise_for_status()
        total_size = int(r.headers.get("Content-Length", 0))
        with file.open("wb") as buffer:
            with tqdm(
                total=total_size,
                unit="B",
                unit_scale=True,
                desc=url.split("/")[-1],
            ) as progress_bar:
                first_chunk = True
                for chunk in r.iter_bytes():
                    # Some error responses arrive as an XML document;
                    # detect that on the first chunk and abort rather
                    # than saving garbage as a GRIB2 file.
                    if first_chunk and chunk.startswith(b"<?xml"):
                        raise RuntimeError(
                            f"Error downloading data from {url}. "
                            "Check if the requested data is available."
                        )
                    first_chunk = False
                    buffer.write(chunk)
                    progress_bar.update(len(chunk))

latest_data

latest_data(
    hour: Timestamp,
    model: str = "ARPEGE",
    resolution: Literal["025", "01"] = "025",
    package: Literal[
        "SP1", "SP2", "IP1", "IP2", "IP3", "IP4", "HP1"
    ] = "IP1",
    time_range: Literal[
        "000H024H",
        "025H048H",
        "049H072H",
        "073H102H",
        "000H012H",
        "013H024H",
        "025H036H",
        "037H048H",
        "049H060H",
        "061H072H",
        "073H084H",
        "085H096H",
        "097H102H",
    ] = "000H024H",
    recursion: int = 0,
) -> Dataset

Fetch the latest ARPEGE data for a given hour.

Source code in packages/tangram_weather/src/tangram_weather/arpege.py
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
def latest_data(
    hour: pd.Timestamp,
    model: str = "ARPEGE",
    resolution: Literal["025", "01"] = "025",
    package: Literal["SP1", "SP2", "IP1", "IP2", "IP3", "IP4", "HP1"] = "IP1",
    time_range: Literal[
        "000H024H",  # on the 0.25 degree grid
        "025H048H",  # on the 0.25 degree grid
        "049H072H",  # on the 0.25 degree grid
        "073H102H",  # on the 0.25 degree grid
        "000H012H",  # on the 0.1 degree grid
        "013H024H",  # on the 0.1 degree grid
        "025H036H",  # on the 0.1 degree grid
        "037H048H",  # on the 0.1 degree grid
        "049H060H",  # on the 0.1 degree grid
        "061H072H",  # on the 0.1 degree grid
        "073H084H",  # on the 0.1 degree grid
        "085H096H",  # on the 0.1 degree grid
        "097H102H",  # on the 0.1 degree grid
    ] = "000H024H",
    recursion: int = 0,
) -> xr.Dataset:
    """
    Fetch the latest ARPEGE data for a given hour.

    Downloads the GRIB2 file for the most recent model run covering *hour*
    (caching it in the system temp directory), opens it with cfgrib filtered
    to the isobaric levels in DEFAULT_LEVELS_37, and re-indexes the forecast
    steps as absolute timestamps.

    If the download fails, retries with the previous run (6 hours earlier),
    up to 3 times, before re-raising the original error.

    NOTE(review): *hour* is assumed to be timezone-aware UTC (callers pass
    pd.Timestamp.now(tz="UTC")) — the "+00:00" suffix of isoformat() is
    rewritten to "Z" below; confirm before calling with naive timestamps.
    """
    # let's give them time to upload data to the repo
    # (runs are published at 00/06/12/18Z; subtracting 2h before flooring
    # selects the most recent run old enough to be uploaded)
    runtime = (hour - pd.Timedelta("2h")).floor("6h")

    # Build the object-store URL and cache filename; both embed the run
    # time in ISO format with a "Z" suffix instead of "+00:00".
    url = f"{bare_url}{runtime.isoformat()}/"
    url += f"{model.lower()}/{resolution}/{package}/"
    filename = f"{model.lower()}__{resolution}__{package}__"
    filename += f"{time_range}__{runtime.isoformat()}.grib2"
    filename = filename.replace("+00:00", "Z")
    url += filename
    url = url.replace("+00:00", "Z")

    if not (tempdir / filename).exists():
        # If the file does not exist, we try to download it.
        try:
            download_with_progress(url, tempdir / filename)
        except Exception:
            (tempdir / filename).unlink(missing_ok=True)  # remove the file if it exists
            # If the download fails, we try to fetch the latest data
            # (or survive with older data we may have in the /tmp directory)
            if recursion >= 3:
                raise  # do not insist too much in history
            return latest_data(
                hour - pd.Timedelta("6h"),
                model,
                resolution,
                package,
                time_range,
                recursion + 1,
            )

    # Open only the isobaric-level messages; other typeOfLevel values in
    # the GRIB file are ignored.
    ds = xr.open_dataset(
        tempdir / filename,
        engine="cfgrib",
        backend_kwargs={
            "filter_by_keys": {
                "typeOfLevel": "isobaricInhPa",
                "level": DEFAULT_LEVELS_37,
            }
        },
    )
    # Convert relative forecast steps into absolute valid times, and expose
    # them under the "time" coordinate expected by callers (see wind()).
    ds = ds.assign(step=ds.time + ds.step).drop_vars("time")
    ds = ds.rename(step="time")
    return ds