Skip to content

WebSocket Client

The datadivr client provides a simple interface for connecting to a WebSocket server and handling real-time communication.

Basic Usage

import asyncio
from datadivr import WebSocketClient, HandlerType, websocket_handler, WebSocketMessage
from typing import Optional

# Define handlers
@websocket_handler("sum_handler_result", HandlerType.CLIENT)
async def handle_sum_result(message: WebSocketMessage) -> None:
    """Print the sender and payload of a ``sum_handler_result`` event."""
    sender, result = message.from_id, message.payload
    print(f"Sum result from {sender}: {result}")

@websocket_handler("msg", HandlerType.CLIENT)
async def msg_handler(message: WebSocketMessage) -> None:
    """Print an incoming ``msg`` event together with its sender."""
    sender, text = message.from_id, message.message
    print(f">> {sender}: '{text}'")

# Create and run client
async def run_client() -> None:
    """Connect, request a sum calculation, and process replies until the connection closes."""
    # Create and connect client
    client = WebSocketClient("ws://localhost:8765/ws")
    await client.connect()

    # Everything after a successful connect runs inside try/finally so the
    # connection is closed even if sending the request fails.
    tasks: list[asyncio.Task] = []
    try:
        # Send a calculation request
        await client.send_message(
            payload={"numbers": [391, 29]},
            event_name="sum_event",
        )

        # Create tasks for message handling
        tasks.append(asyncio.create_task(client.receive_messages()))
        await asyncio.gather(*tasks)
    finally:
        for task in tasks:
            task.cancel()
        await client.disconnect()

# Run the client
asyncio.run(run_client())

Built-in Handlers

The client comes with several built-in handlers:

Sum Handler

@websocket_handler("sum_event_client", HandlerType.CLIENT)
async def sum_handler(message: WebSocketMessage) -> None:
    """Print the sender and payload of a ``sum_event_client`` event."""
    line = f"Sum result from {message.from_id}: {message.payload}"
    print(line)

Message Handler

@websocket_handler("msg", HandlerType.CLIENT)
async def msg_handler(message: WebSocketMessage) -> None:
    """Print an incoming ``msg`` event with its sender and event name."""
    line = f">> {message.from_id}({message.event_name}): '{message.message}'"
    print(line)

Interactive Usage

Here's how the CLI client uses these handlers:

async def run_client() -> None:
    """Run the interactive client: receive messages and read user input concurrently."""
    # NOTE: host, port and input_loop are not defined in this excerpt;
    # they are expected to come from the surrounding CLI code.
    client = WebSocketClient(f"ws://{host}:{port}/ws")
    await client.connect()

    # One task handles incoming messages, the other handles user input
    receiver = asyncio.create_task(client.receive_messages())
    prompt = asyncio.create_task(input_loop(client))
    running = (receiver, prompt)

    try:
        await asyncio.gather(*running)
    finally:
        # Stop whichever task is still running, then close the connection
        for pending in running:
            pending.cancel()
        await client.disconnect()

# Example JSON messages to send:
# Sum calculation:
{"event_name": "sum_event", "payload": {"numbers": [391, 29]}}
# Broadcast message:
{"event_name": "msg", "to": "all", "message": "hello"}

Message Types

  1. Sum Calculation:
await client.send_message(
    payload={"numbers": [1, 2, 3]},
    event_name="sum_event"
)
  2. Text Messages:
await client.send_message(
    message="Hello everyone!",
    event_name="msg",
    to="all"
)
  3. Custom Events:
await client.send_message(
    payload={"data": "custom_data"},
    event_name="custom_event",
    to="specific_client_id"
)

Error Handling

The client handles several error conditions:

  • NotConnectedError: Raised when trying to send or receive messages before connecting
  • ConnectionClosed: Caught and logged during message reception, after which the client disconnects
  • Invalid message formats: Logged and handled gracefully

Connection Lifecycle

  1. Connection:
# The constructor loads the registered CLIENT handlers via get_handlers(HandlerType.CLIENT)
client = WebSocketClient("ws://localhost:8765/ws")
await client.connect()  # Opens the WebSocket connection
  2. Message Loop:
# Start receiving messages (blocks until connection closes)
await client.receive_messages()
  3. Disconnection:
await client.disconnect()  # Clean up connection

Reference

datadivr.transport.client.WebSocketClient

A WebSocket client for communicating with a datadivr server.

This class handles the connection to a WebSocket server, message sending, and event handling for received messages.

Attributes:

Name Type Description
uri

The WebSocket server URI to connect to

handlers

Dictionary of registered event handlers

websocket WebSocketClientProtocol | None

The active WebSocket connection (if connected)

Example
client = WebSocketClient("ws://localhost:8765/ws")
await client.connect()
await client.send_message(payload=data, event_name="custom_event")
Source code in datadivr/transport/client.py
class WebSocketClient:
    """A WebSocket client for communicating with a datadivr server.

    This class handles the connection to a WebSocket server, message sending,
    and event handling for received messages.

    Attributes:
        uri: The WebSocket server URI to connect to
        handlers: Dictionary of registered event handlers
        websocket: The active WebSocket connection (if connected)
        logger: Logger used for connection and message events

    Example:
        ```python
        client = WebSocketClient("ws://localhost:8765/ws")
        await client.connect()
        await client.send_message(payload=data, event_name="custom_event")
        ```
    """

    def __init__(self, uri: str):
        """Initialize the WebSocket client.

        No connection is opened here; call ``connect()`` for that.

        Args:
            uri: The WebSocket server URI to connect to
        """
        self.uri = uri
        self.handlers = get_handlers(HandlerType.CLIENT)
        self.websocket: WebSocketClientProtocol | None = None
        self.logger = get_logger(__name__)

    async def connect(self) -> None:
        """Connect to the WebSocket server.

        Connection errors are logged and re-raised to the caller.
        """
        try:
            self.websocket = await websockets.connect(self.uri)
            # NOTE: handler names are not announced automatically; call
            # send_handler_names() explicitly if the server needs them.
        except ConnectionRefusedError as e:
            self.logger.exception("connection_refused", error=str(e))
            raise
        except Exception as e:
            self.logger.exception("unexpected_error_during_connection", error=str(e))
            raise

    async def receive_messages(self) -> None:
        """Listen for incoming messages from the server.

        Each message is decoded as JSON and passed to ``handle_event``.
        Messages that are not valid JSON, or that do not decode to a JSON
        object, are logged and skipped so that one malformed message does not
        end the receive loop. The connection is closed when the loop ends.

        Raises:
            NotConnectedError: If called before connecting to the server
        """
        if not self.websocket:
            raise NotConnectedError()

        try:
            async for message in self.websocket:
                self.logger.info("raw_message_received", raw_message=message)
                try:
                    event_data = json.loads(message)
                except (json.JSONDecodeError, UnicodeDecodeError):
                    self.logger.exception("invalid_json_received", raw_message=message)
                    continue
                if not isinstance(event_data, dict):
                    # handle_event indexes the data by key, so only objects are valid
                    self.logger.warning("unexpected_message_format", event_data=event_data)
                    continue
                self.logger.info("message_received", event_data=event_data)
                await self.handle_event(event_data, self.websocket)
        except websockets.exceptions.ConnectionClosed:
            self.logger.info("connection_closed")
        finally:
            await self.disconnect()

    async def handle_event(self, event_data: dict, websocket: WebSocketClientProtocol) -> None:
        """Handle an incoming event using registered handlers.

        Events without a string ``event_name`` and events whose data fails
        message validation are logged and dropped. If the handler returns a
        ``WebSocketMessage``, it is sent back over ``websocket``.

        Args:
            event_data: The received event data
            websocket: The WebSocket connection to use for responses
        """
        event_name = event_data.get("event_name")
        if not isinstance(event_name, str):
            self.logger.warning("event_without_name", event_data=event_data)
            return

        if event_name not in self.handlers:
            self.logger.debug(
                "no_handler_for_event", event_name=event_name, event_data=json.dumps(event_data, indent=2)
            )
            return

        self.logger.info("handling_event", event_name=event_name)
        handler = self.handlers[event_name]
        try:
            message = WebSocketMessage.model_validate(event_data)
        except ValueError:
            # NOTE(review): assumes WebSocketMessage is a pydantic model, whose
            # ValidationError derives from ValueError — confirm.
            self.logger.exception("invalid_message", event_name=event_name)
            return

        response = await handler(message)
        if response and isinstance(response, WebSocketMessage):
            await send_message(websocket, response)

    async def send_message(self, payload: Any, event_name: str, msg: str | None = None, to: str = "others") -> None:
        """Send a message to the server.

        Args:
            payload: The message payload
            event_name: The name of the event
            msg: Optional text message
            to: The recipient of the message (default: "others")

        Raises:
            NotConnectedError: If called before connecting to the server
        """
        if not self.websocket:
            raise NotConnectedError()

        message = WebSocketMessage(event_name=event_name, payload=payload, to=to, message=msg)
        # This resolves to the module-level send_message helper, not this method
        await send_message(self.websocket, message)

    async def disconnect(self) -> None:
        """Close the WebSocket connection.

        Does nothing when no connection is open. Errors raised while closing
        are logged, and the stored connection is cleared in every case.
        """
        if self.websocket:
            try:
                await self.websocket.close()
            except Exception:
                self.logger.exception("error_closing_connection")
            finally:
                self.websocket = None

    async def send_handler_names(self) -> None:
        """Send a message with the names of all registered handlers.

        Sends a ``CLI_HELLO`` event addressed to ``"others"`` so the server
        learns which client-side handlers exist. ``connect()`` does not call
        this automatically; invoke it explicitly after connecting.

        Raises:
            NotConnectedError: If called before connecting to the server
        """
        handler_names = list(self.handlers)
        payload = {"handlers": handler_names}
        self.logger.info("sending_handler_names", handlers=handler_names)
        await self.send_message(payload=payload, event_name="CLI_HELLO", to="others")

Functions

__init__(uri)

Initialize the WebSocket client.

Parameters:

Name Type Description Default
uri str

The WebSocket server URI to connect to

required
Source code in datadivr/transport/client.py
def __init__(self, uri: str):
    """Set up a client for the given server URI without opening a connection.

    Args:
        uri: The WebSocket server URI to connect to
    """
    # No connection exists until connect() is called
    self.websocket: WebSocketClientProtocol | None = None
    self.uri = uri
    self.handlers = get_handlers(HandlerType.CLIENT)
    self.logger = get_logger(__name__)

connect() async

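Connect to the WebSocket server. The initial handler announcement (send_handler_names) is currently commented out in the source, so no handler information is sent on connect.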

Source code in datadivr/transport/client.py
async def connect(self) -> None:
    """Open the WebSocket connection to ``self.uri``.

    Connection errors are logged and re-raised to the caller.
    """
    try:
        connection = await websockets.connect(self.uri)
        # await self.send_handler_names()  (handler announcement is disabled)
    except ConnectionRefusedError as exc:
        self.logger.exception("connection_refused", error=str(exc))
        raise
    except Exception as exc:
        self.logger.exception("unexpected_error_during_connection", error=str(exc))
        raise
    else:
        self.websocket = connection

disconnect() async

Close the WebSocket connection.

Source code in datadivr/transport/client.py
async def disconnect(self) -> None:
    """Close the WebSocket connection, if one is open.

    Errors raised while closing are logged rather than propagated, and the
    stored connection is cleared in every case.
    """
    connection = self.websocket
    if not connection:
        return

    try:
        await connection.close()
    except Exception:
        self.logger.exception("error_closing_connection")
    finally:
        self.websocket = None

handle_event(event_data, websocket) async

Handle an incoming event using registered handlers.

Parameters:

Name Type Description Default
event_data dict

The received event data

required
websocket WebSocketClientProtocol

The WebSocket connection to use for responses

required
Source code in datadivr/transport/client.py
async def handle_event(self, event_data: dict, websocket: WebSocketClientProtocol) -> None:
    """Handle an incoming event using registered handlers.

    Events without a string ``event_name`` and events whose data fails
    message validation are logged and dropped instead of raising. If the
    handler returns a ``WebSocketMessage``, it is sent back over ``websocket``.

    Args:
        event_data: The received event data
        websocket: The WebSocket connection to use for responses
    """
    event_name = event_data.get("event_name")
    if not isinstance(event_name, str):
        self.logger.warning("event_without_name", event_data=event_data)
        return

    if event_name not in self.handlers:
        self.logger.debug(
            "no_handler_for_event", event_name=event_name, event_data=json.dumps(event_data, indent=2)
        )
        return

    self.logger.info("handling_event", event_name=event_name)
    handler = self.handlers[event_name]
    try:
        message = WebSocketMessage.model_validate(event_data)
    except ValueError:
        # NOTE(review): assumes WebSocketMessage is a pydantic model, whose
        # ValidationError derives from ValueError — confirm.
        self.logger.exception("invalid_message", event_name=event_name)
        return

    response = await handler(message)
    if response and isinstance(response, WebSocketMessage):
        await send_message(websocket, response)

receive_messages() async

Listen for incoming messages from the server.

Source code in datadivr/transport/client.py
async def receive_messages(self) -> None:
    """Listen for incoming messages from the server.

    Each message is decoded as JSON and passed to ``handle_event``. Messages
    that are not valid JSON, or that do not decode to a JSON object, are
    logged and skipped so that one malformed message does not end the
    receive loop. The connection is closed when the loop ends.

    Raises:
        NotConnectedError: If called before connecting to the server
    """
    if not self.websocket:
        raise NotConnectedError()

    try:
        async for message in self.websocket:
            self.logger.info("raw_message_received", raw_message=message)
            try:
                event_data = json.loads(message)
            except (json.JSONDecodeError, UnicodeDecodeError):
                self.logger.exception("invalid_json_received", raw_message=message)
                continue
            if not isinstance(event_data, dict):
                # handle_event indexes the data by key, so only objects are valid
                self.logger.warning("unexpected_message_format", event_data=event_data)
                continue
            self.logger.info("message_received", event_data=event_data)
            await self.handle_event(event_data, self.websocket)
    except websockets.exceptions.ConnectionClosed:
        self.logger.info("connection_closed")
    finally:
        await self.disconnect()

send_handler_names() async

Send a message with the names of all registered handlers.

This is intended to inform the server about available client-side handlers after connecting. Note that the automatic call in connect() is currently commented out, so it must be invoked explicitly.

Source code in datadivr/transport/client.py
async def send_handler_names(self) -> None:
    """Announce the names of all registered handlers to the server.

    Sends a ``CLI_HELLO`` event addressed to ``"others"`` whose payload is
    ``{"handlers": [...]}``. The call to this method inside ``connect()`` is
    commented out, so it has to be invoked explicitly.
    """
    names = [*self.handlers.keys()]
    announcement = {"handlers": names}
    self.logger.info("sending_handler_names", handlers=names)
    await self.send_message(payload=announcement, event_name="CLI_HELLO", to="others")

send_message(payload, event_name, msg=None, to='others') async

Send a message to the server.

Parameters:

Name Type Description Default
payload Any

The message payload

required
event_name str

The name of the event

required
msg str | None

Optional text message

None
to str

The recipient of the message (default: "others")

'others'

Raises:

Type Description
NotConnectedError

If called before connecting to the server

Source code in datadivr/transport/client.py
async def send_message(self, payload: Any, event_name: str, msg: str | None = None, to: str = "others") -> None:
    """Build a ``WebSocketMessage`` and send it over the open connection.

    Args:
        payload: The message payload
        event_name: The name of the event
        msg: Optional text message
        to: The recipient of the message (default: "others")

    Raises:
        NotConnectedError: If called before connecting to the server
    """
    if not self.websocket:
        raise NotConnectedError()

    outgoing = WebSocketMessage(event_name=event_name, payload=payload, to=to, message=msg)
    await send_message(self.websocket, outgoing)

options: show_root_heading: true show_source: true