Data Synchronization
Data synchronization keeps data consistent across different systems. This guide shows you how to use @databite/engine to create robust, scheduled data synchronization workflows.
Overview
The @databite/engine package provides a powerful data synchronization engine that can:
- Schedule and execute data sync operations
- Handle incremental and full syncs
- Manage data transformations
- Provide real-time sync status
- Handle errors and retries automatically
Basic Data Synchronization
Let’s start with a simple data sync setup:
import { DatabiteEngine } from "@databite/engine";
const engine = new DatabiteEngine({
provider: "postgres",
connectionString: process.env.DATABASE_URL,
schedule: {
type: "cron",
expression: "0 */6 * * *", // Every 6 hours
},
});
// Define a sync job
const syncJob = {
id: "user_sync",
name: "User Data Sync",
source: {
type: "api",
url: "https://api.service.com/users",
headers: {
Authorization: "Bearer {{tokens.access_token}}",
},
},
destination: {
type: "database",
table: "users",
schema: {
id: "string",
name: "string",
email: "string",
created_at: "datetime",
},
},
transform: {
id: "{{item.id}}",
name: "{{item.name}}",
email: "{{item.email}}",
created_at: "{{item.created_at}}",
},
};
// Schedule the sync job
await engine.scheduleJob(syncJob);
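Once scheduled, the job runs on its cron expression, but you can also trigger it on demand and inspect the outcome. A minimal sketch using the executeJob and getJobStatus calls shown later in this guide; the result fields success and recordsProcessed follow the shape used in the testing examples:
// Run the job immediately, outside its schedule
const runResult = await engine.executeJob("user_sync");
console.log("Succeeded:", runResult.success);
console.log("Records processed:", runResult.recordsProcessed);
// Inspect the job's current status
console.log("Status:", await engine.getJobStatus("user_sync"));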
Advanced Sync Patterns
Incremental Sync with Timestamps
const incrementalSync = {
id: "incremental_orders",
name: "Incremental Orders Sync",
source: {
type: "api",
url: "https://api.service.com/orders",
query: {
updated_since: "{{last_sync_time}}",
limit: 1000,
},
headers: {
Authorization: "Bearer {{tokens.access_token}}",
},
},
destination: {
type: "database",
table: "orders",
mode: "upsert", // Update or insert
key: "id",
},
transform: {
id: "{{item.id}}",
customer_id: "{{item.customer_id}}",
total: "{{item.total}}",
status: "{{item.status}}",
updated_at: "{{item.updated_at}}",
},
schedule: {
type: "interval",
minutes: 15, // Every 15 minutes
},
};
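In this pattern, the {{last_sync_time}} placeholder stands for the timestamp of the previous successful run, so each execution only pulls records changed since the last sync.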
Multi-Table Sync with Dependencies
const multiTableSync = {
id: "customer_orders_sync",
name: "Customer and Orders Sync",
steps: [
{
id: "sync_customers",
source: {
type: "api",
url: "https://api.service.com/customers",
},
destination: {
type: "database",
table: "customers",
},
transform: {
id: "{{item.id}}",
name: "{{item.name}}",
email: "{{item.email}}",
},
},
{
id: "sync_orders",
dependsOn: ["sync_customers"],
source: {
type: "api",
url: "https://api.service.com/orders",
query: {
customer_id: "{{customers.id}}",
},
},
destination: {
type: "database",
table: "orders",
},
transform: {
id: "{{item.id}}",
customer_id: "{{item.customer_id}}",
total: "{{item.total}}",
},
},
],
};
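Data Transformations
Field-level transformations map source values onto destination columns. The following job flattens nested fields and applies pipe filters such as join and to_datetime during the sync: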
const transformSync = {
id: "product_catalog_sync",
name: "Product Catalog Sync",
source: {
type: "api",
url: "https://api.service.com/products",
},
destination: {
type: "database",
table: "products",
},
transform: {
id: "{{item.id}}",
name: "{{item.name}}",
description: "{{item.description}}",
price: "{{item.pricing.amount}}",
currency: "{{item.pricing.currency}}",
category: "{{item.category.name}}",
tags: '{{item.tags | join(",")}}',
in_stock: "{{item.inventory.quantity > 0}}",
created_at: "{{item.created_at | to_datetime}}",
updated_at: "{{item.updated_at | to_datetime}}",
},
};
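Custom Transform Functions
You can also register your own transform and validation functions when creating the engine and reference them as pipe filters in a job's transform map: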
import { DatabiteEngine } from "@databite/engine";
const engine = new DatabiteEngine({
provider: "postgres",
connectionString: process.env.DATABASE_URL,
transforms: {
// Custom transform function
calculate_discount: (item) => {
const originalPrice = item.original_price;
const salePrice = item.sale_price;
return ((originalPrice - salePrice) / originalPrice) * 100;
},
// Custom validation function
validate_email: (email) => {
const emailRegex = /^[^\s@]+@[^\s@]+\.[^\s@]+$/;
return emailRegex.test(email);
},
},
});
const syncWithCustomTransforms = {
id: "products_with_discounts",
name: "Products with Discount Calculation",
source: {
type: "api",
url: "https://api.service.com/products",
},
destination: {
type: "database",
table: "products",
},
transform: {
id: "{{item.id}}",
name: "{{item.name}}",
original_price: "{{item.original_price}}",
sale_price: "{{item.sale_price}}",
discount_percentage: "{{item | calculate_discount}}",
is_valid_email: "{{item.contact_email | validate_email}}",
},
};
Error Handling and Retries
Robust Error Handling
const resilientSync = {
id: "resilient_data_sync",
name: "Resilient Data Sync",
source: {
type: "api",
url: "https://api.service.com/data",
retry: {
attempts: 3,
delay: 5000, // 5 seconds
backoff: "exponential",
},
},
destination: {
type: "database",
table: "data",
onError: "log_and_continue",
},
errorHandling: {
strategy: "retry_with_backoff",
maxRetries: 3,
retryDelay: 5000,
onFailure: "notify_admin",
},
};
Partial Success Handling
const partialSuccessSync = {
id: "partial_success_sync",
name: "Partial Success Sync",
source: {
type: "api",
url: "https://api.service.com/batch_data",
batchSize: 100,
},
destination: {
type: "database",
table: "batch_data",
mode: "batch_insert",
},
errorHandling: {
strategy: "partial_success",
onItemError: "log_and_continue",
onBatchError: "retry_batch",
},
};
Real-time Synchronization
Webhook-based Sync
const webhookSync = {
id: "webhook_sync",
name: "Webhook-based Real-time Sync",
trigger: {
type: "webhook",
endpoint: "/webhook/sync",
secret: process.env.WEBHOOK_SECRET,
},
source: {
type: "webhook_data",
payload: "{{webhook.payload}}",
},
destination: {
type: "database",
table: "real_time_data",
},
transform: {
id: "{{payload.id}}",
event_type: "{{payload.event_type}}",
data: "{{payload.data}}",
timestamp: "{{payload.timestamp}}",
},
};
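Before handing a webhook payload to the engine, verify that the request really came from the source system. A minimal sketch assuming an HMAC-SHA256 signature sent in a request header; the header name, encoding, and signing scheme are assumptions, so adapt them to whatever the sending system actually uses:
import { createHmac, timingSafeEqual } from "crypto";
// Hypothetical helper: the header name and hex-encoded HMAC-SHA256 scheme are assumptions
function verifyWebhookSignature(rawBody: string, signature: string, secret: string): boolean {
  const expected = createHmac("sha256", secret).update(rawBody).digest("hex");
  const a = Buffer.from(expected);
  const b = Buffer.from(signature);
  // timingSafeEqual throws on length mismatch, so compare lengths first
  return a.length === b.length && timingSafeEqual(a, b);
}
// Example: reject requests whose signature does not match
// if (!verifyWebhookSignature(body, req.headers["x-webhook-signature"], process.env.WEBHOOK_SECRET)) { ... }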
Event-driven Sync
const eventDrivenSync = {
id: "event_driven_sync",
name: "Event-driven Sync",
trigger: {
type: "event",
event: "data_updated",
source: "external_system",
},
source: {
type: "api",
url: "https://api.service.com/updated_data",
query: {
since: "{{event.timestamp}}",
},
},
destination: {
type: "database",
table: "updated_data",
},
};
Monitoring and Logging
Sync Status Monitoring
// Check sync status
const status = await engine.getJobStatus("user_sync");
console.log("Sync Status:", status);
// Get sync history
const history = await engine.getJobHistory("user_sync", {
limit: 10,
status: "completed",
});
// Get sync metrics
const metrics = await engine.getJobMetrics("user_sync", {
period: "7d",
});
console.log("Success Rate:", metrics.successRate);
console.log("Average Duration:", metrics.averageDuration);
Custom Logging
const engineWithLogging = new DatabiteEngine({
provider: "postgres",
connectionString: process.env.DATABASE_URL,
logging: {
level: "info",
destination: "database",
table: "sync_logs",
},
onJobStart: (jobId) => {
console.log(`Job ${jobId} started at ${new Date().toISOString()}`);
},
onJobComplete: (jobId, result) => {
console.log(`Job ${jobId} completed:`, result);
},
onJobError: (jobId, error) => {
console.error(`Job ${jobId} failed:`, error);
},
});
Batch Processing
const batchSync = {
id: "batch_sync",
name: "Batch Data Sync",
source: {
type: "api",
url: "https://api.service.com/data",
batchSize: 1000,
pagination: {
type: "offset",
limit: 1000,
},
},
destination: {
type: "database",
table: "data",
mode: "batch_insert",
batchSize: 1000,
},
performance: {
concurrency: 5,
timeout: 300000, // 5 minutes
memoryLimit: "512MB",
},
};
Parallel Processing
const parallelSync = {
id: "parallel_sync",
name: "Parallel Data Sync",
source: {
type: "api",
url: "https://api.service.com/data",
parallel: {
enabled: true,
maxConcurrency: 10,
chunkSize: 100,
},
},
destination: {
type: "database",
table: "data",
mode: "parallel_insert",
},
};
Testing Data Synchronization
Unit Testing
import { DatabiteEngine } from "@databite/engine";
describe("Data Synchronization", () => {
let engine: DatabiteEngine;
beforeEach(() => {
engine = new DatabiteEngine({
provider: "sqlite",
connectionString: ":memory:",
});
});
it("should sync data successfully", async () => {
const syncJob = {
id: "test_sync",
name: "Test Sync",
source: {
type: "api",
url: "https://api.service.com/test-data",
},
destination: {
type: "database",
table: "test_data",
},
};
await engine.scheduleJob(syncJob);
const result = await engine.executeJob("test_sync");
expect(result.success).toBe(true);
expect(result.recordsProcessed).toBeGreaterThan(0);
});
});
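The unit test above points at a live URL; in practice you will usually stub the HTTP source so the test stays fast and deterministic. A sketch using nock, assuming the engine issues its requests through Node's http/https stack (which is what nock intercepts):
import nock from "nock";
beforeEach(() => {
  // Serve fixed test data in place of the real source API
  nock("https://api.service.com")
    .get("/test-data")
    .reply(200, [
      { id: "1", name: "Ada", email: "ada@example.com" },
      { id: "2", name: "Grace", email: "grace@example.com" },
    ]);
});
afterEach(() => {
  nock.cleanAll();
});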
Integration Testing
describe("Integration Sync", () => {
it("should handle real API sync", async () => {
const engine = new DatabiteEngine({
provider: "postgres",
connectionString: process.env.TEST_DATABASE_URL,
});
const syncJob = {
id: "integration_sync",
name: "Integration Sync",
source: {
type: "api",
url: "https://jsonplaceholder.typicode.com/posts",
},
destination: {
type: "database",
table: "posts",
},
transform: {
id: "{{item.id}}",
title: "{{item.title}}",
body: "{{item.body}}",
user_id: "{{item.userId}}",
},
};
await engine.scheduleJob(syncJob);
const result = await engine.executeJob("integration_sync");
expect(result.success).toBe(true);
expect(result.recordsProcessed).toBe(100);
});
});
Best Practices
Data Quality
Always validate data before syncing to ensure data quality and consistency.
const validatedSync = {
id: "validated_sync",
name: "Validated Data Sync",
source: {
type: "api",
url: "https://api.service.com/data",
},
destination: {
type: "database",
table: "data",
},
validation: {
required: ["id", "name", "email"],
email: "{{item.email}}",
phone: "{{item.phone}}",
},
transform: {
id: "{{item.id}}",
name: "{{item.name}}",
email: "{{item.email | validate_email}}",
phone: "{{item.phone | validate_phone}}",
},
};
Resource Management
Be mindful of API rate limits and database connection limits when setting up
sync jobs.
const resourceAwareSync = {
id: "resource_aware_sync",
name: "Resource Aware Sync",
source: {
type: "api",
url: "https://api.service.com/data",
rateLimit: {
requestsPerMinute: 100,
burstLimit: 20,
},
},
destination: {
type: "database",
table: "data",
connectionPool: {
min: 2,
max: 10,
idleTimeout: 30000,
},
},
};
Common Issues and Solutions
Issue: Memory Usage
For large datasets, use streaming and batch processing to avoid memory issues.
const memoryEfficientSync = {
id: "memory_efficient_sync",
name: "Memory Efficient Sync",
source: {
type: "api",
url: "https://api.service.com/large-dataset",
streaming: true,
batchSize: 1000,
},
destination: {
type: "database",
table: "large_dataset",
mode: "streaming_insert",
},
};
Issue: API Rate Limits
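When the source API enforces rate limits, throttle requests on the client side and retry with exponential backoff so a temporary rejection slows the sync down instead of failing it.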
const rateLimitedSync = {
id: "rate_limited_sync",
name: "Rate Limited Sync",
source: {
type: "api",
url: "https://api.service.com/data",
rateLimit: {
requestsPerMinute: 60,
burstLimit: 10,
},
retry: {
attempts: 3,
delay: 1000,
backoff: "exponential",
},
},
};
Next Steps
Now that you understand data synchronization, you can:
- Set up Monitoring: Implement comprehensive monitoring and alerting
- Optimize Performance: Fine-tune sync performance for your use case
- Handle Errors: Implement robust error handling and recovery
- Scale Up: Design sync patterns that can handle large datasets
Continue to the Custom Connectors Guide to learn how to build your own connectors.