Optimize Data Sync Script Performance
Data synchronization scripts often face severe performance bottlenecks when processing large volumes of data. This article presents a comprehensive optimization plan that achieves 162x performance improvement by addressing N+1 query problems, implementing batch operations, and leveraging in-memory computation.
The Problem
The original data synchronization script had severe performance bottlenecks:
| Issue Type | Description | Impact |
|---|---|---|
| N+1 Query Problem | Each record queries related tables individually | High database connection overhead, low throughput |
| Frequent Commits | COMMIT after each INSERT/UPDATE | High transaction overhead |
| Single Record Operations | Insert/update records one by one | Excessive network round-trips |
❌ Bad Practice: Row-by-Row Processing
# BAD: Query once per record - 1000 queries for 1000 records!
for row in rows:
detail_data = get_detail_data(source_conn, record_id, category)
check_sql = "SELECT * FROM target_table WHERE ref_id = %s"
cursor.execute(check_sql, (ref_id,))
existing_data = cursor.fetchone()
if existing_data:
cursor.execute(update_sql, params)
else:
cursor.execute(insert_sql, params)
target_conn.commit()Performance Impact:
- 3000+ queries for 1000 records
- ~60 seconds per batch
- High database connection overhead
- Excessive network round-trips
✅ Good Practice: Batch Operations
# GOOD: Batch query + batch write
category_record_ids = defaultdict(list)
for row in rows:
category_record_ids[row['category']].append(row['record_id'])
# Batch query all related data
detail_data_map = batch_get_detail_data(source_conn, category_record_ids)
existing_data_map = batch_check_target_table(target_conn, rows)
# Batch insert/update
insert_data, update_data = collect_batch_data(rows, existing_data_map)
batch_insert_or_update(target_conn, insert_data, update_data)Performance Impact:
- Only 5 queries for 1000 records
- ~0.37 seconds per batch
- 162x faster
Optimization Strategies
1. Batch Query Optimization
Before:
# ❌ Query once per record
for row in rows:
detail_data = get_detail_data(source_conn, record_id, category)After:
# ✅ Group by category and batch query
category_record_ids = defaultdict(list)
for row in rows:
category_record_ids[row['category']].append(row['record_id'])
detail_data_map = batch_get_detail_data(source_conn, category_record_ids)Key Implementation:
def batch_get_detail_data(source_conn, category_record_ids):
detail_data_map = {}
for category, record_ids in category_record_ids.items():
if not record_ids:
continue
placeholders = ','.join(['%s'] * len(record_ids))
if category == 1:
sql = f"SELECT * FROM detail_table_a WHERE id IN ({placeholders})"
elif category == 2:
sql = f"SELECT * FROM detail_table_b WHERE id IN ({placeholders})"
with source_conn.cursor() as cursor:
cursor.execute(sql, tuple(record_ids))
for row in cursor.fetchall():
detail_data_map[f"{category}_{row['id']}"] = row
return detail_data_map2. Target Table Batch Query
Before:
# ❌ Check existence row by row
for row in rows:
check_sql = "SELECT * FROM target_table WHERE ref_id = %s"
cursor.execute(check_sql, (ref_id,))
existing_data = cursor.fetchone()After:
# ✅ Batch query all at once
ref_ids = [row['ref_id'] for row in rows]
placeholders = ','.join(['%s'] * len(ref_ids))
check_sql = f"SELECT * FROM target_table WHERE ref_id IN ({placeholders})"
with target_conn.cursor() as cursor:
cursor.execute(check_sql, tuple(ref_ids))
existing_data_map = {row['ref_id']: row for row in cursor.fetchall()}3. Batch Insert and Update
Before:
# ❌ Single record operations with frequent commits
for row in rows:
if exists:
cursor.execute(update_sql, params)
else:
cursor.execute(insert_sql, params)
target_conn.commit()After:
# ✅ Batch operations with single commit
insert_data = []
update_data = []
for row in rows:
if existing_data and has_changes(row, existing_data):
update_data.append(params)
elif not existing_data:
insert_data.append(params)
if insert_data:
cursor.executemany(insert_sql, insert_data)
target_conn.commit()
if update_data:
insert_update_sql = """
INSERT INTO target_table (...) VALUES (...)
ON DUPLICATE KEY UPDATE col1=VALUES(col1), col2=VALUES(col2)
"""
cursor.executemany(insert_update_sql, update_data)
target_conn.commit()4. Field Comparison Optimization
Before:
# ❌ Separate query for field comparison
check_sql = "SELECT tag_field1 FROM rel_table WHERE record_id = %s"
cursor.execute(check_sql, (record_id,))
if cursor.fetchone():
return FalseAfter:
# ✅ Use fields from main query
existing_tag_fields = {
'tag_field1': row.get('tag_field1'),
'tag_field2': row.get('tag_field2'),
}Performance Comparison
Query Comparison
| Operation Type | Before (per 1000 records) | After (per 1000 records) | Optimization Rate |
|---|---|---|---|
| Business detail query | 1000 times | 4 times | 99.6% |
| Target table query | 1000 times | 1 time | 99.9% |
| Related table query | 1000 times | 0 times | 100% |
| Total | 3000 times | 5 times | 99.8% |
Write Operation Comparison
| Operation Type | Before (per 1000 records) | After (per 1000 records) | Optimization Rate |
|---|---|---|---|
| INSERT | Up to 1000 times | 1 time | 99.9% |
| UPDATE | Up to 1000 times | 1 time | 99.9% |
| COMMIT | Up to 2000 times | 2 times | 99.9% |
Performance Test Results
| Metric | Before | After | Improvement |
|---|---|---|---|
| Time per batch (1000 records) | ~60 seconds | ~0.37 seconds | 162x |
| Average time per record | ~60 ms | ~0.37 ms | 162x |
Key Technical Points
1. Map Structure for O(1) Lookup
# ✅ Use composite key for fast lookup
key = f"{category}_{record_id}"
detail_data = detail_data_map.get(key)2. ON DUPLICATE KEY UPDATE
INSERT INTO target_table (col1, col2, ..., ref_id)
VALUES (%s, %s, ..., %s)
ON DUPLICATE KEY UPDATE
col1 = VALUES(col1),
col2 = VALUES(col2)Prerequisite: ref_id must have a unique index or primary key.
3. In-Memory Field Comparison
# ✅ Compare in memory before update
has_changes = False
for i, field_name in enumerate(field_names):
if params[i] != existing_data.get(field_name):
has_changes = True
break
if has_changes:
update_data.append(params)Best Practices
1. Index Optimization
| Table Name | Field | Index Type |
|---|---|---|
main_table | id, record_id, create_time | Normal index |
target_table | ref_id | Unique index |
detail_table_a/b | id | Primary key |
rel_table | record_id | Primary key |
2. Transaction Management
# Commit in batches to avoid locking
BATCH_SIZE = 1000
for i in range(0, len(data), BATCH_SIZE):
batch = data[i:i+BATCH_SIZE]
cursor.executemany(sql, batch)
target_conn.commit()3. Error Handling
try:
cursor.executemany(sql, data)
target_conn.commit()
except Exception as e:
target_conn.rollback()
logger.error(f"Batch operation failed: {e}")Common Pitfalls to Avoid
❌ Pitfall 1: Ignoring Indexes
# Bad: Missing index on filter column
sql = "SELECT * FROM large_table WHERE status = %s" # No index on status
# Good: Ensure proper indexing
# CREATE INDEX idx_large_table_status ON large_table(status)❌ Pitfall 2: SELECT * in Batch Queries
# Bad: SELECT * returns unnecessary columns
sql = "SELECT * FROM detail_table WHERE id IN (...)"
# Good: Select only needed columns
sql = "SELECT id, name, value FROM detail_table WHERE id IN (...)"❌ Pitfall 3: Not Handling Master-Slave Delay
# Bad: Read from slave immediately after write
write_to_master(data)
read_from_slave(data_id) # May get stale data
# Good: Read from master for consistency-critical operations
read_from_master(data_id)Real-World Results
Production Test Results
| Data Size | Before | After | Improvement |
|---|---|---|---|
| 1000 records | ~60 seconds | ~0.37 seconds | 162x |
| 10000 records | ~600 seconds | ~3.7 seconds | 162x |
| 100000 records | ~6000 seconds | ~37 seconds | 162x |
Summary
When optimizing data synchronization scripts:
- Batch queries - Reduce N+1 queries to O(1) category-based queries
- Batch writes - Use
executemany()andON DUPLICATE KEY UPDATE - In-memory computation - Compare fields locally before updating
- Reuse query results - Avoid redundant queries by reusing JOIN results
- Proper indexing - Ensure filter columns have appropriate indexes
Remember: The key to performance improvement is changing from single record operations to batch operations, which reduces database interactions by 99.9%.
Quick Checklist
Before deploying optimized data sync scripts:
- [ ] No N+1 queries in loops
- [ ] Using batch query with IN clause
- [ ] Using
executemany()for batch writes - [ ] Using
ON DUPLICATE KEY UPDATEfor upserts - [ ] Filter columns have proper indexes
- [ ] Transaction batch size is configured
- [ ] Error handling is implemented
- [ ] Testing with production-scale data