← All tasks · Task 10 of 12 · Unsolved

10. Checkpoint Manager

State Machine · Resilience · ~20 min

Track per-table processing status across crashes and partial runs. Tables move PENDING → IN_PROGRESS → COMPLETED (or FAILED). On restart, anything not COMPLETED is retried — including IN_PROGRESS, which means the previous run crashed mid-table.

This is the resume-after-crash story for any long-running data pipeline.

Tests this task must pass

  1. Initial state — after initTables, every table is PENDING; getPending() returns all of them.
  2. Happy path — markInProgress → markCompleted ends in COMPLETED.
  3. Failure path — markInProgress → markFailed ends in FAILED; the error message is captured.
  4. getPending() excludes COMPLETED but includes FAILED (must retry) and PENDING.
  5. Crash recovery — a table left in IN_PROGRESS (process died mid-job) is included in getPending() on restart.
//sampleStart
/** Processing lifecycle of a single table: PENDING → IN_PROGRESS → COMPLETED (or FAILED). */
enum class Status { PENDING, IN_PROGRESS, COMPLETED, FAILED }

/**
 * Tracks the processing [Status] of each table so a long-running pipeline can resume
 * after a crash or a partial run.
 *
 * State is held in memory only. Tables are reported by [getPending] in the order they were
 * first registered via [initTables]. Status changes are not validated against the previous
 * status (e.g. a table may go straight from PENDING to COMPLETED); only unknown table names
 * are rejected.
 */
class CheckpointManager {
    // Insertion-ordered so getPending() returns tables in a deterministic, registration order.
    private val statuses = linkedMapOf<String, Status>()

    // Message from the most recent markFailed call per table; cleared when the table completes.
    private val errors = mutableMapOf<String, String>()

    /** Registers [tables] and (re)sets each of them to [Status.PENDING], discarding any recorded error. */
    fun initTables(tables: List<String>) {
        for (table in tables) {
            statuses[table] = Status.PENDING
            errors.remove(table)
        }
    }

    /**
     * Returns the current status of [table].
     *
     * @throws IllegalArgumentException if [table] was never registered via [initTables].
     */
    fun getStatus(table: String): Status =
        requireNotNull(statuses[table]) { "Unknown table: $table (call initTables first)" }

    /**
     * Marks [table] as currently being processed.
     *
     * @throws IllegalArgumentException if [table] was never registered via [initTables].
     */
    fun markInProgress(table: String) {
        setStatus(table, Status.IN_PROGRESS)
    }

    /**
     * Marks [table] as successfully processed and forgets any previously recorded error.
     *
     * @throws IllegalArgumentException if [table] was never registered via [initTables].
     */
    fun markCompleted(table: String) {
        setStatus(table, Status.COMPLETED)
        errors.remove(table)
    }

    /**
     * Marks [table] as failed and records [error] so it can be inspected via [getError].
     *
     * @throws IllegalArgumentException if [table] was never registered via [initTables].
     */
    fun markFailed(table: String, error: String) {
        setStatus(table, Status.FAILED)
        errors[table] = error
    }

    /** Returns the message from the latest [markFailed] for [table], or null if none is recorded. */
    fun getError(table: String): String? = errors[table]

    /**
     * Returns, in registration order, every table that still needs processing: PENDING,
     * FAILED (retry) and IN_PROGRESS (a previous run died mid-table).
     */
    fun getPending(): List<String> =
        statuses.filterValues { it.needsProcessing() }.keys.toList()

    // Single write path for status changes; rejects names that were never registered.
    private fun setStatus(table: String, status: Status) {
        require(table in statuses) { "Unknown table: $table (call initTables first)" }
        statuses[table] = status
    }

    // Exhaustive `when` without `else`: adding a new Status forces an explicit decision here.
    private fun Status.needsProcessing(): Boolean = when (this) {
        Status.PENDING, Status.IN_PROGRESS, Status.FAILED -> true
        Status.COMPLETED -> false
    }
}
//sampleEnd

fun main() {
    val mgr = CheckpointManager()
    mgr.initTables(listOf("users", "orders", "products", "items", "audit"))

    // Test 1: All start as PENDING
    check(mgr.getStatus("users") == Status.PENDING) { "FAIL: initial status" }
    check(mgr.getPending().size == 5) { "FAIL: all 5 pending" }
    println("✅ Test 1: All pending")

    // Test 2: Complete flow
    mgr.markInProgress("users")
    check(mgr.getStatus("users") == Status.IN_PROGRESS) { "FAIL: in progress" }
    mgr.markCompleted("users")
    check(mgr.getStatus("users") == Status.COMPLETED) { "FAIL: completed" }
    println("✅ Test 2: Complete flow")

    // Test 3: Failed flow
    mgr.markInProgress("orders")
    mgr.markFailed("orders", "Connection timeout")
    check(mgr.getStatus("orders") == Status.FAILED) { "FAIL: failed status" }
    check(mgr.getError("orders") == "Connection timeout") { "FAIL: error message captured" }
    println("✅ Test 3: Failed flow")

    // Test 4: getPending excludes COMPLETED
    mgr.markCompleted("products")
    mgr.markCompleted("items")
    val pending = mgr.getPending()
    check("users" !in pending && "products" !in pending && "items" !in pending) { "FAIL: completed not in pending" }
    check("orders" in pending) { "FAIL: failed should be in pending for retry" }
    check("audit" in pending) { "FAIL: untouched should be pending" }
    check(pending.size == 2) { "FAIL: 2 pending (orders=FAILED, audit=PENDING). Got: $pending" }
    println("✅ Test 4: getPending = $pending")

    // Test 5: IN_PROGRESS included in pending (crash recovery)
    val mgr2 = CheckpointManager()
    mgr2.initTables(listOf("a", "b"))
    mgr2.markInProgress("a") // simulates crash — stuck in IN_PROGRESS
    check("a" in mgr2.getPending()) { "FAIL: IN_PROGRESS should be retried" }
    println("✅ Test 5: IN_PROGRESS in pending (crash recovery)")

    println("\n🎉 ALL TESTS PASSED!")
}
← Previous · Next: JSON Deep Masking →