CrewAI Ollama StoryTime AI Python Example
create_story.sh
questV14.py
functions.py
requirements.txt
- Code:
#!/bin/bash
# (c) J~Net 2024
#
#
#
echo "Starting Storytime AI By J~Net 2024"
python -m venv venv
source venv/bin/activate
python questV14.py
questV14.py
- Code:
# (c) J~Net 2024
# https://jnet.forumotion.com/t2025-crewai-ollama-storytime-ai-python-example#3120
# --- questV14.py: imports and one-time global configuration ---
import sys
import os
import time
import logging
import json
import importlib
import subprocess
import tempfile
from dotenv import load_dotenv
from langchain_openai import ChatOpenAI
from crewai import Agent, Task
import pkg_resources
import nltk

# Fetch the VADER sentiment lexicon used by functions.analyze_sentiment.
# nltk.download is a no-op if the lexicon is already cached locally.
nltk.download('vader_lexicon')
# Load environment variables
load_dotenv()
# Configure logging
logging.basicConfig(filename='script.log', level=logging.DEBUG, format='%(asctime)s - %(levelname)s - %(message)s')
def sanitize_folder_name(name):
    """Return *name* with spaces replaced by underscores, capped at 15 characters."""
    underscored = name.replace(' ', '_')
    # Truncate so folder names stay short and predictable.
    return underscored[:15]
def determine_project_folder(prompt):
    """Derive a per-prompt project directory next to this script and ensure it exists."""
    # Spaces become underscores; leading/trailing underscores are dropped.
    folder_name = prompt.replace(' ', '_').strip('_')
    base_dir = os.path.dirname(os.path.abspath(__file__))
    project_folder = os.path.join(base_dir, folder_name)
    os.makedirs(project_folder, exist_ok=True)
    # Announce the folder in green so it stands out on the console.
    print(f"\033[92mProject folder: {project_folder}\033[0m")
    return project_folder
def read_prompt_file(file_path):
    """Return the whitespace-stripped text content of the file at *file_path*."""
    with open(file_path, 'r') as handle:
        text = handle.read()
    return text.strip()
def setup_and_execute(filename=None):
    """Set up the project environment for *filename* and run the task chain on it.

    When *filename* is falsy, nothing is executed; only the elapsed-time
    summary is printed.
    """
    started = time.time()
    if filename:
        project_folder = determine_project_folder(filename)
        print(f"Project folder: {project_folder}")
        files_dir, output_dir, tools_dir = setup_directories(project_folder)
        content = read_file(files_dir, filename)
        # Agent acting as a file-creation/requirements specialist.
        general_agent = create_agent(
            role='Requirements Manager',
            goal="""Handle any user input commands including reading a file
and creating files for apps based on user input. For file reading,
read the contents and display them. For file creation,
generate a filename based on user input or a pattern and create
the file with the specified content. Respond as if
you were an expert developer providing file creation services
on demand.""",
            backstory="""You are an expert developer with a strong background
in software engineering. You provide high quality,
thorough, and efficient file creation services based on
user requirements."""
        )
        if content:
            # Story files get story-specific handling downstream.
            is_story = 'story' in filename.lower()
            result = execute_task_chain(content, general_agent, files_dir=files_dir, output_dir=output_dir, is_story=is_story)
            generated_filename = generate_filename(files_dir, "generated_file.txt")
            create_file(files_dir, generated_filename, result)
            print(f"Generated file: {generated_filename}")
    # Report wall-clock time in h/m/s regardless of whether work was done.
    hours, remainder = divmod(time.time() - started, 3600)
    minutes, seconds = divmod(remainder, 60)
    print(f"Execution time: {int(hours)}h {int(minutes)}m {int(seconds)}s")
def handle_interactive_mode():
    """Handles interactive user input mode."""
    # Loop until the user types 'exit'; each prompt gets its own project folder.
    while True:
        prompt=input("Enter your command (type 'exit' to quit): ").strip()
        if prompt.lower() == 'exit':
            break
        # Every prompt maps to a folder named after the prompt text.
        project_folder=determine_project_folder(prompt)
        print(f"Project folder: {project_folder}")
        files_dir, output_dir, tools_dir=setup_directories(project_folder)
        # Restore any state from a previous session in this folder.
        # NOTE(review): read_memory_file/read_existing_steps come from the
        # `from functions import *` at the bottom of this file — confirm they
        # exist there; they are not visible in this listing.
        memory_data=read_memory_file(files_dir)
        existing_steps, latest_output=read_existing_steps(files_dir)
        start_step=len(existing_steps)
        # Fresh agent per prompt, acting as a file-creation specialist.
        general_agent=create_agent(
            role='Requirements Manager',
            goal="""Handle any user input commands including reading a file
and creating files for apps based on user input. For file reading,
read the contents and display them. For file creation,
generate a filename based on user input or a pattern and create
the file with the specified content. Respond as if
you were an expert developer providing file creation services
on demand.""",
            backstory="""You are an expert developer with a strong background
in software engineering. You provide high quality,
thorough, and efficient file creation services based on
user requirements."""
        )
        # Command dispatch: 'read <file>', 'create file <name> with content <text>',
        # or any other text is handed to the agent as a free-form task.
        if prompt.lower().startswith('read '):
            filename=prompt[len('read '):].strip()
            if filename:
                try:
                    start_time=time.time()
                    content=read_file(files_dir, filename)
                    # The file's content becomes the agent's goal for this run,
                    # and is persisted to goal.txt for traceability.
                    general_agent.goal=content.strip()
                    create_file(files_dir, 'goal.txt', general_agent.goal)
                    is_story='story' in filename.lower()
                    result=execute_task_chain(content, general_agent, files_dir=files_dir, output_dir=output_dir, is_story=is_story)
                    generated_filename=generate_filename(files_dir, "generated_file.txt")
                    create_file(files_dir, generated_filename, result)
                    print(f"Generated file: {generated_filename}")
                    end_time=time.time()
                    elapsed_time=end_time - start_time
                    elapsed_hours, rem=divmod(elapsed_time, 3600)
                    elapsed_minutes, elapsed_seconds=divmod(rem, 60)
                    print(f"Execution time: {int(elapsed_hours)}h {int(elapsed_minutes)}m {int(elapsed_seconds)}s")
                except Exception as e:
                    logging.error(f"Error occurred while reading file: {e}", exc_info=True)
                    print(f"Error occurred: {e}")
            else:
                print("Filename is empty or invalid.")
        elif prompt.lower().startswith('create file '):
            # Expected form: create file <filename> with content <content>
            parts=prompt[len('create file '):].split(' with content ', 1)
            if len(parts) == 2:
                filename, content=parts
                filename=generate_filename(files_dir, filename.strip())
                result=create_file(files_dir, filename, content.strip())
                print(result)
                append_output(output_dir, result)
            else:
                print("Invalid command format. Use: create file <filename> with content <content>")
        else:
            # Free-form input: run it as a crewai Task for the agent.
            task_description=prompt
            try:
                task=Task(description=task_description.strip(), agent=general_agent)
                result=task.execute()
                # The agent reports hitting its iteration/time limit in-band;
                # treat that as a signal to stop the whole loop.
                if "Agent stopped due to iteration limit or time limit" in result:
                    print("Agent stopped due to iteration or time limit. Stopping execution.")
                    break
                print(result)
                append_output(output_dir, result)
                generated_filename=generate_filename(files_dir, "generated_file.txt")
                create_file(files_dir, generated_filename, result)
                print(f"Generated file: {generated_filename}")
            except Exception as e:
                logging.error(f"Error occurred during task execution: {e}", exc_info=True)
                print(f"Error occurred: {e}")
# Pull in the helper layer (setup_directories, read_file, create_agent, ...).
# NOTE(review): this star import sits below the function definitions; it works
# because the names are only resolved when the functions are called, but the
# conventional place for it is the top of the file.
from functions import *
if __name__ == "__main__":
    # A command-line argument is treated as a filename to process;
    # otherwise fall into the interactive command loop.
    if len(sys.argv) > 1:
        setup_and_execute(filename=sys.argv[1])
    else:
        handle_interactive_mode()
functions.py
- Code:
# (c) J~Net 2024
# https://jnet.forumotion.com/t2025-crewai-ollama-storytime-ai-python-example#3120
# --- functions.py: imports and one-time global configuration ---
import os
import json
import logging
import subprocess
import importlib.util
from dotenv import load_dotenv
from langchain_openai import ChatOpenAI
from crewai import Agent, Task
import pkg_resources
from nltk.sentiment import SentimentIntensityAnalyzer
import warnings

# Silence pydantic UserWarnings emitted via the crewai/langchain stack.
warnings.filterwarnings("ignore", category=UserWarning, module='pydantic')
# Configure logging
logging.basicConfig(level=logging.DEBUG, format='%(asctime)s - %(levelname)s - %(message)s')
# Configure the LLM model: a local Ollama server exposing an OpenAI-compatible
# API on port 11434, serving the "crewai-dolphin-phi" model.
llm=ChatOpenAI(
    model="crewai-dolphin-phi",
    base_url="http://localhost:11434/v1"
)
def sanitize_filename(name, max_length=15):
    """Sanitize *name* for use as a filename: underscores for spaces, capped length."""
    # Replace spaces, trim stray underscores, then cap the length.
    candidate = name.replace(' ', '_').strip('_')[:max_length]
    # Drop characters that are illegal in filenames on common platforms.
    forbidden = '<>:\"/\\|?*'
    return ''.join(ch for ch in candidate if ch not in forbidden)
def setup_directories(project_folder):
    """Create (if needed) and return the files/output/tools directory paths."""
    files_dir = os.path.join(project_folder, 'files')
    output_dir = os.path.join(project_folder, 'output')
    # Tools always live next to this script, not inside the project folder.
    tools_dir = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'tools')
    for path in (files_dir, output_dir, tools_dir):
        os.makedirs(path, exist_ok=True)
    logging.info(f"Directories set up: files_dir='{files_dir}', output_dir='{output_dir}', tools_dir='{tools_dir}'")
    return files_dir, output_dir, tools_dir
def determine_project_folder(prompt):
    """Resolve (and create) the project directory for *prompt*, returning its path."""
    # Folder name is the sanitized, 15-char-max form of the prompt.
    folder = sanitize_filename(prompt, max_length=15)
    target = os.path.join(os.path.dirname(os.path.abspath(__file__)), folder)
    os.makedirs(target, exist_ok=True)
    # Show only the (already truncated) folder name, in green.
    print(f"\033[92mProject folder: {folder[:15]}\033[0m")
    return target
def get_last_processed_page(project_folder):
    """Return the last page number recorded in progress.json, or 0 if none exists."""
    progress_path = os.path.join(project_folder, 'progress.json')
    if not os.path.exists(progress_path):
        return 0
    with open(progress_path, 'r') as handle:
        return json.load(handle).get('last_page', 0)
def save_progress(project_folder, page_number):
    """Record *page_number* as the last processed page in progress.json."""
    progress_path = os.path.join(project_folder, 'progress.json')
    payload = {'last_page': page_number}
    with open(progress_path, 'w') as handle:
        json.dump(payload, handle)
def process_story(story_text, prompt, page_length=200, max_pages=200):
    """
    Split *story_text* into pages and write each one into the project folder.

    Resumes from the last page recorded in progress.json, so re-running the
    same prompt does not rewrite pages that were already produced.

    Args:
        story_text: Full story text to paginate.
        prompt: Prompt used to derive the project folder name.
        page_length: Characters per page.
        max_pages: Upper bound on the overall page index written in one run
            (generalizes the previously hard-coded 200).
    """
    project_folder=determine_project_folder(prompt)
    pages=split_story_into_pages(story_text, page_length)
    # Resume one page past the last recorded progress.
    start_page=get_last_processed_page(project_folder) + 1
    for i, page in enumerate(pages[start_page - 1:max_pages], start=start_page):
        write_page_to_file(i, page, project_folder)
        save_progress(project_folder, i)
def split_story_into_pages(story_text, page_length):
    """Chop *story_text* into consecutive chunks of *page_length* characters."""
    chunks = []
    for offset in range(0, len(story_text), page_length):
        chunks.append(story_text[offset:offset + page_length])
    return chunks
def write_page_to_file(page_number, page_text, project_folder):
    """Write one page of text to '<project_folder>/page<page_number>.txt'.

    Failures are logged rather than raised, so a single bad page does not
    abort the whole run.
    """
    destination = os.path.join(project_folder, f"page{page_number}.txt")
    try:
        with open(destination, 'w') as handle:
            handle.write(page_text)
        logging.info(f"Page {page_number} written to {destination}")
    except Exception as exc:
        logging.error(f"Failed to write page {page_number} to file: {exc}")
def save_story_to_file(filename, content, prompt):
    """Write *content* into the project folder derived from *prompt*.

    The filename is sanitized first; success and failure are reported on
    the console (green / red) instead of raising.
    """
    target_dir = determine_project_folder(prompt)
    safe_name = sanitize_filename(filename)
    destination = os.path.join(target_dir, safe_name)
    try:
        with open(destination, 'w') as handle:
            handle.write(content)
        print(f"\033[92mContent saved to {destination}\033[0m")
    except Exception as exc:
        print(f"\033[91mFailed to save content to {destination}: {exc}\033[0m")
def generate_story_in_chain_mode(story_text, prompt):
    """
    Keep asking the LLM for the next page of the story until it signals
    completion ("THE END"), saving and printing every page as it arrives.

    Progress is checkpointed after each page so a restart resumes where
    the previous run stopped.
    """
    project_folder = determine_project_folder(prompt)
    page_number = get_last_processed_page(project_folder) + 1
    finished = False
    while not finished:
        try:
            response = llm.invoke(f"Generate the next page of the story starting with this prompt:\n{story_text}")
            # llm may return a message object or a plain string.
            page_text = response.content if hasattr(response, 'content') else str(response)
            print(f"\033[92mGenerated Page {page_number}:\033[0m")
            print(page_text)
            write_page_to_file(page_number, page_text, project_folder)
            save_progress(project_folder, page_number)
            if "THE END" in page_text.upper():
                finished = True
                print("\033[92mStory generation complete!\033[0m")
            else:
                # Chain: the next page continues from the one just produced.
                page_number += 1
                story_text = page_text
        except Exception as exc:
            logging.error(f"Error during story generation: {exc}", exc_info=True)
            print(f"An error occurred during story generation: {exc}")
            finished = True  # stop looping on failure
def handle_interactive_mode():
    """
    Interactive command loop for progressive story generation.

    Supported commands:
      read <filename> -- read a story/plot file from the script directory,
                         then generate and save up to 200 pages based on it.
      exit            -- quit the loop.
    """
    print("\033[92mStarting Storytime AI By J~Net 2024\033[0m")
    # Story files are looked up next to this script.
    script_dir=os.path.dirname(os.path.abspath(__file__))
    print(f"\033[92mScript directory: {script_dir}\033[0m")
    while True:
        try:
            user_input=input("Enter your command (type 'exit' to quit): ").strip()
            if user_input.lower() == 'exit':
                break
            elif user_input.startswith("read "):
                filename=sanitize_filename(user_input[5:].strip())
                story_file_path=os.path.join(script_dir, filename)
                print(f"\033[93mLooking for file: {story_file_path}\033[0m")
                if os.path.exists(story_file_path):
                    # BUGFIX: message previously printed the literal '(unknown)'
                    # instead of the actual filename.
                    print(f"\033[92mFile '{filename}' found. Reading the file...\033[0m")
                    with open(story_file_path, 'r') as file:
                        story_text=file.read()
                    print("\033[92mFile content:\033[0m")
                    print(story_text)
                    # Resume from the last generated page for this story.
                    current_page=get_last_processed_page(determine_project_folder(story_text)) + 1
                    story_length=200  # target number of pages
                    while current_page <= story_length:
                        print(f"\033[92mGenerating page {current_page} of the story...\033[0m")
                        page_prompt=f"Based on the plot, please generate page {current_page} of the story."
                        story_response=llm.invoke(f"{story_text}\n\n{page_prompt}")
                        # llm may return a message object or a plain string.
                        if hasattr(story_response, 'content'):
                            story_response_text=story_response.content
                        else:
                            story_response_text=str(story_response)
                        print(f"\033[92mGenerated content for page {current_page}:\033[0m")
                        print(story_response_text)
                        # Save each page as soon as it is generated.
                        save_story_to_file(f"page{current_page}.txt", story_response_text, story_text)
                        print(f"\033[92mPage {current_page} saved successfully.\033[0m")
                        current_page += 1
                else:
                    # BUGFIX: same '(unknown)' placeholder replaced with the filename.
                    print(f"\033[91mFile '{filename}' not found in '{script_dir}'.\033[0m")
            else:
                print("\033[93mUnknown command.\033[0m")
        except Exception as e:
            logging.error(f"Error during interactive mode: {e}", exc_info=True)
            print(f"\033[91mAn error occurred: {e}\033[0m")
def list_available_tools(tools_dir):
    """Return the names (without the .py suffix) of every Python tool in *tools_dir*.

    Any listing failure is logged and an empty list is returned.
    """
    try:
        names = [entry[:-3] for entry in os.listdir(tools_dir) if entry.endswith('.py')]
        logging.debug(f"Available tools: {names}")
        return names
    except Exception as exc:
        logging.error(f"Error listing available tools: {exc}", exc_info=True)
        return []
def delete_tool(tools_dir, tool_name):
    """Remove the tool file '<tool_name>.py' from *tools_dir*, if present.

    Returns a human-readable status string for every outcome.
    """
    target = os.path.join(tools_dir, f"{tool_name}.py")
    if not os.path.exists(target):
        return f"Tool '{tool_name}' does not exist."
    try:
        os.remove(target)
        logging.info(f"Tool '{tool_name}' deleted.")
        return f"Tool '{tool_name}' deleted successfully."
    except Exception as exc:
        logging.error(f"Error occurred while deleting the tool: {exc}", exc_info=True)
        return f"An error occurred while deleting the tool: {exc}"
def update_tool(tools_dir, tool_name, new_code):
    """Overwrite an existing tool file '<tool_name>.py' with *new_code*.

    Only updates tools that already exist; returns a status string.
    """
    target = os.path.join(tools_dir, f"{tool_name}.py")
    if not os.path.exists(target):
        return f"Tool '{tool_name}' does not exist."
    try:
        with open(target, 'w') as handle:
            handle.write(new_code)
        logging.info(f"Tool '{tool_name}' updated.")
        return f"Tool '{tool_name}' updated successfully."
    except Exception as exc:
        logging.error(f"Error occurred while updating the tool: {exc}", exc_info=True)
        return f"An error occurred while updating the tool: {exc}"
def execute_tool_function(tools_dir, tool_name, function_name, *args):
    """Load tool *tool_name* and invoke *function_name* on it with *args*.

    Returns the function's result, or a status string if the function is
    missing or raises.
    """
    module = load_tool(tools_dir, tool_name)
    if not hasattr(module, function_name):
        return f"Function '{function_name}' not found in tool '{tool_name}'."
    target = getattr(module, function_name)
    try:
        outcome = target(*args)
        logging.debug(f"Executed function '{function_name}' from tool '{tool_name}' with result: {outcome}")
        return outcome
    except Exception as exc:
        logging.error(f"Error occurred while executing function '{function_name}' from tool '{tool_name}': {exc}", exc_info=True)
        return f"An error occurred while executing the function: {exc}"
def analyze_sentiment(text):
    """Score *text* with NLTK's VADER analyzer and return the polarity dict."""
    analyzer = SentimentIntensityAnalyzer()
    return analyzer.polarity_scores(text)
def move_file(src_dir, dest_dir, filename):
    """
    Move *filename* from *src_dir* to *dest_dir*.

    Returns a human-readable status string in every case (success, failure,
    or missing source file). BUGFIX: messages previously contained the
    literal '(unknown)' instead of the actual filename.
    """
    src_path=os.path.join(src_dir, filename)
    dest_path=os.path.join(dest_dir, filename)
    if os.path.exists(src_path):
        try:
            # NOTE: os.rename only works within a single filesystem; a move
            # across devices would need shutil.move.
            os.rename(src_path, dest_path)
            logging.debug(f"File '{filename}' moved from '{src_dir}' to '{dest_dir}'.")
            return f"File '{filename}' moved successfully."
        except Exception as e:
            logging.error(f"Error occurred while moving file '{filename}': {e}", exc_info=True)
            return f"An error occurred while moving the file: {e}"
    else:
        return f"File '{filename}' does not exist in the source directory."
def copy_file(src_dir, dest_dir, filename):
    """
    Copy *filename* from *src_dir* to *dest_dir* (binary-safe).

    Returns a human-readable status string in every case. BUGFIX: messages
    previously contained the literal '(unknown)' instead of the filename.
    """
    src_path=os.path.join(src_dir, filename)
    dest_path=os.path.join(dest_dir, filename)
    if os.path.exists(src_path):
        try:
            with open(src_path, 'rb') as src_file:
                with open(dest_path, 'wb') as dest_file:
                    dest_file.write(src_file.read())
            logging.debug(f"File '{filename}' copied from '{src_dir}' to '{dest_dir}'.")
            return f"File '{filename}' copied successfully."
        except Exception as e:
            logging.error(f"Error occurred while copying file '{filename}': {e}", exc_info=True)
            return f"An error occurred while copying the file: {e}"
    else:
        return f"File '{filename}' does not exist in the source directory."
def search_text_in_files(files_dir, search_text):
    """
    Case-insensitively search for *search_text* in every file in *files_dir*.

    Returns a dict mapping filename -> list of matching (stripped) lines;
    files with no match are omitted. Unreadable files are logged and skipped.
    BUGFIX: the error log previously printed '(unknown)' instead of the
    filename that failed.
    """
    results={}
    for filename in os.listdir(files_dir):
        file_path=os.path.join(files_dir, filename)
        if os.path.isfile(file_path):
            try:
                with open(file_path, 'r') as file:
                    lines=file.readlines()
                matching_lines=[line.strip() for line in lines if search_text.lower() in line.lower()]
                if matching_lines:
                    results[filename]=matching_lines
            except Exception as e:
                logging.error(f"Error occurred while searching in file '{filename}': {e}", exc_info=True)
    return results
def list_files_in_directory(directory):
    """Return the names of regular files (not subdirectories) in *directory*.

    Listing errors are logged and yield an empty list.
    """
    try:
        entries = os.listdir(directory)
        return [name for name in entries if os.path.isfile(os.path.join(directory, name))]
    except Exception as exc:
        logging.error(f"Error occurred while listing files in directory '{directory}': {exc}", exc_info=True)
        return []
def execute_shell_command(command):
    """Run *command* through the shell and return its stdout as text.

    On failure, logs the error and returns a short status string instead.

    SECURITY NOTE: shell=True executes the raw string in a shell — never
    pass untrusted input to this function.
    """
    try:
        output = subprocess.check_output(command, shell=True, text=True)
        logging.debug(f"Executed command: '{command}' with output: {output}")
        return output
    except subprocess.CalledProcessError as exc:
        logging.error(f"Command '{command}' failed with return code {exc.returncode} and output: {exc.output}", exc_info=True)
        return f"Command failed with return code {exc.returncode}."
def load_tool(tools_dir, tool_name):
    """Dynamically import '<tool_name>.py' from *tools_dir* and return the module.

    Returns None (after logging) if the module fails to execute.
    """
    tool_path = os.path.join(tools_dir, f"{tool_name}.py")
    spec = importlib.util.spec_from_file_location(tool_name, tool_path)
    module = importlib.util.module_from_spec(spec)
    try:
        spec.loader.exec_module(module)
        logging.debug(f"Loaded tool '{tool_name}' from '{tool_path}'.")
        return module
    except Exception as exc:
        logging.error(f"Error occurred while loading tool '{tool_name}': {exc}", exc_info=True)
        return None
requirements.txt
- Code:
python-dotenv
langchain_openai
crewai
setuptools
nltk
- Code:
pip install -r requirements.txt
Last edited by jamied_uk on 23rd August 2024, 21:41; edited 3 times in total
Similar topics
» RAG With Recovery Project Builder CrewAI & Ollama
» Create an ollama model file for CrewAI Dynamically
» Ollama Crewai Prompt Example for stock and product analysis using llama38b model
» Securing a Python Menu With Python Encrypted Password File
» CrewAI Quest V5
» Create an ollama model file for CrewAI Dynamically
» Ollama Crewai Prompt Example for stock and product analysis using llama38b model
» Securing a Python Menu With Python Encrypted Password File
» CrewAI Quest V5
Permissions in this forum:
You cannot reply to topics in this forum
|
|