// Example: Orchestrated Bulk Workflow (Single DbContext)
// Demonstrates orchestrator input/output + schema sync + bulk operations
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using Microsoft.EntityFrameworkCore;
using DataArc;
using DataArc.Abstractions;
using DataArc.OrchestratR;
using DataArc.OrchestratR.Abstractions;
// SAMPLE ENTITY
/// <summary>
/// Sample entity used by the bulk-insert example.
/// </summary>
public class Employee
{
    /// <summary>Numeric identifier of the employee.</summary>
    public int Id { get; set; }

    /// <summary>Employee name; starts out as the empty string.</summary>
    public string Name { get; set; } = string.Empty;

    /// <summary>Employee salary.</summary>
    public decimal Salary { get; set; }
}
// SAMPLE CONTEXT
/// <summary>
/// Sample EF Core context for the example. It exposes the <see cref="Employee"/> set and
/// implements <see cref="IPipelineContext{TContext}"/> for itself.
/// </summary>
public class AppDbContext : DbContext, IPipelineContext<AppDbContext>
{
    /// <summary>
    /// Creates the context, forwarding <paramref name="options"/> to the base <see cref="DbContext"/>.
    /// </summary>
    /// <param name="options">Context options passed through to the base class.</param>
    public AppDbContext(DbContextOptions<AppDbContext> options)
        : base(options)
    {
    }

    /// <summary>Set of <see cref="Employee"/> entities, obtained via <c>Set&lt;Employee&gt;()</c>.</summary>
    public DbSet<Employee> Employees => Set<Employee>();
}
// ORCHESTRATION INPUT
/// <summary>
/// Orchestrator input for the bulk-insert example.
/// </summary>
/// <param name="Employees">Employees to insert.</param>
/// <param name="BatchSize">Batch size handed to the bulk command together with <paramref name="Employees"/>.</param>
public record BulkInsertRequest(
    List<Employee> Employees,
    int BatchSize) : IOrchestratorInput;
// ORCHESTRATION OUTPUT
/// <summary>
/// Orchestrator output for the bulk-insert example; wraps the <see cref="SqlResult"/> of the run.
/// </summary>
public class BulkInsertResponse : IOrchestratorOutput
{
    /// <summary>
    /// Creates an empty response. <see cref="SqlResult"/> is left at its default value.
    /// </summary>
    public BulkInsertResponse()
    {
    }

    /// <summary>
    /// Creates a response carrying <paramref name="result"/>.
    /// </summary>
    /// <param name="result">Result to expose through <see cref="SqlResult"/>.</param>
    public BulkInsertResponse(SqlResult result)
    {
        SqlResult = result;
    }

    /// <summary>Result supplied at construction time; read-only afterwards.</summary>
    public SqlResult SqlResult { get; }
}
// SIMPLE ORCHESTRATOR (Single DbContext)
/// <summary>
/// Orchestrator that bulk-adds the employees of a <see cref="BulkInsertRequest"/> through a single
/// <see cref="AppDbContext"/> and reports the outcome as a <see cref="BulkInsertResponse"/>.
/// </summary>
public class BulkInsertOrchestrator : Orchestrator<BulkInsertRequest, BulkInsertResponse>
{
    private readonly IAsyncDatabaseCommandBuilder _commands;

    /// <summary>
    /// Creates the orchestrator.
    /// </summary>
    /// <param name="commands">Command builder used to assemble the bulk pipeline.</param>
    public BulkInsertOrchestrator(IAsyncDatabaseCommandBuilder commands)
    {
        _commands = commands;
    }

    /// <summary>
    /// Builds a command pipeline for <see cref="AppDbContext"/> containing one bulk step
    /// (the request's employees and batch size), executes it, and wraps the result.
    /// </summary>
    /// <param name="input">Request holding the employees and the batch size.</param>
    /// <param name="output">Output instance supplied by the caller; it is not read here, a new response is returned instead.</param>
    /// <returns>A new <see cref="BulkInsertResponse"/> wrapping the pipeline's execution result.</returns>
    public override async Task<BulkInsertResponse> ExecuteAsync(
        BulkInsertRequest input,
        BulkInsertResponse output)
    {
        // Describe the work first: target context plus a single bulk step.
        var bulkBuilder = _commands
            .UseCommandContext<AppDbContext>()
            .AddBulk(input.Employees, input.BatchSize);

        // Then build and run it.
        var builtPipeline = await bulkBuilder.BuildAsync();
        var sqlResult = await builtPipeline.ExecuteAsync();

        return new BulkInsertResponse(sqlResult);
    }
}
/// <summary>
/// Console entry point for the example: configures dependency injection, runs the schema
/// step, and executes <see cref="BulkInsertOrchestrator"/> with two sample employees.
/// </summary>
internal class Program
{
    // Both fields are assigned in Main (from the service provider) before ExecuteMain runs.
    static IOrchestratorHandler _orchestratorHandler = default!;
    static IDatabaseDefinitionBuilder _schemaBuilder = default!;

    /// <summary>
    /// Runs the example workflow: schema step, sample data, orchestrated bulk insert,
    /// then prints the number of affected entities.
    /// </summary>
    static async Task ExecuteMain()
    {
        // --- SCHEMA SYNC ---
        // Build the schema definition for AppDbContext and run its create step.
        var schema = _schemaBuilder
            .UseContext<AppDbContext>()
            .Build(applyChanges: true, generateScripts: false);
        schema.ExecuteCreate();

        // --- SAMPLE EMPLOYEE LIST ---
        var employees = new List<Employee>
        {
            new Employee { Name = "Alice", Salary = 72000 },
            new Employee { Name = "Bob", Salary = 61000 }
        };

        // --- EXECUTE ORCHESTRATOR ---
        // Named arguments are case-sensitive: the record's parameter is `BatchSize`.
        var response = await _orchestratorHandler
            .OrchestrateAsync<BulkInsertOrchestrator, BulkInsertResponse>(
                new BulkInsertRequest(employees, BatchSize: 20000),
                new BulkInsertResponse());

        Console.WriteLine(
            $"Inserted: {response.SqlResult.TotalEntitiesAffected} rows");
    }

    /// <summary>
    /// Application entry point. Builds the service provider, resolves the schema builder and
    /// orchestrator handler, and awaits <see cref="ExecuteMain"/>.
    /// </summary>
    /// <param name="args">Command-line arguments (unused).</param>
    private static async Task Main(string[] args)
    {
        // Dispose the logger factory on exit so buffered console output is flushed.
        using ILoggerFactory factory = LoggerFactory.Create(b => b.AddConsole());

        // Declared after the factory, so the provider (and its pooled contexts) is disposed first.
        await using var provider = new ServiceCollection()
            .AddLogging()
            .AddDataArcCore(ctx =>
            {
                ctx.AddDbContextPool<AppDbContext>(opts => opts
                    .UseSqlServer("Server=.;Database=ArcDemo;Integrated Security=True;TrustServerCertificate=True")
                    .UseLoggerFactory(factory));
            })
            .AddDataArcOrchestration(orch =>
            {
                orch.AddOrchestrator<BulkInsertOrchestrator>();
            })
            .BuildServiceProvider();

        _schemaBuilder = provider.GetRequiredService<IDatabaseDefinitionBuilder>();
        _orchestratorHandler = provider.GetRequiredService<IOrchestratorHandler>();

        // Await instead of blocking with GetAwaiter().GetResult().
        await ExecuteMain();
    }
}
// Full orchestration: schema creation, orchestrated command execution, and reactive event dispatch — all type-safe, async, and cross-context.