simplex-bot/venice_bot.py
first d882245947 Add venice_bot.py
Key features of this implementation:

- Uses /ask as the command prefix to trigger the bot
- Implements async/await for better performance
- Includes error handling for API calls
- Shows typing indicators while processing
- Uses environment variables for secure API key storage
- Maintains a persistent SimpleX connection
2025-01-09 07:25:11 +00:00

95 lines
2.9 KiB
Python

import os
import json
import asyncio
import aiohttp
from simplexmq import SimpleXMQ, SimpleXChat, Message
# --- Venice API settings ---------------------------------------------------
# The key comes from the environment so it never lives in the source tree;
# the lookup yields None when the variable is not set.
VENICE_API_KEY = os.environ.get("VENICE_API_KEY")
# Root of the Venice REST API; endpoint paths are appended to this.
VENICE_BASE_URL = "https://api.venice.ai/api/v1"
# Chat messages must begin with this prefix for the bot to react to them.
BOT_COMMAND = "/ask"
class VeniceBot:
    """Chat bot that answers ``/ask`` messages using the Venice chat API.

    Incoming SimpleX messages that begin with ``BOT_COMMAND`` are treated as
    prompts: the text after the command is posted to the Venice
    ``/chat/completions`` endpoint and the reply is sent back to the chat.
    """

    def __init__(self):
        """Create the SimpleX client and the HTTP headers for Venice calls."""
        self.simplex = SimpleXMQ()
        # The bearer token is taken from the module-level VENICE_API_KEY.
        self.headers = {
            "Authorization": f"Bearer {VENICE_API_KEY}",
            "Content-Type": "application/json",
        }

    @staticmethod
    def _extract_prompt(content, command):
        """Return the prompt that follows *command* in *content*.

        Returns:
            ``None`` when *content* is not text or is not an invocation of
            *command*; otherwise the stripped text after the command, which
            is ``""`` when the command was sent without a question.
        """
        # Non-text content cannot carry a command.
        # NOTE(review): whether simplexmq ever delivers non-str content is
        # not visible here; the guard is defensive.
        if not isinstance(content, str) or not content.startswith(command):
            return None
        remainder = content[len(command):]
        # Require a word boundary after the command so that "/asking" or
        # "/askew" are not mistaken for "/ask" with a truncated prompt.
        if remainder and not remainder[0].isspace():
            return None
        return remainder.strip()

    async def send_to_venice(self, prompt: str) -> str:
        """Send *prompt* to the Venice API and return the reply text.

        Returns:
            The assistant message content on success. For a non-200 status,
            or a 200 response without the expected
            ``choices[0].message.content`` text, a human-readable string
            starting with ``"Error:"`` is returned instead.
        """
        payload = {
            "model": "venice-1",  # Replace with your preferred model
            "messages": [
                {"role": "user", "content": prompt},
            ],
            "venice_parameters": {
                "disable_default_system_prompt": False,
            },
        }
        async with aiohttp.ClientSession() as session:
            async with session.post(
                f"{VENICE_BASE_URL}/chat/completions",
                headers=self.headers,
                json=payload,
            ) as response:
                if response.status != 200:
                    return f"Error: {response.status} - {await response.text()}"
                data = await response.json()

        # Validate the shape explicitly so a malformed body produces a clear
        # message instead of a bare KeyError/IndexError text in the chat.
        try:
            content = data["choices"][0]["message"]["content"]
        except (KeyError, IndexError, TypeError):
            content = None
        if not isinstance(content, str):
            return "Error: unexpected response format from Venice API"
        return content

    async def handle_message(self, chat: SimpleXChat, msg: Message):
        """Handle one incoming SimpleX message.

        Messages that are not a ``BOT_COMMAND`` invocation are ignored. A
        command without a question gets a usage hint. Otherwise the prompt
        is forwarded to Venice and the answer is sent back to *chat*; any
        exception raised while doing so is reported to the chat as text.
        """
        prompt = self._extract_prompt(msg.content, BOT_COMMAND)
        if prompt is None:
            return
        if not prompt:
            await chat.send_message("Please provide a question after /ask")
            return

        # Broad catch on purpose: this is the per-message boundary, and one
        # failed request should be reported to the user rather than
        # propagate out of the handler.
        try:
            # Show typing indicator while the request is in flight.
            await chat.send_typing()
            response = await self.send_to_venice(prompt)
            await chat.send_message(response)
        except Exception as e:
            await chat.send_message(f"Error processing request: {str(e)}")

    async def start(self):
        """Start the bot and process messages until the message stream ends.

        Initialises the SimpleX client, creates a chat, prints its address,
        then passes every message yielded by the chat to ``handle_message``.
        """
        print("Starting Venice Bot...")
        # Initialize SimpleX connection
        await self.simplex.init()
        # Create or load bot identity
        chat = await self.simplex.create_chat()
        print(f"Bot address: {chat.address}")
        # Main message loop: messages are handled one at a time, in order.
        async for msg in chat.get_messages():
            await self.handle_message(chat, msg)
def main():
    """Entry point: verify the API key is configured, then run the bot."""
    if VENICE_API_KEY:
        # asyncio.run drives the bot's start() coroutine to completion.
        asyncio.run(VeniceBot().start())
        return
    # Without a key every Venice request would be rejected, so refuse to start.
    print("Error: VENICE_API_KEY environment variable not set")
# Launch the bot only when this file is executed as a script; importing the
# module has no side effects beyond defining names.
if __name__ == "__main__":
    main()