Serko Northsky Unit of Work Pattern
The Unit of Work (UoW) pattern manages database transactions across Serko Northsky, ensuring data consistency and proper session lifecycle management.
See UnitOfWork for the full implementation.
Core Concepts
The UoW provides:
- Transaction boundary: All operations share the same database session
- Repository access: Lazy-loaded repositories via properties (e.g., `uow.users`, `uow.trips`)
- Explicit commits: Services must call `await uow.save_changes()` to persist
- Safety net: Detects uncommitted changes at request/task end
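The four properties above can be illustrated with a small in-memory sketch. This is not the project's `UnitOfWork`: `InMemoryUnitOfWork`, `InMemoryRepository`, and the dict-backed "session" are hypothetical stand-ins, chosen so the example runs without a database.

```python
import asyncio
from functools import cached_property


class InMemoryRepository:
    """Hypothetical repository that stages writes in a shared session dict."""

    def __init__(self, session: dict, table: str) -> None:
        self._session = session
        self._table = table

    async def create(self, entity: dict) -> dict:
        # Stage the write; nothing is persisted until save_changes().
        self._session["pending"].append((self._table, entity))
        return entity


class InMemoryUnitOfWork:
    """Hypothetical UoW: one session, lazy repositories, explicit commit."""

    def __init__(self) -> None:
        self.session = {"pending": [], "committed": []}

    @cached_property
    def users(self) -> InMemoryRepository:
        # Built on first access, then cached on the instance.
        return InMemoryRepository(self.session, "users")

    @cached_property
    def trips(self) -> InMemoryRepository:
        return InMemoryRepository(self.session, "trips")

    def has_pending_changes(self) -> bool:
        return bool(self.session["pending"])

    async def save_changes(self) -> None:
        # Move every staged write to the committed store in one step.
        self.session["committed"].extend(self.session["pending"])
        self.session["pending"].clear()


async def demo() -> InMemoryUnitOfWork:
    uow = InMemoryUnitOfWork()
    await uow.users.create({"email": "a@example.com"})
    await uow.trips.create({"name": "Oslo"})
    assert uow.has_pending_changes()  # staged, not yet persisted
    await uow.save_changes()
    return uow
```

Both repositories write into the same `session` object, which is what makes a single `save_changes()` call cover every operation performed through the UoW.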
Usage Patterns
1. Request-Scoped (Routers)
For HTTP endpoints, the UoW is automatically provided via dependency injection:
# containers/dependencies.py
UoWDep = Annotated[BaseUnitOfWork, Depends(APIContainer.get_unit_of_work)]


def get_user_service(uow: UoWDep) -> BaseUserService:
    return APIContainer.get_user_service(uow)


# routers/user_router.py
@router.get("/profile")
async def get_user_profile(
    user_service: Annotated[BaseUserService, Depends(dependencies.get_user_service)],
    user: Annotated[User, Depends(get_authenticated_user)],
) -> GetUserProfileResponse:
    # Service already has UoW injected
    user_with_relations = await user_service.get_user_profile_with_relations(user.id)
    return GetUserProfileResponse.model_validate(user_with_relations)
2. Service Layer
Services receive UoW via constructor injection:
class UserService(BaseUserService):
    def __init__(self, uow: BaseUnitOfWork) -> None:
        self.uow = uow

    async def create(self, email: str, auth_id: str) -> User:
        existing_user = await self.uow.users.get_by_field("email", email)
        if existing_user is not None:
            existing_user.auth_id = auth_id
            updated_user = await self.uow.users.update(existing_user)
            await self.uow.save_changes()  # ✅ Explicit commit
            return updated_user

        language = await self.uow.languages.get_by_field("code", "en")
        user = User(email=email, company_id=None, language_id=language.id, auth_id=auth_id)
        created_user = await self.uow.users.create(user)
        await self.uow.save_changes()  # ✅ Explicit commit
        return created_user
Important Rules
- Always call `await uow.save_changes()` after write operations
- Never call `session.commit()` directly
- Use repositories via UoW properties (`uow.users`, `uow.trips`, etc.)
3. Background Tasks (MCP Servers)
MCP servers and background tasks are NOT request-scoped. They must create their own UoW:
@mcp.tool
async def search_flights(
    user_explanation: str,
    trip_search_id: str,
) -> dict[str, Any]:
    # Create UoW for background task
    uow = APIContainer.build_unit_of_work()
    try:
        async with uow.background_context():
            orchestrator = dependencies.get_flight_orchestrator(uow)
            result = await orchestrator.search_flights(
                outbound_trip_search_id=UUID(trip_search_id),
            )
            return {"status": "success", "data": result}
    except Exception as e:
        raise ModelRetry("Flight search failed") from e
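What `background_context()` might do internally can be sketched as an async context manager. This is an assumption about its behavior based on the description "context manager for background tasks with safety net", not the actual implementation; `SketchUnitOfWork` and its `pending` counter are hypothetical.

```python
import asyncio
from contextlib import asynccontextmanager
from typing import AsyncIterator


class SketchUnitOfWork:
    """Hypothetical stand-in for the real UoW, tracking only what the sketch needs."""

    def __init__(self) -> None:
        self.pending = 0
        self.closed = False

    def has_pending_changes(self) -> bool:
        return self.pending > 0

    async def save_changes(self) -> None:
        self.pending = 0

    async def close_session(self) -> None:
        self.closed = True

    @asynccontextmanager
    async def background_context(self) -> AsyncIterator["SketchUnitOfWork"]:
        try:
            yield self
            # Safety net: reached only if the body finished without raising.
            if self.has_pending_changes():
                raise RuntimeError("Uncommitted changes at end of background task")
        finally:
            # The session is released whether the body succeeded or failed.
            await self.close_session()


async def run_task(commit: bool) -> SketchUnitOfWork:
    uow = SketchUnitOfWork()
    async with uow.background_context():
        uow.pending += 1  # simulate a staged write
        if commit:
            await uow.save_changes()
    return uow
```

With `commit=True` the task exits cleanly and the session is closed; with `commit=False` the context manager raises `RuntimeError` on exit, which is the kind of early signal the safety net is meant to provide.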
4. Batch Operations
Perform all writes then commit once:
async def batch_update_preferences(
    self, user_id: UUID, request: UpdatePreferencesRequest
) -> list[UserPreference]:
    result_preferences = []
    for pref in request.preferences:
        if pref.id:
            existing = await self.uow.user_preferences.get_by_id(pref.id)
            if existing is None:
                # Fail before the commit so no partial batch is persisted
                raise ValueError(f"Preference with id {pref.id} not found")
            existing.weight = pref.weight
            updated = await self.uow.user_preferences.update(existing)
            result_preferences.append(updated)
        else:
            new_pref = UserPreference(user_id=user_id, weight=pref.weight)
            created = await self.uow.user_preferences.create(new_pref)
            result_preferences.append(created)

    # Single commit for all operations
    await self.uow.save_changes()
    return result_preferences
UoW Methods Reference
| Method | Description |
|---|---|
| `save_changes()` | Commits all pending changes. Auto-rollback on error. |
| `flush()` | Flushes changes to DB without committing (useful for generated IDs). |
| `refresh(instance, attribute_names)` | Reloads entity from DB. Use `attribute_names` to eager-load relationships. |
| `has_pending_changes()` | Returns `True` if there are uncommitted changes. |
| `close_session()` | Closes session and clears caches. Called automatically in contexts. |
| `background_context()` | Context manager for background tasks with safety net. |
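The "auto-rollback on error" behavior of `save_changes()` could look roughly like the sketch below. The real method may differ; `FakeSession` is a hypothetical object exposing only `commit()` and `rollback()` so the control flow can be shown without a database.

```python
import asyncio


class FakeSession:
    """Hypothetical session that records calls and can fail on commit."""

    def __init__(self, fail_on_commit: bool = False) -> None:
        self.fail_on_commit = fail_on_commit
        self.calls: list[str] = []

    async def commit(self) -> None:
        self.calls.append("commit")
        if self.fail_on_commit:
            raise ValueError("constraint violated")

    async def rollback(self) -> None:
        self.calls.append("rollback")


class SketchUnitOfWork:
    """Hypothetical UoW showing only the commit-or-rollback logic."""

    def __init__(self, session: FakeSession) -> None:
        self._session = session

    async def save_changes(self) -> None:
        try:
            await self._session.commit()
        except Exception:
            # Undo the failed transaction, then let the caller see the error.
            await self._session.rollback()
            raise
```

Re-raising after the rollback keeps the failure visible to the service, so callers do not have to call `rollback()` themselves but still learn that nothing was persisted.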
Common Patterns
Using Flush for Generated IDs
async def create_trip_with_steps(self, trip_data: TripData) -> Trip:
    trip = Trip(name=trip_data.name)
    await self.uow.trips.create(trip)
    await self.uow.flush()  # Get generated trip.id

    for step_data in trip_data.steps:
        step = TripStep(trip_id=trip.id, name=step_data.name)
        await self.uow.trip_steps.create(step)

    await self.uow.save_changes()
    return trip
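The difference between flushing and committing can be made concrete with a toy model. The sketch below assumes the usual ORM semantics (flush sends pending rows inside the open transaction and makes generated keys available; commit makes them durable); `SketchSession` is hypothetical and stores rows as plain dicts.

```python
import asyncio
from itertools import count


class SketchSession:
    """Hypothetical session: flush assigns IDs, commit makes rows durable."""

    def __init__(self) -> None:
        self._ids = count(1)
        self.new: list[dict] = []
        self.flushed: list[dict] = []
        self.committed: list[dict] = []

    def add(self, row: dict) -> None:
        self.new.append(row)

    async def flush(self) -> None:
        # Pending rows get their generated primary keys here,
        # but they are not yet part of the committed state.
        for row in self.new:
            row["id"] = next(self._ids)
        self.flushed.extend(self.new)
        self.new.clear()

    async def commit(self) -> None:
        await self.flush()
        self.committed.extend(self.flushed)
        self.flushed.clear()


async def create_trip_with_step() -> SketchSession:
    session = SketchSession()
    trip = {"name": "Oslo"}
    session.add(trip)
    await session.flush()  # trip["id"] is now set
    session.add({"trip_id": trip["id"], "name": "Flight"})
    await session.commit()  # both rows persisted together
    return session
```

The child row can reference `trip["id"]` only because the flush happened first; the single commit at the end still keeps the parent and child in one transaction.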
Validation Before Write
async def update_user_profile(
    self, user_id: UUID, request: UpdateUserProfileRequest
) -> User:
    user = await self.uow.users.get_by_id(user_id)
    if not user:
        raise ValueError(f"User with id {user_id} not found")

    if request.country_id:
        country = await self.uow.countries.get_by_id(request.country_id)
        if not country:
            raise ValueError(f"Country with id {request.country_id} not found")
        user.country_id = request.country_id

    await self.uow.users.update(user)
    await self.uow.save_changes()
    return user
Anti-Patterns
# ❌ WRONG: Direct session access
await self.uow.session.commit()

# ❌ WRONG: Creating sessions manually
session = async_sessionmaker()
async with session() as s:
    ...

# ❌ WRONG: Forgetting to save changes
async def create_user(self, email: str) -> User:
    user = User(email=email)
    await self.uow.users.create(user)
    return user  # BUG: Changes not committed!

# ❌ WRONG: Using synchronous operations
user = session.query(User).first()  # Use async methods
Environment Behavior
| Environment | Uncommitted Changes Behavior |
|---|---|
| local | Raises `RuntimeError` |
| development | Raises `RuntimeError` |
| testing | Raises `RuntimeError` |
| production | Logs ERROR, continues |
This ensures bugs are caught early in development while avoiding crashes in production.
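The table above could be implemented with a check along these lines. The function name matches the `check_uncommitted_changes` step in the architecture flow, but its signature, the `STRICT_ENVIRONMENTS` constant, and the logger name are assumptions made for this sketch.

```python
import logging

logger = logging.getLogger("uow")

# Environments in which leftover changes are treated as a hard failure.
STRICT_ENVIRONMENTS = {"local", "development", "testing"}


def check_uncommitted_changes(environment: str, has_pending_changes: bool) -> None:
    """Raise in strict environments; log an error and continue otherwise."""
    if not has_pending_changes:
        return
    message = "Unit of Work finished with uncommitted changes"
    if environment in STRICT_ENVIRONMENTS:
        raise RuntimeError(message)
    logger.error(message)
```

Calling `check_uncommitted_changes("testing", True)` raises, while the same call with `"production"` only emits a log record, so a forgotten `save_changes()` fails loudly in tests without taking down a live request.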
Architecture Flow
Request
│
▼
Router (FastAPI)
│
├── UoWDep (via Depends)
▼
Service (injected with UoW)
│
├── uow.repositories.method()
├── await uow.save_changes()
▼
Response
│
▼
UoW cleanup (check_uncommitted_changes + close_session)
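The flow above can be traced with a generator-style dependency, which has the same shape as a FastAPI dependency that uses `yield`. This is a sketch under that assumption: `SketchUnitOfWork`, `get_unit_of_work`, and `handle_request` are hypothetical, and the framework's role is simulated by driving the generator by hand.

```python
import asyncio
from typing import AsyncIterator


class SketchUnitOfWork:
    """Hypothetical UoW that records lifecycle events."""

    def __init__(self, events: list[str]) -> None:
        self.events = events

    async def save_changes(self) -> None:
        self.events.append("save_changes")

    def check_uncommitted_changes(self) -> None:
        self.events.append("check_uncommitted_changes")

    async def close_session(self) -> None:
        self.events.append("close_session")


async def get_unit_of_work(events: list[str]) -> AsyncIterator[SketchUnitOfWork]:
    # Code after the yield runs as cleanup once the handler is done.
    uow = SketchUnitOfWork(events)
    try:
        yield uow
        uow.check_uncommitted_changes()
    finally:
        await uow.close_session()


async def handle_request() -> list[str]:
    events: list[str] = []
    dependency = get_unit_of_work(events)
    uow = await dependency.__anext__()  # framework resolves the dependency
    events.append("service call")
    await uow.save_changes()  # service commits explicitly
    events.append("response")
    # Framework finalizes the dependency by resuming the generator.
    try:
        await dependency.__anext__()
    except StopAsyncIteration:
        pass
    return events
```

Running `handle_request()` records the service call, the explicit commit, and the response before the two cleanup steps, mirroring the order in the diagram.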