Skip to content

processor

mlflow_assistant.engine.processor

Query processor that leverages the workflow engine for processing user queries and generating responses using an AI provider.

process_query(query, provider_config, verbose=False) async

Process a query through the MLflow Assistant workflow.

Parameters:

Name Type Description Default
query str

The query to process

required
provider_config dict[str, Any]

AI provider configuration

required
verbose bool

Whether to show verbose output

False

Returns:

Type Description
dict[str, Any]

Dict containing the response

Source code in src/mlflow_assistant/engine/processor.py
async def process_query(
    query: str, provider_config: dict[str, Any], verbose: bool = False,
) -> dict[str, Any]:
    """Process a query through the MLflow Assistant workflow.

    Args:
        query: The query to process
        provider_config: AI provider configuration
        verbose: Whether to show verbose output

    Returns:
        Dict containing the response

    """
    import time

    from .workflow import create_workflow

    # Track start time for duration calculation
    start_time = time.time()

    try:
        # Create workflow
        workflow = create_workflow()

        # Run workflow with provider config
        initial_state = {
            STATE_KEY_MESSAGES: [HumanMessage(content=query)],
            STATE_KEY_PROVIDER_CONFIG: provider_config,
        }

        if verbose:
            logger.info(f"Running workflow with query: {query}")
            logger.info(f"Using provider: {provider_config.get(CONFIG_KEY_TYPE)}")
            logger.info(
                f"Using model: {provider_config.get(CONFIG_KEY_MODEL, 'default')}",
            )

        result = await workflow.ainvoke(initial_state)

        # Calculate duration
        duration = time.time() - start_time

        return {
            "original_query": query,
            "response": result.get(STATE_KEY_MESSAGES)[-1],
            "duration": duration,  # Add duration to response
        }

    except Exception as e:
        # Calculate duration even for errors
        duration = time.time() - start_time

        logger.error(f"Error processing query: {e}")

        return {
            "error": str(e),
            "original_query": query,
            "response": f"Error processing query: {e!s}",
        }