Track per-table processing status across crashes and partial runs. Tables move PENDING → IN_PROGRESS → COMPLETED (or FAILED). On restart, every table that is not COMPLETED is retried. That includes IN_PROGRESS: a table still in that state at startup means the previous run crashed mid-table.
This is the resume-after-crash story for any long-running data pipeline.
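The lifecycle described above can be written down as a small transition table. This is only a sketch: `allowedTransitions` and `canMove` are hypothetical names, not part of the exercise's API. The exercise's own tests are looser than this table (one of them calls markCompleted on tables that were never marked in progress), so a passing solution does not need to enforce it.

```kotlin
// Same enum as the exercise, repeated so the sketch compiles on its own.
enum class Status { PENDING, IN_PROGRESS, COMPLETED, FAILED }

// Hypothetical helper: the moves implied by the lifecycle description.
val allowedTransitions: Map<Status, Set<Status>> = mapOf(
    Status.PENDING to setOf(Status.IN_PROGRESS),
    // IN_PROGRESS -> IN_PROGRESS covers a retry after a crash.
    Status.IN_PROGRESS to setOf(Status.COMPLETED, Status.FAILED, Status.IN_PROGRESS),
    // A failed table goes back to IN_PROGRESS when it is retried.
    Status.FAILED to setOf(Status.IN_PROGRESS),
    // COMPLETED is terminal: nothing is retried from here.
    Status.COMPLETED to emptySet<Status>()
)

// True when the table above permits moving from `from` to `to`.
fun canMove(from: Status, to: Status): Boolean =
    to in allowedTransitions.getValue(from)
```

Under these assumptions, COMPLETED is the only state with no outgoing moves, which matches the rule that anything not COMPLETED is picked up again on restart.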
Tests this task must pass
Initial state: after initTables, every table is PENDING and getPending() returns all of them.
Happy path: markInProgress → markCompleted ends in COMPLETED.
Failure path: markInProgress → markFailed ends in FAILED, and the error message is stored (the tests check only the status, not the message).
Pending set: getPending() excludes COMPLETED but includes PENDING and FAILED (failed tables must be retried).
Crash recovery: a table left IN_PROGRESS (the process died mid-job) is included in getPending() on restart.
//sampleStart
enum class Status { PENDING, IN_PROGRESS, COMPLETED, FAILED }

class CheckpointManager {
    // Track processing status of each table.
    // initTables: set all to PENDING
    // markInProgress/markCompleted/markFailed: update status
    // getPending: return tables that need processing (PENDING + FAILED + IN_PROGRESS)
    fun initTables(tables: List<String>) { TODO() }
    fun getStatus(table: String): Status { TODO() }
    fun markInProgress(table: String) { TODO() }
    fun markCompleted(table: String) { TODO() }
    fun markFailed(table: String, error: String) { TODO() }
    fun getPending(): List<String> { TODO() }
}
//sampleEnd
fun main() {
    val mgr = CheckpointManager()
    mgr.initTables(listOf("users", "orders", "products", "items", "audit"))

    // Test 1: All start as PENDING
    check(mgr.getStatus("users") == Status.PENDING) { "FAIL: initial status" }
    check(mgr.getPending().size == 5) { "FAIL: all 5 pending" }
    println("✅ Test 1: All pending")

    // Test 2: Complete flow
    mgr.markInProgress("users")
    check(mgr.getStatus("users") == Status.IN_PROGRESS) { "FAIL: in progress" }
    mgr.markCompleted("users")
    check(mgr.getStatus("users") == Status.COMPLETED) { "FAIL: completed" }
    println("✅ Test 2: Complete flow")

    // Test 3: Failed flow
    mgr.markInProgress("orders")
    mgr.markFailed("orders", "Connection timeout")
    check(mgr.getStatus("orders") == Status.FAILED) { "FAIL: failed status" }
    println("✅ Test 3: Failed flow")

    // Test 4: getPending excludes COMPLETED
    mgr.markCompleted("products")
    mgr.markCompleted("items")
    val pending = mgr.getPending()
    check("users" !in pending && "products" !in pending && "items" !in pending) { "FAIL: completed not in pending" }
    check("orders" in pending) { "FAIL: failed should be in pending for retry" }
    check("audit" in pending) { "FAIL: untouched should be pending" }
    check(pending.size == 2) { "FAIL: 2 pending (orders=FAILED, audit=PENDING). Got: $pending" }
    println("✅ Test 4: getPending = $pending")

    // Test 5: IN_PROGRESS included in pending (crash recovery)
    val mgr2 = CheckpointManager()
    mgr2.initTables(listOf("a", "b"))
    mgr2.markInProgress("a") // simulates a crash: stuck in IN_PROGRESS
    check("a" in mgr2.getPending()) { "FAIL: IN_PROGRESS should be retried" }
    println("✅ Test 5: IN_PROGRESS in pending (crash recovery)")

    println("\n🎉 ALL TESTS PASSED!")
}
Hint
One MutableMap<String, Status> covers state. getPending is just filter { it.value != Status.COMPLETED }.keys.toList(). Store error strings in a separate map if you want them addressable later.
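To see the filter from the hint in isolation, here is a minimal sketch on sample data. `needsProcessing` and `sample` are hypothetical names used only for illustration; in the exercise the same expression lives inside getPending().

```kotlin
// Same enum as the exercise, repeated so the sketch compiles on its own.
enum class Status { PENDING, IN_PROGRESS, COMPLETED, FAILED }

// Hypothetical free function: keep every table whose status is not COMPLETED.
fun needsProcessing(statuses: Map<String, Status>): List<String> =
    statuses.filter { it.value != Status.COMPLETED }.keys.toList()

// Sample data for illustration only; linkedMapOf keeps insertion order.
val sample: Map<String, Status> = linkedMapOf(
    "users" to Status.COMPLETED,
    "orders" to Status.FAILED,
    "audit" to Status.PENDING,
    "items" to Status.IN_PROGRESS
)
```

With this sample, needsProcessing(sample) drops "users" and keeps the other three in insertion order. No status needs special handling beyond the single comparison against COMPLETED.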
Solution
class CheckpointManager {
    // mutableMapOf preserves insertion order, so getPending() follows initTables order.
    private val statuses = mutableMapOf<String, Status>()
    // Last error message per failed table, kept separately from the status.
    private val errors = mutableMapOf<String, String>()

    fun initTables(tables: List<String>) {
        for (t in tables) statuses[t] = Status.PENDING
    }

    // Tables that were never registered report PENDING rather than throwing.
    fun getStatus(table: String): Status = statuses[table] ?: Status.PENDING

    fun markInProgress(table: String) { statuses[table] = Status.IN_PROGRESS }
    fun markCompleted(table: String) { statuses[table] = Status.COMPLETED }
    fun markFailed(table: String, error: String) {
        statuses[table] = Status.FAILED
        errors[table] = error
    }

    // Everything except COMPLETED: PENDING, FAILED, and IN_PROGRESS (crashed mid-table).
    fun getPending(): List<String> =
        statuses.filter { it.value != Status.COMPLETED }.keys.toList()
}
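As a usage sketch, the manager can drive a simple processing pass. `runPass` and its `process` callback are hypothetical names, not part of the exercise; the sketch assumes the per-table job signals failure by throwing. The class is a condensed copy of the solution so the block compiles on its own.

```kotlin
enum class Status { PENDING, IN_PROGRESS, COMPLETED, FAILED }

// Condensed copy of the solution above.
class CheckpointManager {
    private val statuses = mutableMapOf<String, Status>()
    private val errors = mutableMapOf<String, String>()
    fun initTables(tables: List<String>) { for (t in tables) statuses[t] = Status.PENDING }
    fun getStatus(table: String): Status = statuses[table] ?: Status.PENDING
    fun markInProgress(table: String) { statuses[table] = Status.IN_PROGRESS }
    fun markCompleted(table: String) { statuses[table] = Status.COMPLETED }
    fun markFailed(table: String, error: String) {
        statuses[table] = Status.FAILED
        errors[table] = error
    }
    fun getPending(): List<String> =
        statuses.filter { it.value != Status.COMPLETED }.keys.toList()
}

// Hypothetical driver: one pass over every table that still needs work.
// `process` stands in for the real per-table job and fails by throwing.
fun runPass(mgr: CheckpointManager, process: (String) -> Unit) {
    // getPending() returns a snapshot list, so updating statuses in the loop is safe.
    for (table in mgr.getPending()) {
        mgr.markInProgress(table)
        try {
            process(table)
            mgr.markCompleted(table)
        } catch (e: Exception) {
            mgr.markFailed(table, e.message ?: e.toString())
        }
    }
}
```

Calling runPass a second time retries whatever the first pass left FAILED. Note that this version keeps its state in memory, so resuming after a real process crash would additionally require persisting the statuses somewhere durable, which the exercise does not cover.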