Building Smart Workflows with the Microsoft Agent Framework-Part -I

Introduction

The Microsoft Agent Framework (MAF) empowers developers to build automated workflows that blend AI agents with business processes. It allows us to orchestrate complex tasks by connecting intelligent components that can act, reason, and collaborate.

How Are Workflows Different from AI Agents?

Although both AI agents and workflows can include multiple steps to achieve a goal, they serve different purposes and operate at different levels of abstraction.

🤖 AI Agent

Think of every human as a kind of AI agent our brain acts like a large language model (LLM), while our eyes and ears serve as tools that bring in external context. Similarly, AI agents use tools or MCP servers to gather external information and maintain short-term and long-term memory, just like humans remember past conversations or experiences.

🔄 Workflow

Workflows, on the other hand, are like the human body’s digestive system a predefined and structured sequence of steps. Each step performs a specific function and passes the result to the next.
For example, when you receive sensory input, your brain transforms it into signals, leading to actions. This resembles how business workflows process data through a series of interconnected executors.

In short:

Agents make dynamic decisions in real-time.

Workflows follow explicitly defined paths for predictable and controlled execution.

⚙️ Microsoft Agent Framework – Core Components

MAF workflows are made up of four main building blocks:

  1. Executors
  2. Edges
  3. Workflows
  4. Events

🧠 Executors — The Brains of the Workflow

Executors are the fundamental processing units in a workflow.
They:

  1. Receive typed messages
  2. Perform operations
  3. Produce outputs as new messages or events
  4. Every executor inherits from the base Executor class and can process specific message types using methods decorated with the @handler decorator.

💻 Example: Building a Sentiment Analyzer Executor

from agent_framework import Executor, WorkflowContext, handler
from textblob import TextBlob

class SentimentAnalyzer(Executor):

    @handler
    async def analyze_sentiment(self, text: str, ctx: WorkflowContext[dict]) -> None:
        """Analyze sentiment of the input text and forward result as dict."""
        analysis = TextBlob(text)
        result = {
            "text": text,
            "polarity": analysis.sentiment.polarity,
            "subjectivity": analysis.sentiment.subjectivity
        }
        await ctx.send_message(result)

Creating an Executor Using a Function

You can also create an executor directly from a function using the @executor decorator:

from agent_framework import WorkflowContext, executor
from textblob import TextBlob

@executor(id="sentiment_executor")
async def analyze_sentiment(text: str, ctx: WorkflowContext[dict]) -> None:
    """
    Analyze the sentiment of the input text and forward a result dictionary.
    """
    analysis = TextBlob(text)
    result = {
        "text": text,
        "polarity": analysis.sentiment.polarity,
        "subjectivity": analysis.sentiment.subjectivity
    }
    await ctx.send_message(result)

🧠 Handling Multiple Input Types

Executors can have multiple handlers to process different types of messages:

from agent_framework import Executor, WorkflowContext, handler

class RealWorldExecutor(Executor):

    @handler
    async def analyze_sentiment(self, text: str, ctx: WorkflowContext[dict]) -> None:
        """Analyze sentiment of input text."""

    @handler
    async def translate_to_french(self, text: str, ctx: WorkflowContext[str]) -> None:
        """Translate text to French."""

    @handler
    async def summarize_text(self, text: str, ctx: WorkflowContext[str]) -> None:
        """Summarize text."""

    @handler
    async def extract_entities(self, text: str, ctx: WorkflowContext[dict]) -> None:
        """Extract named entities."""

    @handler
    async def detect_spam(self, message: str, ctx: WorkflowContext[bool]) -> None:
        """Detect spam messages."""

The WorkflowContext Object

WorkflowContext allows handlers to interact with the workflow during execution. It defines what type of messages a handler emits and what outputs it can produce.
send_message – sends data to downstream executors
yield_output – sends output back to the workflow caller

Example:

from agent_framework import WorkflowContext

class SomeHandler(Executor):

    @handler
    async def some_handler(message: str, ctx: WorkflowContext[str]) -> None:
        await ctx.send_message("Updated the all records")

Or, to yield an output directly:

from agent_framework import WorkflowContext

class SomeHandler(Executor):

    @handler
    async def some_handler(message: str, ctx: WorkflowContext[None, str]) -> None:
        await ctx.yield_output("I am done ...")

If a handler doesn’t send or yield messages, no type parameter is required:

from agent_framework import WorkflowContext

class SomeHandler(Executor):

    @handler
    async def some_handler(message: str, ctx: WorkflowContext) -> None:
        print("doing or processing something...")

🔗 Edges — Connecting Executors Together

Edges define how executors are connected within a workflow.
They determine how messages flow from one executor to another optionally under specific conditions. In other words, edges define the data flow or process flow path between the building blocks of your workflow.

The Microsoft Agent Framework supports four main types of edges:

Direct Edges – Simple one-to-one connections

Conditional Edges – Connections based on a condition

Fan-out Edges – One executor sending messages to multiple targets

Fan-in Edges – Multiple executors sending messages to a single target

1️⃣ ## Direct Edges

The simplest connection between two executors — like connecting an email parser to a classifier.

from agent_framework import WorkflowBuilder

builder = WorkflowBuilder()
builder.add_edge(email_parser, spam_classifier)
builder.set_start_executor(email_parser)
workflow = builder.build()

2️⃣ Conditional Edges

Edges that activate only when certain conditions are met for example, sending flagged transactions to a fraud detector.

from agent_framework import WorkflowBuilder

builder = WorkflowBuilder()
builder.add_edge(transaction_checker, fraud_detector,
                 condition=lambda result: result["amount"] > 5000)
builder.add_edge(transaction_checker, normal_flow_handler,
                 condition=lambda result: result["amount"] <= 5000)
builder.set_start_executor(transaction_checker)
workflow = builder.build()

3️⃣ Switch-Case Edges

Route messages to different executors based on complex conditions similar to switch or if-else logic.

from agent_framework import Case, Default, WorkflowBuilder

builder = WorkflowBuilder()
builder.set_start_executor(priority_router)

builder.add_switch_case_edge_group(
    priority_router,
    [
        Case(condition=lambda msg: msg.level == "LOW", target=low_priority_executor),
        Case(condition=lambda msg: msg.level == "MEDIUM", target=medium_priority_executor),
        Default(target=high_priority_executor),
    ],
)
workflow = builder.build()

4️⃣ Fan-out Edges

Send messages from one executor to multiple downstream executors for example, broadcasting notifications.

from agent_framework import WorkflowBuilder

builder = WorkflowBuilder()
builder.set_start_executor(notification_sender)
builder.add_fan_out_edges(notification_sender, [sms_executor, email_executor, push_executor])
workflow = builder.build()

You can even add logic to selectively send messages:

builder.add_fan_out_edges(
    notification_sender,
    [sms_executor, email_executor, push_executor],
    selection_func=lambda msg, _: (
        [0] if msg.channel == "SMS"
        else [1, 2] if msg.channel == "EMAIL"
        else list(range(3))
    )
)

5️⃣ Fan-in Edges

Collect messages from multiple executors into a single one — for example, merging outputs from multiple scrapers into one aggregator.

builder.add_fan_in_edge([scraper_a, scraper_b, scraper_c], aggregator_executor)

Workflows

A workflow orchestrates a series of tasks, handling execution, message routing, and event streaming between different executors. Think of it as a project manager ensuring each step runs smoothly and in the right order.

Building Workflows

Here’s an example of a workflow for processing job applications:

from agent_framework import WorkflowBuilder

# Define executors for a resume screening workflow
resume_uploader = ResumeUploader()       # Uploads resumes to the system
keyword_analyzer = KeywordAnalyzer()    # Extracts and scores keywords
candidate_ranker = CandidateRanker()    # Ranks candidates based on score

# Build the workflow
builder = WorkflowBuilder()
builder.set_start_executor(resume_uploader)          # Start with uploading resumes
builder.add_edge(resume_uploader, keyword_analyzer)  # Pass resumes for keyword analysis
builder.add_edge(keyword_analyzer, candidate_ranker) # Pass analyzed data for ranking
workflow = builder.build()

Workflow Execution

Workflows can run in streaming or non-streaming modes:

Streaming Mode – Receive updates as each step completes:

from agent_framework import WorkflowCompletedEvent

async for event in workflow.run_stream(new_resume_file):
    if isinstance(event, WorkflowCompletedEvent):
        print(f"Workflow completed. Final candidate ranking: {event.data}")

Non-Streaming Mode

Wait until the workflow finishes and get the final result:

events = await workflow.run(new_resume_file)
print(f"Final ranking: {events.get_completed_event()}")

Workflow Validation

  1. The framework ensures your workflow is correct before execution:
  2. Type Compatibility: Resumes, scores, and rankings must match expected formats
  3. Graph Connectivity: Each executor must be reachable from the starting point
  4. Executor Binding: All executors must be instantiated properly
  5. Edge Validation: Prevents duplicate or invalid connections

Execution Model

The workflow uses a Pregel-inspired execution model, processing messages in supersteps for parallel and efficient execution.

What is Pregel?

Pregel is a distributed computing model developed by Google, designed to efficiently process large scale graphs. It works by dividing computation into supersteps, where each node (or executor) in a graph performs its computation in parallel, exchanges messages with other nodes, and waits for the next superstep. This ensures a clear, predictable flow of data while taking advantage of parallelism.

Workflow Events

Workflows not only manage task execution but also emit events at key points, giving you visibility into what’s happening in real time. These events help monitor, debug, and understand the workflow’s progress.

Built-in Event Types

The framework provides several built-in events for different aspects of workflow execution:

Workflow Lifecycle Events

WorkflowStartedEvent – Triggered when a workflow begins execution

WorkflowOutputEvent – Triggered when the workflow produces a result

WorkflowErrorEvent – Triggered if the workflow encounters an error

Executor Events

ExecutorInvokeEvent – Triggered when an executor starts processing

ExecutorCompleteEvent – Triggered when an executor finishes processing

Request Events

RequestInfoEvent – Triggered when a request is issued

Observing Events

You can consume these events in streaming mode to see progress as it happens. For example, in a resume screening workflow:

from agent_framework import (
    ExecutorInvokeEvent,
    ExecutorCompleteEvent,
    WorkflowOutputEvent,
    WorkflowErrorEvent,
)

async for event in workflow.run_stream(resume_file):
    match event:
        case ExecutorInvokeEvent() as invoke:
            print(f"Executor started: {invoke.executor_id}")
        case ExecutorCompleteEvent() as complete:
            print(f"Executor finished: {complete.executor_id} -> {complete.data}")
        case WorkflowOutputEvent() as output:
            print(f"Workflow output: {output.data}")
            break
        case WorkflowErrorEvent() as error:
            print(f"Workflow error: {error.exception}")
            break

Here, you can see exactly which executor is running, when it finishes, and what output or errors occur during execution.

Custom Events

You can also define your own events for more granular observability. For instance, logging every resume processed:

from agent_framework import handler, Executor, WorkflowContext, WorkflowEvent

# Define a custom event
class ResumeProcessedEvent(WorkflowEvent):
    def __init__(self, candidate_name: str, score: float):
        super().__init__(f"{candidate_name} scored {score}")
        self.candidate_name = candidate_name
        self.score = score

# Custom executor that emits this event
class RankingExecutor(Executor):

    @handler
    async def handle(self, candidate: dict, ctx: WorkflowContext[dict]) -> None:
        score = candidate['score']
        # Emit custom event for observability
        await ctx.add_event(ResumeProcessedEvent(candidate['name'], score))
        # Continue processing...

With custom events, you can track domain-specific milestones, like candidate scoring, document validation, or any other step meaningful to your workflow.

Conclusion

The Microsoft Agent Framework provides a powerful, flexible way to build intelligent, observable workflows. By combining executors to perform discrete tasks, edges to control message flow, and workflows to orchestrate execution, you can design systems that are both modular and scalable.

Events—both built-in and custom give you real-time visibility into your workflow, making it easier to monitor progress, debug issues, and gain insights into how data flows through your system. Whether you are building AI agents for resume screening, customer support, or other enterprise applications, this framework helps you structure your logic cleanly while keeping execution transparent and maintainable.

In essence, the Microsoft Agent Framework empowers developers to move beyond simple linear processes, enabling dynamic, parallel, and intelligent workflows that can adapt to complex, real-world scenarios.

In my next blog Part-II , I’ll show you how to build an AI agent workflow using the Microsoft Agent Framework to automate real business processes.

Thanks
Sreeni Ramadorai

Similar Posts