Data Synchronization

Data synchronization is the process of keeping data consistent across different systems. This guide shows you how to use @databite/engine to create robust, scheduled data synchronization workflows.

Overview

The @databite/engine package provides a powerful data synchronization engine that can:
  • Schedule and execute data sync operations
  • Handle incremental and full syncs
  • Manage data transformations
  • Provide real-time sync status
  • Handle errors and retries automatically

Basic Data Synchronization

Let’s start with a simple data sync setup:
import { DatabiteEngine } from "@databite/engine";

const engine = new DatabiteEngine({
  provider: "postgres",
  connectionString: process.env.DATABASE_URL,
  schedule: {
    type: "cron",
    expression: "0 */6 * * *", // Every 6 hours
  },
});

// Define a sync job
const syncJob = {
  id: "user_sync",
  name: "User Data Sync",
  source: {
    type: "api",
    url: "https://api.service.com/users",
    headers: {
      Authorization: "Bearer {{tokens.access_token}}",
    },
  },
  destination: {
    type: "database",
    table: "users",
    schema: {
      id: "string",
      name: "string",
      email: "string",
      created_at: "datetime",
    },
  },
  transform: {
    id: "{{item.id}}",
    name: "{{item.name}}",
    email: "{{item.email}}",
    created_at: "{{item.created_at}}",
  },
};

// Schedule the sync job
await engine.scheduleJob(syncJob);
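
Scheduled jobs run on their configured cadence, but during development you can also trigger a job manually and inspect the result. This sketch uses executeJob and getJobStatus, which appear again in the Testing and Monitoring sections; only the success and recordsProcessed fields of the result are assumed here:
// Run the job immediately instead of waiting for the next cron window
const result = await engine.executeJob("user_sync");
console.log("Succeeded:", result.success);
console.log("Records processed:", result.recordsProcessed);

// Check the job's current status afterwards
const status = await engine.getJobStatus("user_sync");
console.log("Status:", status);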

Advanced Sync Patterns

Incremental Sync with Timestamps

const incrementalSync = {
  id: "incremental_orders",
  name: "Incremental Orders Sync",
  source: {
    type: "api",
    url: "https://api.service.com/orders",
    query: {
      updated_since: "{{last_sync_time}}",
      limit: 1000,
    },
    headers: {
      Authorization: "Bearer {{tokens.access_token}}",
    },
  },
  destination: {
    type: "database",
    table: "orders",
    mode: "upsert", // Update or insert
    key: "id",
  },
  transform: {
    id: "{{item.id}}",
    customer_id: "{{item.customer_id}}",
    total: "{{item.total}}",
    status: "{{item.status}}",
    updated_at: "{{item.updated_at}}",
  },
  schedule: {
    type: "interval",
    minutes: 15, // Every 15 minutes
  },
};
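
Incremental jobs are registered the same way as the basic example; this sketch assumes the job-level schedule block takes precedence over the engine-level default:
// Register the job; it then runs on its own 15-minute interval
await engine.scheduleJob(incrementalSync);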

Multi-Table Sync with Dependencies

const multiTableSync = {
  id: "customer_orders_sync",
  name: "Customer and Orders Sync",
  steps: [
    {
      id: "sync_customers",
      source: {
        type: "api",
        url: "https://api.service.com/customers",
      },
      destination: {
        type: "database",
        table: "customers",
      },
      transform: {
        id: "{{item.id}}",
        name: "{{item.name}}",
        email: "{{item.email}}",
      },
    },
    {
      id: "sync_orders",
      dependsOn: ["sync_customers"],
      source: {
        type: "api",
        url: "https://api.service.com/orders",
        query: {
          customer_id: "{{customers.id}}",
        },
      },
      destination: {
        type: "database",
        table: "orders",
      },
      transform: {
        id: "{{item.id}}",
        customer_id: "{{item.customer_id}}",
        total: "{{item.total}}",
      },
    },
  ],
};

Data Transformation

Complex Data Transformations

const transformSync = {
  id: "product_catalog_sync",
  name: "Product Catalog Sync",
  source: {
    type: "api",
    url: "https://api.service.com/products",
  },
  destination: {
    type: "database",
    table: "products",
  },
  transform: {
    id: "{{item.id}}",
    name: "{{item.name}}",
    description: "{{item.description}}",
    price: "{{item.pricing.amount}}",
    currency: "{{item.pricing.currency}}",
    category: "{{item.category.name}}",
    tags: '{{item.tags | join(",")}}',
    in_stock: "{{item.inventory.quantity > 0}}",
    created_at: "{{item.created_at | to_datetime}}",
    updated_at: "{{item.updated_at | to_datetime}}",
  },
};

Custom Transform Functions

import { DatabiteEngine } from "@databite/engine";

const engine = new DatabiteEngine({
  provider: "postgres",
  connectionString: process.env.DATABASE_URL,
  transforms: {
    // Custom transform function
    calculate_discount: (item) => {
      const originalPrice = item.original_price;
      const salePrice = item.sale_price;
      return ((originalPrice - salePrice) / originalPrice) * 100;
    },

    // Custom validation function
    validate_email: (email) => {
      const emailRegex = /^[^\s@]+@[^\s@]+\.[^\s@]+$/;
      return emailRegex.test(email);
    },
  },
});
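
Registered transforms can then be referenced by name in transform templates, as the next example shows. As a standalone illustration of the arithmetic performed by calculate_discount (plain JavaScript, not an engine API call), an item priced at 100 with a sale price of 80 works out to a 20% discount:
// Same formula as the calculate_discount transform above
const sample = { original_price: 100, sale_price: 80 };
const discount = ((sample.original_price - sample.sale_price) / sample.original_price) * 100;
console.log(discount); // 20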

const syncWithCustomTransforms = {
  id: "products_with_discounts",
  name: "Products with Discount Calculation",
  source: {
    type: "api",
    url: "https://api.service.com/products",
  },
  destination: {
    type: "database",
    table: "products",
  },
  transform: {
    id: "{{item.id}}",
    name: "{{item.name}}",
    original_price: "{{item.original_price}}",
    sale_price: "{{item.sale_price}}",
    discount_percentage: "{{item | calculate_discount}}",
    is_valid_email: "{{item.contact_email | validate_email}}",
  },
};

Error Handling and Retries

Robust Error Handling

const resilientSync = {
  id: "resilient_data_sync",
  name: "Resilient Data Sync",
  source: {
    type: "api",
    url: "https://api.service.com/data",
    retry: {
      attempts: 3,
      delay: 5000, // 5 seconds
      backoff: "exponential",
    },
  },
  destination: {
    type: "database",
    table: "data",
    onError: "log_and_continue",
  },
  errorHandling: {
    strategy: "retry_with_backoff",
    maxRetries: 3,
    retryDelay: 5000,
    onFailure: "notify_admin",
  },
};

Partial Success Handling

const partialSuccessSync = {
  id: "partial_success_sync",
  name: "Partial Success Sync",
  source: {
    type: "api",
    url: "https://api.service.com/batch_data",
    batchSize: 100,
  },
  destination: {
    type: "database",
    table: "batch_data",
    mode: "batch_insert",
  },
  errorHandling: {
    strategy: "partial_success",
    onItemError: "log_and_continue",
    onBatchError: "retry_batch",
  },
};
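
Items that fail are logged according to onItemError, but the job as a whole can still be executed and inspected. This sketch relies only on the success and recordsProcessed result fields used elsewhere in this guide; any per-item error report would be engine-specific and is not assumed here:
// Register and run the batch job, then report how much of it went through
await engine.scheduleJob(partialSuccessSync);
const result = await engine.executeJob("partial_success_sync");
console.log("Succeeded:", result.success);
console.log("Records processed:", result.recordsProcessed);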

Real-time Synchronization

Webhook-based Sync

const webhookSync = {
  id: "webhook_sync",
  name: "Webhook-based Real-time Sync",
  trigger: {
    type: "webhook",
    endpoint: "/webhook/sync",
    secret: process.env.WEBHOOK_SECRET,
  },
  source: {
    type: "webhook_data",
    payload: "{{webhook.payload}}",
  },
  destination: {
    type: "database",
    table: "real_time_data",
  },
  transform: {
    id: "{{payload.id}}",
    event_type: "{{payload.event_type}}",
    data: "{{payload.data}}",
    timestamp: "{{payload.timestamp}}",
  },
};

Event-driven Sync

const eventDrivenSync = {
  id: "event_driven_sync",
  name: "Event-driven Sync",
  trigger: {
    type: "event",
    event: "data_updated",
    source: "external_system",
  },
  source: {
    type: "api",
    url: "https://api.service.com/updated_data",
    query: {
      since: "{{event.timestamp}}",
    },
  },
  destination: {
    type: "database",
    table: "updated_data",
  },
};

Monitoring and Logging

Sync Status Monitoring

// Check sync status
const status = await engine.getJobStatus("user_sync");
console.log("Sync Status:", status);

// Get sync history
const history = await engine.getJobHistory("user_sync", {
  limit: 10,
  status: "completed",
});

// Get sync metrics
const metrics = await engine.getJobMetrics("user_sync", {
  period: "7d",
});
console.log("Success Rate:", metrics.successRate);
console.log("Average Duration:", metrics.averageDuration);

Custom Logging

const engineWithLogging = new DatabiteEngine({
  provider: "postgres",
  connectionString: process.env.DATABASE_URL,
  logging: {
    level: "info",
    destination: "database",
    table: "sync_logs",
  },
  onJobStart: (jobId) => {
    console.log(`Job ${jobId} started at ${new Date().toISOString()}`);
  },
  onJobComplete: (jobId, result) => {
    console.log(`Job ${jobId} completed:`, result);
  },
  onJobError: (jobId, error) => {
    console.error(`Job ${jobId} failed:`, error);
  },
});

Performance Optimization

Batch Processing

const batchSync = {
  id: "batch_sync",
  name: "Batch Data Sync",
  source: {
    type: "api",
    url: "https://api.service.com/data",
    batchSize: 1000,
    pagination: {
      type: "offset",
      limit: 1000,
    },
  },
  destination: {
    type: "database",
    table: "data",
    mode: "batch_insert",
    batchSize: 1000,
  },
  performance: {
    concurrency: 5,
    timeout: 300000, // 5 minutes
    memoryLimit: "512MB",
  },
};

Parallel Processing

const parallelSync = {
  id: "parallel_sync",
  name: "Parallel Data Sync",
  source: {
    type: "api",
    url: "https://api.service.com/data",
    parallel: {
      enabled: true,
      maxConcurrency: 10,
      chunkSize: 100,
    },
  },
  destination: {
    type: "database",
    table: "data",
    mode: "parallel_insert",
  },
};

Testing Data Synchronization

Unit Testing

import { DatabiteEngine } from "@databite/engine";

describe("Data Synchronization", () => {
  let engine: DatabiteEngine;

  beforeEach(() => {
    engine = new DatabiteEngine({
      provider: "sqlite",
      connectionString: ":memory:",
    });
  });

  it("should sync data successfully", async () => {
    const syncJob = {
      id: "test_sync",
      name: "Test Sync",
      source: {
        type: "api",
        url: "https://api.service.com/test-data",
      },
      destination: {
        type: "database",
        table: "test_data",
      },
    };

    await engine.scheduleJob(syncJob);
    const result = await engine.executeJob("test_sync");

    expect(result.success).toBe(true);
    expect(result.recordsProcessed).toBeGreaterThan(0);
  });
});

Integration Testing

describe("Integration Sync", () => {
  it("should handle real API sync", async () => {
    const engine = new DatabiteEngine({
      provider: "postgres",
      connectionString: process.env.TEST_DATABASE_URL,
    });

    const syncJob = {
      id: "integration_sync",
      name: "Integration Sync",
      source: {
        type: "api",
        url: "https://jsonplaceholder.typicode.com/posts",
      },
      destination: {
        type: "database",
        table: "posts",
      },
      transform: {
        id: "{{item.id}}",
        title: "{{item.title}}",
        body: "{{item.body}}",
        user_id: "{{item.userId}}",
      },
    };

    await engine.scheduleJob(syncJob);
    const result = await engine.executeJob("integration_sync");

    expect(result.success).toBe(true);
    expect(result.recordsProcessed).toBe(100);
  });
});

Best Practices

Data Quality

Always validate data before syncing to ensure data quality and consistency.
const validatedSync = {
  id: "validated_sync",
  name: "Validated Data Sync",
  source: {
    type: "api",
    url: "https://api.service.com/data",
  },
  destination: {
    type: "database",
    table: "data",
  },
  validation: {
    required: ["id", "name", "email"],
    email: "{{item.email}}",
    phone: "{{item.phone}}",
  },
  transform: {
    id: "{{item.id}}",
    name: "{{item.name}}",
    email: "{{item.email | validate_email}}",
    phone: "{{item.phone | validate_phone}}",
  },
};

Resource Management

Be mindful of API rate limits and database connection limits when setting up sync jobs.
const resourceAwareSync = {
  id: "resource_aware_sync",
  name: "Resource Aware Sync",
  source: {
    type: "api",
    url: "https://api.service.com/data",
    rateLimit: {
      requestsPerMinute: 100,
      burstLimit: 20,
    },
  },
  destination: {
    type: "database",
    table: "data",
    connectionPool: {
      min: 2,
      max: 10,
      idleTimeout: 30000,
    },
  },
};

Common Issues and Solutions

Issue: Memory Usage

For large datasets, use streaming and batch processing to avoid memory issues.
const memoryEfficientSync = {
  id: "memory_efficient_sync",
  name: "Memory Efficient Sync",
  source: {
    type: "api",
    url: "https://api.service.com/large-dataset",
    streaming: true,
    batchSize: 1000,
  },
  destination: {
    type: "database",
    table: "large_dataset",
    mode: "streaming_insert",
  },
};

Issue: API Rate Limits

Throttle requests and retry with backoff when the API provider enforces rate limits.
const rateLimitedSync = {
  id: "rate_limited_sync",
  name: "Rate Limited Sync",
  source: {
    type: "api",
    url: "https://api.service.com/data",
    rateLimit: {
      requestsPerMinute: 60,
      burstLimit: 10,
    },
    retry: {
      attempts: 3,
      delay: 1000,
      backoff: "exponential",
    },
  },
};

Next Steps

Now that you understand data synchronization, you can:
  1. Set up Monitoring: Implement comprehensive monitoring and alerting
  2. Optimize Performance: Fine-tune sync performance for your use case
  3. Handle Errors: Implement robust error handling and recovery
  4. Scale Up: Design sync patterns that can handle large datasets
Continue to the Custom Connectors Guide to learn how to build your own connectors.