OpenAI API streaming Python: stream GPT responses token-by-token

Without streaming, your code waits for the entire GPT response before displaying anything — for a long answer, that can mean 10+ seconds of nothing. The OpenAI API supports server-sent events (SSE) streaming: pass stream=True and the model sends tokens as they are generated. This guide covers the OpenAI Python SDK v1.x patterns — basic streaming, the context manager form, async for FastAPI, tool call accumulation, and getting usage stats (which are not included by default).

Basic streaming

The minimal streaming pattern: pass stream=True, iterate over the result, and read chunk.choices[0].delta.content. The content can be None or an empty string on chunks that carry no text (typically the role-only first chunk, the final chunk, and tool call chunks), so always guard against it:

from openai import OpenAI

client = OpenAI()

stream = client.chat.completions.create(
    model="gpt-4o",
    messages=[{"role": "user", "content": "Write a short poem about APIs."}],
    stream=True,
)

for chunk in stream:
    if chunk.choices[0].delta.content is not None:
        print(chunk.choices[0].delta.content, end="", flush=True)

Each chunk is a ChatCompletionChunk object. The text you want is at chunk.choices[0].delta.content. The delta is additive — each string is a new fragment to append to what you have already received, not a replacement. The final chunk has chunk.choices[0].finish_reason set to "stop" (or "length", "tool_calls", etc.) and delta.content is None.
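
Because each delta is a fragment, rebuilding the full reply is plain concatenation. The sketch below assumes chunk objects shaped like ChatCompletionChunk. The names collect_text and fake_chunk are hypothetical helpers introduced for illustration, and the stand-in chunks are built with SimpleNamespace so the example runs without an API call.

```python
from types import SimpleNamespace


def fake_chunk(content=None, finish_reason=None):
    """Hypothetical stand-in that mimics the attributes read from a chunk."""
    delta = SimpleNamespace(content=content)
    choice = SimpleNamespace(delta=delta, finish_reason=finish_reason)
    return SimpleNamespace(choices=[choice])


def collect_text(chunks):
    """Join additive delta fragments and remember the finish reason."""
    parts = []
    finish_reason = None
    for chunk in chunks:
        if not chunk.choices:  # e.g. a usage-only chunk
            continue
        choice = chunk.choices[0]
        if choice.delta.content:  # skips None and empty strings
            parts.append(choice.delta.content)
        if choice.finish_reason is not None:
            finish_reason = choice.finish_reason
    return "".join(parts), finish_reason


# Simulated stream: empty first chunk, two text fragments, final chunk.
chunks = [
    fake_chunk(content=""),
    fake_chunk(content="Hello, "),
    fake_chunk(content="world."),
    fake_chunk(finish_reason="stop"),
]
text, reason = collect_text(chunks)
print(text, reason)
```

With a real stream, the object returned by create(..., stream=True) would take the place of the list.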

Context manager pattern (cleaner resource handling)

Using the stream as a context manager ensures the underlying HTTP connection is closed even if your loop raises an exception:

from openai import OpenAI

client = OpenAI()

with client.chat.completions.create(
    model="gpt-4o",
    messages=[{"role": "user", "content": "Hello"}],
    stream=True,
) as stream:
    for chunk in stream:
        if not chunk.choices:
            # Final usage chunk when stream_options={"include_usage": True}
            continue
        delta = chunk.choices[0].delta.content
        if delta:
            print(delta, end="", flush=True)

The if not chunk.choices: guard handles the edge case that arises when you enable usage stats (see below) — the final chunk has an empty choices list. Adding this guard now makes the code forward-compatible.
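
To see what the with block buys you, the sketch below simulates a consumer that fails partway through. FakeStream is a hypothetical stand-in, not an SDK class; it models only the close-on-exit behaviour described above and makes no network connection.

```python
class FakeStream:
    """Hypothetical stand-in for an SDK stream; only models close-on-exit."""

    def __init__(self, items):
        self._items = list(items)
        self.closed = False

    def __iter__(self):
        return iter(self._items)

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc, tb):
        self.close()
        return False  # do not swallow the exception

    def close(self):
        self.closed = True


stream = FakeStream(["a", "b", "c"])
seen = []
try:
    with stream as s:
        for item in s:
            seen.append(item)
            if item == "b":
                raise RuntimeError("consumer failed mid-stream")
except RuntimeError:
    pass

# The loop stopped early, yet the stream was still closed.
print(seen, stream.closed)
```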

Async streaming for FastAPI and asyncio apps

For async applications, use AsyncOpenAI instead of OpenAI. Awaiting create(..., stream=True) returns an async stream that also works as an async context manager, which is why the examples below write async with await:

import asyncio

from openai import AsyncOpenAI

async def main():
    client = AsyncOpenAI()
    # Awaiting create() yields the async stream; async with closes it on exit
    async with await client.chat.completions.create(
        model="gpt-4o",
        messages=[{"role": "user", "content": "Hello"}],
        stream=True,
    ) as stream:
        async for chunk in stream:
            if not chunk.choices:
                continue
            delta = chunk.choices[0].delta.content
            if delta:
                print(delta, end="", flush=True)

asyncio.run(main())

In a FastAPI route, yield chunks into a StreamingResponse:

from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from openai import AsyncOpenAI

app = FastAPI()
client = AsyncOpenAI()

@app.get("/stream")
async def stream_gpt():
    async def generate():
        async with await client.chat.completions.create(
            model="gpt-4o",
            messages=[{"role": "user", "content": "Tell me something interesting."}],
            stream=True,
        ) as stream:
            async for chunk in stream:
                if not chunk.choices:
                    continue
                delta = chunk.choices[0].delta.content
                if delta:
                    yield delta

    return StreamingResponse(generate(), media_type="text/plain")
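
The route above sends raw text. If the client expects server-sent events instead (media_type="text/event-stream"), each delta needs SSE framing. The following is a minimal sketch of that framing, not the SDK's or FastAPI's own code: sse_frame, sse_frames, and fake_stream are hypothetical names, and the fake stream stands in for the SDK's async stream so the example runs offline.

```python
import asyncio
from types import SimpleNamespace


def sse_frame(data: str) -> str:
    """Format one server-sent event; each line of data gets its own data field."""
    lines = data.split("\n")
    return "".join(f"data: {line}\n" for line in lines) + "\n"


async def sse_frames(chunks):
    """Yield an SSE frame for each non-empty text delta in an async chunk iterator."""
    async for chunk in chunks:
        if not chunk.choices:
            continue
        delta = chunk.choices[0].delta.content
        if delta:
            yield sse_frame(delta)


async def fake_stream():
    """Hypothetical stand-in for the SDK's async stream."""
    for content in ("Hi", None, "a\nb"):
        delta = SimpleNamespace(content=content)
        yield SimpleNamespace(choices=[SimpleNamespace(delta=delta)])


async def main():
    return [frame async for frame in sse_frames(fake_stream())]


frames = asyncio.run(main())
print(frames)
```

Splitting on newlines matters because a delta can itself contain a line break, and a bare newline inside a data field would end the event early. In a route, sse_frames(stream) could be passed to StreamingResponse with media_type="text/event-stream".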

Streaming with tool and function calls

When the model decides to call a tool, the delta carries incremental tool call data in delta.tool_calls rather than text in delta.content, and the final chunk reports finish_reason "tool_calls". Each tool_calls item has a function.arguments field that arrives as a partial JSON string fragment. You must accumulate these fragments across chunks and parse the complete JSON only after the stream ends.

from openai import OpenAI
import json

client = OpenAI()
tool_calls_buffer = {}

with client.chat.completions.create(
    model="gpt-4o",
    tools=[{
        "type": "function",
        "function": {
            "name": "get_weather",
            "description": "Get the current weather for a city.",
            "parameters": {
                "type": "object",
                "properties": {"city": {"type": "string"}},
                "required": ["city"],
            },
        },
    }],
    messages=[{"role": "user", "content": "What is the weather in Tokyo?"}],
    stream=True,
) as stream:
    for chunk in stream:
        if not chunk.choices:
            continue
        delta = chunk.choices[0].delta
        finish_reason = chunk.choices[0].finish_reason

        if delta.content:
            print(delta.content, end="", flush=True)

        if delta.tool_calls:
            for tc in delta.tool_calls:
                idx = tc.index
                if idx not in tool_calls_buffer:
                    tool_calls_buffer[idx] = {"name": "", "arguments": ""}
                if tc.function is None:
                    # Defensive: the SDK types function as optional
                    continue
                if tc.function.name:
                    tool_calls_buffer[idx]["name"] += tc.function.name
                if tc.function.arguments:
                    tool_calls_buffer[idx]["arguments"] += tc.function.arguments

        if finish_reason == "tool_calls":
            for idx, tc in tool_calls_buffer.items():
                args = json.loads(tc["arguments"])
                print(f"\nTool: {tc['name']}, Args: {args}")

The key point: function.arguments is a partial JSON string in each chunk. Concatenate all fragments indexed by tc.index (which identifies which tool call the chunk belongs to when the model calls multiple tools at once), then parse the full string at the end.
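
The accumulation step can be exercised without calling the API. The sketch below feeds hand-written fragments through the same index-keyed merge. The names fake_tool_delta, accumulate_tool_calls, and parse_arguments are hypothetical, as are the call IDs, and the two calls are interleaved here only to exercise the index logic. It also keeps the id field of each call, on the assumption that you will need it to send the tool result back.

```python
import json
from types import SimpleNamespace


def fake_tool_delta(index, call_id=None, name=None, arguments=None):
    """Hypothetical stand-in shaped like one item of delta.tool_calls."""
    function = SimpleNamespace(name=name, arguments=arguments)
    return SimpleNamespace(index=index, id=call_id, function=function)


def accumulate_tool_calls(tool_deltas):
    """Merge streamed fragments into one entry per tool call index."""
    buffer = {}
    for tc in tool_deltas:
        entry = buffer.setdefault(
            tc.index, {"id": None, "name": "", "arguments": ""}
        )
        if tc.id:
            entry["id"] = tc.id
        if tc.function is None:
            continue
        if tc.function.name:
            entry["name"] += tc.function.name
        if tc.function.arguments:
            entry["arguments"] += tc.function.arguments
    return [buffer[i] for i in sorted(buffer)]


def parse_arguments(raw):
    """Parse accumulated JSON; return None if the text is not complete JSON."""
    try:
        return json.loads(raw)
    except json.JSONDecodeError:
        return None


# Two tool calls whose arguments arrive in pieces.
deltas = [
    fake_tool_delta(0, call_id="call_a", name="get_weather", arguments=""),
    fake_tool_delta(0, arguments='{"ci'),
    fake_tool_delta(1, call_id="call_b", name="get_weather", arguments='{"city": '),
    fake_tool_delta(0, arguments='ty": "Tokyo"}'),
    fake_tool_delta(1, arguments='"Osaka"}'),
]
calls = accumulate_tool_calls(deltas)
for call in calls:
    print(call["id"], call["name"], parse_arguments(call["arguments"]))
```

Returning None from parse_arguments is one possible way to handle a stream that stops before the JSON is complete, for example when the response is cut off by the token limit.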

Getting usage stats while streaming

By default, token usage data is not included in streaming responses. To get it, add stream_options={"include_usage": True} to your create() call. This appends one final chunk to the stream where choices is an empty list and usage is populated:

from openai import OpenAI

client = OpenAI()
usage_data = None

with client.chat.completions.create(
    model="gpt-4o",
    messages=[{"role": "user", "content": "Explain REST APIs briefly."}],
    stream=True,
    stream_options={"include_usage": True},
) as stream:
    for chunk in stream:
        if not chunk.choices:
            # This is the final usage chunk
            usage_data = chunk.usage
            continue
        delta = chunk.choices[0].delta.content
        if delta:
            print(delta, end="", flush=True)

if usage_data:
    print(f"\n\nPrompt tokens: {usage_data.prompt_tokens}")
    print(f"Completion tokens: {usage_data.completion_tokens}")
    print(f"Total tokens: {usage_data.total_tokens}")

The critical detail: when stream_options={"include_usage": True} is set, the last chunk has choices=[]. Accessing chunk.choices[0] on this chunk raises an IndexError. The if not chunk.choices: continue pattern shown here prevents that. Capture chunk.usage on that same branch to retain the stats.
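
The failure mode is easy to reproduce offline. This sketch uses hypothetical stand-ins (text_chunk, usage_chunk, read_stream) and made-up token counts to show an unguarded choices[0] access raising IndexError on the usage-only chunk, while the guarded loop returns both the text and the usage object.

```python
from types import SimpleNamespace


def text_chunk(content):
    """Hypothetical stand-in for a chunk that carries a text delta."""
    delta = SimpleNamespace(content=content)
    return SimpleNamespace(choices=[SimpleNamespace(delta=delta)], usage=None)


def usage_chunk(prompt_tokens, completion_tokens):
    """Hypothetical stand-in for the final usage-only chunk (choices is empty)."""
    usage = SimpleNamespace(
        prompt_tokens=prompt_tokens,
        completion_tokens=completion_tokens,
        total_tokens=prompt_tokens + completion_tokens,
    )
    return SimpleNamespace(choices=[], usage=usage)


def read_stream(chunks):
    """Return (text, usage); usage stays None if no usage chunk arrives."""
    parts, usage = [], None
    for chunk in chunks:
        if not chunk.choices:
            if chunk.usage is not None:
                usage = chunk.usage
            continue
        content = chunk.choices[0].delta.content
        if content:
            parts.append(content)
    return "".join(parts), usage


# Token counts below are invented for the demonstration.
chunks = [text_chunk("REST is "), text_chunk("stateless."), usage_chunk(12, 5)]

# Unguarded access fails on the usage-only chunk.
try:
    chunks[-1].choices[0]
    raised = False
except IndexError:
    raised = True

text, usage = read_stream(chunks)
print(raised, text, usage.total_tokens)
```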

FAQ

How do I stream an OpenAI API response in Python?
Pass stream=True to client.chat.completions.create(). Iterate over the returned stream and read chunk.choices[0].delta.content for each text chunk, checking for None before printing. The final chunk has choices[0].finish_reason set to "stop" (or another stop reason).

How do I get token usage stats when streaming the OpenAI API?
By default, usage is not included in streaming responses. Add stream_options={"include_usage": True} to your create() call. This adds a final chunk where choices is an empty list and usage contains the prompt and completion token counts. Handle the empty choices list in your loop to avoid an IndexError.

Can I stream OpenAI API responses in FastAPI?
Yes. Use AsyncOpenAI() and iterate with async for. In a FastAPI route, wrap the async generator in StreamingResponse with media_type="text/plain" (or "text/event-stream" for SSE). Use async with await client.chat.completions.create(..., stream=True) as stream: and async for chunk in stream: to yield chunks.

Last updated May 28, 2026. Code examples verified against OpenAI Python SDK v1.x and the OpenAI streaming API documentation.