Integrating Pydantic AI with Streamlit: Async Challenges and Fragment Optimisations

September 07, 2025

By Rian Dolphin

Streamlit's execution model creates friction when working with async libraries. Pydantic AI is built around Python's async/await patterns, while Streamlit runs your script synchronously from top to bottom. This post is just a brain dump (LLM assisted) of some things I figured out when trying to make Streamlit play nicely with Pydantic AI.

The full code is provided for a simple Streamlit x PydanticAI chat UI. Skip ahead if you already know about sync vs async in Python, or just want the code.

Understanding Python's Async Model

To understand async/await, it helps to first understand how regular synchronous Python code executes.

Synchronous Execution: The Default

In normal Python code, statements execute sequentially, one after another:

import time

def task_a():
    print("Task A starting")
    time.sleep(2)  # Blocks for 2 seconds - nothing else can run
    print("Task A finished")

def task_b():
    print("Task B starting")
    time.sleep(1)  # Blocks for 1 second
    print("Task B finished")

# Synchronous execution
start = time.time()
task_a()  # Must complete entirely before task_b starts
task_b()  # Only starts after task_a finishes
print(f"Total time: {time.time() - start:.1f} seconds")

# Output:
# Task A starting
# Task A finished (after 2 seconds)
# Task B starting
# Task B finished (after 1 more second)
# Total time: 3.0 seconds

When task_a() calls time.sleep(2), the thread running it stops and waits. In a single-threaded script like this one, nothing else can execute during those 2 seconds. This is blocking behavior: the thread is occupied doing nothing productive while it waits for the sleep to complete.

Asynchronous Execution: Cooperative Multitasking

Python's async/await syntax provides cooperative concurrency. When you define a function with async def, it becomes a coroutine function: calling it returns a coroutine, which can be paused and resumed:

import asyncio
import time

async def task_a():
    print("Task A starting")
    time.sleep(1)  # Blocking call — does NOT yield to event loop
    print("Task A about to await")
    await asyncio.sleep(2)  # Now we yield control
    print("Task A finished")

async def task_b():
    print("Task B starting")
    await asyncio.sleep(1)
    print("Task B finished")

# Asynchronous execution
async def main():
    start = time.time()
    await asyncio.gather(task_a(), task_b())  # Run concurrently
    print(f"Total time: {time.time() - start:.1f} seconds")

asyncio.run(main())

# Sample Output:
# Task A starting
# (1-second pause due to blocking time.sleep)
# Task A about to await
# Task B starting
# Task B finished (1 second after Task B started)
# Task A finished (1 second later, 2 seconds after Task A's await began)
# Total time: ~3.0 seconds

The key difference: await asyncio.sleep(2) doesn't block the interpreter. Instead, it tells the event loop "I'm waiting for something, run other code while I wait." This allows both tasks to run concurrently on a single thread.

Understanding the Event Loop

The event loop is the core mechanism that makes this cooperative multitasking work. Think of it as a scheduler that manages the execution of coroutines:

  1. Keeps track of tasks: The event loop maintains a queue of coroutines that need to run
  2. Handles suspension: When a coroutine hits an await, the event loop pauses it and switches to another task
  3. Manages I/O operations: It monitors network sockets and timers while tasks are paused
  4. Resumes tasks: When an awaited operation completes, the event loop resumes the paused coroutine
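
To make the scheduler idea concrete, here is a toy sketch built on plain generators. It is not how asyncio is implemented, and it ignores I/O and timers entirely (point 3 above). The names worker and run_all are made up for this illustration, and each bare yield plays the role of an await.

```python
from collections import deque
from typing import Generator

log: list[str] = []


def worker(name: str, steps: int) -> Generator[None, None, None]:
    """A toy task: each bare `yield` plays the role of an `await`."""
    for i in range(steps):
        log.append(f"{name} step {i}")
        yield  # pause here and hand control back to the scheduler


def run_all(tasks: list[Generator[None, None, None]]) -> None:
    """Toy scheduler: run each task until it pauses, then move to the next."""
    queue = deque(tasks)
    while queue:
        task = queue.popleft()
        try:
            next(task)          # resume the task until its next pause
            queue.append(task)  # not finished, so put it back in the queue
        except StopIteration:
            pass                # task finished, drop it


run_all([worker("A", 2), worker("B", 3)])
print(log)
```

The log shows the two workers interleaving step by step, even though nothing here runs in parallel.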

Here's what happens step-by-step in the async example above:

  1. asyncio.run(main()) starts the event loop and runs main()

  2. Inside main(), asyncio.gather(...) schedules task_a() and task_b() concurrently

  3. Event loop begins executing task_a()

  4. task_a() prints "Task A starting"

  5. task_a() runs time.sleep(1) — this blocks the entire event loop for 1 second

  6. After the blocking sleep, task_a() prints "Task A about to await"

  7. task_a() hits await asyncio.sleep(2) and yields control back to the event loop

  8. Event loop now starts executing task_b() (which was waiting to run)

  9. task_b() prints "Task B starting" and hits await asyncio.sleep(1), yielding control

  10. Event loop is now waiting for two timers:

    • task_b()'s 1-second sleep
    • task_a()'s 2-second sleep (which started a moment earlier, in step 7)

  11. After 1 second:

    • task_b()'s timer completes
    • Event loop resumes task_b()

  12. task_b() prints "Task B finished" and completes

  13. After another 1 second (2 seconds total since task_a()'s sleep started):

    • task_a()'s timer completes
    • Event loop resumes task_a()

  14. task_a() prints "Task A finished" and completes

  15. main() prints total runtime (approximately 3 seconds), and the program exits

Key Takeaway: Only One Task Runs at a Time

Although task_a() and task_b() are scheduled concurrently using asyncio.gather(), only one coroutine runs at a time. The event loop switches between them only when a coroutine hits an await.

In this example, task_a() blocks the entire program for 1 second because of the time.sleep(1). This is a synchronous (non-cooperative) delay, standing in for something like a time-consuming loop. During that time, task_b() can't start.

To avoid blocking the event loop, use non-blocking calls inside async functions when possible.
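
As a minimal sketch of that advice: when the blocking call is something you cannot rewrite, one option is to hand it to a worker thread with asyncio.to_thread (Python 3.9+). The slow_sync_work helper is a hypothetical stand-in for such code, and the delays are shortened so the example runs quickly.

```python
import asyncio
import time


def slow_sync_work(seconds: float) -> str:
    """Stand-in for blocking code you cannot rewrite (hypothetical helper)."""
    time.sleep(seconds)
    return "sync work done"


async def task_a() -> str:
    # Hand the blocking call to a worker thread so the event loop stays free
    result = await asyncio.to_thread(slow_sync_work, 0.2)
    await asyncio.sleep(0.2)
    return result


async def task_b() -> str:
    await asyncio.sleep(0.2)
    return "task b done"


async def main() -> tuple[list[str], float]:
    start = time.perf_counter()
    results = await asyncio.gather(task_a(), task_b())
    return results, time.perf_counter() - start


results, elapsed = asyncio.run(main())
print(results, f"{elapsed:.2f}s")
```

Here task_b() no longer has to wait for the blocking part of task_a(), so the total time is set by the slowest task rather than by the sum of the delays.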

Random thought: it'd be nice if you could use Polars async. Since the lazy API offloads to Rust, it seems like it should be possible.

This is fundamentally different from threading. Async is great, but it is still only one thread, so it does not run Python code in parallel. The GIL is a related but separate constraint that applies to threads.
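
A quick way to check the single-thread point is to have each coroutine report the thread it runs on. This is only a sketch, and report is a made-up name for the illustration.

```python
import asyncio
import threading


async def report(name: str) -> tuple[str, int]:
    await asyncio.sleep(0.01)  # yield to the event loop once
    return name, threading.get_ident()


async def main() -> list[tuple[str, int]]:
    return await asyncio.gather(report("a"), report("b"), report("c"))


results = asyncio.run(main())
thread_ids = {thread_id for _, thread_id in results}
print(f"{len(results)} coroutines ran on {len(thread_ids)} thread(s)")
```

All three coroutines report the same thread id, which is the thread that called asyncio.run().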

When Async Doesn't Help: Data Dependencies

Async only provides benefits when operations can run independently. If one operation depends on the output of another, you still have to wait for the first one to finish.
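
As a minimal sketch of that difference, the example below times two dependent calls against two independent ones. The fetch coroutine is a hypothetical stand-in for a slow network call, and the labels are invented for the illustration.

```python
import asyncio
import time


async def fetch(label: str, delay: float = 0.2) -> str:
    """Hypothetical stand-in for a slow network call."""
    await asyncio.sleep(delay)
    return label


async def dependent() -> float:
    start = time.perf_counter()
    user = await fetch("user-42")      # must finish first
    await fetch(f"orders-for-{user}")  # needs the result above
    return time.perf_counter() - start


async def independent() -> float:
    start = time.perf_counter()
    await asyncio.gather(fetch("weather"), fetch("news"))  # no dependency
    return time.perf_counter() - start


dependent_time = asyncio.run(dependent())
independent_time = asyncio.run(independent())
print(f"dependent: {dependent_time:.2f}s, independent: {independent_time:.2f}s")
```

The dependent version takes roughly the sum of the two delays, while the independent version takes roughly the longer of the two.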

Pydantic AI's streaming in a chat UI is particularly well-suited to async because each chunk of the stream can be processed as it arrives, without waiting for the complete response. But if you need the full response before proceeding to the next step, async doesn't help with that dependency chain. An example is a batch agentic workflow where each agent depends on the output of the previous one.

Also, coroutines must be awaited from within an async context. You cannot write await fetch_data() in synchronous code. Instead, you need an event loop to manage the execution.

Using asyncio.run() creates a new event loop, runs the coroutine to completion, then closes the loop. This is convenient for top-level entry points but creates overhead if called repeatedly.

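import asyncio

async def fetch_data():
    # Hypothetical placeholder coroutine, defined so the snippet runs
    await asyncio.sleep(0.1)
    return {"status": "ok"}

# Synchronous context - need to create event loop
result = asyncio.run(fetch_data())

# Async context - can await directly
async def main():
    result = await fetch_data()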
Pydantic AI's Async Design

Pydantic AI's streaming interface is async:

async with agent.run_stream(prompt) as response:
    async for text in response.stream_text():
        print(text)

This design makes sense for LLM applications where you're waiting for tokens to stream back from remote APIs. The async pattern allows your application to remain responsive while processing the stream.
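
If you want to play with this pattern without an API key, a stand-in async generator is enough. The sketch below is not Pydantic AI code: fake_stream_text is a hypothetical replacement for the model stream that yields the text so far on each step, mirroring the cumulative behaviour the full example later relies on.

```python
import asyncio
from typing import AsyncIterator


async def fake_stream_text(reply: str, delay: float = 0.01) -> AsyncIterator[str]:
    """Hypothetical stand-in for a model stream: yields the text so far."""
    words = reply.split()
    for i in range(1, len(words) + 1):
        await asyncio.sleep(delay)  # pretend to wait on the network
        yield " ".join(words[:i])


async def consume(reply: str) -> list[str]:
    snapshots = []
    async for text in fake_stream_text(reply):
        snapshots.append(text)  # a UI would redraw with `text` here
    return snapshots


snapshots = asyncio.run(consume("Hello from the model"))
print(snapshots)
```

Because each item is the full text so far, a UI only needs to overwrite what it displayed last time rather than concatenate chunks.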

The Streamlit Bridge

The solution uses asyncio.run() to execute async code within Streamlit's synchronous environment. It is very simple: once you factor out the two helper functions, it boils down to not much code at all.

import asyncio
import streamlit as st
from pydantic_ai import Agent
from pydantic_ai.messages import (
    ModelMessage,
    ModelRequest,
    ModelResponse,
    TextPart,
    UserPromptPart,
)

def convert_messages_to_pydantic_ai(messages: list[dict]) -> list[ModelMessage]:
    """Convert simple chat messages to pydantic-ai format"""
    pydantic_messages = []

    for msg in messages:
        if msg["role"] == "user":
            pydantic_messages.append(
                ModelRequest(parts=[UserPromptPart(content=msg["content"])])
            )
        elif msg["role"] == "assistant":
            pydantic_messages.append(
                ModelResponse(parts=[TextPart(content=msg["content"])])
            )

    return pydantic_messages

async def stream_agent_response(
    agent: Agent, prompt: str, message_history: list[ModelMessage] | None = None
) -> str:
    """Stream response from agent with conversation history support"""
    message_placeholder = st.empty()
    full_response = ""

    async with agent.run_stream(prompt, message_history=message_history) as response:
        async for text in response.stream_text():
            full_response = text  # pydantic-ai gives cumulative text
            message_placeholder.markdown(full_response + "▌")

    # Remove cursor and show final text
    message_placeholder.markdown(full_response)
    return full_response

st.set_page_config(page_title="Streamlit + Pydantic-AI", page_icon="⚡")
st.markdown("### Streamlit + Pydantic-AI Streaming Demo")

# Initialise session state
if "messages" not in st.session_state:
    st.session_state.messages = []

# Create agent
agent = Agent(
    "openai:gpt-4o",
    system_prompt="You are a helpful assistant. Be conversational and engaging.",
)

# Display chat history
for message in st.session_state.messages:
    with st.chat_message(message["role"]):
        st.markdown(message["content"])

# Chat input
if prompt := st.chat_input("Type your message..."):
    # Add user message to history and display
    st.session_state.messages.append({"role": "user", "content": prompt})
    with st.chat_message("user"):
        st.markdown(prompt)

    # Stream the assistant response
    with st.chat_message("assistant"):
        # Convert prior turns to pydantic-ai format; skip the last entry because
        # the new prompt is passed to the agent separately
        message_history = convert_messages_to_pydantic_ai(
            st.session_state.messages[:-1]
        )

        # Stream the response using asyncio.run
        full_response = asyncio.run(
            stream_agent_response(agent, prompt, message_history)
        )

    # Add assistant message to history
    st.session_state.messages.append({"role": "assistant", "content": full_response})

Key Implementation Details

Message Format Conversion: The chat history is stored as simple dictionaries with "role" and "content" keys, the convention used in Streamlit's chat examples. Pydantic AI expects its own message format built from ModelRequest and ModelResponse objects. The convert_messages_to_pydantic_ai() function handles this conversion.

Streaming with st.empty(): The streaming effect uses st.empty() to create a placeholder that can be updated in real-time. The cursor character "▌" provides visual feedback that the response is still generating.

I initially thought this approach might mean a full page reload after each chunk, which would be wasteful on the network. I ran tests and verified that this is not happening.

Final Thought

I'm interested in trying out Pydantic AI with FastHTML instead of Streamlit, but it takes a bit more work to get up and running.
