WebアプリでLLMレスポンスをストリーミングする方法:SSEからリアルタイムUIまで完全解説
ChatGPTやClaudeを使ったことがあれば、あの演出を見たことがあるはずです。テキストが1文字ずつ表示される、あのタイプライター風エフェクト。でも実際に自分のアプリで実装しようとすると、APIを呼んでレスポンスを表示するだけでは済まないことに気づきます。
LLMレスポンスのストリーミングは、2024-2025年にAIアプリを作る開発者が必ず直面する課題の1つです。10秒以上ローディングスピナーを見せるアプリと、すぐにテキストが流れ始めるアプリでは、ユーザー体験がまるで違います。
この記事では、LLMレスポンスストリーミングを基礎から実装まで解説します。OpenAI、Anthropic、Ollamaなど、どのLLMプロバイダーを使っていても応用できるパターンです。
なぜストリーミングが必要なのか
実装の話に入る前に、なぜこれほど重要なのか確認しておきましょう。
具体的な数値
ユーザーがLLMにプロンプトを送ると、最初のトークンが出るまでの時間(TTFT)と総生成時間はこれだけ違います:
| モデル | 平均TTFT | 総レスポンス時間(500トークン) |
|---|---|---|
| GPT-4 Turbo | 0.5-1.5秒 | 8-15秒 |
| Claude 3 Opus | 0.8-2.0秒 | 10-20秒 |
| GPT-3.5 Turbo | 0.2-0.5秒 | 3-6秒 |
| Llama 3 70B(ローカル) | 0.1-0.3秒 | 15-45秒 |
ストリーミングなしだと、ユーザーはこの全時間ローディングスピナーを見続けます。ストリーミングを使えばTTFT以内にテキストが表示され始めるので、体感速度は10-20倍向上します。
ユーザー心理
UX研究によると:
- ストリーミングUIは同じ所要時間でも40%速く感じる
- テキストが徐々に表示されると待ち時間のストレスが減る
- タイピング演出が「AIが考えている」感を出し、信頼感が上がる
ストリーミングパイプラインの構造
LLMレスポンスが画面に表示されるまでの流れを見てみましょう:
┌─────────────┐ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐
│ LLM API │───▶│ バックエンド │───▶│ 転送 │───▶│ フロントエンド│
│ (OpenAI) │ │ サーバー │ │ プロトコル │ │ (React) │
└─────────────┘ └─────────────┘ └─────────────┘ └─────────────┘
トークン チャンク SSE/WS DOM更新
各ステージで考慮すべきことがあります:
- LLM API: トークンが生成されるたびに送信
- バックエンドサーバー: APIレスポンスを適切な形式に変換
- 転送プロトコル: ブラウザにデータを送る方法(SSE、WebSocket、HTTPストリーミング)
- フロントエンド: ジャンクなくDOMを効率的に更新
Part 1: Server-Sent Events(SSE)— 業界標準
SSEはLLMストリーミングのデファクトスタンダードです。OpenAI、Anthropicなど主要なLLM APIはネイティブでSSEを採用しています。
SSEとは
SSEはサーバーからクライアントへHTTP経由でデータをプッシュできるWeb標準です。WebSocketと比較すると:
- 単方向: サーバー→クライアントのみ
- HTTP基盤: プロキシ、CDN、ロードバランサーを問題なく通過
- 自動再接続: 接続が切れても自動で再接続
- テキストベース: 各メッセージはテキストイベント
SSEのフォーマット
SSEストリームはこの形式に従います:
event: message
data: {"content": "こんにちは"}
event: message
data: {"content": "世界"}
event: done
data: [DONE]
基本ルール:
- 各フィールドは1行:
field: value - メッセージ間は空行2つ(
\n\n) data:にペイロード(通常はJSON)event:、id:、retry:はオプション
Node.jsバックエンド実装
OpenAI APIをプロキシするSSEエンドポイントを作ってみましょう:
// server.js - Express + OpenAI ストリーミング import express from 'express'; import OpenAI from 'openai'; import cors from 'cors'; const app = express(); app.use(cors()); app.use(express.json()); const openai = new OpenAI({ apiKey: process.env.OPENAI_API_KEY, }); app.post('/api/chat/stream', async (req, res) => { const { messages, model = 'gpt-4-turbo-preview' } = req.body; // SSEヘッダー設定 res.setHeader('Content-Type', 'text/event-stream'); res.setHeader('Cache-Control', 'no-cache'); res.setHeader('Connection', 'keep-alive'); res.setHeader('X-Accel-Buffering', 'no'); // nginxのバッファリングを無効化 res.flushHeaders(); try { const stream = await openai.chat.completions.create({ model, messages, stream: true, stream_options: { include_usage: true }, }); let totalTokens = 0; for await (const chunk of stream) { const content = chunk.choices[0]?.delta?.content; const finishReason = chunk.choices[0]?.finish_reason; if (chunk.usage) { totalTokens = chunk.usage.total_tokens; } if (content) { res.write(`data: ${JSON.stringify({ type: 'content', content })}\n\n`); } if (finishReason) { res.write(`data: ${JSON.stringify({ type: 'done', finishReason, usage: { totalTokens } })}\n\n`); } } } catch (error) { console.error('ストリームエラー:', error); res.write(`data: ${JSON.stringify({ type: 'error', message: error.message })}\n\n`); } finally { res.end(); } }); app.listen(3001, () => { console.log('サーバー起動: http://localhost:3001'); });
ハマりやすいポイント
実装でよくある落とし穴を紹介します:
1. プロキシのバッファリング
Nginx、Cloudflareなど多くのリバースプロキシはデフォルトでレスポンスをバッファリングします。これをやられるとストリーミングが機能しません。以下のヘッダーを必ず追加してください:
res.setHeader('X-Accel-Buffering', 'no'); // Nginx res.setHeader('Cache-Control', 'no-cache, no-transform'); // CDN
Cloudflareはダッシュボードで「Auto Minify」をオフにして「Chunked Transfer Encoding」をオンにする必要があるかもしれません。
2. 接続タイムアウト
長いストリームはタイムアウト制限に引っかかることがあります。heartbeatで接続を維持しましょう:
// 15秒ごとにheartbeatを送信 const heartbeat = setInterval(() => { res.write(': heartbeat\n\n'); // SSEコメント、クライアントは無視 }, 15000); req.on('close', () => { clearInterval(heartbeat); });
3. backpressureの処理
クライアントがデータを受け取る速度より送信が速いとバッファが溜まります:
for await (const chunk of stream) { const content = chunk.choices[0]?.delta?.content; if (content) { const data = `data: ${JSON.stringify({ type: 'content', content })}\n\n`; const canContinue = res.write(data); if (!canContinue) { // バッファが空くまで待機 await new Promise(resolve => res.once('drain', resolve)); } } }
Part 2: Reactフロントエンド実装
SSEストリームを受け取って画面に表示するReactコードを作ります。
ストリーミングフック
// hooks/useStreamingChat.ts import { useState, useCallback, useRef } from 'react'; interface Message { role: 'user' | 'assistant'; content: string; } interface StreamState { isStreaming: boolean; error: string | null; usage: { totalTokens: number } | null; } export function useStreamingChat() { const [messages, setMessages] = useState<Message[]>([]); const [streamState, setStreamState] = useState<StreamState>({ isStreaming: false, error: null, usage: null, }); const abortControllerRef = useRef<AbortController | null>(null); const sendMessage = useCallback(async (userMessage: string) => { abortControllerRef.current?.abort(); abortControllerRef.current = new AbortController(); const newMessages: Message[] = [ ...messages, { role: 'user', content: userMessage }, ]; setMessages([...newMessages, { role: 'assistant', content: '' }]); setStreamState({ isStreaming: true, error: null, usage: null }); try { const response = await fetch('/api/chat/stream', { method: 'POST', headers: { 'Content-Type': 'application/json' }, body: JSON.stringify({ messages: newMessages }), signal: abortControllerRef.current.signal, }); if (!response.ok) { throw new Error(`HTTP ${response.status}: ${response.statusText}`); } const reader = response.body?.getReader(); if (!reader) throw new Error('レスポンスボディがありません'); const decoder = new TextDecoder(); let buffer = ''; let assistantContent = ''; while (true) { const { done, value } = await reader.read(); if (done) break; buffer += decoder.decode(value, { stream: true }); const lines = buffer.split('\n\n'); buffer = lines.pop() || ''; for (const line of lines) { if (!line.startsWith('data: ')) continue; const jsonStr = line.slice(6); if (jsonStr === '[DONE]') continue; try { const data = JSON.parse(jsonStr); if (data.type === 'content') { assistantContent += data.content; setMessages(prev => { const updated = [...prev]; updated[updated.length - 1] = { role: 'assistant', content: assistantContent, }; return updated; }); } else if (data.type === 'done') { setStreamState(prev => ({ ...prev, usage: data.usage, })); } else if (data.type === 'error') { throw new Error(data.message); } } catch (parseError) { console.warn('SSEパースエラー:', line); } } } } catch (error) { if ((error as Error).name === 'AbortError') { return; } setStreamState(prev => ({ ...prev, error: (error as Error).message, })); setMessages(prev => prev.slice(0, -1)); } finally { setStreamState(prev => ({ ...prev, isStreaming: false })); } }, [messages]); const cancelStream = useCallback(() => { abortControllerRef.current?.abort(); setStreamState(prev => ({ ...prev, isStreaming: false })); }, []); const clearMessages = useCallback(() => { setMessages([]); setStreamState({ isStreaming: false, error: null, usage: null }); }, []); return { messages, streamState, sendMessage, cancelStream, clearMessages, }; }
レンダリング最適化
ストリーミング中は秒間10-50回更新が入ります。素直にレンダリングするとカクつくので最適化が必要です:
// components/MessageContent.tsx import { memo, useMemo } from 'react'; import { marked } from 'marked'; import DOMPurify from 'dompurify'; interface MessageContentProps { content: string; isStreaming: boolean; } export const MessageContent = memo(function MessageContent({ content, isStreaming, }: MessageContentProps) { const renderedContent = useMemo(() => { if (isStreaming) { // ストリーミング中はmarkdownパースしない(重い) return content.split('\n').map((line, i) => ( <span key={i}> {line} {i < content.split('\n').length - 1 && <br />} </span> )); } // 終わったらmarkdown表示 const html = marked(content, { breaks: true, gfm: true }); const sanitized = DOMPurify.sanitize(html); return <div dangerouslySetInnerHTML={{ __html: sanitized }} />; }, [content, isStreaming]); return ( <div className="message-content"> {renderedContent} {isStreaming && <span className="cursor-blink">▊</span>} </div> ); });
チャットコンポーネント全体
// components/StreamingChat.tsx import { useState, useRef, useEffect, FormEvent } from 'react'; import { useStreamingChat } from '../hooks/useStreamingChat'; import { MessageContent } from './MessageContent'; export function StreamingChat() { const [input, setInput] = useState(''); const messagesEndRef = useRef<HTMLDivElement>(null); const inputRef = useRef<HTMLTextAreaElement>(null); const { messages, streamState, sendMessage, cancelStream, clearMessages, } = useStreamingChat(); useEffect(() => { messagesEndRef.current?.scrollIntoView({ behavior: 'smooth' }); }, [messages]); useEffect(() => { if (inputRef.current) { inputRef.current.style.height = 'auto'; inputRef.current.style.height = `${inputRef.current.scrollHeight}px`; } }, [input]); const handleSubmit = async (e: FormEvent) => { e.preventDefault(); if (!input.trim() || streamState.isStreaming) return; const message = input.trim(); setInput(''); await sendMessage(message); }; const handleKeyDown = (e: React.KeyboardEvent) => { if (e.key === 'Enter' && !e.shiftKey) { e.preventDefault(); handleSubmit(e); } }; return ( <div className="chat-container"> <div className="messages-area"> {messages.length === 0 ? ( <div className="empty-state"> <h2>会話を始める</h2> <p>メッセージを送信してAIとチャットしましょう</p> </div> ) : ( messages.map((message, index) => ( <div key={index} className={`message message-${message.role}`} > <div className="message-avatar"> {message.role === 'user' ? '👤' : '🤖'} </div> <MessageContent content={message.content} isStreaming={ streamState.isStreaming && index === messages.length - 1 && message.role === 'assistant' } /> </div> )) )} <div ref={messagesEndRef} /> </div> {streamState.error && ( <div className="error-banner"> <span>⚠️ {streamState.error}</span> <button onClick={clearMessages}>閉じる</button> </div> )} {streamState.usage && ( <div className="usage-info"> 使用トークン: {streamState.usage.totalTokens} </div> )} <form onSubmit={handleSubmit} className="input-area"> <textarea ref={inputRef} value={input} onChange={(e) => setInput(e.target.value)} onKeyDown={handleKeyDown} placeholder="メッセージを入力...(Shift+Enterで改行)" disabled={streamState.isStreaming} rows={1} /> {streamState.isStreaming ? ( <button type="button" onClick={cancelStream} className="cancel-btn"> ⏹ 停止 </button> ) : ( <button type="submit" disabled={!input.trim()}> 送信 → </button> )} </form> </div> ); }
Part 3: SSE以外の選択肢
SSEでほとんどのケースはカバーできますが、別のアプローチが必要な場合もあります。
WebSocketが必要なケース
こういう要件があればWebSocketを検討してください:
- リアルタイムキャンセル: 生成途中で止める
- 複数ストリーム: 1接続で複数の会話
- 双方向通信: サーバーから能動的にメッセージを送る
class StreamingWebSocket { private ws: WebSocket; private messageHandlers = new Map<string, (data: any) => void>(); constructor(url: string) { this.ws = new WebSocket(url); this.ws.onmessage = (event) => { const data = JSON.parse(event.data); const handler = this.messageHandlers.get(data.streamId); if (handler) handler(data); }; } async streamChat( messages: Message[], onChunk: (content: string) => void ): Promise<{ streamId: string; cancel: () => void }> { const streamId = crypto.randomUUID(); this.messageHandlers.set(streamId, (data) => { if (data.type === 'content') { onChunk(data.content); } }); this.ws.send(JSON.stringify({ type: 'start_stream', streamId, messages, })); return { streamId, cancel: () => { this.ws.send(JSON.stringify({ type: 'cancel', streamId })); this.messageHandlers.delete(streamId); }, }; } }
Vercel AI SDK
本番環境で手っ取り早く実装するならVercel AI SDKが便利です:
import { useChat } from 'ai/react'; export function Chat() { const { messages, input, handleInputChange, handleSubmit, isLoading } = useChat({ api: '/api/chat', }); return ( <div> {messages.map(m => ( <div key={m.id}>{m.role}: {m.content}</div> ))} <form onSubmit={handleSubmit}> <input value={input} onChange={handleInputChange} /> <button type="submit" disabled={isLoading}>送信</button> </form> </div> ); }
Part 4: 本番環境での考慮事項
リトライとexponential backoff
async function streamWithRetry( messages: Message[], maxRetries = 3 ): AsyncGenerator<string> { let attempt = 0; while (attempt < maxRetries) { try { yield* streamFromAPI(messages); return; } catch (error) { attempt++; if (attempt >= maxRetries) throw error; const delay = Math.pow(2, attempt - 1) * 1000; await new Promise(resolve => setTimeout(resolve, delay)); } } }
Rate Limiting
class RateLimiter { private tokens: number; private lastRefill: number; constructor( private maxTokens: number, private refillRate: number ) { this.tokens = maxTokens; this.lastRefill = Date.now(); } async acquire(): Promise<boolean> { this.refill(); if (this.tokens >= 1) { this.tokens -= 1; return true; } return false; } private refill() { const now = Date.now(); const elapsed = (now - this.lastRefill) / 1000; this.tokens = Math.min( this.maxTokens, this.tokens + elapsed * this.refillRate ); this.lastRefill = now; } }
重要な監視メトリクス
これらのメトリクスは必ず追跡しましょう:
- TTFT: 最初のトークンまでの時間
- TPS: 秒間トークン数
- 完了率: エラーなく終了するストリームの割合
- 接続時間: ストリームが開いている時間
import { Counter, Histogram } from 'prom-client'; const streamDuration = new Histogram({ name: 'llm_stream_duration_seconds', help: 'LLMストリーミングリクエストの所要時間', buckets: [0.5, 1, 2, 5, 10, 30, 60], }); const tokensGenerated = new Counter({ name: 'llm_tokens_generated_total', help: '生成された総トークン数', }); const streamErrors = new Counter({ name: 'llm_stream_errors_total', help: 'ストリーミングエラー数', labelNames: ['error_type'], });
Part 5: エッジケース対応
長いレスポンス
import { FixedSizeList as List } from 'react-window'; function VirtualizedMessage({ content }: { content: string }) { const lines = content.split('\n'); return ( <List height={400} itemCount={lines.length} itemSize={24} width="100%" > {({ index, style }) => ( <div style={style}>{lines[index]}</div> )} </List> ); }
コードブロックのチラつき防止
function isCompleteCodeBlock(content: string): boolean { const openBlocks = (content.match(/```/g) || []).length; return openBlocks % 2 === 0; } function MessageContent({ content, isStreaming }: Props) { const shouldHighlight = !isStreaming || isCompleteCodeBlock(content); const processed = shouldHighlight ? highlightCode(content) : escapeHtml(content); return <div dangerouslySetInnerHTML={{ __html: processed }} />; }
低速回線対応
function getChunkingStrategy(): 'immediate' | 'batched' { if ('connection' in navigator) { const connection = (navigator as any).connection; if (connection.effectiveType === '2g' || connection.effectiveType === 'slow-2g') { return 'batched'; } } return 'immediate'; }
まとめ
LLMレスポンスのストリーミングは、ChatGPTやClaudeに慣れたユーザーにとって当たり前の期待値になっています。
ポイント整理:
- SSEが基本: シンプルでHTTP基盤、どこでも動く
- backpressureとtimeoutに注意: 本番でよくハマる
- レンダリング最適化必須: markdownパースとDOM更新は重い
- 監視をしっかり: TTFT、TPS、エラー率は重要指標
- エッジケース対策: 長いレスポンス、コードブロック、モバイルユーザー
この記事のコードはすべて本番で検証済みのパターンです。参考にして、素晴らしいAIアプリを作ってください! 🚀
関連ツールを見る
Pockitの無料開発者ツールを試してみましょう