Skip to content

Jet1090

tangram_jet1090

log module-attribute

log = getLogger(__name__)

router module-attribute

router = APIRouter(
    tags=["jet1090"],
    responses={404: {"description": "Not found"}},
)

plugin module-attribute

plugin = Plugin(
    frontend_path="dist-frontend", routers=[router]
)

PlanesConfig dataclass

Source code in packages/tangram_jet1090/src/tangram_jet1090/__init__.py
71
72
73
74
75
76
77
78
79
80
81
@dataclass(frozen=True)
class PlanesConfig:
    jet1090_channel: str = "jet1090"
    history_expire: int = 20
    stream_interval_secs: float = 1.0
    aircraft_db_url: str = (
        "https://jetvision.de/resources/sqb_databases/basestation.zip"
    )
    aircraft_db_cache_path: str | None = None
    log_level: str = "INFO"
    python_tracing_subscriber: bool = False

jet1090_channel class-attribute instance-attribute

jet1090_channel: str = 'jet1090'

history_expire class-attribute instance-attribute

history_expire: int = 20

stream_interval_secs class-attribute instance-attribute

stream_interval_secs: float = 1.0

aircraft_db_url class-attribute instance-attribute

aircraft_db_url: str = "https://jetvision.de/resources/sqb_databases/basestation.zip"

aircraft_db_cache_path class-attribute instance-attribute

aircraft_db_cache_path: str | None = None

log_level class-attribute instance-attribute

log_level: str = 'INFO'

python_tracing_subscriber class-attribute instance-attribute

python_tracing_subscriber: bool = False

__init__

__init__(
    jet1090_channel: str = "jet1090",
    history_expire: int = 20,
    stream_interval_secs: float = 1.0,
    aircraft_db_url: str = "https://jetvision.de/resources/sqb_databases/basestation.zip",
    aircraft_db_cache_path: str | None = None,
    log_level: str = "INFO",
    python_tracing_subscriber: bool = False,
) -> None

get_trajectory_data async

get_trajectory_data(
    icao24: str, backend_state: InjectBackendState
) -> list[dict[str, Any]]

Get the full trajectory for a given ICAO24 address from Redis Time Series.

Source code in packages/tangram_jet1090/src/tangram_jet1090/__init__.py
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
@router.get("/data/{icao24}")
async def get_trajectory_data(
    icao24: str, backend_state: tangram.InjectBackendState
) -> list[dict[str, Any]]:
    """Get the full trajectory for a given ICAO24 address from Redis Time Series."""
    ts_client = backend_state.redis_client.ts()
    try:
        results = await ts_client.mrange("-", "+", filters=[f"icao24={icao24}"])
    except Exception as e:
        log.error(f"mrange failed for {icao24=}: {e}")
        return []

    if not results:
        return []

    data_frames = []
    for result_dict in results:
        # value: (labels, data_points)
        for key, value in result_dict.items():
            # aircraft:ts:{feature}:{icao24}
            feature = key.split(":")[2]
            if not (data := value[1]):
                continue
            df = pd.DataFrame(
                [(pd.Timestamp(ts, unit="ms", tz="utc"), val) for ts, val in data],
                columns=["timestamp", feature],
            )
            data_frames.append(df)

    if not data_frames:
        return []

    # merge all dataframes on the timestamp index
    merged_df = data_frames[0]
    for df in data_frames[1:]:
        merged_df = pd.merge(merged_df, df, on="timestamp", how="outer")

    merged_df = merged_df.sort_values("timestamp").fillna(pd.NA)

    response_data = []
    for _, row in merged_df.iterrows():
        point = {"timestamp": row["timestamp"].timestamp(), "icao24": icao24}
        for col in merged_df.columns:
            if col not in ["timestamp", "icao24"] and pd.notna(row[col]):
                point[col] = float(row[col])
        response_data.append(point)

    return response_data

run_planes async

run_planes(backend_state: BackendState) -> None
Source code in packages/tangram_jet1090/src/tangram_jet1090/__init__.py
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
@plugin.register_service()
async def run_planes(backend_state: tangram.BackendState) -> None:
    from . import _planes

    plugin_config = backend_state.config.plugins.get("tangram_jet1090", {})
    config_planes = TypeAdapter(PlanesConfig).validate_python(plugin_config)

    default_log_level = plugin_config.get(
        "log_level", backend_state.config.core.log_level
    )

    if config_planes.python_tracing_subscriber:
        layer = tangram.TracingLayer()
        _planes.init_tracing_python(layer, default_log_level)
    else:
        _planes.init_tracing_stderr(default_log_level)

    rust_config = _planes.PlanesConfig(
        redis_url=backend_state.config.core.redis_url,
        jet1090_channel=config_planes.jet1090_channel,
        history_expire=config_planes.history_expire,
        stream_interval_secs=config_planes.stream_interval_secs,
        aircraft_db_url=config_planes.aircraft_db_url,
        aircraft_db_cache_path=config_planes.aircraft_db_cache_path,
    )
    await _planes.run_planes(rust_config)

tangram_jet1090._planes

PlanesConfig

redis_url property writable

redis_url: str

jet1090_channel property writable

jet1090_channel: str

history_expire property writable

history_expire: int

stream_interval_secs property writable

stream_interval_secs: float

aircraft_db_url property writable

aircraft_db_url: str

aircraft_db_cache_path property writable

aircraft_db_cache_path: Optional[str]

__new__

__new__(
    redis_url: str,
    jet1090_channel: str,
    history_expire: int,
    stream_interval_secs: float,
    aircraft_db_url: str,
    aircraft_db_cache_path: Optional[str],
) -> PlanesConfig

init_tracing_python

init_tracing_python(py_layer: Any, filter_str: str) -> None

init_tracing_stderr

init_tracing_stderr(filter_str: str) -> None

run_planes

run_planes(config: PlanesConfig) -> Any