Skip to content

Transport

For details about the transport layer, see the Server and Client reference pages.

Messages

datadivr.transport.messages

Classes

Functions

create_error_message(error_msg, to, websocket=None)

Create a standardized error message.

Source code in datadivr/transport/messages.py
def create_error_message(error_msg: str, to: str, websocket: WebSocket | None = None) -> WebSocketMessage:
    """Build a ``WebSocketMessage`` whose event name is ``"error"``.

    Args:
        error_msg: Text placed in the message's ``message`` field.
        to: Value forwarded unchanged as the message's ``to`` field.
        websocket: Optional connection object forwarded as the message's
            ``websocket`` field; defaults to ``None``.

    Returns:
        The newly constructed ``WebSocketMessage``.
    """
    error = WebSocketMessage(
        event_name="error",
        message=error_msg,
        to=to,
        websocket=websocket,
    )
    return error

create_message(event_name, payload, to, message=None, websocket=None)

Create a standardized WebSocket message.

Source code in datadivr/transport/messages.py
def create_message(
    event_name: str, payload: Any, to: str, message: str | None = None, websocket: WebSocket | None = None
) -> WebSocketMessage:
    """Build a ``WebSocketMessage`` from the given fields.

    Every argument is forwarded unchanged to the ``WebSocketMessage``
    constructor under the keyword of the same name.

    Args:
        event_name: Value for the message's ``event_name`` field.
        payload: Value for the message's ``payload`` field.
        to: Value for the message's ``to`` field.
        message: Optional text for the ``message`` field; defaults to ``None``.
        websocket: Optional connection object for the ``websocket`` field;
            defaults to ``None``.

    Returns:
        The newly constructed ``WebSocketMessage``.
    """
    built = WebSocketMessage(
        event_name=event_name,
        payload=payload,
        to=to,
        message=message,
        websocket=websocket,
    )
    return built

send_message(websocket, message) async

Send a message over a WebSocket connection.

Source code in datadivr/transport/messages.py
@BackgroundTasks.task(name="send_message")
async def send_message(websocket: Any, message: WebSocketMessage) -> None:
    """Serialize ``message`` and transmit it over ``websocket``.

    The message is dumped with its ``websocket`` field excluded, logged at
    debug level, and then sent using whichever sending method the
    connection object exposes.

    Args:
        websocket: Connection object. It must provide either ``send_json``
            (used with the dumped data as-is) or ``send`` (used with the
            dumped data encoded as a JSON string).
        message: The message to send.

    Raises:
        UnsupportedWebSocketTypeError: If ``websocket`` has neither a
            ``send_json`` nor a ``send`` attribute.
    """
    outgoing = message.model_dump(exclude={"websocket"})
    logger.debug("send_message", message=outgoing)

    # Connections exposing send_json (e.g. FastAPI's WebSocket) take the
    # dumped data directly; this check comes first, as in the if/elif order.
    if hasattr(websocket, "send_json"):
        await websocket.send_json(outgoing)
        return

    # Anything without a plain send() either is not a supported connection.
    if not hasattr(websocket, "send"):
        raise UnsupportedWebSocketTypeError()

    # Connections exposing only send (e.g. the websockets library) need the
    # data pre-encoded as a JSON string.
    await websocket.send(json.dumps(outgoing))

options: show_root_heading: true show_source: true