"""
Gemini Live API - Realtime Audio Streaming
pip install google-genai pyaudio
"""
import asyncio
import pyaudio
from google import genai
from google.genai import types
# Sample format handed to pya.open() for both the microphone and speaker streams.
FORMAT = pyaudio.paInt16
# Channel count used for both streams (1 = mono).
CHANNELS = 1
# Sample rate (Hz) of the microphone stream whose audio is sent to the session.
SEND_SAMPLE_RATE = 16000
# Sample rate (Hz) of the speaker stream that plays the audio received back.
RECEIVE_SAMPLE_RATE = 24000
# Frames per microphone buffer, and frames fetched by each mic_stream.read() call.
CHUNK_SIZE = 1024
# Placeholder credential - replace before running. It is sent both as the
# client's api_key and as the "Authorization: Bearer ..." header.
API_KEY = "your-tfy-api-key"
# Model id passed to client.aio.live.connect().
MODEL = "gemini-live-2.5-flash"
# Endpoint the client is pointed at. This is a plain string, not an f-string:
# {controlPlaneUrl} and {geminiProviderAccountName} are literal placeholders
# that must be replaced by hand.
BASE_URL = "https://{controlPlaneUrl}/api/llm/live/{geminiProviderAccountName}"
# The same key is supplied twice: as the SDK's api_key and as a bearer token
# in the headers attached to requests made against BASE_URL.
_auth_headers = {"Authorization": f"Bearer {API_KEY}"}

client = genai.Client(
    api_key=API_KEY,
    http_options={"base_url": BASE_URL, "headers": _auth_headers},
)
# Prebuilt voice requested for the spoken replies.
_voice_config = types.VoiceConfig(
    prebuilt_voice_config=types.PrebuiltVoiceConfig(voice_name="Zephyr"),
)

# Live-session configuration: audio-only responses using the voice above.
CONFIG = types.LiveConnectConfig(
    response_modalities=["AUDIO"],
    speech_config=types.SpeechConfig(voice_config=_voice_config),
)
# Shared PyAudio instance: main() opens the microphone and speaker streams on
# it and calls terminate() on it when it exits.
pya = pyaudio.PyAudio()
def _iter_leaf_errors(exc):
    """Yield the individual exceptions contained in *exc*.

    An exception group is expanded recursively into its members; any other
    exception is yielded unchanged.
    """
    if isinstance(exc, BaseExceptionGroup):
        for inner in exc.exceptions:
            yield from _iter_leaf_errors(inner)
    else:
        yield exc


async def main():
    """Stream microphone audio to a live session and play back the replies.

    Connects with the module-level ``client``, ``MODEL`` and ``CONFIG``, opens
    the default input device and an output stream on ``pya``, then runs three
    tasks concurrently: sending microphone chunks, receiving responses, and
    playing queued audio. All three loop forever, so the coroutine only ends
    when one of them fails or it is cancelled.

    Exceptions are reported on stdout as ``Error: ...`` instead of being
    re-raised, and ``pya.terminate()`` is always called on the way out.
    """
    try:
        async with client.aio.live.connect(model=MODEL, config=CONFIG) as session:
            print("Connected!")

            mic_info = pya.get_default_input_device_info()
            mic_stream = pya.open(
                format=FORMAT,
                channels=CHANNELS,
                rate=SEND_SAMPLE_RATE,
                input=True,
                input_device_index=mic_info["index"],
                frames_per_buffer=CHUNK_SIZE,
            )
            speaker_stream = pya.open(
                format=FORMAT,
                channels=CHANNELS,
                rate=RECEIVE_SAMPLE_RATE,
                output=True,
            )
            # Audio received from the session, waiting to be played.
            audio_in_queue = asyncio.Queue()

            async def send_audio():
                """Read microphone chunks and forward them to the session."""
                while True:
                    # The blocking read runs in a worker thread so the event
                    # loop keeps servicing the other two tasks.
                    data = await asyncio.to_thread(
                        mic_stream.read, CHUNK_SIZE, exception_on_overflow=False
                    )
                    # NOTE(review): newer google-genai releases appear to
                    # deprecate session.send() in favour of
                    # send_realtime_input() - confirm against the installed
                    # SDK version before upgrading.
                    await session.send(
                        input={"data": data, "mime_type": "audio/pcm"}
                    )

            async def receive_audio():
                """Queue received audio, echo received text, handle interruptions."""
                while True:
                    was_interrupted = False
                    async for response in session.receive():
                        if data := response.data:
                            audio_in_queue.put_nowait(data)
                        if text := response.text:
                            # Flush so partial text shows up immediately even
                            # though no newline is written.
                            print(text, end="", flush=True)
                        server_content = getattr(response, "server_content", None)
                        if getattr(server_content, "interrupted", False):
                            was_interrupted = True
                    if was_interrupted:
                        # Discard audio still queued for the interrupted turn
                        # so it is not played.
                        while not audio_in_queue.empty():
                            audio_in_queue.get_nowait()

            async def play_audio():
                """Write queued audio chunks to the speaker stream."""
                while True:
                    data = await audio_in_queue.get()
                    # Blocking write, moved off the event loop like the read.
                    await asyncio.to_thread(speaker_stream.write, data)

            async with asyncio.TaskGroup() as tg:
                tg.create_task(send_audio())
                tg.create_task(receive_audio())
                tg.create_task(play_audio())
    except Exception as e:
        # A TaskGroup failure arrives as an ExceptionGroup whose own message
        # hides the cause, so report each underlying exception instead.
        for err in _iter_leaf_errors(e):
            print(f"Error: {err}")
    finally:
        pya.terminate()
if __name__ == "__main__":
    # Guarded so that importing this module does not start streaming.
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        # Deliberately swallowed: exit without a traceback when the user
        # stops the script with Ctrl-C.
        pass