CrewAI Ollama StoryTime AI Python Example

9th August 2024


# (c) J~Net 2024
echo "Starting Storytime AI By J~Net 2024"

python -m venv venv
source venv/bin/activate

python questV14.py


# https://jnet.forumotion.com/t2025-crewai-ollama-storytime-ai-python-example#3120

import sys
import os
import time
import logging
import json
import importlib
import subprocess
import tempfile
from dotenv import load_dotenv
from langchain_openai import ChatOpenAI
from crewai import Agent, Task
import pkg_resources
import nltk


# Load environment variables

# Configure logging
logging.basicConfig(filename='script.log', level=logging.DEBUG, format='%(asctime)s - %(levelname)s - %(message)s')

def sanitize_folder_name(name):
    """Sanitize folder name by removing spaces and limiting length."""
    sanitized_name=name.replace(' ', '_')[:15]  # Replace spaces and truncate to 15 characters
    return sanitized_name

def determine_project_folder(prompt):
    Determine the project folder based on the prompt text.
    folder_name=prompt.replace(' ', '_').strip('_')  # Remove leading/trailing underscores
    # Get the directory where the script is running
    project_folder=os.path.join(script_dir, folder_name)

    # Create the directory if it doesn't exist
    os.makedirs(project_folder, exist_ok=True)
    # Print the full project folder path
    print(f"\033[92mProject folder: {project_folder}\033[0m")
    return project_folder

def read_prompt_file(file_path):
    """Read prompt from a file and return its content."""
    with open(file_path, 'r') as file:
        return file.read().strip()

def setup_and_execute(filename=None):
    """Sets up the environment and executes tasks based on the filename or interactive input."""

    if filename:
        print(f"Project folder: {project_folder}")

        files_dir, output_dir, tools_dir=setup_directories(project_folder)
        content=read_file(files_dir, filename)

            role='Requirements Manager',
            goal="""Handle any user input commands including reading a file
                    and creating files for apps based on user input. For file reading,
                    read the contents and display them. For file creation,
                    generate a filename based on user input or a pattern and create
                    the file with the specified content. Respond as if
                    you were an expert developer providing file creation services
                    on demand.""",
            backstory="""You are an expert developer with a strong background
                        in software engineering. You provide high quality,
                        thorough, and efficient file creation services based on
                        user requirements."""

        if content:
            is_story='story' in filename.lower()
            result=execute_task_chain(content, general_agent, files_dir=files_dir, output_dir=output_dir, is_story=is_story)

            generated_filename=generate_filename(files_dir, "generated_file.txt")
            create_file(files_dir, generated_filename, result)
            print(f"Generated file: {generated_filename}")

    elapsed_time=end_time - start_time
    elapsed_hours, rem=divmod(elapsed_time, 3600)
    elapsed_minutes, elapsed_seconds=divmod(rem, 60)
    print(f"Execution time: {int(elapsed_hours)}h {int(elapsed_minutes)}m {int(elapsed_seconds)}s")

def handle_interactive_mode():
    """Handles interactive user input mode."""
    while True:
        prompt=input("Enter your command (type 'exit' to quit): ").strip()
        if prompt.lower() == 'exit':

        print(f"Project folder: {project_folder}")

        files_dir, output_dir, tools_dir=setup_directories(project_folder)
        existing_steps, latest_output=read_existing_steps(files_dir)

            role='Requirements Manager',
            goal="""Handle any user input commands including reading a file
                    and creating files for apps based on user input. For file reading,
                    read the contents and display them. For file creation,
                    generate a filename based on user input or a pattern and create
                    the file with the specified content. Respond as if
                    you were an expert developer providing file creation services
                    on demand.""",
            backstory="""You are an expert developer with a strong background
                        in software engineering. You provide high quality,
                        thorough, and efficient file creation services based on
                        user requirements."""

        if prompt.lower().startswith('read '):
            filename=prompt[len('read '):].strip()
            if filename:
                    content=read_file(files_dir, filename)

                    create_file(files_dir, 'goal.txt', general_agent.goal)

                    is_story='story' in filename.lower()
                    result=execute_task_chain(content, general_agent, files_dir=files_dir, output_dir=output_dir, is_story=is_story)

                    generated_filename=generate_filename(files_dir, "generated_file.txt")
                    create_file(files_dir, generated_filename, result)
                    print(f"Generated file: {generated_filename}")

                    elapsed_time=end_time - start_time
                    elapsed_hours, rem=divmod(elapsed_time, 3600)
                    elapsed_minutes, elapsed_seconds=divmod(rem, 60)
                    print(f"Execution time: {int(elapsed_hours)}h {int(elapsed_minutes)}m {int(elapsed_seconds)}s")
                except Exception as e:
                    logging.error(f"Error occurred while reading file: {e}", exc_info=True)
                    print(f"Error occurred: {e}")
                print("Filename is empty or invalid.")
        elif prompt.lower().startswith('create file '):
            parts=prompt[len('create file '):].split(' with content ', 1)
            if len(parts) == 2:
                filename, content=parts
                filename=generate_filename(files_dir, filename.strip())
                result=create_file(files_dir, filename, content.strip())
                append_output(output_dir, result)
                print("Invalid command format. Use: create file <filename> with content <content>")
                task=Task(description=task_description.strip(), agent=general_agent)

                if "Agent stopped due to iteration limit or time limit" in result:
                    print("Agent stopped due to iteration or time limit. Stopping execution.")

                append_output(output_dir, result)

                generated_filename=generate_filename(files_dir, "generated_file.txt")
                create_file(files_dir, generated_filename, result)
                print(f"Generated file: {generated_filename}")

            except Exception as e:
                logging.error(f"Error occurred during task execution: {e}", exc_info=True)
                print(f"Error occurred: {e}")

from functions import *

if __name__ == "__main__":
    if len(sys.argv) > 1:


import os
import json
import logging
import subprocess
import importlib.util
from dotenv import load_dotenv
from langchain_openai import ChatOpenAI
from crewai import Agent, Task
import pkg_resources
from nltk.sentiment import SentimentIntensityAnalyzer
import warnings

warnings.filterwarnings("ignore", category=UserWarning, module='pydantic')

# Configure logging
logging.basicConfig(level=logging.DEBUG, format='%(asctime)s - %(levelname)s - %(message)s')

# Configure the LLM model

def sanitize_filename(name, max_length=15):
    """Sanitize and truncate filenames."""
    sanitized_name=name.replace(' ', '_').strip('_')[:max_length]
    sanitized_name=''.join(c for c in sanitized_name if c not in invalid_chars)
    return sanitized_name

def setup_directories(project_folder):
    """Sets up directories for files, output, and tools."""
    files_dir=os.path.join(project_folder, 'files')
    output_dir=os.path.join(project_folder, 'output')
    # Directory where the script is located
    tools_dir=os.path.join(script_dir, 'tools')

    os.makedirs(files_dir, exist_ok=True)
    os.makedirs(output_dir, exist_ok=True)
    os.makedirs(tools_dir, exist_ok=True)

    logging.info(f"Directories set up: files_dir='{files_dir}', output_dir='{output_dir}', tools_dir='{tools_dir}'")
    return files_dir, output_dir, tools_dir

def determine_project_folder(prompt):
    Determine the project folder based on the prompt text.
    folder_name=sanitize_filename(prompt, max_length=15)  # Adjust length as needed
    # Get the directory where the script is running
    project_folder=os.path.join(script_dir, folder_name)

    # Create the directory if it doesn't exist
    os.makedirs(project_folder, exist_ok=True)
    # Print the project folder path (truncated to 15 characters)
    print(f"\033[92mProject folder: {folder_name[:15]}\033[0m")
    return project_folder

def get_last_processed_page(project_folder):
    Get the last processed page number from a progress file.
    progress_file = os.path.join(project_folder, 'progress.json')
    if os.path.exists(progress_file):
        with open(progress_file, 'r') as file:
            progress = json.load(file)
            return progress.get('last_page', 0)
    return 0

def save_progress(project_folder, page_number):
    Save the progress to a file to track the last processed page number.
    progress_file = os.path.join(project_folder, 'progress.json')
    with open(progress_file, 'w') as file:
        json.dump({'last_page': page_number}, file)

def process_story(story_text, prompt, page_length=200):
    Processes the story by splitting it into pages and writing each page to a file.
    pages=split_story_into_pages(story_text, page_length)
    start_page=get_last_processed_page(project_folder) + 1

    for i, page in enumerate(pages[start_page - 1:200], start=start_page):
        write_page_to_file(i, page, project_folder)
        save_progress(project_folder, i)

def split_story_into_pages(story_text, page_length):
    Splits the story into pages of a given length.
    return [story_text[i:i + page_length] for i in range(0, len(story_text), page_length)]

def write_page_to_file(page_number, page_text, project_folder):
    Writes a single page of text to a file named 'page{page_number}.txt'.
    file_path=os.path.join(project_folder, f"page{page_number}.txt")
        with open(file_path, 'w') as file:
        logging.info(f"Page {page_number} written to {file_path}")
    except Exception as e:
        logging.error(f"Failed to write page {page_number} to file: {e}")

def save_story_to_file(filename, content, prompt):
    Save generated story content to a file inside the determined project folder.
    file_path=os.path.join(project_folder, filename)
        with open(file_path, 'w') as file:
        print(f"\033[92mContent saved to {file_path}\033[0m")
    except Exception as e:
        print(f"\033[91mFailed to save content to {file_path}: {e}\033[0m")

def generate_story_in_chain_mode(story_text, prompt):
    Generate story pages in a continuous chain until completion, automatically saving each page.
    page_number=get_last_processed_page(project_folder) + 1
    while not is_story_complete:
            # Generate the next page of the story
            story_page_response=llm.invoke(f"Generate the next page of the story starting with this prompt:\n{story_text}")
            story_page_text=story_page_response.content if hasattr(story_page_response, 'content') else str(story_page_response)
            # Print each page's generated text to the console
            print(f"\033[92mGenerated Page {page_number}:\033[0m")  # Green text for each generated page
            # Save the generated page
            write_page_to_file(page_number, story_page_text, project_folder)
            save_progress(project_folder, page_number)
            # Check if the story is complete
            if "THE END" in story_page_text.upper():
                print("\033[92mStory generation complete!\033[0m")  # Green text for completion message
                # If story continues, update for next page
                page_number += 1
                story_text=story_page_text  # Continue generating based on previous page
        except Exception as e:
            logging.error(f"Error during story generation: {e}", exc_info=True)
            print(f"An error occurred during story generation: {e}")
            is_story_complete=True  # Break the loop on failure

def handle_interactive_mode():
    Handle interactive mode for user commands, ensuring progressive story generation.
    print("\033[92mStarting Storytime AI By J~Net 2024\033[0m")  # Green text for starting message
    script_dir=os.path.dirname(os.path.abspath(__file__))  # Directory where the script is located
    print(f"\033[92mScript directory: {script_dir}\033[0m")

    while True:
            user_input=input("Enter your command (type 'exit' to quit): ").strip()
            if user_input.lower() == 'exit':
            elif user_input.startswith("read "):

                # Read the story file from the same directory as the script
                story_file_path=os.path.join(script_dir, filename)
                print(f"\033[93mLooking for file: {story_file_path}\033[0m")  # Yellow text for file search

                if os.path.exists(story_file_path):
                    print(f"\033[92mFile '{filename}' found. Reading the file...\033[0m")

                    # Read the file content
                    with open(story_file_path, 'r') as file:

                    # Print the content to console
                    print("\033[92mFile content:\033[0m")

                    # Start by generating the first page of the story
                    current_page=get_last_processed_page(determine_project_folder(story_text)) + 1
                    story_length=200  # Assume we want 200 pages
                    while current_page <= story_length:
                        print(f"\033[92mGenerating page {current_page} of the story...\033[0m")
                        page_prompt=f"Based on the plot, please generate page {current_page} of the story."
                        # Ensure response is treated correctly
                        if hasattr(story_response, 'content'):

                        print(f"\033[92mGenerated content for page {current_page}:\033[0m")

                        # Save the generated story incrementally
                        save_story_to_file(f"page{current_page}.txt", story_response_text, story_text)
                        print(f"\033[92mPage {current_page} saved successfully.\033[0m")

                        # Update the current page counter
                        current_page += 1

                    print(f"\033[91mFile '{filename}' not found in '{script_dir}'.\033[0m")
                print("\033[93mUnknown command.\033[0m")
        except Exception as e:
            logging.error(f"Error during interactive mode: {e}", exc_info=True)
            print(f"\033[91mAn error occurred: {e}\033[0m")

def list_available_tools(tools_dir):
    List all available tool files in the specified directory.
        tools=[f[:-3] for f in os.listdir(tools_dir) if f.endswith('.py')]
        logging.debug(f"Available tools: {tools}")
        return tools
    except Exception as e:
        logging.error(f"Error listing available tools: {e}", exc_info=True)
        return []

def delete_tool(tools_dir, tool_name):
    Delete a Python tool file from the specified directory.
    tool_filename=os.path.join(tools_dir, f"{tool_name}.py")
    if os.path.exists(tool_filename):
            logging.info(f"Tool '{tool_name}' deleted.")
            return f"Tool '{tool_name}' deleted successfully."
        except Exception as e:
            logging.error(f"Error occurred while deleting the tool: {e}", exc_info=True)
            return f"An error occurred while deleting the tool: {e}"
        return f"Tool '{tool_name}' does not exist."

def update_tool(tools_dir, tool_name, new_code):
    Update the code of an existing Python tool.
    tool_filename=os.path.join(tools_dir, f"{tool_name}.py")
    if os.path.exists(tool_filename):
            with open(tool_filename, 'w') as tool_file:
            logging.info(f"Tool '{tool_name}' updated.")
            return f"Tool '{tool_name}' updated successfully."
        except Exception as e:
            logging.error(f"Error occurred while updating the tool: {e}", exc_info=True)
            return f"An error occurred while updating the tool: {e}"
        return f"Tool '{tool_name}' does not exist."

def execute_tool_function(tools_dir, tool_name, function_name, *args):
    Execute a function from a Python tool.
    tool=load_tool(tools_dir, tool_name)
    if hasattr(tool, function_name):
        func=getattr(tool, function_name)
            logging.debug(f"Executed function '{function_name}' from tool '{tool_name}' with result: {result}")
            return result
        except Exception as e:
            logging.error(f"Error occurred while executing function '{function_name}' from tool '{tool_name}': {e}", exc_info=True)
            return f"An error occurred while executing the function: {e}"
        return f"Function '{function_name}' not found in tool '{tool_name}'."

def analyze_sentiment(text):
    Analyze the sentiment of a given text using VADER.
    return sentiment_scores

def move_file(src_dir, dest_dir, filename):
    Move a file from one directory to another.
    src_path=os.path.join(src_dir, filename)
    dest_path=os.path.join(dest_dir, filename)
    if os.path.exists(src_path):
            os.rename(src_path, dest_path)
            logging.debug(f"File '{filename}' moved from '{src_dir}' to '{dest_dir}'.")
            return f"File '{filename}' moved successfully."
        except Exception as e:
            logging.error(f"Error occurred while moving file '{filename}': {e}", exc_info=True)
            return f"An error occurred while moving the file: {e}"
        return f"File '{filename}' does not exist in the source directory."

def copy_file(src_dir, dest_dir, filename):
    Copy a file from one directory to another.
    src_path=os.path.join(src_dir, filename)
    dest_path=os.path.join(dest_dir, filename)
    if os.path.exists(src_path):
            with open(src_path, 'rb') as src_file:
                with open(dest_path, 'wb') as dest_file:
            logging.debug(f"File '{filename}' copied from '{src_dir}' to '{dest_dir}'.")
            return f"File '{filename}' copied successfully."
        except Exception as e:
            logging.error(f"Error occurred while copying file '{filename}': {e}", exc_info=True)
            return f"An error occurred while copying the file: {e}"
        return f"File '{filename}' does not exist in the source directory."

def search_text_in_files(files_dir, search_text):
    Search for a text string within all files in the specified directory.
    for filename in os.listdir(files_dir):
        file_path=os.path.join(files_dir, filename)
        if os.path.isfile(file_path):
                with open(file_path, 'r') as file:
                    matching_lines=[line.strip() for line in lines if search_text.lower() in line.lower()]
                    if matching_lines:
            except Exception as e:
                logging.error(f"Error occurred while searching in file '{filename}': {e}", exc_info=True)
    return results

def list_files_in_directory(directory):
    List all files in the specified directory.
        return [f for f in os.listdir(directory) if os.path.isfile(os.path.join(directory, f))]
    except Exception as e:
        logging.error(f"Error occurred while listing files in directory '{directory}': {e}", exc_info=True)
        return []

def execute_shell_command(command):
    Execute a shell command and return its output.
        result=subprocess.check_output(command, shell=True, text=True)
        logging.debug(f"Executed command: '{command}' with output: {result}")
        return result
    except subprocess.CalledProcessError as e:
        logging.error(f"Command '{command}' failed with return code {e.returncode} and output: {e.output}", exc_info=True)
        return f"Command failed with return code {e.returncode}."

def load_tool(tools_dir, tool_name):
    Load a Python tool module dynamically.
    tool_path=os.path.join(tools_dir, f"{tool_name}.py")
    spec=importlib.util.spec_from_file_location(tool_name, tool_path)
        logging.debug(f"Loaded tool '{tool_name}' from '{tool_path}'.")
        return tool
    except Exception as e:
        logging.error(f"Error occurred while loading tool '{tool_name}': {e}", exc_info=True)
        return None



pip install -r requirements.txt

