home · Posts · Archive · Tags

20231006-fastapi-streaming

  • main.py
from fastapi import FastAPI from fastapi.responses import StreamingResponse import time app = FastAPI() def fake_data_streamer(): for i in range(10): yield b'some fake data\n\n' time.sleep(0.5) @app.get('/') async def main(): return StreamingResponse(fake_data_streamer(), media_type='text/event-stream') @app.get("/video") def main(): def iterfile(): # with open("rick_roll.mp4", mode="rb") as file_like: # yield from file_like # return StreamingResponse(iterfile(), media_type="video/mp4")
  • test.py
import httpx url = 'http://127.0.0.1:8000/' with httpx.stream('GET', url) as r: for chunk in r.iter_raw(): # or, for line in r.iter_lines(): print(chunk)

Ref

👈Go Back

@alanhc