20 deep questions — performance, JDBC, bulk loading, FK handling, real Synthesized.io challenges
With autoCommit = true (the JDBC default), each statement runs in its own auto-committed transaction. PostgreSQL server-side cursors only work within an open transaction, so with no persistent transaction the driver fetches the entire result set at once. Fix: autoCommit = false plus fetchSize > 0.
Default behavior kills data engines:
// DEFAULT — loads EVERYTHING into JVM heap:
val stmt = conn.createStatement()
val rs = stmt.executeQuery("SELECT * FROM users") // 50M rows
// → OutOfMemoryError! All 50M rows in heap before you read row #1.
// WHY: autoCommit = true → each statement = auto-committed transaction
// PostgreSQL server-side cursors need a transaction to stay open
// No transaction → no cursor → driver fetches all rows at once
The fix — enable streaming:
conn.autoCommit = false // open persistent transaction
val stmt = conn.createStatement()
stmt.fetchSize = 10000 // fetch 10,000 rows per round trip
val rs = stmt.executeQuery("SELECT * FROM users")
// Now: first call to rs.next() fetches rows 1-10,000
// After processing 10,000th row: next batch (10,001-20,000) fetched
// Only 10,000 rows in memory at any time
// IMPORTANT: commit or rollback when done!
conn.commit() // closes the server-side cursor
MySQL is different: MySQL JDBC uses fetchSize = Integer.MIN_VALUE as a special signal for streaming. The type must be TYPE_FORWARD_ONLY and concurrency CONCUR_READ_ONLY. Each driver has its own quirks — this is why Synthesized.io needs a DatabaseDialect abstraction for each supported database.
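A minimal sketch of how the MySQL quirk can be isolated behind a helper (a simplified two-dialect view, not Synthesized's actual dialect API — names here are illustrative):

```kotlin
import java.sql.Connection
import java.sql.ResultSet
import java.sql.Statement

// Per-dialect streaming fetch size (assumption: only two dialects shown):
fun streamingFetchSize(dialect: String): Int = when (dialect) {
    "mysql" -> Integer.MIN_VALUE // Connector/J magic value: row-by-row streaming
    else -> 10_000               // PostgreSQL: server-side cursor batch size
}

// MySQL streaming also requires a forward-only, read-only statement:
fun createStreamingStatement(conn: Connection, dialect: String): Statement {
    val stmt = conn.createStatement(
        ResultSet.TYPE_FORWARD_ONLY,
        ResultSet.CONCUR_READ_ONLY
    )
    stmt.fetchSize = streamingFetchSize(dialect)
    return stmt
}
```

The caller never sees the Integer.MIN_VALUE hack; adding a third database means extending one function.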
fetchSize tuning: too small (100) = many network round trips. Too large (1,000,000) = too much memory. Sweet spot depends on row size: for wide rows (50 columns, BLOBs) use 1,000. For narrow rows (5 int columns) use 50,000. Start with 10,000 and measure.
For a data engine like Synthesized: this is the FIRST thing you configure. Without streaming, you can't even start processing a production database. Every table scan must use fetchSize + explicit transaction.
Multi-row INSERT (INSERT INTO t VALUES (...), (...), (...)) helps, but COPY FROM STDIN is the real answer: it bypasses the SQL parser and writes directly to table storage, typically 5-10x faster than batch INSERT. Use CopyManager from the PostgreSQL JDBC driver. The INSERT path is parse SQL → plan → execute → log; the COPY path streams bytes directly to heap pages, skipping most of that overhead.
Speed comparison for 10M rows:
Method                    | Speed              | Time for 10M rows
Single INSERT             | ~1,000 rows/sec    | ~2.8 hours
Batch INSERT (5000/batch) | ~50,000 rows/sec   | ~3.3 minutes
Multi-row INSERT          | ~70,000 rows/sec   | ~2.4 minutes
COPY FROM STDIN           | ~500,000 rows/sec  | ~20 seconds
How COPY works:
// PostgreSQL JDBC CopyManager:
import org.postgresql.copy.CopyManager
import org.postgresql.core.BaseConnection
val copyMgr = CopyManager(conn.unwrap(BaseConnection::class.java))
val sql = "COPY users (id, name, email) FROM STDIN WITH (FORMAT csv)"
val writer = copyMgr.copyIn(sql)
val buffer = StringBuilder()
for (row in maskedRows) {
// NOTE: naive CSV — real code must quote/escape commas, quotes, newlines
buffer.append("${row.id},${row.name},${row.email}\n")
if (buffer.length > 1_000_000) { // flush every ~1MB of chars
val chunk = buffer.toString().toByteArray()
writer.writeToCopy(chunk, 0, chunk.size) // byte count, not char count
buffer.clear()
}
}
val tail = buffer.toString().toByteArray()
writer.writeToCopy(tail, 0, tail.size)
val rowsInserted = writer.endCopy() // finalize; returns rows inserted
Why COPY is faster: no SQL parsing per row. No query planning. No per-row transaction overhead. Data streams directly into PostgreSQL's table storage as raw tuples. WAL logging is batched. Index updates are batched.
Additional speed tricks:
-- Before bulk load:
ALTER TABLE users DISABLE TRIGGER ALL; -- no trigger execution
DROP INDEX idx_users_email;            -- no index maintenance during load
-- Bulk load with COPY...
-- After bulk load:
CREATE INDEX idx_users_email ON users(email); -- build index on complete data (faster)
ALTER TABLE users ENABLE TRIGGER ALL;
ANALYZE users; -- update query planner statistics
MySQL equivalent: LOAD DATA LOCAL INFILE. Oracle: SQL*Loader. SQL Server: BULK INSERT or bcp. Each database has its own bulk loading mechanism — different API, different syntax. This is why Synthesized needs per-database dialect implementations.
CREATE TABLE masked AS SELECT ... is clean and often faster. The reason is PostgreSQL MVCC: UPDATE doesn't modify rows in place — it creates new row versions, and the old versions (dead tuples) remain until VACUUM.
UPDATE approach — the bloat problem:
UPDATE users SET email = mask(email);
-- PostgreSQL internally, for EACH of 100M rows:
--   1. Mark old tuple as dead (set xmax)
--   2. Create NEW tuple with masked email
--   3. Update ALL indexes pointing to this row
-- Result: table is now ~2x its original size
--   100M live tuples + 100M dead tuples
--   VACUUM needed to reclaim space — slow, I/O intensive
--   Indexes also bloated — need REINDEX
New table approach — clean and fast:
-- Step 1: Create new table with masked data
CREATE TABLE users_masked AS
SELECT id, mask(name) as name, mask(email) as email, created_at
FROM users;
-- Step 2: Recreate constraints and indexes
ALTER TABLE users_masked ADD PRIMARY KEY (id);
CREATE UNIQUE INDEX idx_email ON users_masked(email);
ALTER TABLE users_masked ADD CONSTRAINT fk_dept
FOREIGN KEY (dept_id) REFERENCES departments(id);
-- Step 3: Swap tables
ALTER TABLE users RENAME TO users_old;
ALTER TABLE users_masked RENAME TO users;
DROP TABLE users_old;
-- Result: no bloat, no dead tuples, no VACUUM needed
-- Indexes built on complete data = more efficient than maintained during updates
When UPDATE is acceptable: masking a few columns in a small table (under 1M rows). The overhead is tolerable. For TB-scale data engines like Synthesized — always write to new table or target database.
Synthesized's approach: reads from source DB, applies transformers, writes to target DB (can be the same server, different schema). This naturally avoids the UPDATE bloat problem. Source is read-only. Target gets clean inserts.
Use deferrable constraints (SET CONSTRAINTS ALL DEFERRED) so FKs are checked at COMMIT, not at each INSERT. The classic case: employees.manager_id -> employees.id — you can't insert an employee before their manager, but the manager IS an employee in the same table.
Types of circular references:
Self-reference: employees.manager_id -> employees.id. Most common. An employee's manager is also an employee in the same table.
Mutual reference: table_a.b_id -> table_b.id AND table_b.a_id -> table_a.id. Rarer but occurs in complex schemas (order references latest shipment, shipment references order).
Strategy 1: NULL-then-UPDATE (Synthesized's CycleResolutionStrategy):
-- Self-referencing employees table:
-- Phase 1: INSERT all employees with manager_id = NULL
INSERT INTO employees (id, name, manager_id) VALUES (1, 'CEO', NULL);
INSERT INTO employees (id, name, manager_id) VALUES (2, 'VP', NULL);
INSERT INTO employees (id, name, manager_id) VALUES (3, 'Dev', NULL);
-- Phase 2: UPDATE manager_id references
UPDATE employees SET manager_id = 1 WHERE id = 2; -- VP reports to CEO
UPDATE employees SET manager_id = 2 WHERE id = 3; -- Dev reports to VP
-- Works because: Phase 1 has no FK violations (NULL is allowed)
-- Phase 2 references already-existing rows
Strategy 2: Deferred constraints (PostgreSQL-specific):
-- FK must be declared DEFERRABLE:
ALTER TABLE employees ADD CONSTRAINT fk_manager
FOREIGN KEY (manager_id) REFERENCES employees(id)
DEFERRABLE INITIALLY DEFERRED;
-- Now within a transaction, FKs checked at COMMIT:
BEGIN;
INSERT INTO employees (id, name, manager_id) VALUES (3, 'Dev', 2);
-- Would normally fail: employee 2 doesn't exist yet!
-- But DEFERRED = check later
INSERT INTO employees (id, name, manager_id) VALUES (2, 'VP', 1);
INSERT INTO employees (id, name, manager_id) VALUES (1, 'CEO', NULL);
COMMIT; -- NOW all FKs checked. All references valid. Success.
Strategy 3: Disable triggers (risky but fast):
ALTER TABLE employees DISABLE TRIGGER ALL;
-- Load all data in any order — no FK checks at all
ALTER TABLE employees ENABLE TRIGGER ALL;
-- Risk: if data is inconsistent, you won't know until queries fail
Synthesized's CycleResolutionStrategy: their docs mention this explicitly. The engine detects cycles in the FK graph during schema analysis, then applies the appropriate strategy. For topological sort: cycles are removed by breaking one edge (nullable FK), processing, then restoring.
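One way to sketch the edge-breaking step (an assumption about the mechanics, not Synthesized's actual implementation; the FkEdge model and function names are hypothetical). Here only self-referencing nullable edges are deferred — real cycle detection would walk the full FK graph:

```kotlin
// Hypothetical FK model — `nullable` means the FK column accepts NULL:
data class FkEdge(val childTable: String, val column: String,
                  val parentTable: String, val nullable: Boolean)

// Break cycles before topological sort: defer nullable self-referencing edges
// to a post-insert UPDATE phase, and sort on the remaining edges.
fun splitForCycleResolution(fks: List<FkEdge>): Pair<List<FkEdge>, List<FkEdge>> {
    val (deferred, kept) = fks.partition { it.nullable && it.childTable == it.parentTable }
    return kept to deferred // sort with `kept`; restore `deferred` via UPDATEs
}
```

The `kept` edges feed the topological sort; each `deferred` edge becomes a Phase-2 UPDATE as in Strategy 1.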
SHA-256 = 256-bit output = 2^256 possible values. 100M ≈ 2^27. Birthday-bound collision probability ≈ n²/2^257 ≈ 2^(2·27 − 257) ≈ 2^-203 — astronomically low.
Option B (HashSet) doesn't scale:
// 100M emails × ~50 bytes each = ~5GB RAM just for the set
val seen = HashSet<String>(100_000_000)
// Plus: requires all masked values in memory before writing any
// Can't stream. Defeats the purpose of fetchSize streaming.
Deterministic hash — the practical solution:
fun maskEmail(original: String, salt: String): String {
val hash = MessageDigest.getInstance("SHA-256")
.digest("$salt:$original".toByteArray())
// Take first 8 bytes, encode to alphanumeric
val local = hash.take(8)
.joinToString("") { String.format("%02x", it) } // 16 hex chars
return "${local}@masked.io"
}
// Uniqueness math:
// 16 hex chars = 64 bits = 2^64 possible values = ~18 quintillion
// 100M emails = 10^8
// Birthday paradox: collision at ~sqrt(2^64) = ~4 billion
// 100M is well below 4B → practically guaranteed unique
BloomFilter for paranoid uniqueness:
// Guava BloomFilter: ~240MB for 100M entries at a 0.01% false-positive rate
val bloom = BloomFilter.create(Funnels.stringFunnel(Charsets.UTF_8),
100_000_000, 0.0001)
fun maskEmailSafe(original: String, salt: String): String {
var masked = maskEmail(original, salt)
var attempt = 0
while (bloom.mightContain(masked)) { // possible collision
attempt++
masked = maskEmail("$original:$attempt", salt) // retry with suffix
}
bloom.put(masked)
return masked
}
// ~240MB RAM regardless of email length. Streaming-compatible.
Option C (sequential) problem: user_1@masked.io reveals the row count and potentially the original order. Not suitable when masking must not leak information about the original data.
Real-world approach: deterministic hash is sufficient for virtually all cases. Add BloomFilter only if the customer's UNIQUE constraint is strict and they run validation after loading.
An IN clause with 1,000 IDs — what's the problem? The query planner sees WHERE user_id IN (1, 2, ..., 1000) and estimates selectivity poorly with that many values, which can produce a bad plan.
The problem with large IN clauses:
SELECT * FROM orders WHERE user_id IN (1, 2, 3, ..., 1000);
-- PostgreSQL issues:
-- 1. Query planner: estimates 1000 individual lookups vs Seq Scan cost
--    With a wide table, planner may choose Seq Scan (read all 500M rows!)
-- 2. Parse time: parsing SQL with 1000 literals is slow
-- 3. Some databases (Oracle) have hard limits: 1000 values per IN clause
-- 4. With 10,000+ IDs it gets much worse — plan cache misses
Solution 1: Temporary table + JOIN (best for data engines):
-- Create temp table with seed IDs:
CREATE TEMP TABLE seed_ids (id BIGINT PRIMARY KEY);
INSERT INTO seed_ids VALUES (1), (2), ..., (1000);
-- or use COPY for bulk loading seed IDs
-- JOIN instead of IN:
SELECT o.* FROM orders o JOIN seed_ids s ON o.user_id = s.id;
-- PostgreSQL: Hash Join or Merge Join — predictable, efficient
-- Uses index on orders.user_id if available
-- Works for 10, 10K, or 1M seed IDs equally well
Solution 2: Batched IN clauses:
// Split 10,000 IDs into chunks of 500:
val chunks = allIds.chunked(500)
val results = chunks.flatMap { chunk ->
val placeholders = chunk.joinToString(",") { "?" }
val ps = conn.prepareStatement(
"SELECT * FROM orders WHERE user_id IN ($placeholders)"
)
chunk.forEachIndexed { i, id -> ps.setLong(i + 1, id) }
ps.executeQuery().toList() // toList(): assumed helper that drains the ResultSet into rows
}
// Multiple queries but each is small and uses index
Solution 3: = ANY(array) — PostgreSQL-specific:
val ps = conn.prepareStatement(
"SELECT * FROM orders WHERE user_id = ANY(?)"
)
ps.setArray(1, conn.createArrayOf("BIGINT", ids.toTypedArray()))
// Single parameter, single query. PostgreSQL handles array efficiently.
// But: not portable to MySQL/Oracle
For Synthesized's subsetter: temporary table approach is best — works with any number of seed IDs, portable across databases (temp tables are universal), and the query planner makes good decisions with JOINs.
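A JDBC sketch of the temp-table approach (table and column names are illustrative; `fetchBySeedIds` and `seedJoinQuery` are hypothetical helpers, not a real API):

```kotlin
import java.sql.Connection

// Pure query builder (identifiers assumed already validated upstream):
fun seedJoinQuery(table: String, fkColumn: String): String =
    "SELECT t.* FROM $table t JOIN seed_ids s ON t.$fkColumn = s.id"

// Create the temp table, batch-insert seed IDs, then stream the join:
fun fetchBySeedIds(conn: Connection, table: String, fkColumn: String, seedIds: List<Long>) {
    conn.createStatement().use { st ->
        st.execute("CREATE TEMP TABLE seed_ids (id BIGINT PRIMARY KEY)")
    }
    conn.prepareStatement("INSERT INTO seed_ids VALUES (?)").use { ps ->
        for (id in seedIds) { ps.setLong(1, id); ps.addBatch() }
        ps.executeBatch() // one round trip for the whole batch, not per id
    }
    conn.prepareStatement(seedJoinQuery(table, fkColumn)).use { ps ->
        ps.fetchSize = 10_000 // stream, don't materialize
        // (PostgreSQL streaming also needs conn.autoCommit = false, as in Q1)
        val rs = ps.executeQuery()
        while (rs.next()) { /* hand each row to the masking pipeline */ }
    }
}
```

Temp tables are session-scoped, so the seed set vanishes when the connection is returned — no cleanup step needed.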
Can't insert an order before the user exists. Can't insert order_items before products and orders exist.
Kahn's algorithm for topological sort:
fun topologicalSort(
tables: List<String>,
fks: List<FK>
): List<String> {
// Build adjacency: parent -> list of children
val children = mutableMapOf<String, MutableList<String>>()
val inDegree = mutableMapOf<String, Int>()
tables.forEach { inDegree[it] = 0 }
for (fk in fks) {
children.getOrPut(fk.parentTable) { mutableListOf() }
.add(fk.childTable)
inDegree[fk.childTable] = (inDegree[fk.childTable] ?: 0) + 1
}
// Start with tables that have no dependencies (inDegree = 0)
val queue: Queue<String> = ArrayDeque()
inDegree.filter { it.value == 0 }.keys.forEach { queue.add(it) }
val sorted = mutableListOf<String>()
while (queue.isNotEmpty()) {
val table = queue.poll()
sorted.add(table)
for (child in children[table].orEmpty()) {
inDegree[child] = inDegree[child]!! - 1
if (inDegree[child] == 0) queue.add(child)
}
}
if (sorted.size != tables.size) {
val cycled = tables - sorted.toSet()
throw IllegalStateException("Cycle detected: $cycled")
}
return sorted
}
Example:
Tables: users, departments, orders, order_items, products
FKs:
  users.dept_id -> departments.id
  orders.user_id -> users.id
  order_items.order_id -> orders.id
  order_items.product_id -> products.id
inDegree: departments=0, products=0, users=1, orders=1, order_items=2
Sort result: [departments, products, users, orders, order_items]
Load in this order → no FK violations.
For deletion — reverse order: delete order_items first, then orders, then users, then departments. Children before parents.
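As a self-contained check, a condensed re-declaration of the Kahn sort above, run on the example schema (FK here carries only the two table names):

```kotlin
import java.util.ArrayDeque

data class FK(val parentTable: String, val childTable: String)

fun topoSort(tables: List<String>, fks: List<FK>): List<String> {
    val children = mutableMapOf<String, MutableList<String>>()
    val inDegree = tables.associateWith { 0 }.toMutableMap()
    for (fk in fks) {
        children.getOrPut(fk.parentTable) { mutableListOf() }.add(fk.childTable)
        inDegree[fk.childTable] = inDegree.getValue(fk.childTable) + 1
    }
    val queue = ArrayDeque(inDegree.filterValues { it == 0 }.keys)
    val sorted = mutableListOf<String>()
    while (queue.isNotEmpty()) {
        val t = queue.poll()
        sorted.add(t)
        for (c in children[t].orEmpty()) {
            inDegree[c] = inDegree.getValue(c) - 1
            if (inDegree.getValue(c) == 0) queue.add(c)
        }
    }
    check(sorted.size == tables.size) { "Cycle detected" }
    return sorted
}

val loadOrder = topoSort(
    listOf("users", "departments", "orders", "order_items", "products"),
    listOf(
        FK("departments", "users"), FK("users", "orders"),
        FK("orders", "order_items"), FK("products", "order_items")
    )
)
val deleteOrder = loadOrder.reversed() // children before parents
```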
PostgreSQL int4 = MySQL INT = Oracle NUMBER(10) = SQL Server INT. Map source types to internal canonical types, then back to target-specific types. Synthesized supports 15+ databases, each with different type names, different boolean representations, and different date formats.
The type mapping nightmare — real examples:
Concept          | PostgreSQL      | MySQL          | Oracle        | SQL Server
Integer          | int4, int8      | INT, BIGINT    | NUMBER(10)    | INT, BIGINT
Boolean          | boolean         | TINYINT(1)     | NUMBER(1)     | BIT
Text (short)     | varchar(255)    | VARCHAR(255)   | VARCHAR2(255) | NVARCHAR(255)
Text (unlimited) | text            | LONGTEXT       | CLOB          | NVARCHAR(MAX)
Binary           | bytea           | LONGBLOB       | BLOB          | VARBINARY(MAX)
Timestamp        | timestamptz     | DATETIME       | TIMESTAMP     | DATETIME2
Auto-increment   | SERIAL/IDENTITY | AUTO_INCREMENT | SEQUENCE      | IDENTITY
UUID             | uuid            | CHAR(36)       | RAW(16)       | UNIQUEIDENTIFIER
JSON             | jsonb           | JSON           | CLOB          | NVARCHAR(MAX)
Array            | int[]           | not supported  | not supported | not supported
How to solve — DatabaseDialect pattern:
interface DatabaseDialect {
fun mapType(jdbcType: Int, typeName: String, size: Int): CanonicalType
fun toNativeType(type: CanonicalType): String
fun bulkLoadCommand(table: String, columns: List<String>): String
fun streamingFetchSize(): Int // PG: 10000, MySQL: Int.MIN_VALUE
fun supportsDeferred(): Boolean
fun quoteIdentifier(name: String): String // "table" vs `table` vs [table]
}
class PostgresDialect : DatabaseDialect {
override fun mapType(jdbcType: Int, typeName: String, size: Int) = when {
typeName == "int4" -> CanonicalType.INTEGER
typeName == "int8" -> CanonicalType.LONG
typeName == "bool" -> CanonicalType.BOOLEAN
typeName == "timestamptz" -> CanonicalType.TIMESTAMP_TZ
typeName == "jsonb" -> CanonicalType.JSON
typeName.startsWith("_") -> CanonicalType.ARRAY // PG arrays: _int4
else -> CanonicalType.STRING
}
override fun bulkLoadCommand(table: String, columns: List<String>) =
"COPY $table (${columns.joinToString()}) FROM STDIN WITH CSV"
// (other DatabaseDialect members omitted for brevity)
}
This is core Synthesized work: "add support for new databases, formats, and generation/masking configurations" from the job description. Each new database = new dialect implementation, new type mappings, new bulk load strategy, new edge cases.
Processing 500 tables takes 6 hours. Restarting from scratch = 6 more hours. Need to resume, not restart.
Checkpoint system:
-- Progress tracking table (in target database):
CREATE TABLE _synth_progress (
table_name VARCHAR(255) PRIMARY KEY,
status VARCHAR(20), -- 'pending', 'in_progress', 'completed', 'failed'
rows_processed BIGINT,
started_at TIMESTAMP,
completed_at TIMESTAMP,
error_message TEXT
);
-- Before processing each table:
UPDATE _synth_progress SET status = 'in_progress', started_at = NOW()
WHERE table_name = 'orders';
-- After success:
UPDATE _synth_progress SET status = 'completed',
rows_processed = 5000000, completed_at = NOW()
WHERE table_name = 'orders';
-- On failure:
UPDATE _synth_progress SET status = 'failed',
error_message = 'OOM at row 3500000'
WHERE table_name = 'orders';
Idempotent table processing:
fun processTable(table: String) {
// Check checkpoint:
val status = getProgress(table)
if (status == "completed") {
logger.info("Skipping $table — already completed")
return
}
// Make idempotent: clear any partial results
targetConn.execute("TRUNCATE TABLE $table")
// Process and load
readMaskWrite(table)
// Mark complete
updateProgress(table, "completed")
}
Option D (single transaction) doesn't work at scale: a transaction spanning 500 tables and 6 hours would hold locks for 6 hours, consume enormous WAL space, and a rollback would take hours itself. Per-table atomicity is the practical approach.
For very large tables: checkpoint within a table too. Track last processed primary key. On resume: SELECT * FROM orders WHERE id > last_checkpoint_id ORDER BY id. Process in chunks of 1M rows, checkpoint after each chunk.
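The within-table checkpoint loop can be sketched with keyset pagination (the helper names and the progress-table write are hypothetical; in real code `processInChunks` would persist `lastId` after each chunk):

```kotlin
import java.sql.Connection

// Keyset-pagination chunk query (identifiers illustrative, assumed validated):
fun chunkQuery(table: String, pk: String, lastId: Long, limit: Int): String =
    "SELECT * FROM $table WHERE $pk > $lastId ORDER BY $pk LIMIT $limit"

// Resume-aware loop: process one chunk, checkpoint, repeat.
fun processInChunks(conn: Connection, table: String, pk: String, startAfter: Long, chunk: Int) {
    var lastId = startAfter // on resume: read from the progress table
    while (true) {
        var rowsInChunk = 0
        conn.createStatement().use { st ->
            val rs = st.executeQuery(chunkQuery(table, pk, lastId, chunk))
            while (rs.next()) {
                rowsInChunk++
                lastId = rs.getLong(pk) // advance the checkpoint cursor
                // mask + write the row here
            }
        }
        // persist lastId to the progress table here (hypothetical helper)
        if (rowsInChunk < chunk) break // final partial chunk = table done
    }
}
```

Keyset pagination (`WHERE pk > lastId`) stays fast at any offset, unlike `OFFSET`, which re-scans skipped rows.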
Use getBlob().getBinaryStream() — never load an entire BLOB into heap; process it chunk by chunk. For CLOBs: getClob().getCharacterStream(). Skip LOB columns entirely if they're not needed for masking. The scenario: a BLOB column with a 50MB PDF per row. 100K rows = 5TB of binary data. Can't fit in heap.
The problem:
// BAD — loads entire BLOB into JVM heap:
val bytes: ByteArray = rs.getBytes("document") // 50MB per row!
// 100 concurrent rows × 50MB = 5GB heap just for BLOBs
// Plus: creates 50MB byte array → GC pressure → long pauses
Streaming approach:
// GOOD — stream BLOB chunk by chunk:
val blobStream: InputStream = rs.getBlob("document").binaryStream
val buffer = ByteArray(8192) // 8KB buffer — reused
val outputStream = targetBlob.setBinaryStream(1) // targetBlob: e.g. targetConn.createBlob()
var bytesRead: Int
while (blobStream.read(buffer).also { bytesRead = it } != -1) {
outputStream.write(buffer, 0, bytesRead)
// Only 8KB in heap at any time. Even for 500MB BLOBs.
}
blobStream.close()
outputStream.close()
JSONB/CLOB with nested PII — parsing required:
// JSON column: {"user": {"email": "alex@test.com", "phone": "+7702..."}}
// Need to mask email and phone INSIDE the JSON
val reader = rs.getClob("metadata").characterStream
val json = JsonParser.parseReader(reader) // Gson tree parse (not streaming; fine for small docs)
// Walk JSON tree, mask PII fields:
val masked = json.asJsonObject.deepCopy().apply {
getAsJsonObject("user").apply {
addProperty("email", maskEmail(get("email").asString))
addProperty("phone", maskPhone(get("phone").asString))
}
}
// Write masked JSON to target
Smart approach — skip what you don't need:
// If BLOB column doesn't contain PII, don't even read it:
// SELECT id, name, email FROM users  (skip 'avatar' BLOB column)
// Saves massive I/O and memory
// Only include LOB columns when masking rules reference them
Synthesized context: their YAML config specifies which columns to transform. Columns not mentioned can be copied as-is or skipped entirely. For LOBs that need transformation (JSONB with PII), streaming parsers are essential.
Your MVCC snapshot says "I need to see data as of 2 hours ago." PostgreSQL must keep ALL dead tuples from the last 2 hours — can't vacuum them.
How MVCC snapshots block VACUUM:
Your data engine starts reading at 10:00 AM. PostgreSQL takes a snapshot: "show data as of 10:00 AM." During your 3-hour read, production keeps writing — updates create dead tuples. Normally VACUUM removes dead tuples. But YOUR snapshot still references the 10:00 AM state. VACUUM can't remove anything created after 10:00 AM until your transaction ends at 1:00 PM.
In a busy production database: 3 hours of dead tuple accumulation = gigabytes of bloat.
Solutions for data engines:
1. Read from a REPLICA, not the production primary
   - Replica has its own VACUUM cycle
   - No impact on production writes
   - Slight replication lag (milliseconds) — acceptable for masking
2. Use pg_dump with --snapshot
   - Takes a consistent snapshot without holding an open transaction on the primary
   - Outputs SQL/CSV that can be piped to the masking engine
3. Chunk-based reading with short transactions
   Instead of one 3-hour transaction:
   SELECT * FROM users WHERE id BETWEEN 1 AND 1000000;       -- tx 1 (30 sec)
   SELECT * FROM users WHERE id BETWEEN 1000001 AND 2000000; -- tx 2 (30 sec)
   - Each transaction is short → VACUUM can work between chunks
   - Trade-off: not a consistent snapshot across chunks
4. Set statement_timeout on the connection
   SET statement_timeout = '600000'; -- 10 minutes max
   -- Prevents accidentally holding a transaction open for hours
For Synthesized: reading from replica is the recommended approach for production databases. Their documentation likely mentions this in "Database Permissions" section — read-only access to a replica, not production primary.
Respect the max_connections limit and leave room for other applications. The scenario: a customer's production PostgreSQL has max_connections = 100, their app uses 80, and your data engine opens 30 connections → total 110 → "FATAL: too many connections".
The problem at customer sites:
Customer production DB: max_connections = 100
Their application pool: 80 connections
Monitoring/admin: 5 connections
Available for us: 15 connections
Our engine with 8 parallel workers: 8 read + 8 write = 16 connections
→ FATAL: too many connections for role "synth_user"
Solution — configurable, respectful connection management:
// Configuration:
data class EngineConfig(
val sourceMaxConnections: Int = 4, // read from source DB
val targetMaxConnections: Int = 8, // write to target DB
val workerThreads: Int = 8 // parallel processing
)
// HikariCP pool:
val sourcePool = HikariDataSource().apply {
jdbcUrl = config.sourceUrl
maximumPoolSize = config.sourceMaxConnections // never exceed
minimumIdle = 1
connectionTimeout = 30_000 // wait 30s for connection, don't crash
}
// Semaphore for additional control:
val dbSemaphore = Semaphore(config.sourceMaxConnections)
suspend fun readFromSource(query: String, handleRow: (ResultSet) -> Unit) {
dbSemaphore.withPermit { // limit concurrent reads
sourcePool.connection.use { conn -> // close() returns it to the pool
val stmt = conn.createStatement().apply { fetchSize = 10000 }
// Consume the ResultSet BEFORE the connection goes back to the pool —
// a ResultSet from a closed connection is invalid.
stmt.executeQuery(query).use { rs -> while (rs.next()) handleRow(rs) }
}
}
}
Read max_connections dynamically:
// At startup, check what's available:
val maxConns = conn.createStatement()
.executeQuery("SHOW max_connections") // PostgreSQL
.use { rs -> rs.next(); rs.getInt(1) }
val currentConns = conn.createStatement()
.executeQuery("SELECT count(*) FROM pg_stat_activity")
.use { rs -> rs.next(); rs.getInt(1) }
val available = maxConns - currentConns - 5 // leave 5 for safety
val ourLimit = minOf(available, config.maxConnections)
logger.info("DB has $available free connections, we'll use $ourLimit")
mask("alex@gmail.com", salt) always produces the same result, so users.email and orders.contact_email containing the same original email produce the same masked email, and FK integrity is preserved automatically. Same input + same function + same salt = same output. Always. No coordination between tables needed.
Why deterministic masking solves cross-table consistency:
// Masking is a pure function (assume sha256() is a helper returning a hex string):
fun mask(value: String, salt: String): String {
val hash = sha256("$salt:$value")
return hash.take(16) + "@masked.io"
}
// Table: users
// Original: alex@gmail.com → Masked: a7f3b2c1e9d04f28@masked.io
// Table: orders (same email in contact_email column)
// Original: alex@gmail.com → Masked: a7f3b2c1e9d04f28@masked.io (SAME!)
// Table: audit_log (same email in actor_email column)
// Original: alex@gmail.com → Masked: a7f3b2c1e9d04f28@masked.io (SAME!)
// No lookup table needed. No coordination between tables.
// Each table processed independently. Parallelizable.
For FK columns (user_id, order_id):
// FK values are usually integers — they DON'T need masking!
// users.id = 42 → keep 42 in both users and orders tables
// Referential integrity preserved automatically
// Exception: if the FK value itself is PII (e.g., SSN used as ID)
// → mask deterministically, same result in both tables
Option B (lookup table) problems: requires storing all original→masked mappings. For 100M emails across 20 tables = huge lookup table in memory or on disk. Must be consulted for every row in every table. Slower, more complex, doesn't parallelize well. Deterministic masking avoids all this.
Synthesized's YAML config: you declare the transformer per column. The engine applies the same transformer (with same salt) everywhere that column value appears. The "deterministic" flag in their transformer config ensures consistency.
Many microservice databases have NO foreign keys — referential integrity enforced by application code. But the data engine needs to know "orders.user_id references users.id."
Why production databases often lack FKs:
Performance: every INSERT/UPDATE checks FK constraint. At 100K inserts/sec, FK validation is measurable overhead. Many high-throughput systems drop FKs and enforce integrity in application code.
Microservices: each service has its own database. orders.user_id references a user in a completely different database/service. Can't create a cross-database FK.
Legacy: old databases migrated from NoSQL, CSV imports, or denormalized schemas. Relationships exist logically but never declared as constraints.
The data engine's problem:
// Without FK knowledge, the subsetter can't work:
// "Give me 1000 users" → which orders belong to these users?
// orders.user_id LOOKS like it references users.id
// But without FK declaration, the engine doesn't know this
//
// Without FK knowledge, masking is inconsistent:
// users.email is masked, but orders.notification_email
// contains the same email in a different table
// Engine doesn't know they're related → masks differently → broken data
Synthesized's solution — Virtual Foreign Keys in YAML:
# In Synthesized config:
virtual_foreign_keys:
- from_table: orders
from_columns: [user_id]
to_table: users
to_columns: [id]
- from_table: audit_log
from_columns: [actor_email]
to_table: users
to_columns: [email]
# Now the engine treats these as real FKs:
# - Subsetting follows these relationships
# - Masking is consistent across declared relationships
# - Topological sort includes virtual FKs in the graph
This is explicitly in Synthesized's docs under "Managing Data Properties → Virtual Foreign Keys." It's a key feature for enterprise customers where production schemas are denormalized or FK-free.
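Internally, the engine can treat virtual FKs (from the YAML) and real FKs (from JDBC metadata) uniformly by merging them into one edge list for the FK graph. A minimal sketch — the FkRef model and mergeFkGraph are hypothetical names mirroring the YAML shape above, not Synthesized's actual types:

```kotlin
// Hypothetical edge model matching the YAML fields above:
data class FkRef(val fromTable: String, val fromColumns: List<String>,
                 val toTable: String, val toColumns: List<String>)

// Merge declared and virtual FKs, de-duplicated, into one graph edge list:
fun mergeFkGraph(declared: List<FkRef>, virtual: List<FkRef>): List<FkRef> =
    (declared + virtual).distinct()
```

Subsetting, masking consistency, and topological sort then all consume the merged list without caring where an edge came from.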
10M rows × 5 String objects per row = 50M temporary objects. GC must scan and collect all of them. Reduce allocations → reduce GC work.
Where allocations hide in data processing code:
// BAD — 5 allocations per row:
for (row in resultSet) {
val email = row.getString("email") // String allocation #1
val parts = email.split("@") // Array + 2 Strings (#2,3,4)
val masked = "***@" + parts[1] // concatenation → new String (#5)
output.add(masked)
}
// 10M rows × 5 objects = 50M objects → GC nightmare
// GOOD — 1 allocation per row:
val sb = StringBuilder(128) // ONE StringBuilder, reused
for (row in resultSet) {
sb.clear()
val email = row.getString("email")
val atIndex = email.indexOf('@')
sb.append("***@")
sb.append(email, atIndex + 1, email.length) // no substring allocation
output.add(sb.toString()) // one String per row
}
// 10M rows × 1 object = 10M objects → 5x less GC
More techniques:
// Primitive arrays instead of boxed collections:
val ids = IntArray(size) // 4 bytes per element, zero GC
// vs List<Int> = List<Integer> // 16 bytes per element, each is GC object
// Object pooling for row containers:
class MutableRow(val values: Array<Any?>) {
fun reset() { values.fill(null) }
}
val pool = ArrayDeque<MutableRow>(1000)
// take from pool → fill → process → reset → return to pool
// Avoid autoboxing in hot loops:
var sum = 0L // primitive long
for (row in rows) {
sum += row.getLong("amount") // no boxing
}
// vs: rows.map { it.amount }.sum() — creates List<Long> (boxed)
// Enable String Deduplication in G1 GC:
// -XX:+UseStringDeduplication
// Country names, status values, type codes repeat across millions of rows
// G1 detects identical strings and shares the underlying char[]
Measure before optimizing: use async-profiler or JFR to see WHICH allocations dominate. Often 80% of GC pressure comes from one hot loop. Fix that one spot for biggest impact.
Use the DatabaseMetaData API: getTables(), getColumns(), getPrimaryKeys(), getImportedKeys(). It works across all databases; the fallback is querying information_schema directly for database-specific details. JDBC provides a universal metadata API — connection.metaData gives you everything about the database structure.
Complete schema reading with JDBC:
val meta = conn.metaData
// 1. Discover all tables:
val tables = meta.getTables(null, "public", "%", arrayOf("TABLE"))
while (tables.next()) {
val name = tables.getString("TABLE_NAME")
val type = tables.getString("TABLE_TYPE") // TABLE, VIEW, etc.
}
// 2. Get columns for each table:
val cols = meta.getColumns(null, "public", "users", "%")
while (cols.next()) {
val colName = cols.getString("COLUMN_NAME") // "email"
val typeName = cols.getString("TYPE_NAME") // "varchar"
val size = cols.getInt("COLUMN_SIZE") // 255
val nullable = cols.getString("IS_NULLABLE") // "YES" or "NO"
val jdbcType = cols.getInt("DATA_TYPE") // java.sql.Types constant
}
// 3. Primary keys:
val pks = meta.getPrimaryKeys(null, "public", "users")
while (pks.next()) {
val colName = pks.getString("COLUMN_NAME") // "id"
val keySeq = pks.getInt("KEY_SEQ") // 1 (for composite PKs)
}
// 4. Foreign keys (imported = this table references others):
val fks = meta.getImportedKeys(null, "public", "orders")
while (fks.next()) {
val fkCol = fks.getString("FKCOLUMN_NAME") // "user_id"
val pkTable = fks.getString("PKTABLE_NAME") // "users"
val pkCol = fks.getString("PKCOLUMN_NAME") // "id"
val fkName = fks.getString("FK_NAME") // "fk_orders_users"
}
Database-specific fallbacks:
-- PostgreSQL: information_schema + pg_catalog for details
SELECT column_name, udt_name, is_nullable, column_default
FROM information_schema.columns
WHERE table_schema = 'public' AND table_name = 'users';
-- Oracle: ALL_TAB_COLUMNS, ALL_CONSTRAINTS
-- MySQL: INFORMATION_SCHEMA.COLUMNS
-- SQL Server: sys.columns, sys.tables
Why JDBC metadata isn't always enough: some databases report different type names for the same concept. CHECK constraints, ENUM types, generated columns, partitioning info — often need database-specific queries. This is why Synthesized needs dialect-specific schema readers alongside the generic JDBC path.
Topo sort gives levels: Level 0 (no deps), Level 1 (depends on L0), Level 2... Tables within same level are independent.
Level-based parallel execution:
// Topological sort with levels:
// Level 0: [departments, products] — no dependencies, process in parallel
// Level 1: [users] — depends on departments
// Level 2: [orders] — depends on users
// Level 3: [order_items] — depends on orders AND products
// Execute:
for (level in levels) {
// All tables in this level can run in parallel:
val futures = level.map { table ->
executor.submit { processTable(table) }
}
futures.forEach { it.get() } // wait for all in level to complete
// Then move to next level
}
// Level 0: departments + products run in parallel (2 threads)
// Level 1: users runs alone (depends on departments being done)
// Level 2: orders runs alone
// Level 3: order_items runs alone
Within a large table — pipeline parallelism:
// For a 100M row table, parallelize the pipeline:
// Thread 1 (reader): reads rows from source DB
// Threads 2-9 (workers): mask/transform rows in parallel
// Thread 10 (writer): batch-writes to target DB
//
// Reader → BlockingQueue → Workers → BlockingQueue → Writer
// Each stage runs at its own speed
// Queues provide backpressure
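A runnable miniature of that pipeline, with bounded queues for backpressure and a sentinel value ("poison pill") to signal end-of-stream. In-memory lists stand in for the DB read/write stages; `runPipeline` is an illustrative name, not a real API:

```kotlin
import java.util.concurrent.ArrayBlockingQueue
import kotlin.concurrent.thread

fun runPipeline(rows: List<String>, workers: Int, mask: (String) -> String): List<String> {
    val POISON = "\u0000" // sentinel; assumes real rows never equal it
    val inQ = ArrayBlockingQueue<String>(1024)  // bounded = backpressure
    val outQ = ArrayBlockingQueue<String>(1024)
    val out = mutableListOf<String>()
    val writer = thread {
        while (true) {
            val row = outQ.take()
            if (row == POISON) break
            out.add(row) // real code: batch-write to target DB
        }
    }
    val workerThreads = (1..workers).map {
        thread {
            while (true) {
                val row = inQ.take()
                if (row == POISON) break
                outQ.put(mask(row)) // transform stage
            }
        }
    }
    rows.forEach(inQ::put)              // reader stage (here: the caller)
    repeat(workers) { inQ.put(POISON) } // one pill per worker
    workerThreads.forEach { it.join() }
    outQ.put(POISON)                    // all workers done → stop the writer
    writer.join()
    return out
}
```

Output order is nondeterministic across workers; real code would either tolerate that or re-sequence on write.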
Connection budget:
// If max 8 source connections and 8 target connections:
// Can process max 8 tables in parallel
// Or 1 table with 8 parallel readers (chunked by PK range)
// But NOT both — respect the pool limit
val tableParallelism = minOf(
config.maxParallelTables,
sourcePool.maximumPoolSize,
targetPool.maximumPoolSize
)
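One way to enforce that budget is a `Semaphore` with one permit per pooled connection. A sketch, not a real pool integration; the function and names are illustrative:

```kotlin
import java.util.concurrent.Executors
import java.util.concurrent.Semaphore

// Run one task per table, but never hold more than `permits` connections at once.
fun processWithBudget(tables: List<String>, permits: Int): List<String> {
    val budget = Semaphore(permits)                // one permit per pooled connection
    val executor = Executors.newFixedThreadPool(tables.size)
    val futures = tables.map { table ->
        executor.submit<String> {
            budget.acquire()                       // blocks while all connections are in use
            try {
                "processed $table"                 // stand-in for the real table scan
            } finally {
                budget.release()
            }
        }
    }
    val results = futures.map { it.get() }         // get() preserves submission order
    executor.shutdown()
    return results
}

fun main() {
    println(processWithBudget(listOf("departments", "products", "users"), permits = 2))
    // → [processed departments, processed products, processed users]
}
```

With a real pool (e.g. HikariCP), the pool itself blocks on checkout, but an explicit semaphore keeps queued tasks from piling up waiting on connections.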
The database has 500 tables and 8,000 columns. Which of them contain PII? A column named "notes" might hold "Call Alex at alex@test.com" — free-text PII.
How PII detection works:
// Step 1: Column name heuristics
"email", "e_mail", "email_address" → likely EMAIL
"phone", "mobile", "tel", "fax" → likely PHONE
"ssn", "social_security" → likely SSN
"first_name", "last_name", "name" → likely PERSON_NAME
"address", "street", "city", "zip" → likely ADDRESS
"dob", "birth_date", "date_of_birth" → likely DATE_OF_BIRTH
"ip_address", "ip" → likely IP_ADDRESS
// Step 2: Sample data analysis (read first 100 rows)
// Apply regex patterns:
email: /^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$/
phone: /^\+?\d{10,15}$/
ssn: /^\d{3}-\d{2}-\d{4}$/
// If >80% of non-null samples match → classify as that PII type
// Step 3: ML-based detection (advanced)
// NLP model trained on column names + data samples
// Catches: "usr_em" (email), "kontakt_telefon" (German phone)
// Synthesized mentions "PII Detection & Protection" in their docs
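Steps 1 and 2 can be sketched together. The name hints, regexes, and the >80% threshold follow the notes above; the enum, function name, and hint lists are illustrative, not Synthesized's actual detector:

```kotlin
enum class PiiType { EMAIL, PHONE, SSN, UNKNOWN }

val nameHints = mapOf(
    PiiType.EMAIL to listOf("email", "e_mail", "email_address"),
    PiiType.PHONE to listOf("phone", "mobile", "tel", "fax"),
    PiiType.SSN to listOf("ssn", "social_security"),
)

val dataPatterns = mapOf(
    PiiType.EMAIL to Regex("^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\\.[a-zA-Z]{2,}$"),
    PiiType.PHONE to Regex("^\\+?\\d{10,15}$"),
    PiiType.SSN to Regex("^\\d{3}-\\d{2}-\\d{4}$"),
)

// Step 1: column-name heuristics. Step 2: regex check over sampled values.
fun classifyColumn(columnName: String, samples: List<String?>): PiiType {
    val lower = columnName.lowercase()
    nameHints.forEach { (type, hints) ->
        if (hints.any { lower.contains(it) }) return type
    }
    val nonNull = samples.filterNotNull()
    if (nonNull.isEmpty()) return PiiType.UNKNOWN
    dataPatterns.forEach { (type, regex) ->
        val hits = nonNull.count { regex.matches(it) }
        if (hits.toDouble() / nonNull.size > 0.8) return type  // >80% of non-null samples
    }
    return PiiType.UNKNOWN
}

fun main() {
    // Name heuristics miss "usr_em", but sampling catches it (3 of 3 non-null match):
    println(classifyColumn("usr_em", listOf("a@b.com", "c@d.org", null, "e@f.io")))
    // → EMAIL
}
```

Naive substring matching has false positives ("hotel" contains "tel"); a real detector would tokenize names and weight matches.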
Why automated detection matters:
A 500-table database with 8,000 columns means manual review takes days and is error-prone. PII hides in unexpected places: a comments column containing "Please call John at 555-1234", metadata JSON with embedded emails, a description field with customer addresses.
Option C (mask everything) problem: masking non-PII columns (timestamps, amounts, status codes) destroys data utility. Test data becomes unrealistic. The goal is to mask PII while preserving as much realistic structure as possible.
The product runs against customer databases. A bug = data breach or corrupt test data. Testing must be comprehensive.
Testing pyramid for a data engine:
// Level 1: Unit tests — each transformer in isolation
@Test fun `email masker produces valid format`() {
val result = EmailMasker("salt").mask("alex@gmail.com")
assertThat(result).matches("[a-z0-9]+@masked\\.io")
}
@Test fun `email masker is deterministic`() {
val m = EmailMasker("salt")
assertThat(m.mask("alex@gmail.com")).isEqualTo(m.mask("alex@gmail.com"))
}
@Test fun `different inputs produce different outputs`() {
val m = EmailMasker("salt")
assertThat(m.mask("a@test.com")).isNotEqualTo(m.mask("b@test.com"))
}
// Level 2: Integration tests — real databases via TestContainers
@Testcontainers
class PostgresMaskingTest {
@Container val pg = PostgreSQLContainer<Nothing>("postgres:15") // Kotlin forbids raw types; <Nothing> is the usual workaround
@Test fun `masks 10K rows preserving FK integrity`() {
// Seed source DB with 10K users + 50K orders
// Run masking engine
// Verify: all orders.user_id exist in users.id
// Verify: no original emails appear in masked output
// Verify: row counts match
}
}
// Level 3: Property-based tests (great for data engines)
@Test fun `masked data preserves column types`() {
forAll(Arb.string(1..255)) { email ->
val masked = masker.mask(email)
// Both conditions must hold — forAll only checks the lambda's return value
masked.length in 5..100 && masked.contains("@") // reasonable length, still email-shaped
}
}
Synthesized uses TestContainers — it's in their docs under integrations. Their product runs against real PostgreSQL, MySQL, Oracle containers in CI. This is exactly the kind of testing a core engine developer would write.
Terraform is Infrastructure as Code. Synthesized's YAML is "Data Transformation as Code."
Synthesized YAML config example (from their docs):
# masking_config.yaml — version controlled in Git
default_config:
  masking_mode: auto  # auto-detect PII and mask
tables:
  - table_name_with_schema: "public.users"
    transformations:
      - columns: ["email"]
        params:
          type: "email_masker"
          deterministic: true
      - columns: ["phone"]
        params:
          type: "phone_masker"
          format: "+X-XXX-XXX-XXXX"
      - columns: ["first_name", "last_name"]
        params:
          type: "person_name_generator"
          locale: "en_US"
      - columns: ["salary"]
        params:
          type: "numeric_perturbation"
          variance_percentage: 10
  - table_name_with_schema: "public.orders"
    subsetting:
      target_ratio: 0.1  # keep 10% of orders
Why "as Code" matters for enterprises:
Version control: masking rules in Git. Who changed what, when. Full audit trail for compliance.
Code review: new masking rule → PR → review → approve → merge. Same workflow as application code.
CI/CD integration: on every commit, run masking pipeline → generate test data → run tests against it. Automated, repeatable.
Reproducibility: same config + same source = same output. Every time. No manual steps, no "click here in the UI" — everything declarative.
Environment promotion: dev config → staging config → production config. Different masking strengths per environment. All in code, all reviewable.
For the core engine team: you're building the engine that PARSES this YAML, builds the execution plan (topological sort, transformer registry, parallelism strategy), and executes it against real databases. The YAML config is your input. The masked/generated/subsetted database is your output.
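The parsed YAML might land in a model like this. A sketch: the class and field names mirror the YAML keys above but are assumptions, not Synthesized's actual internal model, and a real engine would deserialize with a YAML library such as SnakeYAML or jackson-dataformat-yaml:

```kotlin
// Illustrative in-memory model for the YAML config above.
data class Transformation(val columns: List<String>, val params: Map<String, Any>)
data class Subsetting(val targetRatio: Double)
data class TableConfig(
    val tableNameWithSchema: String,
    val transformations: List<Transformation> = emptyList(),
    val subsetting: Subsetting? = null,
)
data class MaskingConfig(val maskingMode: String, val tables: List<TableConfig>)

fun main() {
    val config = MaskingConfig(
        maskingMode = "auto",
        tables = listOf(
            TableConfig(
                "public.users",
                transformations = listOf(
                    Transformation(listOf("email"), mapOf("type" to "email_masker", "deterministic" to true)),
                ),
            ),
            TableConfig("public.orders", subsetting = Subsetting(0.1)),
        ),
    )
    // The engine walks this model to build its execution plan:
    // topo-sort tables, resolve transformer types, pick parallelism per level.
    println(config.tables.map { it.tableNameWithSchema })
    // → [public.users, public.orders]
}
```

Immutable data classes make the plan easy to validate up front (unknown transformer types, columns that don't exist in the schema) before touching any database.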