Skip to main content

Serko Northsky Unit of Work Pattern

The Unit of Work (UoW) pattern manages database transactions across Serko Northsky, ensuring data consistency and proper session lifecycle management.

See UnitOfWork for the full implementation.

Core Concepts

The UoW provides:

  • Transaction boundary — All operations share the same database session
  • Repository access — Lazy-loaded repositories via properties (e.g., uow.users, uow.trips)
  • Explicit commits — Services must call await uow.save_changes() to persist
  • Safety net — Detects uncommitted changes at request/task end

Usage Patterns

1. Request-Scoped (Routers)

For HTTP endpoints, the UoW is automatically provided via dependency injection:

# containers/dependencies.py
UoWDep = Annotated[BaseUnitOfWork, Depends(APIContainer.get_unit_of_work)]

def get_user_service(uow: UoWDep) -> BaseUserService:
return APIContainer.get_user_service(uow)
# routers/user_router.py
@router.get("/profile")
async def get_user_profile(
user_service: Annotated[BaseUserService, Depends(dependencies.get_user_service)],
user: Annotated[User, Depends(get_authenticated_user)],
) -> GetUserProfileResponse:
# Service already has UoW injected
user_with_relations = await user_service.get_user_profile_with_relations(user.id)
return GetUserProfileResponse.model_validate(user_with_relations)

2. Service Layer

Services receive UoW via constructor injection:

class UserService(BaseUserService):
def __init__(self, uow: BaseUnitOfWork) -> None:
self.uow = uow

async def create(self, email: str, auth_id: str) -> User:
existing_user = await self.uow.users.get_by_field("email", email)

if existing_user is not None:
existing_user.auth_id = auth_id
updated_user = await self.uow.users.update(existing_user)
await self.uow.save_changes() # ✅ Explicit commit
return updated_user

language = await self.uow.languages.get_by_field("code", "en")
user = User(email=email, company_id=None, language_id=language.id, auth_id=auth_id)
created_user = await self.uow.users.create(user)
await self.uow.save_changes() # ✅ Explicit commit
return created_user
Important Rules
  • Always call await uow.save_changes() after write operations
  • Never call session.commit() directly
  • Use repositories via UoW properties (uow.users, uow.trips, etc.)

3. Background Tasks (MCP Servers)

MCP servers and background tasks are NOT request-scoped. They must create their own UoW:

@mcp.tool
async def search_flights(
user_explanation: str,
trip_search_id: str,
) -> dict[str, Any]:
# Create UoW for background task
uow = APIContainer.build_unit_of_work()
try:
async with uow.background_context():
orchestrator = dependencies.get_flight_orchestrator(uow)
result = await orchestrator.search_flights(
outbound_trip_search_id=UUID(trip_search_id),
)
return {"status": "success", "data": result}
except Exception as e:
raise ModelRetry("Flight search failed") from e

4. Batch Operations

Perform all writes then commit once:

async def batch_update_preferences(
self, user_id: UUID, request: UpdatePreferencesRequest
) -> list[UserPreference]:
result_preferences = []

for pref in request.preferences:
if pref.id:
existing = await self.uow.user_preferences.get_by_id(pref.id)
existing.weight = pref.weight
updated = await self.uow.user_preferences.update(existing)
result_preferences.append(updated)
else:
new_pref = UserPreference(user_id=user_id, weight=pref.weight)
created = await self.uow.user_preferences.create(new_pref)
result_preferences.append(created)

# Single commit for all operations
await self.uow.save_changes()
return result_preferences

UoW Methods Reference

MethodDescription
save_changes()Commits all pending changes. Auto-rollback on error.
flush()Flushes changes to DB without committing (useful for generated IDs).
refresh(instance, attribute_names)Reloads entity from DB. Use attribute_names to eager-load relationships.
has_pending_changes()Returns True if there are uncommitted changes.
close_session()Closes session and clears caches. Called automatically in contexts.
background_context()Context manager for background tasks with safety net.

Common Patterns

Using Flush for Generated IDs

async def create_trip_with_steps(self, trip_data: TripData) -> Trip:
trip = Trip(name=trip_data.name)
await self.uow.trips.create(trip)
await self.uow.flush() # Get generated trip.id

for step_data in trip_data.steps:
step = TripStep(trip_id=trip.id, name=step_data.name)
await self.uow.trip_steps.create(step)

await self.uow.save_changes()
return trip

Validation Before Write

async def update_user_profile(
self, user_id: UUID, request: UpdateUserProfileRequest
) -> User:
user = await self.uow.users.get_by_id(user_id)
if not user:
raise ValueError(f"User with id {user_id} not found")

if request.country_id:
country = await self.uow.countries.get_by_id(request.country_id)
if not country:
raise ValueError(f"Country with id {request.country_id} not found")

user.country_id = request.country_id
await self.uow.users.update(user)
await self.uow.save_changes()
return user

Anti-Patterns

# ❌ WRONG: Direct session access
await self.uow.session.commit()

# ❌ WRONG: Creating sessions manually
session = async_sessionmaker()
async with session() as s:
...

# ❌ WRONG: Forgetting to save changes
async def create_user(self, email: str) -> User:
user = User(email=email)
await self.uow.users.create(user)
return user # BUG: Changes not committed!

# ❌ WRONG: Using synchronous operations
user = session.query(User).first() # Use async methods

Environment Behavior

EnvironmentUncommitted Changes Behavior
localRaises RuntimeError
developmentRaises RuntimeError
testingRaises RuntimeError
productionLogs ERROR, continues

This ensures bugs are caught early in development while avoiding crashes in production.

Architecture Flow

Request


Router (FastAPI)

├── UoWDep (via Depends)

Service (injected with UoW)

├── uow.repositories.method()
├── await uow.save_changes()

Response


UoW cleanup (check_uncommitted_changes + close_session)