Get the full trajectory for a given ICAO24 address from Redis Time Series.
Source code in packages/tangram_jet1090/src/tangram_jet1090/__init__.py
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65 | @router.get("/data/{icao24}")
async def get_trajectory_data(
icao24: str, backend_state: tangram.InjectBackendState
) -> list[dict[str, Any]]:
"""Get the full trajectory for a given ICAO24 address from Redis Time Series."""
ts_client = backend_state.redis_client.ts()
try:
results = await ts_client.mrange("-", "+", filters=[f"icao24={icao24}"])
except Exception as e:
log.error(f"mrange failed for {icao24=}: {e}")
return []
if not results:
return []
data_frames = []
for result_dict in results:
# value: (labels, data_points)
for key, value in result_dict.items():
# aircraft:ts:{feature}:{icao24}
feature = key.split(":")[2]
if not (data := value[1]):
continue
df = pd.DataFrame(
[(pd.Timestamp(ts, unit="ms", tz="utc"), val) for ts, val in data],
columns=["timestamp", feature],
)
data_frames.append(df)
if not data_frames:
return []
# merge all dataframes on the timestamp index
merged_df = data_frames[0]
for df in data_frames[1:]:
merged_df = pd.merge(merged_df, df, on="timestamp", how="outer")
merged_df = merged_df.sort_values("timestamp").fillna(pd.NA)
response_data = []
for _, row in merged_df.iterrows():
point = {"timestamp": row["timestamp"].timestamp(), "icao24": icao24}
for col in merged_df.columns:
if col not in ["timestamp", "icao24"] and pd.notna(row[col]):
point[col] = float(row[col])
response_data.append(point)
return response_data
|