September 07, 2025
By Rian Dolphin
Streamlit's execution model creates friction when working with async libraries. Pydantic AI is built around Python's async/await patterns, while Streamlit runs scripts synchronously from top to bottom. This post is just a brain dump (LLM assisted) of some things I figured out when trying to make Streamlit play nicely with Pydantic AI.
The full code is provided for a simple Streamlit x Pydantic AI chat UI. Skip ahead if you already know about sync vs async in Python, or just want the code.
To understand async/await, it helps to first understand how regular synchronous Python code executes.
In normal Python code, statements execute sequentially, one after another:
```python
import time

def task_a():
    print("Task A starting")
    time.sleep(2)  # Blocks for 2 seconds - nothing else can run
    print("Task A finished")

def task_b():
    print("Task B starting")
    time.sleep(1)  # Blocks for 1 second
    print("Task B finished")

# Synchronous execution
start = time.time()
task_a()  # Must complete entirely before task_b starts
task_b()  # Only starts after task_a finishes
print(f"Total time: {time.time() - start:.1f} seconds")

# Output:
# Task A starting
# Task A finished (after 2 seconds)
# Task B starting
# Task B finished (after 1 more second)
# Total time: 3.0 seconds
```
When `task_a()` calls `time.sleep(2)`, the entire Python interpreter stops and waits. Nothing else can execute during those 2 seconds. This is blocking behavior - the thread is occupied doing nothing productive while waiting for the sleep to complete.
Python's async/await syntax provides cooperative concurrency. When you mark a function with `async def`, it becomes a coroutine - i.e. a function that can be paused and resumed:
```python
import asyncio
import time

async def task_a():
    print("Task A starting")
    time.sleep(1)  # Blocking call - does NOT yield to the event loop
    print("Task A about to await")
    await asyncio.sleep(2)  # Now we yield control
    print("Task A finished")

async def task_b():
    print("Task B starting")
    await asyncio.sleep(1)
    print("Task B finished")

# Asynchronous execution
async def main():
    start = time.time()
    await asyncio.gather(task_a(), task_b())  # Run concurrently
    print(f"Total time: {time.time() - start:.1f} seconds")

asyncio.run(main())

# Sample output:
# Task A starting
# (1-second pause due to blocking time.sleep)
# Task A about to await
# Task B starting
# Task B finished (after 1 second)
# Task A finished (after 2 more seconds)
# Total time: ~3.0 seconds
```
The key difference: `await asyncio.sleep(2)` doesn't block the interpreter. Instead, it tells the event loop "I'm waiting for something, run other code while I wait." This allows both tasks to run concurrently on a single thread.
The event loop is the core mechanism that makes this cooperative multitasking work. Think of it as a scheduler that manages the execution of coroutines: when a coroutine hits an `await`, the event loop pauses it and switches to another task.

Here's what happens step-by-step in the async example above:

1. `asyncio.run(main())` starts the event loop and runs `main()`
2. Inside `main()`, `asyncio.gather(...)` schedules `task_a()` and `task_b()` concurrently
3. The event loop begins executing `task_a()`
4. `task_a()` prints "Task A starting"
5. `task_a()` runs `time.sleep(1)` - this blocks the entire event loop for 1 second
6. After the blocking sleep, `task_a()` prints "Task A about to await"
7. `task_a()` hits `await asyncio.sleep(2)` and yields control back to the event loop
8. The event loop now starts executing `task_b()` (which was waiting to run)
9. `task_b()` prints "Task B starting" and hits `await asyncio.sleep(1)`, yielding control
10. The event loop is now waiting on two timers: `task_b()`'s 1-second sleep and `task_a()`'s 2-second sleep (which started 1 second ago)
11. After 1 second, `task_b()`'s timer completes; the event loop resumes `task_b()`, which prints "Task B finished" and completes
12. After another second (2 seconds total since `task_a()`'s sleep started), `task_a()`'s timer completes; the event loop resumes `task_a()`, which prints "Task A finished" and completes
13. `main()` prints the total runtime (approximately 3 seconds), and the program exits
Although `task_a()` and `task_b()` are scheduled concurrently using `asyncio.gather()`, only one coroutine runs at a time. The event loop switches between them only when a coroutine hits an `await`.

In this example, `task_a()` blocks the entire program for 1 second because of the `time.sleep(1)` - a synchronous (non-cooperative) delay (think of some time-consuming loop). During that time, `task_b()` can't start.
To avoid blocking the event loop, use non-blocking calls inside async functions when possible.
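If you're stuck with a blocking call (a sync client library, say), one option is to push it onto a worker thread with `asyncio.to_thread` (Python 3.9+) so the event loop stays free. A rough sketch, with the blocking sleep standing in for any synchronous call:

```python
import asyncio
import time

def blocking_work() -> str:
    time.sleep(1)  # stand-in for any synchronous call you can't rewrite
    return "done"

async def task_a():
    print("Task A starting")
    # Run the blocking call in a worker thread; the event loop stays free
    result = await asyncio.to_thread(blocking_work)
    print(f"Task A finished: {result}")

async def task_b():
    print("Task B starting")
    await asyncio.sleep(1)
    print("Task B finished")

async def main():
    # task_b is no longer held up by task_a's blocking call
    await asyncio.gather(task_a(), task_b())

asyncio.run(main())
```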
Random thought: it'd be nice if you could use Polars async. Since the lazy API offloads work to Rust, it seems like it should be possible.
This is fundamentally different from threading. Async is great, but everything still runs on a single thread - think of the GIL here: even with threads, CPU-bound Python code can't run in parallel.
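A quick sketch of that limitation: CPU-bound work inside a coroutine never hits an `await`, so the event loop can't switch away from it and `gather` buys you nothing:

```python
import asyncio
import time

async def crunch(name: str):
    print(f"{name} starting")
    sum(i * i for i in range(10_000_000))  # CPU-bound: no await, never yields
    print(f"{name} finished")

async def main():
    start = time.time()
    # Despite gather, A runs to completion before B even starts
    await asyncio.gather(crunch("A"), crunch("B"))
    print(f"Total: {time.time() - start:.1f}s")

asyncio.run(main())
```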
Async only provides benefits when operations can run independently. If one operation depends on the output of another, you still have to wait:
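Here's a sketch with a made-up `run_agent` coroutine (the name and the 1-second delay are just placeholders for a real LLM call):

```python
import asyncio

async def run_agent(prompt: str) -> str:
    await asyncio.sleep(1)  # placeholder for a real LLM call
    return f"response to: {prompt}"

async def pipeline() -> str:
    # The second call needs the first call's output, so these awaits are
    # inherently sequential: ~2 seconds total, with or without async.
    # (Independent calls could overlap via asyncio.gather instead.)
    draft = await run_agent("write a draft")
    review = await run_agent(f"review this draft: {draft}")
    return review

asyncio.run(pipeline())
```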
This is why streaming Pydantic AI responses in a chat UI is particularly well-suited to async: each token in the stream can be processed as it arrives without waiting for the complete response. But if you need the full response before proceeding to the next step (as in the sketch above), async won't collapse that dependency chain - e.g. a batch agentic workflow where each agent depends on the previous one's output.
Also, async functions must be called from within an async context. You cannot directly call `await fetch_data()` from synchronous code; you need an event loop to manage the execution.

Using `asyncio.run()` creates a new event loop, runs the coroutine to completion, then closes the loop. This is convenient for top-level entry points but creates overhead if called repeatedly.
```python
import asyncio

async def fetch_data() -> str:  # stand-in coroutine for illustration
    await asyncio.sleep(0.1)
    return "data"

# Synchronous context - need to create an event loop
result = asyncio.run(fetch_data())

# Async context - can await directly
async def main():
    result = await fetch_data()
```
Pydantic AI's streaming interface is async:
```python
async with agent.run_stream(prompt) as response:
    async for text in response.stream_text():
        print(text)
```
This design makes sense for LLM applications where you're waiting for tokens to stream back from remote APIs. The async pattern allows your application to remain responsive while processing the stream.
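Outside Streamlit, consuming that stream just needs an event loop. A minimal standalone version using the same calls as the app below (swap in whatever model you normally use):

```python
import asyncio
from pydantic_ai import Agent

agent = Agent("openai:gpt-4o")

async def main():
    async with agent.run_stream("Tell me a joke") as response:
        async for text in response.stream_text():
            print(text)  # cumulative text so far, not just the latest chunk

asyncio.run(main())
```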
The solution uses `asyncio.run()` to execute async code within Streamlit's synchronous environment. It is exceptionally simple: if you factor out the two helper functions, it boils down to not much code at all.
```python
import asyncio

import streamlit as st
from pydantic_ai import Agent
from pydantic_ai.messages import (
    ModelMessage,
    ModelRequest,
    ModelResponse,
    TextPart,
    UserPromptPart,
)


def convert_messages_to_pydantic_ai(messages: list[dict]) -> list[ModelMessage]:
    """Convert simple chat messages to pydantic-ai format"""
    pydantic_messages = []
    for msg in messages:
        if msg["role"] == "user":
            pydantic_messages.append(
                ModelRequest(parts=[UserPromptPart(content=msg["content"])])
            )
        elif msg["role"] == "assistant":
            pydantic_messages.append(
                ModelResponse(parts=[TextPart(content=msg["content"])])
            )
    return pydantic_messages


async def stream_agent_response(
    agent: Agent, prompt: str, message_history: list[ModelMessage] | None = None
) -> str:
    """Stream response from agent with conversation history support"""
    message_placeholder = st.empty()
    full_response = ""
    async with agent.run_stream(prompt, message_history=message_history) as response:
        async for text in response.stream_text():
            full_response = text  # pydantic-ai gives cumulative text
            message_placeholder.markdown(full_response + "▌")
    # Remove cursor and show final text
    message_placeholder.markdown(full_response)
    return full_response


st.set_page_config(page_title="Streamlit + Pydantic-AI", page_icon="⚡")
st.markdown("### Streamlit + Pydantic-AI Streaming Demo")

# Initialise session state
if "messages" not in st.session_state:
    st.session_state.messages = []

# Create agent
agent = Agent(
    "openai:gpt-4o",
    system_prompt="You are a helpful assistant. Be conversational and engaging.",
)

# Display chat history
for message in st.session_state.messages:
    with st.chat_message(message["role"]):
        st.markdown(message["content"])

# Chat input
if prompt := st.chat_input("Type your message..."):
    # Add user message to history and display
    st.session_state.messages.append({"role": "user", "content": prompt})
    with st.chat_message("user"):
        st.markdown(prompt)

    # Stream the assistant response
    with st.chat_message("assistant"):
        # Convert prior turns to pydantic-ai format, excluding the prompt we
        # just appended - run_stream adds it as the new user message itself
        message_history = convert_messages_to_pydantic_ai(
            st.session_state.messages[:-1]
        )
        # Stream the response using asyncio.run
        full_response = asyncio.run(
            stream_agent_response(agent, prompt, message_history)
        )

    # Add assistant message to history
    st.session_state.messages.append({"role": "assistant", "content": full_response})
```
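Assuming you save this as app.py (any filename works) and have your OpenAI API key set in the environment, `streamlit run app.py` launches the UI.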
Message Format Conversion: Streamlit's chat interface uses simple dictionaries with "role" and "content" keys. Pydantic AI requires its own message format using `ModelRequest` and `ModelResponse` objects. The `convert_messages_to_pydantic_ai()` function handles this conversion.
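For example, a two-turn history converts like this (illustrative values, using the function defined above):

```python
messages = [
    {"role": "user", "content": "Hi"},
    {"role": "assistant", "content": "Hello! How can I help?"},
]
history = convert_messages_to_pydantic_ai(messages)
# history == [
#     ModelRequest(parts=[UserPromptPart(content="Hi")]),
#     ModelResponse(parts=[TextPart(content="Hello! How can I help?")]),
# ]
```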
Streaming with `st.empty()`: The streaming effect uses `st.empty()` to create a placeholder that can be updated in real time. The cursor character "▌" provides visual feedback that the response is still generating.
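In isolation the pattern looks like this: a single slot in the page that each call overwrites rather than appends to (a toy sketch, not from the app above):

```python
import time
import streamlit as st

placeholder = st.empty()  # reserve one slot in the page
partial = ""
for word in ["Streaming", "a", "response", "word", "by", "word"]:
    partial += word + " "
    placeholder.markdown(partial + "▌")  # overwrite the slot in place
    time.sleep(0.3)
placeholder.markdown(partial.strip())  # final text without the cursor
```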
I initially thought this approach might mean full page reloads after each chunk, which of course wouldn't be good for networking costs. I ran tests and verified this is not happening.
I'm interested in trying out Pydantic AI with FastHTML instead of Streamlit, but it takes a bit more to get up and running.