PC & IT SUPPORT MADE EASY FORUM
Would you like to react to this message? Create an account in a few clicks or log in to continue.

CrewAI Quest V5

Go down

CrewAI Quest V5

Post by jamied_uk 9th August 2024, 13:46

start_quest.sh


Code:
#!/bin/bash
#
# Launcher for questV5.py: creates the "venv" virtual environment on first
# use, activates it, runs the script, and exits with the script's status.
#
# Usage: ./start_quest.sh [story-file]

# Print a blank line
echo ""

# Create a Python virtual environment in the "venv" directory if not already created
if [ ! -d "venv" ]; then
    python -m venv venv || { echo "Failed to create the virtual environment." >&2; exit 1; }
fi

# Activate the virtual environment; stop here rather than fall back to the system Python
source venv/bin/activate || { echo "Failed to activate the virtual environment." >&2; exit 1; }

# Upgrade pip to the latest version (optional)
#pip install --upgrade pip

# Install required packages from requirements.txt (if needed)
#pip install -r requirements.txt

# Run the Python script with the story file as its argument if exactly one was given
if [ $# -eq 1 ]; then
    python questV5.py "$1"
else
    # Run the Python script without any argument (interactive mode)
    python questV5.py
fi
# Remember the script's result: "deactivate" below would otherwise decide the exit status
status=$?

# Deactivate the virtual environment
deactivate

exit "$status"




questV5.py


Code:
#
# (c)J~Net 2024
#
# ./start_quest.sh
#
# https://jnet.forumotion.com/t2025-ceewai-ollama-storytime-ai-python-example#3120
#
import sys
import os
import time
import logging
import json
import importlib
import subprocess
from dotenv import load_dotenv
from langchain_openai import ChatOpenAI
from crewai import Agent, Task

# Load environment variables (python-dotenv)
load_dotenv()

# Module-level LLM client handed to every agent created in this script
llm = ChatOpenAI(
    base_url="http://localhost:11434/v1",
    model="crewai-dolphin-phi",
)

# Log DEBUG and above to script.log, each record prefixed with time and level
logging.basicConfig(
    format='%(asctime)s - %(levelname)s - %(message)s',
    level=logging.DEBUG,
    filename='script.log',
)

def determine_project_folder(prompt):
    """
    Build the project folder name from a command prompt.

    The first whitespace-separated word of the prompt (the command word, for
    example 'read') is dropped, the remaining words are joined with
    underscores and '_project' is appended. A prompt with fewer than two
    words uses 'default_project' as the base name instead, which yields
    'default_project_project'.
    """
    words_after_command = prompt.split()[1:]
    if words_after_command:
        base_name = '_'.join(words_after_command)
    else:
        base_name = 'default_project'
    # str.split() leaves no whitespace inside the words, so the joined name
    # contains no spaces that would still need replacing.
    return base_name + "_project"

def setup_directories(project_folder):
    """
    Make sure the project folder and its 'files', 'output' and 'tools'
    sub-folders exist, creating whichever are missing.

    Returns the three sub-folder paths as (files_dir, output_dir, tools_dir).
    """
    os.makedirs(project_folder, exist_ok=True)
    created = []
    for sub_name in ('files', 'output', 'tools'):
        sub_path = os.path.join(project_folder, sub_name)
        os.makedirs(sub_path, exist_ok=True)
        created.append(sub_path)
    return tuple(created)

def read_file(files_dir, filename):
    """
    Return the text of `filename`.

    The file is looked up first in the current working directory and then in
    `files_dir`; the first regular file found is read and returned. If neither
    location holds such a file, the problem is logged and the message
    "File '<filename>' not found." is returned instead of raising.
    """
    candidates = (
        os.path.join(os.getcwd(), filename),
        os.path.join(files_dir, filename),
    )
    for path in candidates:
        # isfile rather than exists: a directory with that name (or an empty
        # filename, which resolves to the directory itself) must count as
        # "not found" instead of making open() raise.
        if os.path.isfile(path):
            logging.debug(f"Reading file from: {path}")
            with open(path, 'r') as file:
                return file.read()
    logging.error(f"File '{filename}' not found in any of the expected locations.")
    return f"File '{filename}' not found."

def create_file(files_dir, filename, content):
    """
    Write `content` to `filename` inside `files_dir`, replacing any existing file.

    Returns a status message. A failure while writing is logged and reported
    in the returned message rather than raised.
    """
    target = os.path.join(files_dir, filename)
    try:
        with open(target, 'w') as handle:
            handle.write(content)
    except Exception as e:
        logging.error(f"Error occurred while creating the file: {e}", exc_info=True)
        return f"An error occurred while creating the file: {e}"
    else:
        logging.debug(f"File '{filename}' created/updated at: {target}")
        return f"File '{filename}' created successfully."

def append_output(output_dir, content):
    """
    Add `content`, followed by a newline, to the end of output.txt in `output_dir`.

    Returns a status message. Any failure is logged and reported in the
    returned message rather than raised.
    """
    try:
        log_path = os.path.join(output_dir, 'output.txt')
        with open(log_path, 'a') as sink:
            sink.write(content + '\n')
    except Exception as e:
        logging.error(f"Error occurred while appending output: {e}", exc_info=True)
        return f"An error occurred while appending output: {e}"
    logging.debug(f"Appended output to: {log_path}")
    return f"Output appended to '{log_path}'."

def save_step_result(files_dir, step_number, result):
    """
    Store the text produced by one step as step<step_number>.txt in
    `files_dir`, overwriting an earlier result for the same step.

    Write errors are not caught here; they propagate to the caller.
    """
    target = os.path.join(files_dir, f"step{step_number}.txt")
    with open(target, 'w') as handle:
        handle.write(result)
    logging.debug(f"Saved step result to: {target}")

def update_stage(files_dir, stage):
    """
    Record `stage` under the key "current_stage" in stage.json inside
    `files_dir`, replacing the previous file.

    Returns a status message. Any failure is logged and reported through the
    returned message rather than raised.
    """
    stage_path = os.path.join(files_dir, 'stage.json')
    try:
        with open(stage_path, 'w') as handle:
            json.dump({"current_stage": stage}, handle)
    except Exception as e:
        logging.error(f"Error occurred while updating stage: {e}", exc_info=True)
        return "An error occurred while updating the stage."
    else:
        logging.debug(f"Stage updated to: {stage}")
        return "Stage updated successfully."

def read_stage(files_dir):
    """
    Return the value stored under "current_stage" in stage.json inside
    `files_dir`.

    Returns 0 when the file does not exist, when it has no "current_stage"
    entry, or when it cannot be read or parsed (the error is logged).
    """
    stage_path = os.path.join(files_dir, 'stage.json')
    if not os.path.exists(stage_path):
        return 0
    try:
        with open(stage_path, 'r') as handle:
            saved = json.load(handle)
        return saved.get("current_stage", 0)
    except Exception as e:
        logging.error(f"Error occurred while reading stage: {e}", exc_info=True)
        return 0

def read_existing_steps(files_dir):
    """
    Load the saved step results (files named step*.txt) from `files_dir`.

    Returns a tuple (steps, latest): `steps` is the list of file contents in
    ascending step-number order and `latest` is the content of the
    highest-numbered step, or "" when there are no step files.

    Files that match step*.txt but carry no plain number are still included;
    they are placed before the numbered steps so they never count as latest.
    Any error while listing or reading is logged and whatever was read up to
    that point is returned.
    """
    def _step_order(name):
        # Text between the "step" prefix and the ".txt" suffix
        number = name[len("step"):-len(".txt")]
        if number.isdecimal():
            return (1, int(number), name)
        return (0, 0, name)

    steps = []
    try:
        step_names = [
            name for name in os.listdir(files_dir)
            if name.startswith("step") and name.endswith(".txt")
        ]
        # os.listdir returns names in arbitrary order; sort numerically so
        # that the last entry really is the latest step (and step10 follows
        # step9 instead of step1).
        for name in sorted(step_names, key=_step_order):
            with open(os.path.join(files_dir, name), 'r') as file:
                steps.append(file.read())
    except Exception as e:
        logging.error(f"Error occurred while reading step files: {e}", exc_info=True)
    latest = steps[-1] if steps else ""
    return steps, latest

def generate_filename(files_dir, base_name):
    """
    Find an unused name of the form <base_name>-<n> in `files_dir`.

    Counting starts at 1 and stops at the first n for which nothing exists
    at that path. The returned value is the full path, i.e. it already
    includes `files_dir`.
    """
    suffix = 1
    candidate = os.path.join(files_dir, f"{base_name}-{suffix}")
    while os.path.exists(candidate):
        suffix += 1
        candidate = os.path.join(files_dir, f"{base_name}-{suffix}")
    return candidate

def create_tool(tools_dir, tool_name, tool_code):
    """
    Save `tool_code` as <tool_name>.py inside `tools_dir`, replacing any
    existing file of that name.

    Returns a status message. A failure while writing is logged and reported
    in the returned message rather than raised.
    """
    script_path = os.path.join(tools_dir, f"{tool_name}.py")
    try:
        with open(script_path, 'w') as handle:
            handle.write(tool_code)
    except Exception as e:
        logging.error(f"Error occurred while creating the tool: {e}", exc_info=True)
        return f"An error occurred while creating the tool: {e}"
    else:
        logging.debug(f"Tool '{tool_name}' created at: {script_path}")
        return f"Tool '{tool_name}' created successfully."

def load_tool(tools_dir, tool_name):
    """
    Import <tool_name>.py from `tools_dir` and return the resulting module.

    Raises FileNotFoundError when the tool script does not exist.

    NOTE: importing runs the tool's top-level code, so only load tool
    scripts whose content is trusted.
    """
    # `import importlib` alone does not guarantee that the `util` submodule
    # is loaded, so import it explicitly where it is used.
    import importlib.util

    tool_filename = os.path.join(tools_dir, f"{tool_name}.py")
    if not os.path.exists(tool_filename):
        raise FileNotFoundError(f"Tool '{tool_name}' not found.")

    spec = importlib.util.spec_from_file_location(tool_name, tool_filename)
    tool_module = importlib.util.module_from_spec(spec)
    # Execute the script so its definitions become attributes of the module
    spec.loader.exec_module(tool_module)
    return tool_module

def install_module(module_name):
    """
    Install a Python package with pip, run through the interpreter that is
    executing this script.

    Returns a status message. A non-zero pip exit status is logged and
    reported in the returned message rather than raised.
    """
    pip_command = [sys.executable, "-m", "pip", "install", module_name]
    try:
        subprocess.check_call(pip_command)
    except subprocess.CalledProcessError:
        logging.error(f"Error occurred while installing the module '{module_name}'.")
        return f"An error occurred while installing the module '{module_name}'."
    logging.debug(f"Module '{module_name}' installed successfully.")
    return f"Module '{module_name}' installed successfully."

def create_agent(role, goal, backstory, goal_file='goal.txt'):
    """
    Create an Agent with the given role, goal and backstory.

    If `goal_file` exists and holds non-blank text, that text (stripped)
    replaces `goal`. The default 'goal.txt' is resolved against the current
    working directory. An empty or whitespace-only file is ignored so that it
    cannot blank out the goal passed by the caller.

    The agent is created with delegation disabled, verbose output enabled
    and the module-level `llm` as its model.
    """
    # isfile rather than exists: a directory named like the goal file must
    # not make open() raise.
    if os.path.isfile(goal_file):
        with open(goal_file, 'r') as handle:
            saved_goal = handle.read().strip()
        if saved_goal:
            goal = saved_goal
    return Agent(
        role=role,
        goal=goal,
        backstory=backstory,
        allow_delegation=False,
        verbose=True,
        llm=llm,
    )

def execute_task_chain(content, agent, start_step=0, files_dir='files', output_dir='output'):
    """
    Run each non-blank line of `content` as its own task for `agent`.

    The first `start_step` lines are skipped. Steps are numbered by line
    position (1-based), so blank lines use up a number without running.
    For every executed task the result is printed, appended to output.txt
    in `output_dir`, saved as step<number>.txt in `files_dir` and recorded
    as the current stage. An error in one task is logged and printed, and
    the chain continues with the next line.
    """
    step_number = start_step
    for raw_line in content.splitlines()[start_step:]:
        step_number += 1
        description = raw_line.strip()
        if not description:
            continue
        try:
            task = Task(
                description=description,
                expected_output='No specific output expected',
                agent=agent,
            )
            result = task.execute()
            print(result)
            logging.debug(f"Task executed with result: {result}")

            # Persist the result: running log, per-step file, then progress marker
            append_output(output_dir, result)
            save_step_result(files_dir, step_number, result)
            update_stage(files_dir, step_number)
        except Exception as e:
            logging.error(f"Error occurred during task execution: {e}", exc_info=True)
            print(f"Error occurred during task execution: {e}")

def main():
    """
    Run the interactive command loop until the user types 'exit'.

    Every non-empty prompt selects (and creates) a project folder through
    determine_project_folder and setup_directories, and is then dispatched on
    its prefix:

    - 'read <filename>': load the file, make its text the agent's goal, save
      that goal as goal.txt in the project's files directory and run the file
      line by line with execute_task_chain, skipping as many lines as there
      are saved step files. A file that cannot be found is reported and
      nothing is executed.
    - 'create file <filename> with content <content>': write the content to a
      new, uniquely numbered file in the project's files directory.
    - 'create tool <tool_name> with code <tool_code>': write the code to
      <tool_name>.py in the project's tools directory.
    - anything else: run the prompt itself as a single task and append the
      result to output.txt.
    """
    while True:
        prompt = input("Enter your command (type 'exit' to quit): ").strip()
        if prompt.lower() == 'exit':
            break
        if not prompt:
            # Nothing was typed: ask again instead of creating a project
            # folder and sending an empty task to the agent.
            continue

        # Determine project folder and display its path
        project_folder = determine_project_folder(prompt)
        print(f"Project folder: {project_folder}")

        files_dir, output_dir, tools_dir = setup_directories(project_folder)
        existing_steps, _ = read_existing_steps(files_dir)
        start_step = len(existing_steps)

        general_agent=create_agent(
            role='Requirements Manager',
            goal="""Handle any user input commands including reading a file
                    and creating files for apps based on user input. For file reading,
                    read the contents and display them. For file creation,
                    generate a filename based on user input or a pattern and create
                    the file with the specified content. Respond as if
                    you were an expert developer providing file creation services
                    on demand.""",
            backstory="""You are an expert developer with a strong background
                        in software engineering. You provide high quality,
                        thorough, and efficient file creation services based on
                        user requirements."""
        )

        lowered = prompt.lower()
        if lowered.startswith('read '):
            filename = prompt[len('read '):].strip()
            if not filename:
                print("Filename is empty or invalid.")
                continue

            start_time = time.time()
            content = read_file(files_dir, filename)
            if content == f"File '{filename}' not found.":
                # read_file signals a missing file by returning this message;
                # report it instead of using the message as the agent's goal
                # and executing it as a task.
                print(content)
                continue

            # Set the new goal description to the content read from the file
            general_agent.goal = content.strip()

            # Write the updated goal to goal.txt in the project's files directory.
            # NOTE(review): create_agent looks for 'goal.txt' in the current
            # working directory, not in files_dir - confirm which is intended.
            create_file(files_dir, 'goal.txt', general_agent.goal)

            execute_task_chain(content, general_agent, start_step, files_dir, output_dir)

            elapsed_time = time.time() - start_time
            elapsed_hours, rem = divmod(elapsed_time, 3600)
            elapsed_minutes, elapsed_seconds = divmod(rem, 60)
            print(f"Execution time: {int(elapsed_hours)}h {int(elapsed_minutes)}m {int(elapsed_seconds)}s")
        elif lowered.startswith('create file '):
            parts = prompt[len('create file '):].split(' with content ', 1)
            if len(parts) == 2:
                filename, content = parts
                unique_path = generate_filename(files_dir, filename.strip())
                # generate_filename returns a path that already starts with
                # files_dir and create_file joins files_dir again, so hand over
                # only the part relative to files_dir.
                filename = os.path.relpath(unique_path, files_dir)
                result = create_file(files_dir, filename, content.strip())
                print(result)

                # Record the status message in output.txt
                append_output(output_dir, result)
            else:
                print("Invalid command format. Use: create file <filename> with content <content>")
        elif lowered.startswith('create tool '):
            parts = prompt[len('create tool '):].split(' with code ', 1)
            if len(parts) == 2:
                tool_name, tool_code = parts
                result = create_tool(tools_dir, tool_name.strip(), tool_code.strip())
                print(result)
            else:
                print("Invalid command format. Use: create tool <tool_name> with code <tool_code>")
        else:
            # Execute dynamic task based on user input
            try:
                # Pass expected_output the same way execute_task_chain does
                task = Task(
                    description=prompt,
                    expected_output='No specific output expected',
                    agent=general_agent,
                )
                result = task.execute()
                print(result)

                # Append result to output.txt
                append_output(output_dir, result)
            except Exception as e:
                logging.error(f"Error occurred during task execution: {e}", exc_info=True)
                print(f"Error occurred during task execution: {e}")

# Script entry point: with a command-line argument, run that story file once
# as a task chain; without one, start the interactive loop.
if __name__ == "__main__":
    if len(sys.argv) > 1:
        start_time = time.time()
        filename = sys.argv[1]
        # NOTE(review): determine_project_folder drops the first word, so a
        # filename without spaces always maps to 'default_project_project' and
        # different story files share the same saved steps - confirm intent.
        project_folder = determine_project_folder(filename)
        print(f"Project folder: {project_folder}")

        files_dir, output_dir, tools_dir = setup_directories(project_folder)
        content = read_file(files_dir, filename)
        if content == f"File '{filename}' not found.":
            # read_file signals a missing file by returning this message; stop
            # instead of executing the message itself as a task.
            print(content)
            sys.exit(1)

        # Skip as many lines as there are saved step files
        existing_steps, _ = read_existing_steps(files_dir)
        start_step = len(existing_steps)
        general_agent=create_agent(
            role='Requirements Manager',
            goal="""Handle any user input commands including reading a file
                    and creating files for apps based on user input. For file reading,
                    read the contents and display them. For file creation,
                    generate a filename based on user input or a pattern and create
                    the file with the specified content. Respond as if
                    you were an expert developer providing file creation services
                    on demand.""",
            backstory="""You are an expert developer with a strong background
                        in software engineering. You provide high quality,
                        thorough, and efficient file creation services based on
                        user requirements.
                        Also remember the files folder includes a json file tools.json and a tools folder for tools you may require to do a task."""
        )
        execute_task_chain(content, general_agent, start_step, files_dir, output_dir)

        # Report the wall-clock duration of the run as hours, minutes, seconds
        elapsed_time = time.time() - start_time
        elapsed_hours, rem = divmod(elapsed_time, 3600)
        elapsed_minutes, elapsed_seconds = divmod(rem, 60)
        print(f"Execution time: {int(elapsed_hours)}h {int(elapsed_minutes)}m {int(elapsed_seconds)}s")
    else:
        main()
jamied_uk
jamied_uk
Admin

Posts : 3016
Join date : 2010-05-09
Age : 41
Location : UK

https://jnet.sytes.net

Back to top Go down

Back to top

- Similar topics

 
Permissions in this forum:
You cannot reply to topics in this forum