forked from daily-co/pipecat-cloud-starter
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathbot-nova.py
More file actions
313 lines (265 loc) Β· 11.1 KB
/
bot-nova.py
File metadata and controls
313 lines (265 loc) Β· 11.1 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
#
# Copyright (c) 2024β2025, Daily
#
# SPDX-License-Identifier: BSD 2-Clause License
#
"""Gemini Bot Implementation.
This module implements a chatbot using Google's Gemini Multimodal Live model.
It includes:
- Real-time audio/video interaction through Daily
- Animated robot avatar
- Speech-to-speech model
The bot runs as part of a pipeline that processes audio/video frames and
manages the conversation flow using Gemini's streaming capabilities.
"""
import asyncio
import os
import sys
import argparse
import json
import aiohttp
from dotenv import load_dotenv
from loguru import logger
from PIL import Image
from runner import configure
from pipecat.audio.vad.silero import SileroVADAnalyzer
from pipecat.audio.vad.vad_analyzer import VADParams
from pipecat.frames.frames import (
BotStartedSpeakingFrame,
BotStoppedSpeakingFrame,
Frame,
OutputImageRawFrame,
SpriteFrame,
)
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineParams, PipelineTask
from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
from pipecat.processors.frameworks.rtvi import RTVIConfig, RTVIObserver, RTVIProcessor
from pipecat.services.aws_nova_sonic import AWSNovaSonicLLMService
from pipecat.transports.services.daily import DailyParams, DailyTransport
from tool_processor import ToolProcessor
load_dotenv(override=True)
logger.remove(0)
logger.add(sys.stderr, level="DEBUG")
sprites = []
script_dir = os.path.dirname(__file__)
for i in range(1, 26):
# Build the full path to the image file
full_path = os.path.join(script_dir, f"assets/robot0{i}.png")
# Get the filename without the extension to use as the dictionary key
# Open the image and convert it to bytes
with Image.open(full_path) as img:
sprites.append(
OutputImageRawFrame(image=img.tobytes(), size=img.size, format=img.format)
)
# Create a smooth animation by adding reversed frames
flipped = sprites[::-1]
sprites.extend(flipped)
# Define static and animated states
quiet_frame = sprites[0] # Static frame for when bot is listening
talking_frame = SpriteFrame(images=sprites) # Animation for talking bot
class TalkingAnimation(FrameProcessor):
"""Manages the bot's visual animation states.
Switches between static (listening) and animated (talking) states based on
the bot's current speaking status.
"""
def __init__(self):
super().__init__()
self._is_talking = False
async def process_frame(self, frame: Frame, direction: FrameDirection):
"""Process incoming frames and update animation state.
Args:
frame: The incoming frame to process
direction: The direction of frame flow in the pipeline
"""
await super().process_frame(frame, direction)
# Switch to talking animation when bot starts speaking
if isinstance(frame, BotStartedSpeakingFrame):
if not self._is_talking:
await self.push_frame(talking_frame)
self._is_talking = True
# Return to static frame when bot stops speaking
elif isinstance(frame, BotStoppedSpeakingFrame):
await self.push_frame(quiet_frame)
self._is_talking = False
await self.push_frame(frame, direction)
async def main():
"""Main bot execution function.
Sets up and runs the bot pipeline including:
- Daily video transport with specific audio parameters for Gemini
- Gemini Live multimodal model integration
- Voice activity detection
- Tool processing for Central API integration
- Animation processing
- RTVI event handling
"""
# Parse command line arguments
parser = argparse.ArgumentParser(description="Nova Sonic Bot")
parser.add_argument("-c", "--custom", type=str, help="Custom payload JSON string")
args, unknown = parser.parse_known_args()
# Parse custom payload if provided
system_prompt = None
tools = None
bearer_token = None
if args.custom:
try:
custom_data = json.loads(args.custom)
logger.info(f"π Received custom_data: {custom_data}")
system_prompt = custom_data.get("system_prompt")
tools = custom_data.get("tools")
logger.info(f"π Extracted tools: {tools}")
bearer_token = custom_data.get("bearer_token")
if bearer_token:
logger.info(
f"π Pipecat: Using proxied bearer token from Central "
f"(length: {len(bearer_token)})"
)
else:
logger.warning(
"β οΈ Pipecat: No bearer token provided from Central - "
"tool calls will fail"
)
except json.JSONDecodeError as e:
logger.warning(f"Failed to parse custom payload: {e}")
else:
logger.warning("π Pipecat: No custom data received")
async with aiohttp.ClientSession() as session:
(room_url, token) = await configure(session)
# Set up Daily transport with specific audio/video parameters
# for Gemini
transport = DailyTransport(
room_url,
token,
"Chatbot",
DailyParams(
audio_in_enabled=True,
audio_out_enabled=True,
video_out_enabled=True,
video_out_width=1024,
video_out_height=576,
vad_analyzer=SileroVADAnalyzer(params=VADParams(stop_secs=0.5)),
),
)
if not system_prompt:
raise ValueError(
"system_prompt is required for bot initialization. "
"Please provide a system prompt that defines the bot's behavior."
)
system_instruction = system_prompt.rstrip()
logger.info(f"π Pipecat using provided system_prompt")
llm = AWSNovaSonicLLMService(
secret_access_key=os.getenv("AWS_SECRET_ACCESS_KEY"),
access_key_id=os.getenv("AWS_ACCESS_KEY_ID"),
region="us-east-1",
voice_id="tiffany", # matthew, tiffany, amy
tools=tools,
)
# Create a tool processor BEFORE callbacks
# (so callbacks can reference it)
tool_processor = ToolProcessor(auth_token=bearer_token)
# Register dynamic function callback with LLM service
async def generic_tool_callback(
function_name, tool_call_id, arguments, llm, context, result_callback
):
"""Generic callback for all tool calls from LLM."""
logger.info(f"π§ LLM callback: {function_name} with args: {arguments}")
# Use ToolProcessor to execute the actual API call with the exact
# tool name from LLM
result = await tool_processor._call_central_tool(function_name, arguments)
await result_callback(result)
# Register the generic callback for all available tools
if tools:
logger.info(f"π§ Processing {len(tools)} tools for registration")
for i, tool in enumerate(tools):
logger.info(f"π§ Tool {i}: {tool}")
# Extract tool name from tool definition or use string directly
if isinstance(tool, str):
tool_name = tool
elif isinstance(tool, dict) and "toolSpec" in tool:
tool_name = tool["toolSpec"]["name"]
logger.info(
f"π§ Extracted tool name '{tool_name}' from tool definition"
)
else:
raise ValueError(
f"Tool must be a string or tool definition dict with "
f"'toolSpec', got {type(tool)}: {tool}"
)
llm.register_function(tool_name, generic_tool_callback)
logger.info(f"π§ β
Registered tool callback: {tool_name}")
logger.info("π§ All tool callbacks registered with LLM service")
else:
raise ValueError(
"No tools provided for bot initialization. "
"Tools are required for the bot to function properly. "
"Please provide a list of tools in the bot configuration."
)
# AWS Nova Sonic uses both registered callbacks AND frame-based tool
# processing via ToolProcessor
messages = [
{
"role": "system",
"content": system_instruction,
},
]
# Set up conversation context and management
# The context_aggregator will automatically collect conversation context
context = OpenAILLMContext(messages)
context_aggregator = llm.create_context_aggregator(context)
# Set up processors
ta = TalkingAnimation()
#
# RTVI events for Pipecat client UI
#
rtvi = RTVIProcessor(config=RTVIConfig(config=[]))
# Pipeline with tool processor for proper AWS Nova Sonic integration
pipeline = Pipeline(
[
transport.input(),
rtvi,
context_aggregator.user(),
llm,
tool_processor,
ta,
transport.output(),
context_aggregator.assistant(),
]
)
task = PipelineTask(
pipeline,
params=PipelineParams(
allow_interruptions=True,
enable_metrics=True,
enable_usage_metrics=True,
),
observers=[RTVIObserver(rtvi)],
)
await task.queue_frame(quiet_frame)
@rtvi.event_handler("on_client_ready")
async def on_client_ready(rtvi):
await rtvi.set_bot_ready()
# Kick off the conversation
await task.queue_frames([context_aggregator.user().get_context_frame()])
@transport.event_handler("on_first_participant_joined")
async def on_first_participant_joined(transport, participant):
print(f"Participant joined: {participant}")
await transport.capture_participant_transcription(participant["id"])
# Kick off the conversation.
await task.queue_frames([context_aggregator.user().get_context_frame()])
# HACK: for now, we need this special way of triggering the first
# assistant response in AWS Nova Sonic. Note that this trigger
# requires a special corresponding bit of text in the system
# instruction. In the future, simply queueing the context frame
# should be sufficient.
await llm.trigger_assistant_response()
@transport.event_handler("on_participant_left")
async def on_participant_left(transport, participant, reason):
print(f"Participant left: {participant}")
await task.cancel()
# Run the pipeline task using PipelineRunner
runner = PipelineRunner()
await runner.run(task)
if __name__ == "__main__":
asyncio.run(main())