Skip to content

Core

tangram

InjectBackendState module-attribute

InjectBackendState: TypeAlias = Annotated[
    BackendState, Depends(get_state)
]

BackendState dataclass

Source code in packages/tangram/src/tangram/backend.py
32
33
34
35
@dataclass
class BackendState:
    redis_client: redis.Redis
    config: Config

redis_client instance-attribute

redis_client: Redis

config instance-attribute

config: Config

TracingLayer

Source code in packages/tangram/src/tangram/backend.py
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
class TracingLayer:
    def on_event(self, event: str, state: None) -> None:
        data = json.loads(event)
        metadata = data.get("metadata", {})
        target = metadata.get("target", "")
        level_str = metadata.get("level", "INFO")
        message = data.get("message", "")

        if not all([target, level_str, message]):
            return

        event_level = LOG_LEVEL_MAP.get(level_str, logging.INFO)
        logger = logging.getLogger(target)
        logger.log(event_level, message)

    def on_new_span(self, span_attrs: str, span_id: str) -> None:
        return None

    def on_close(self, span_id: str, state: None) -> None:
        pass

    def on_record(self, span_id: str, values: str, state: None) -> None:
        pass

on_event

on_event(event: str, state: None) -> None
Source code in packages/tangram/src/tangram/backend.py
135
136
137
138
139
140
141
142
143
144
145
146
147
def on_event(self, event: str, state: None) -> None:
    data = json.loads(event)
    metadata = data.get("metadata", {})
    target = metadata.get("target", "")
    level_str = metadata.get("level", "INFO")
    message = data.get("message", "")

    if not all([target, level_str, message]):
        return

    event_level = LOG_LEVEL_MAP.get(level_str, logging.INFO)
    logger = logging.getLogger(target)
    logger.log(event_level, message)

on_new_span

on_new_span(span_attrs: str, span_id: str) -> None
Source code in packages/tangram/src/tangram/backend.py
149
150
def on_new_span(self, span_attrs: str, span_id: str) -> None:
    return None

on_close

on_close(span_id: str, state: None) -> None
Source code in packages/tangram/src/tangram/backend.py
152
153
def on_close(self, span_id: str, state: None) -> None:
    pass

on_record

on_record(span_id: str, values: str, state: None) -> None
Source code in packages/tangram/src/tangram/backend.py
155
156
def on_record(self, span_id: str, values: str, state: None) -> None:
    pass

Config dataclass

Source code in packages/tangram/src/tangram/config.py
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
@dataclass
class Config:
    core: CoreConfig = field(default_factory=CoreConfig)
    server: ServerConfig = field(default_factory=ServerConfig)
    channel: ChannelConfig = field(default_factory=ChannelConfig)
    plugins: dict[str, Any] = field(default_factory=dict)

    @classmethod
    def from_file(cls, config_path: Path) -> Config:
        if sys.version_info < (3, 11):
            import tomli as tomllib
        else:
            import tomllib
        from pydantic import TypeAdapter

        with open(config_path, "rb") as f:
            cfg_data = tomllib.load(f)

        config_adapter = TypeAdapter(cls)
        config = config_adapter.validate_python(cfg_data)
        return config

core class-attribute instance-attribute

core: CoreConfig = field(default_factory=CoreConfig)

server class-attribute instance-attribute

server: ServerConfig = field(default_factory=ServerConfig)

channel class-attribute instance-attribute

channel: ChannelConfig = field(
    default_factory=ChannelConfig
)

plugins class-attribute instance-attribute

plugins: dict[str, Any] = field(default_factory=dict)

from_file classmethod

from_file(config_path: Path) -> Config
Source code in packages/tangram/src/tangram/config.py
39
40
41
42
43
44
45
46
47
48
49
50
51
52
@classmethod
def from_file(cls, config_path: Path) -> Config:
    if sys.version_info < (3, 11):
        import tomli as tomllib
    else:
        import tomllib
    from pydantic import TypeAdapter

    with open(config_path, "rb") as f:
        cfg_data = tomllib.load(f)

    config_adapter = TypeAdapter(cls)
    config = config_adapter.validate_python(cfg_data)
    return config

Plugin dataclass

Source code in packages/tangram/src/tangram/plugin.py
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
@dataclass
class Plugin:
    frontend_path: str | None = None
    routers: list[APIRouter] = field(default_factory=list)
    services: list[tuple[Priority, ServiceAsyncFunc]] = field(
        default_factory=list, init=False
    )

    def register_service(
        self, priority: Priority = 0
    ) -> Callable[[ServiceFunc], ServiceFunc]:
        def decorator(func: ServiceFunc) -> ServiceFunc:
            @functools.wraps(func)
            async def async_wrapper(backend_state: BackendState) -> None:
                if asyncio.iscoroutinefunction(func):
                    await func(backend_state)
                else:
                    await asyncio.to_thread(func, backend_state)

            self.services.append((priority, async_wrapper))
            return func

        return decorator

frontend_path class-attribute instance-attribute

frontend_path: str | None = None

routers class-attribute instance-attribute

routers: list[APIRouter] = field(default_factory=list)

services class-attribute instance-attribute

services: list[tuple[Priority, ServiceAsyncFunc]] = field(
    default_factory=list, init=False
)

register_service

register_service(
    priority: Priority = 0,
) -> Callable[[ServiceFunc], ServiceFunc]
Source code in packages/tangram/src/tangram/plugin.py
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
def register_service(
    self, priority: Priority = 0
) -> Callable[[ServiceFunc], ServiceFunc]:
    def decorator(func: ServiceFunc) -> ServiceFunc:
        @functools.wraps(func)
        async def async_wrapper(backend_state: BackendState) -> None:
            if asyncio.iscoroutinefunction(func):
                await func(backend_state)
            else:
                await asyncio.to_thread(func, backend_state)

        self.services.append((priority, async_wrapper))
        return func

    return decorator

backend

logger module-attribute

logger = getLogger(__name__)

InjectBackendState module-attribute

InjectBackendState: TypeAlias = Annotated[
    BackendState, Depends(get_state)
]

LOG_LEVEL_MAP module-attribute

LOG_LEVEL_MAP = {
    "TRACE": DEBUG,
    "DEBUG": DEBUG,
    "INFO": INFO,
    "WARN": WARNING,
    "ERROR": ERROR,
}

BackendState dataclass

Source code in packages/tangram/src/tangram/backend.py
32
33
34
35
@dataclass
class BackendState:
    redis_client: redis.Redis
    config: Config
redis_client instance-attribute
redis_client: Redis
config instance-attribute
config: Config

TracingLayer

Source code in packages/tangram/src/tangram/backend.py
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
class TracingLayer:
    def on_event(self, event: str, state: None) -> None:
        data = json.loads(event)
        metadata = data.get("metadata", {})
        target = metadata.get("target", "")
        level_str = metadata.get("level", "INFO")
        message = data.get("message", "")

        if not all([target, level_str, message]):
            return

        event_level = LOG_LEVEL_MAP.get(level_str, logging.INFO)
        logger = logging.getLogger(target)
        logger.log(event_level, message)

    def on_new_span(self, span_attrs: str, span_id: str) -> None:
        return None

    def on_close(self, span_id: str, state: None) -> None:
        pass

    def on_record(self, span_id: str, values: str, state: None) -> None:
        pass
on_event
on_event(event: str, state: None) -> None
Source code in packages/tangram/src/tangram/backend.py
135
136
137
138
139
140
141
142
143
144
145
146
147
def on_event(self, event: str, state: None) -> None:
    data = json.loads(event)
    metadata = data.get("metadata", {})
    target = metadata.get("target", "")
    level_str = metadata.get("level", "INFO")
    message = data.get("message", "")

    if not all([target, level_str, message]):
        return

    event_level = LOG_LEVEL_MAP.get(level_str, logging.INFO)
    logger = logging.getLogger(target)
    logger.log(event_level, message)
on_new_span
on_new_span(span_attrs: str, span_id: str) -> None
Source code in packages/tangram/src/tangram/backend.py
149
150
def on_new_span(self, span_attrs: str, span_id: str) -> None:
    return None
on_close
on_close(span_id: str, state: None) -> None
Source code in packages/tangram/src/tangram/backend.py
152
153
def on_close(self, span_id: str, state: None) -> None:
    pass
on_record
on_record(span_id: str, values: str, state: None) -> None
Source code in packages/tangram/src/tangram/backend.py
155
156
def on_record(self, span_id: str, values: str, state: None) -> None:
    pass

get_state async

get_state(request: Request) -> BackendState
Source code in packages/tangram/src/tangram/backend.py
38
39
async def get_state(request: Request) -> BackendState:
    return request.app.state.backend_state  # type: ignore

resolve_frontend

resolve_frontend(
    *, path: str, dist_name: str
) -> Path | Traversable | None
Source code in packages/tangram/src/tangram/backend.py
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
def resolve_frontend(*, path: str, dist_name: str) -> Path | Traversable | None:
    # always try to parse from direct url first (this is robust for editable
    # installs like `uv sync --all-packages`)
    try:
        dist = Distribution.from_name(dist_name)
        if direct_url_content := dist.read_text("direct_url.json"):
            direct_url_data = json.loads(direct_url_content)
            if (url := direct_url_data.get("url")) and (
                path1 := Path(url.removeprefix("file://")) / path
            ).is_dir():
                return path1
    except (PackageNotFoundError, json.JSONDecodeError, FileNotFoundError):
        pass

    # fallback in case it was installed via pip
    if (path2 := importlib.resources.files(dist_name) / path).is_dir():
        return path2
    return None

load_enabled_plugins

load_enabled_plugins(
    config: Config,
) -> list[tuple[DistName, Plugin]]
Source code in packages/tangram/src/tangram/backend.py
65
66
67
68
69
70
71
72
73
74
75
76
77
def load_enabled_plugins(
    config: Config,
) -> list[tuple[DistName, Plugin]]:
    loaded_plugins = []
    enabled_plugin_names = set(config.core.plugins)

    for entry_point in scan_plugins():
        if entry_point.name not in enabled_plugin_names:
            continue
        if (plugin := load_plugin(entry_point)) is not None:
            loaded_plugins.append(plugin)

    return loaded_plugins

lifespan async

lifespan(
    app: FastAPI, backend_state: BackendState
) -> AsyncGenerator[None, None]
Source code in packages/tangram/src/tangram/backend.py
80
81
82
83
84
85
86
@asynccontextmanager
async def lifespan(
    app: FastAPI, backend_state: BackendState
) -> AsyncGenerator[None, None]:
    async with backend_state.redis_client:
        app.state.backend_state = backend_state
        yield

create_app

create_app(
    backend_state: BackendState,
    loaded_plugins: Iterable[tuple[DistName, Plugin]],
) -> FastAPI
Source code in packages/tangram/src/tangram/backend.py
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
def create_app(
    backend_state: BackendState,
    loaded_plugins: Iterable[tuple[DistName, Plugin]],
) -> FastAPI:
    app = FastAPI(lifespan=partial(lifespan, backend_state=backend_state))
    frontend_plugins = []

    for dist_name, plugin in loaded_plugins:
        for router in plugin.routers:
            app.include_router(router)

        if (p := plugin.frontend_path) is not None and (
            frontend_path_resolved := resolve_frontend(path=p, dist_name=dist_name)
        ) is not None:
            app.mount(
                f"/plugins/{dist_name}",
                StaticFiles(directory=str(frontend_path_resolved)),
                name=dist_name,
            )
            frontend_plugins.append(dist_name)

    @app.get("/manifest.json")
    async def get_manifest() -> JSONResponse:
        return JSONResponse(content={"plugins": frontend_plugins})

    # TODO: we might want to host the frontend separately from the backend
    if (
        frontend_path := resolve_frontend(path="dist-frontend", dist_name="tangram")
    ) is None:
        raise ValueError(
            "error: frontend was not found, did you run `pnpm i && pnpm run build`?"
        )
    app.mount("/", StaticFiles(directory=str(frontend_path), html=True), name="core")
    return app

run_channel_service async

run_channel_service(config: Config) -> None
Source code in packages/tangram/src/tangram/backend.py
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
async def run_channel_service(config: Config) -> None:
    from . import _channel

    if config.channel.python_tracing_subscriber:
        layer = TracingLayer()
        _channel.init_tracing_python(layer, config.core.log_level)
    else:
        _channel.init_tracing_stderr(config.core.log_level)

    rust_config = _channel.ChannelConfig(
        host=config.channel.host,
        port=config.channel.port,
        redis_url=config.core.redis_url,
        jwt_secret=config.channel.jwt_secret,
        jwt_expiration_secs=config.channel.jwt_expiration_secs,
        id_length=config.channel.id_length,
    )
    await _channel.run(rust_config)

run_services async

run_services(
    backend_state: BackendState,
    loaded_plugins: Iterable[tuple[DistName, Plugin]],
) -> AsyncGenerator[Task[None], None]
Source code in packages/tangram/src/tangram/backend.py
179
180
181
182
183
184
185
186
187
188
189
190
async def run_services(
    backend_state: BackendState,
    loaded_plugins: Iterable[tuple[DistName, Plugin]],
) -> AsyncGenerator[asyncio.Task[None], None]:
    yield asyncio.create_task(run_channel_service(backend_state.config))

    for dist_name, plugin in loaded_plugins:
        for _, service_func in sorted(
            plugin.services, key=lambda s: (s[0], s[1].__name__)
        ):
            yield asyncio.create_task(service_func(backend_state))
            logger.info(f"started service from plugin: {dist_name}")

run_server async

run_server(
    backend_state: BackendState,
    loaded_plugins: list[tuple[DistName, Plugin]],
) -> None
Source code in packages/tangram/src/tangram/backend.py
193
194
195
196
197
198
199
200
201
202
203
204
async def run_server(
    backend_state: BackendState, loaded_plugins: list[tuple[DistName, Plugin]]
) -> None:
    app_instance = create_app(backend_state, loaded_plugins)
    server_config = uvicorn.Config(
        app_instance,
        host=backend_state.config.server.host,
        port=backend_state.config.server.port,
        log_config=get_log_config_dict(backend_state.config),
    )
    server = uvicorn.Server(server_config)
    await server.serve()

start_tasks async

start_tasks(config: Config) -> None
Source code in packages/tangram/src/tangram/backend.py
207
208
209
210
211
212
213
214
215
216
217
218
219
async def start_tasks(config: Config) -> None:
    loaded_plugins = load_enabled_plugins(config)

    async with AsyncExitStack() as stack:
        redis_client = await stack.enter_async_context(
            redis.from_url(config.core.redis_url)  # type: ignore
        )
        state = BackendState(redis_client=redis_client, config=config)

        server_task = asyncio.create_task(run_server(state, loaded_plugins))
        service_tasks = [s async for s in run_services(state, loaded_plugins)]

        await asyncio.gather(server_task, *service_tasks)

get_log_config_dict

get_log_config_dict(config: Config) -> dict[str, Any]
Source code in packages/tangram/src/tangram/backend.py
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
def get_log_config_dict(config: Config) -> dict[str, Any]:
    def format_time(dt: datetime) -> str:
        return dt.astimezone(timezone.utc).strftime("%Y-%m-%dT%H:%M:%S.%fZ ")

    return {
        "version": 1,
        "disable_existing_loggers": False,
        "handlers": {
            "default": {
                "class": "rich.logging.RichHandler",
                "log_time_format": format_time,
                "omit_repeated_times": False,
            },
        },
        "root": {"handlers": ["default"], "level": config.core.log_level.upper()},
    }

config

ServerConfig dataclass

Source code in packages/tangram/src/tangram/config.py
 9
10
11
12
@dataclass
class ServerConfig:
    host: str = "127.0.0.1"
    port: int = 2346
host class-attribute instance-attribute
host: str = '127.0.0.1'
port class-attribute instance-attribute
port: int = 2346

ChannelConfig dataclass

Source code in packages/tangram/src/tangram/config.py
15
16
17
18
19
20
21
22
@dataclass
class ChannelConfig:
    host: str = "127.0.0.1"
    port: int = 2347
    jwt_secret: str = "secret"
    jwt_expiration_secs: int = 315360000  # 10 years
    id_length: int = 8
    python_tracing_subscriber: bool = False
host class-attribute instance-attribute
host: str = '127.0.0.1'
port class-attribute instance-attribute
port: int = 2347
jwt_secret class-attribute instance-attribute
jwt_secret: str = 'secret'
jwt_expiration_secs class-attribute instance-attribute
jwt_expiration_secs: int = 315360000
id_length class-attribute instance-attribute
id_length: int = 8
python_tracing_subscriber class-attribute instance-attribute
python_tracing_subscriber: bool = False

CoreConfig dataclass

Source code in packages/tangram/src/tangram/config.py
25
26
27
28
29
@dataclass
class CoreConfig:
    redis_url: str = "redis://127.0.0.1:6379"
    plugins: list[str] = field(default_factory=list)
    log_level: str = "INFO"
redis_url class-attribute instance-attribute
redis_url: str = 'redis://127.0.0.1:6379'
plugins class-attribute instance-attribute
plugins: list[str] = field(default_factory=list)
log_level class-attribute instance-attribute
log_level: str = 'INFO'

Config dataclass

Source code in packages/tangram/src/tangram/config.py
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
@dataclass
class Config:
    core: CoreConfig = field(default_factory=CoreConfig)
    server: ServerConfig = field(default_factory=ServerConfig)
    channel: ChannelConfig = field(default_factory=ChannelConfig)
    plugins: dict[str, Any] = field(default_factory=dict)

    @classmethod
    def from_file(cls, config_path: Path) -> Config:
        if sys.version_info < (3, 11):
            import tomli as tomllib
        else:
            import tomllib
        from pydantic import TypeAdapter

        with open(config_path, "rb") as f:
            cfg_data = tomllib.load(f)

        config_adapter = TypeAdapter(cls)
        config = config_adapter.validate_python(cfg_data)
        return config
core class-attribute instance-attribute
core: CoreConfig = field(default_factory=CoreConfig)
server class-attribute instance-attribute
server: ServerConfig = field(default_factory=ServerConfig)
channel class-attribute instance-attribute
channel: ChannelConfig = field(
    default_factory=ChannelConfig
)
plugins class-attribute instance-attribute
plugins: dict[str, Any] = field(default_factory=dict)
from_file classmethod
from_file(config_path: Path) -> Config
Source code in packages/tangram/src/tangram/config.py
39
40
41
42
43
44
45
46
47
48
49
50
51
52
@classmethod
def from_file(cls, config_path: Path) -> Config:
    if sys.version_info < (3, 11):
        import tomli as tomllib
    else:
        import tomllib
    from pydantic import TypeAdapter

    with open(config_path, "rb") as f:
        cfg_data = tomllib.load(f)

    config_adapter = TypeAdapter(cls)
    config = config_adapter.validate_python(cfg_data)
    return config

plugin

RouterFunc module-attribute

RouterFunc: TypeAlias = Callable[[], APIRouter]

ServiceAsyncFunc module-attribute

ServiceAsyncFunc: TypeAlias = Callable[
    [BackendState], Awaitable[None]
]

ServiceFunc module-attribute

ServiceFunc: TypeAlias = (
    ServiceAsyncFunc | Callable[[BackendState], None]
)

Priority module-attribute

Priority: TypeAlias = int

DistName module-attribute

DistName = NewType('DistName', str)

logger module-attribute

logger = getLogger(__name__)

Plugin dataclass

Source code in packages/tangram/src/tangram/plugin.py
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
@dataclass
class Plugin:
    frontend_path: str | None = None
    routers: list[APIRouter] = field(default_factory=list)
    services: list[tuple[Priority, ServiceAsyncFunc]] = field(
        default_factory=list, init=False
    )

    def register_service(
        self, priority: Priority = 0
    ) -> Callable[[ServiceFunc], ServiceFunc]:
        def decorator(func: ServiceFunc) -> ServiceFunc:
            @functools.wraps(func)
            async def async_wrapper(backend_state: BackendState) -> None:
                if asyncio.iscoroutinefunction(func):
                    await func(backend_state)
                else:
                    await asyncio.to_thread(func, backend_state)

            self.services.append((priority, async_wrapper))
            return func

        return decorator
frontend_path class-attribute instance-attribute
frontend_path: str | None = None
routers class-attribute instance-attribute
routers: list[APIRouter] = field(default_factory=list)
services class-attribute instance-attribute
services: list[tuple[Priority, ServiceAsyncFunc]] = field(
    default_factory=list, init=False
)
register_service
register_service(
    priority: Priority = 0,
) -> Callable[[ServiceFunc], ServiceFunc]
Source code in packages/tangram/src/tangram/plugin.py
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
def register_service(
    self, priority: Priority = 0
) -> Callable[[ServiceFunc], ServiceFunc]:
    def decorator(func: ServiceFunc) -> ServiceFunc:
        @functools.wraps(func)
        async def async_wrapper(backend_state: BackendState) -> None:
            if asyncio.iscoroutinefunction(func):
                await func(backend_state)
            else:
                await asyncio.to_thread(func, backend_state)

        self.services.append((priority, async_wrapper))
        return func

    return decorator

scan_plugins

scan_plugins() -> EntryPoints
Source code in packages/tangram/src/tangram/plugin.py
50
51
def scan_plugins() -> importlib.metadata.EntryPoints:
    return importlib.metadata.entry_points(group="tangram.plugins")

load_plugin

load_plugin(
    entry_point: EntryPoint,
) -> tuple[DistName, Plugin] | None
Source code in packages/tangram/src/tangram/plugin.py
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
def load_plugin(
    entry_point: importlib.metadata.EntryPoint,
) -> tuple[DistName, Plugin] | None:
    try:
        plugin_instance = entry_point.load()
    except Exception as e:
        logger.error(
            f"failed to load plugin {entry_point.name}: {e}"
            f"\n= help: does {entry_point.value} exist?"
        )
        return None
    if not isinstance(plugin_instance, Plugin):
        logger.error(f"entry point {entry_point.name} is not an instance of `Plugin`")
        return None
    return DistName(entry_point.name), plugin_instance

redis

log module-attribute

log = getLogger(__name__)

StateT module-attribute

StateT = TypeVar('StateT')

Subscriber

Bases: ABC, Generic[StateT]

Source code in packages/tangram/src/tangram/redis.py
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
class Subscriber(abc.ABC, Generic[StateT]):
    redis: Redis
    task: asyncio.Task[None]
    pubsub: PubSub

    def __init__(
        self, name: str, redis_url: str, channels: List[str], initial_state: StateT
    ):
        self.name = name
        self.redis_url: str = redis_url
        self.channels: List[str] = channels
        self.state: StateT = initial_state
        self._running = False

    async def subscribe(self) -> None:
        if self._running:
            log.warning("%s already running", self.name)
            return

        try:
            self.redis = await Redis.from_url(self.redis_url)
            self.pubsub = self.redis.pubsub()
            await self.pubsub.psubscribe(*self.channels)
        except RedisError as e:
            log.error("%s failed to connect to Redis: %s", self.name, e)
            raise

        async def listen() -> None:
            try:
                log.info("%s listening ...", self.name)
                async for message in self.pubsub.listen():
                    log.debug("message: %s", message)
                    if message["type"] == "pmessage":
                        await self.message_handler(
                            message["channel"].decode("utf-8"),
                            message["data"].decode("utf-8"),
                            message["pattern"].decode("utf-8"),
                            self.state,
                        )
            except asyncio.CancelledError:
                log.warning("%s cancelled", self.name)

        self._running = True

        self.task = asyncio.create_task(listen())
        log.info("%s task created, running ...", self.name)

    async def cleanup(self) -> None:
        if not self._running:
            return

        if self.task:
            log.debug("%s canceling task ...", self.name)
            self.task.cancel()
            try:
                log.debug("%s await task to finish ...", self.name)
                await self.task
                log.debug("%s task canceled", self.name)
            except asyncio.CancelledError as exc:
                log.error("%s task canceling error: %s", self.name, exc)
        if self.pubsub:
            await self.pubsub.unsubscribe()
        if self.redis:
            await self.redis.close()
        self._running = False

    def is_active(self) -> bool:
        """Return True if the subscriber is actively listening."""
        return self._running and self.task is not None and not self.task.done()

    @abc.abstractmethod
    async def message_handler(
        self, event: str, payload: str, pattern: str, state: StateT
    ) -> None:
        pass
redis instance-attribute
redis: Redis
task instance-attribute
task: Task[None]
pubsub instance-attribute
pubsub: PubSub
name instance-attribute
name = name
redis_url instance-attribute
redis_url: str = redis_url
channels instance-attribute
channels: List[str] = channels
state instance-attribute
state: StateT = initial_state
subscribe async
subscribe() -> None
Source code in packages/tangram/src/tangram/redis.py
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
async def subscribe(self) -> None:
    if self._running:
        log.warning("%s already running", self.name)
        return

    try:
        self.redis = await Redis.from_url(self.redis_url)
        self.pubsub = self.redis.pubsub()
        await self.pubsub.psubscribe(*self.channels)
    except RedisError as e:
        log.error("%s failed to connect to Redis: %s", self.name, e)
        raise

    async def listen() -> None:
        try:
            log.info("%s listening ...", self.name)
            async for message in self.pubsub.listen():
                log.debug("message: %s", message)
                if message["type"] == "pmessage":
                    await self.message_handler(
                        message["channel"].decode("utf-8"),
                        message["data"].decode("utf-8"),
                        message["pattern"].decode("utf-8"),
                        self.state,
                    )
        except asyncio.CancelledError:
            log.warning("%s cancelled", self.name)

    self._running = True

    self.task = asyncio.create_task(listen())
    log.info("%s task created, running ...", self.name)
cleanup async
cleanup() -> None
Source code in packages/tangram/src/tangram/redis.py
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
async def cleanup(self) -> None:
    if not self._running:
        return

    if self.task:
        log.debug("%s canceling task ...", self.name)
        self.task.cancel()
        try:
            log.debug("%s await task to finish ...", self.name)
            await self.task
            log.debug("%s task canceled", self.name)
        except asyncio.CancelledError as exc:
            log.error("%s task canceling error: %s", self.name, exc)
    if self.pubsub:
        await self.pubsub.unsubscribe()
    if self.redis:
        await self.redis.close()
    self._running = False
is_active
is_active() -> bool

Return True if the subscriber is actively listening.

Source code in packages/tangram/src/tangram/redis.py
81
82
83
def is_active(self) -> bool:
    """Return True if the subscriber is actively listening."""
    return self._running and self.task is not None and not self.task.done()
message_handler abstractmethod async
message_handler(
    event: str, payload: str, pattern: str, state: StateT
) -> None
Source code in packages/tangram/src/tangram/redis.py
85
86
87
88
89
@abc.abstractmethod
async def message_handler(
    self, event: str, payload: str, pattern: str, state: StateT
) -> None:
    pass

tangram._channel

ChannelConfig

host property writable

host: str

port property writable

port: int

redis_url property writable

redis_url: str

jwt_secret property writable

jwt_secret: str

jwt_expiration_secs property writable

jwt_expiration_secs: int

id_length property writable

id_length: int

__new__

__new__(
    host: str,
    port: int,
    redis_url: str,
    jwt_secret: str,
    jwt_expiration_secs: int,
    id_length: int,
) -> ChannelConfig

init_tracing_python

init_tracing_python(py_layer: Any, filter_str: str) -> None

init_tracing_stderr

init_tracing_stderr(filter_str: str) -> None

run

run(config: ChannelConfig) -> Any