Skip to content

workflow

mlflow_assistant.engine.workflow

Core LangGraph-based workflow engine for processing user queries and generating responses using an AI provider.

This workflow supports tool-augmented generation: tool calls are detected and executed in a loop until a final AI response is produced.

State

Bases: TypedDict

State schema for the workflow engine.

create_workflow()

Create and return a compiled LangGraph workflow.

Source code in src/mlflow_assistant/engine/workflow.py
def create_workflow():
    """Build and compile the LangGraph workflow for query processing.

    The graph has two nodes: a model node that invokes the configured AI
    provider (with MLflow tools bound), and a tool-execution node. Control
    cycles tools -> model until the model emits a response with no tool
    calls, at which point the graph terminates via ``tools_condition``.

    Returns:
        A compiled LangGraph graph ready to be invoked with a ``State`` dict.
    """
    builder = StateGraph(State)

    def call_model(state: State) -> State:
        """Invoke the provider's chat model on the conversation history.

        On success, the returned state carries only the new AI message
        (the State reducer is expected to merge it into the history).
        On failure, the error is logged and the existing messages are
        returned unchanged so the workflow can still terminate.
        """
        history = state[STATE_KEY_MESSAGES]
        config = state.get(STATE_KEY_PROVIDER_CONFIG, {})
        try:
            # Provider construction, tool binding, and invocation can all
            # raise (bad config, network errors), so they share one handler.
            ai = AIProvider.create(config)
            bound = ai.langchain_model().bind_tools(tools)
            reply = bound.invoke(history)
        except Exception as exc:
            logger.error(f"Error generating response: {exc}", exc_info=True)
            updated = dict(state)
            updated[STATE_KEY_MESSAGES] = history
            return updated
        updated = dict(state)
        updated[STATE_KEY_MESSAGES] = [reply]
        return updated

    # Wire up the graph: model is the entry point; after any tool run,
    # control returns to the model for the next generation step.
    builder.add_node("model", call_model)
    builder.add_node("tools", ToolNode(tools))
    builder.add_edge("tools", "model")
    builder.add_conditional_edges("model", tools_condition)
    builder.set_entry_point("model")

    return builder.compile()