Performance Optimization

Performance optimization is crucial for building scalable and efficient data integrations. This guide shows you how to optimize the performance of your Databite connectors, flows, and integrations.

Overview

Performance optimization in Databite involves:
  • Caching Strategies for reducing API calls
  • Connection Pooling for database efficiency
  • Batch Processing for large datasets
  • Memory Management to prevent leaks
  • Async Operations for better concurrency
  • Resource Optimization for cost efficiency

Caching Strategies

HTTP Response Caching

class ResponseCache {
  private cache: Map<string, any> = new Map();
  private ttl: Map<string, number> = new Map();
  private defaultTTL: number = 300000; // 5 minutes

  set(key: string, value: any, ttl: number = this.defaultTTL) {
    this.cache.set(key, value);
    this.ttl.set(key, Date.now() + ttl);
  }

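  get(key: string): any | null {
    const expiry = this.ttl.get(key);
    if (expiry !== undefined && Date.now() > expiry) {
      this.delete(key);
      return null;
    }
    // Use has() so cached falsy values (0, "", false) still count as hits
    return this.cache.has(key) ? this.cache.get(key) : null;
  }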

  delete(key: string) {
    this.cache.delete(key);
    this.ttl.delete(key);
  }

  clear() {
    this.cache.clear();
    this.ttl.clear();
  }

  generateKey(url: string, options: any = {}): string {
    const sortedOptions = Object.keys(options)
      .sort()
      .reduce((result, key) => {
        result[key] = options[key];
        return result;
      }, {} as any);

    return `${url}:${JSON.stringify(sortedOptions)}`;
  }
}

const responseCache = new ResponseCache();

// Cached connector
const cachedConnector = new ConnectorBuilder()
  .identity({
    id: "cached-connector",
    name: "Cached Connector",
  })
  .actions([
    {
      id: "get_data",
      name: "Get Data",
      description: "Get data with caching",
      inputs: {
        endpoint: {
          type: "string",
          label: "Endpoint",
          required: true,
        },
      },
      execute: async (inputs, context) => {
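        // The key includes the caller's tokens so cached data is never shared
        // across accounts. Because it contains credentials, log the endpoint
        // rather than the key.
        const cacheKey = responseCache.generateKey(
          `https://api.service.com${inputs.endpoint}`,
          { headers: context.tokens }
        );

        // Check cache first
        const cached = responseCache.get(cacheKey);
        if (cached !== null) {
          logger.debug("Cache hit", { endpoint: inputs.endpoint });
          return cached;
        }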

        // Fetch from API
        const response = await fetch(
          `https://api.service.com${inputs.endpoint}`,
          {
            headers: {
              Authorization: `Bearer ${context.tokens.api_key}`,
            },
          }
        );

        if (!response.ok) {
          throw new Error(`HTTP ${response.status}: ${response.statusText}`);
        }

        const data = await response.json();

        // Cache the response
        responseCache.set(cacheKey, data, 300000); // 5 minutes

        logger.debug("Cache miss", { endpoint: inputs.endpoint });
        return data;
      },
    },
  ])
  .build();
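The `ResponseCache` above grows without bound: an expired entry is removed only if its key is read again. A minimal sketch of a bounded alternative, assuming a fixed entry limit suits your workload, evicts the least recently used entry by relying on `Map` insertion order. `LruTtlCache` and its injectable `now` clock are hypothetical names used for illustration and testing, not part of Databite.

```typescript
// Hypothetical bounded cache: TTL expiry plus least-recently-used eviction.
// A Map iterates in insertion order, so re-inserting an entry on read moves it
// to the "most recently used" end, and the first key is always the LRU entry.
class LruTtlCache<V> {
  private entries = new Map<string, { value: V; expiresAt: number }>();

  constructor(
    private maxEntries: number,
    private ttlMs: number,
    private now: () => number = Date.now // injectable clock for tests
  ) {}

  get(key: string): V | undefined {
    const entry = this.entries.get(key);
    if (!entry) return undefined;
    if (this.now() >= entry.expiresAt) {
      this.entries.delete(key); // expired: drop it
      return undefined;
    }
    // Refresh recency by moving the entry to the end of the Map
    this.entries.delete(key);
    this.entries.set(key, entry);
    return entry.value;
  }

  set(key: string, value: V): void {
    this.entries.delete(key);
    this.entries.set(key, { value, expiresAt: this.now() + this.ttlMs });
    // Evict least recently used entries until we are within the limit
    while (this.entries.size > this.maxEntries) {
      const oldestKey = this.entries.keys().next().value as string;
      this.entries.delete(oldestKey);
    }
  }

  get size(): number {
    return this.entries.size;
  }
}
```

Reads refresh an entry's position but not its expiry, so a frequently read value still expires on schedule.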

Database Query Caching

class DatabaseCache {
  private queryCache: Map<string, { data: any; expiry: number }> = new Map();

  async executeQuery(
    query: string,
    params: any[] = [],
    ttl: number = 300000
  ): Promise<any> {
    const cacheKey = this.generateQueryKey(query, params);

    // Check cache first
    const cached = this.queryCache.get(cacheKey);
    if (cached && Date.now() < cached.expiry) {
      logger.debug("Query cache hit", { cacheKey });
      return cached.data;
    }

    // Execute query
    const result = await this.executeQueryDirect(query, params);

    // Cache the result
    this.queryCache.set(cacheKey, {
      data: result,
      expiry: Date.now() + ttl,
    });

    logger.debug("Query cache miss", { cacheKey });
    return result;
  }

  private generateQueryKey(query: string, params: any[]): string {
    return `${query}:${JSON.stringify(params)}`;
  }

  private async executeQueryDirect(query: string, params: any[]): Promise<any> {
    // Implementation depends on your database driver
    return [];
  }

  clearQueryCache() {
    this.queryCache.clear();
  }
}

const dbCache = new DatabaseCache();
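A query cache only helps once the first result has arrived: if many callers request the same uncached query at the same moment, each one still reaches the database. One common complement, sketched here under the assumption that concurrent callers can safely share a result, is to store the in-flight promise so identical concurrent requests share a single execution. `SingleFlight` is a hypothetical helper, not a Databite API.

```typescript
// Hypothetical helper: concurrent calls with the same key share one promise.
class SingleFlight<T> {
  private inFlight = new Map<string, Promise<T>>();

  run(key: string, fn: () => Promise<T>): Promise<T> {
    const existing = this.inFlight.get(key);
    if (existing) return existing; // join the request already in progress

    const promise = fn().finally(() => {
      // Forget the key once settled so later calls fetch fresh data
      this.inFlight.delete(key);
    });
    this.inFlight.set(key, promise);
    return promise;
  }
}
```

Inside `executeQuery`, the cache-miss path could call `executeQueryDirect` through `run(cacheKey, ...)` so a burst of identical misses produces one query.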

Connection Pooling

Database Connection Pooling

import { Pool, PoolConfig } from "pg";

class DatabasePool {
  private pool: Pool;
  private maxConnections: number = 10;
  private minConnections: number = 2;
  private connectionTimeout: number = 30000;

  constructor(config: PoolConfig) {
    this.pool = new Pool({
      ...config,
      max: this.maxConnections,
      min: this.minConnections,
      connectionTimeoutMillis: this.connectionTimeout, // ms to wait when connecting a new client
      idleTimeoutMillis: 30000, // ms a client may sit idle before being closed
    });

    this.pool.on("error", (err) => {
      logger.error("Database pool error", err);
    });
  }

  async query(text: string, params: any[] = []): Promise<any> {
    const client = await this.pool.connect();

    try {
      const result = await client.query(text, params);
      return result.rows;
    } finally {
      client.release();
    }
  }

  async transaction<T>(callback: (client: any) => Promise<T>): Promise<T> {
    const client = await this.pool.connect();

    try {
      await client.query("BEGIN");
      const result = await callback(client);
      await client.query("COMMIT");
      return result;
    } catch (error) {
      await client.query("ROLLBACK");
      throw error;
    } finally {
      client.release();
    }
  }

  async close(): Promise<void> {
    await this.pool.end();
  }

  getPoolStats() {
    return {
      totalCount: this.pool.totalCount,
      idleCount: this.pool.idleCount,
      waitingCount: this.pool.waitingCount,
    };
  }
}

const dbPool = new DatabasePool({
  host: process.env.DB_HOST,
  port: Number(process.env.DB_PORT ?? 5432), // env vars are strings; pg expects a number
  database: process.env.DB_NAME,
  user: process.env.DB_USER,
  password: process.env.DB_PASSWORD,
});
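To make the pooling mechanics concrete, here is a minimal, dependency-free sketch of what a pool such as `pg.Pool` does at its core: reuse idle resources, create new ones up to a cap, and queue callers once the cap is reached. It is illustrative only; `SimplePool` and its `create` factory are hypothetical, and a production pool also handles timeouts, broken connections, and idle eviction.

```typescript
// Hypothetical minimal pool: reuses idle resources and caps the total count.
class SimplePool<R> {
  private idle: R[] = [];
  private waiters: Array<(resource: R) => void> = [];
  private total = 0;

  constructor(private create: () => Promise<R>, private max: number) {}

  async acquire(): Promise<R> {
    const reused = this.idle.pop();
    if (reused !== undefined) return reused; // reuse an idle resource

    if (this.total < this.max) {
      this.total++; // reserve a slot before the async create
      try {
        return await this.create();
      } catch (error) {
        this.total--; // creation failed: give the slot back
        throw error;
      }
    }

    // At capacity: wait until another caller releases a resource
    return new Promise<R>((resolve) => this.waiters.push(resolve));
  }

  release(resource: R): void {
    const waiter = this.waiters.shift();
    if (waiter) waiter(resource); // hand off directly to a waiting caller
    else this.idle.push(resource);
  }

  get size(): number {
    return this.total;
  }
}
```

The `try`/`finally` around `client.release()` in `DatabasePool.query` matters for the same reason it would here: a resource that is never released leaves later callers waiting forever.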

HTTP Connection Pooling

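// Node's built-in fetch (undici) ignores the `agent` option, so this example
// uses the node-fetch package, which accepts an http(s).Agent per request.
import { Agent } from "https";
import fetch, { RequestInit, Response } from "node-fetch";

class HTTPPool {
  private agent: Agent;
  private maxSockets: number = 10;
  private keepAlive: boolean = true;

  constructor() {
    this.agent = new Agent({
      maxSockets: this.maxSockets,
      keepAlive: this.keepAlive, // reuse TCP/TLS connections between requests
      timeout: 30000, // socket inactivity timeout in milliseconds
    });
  }

  async request(url: string, options: RequestInit = {}): Promise<Response> {
    return fetch(url, {
      ...options,
      agent: this.agent,
    });
  }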

  getStats() {
    return {
      maxSockets: this.maxSockets,
      keepAlive: this.keepAlive,
    };
  }
}

const httpPool = new HTTPPool();

Batch Processing

Batch API Calls

class BatchProcessor {
  private batchSize: number = 100;
  private batchTimeout: number = 1000; // 1 second
  private batches: Map<string, any[]> = new Map();
  private timers: Map<string, NodeJS.Timeout> = new Map();

  async addToBatch(
    batchId: string,
    item: any,
    processor: (items: any[]) => Promise<any>
  ): Promise<any> {
    if (!this.batches.has(batchId)) {
      this.batches.set(batchId, []);
    }

    const batch = this.batches.get(batchId)!;
    batch.push(item);

    // Process batch if it's full
    if (batch.length >= this.batchSize) {
      return this.processBatch(batchId, processor);
    }

    // Set timer for batch timeout. Errors are caught here because nothing
    // awaits the timer callback; processBatch has already logged them.
    if (!this.timers.has(batchId)) {
      const timer = setTimeout(() => {
        this.processBatch(batchId, processor).catch(() => {});
      }, this.batchTimeout);

      this.timers.set(batchId, timer);
    }

    // The item is queued; its result is returned by whichever call flushes the batch
    return undefined;
  }

  private async processBatch(
    batchId: string,
    processor: (items: any[]) => Promise<any>
  ): Promise<any> {
    const batch = this.batches.get(batchId);
    if (!batch || batch.length === 0) {
      return;
    }

    // Clear timer
    const timer = this.timers.get(batchId);
    if (timer) {
      clearTimeout(timer);
      this.timers.delete(batchId);
    }

    // Detach the batch before awaiting so items added while the request is
    // in flight start a new batch instead of being discarded. Items in a
    // failed batch are not retried; the error is rethrown to the caller.
    this.batches.delete(batchId);

    try {
      const result = await processor(batch);

      logger.debug("Batch processed", {
        batchId,
        batchSize: batch.length,
      });

      return result;
    } catch (error) {
      logger.error("Batch processing failed", error);
      throw error;
    }
  }

  async flushAll(processor: (items: any[]) => Promise<any>): Promise<void> {
    const promises = Array.from(this.batches.keys()).map((batchId) =>
      this.processBatch(batchId, processor)
    );

    await Promise.all(promises);
  }
}

const batchProcessor = new BatchProcessor();

// Usage in connector
const batchConnector = new ConnectorBuilder()
  .identity({
    id: "batch-connector",
    name: "Batch Connector",
  })
  .actions([
    {
      id: "batch_create",
      name: "Batch Create",
      description: "Create multiple items in batches",
      inputs: {
        items: {
          type: "array",
          label: "Items to Create",
          required: true,
        },
      },
      execute: async (inputs, context) => {
        const results: any[] = [];

        // One bulk request per batch. Results are collected here so the
        // final partial batch (sent by flushAll) is not lost.
        const createItems = async (items: any[]) => {
          const response = await fetch("https://api.service.com/items", {
            method: "POST",
            headers: {
              Authorization: `Bearer ${context.tokens.api_key}`,
              "Content-Type": "application/json",
            },
            body: JSON.stringify({ items }),
          });

          if (!response.ok) {
            throw new Error(`HTTP ${response.status}: ${response.statusText}`);
          }

          const data = await response.json();
          results.push(data);
          return data;
        };

        // A per-execution batcher: a shared instance would mix items from
        // concurrent executions and send them with another caller's token
        const batcher = new BatchProcessor();

        for (const item of inputs.items) {
          await batcher.addToBatch("create_items", item, createItems);
        }

        // Flush the remaining partial batch
        await batcher.flushAll(createItems);

        return results;
      },
    },
  ])
  .build();
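When every item is already in memory, as in the `batch_create` action, a timer-based batcher is not strictly necessary: splitting the input into fixed-size chunks and sending one request per chunk gives the same reduction in API calls with simpler control flow. A minimal sketch; `chunk` and `sendInChunks` are hypothetical helpers, and `send` stands in for whatever bulk endpoint your service exposes.

```typescript
// Split an array into consecutive chunks of at most `size` elements.
function chunk<T>(items: T[], size: number): T[][] {
  if (size <= 0) throw new RangeError("size must be positive");
  const chunks: T[][] = [];
  for (let i = 0; i < items.length; i += size) {
    chunks.push(items.slice(i, i + size));
  }
  return chunks;
}

// Send each chunk sequentially; `send` is a placeholder for a bulk API call.
async function sendInChunks<T, R>(
  items: T[],
  size: number,
  send: (batch: T[]) => Promise<R>
): Promise<R[]> {
  const results: R[] = [];
  for (const batch of chunk(items, size)) {
    results.push(await send(batch));
  }
  return results;
}
```

Sending chunks sequentially keeps request volume predictable; if the API tolerates parallel requests, the chunks could instead go through a concurrency limiter.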

Database Batch Operations

type BatchOperation = "insert" | "update" | "delete";

interface PendingBatch {
  tableName: string;
  operation: BatchOperation;
  records: any[];
}

class DatabaseBatch {
  // 1,000 rows fits PostgreSQL's 65,535 bind-parameter limit only for rows
  // with at most 65 columns; lower this for wider tables
  private batchSize: number = 1000;
  private batches: Map<string, PendingBatch> = new Map();

  async addToBatch(
    tableName: string,
    record: any,
    operation: BatchOperation = "insert"
  ): Promise<void> {
    // Table and operation are stored explicitly, so table names containing
    // separators like "_" never need to be parsed back out of the key
    const batchKey = `${tableName}:${operation}`;

    let batch = this.batches.get(batchKey);
    if (!batch) {
      batch = { tableName, operation, records: [] };
      this.batches.set(batchKey, batch);
    }

    batch.records.push(record);

    // Process the batch if it's full. Remove it from the map first so the
    // same records are not written a second time by a later flush.
    if (batch.records.length >= this.batchSize) {
      this.batches.delete(batchKey);
      await this.processBatch(tableName, operation, batch.records);
    }
  }

  private async processBatch(
    tableName: string,
    operation: BatchOperation,
    records: any[]
  ): Promise<void> {
    if (records.length === 0) {
      return;
    }

    try {
      switch (operation) {
        case "insert":
          await this.batchInsert(tableName, records);
          break;
        case "update":
          await this.batchUpdate(tableName, records);
          break;
        case "delete":
          await this.batchDelete(tableName, records);
          break;
      }

      logger.debug("Batch processed", {
        tableName,
        operation,
        recordCount: records.length,
      });
    } catch (error) {
      logger.error("Batch processing failed", error);
      throw error;
    }
  }

  // Table and column names are interpolated into the SQL (identifiers cannot
  // be bind parameters), so they must come from trusted code, not user input.
  private async batchInsert(tableName: string, records: any[]): Promise<void> {
    const columns = Object.keys(records[0]);
    const values = records.map((record) => columns.map((col) => record[col]));

    const placeholders = values
      .map(
        (_, i) =>
          `(${columns
            .map((_, j) => `$${i * columns.length + j + 1}`)
            .join(", ")})`
      )
      .join(", ");

    const query = `
      INSERT INTO ${tableName} (${columns.join(", ")})
      VALUES ${placeholders}
    `;

    const flatValues = values.flat();
    await dbPool.query(query, flatValues);
  }

  private async batchUpdate(tableName: string, records: any[]): Promise<void> {
    // Not implemented in this example: records queued for "update" are
    // discarded until you add logic suited to your schema
  }

  private async batchDelete(tableName: string, records: any[]): Promise<void> {
    const ids = records.map((record) => record.id);
    const placeholders = ids.map((_, i) => `$${i + 1}`).join(", ");

    const query = `DELETE FROM ${tableName} WHERE id IN (${placeholders})`;
    await dbPool.query(query, ids);
  }

  async flushAll(): Promise<void> {
    // Detach pending batches first so records added during the flush start
    // new batches instead of being cleared without ever being written
    const pending = Array.from(this.batches.values());
    this.batches.clear();

    await Promise.all(
      pending.map(({ tableName, operation, records }) =>
        this.processBatch(tableName, operation, records)
      )
    );
  }
}

const dbBatch = new DatabaseBatch();
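Multi-row `INSERT` statements like the one built by `batchInsert` use one bind parameter per value, and PostgreSQL caps a single statement at 65,535 bind parameters, so a 1,000-row batch fails once rows have more than 65 columns. The sketch below splits rows by that limit, assuming all row objects share the same keys and that table and column names come from trusted code. `buildInsertStatements` is a hypothetical helper; it returns `{ text, values }` objects, a query-config shape that `pg`'s `query` method accepts.

```typescript
// PostgreSQL's limit on bind parameters in a single statement
const MAX_PARAMS = 65535;

interface Statement {
  text: string;
  values: unknown[];
}

// Hypothetical helper: split rows into multi-row INSERTs that respect MAX_PARAMS.
// Table and column names are interpolated, so they must come from trusted code.
function buildInsertStatements(
  table: string,
  rows: Record<string, unknown>[]
): Statement[] {
  if (rows.length === 0) return [];
  const columns = Object.keys(rows[0]);
  const rowsPerStatement = Math.max(1, Math.floor(MAX_PARAMS / columns.length));
  const statements: Statement[] = [];

  for (let start = 0; start < rows.length; start += rowsPerStatement) {
    const slice = rows.slice(start, start + rowsPerStatement);
    const values: unknown[] = [];
    const tuples = slice.map((row) => {
      const placeholders = columns.map((col) => {
        values.push(row[col]);
        return `$${values.length}`; // numbering restarts at $1 per statement
      });
      return `(${placeholders.join(", ")})`;
    });
    statements.push({
      text: `INSERT INTO ${table} (${columns.join(", ")}) VALUES ${tuples.join(", ")}`,
      values,
    });
  }
  return statements;
}
```

For example, 66 columns allow floor(65535 / 66) = 992 rows per statement, so 1,000 such rows become two statements.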

Memory Management

Memory-Efficient Data Processing

class MemoryEfficientProcessor {
  private chunkSize: number = 1000;
  private maxMemoryUsage: number = 100 * 1024 * 1024; // 100MB

  async processLargeDataset(
    dataSource: AsyncGenerator<any>,
    processor: (chunk: any[]) => Promise<any>
  ): Promise<any[]> {
    const results = [];
    let currentChunk = [];
    let memoryUsage = 0;

    for await (const item of dataSource) {
      currentChunk.push(item);
      memoryUsage += this.estimateMemoryUsage(item);

      // Process chunk if it's full or memory usage is high
      if (
        currentChunk.length >= this.chunkSize ||
        memoryUsage >= this.maxMemoryUsage
      ) {
        const chunkResult = await processor(currentChunk);
        results.push(chunkResult);

        // Clear chunk and reset memory usage
        currentChunk = [];
        memoryUsage = 0;
      }
    }

    // Process remaining items
    if (currentChunk.length > 0) {
      const chunkResult = await processor(currentChunk);
      results.push(chunkResult);
    }

    return results;
  }

  private estimateMemoryUsage(item: any): number {
    return JSON.stringify(item).length * 2; // Rough estimate
  }

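  // Reads a stream of newline-delimited JSON (one record per line; an
  // assumption about the input format). Network chunks don't line up with
  // record boundaries, so text is buffered until a full line is available.
  async streamProcess(
    inputStream: ReadableStream<Uint8Array>,
    processor: (item: any) => Promise<any>
  ): Promise<void> {
    const reader = inputStream.getReader();
    const decoder = new TextDecoder();
    let buffer = "";

    try {
      while (true) {
        const { done, value } = await reader.read();
        if (done) break;

        // stream: true keeps multi-byte characters split across chunks intact
        buffer += decoder.decode(value, { stream: true });
        const lines = buffer.split("\n");
        buffer = lines.pop() ?? ""; // the last element may be an incomplete line

        for (const line of lines) {
          if (line.trim()) await processor(JSON.parse(line));
        }
      }

      // Process a final record that has no trailing newline
      buffer += decoder.decode();
      if (buffer.trim()) await processor(JSON.parse(buffer));
    } finally {
      reader.releaseLock();
    }
  }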
}

const memoryProcessor = new MemoryEfficientProcessor();
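`processLargeDataset` consumes an `AsyncGenerator`, which lets records stream in without loading the whole dataset first. A paginated API is a common source. The sketch below assumes a cursor-based API; `paginate`, `fetchPage`, and the `{ items, nextCursor }` page shape are hypothetical placeholders for your service's pagination contract.

```typescript
interface Page<T> {
  items: T[];
  nextCursor: string | null; // null when there are no more pages
}

// Yield records one at a time while fetching pages lazily: the next page is
// requested only after the consumer has pulled every item of the current one.
async function* paginate<T>(
  fetchPage: (cursor: string | null) => Promise<Page<T>>
): AsyncGenerator<T> {
  let cursor: string | null = null;
  do {
    const page: Page<T> = await fetchPage(cursor);
    yield* page.items;
    cursor = page.nextCursor;
  } while (cursor !== null);
}
```

Because the generator pauses between items, at most one page is held in memory at a time, which pairs naturally with the chunked processing above.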

Garbage Collection Optimization

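class GarbageCollectionOptimizer {
  // WeakMap keys don't keep objects alive: the metadata is dropped once the
  // key object is otherwise unreachable
  private weakRefs: WeakMap<object, any> = new WeakMap();
  private cleanupInterval: number = 60000; // 1 minute

  constructor() {
    // unref() so this timer alone doesn't keep the process running
    setInterval(() => {
      this.cleanup();
    }, this.cleanupInterval).unref();
  }

  createWeakReference(obj: object, metadata: any) {
    this.weakRefs.set(obj, metadata);
  }

  private cleanup() {
    // global.gc exists only when Node runs with --expose-gc. A forced full
    // collection pauses the process, so reserve it for diagnostics; V8's own
    // GC scheduling is usually the better choice in production.
    if (global.gc) {
      global.gc();
    }

    // Log memory usage
    const memoryUsage = process.memoryUsage();
    logger.debug("Memory usage", memoryUsage);
  }

  optimizeForGC(data: any[]): Map<number, any> {
    // Maps handle frequent key insertion and deletion better than plain
    // objects used as dictionaries, and deleting an entry releases the
    // reference to its value
    const optimized = new Map<number, any>();

    data.forEach((item, index) => {
      optimized.set(index, item);
    });

    return optimized;
  }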
}

const gcOptimizer = new GarbageCollectionOptimizer();

Async Operations

Concurrent Processing

class ConcurrentProcessor {
  private maxConcurrency: number = 10;
  private semaphore: Semaphore;

  constructor(maxConcurrency: number = 10) {
    this.maxConcurrency = maxConcurrency;
    this.semaphore = new Semaphore(maxConcurrency);
  }

  async processConcurrently<T, R>(
    items: T[],
    processor: (item: T) => Promise<R>
  ): Promise<R[]> {
    const promises = items.map((item) =>
      this.semaphore.acquire().then(async (release) => {
        try {
          return await processor(item);
        } finally {
          release();
        }
      })
    );

    return Promise.all(promises);
  }

  async processInBatches<T, R>(
    items: T[],
    processor: (item: T) => Promise<R>,
    batchSize: number = this.maxConcurrency
  ): Promise<R[]> {
    const results = [];

    for (let i = 0; i < items.length; i += batchSize) {
      const batch = items.slice(i, i + batchSize);
      const batchResults = await this.processConcurrently(batch, processor);
      results.push(...batchResults);
    }

    return results;
  }
}

class Semaphore {
  private permits: number;
  private waiting: Array<() => void> = [];

  constructor(permits: number) {
    this.permits = permits;
  }

  async acquire(): Promise<() => void> {
    return new Promise((resolve) => {
      if (this.permits > 0) {
        this.permits--;
        resolve(() => this.release());
      } else {
        this.waiting.push(() => {
          this.permits--;
          resolve(() => this.release());
        });
      }
    });
  }

  private release(): void {
    this.permits++;
    const next = this.waiting.shift();
    if (next) {
      next();
    }
  }
}

const concurrentProcessor = new ConcurrentProcessor(10);
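The same concurrency limit can be enforced without a semaphore by starting a fixed number of async workers that each pull the next index from a shared counter. This sketch of a hypothetical `mapWithConcurrency` helper keeps results in input order and never has more than `limit` calls in flight; unlike `processInBatches`, one slow item does not delay the start of the next group.

```typescript
// Run `fn` over `items` with at most `limit` calls in flight, preserving order.
async function mapWithConcurrency<T, R>(
  items: T[],
  limit: number,
  fn: (item: T, index: number) => Promise<R>
): Promise<R[]> {
  const results = new Array<R>(items.length);
  let next = 0; // shared cursor; safe because callbacks run on one thread

  async function worker(): Promise<void> {
    while (next < items.length) {
      const index = next++; // claim an index before awaiting
      results[index] = await fn(items[index], index);
    }
  }

  const workerCount = Math.max(1, Math.min(limit, items.length));
  await Promise.all(Array.from({ length: workerCount }, () => worker()));
  return results;
}
```

Each worker claims an index synchronously before its first `await`, so no two workers ever process the same item.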

Promise Pool

class PromisePool {
  private maxConcurrency: number;
  private running: Set<Promise<any>> = new Set();
  private queue: Array<() => Promise<void>> = [];

  constructor(maxConcurrency: number = 10) {
    this.maxConcurrency = maxConcurrency;
  }

  async add<T>(task: () => Promise<T>): Promise<T> {
    return new Promise((resolve, reject) => {
      this.queue.push(async () => {
        try {
          const result = await task();
          resolve(result);
        } catch (error) {
          reject(error);
        }
      });

      this.process();
    });
  }

  private process(): void {
    // Start queued tasks until the concurrency limit is reached
    while (this.running.size < this.maxConcurrency && this.queue.length > 0) {
      const task = this.queue.shift()!;
      const promise: Promise<void> = task().finally(() => {
        this.running.delete(promise);
        this.process(); // a slot is free: start the next queued task
      });
      this.running.add(promise);
    }
  }

  async waitForAll(): Promise<void> {
    // Each finishing task starts a queued one before leaving `running`, so
    // keep waiting until nothing is running; queued tasks are then done too
    while (this.running.size > 0) {
      await Promise.all(Array.from(this.running));
    }
  }
}

const promisePool = new PromisePool(10);

Resource Optimization

Resource Monitoring

import { getHeapStatistics } from "v8";

class ResourceMonitor {
  private metrics: Map<string, any> = new Map();
  private monitoringInterval: number = 5000; // 5 seconds

  constructor() {
    // Take a first sample immediately so the getters never return undefined
    this.collectMetrics();
    setInterval(() => {
      this.collectMetrics();
    }, this.monitoringInterval).unref(); // don't keep the process alive
  }

  private collectMetrics() {
    const memoryUsage = process.memoryUsage();
    // Cumulative CPU time in microseconds since the process started
    const cpuUsage = process.cpuUsage();

    this.metrics.set("memory", {
      rss: memoryUsage.rss,
      heapTotal: memoryUsage.heapTotal,
      heapUsed: memoryUsage.heapUsed,
      external: memoryUsage.external,
    });

    this.metrics.set("cpu", {
      user: cpuUsage.user,
      system: cpuUsage.system,
    });

    this.metrics.set("timestamp", new Date().toISOString());
  }

  getMetrics(): any {
    return Object.fromEntries(this.metrics);
  }

  getMemoryUsage(): any {
    return this.metrics.get("memory");
  }

  getCpuUsage(): any {
    return this.metrics.get("cpu");
  }

  isMemoryUsageHigh(threshold: number = 0.8): boolean {
    // Compare against V8's heap limit rather than total system memory: the
    // heap limit is usually far below the machine's RAM, and exceeding it
    // crashes the process
    const memory = this.getMemoryUsage();
    const heapLimit = getHeapStatistics().heap_size_limit;
    return memory.heapUsed / heapLimit > threshold;
  }
}

const resourceMonitor = new ResourceMonitor();
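Because `process.cpuUsage()` is cumulative, a single reading says little about current load. Passing an earlier reading back in returns the difference, which can be converted into a utilization percentage for an interval. A sketch with the hypothetical helpers `sampleCpuPercent` and `heapUtilization`; as in `isMemoryUsageHigh`, heap usage is compared with V8's heap size limit.

```typescript
import { getHeapStatistics } from "v8";

// CPU time used by this process during a sampling window, as a percentage of
// one core (values above 100 are possible when several threads are busy).
async function sampleCpuPercent(windowMs: number): Promise<number> {
  const startUsage = process.cpuUsage();
  const startTime = process.hrtime.bigint();
  await new Promise((resolve) => setTimeout(resolve, windowMs));
  const diff = process.cpuUsage(startUsage); // microseconds since startUsage
  const elapsedMicros = Number(process.hrtime.bigint() - startTime) / 1000;
  return ((diff.user + diff.system) / elapsedMicros) * 100;
}

// Fraction of V8's maximum heap size currently in use (0..1).
function heapUtilization(): number {
  const { heapUsed } = process.memoryUsage();
  return heapUsed / getHeapStatistics().heap_size_limit;
}
```

`process.hrtime.bigint()` returns nanoseconds, hence the division by 1,000 to match the microsecond units of `cpuUsage`.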

Cost Optimization

class CostOptimizer {
  private apiCallCosts: Map<string, number> = new Map();
  private totalCost: number = 0;

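  trackApiCall(endpoint: string, cost: number) {
    // Accumulate per endpoint instead of overwriting the previous value
    this.apiCallCosts.set(endpoint, (this.apiCallCosts.get(endpoint) ?? 0) + cost);
    this.totalCost += cost;
  }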

  getTotalCost(): number {
    return this.totalCost;
  }

  getCostByEndpoint(): any {
    return Object.fromEntries(this.apiCallCosts);
  }

  optimizeApiCalls(requests: any[]): any[] {
    // Group similar requests
    const grouped = this.groupSimilarRequests(requests);

    // Optimize each group
    return grouped.map((group) => this.optimizeGroup(group));
  }

  private groupSimilarRequests(requests: any[]): any[][] {
    const groups = new Map<string, any[]>();

    requests.forEach((request) => {
      const key = `${request.method}:${request.endpoint}`;
      if (!groups.has(key)) {
        groups.set(key, []);
      }
      groups.get(key)!.push(request);
    });

    return Array.from(groups.values());
  }

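  private optimizeGroup(group: any[]): any {
    // Placeholder: keeping only the first request is safe only when every
    // request in the group is an identical, idempotent read. Writes, or reads
    // with different parameters, must not be collapsed this way.
    return group[0];
  }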
}

const costOptimizer = new CostOptimizer();
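`optimizeGroup` keeps only the first request in each group, which silently drops work unless every request is an identical read. A safer sketch removes only exact duplicates of methods assumed to be idempotent reads (here `GET` and `HEAD`) and leaves every write untouched. The `ApiRequest` shape and `dedupeReads` are hypothetical.

```typescript
interface ApiRequest {
  method: string;
  endpoint: string;
  body?: unknown;
}

const IDEMPOTENT_READS = new Set(["GET", "HEAD"]);

// Remove exact duplicate reads; writes are always kept, in their original order.
function dedupeReads(requests: ApiRequest[]): ApiRequest[] {
  const seen = new Set<string>();
  return requests.filter((request) => {
    const method = request.method.toUpperCase();
    if (!IDEMPOTENT_READS.has(method)) return true; // never drop writes
    const key = `${method} ${request.endpoint} ${JSON.stringify(request.body ?? null)}`;
    if (seen.has(key)) return false;
    seen.add(key);
    return true;
  });
}
```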

Testing Performance

Performance Testing

describe("Performance Tests", () => {
  let connector: any;

  beforeEach(() => {
    connector = createTestConnector();
  });

  it("should handle concurrent requests efficiently", async () => {
    const requests = Array(100)
      .fill(null)
      .map((_, i) => ({
        input: `test_${i}`,
      }));

    const startTime = performance.now();

    const results = await concurrentProcessor.processConcurrently(
      requests,
      (request) => connector.actions.test_action.execute(request)
    );

    const endTime = performance.now();
    const duration = endTime - startTime;

    expect(results).toHaveLength(100);
    expect(duration).toBeLessThan(5000); // Should complete within 5 seconds
  });

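  it("should use memory efficiently", async () => {
    // Collect garbage first (requires --expose-gc) so the baseline isn't
    // inflated by garbage left over from earlier tests
    global.gc?.();
    const initialMemory = process.memoryUsage().heapUsed;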

    await connector.actions.memory_intensive_action.execute({
      data: generateLargeDataset(10000),
    });

    const finalMemory = process.memoryUsage().heapUsed;
    const memoryIncrease = finalMemory - initialMemory;

    expect(memoryIncrease).toBeLessThan(50 * 1024 * 1024); // Less than 50MB increase
  });

  it("should cache responses effectively", async () => {
    // First call should miss cache
    const start1 = performance.now();
    await connector.actions.cached_action.execute({});
    const duration1 = performance.now() - start1;

    // Second call should hit cache
    const start2 = performance.now();
    await connector.actions.cached_action.execute({});
    const duration2 = performance.now() - start2;

    expect(duration2).toBeLessThan(duration1);
    expect(duration2).toBeLessThan(100); // Should be very fast
  });
});
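Single wall-clock timings like those in these tests are sensitive to JIT warm-up, GC pauses, and machine load, so thresholds such as `toBeLessThan(100)` can be flaky in CI. A common mitigation is to run the operation several times after a warm-up and assert on a percentile. `benchmark` and `percentile` are hypothetical helpers; `percentile` uses the nearest-rank definition.

```typescript
import { performance } from "perf_hooks";

// Nearest-rank percentile of an ascending-sorted array (p in 0..100).
function percentile(sorted: number[], p: number): number {
  if (sorted.length === 0) throw new RangeError("no samples");
  const rank = Math.ceil((p / 100) * sorted.length);
  return sorted[Math.min(sorted.length, Math.max(1, rank)) - 1];
}

// Time `fn` `runs` times after `warmup` untimed calls; return p50 and p95 in ms.
async function benchmark(
  fn: () => Promise<unknown>,
  runs = 20,
  warmup = 3
): Promise<{ p50: number; p95: number }> {
  for (let i = 0; i < warmup; i++) await fn();
  const samples: number[] = [];
  for (let i = 0; i < runs; i++) {
    const start = performance.now();
    await fn();
    samples.push(performance.now() - start);
  }
  samples.sort((a, b) => a - b);
  return { p50: percentile(samples, 50), p95: percentile(samples, 95) };
}
```

A test could then assert on `p95` from `benchmark(() => connector.actions.cached_action.execute({}))` instead of a single run.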

Best Practices

Performance Guidelines

Always profile your code before optimizing. Measure first, then optimize based on actual performance data.
// ✅ Good: Profile before optimizing
const profileConnector = async (connector: any) => {
  const startTime = performance.now();
  const startMemory = process.memoryUsage();

  const result = await connector.actions.test_action.execute({});

  const endTime = performance.now();
  const endMemory = process.memoryUsage();

  logger.info("Performance metrics", {
    duration: endTime - startTime,
    memoryUsed: endMemory.heapUsed - startMemory.heapUsed,
    resultSize: (JSON.stringify(result) ?? "").length, // stringify returns undefined for undefined
  });

  return result;
};

Resource Management

Always clean up resources to prevent memory leaks and ensure optimal performance.
// ✅ Good: Proper resource cleanup
class ResourceManager {
  private resources: Set<any> = new Set();

  addResource(resource: any) {
    this.resources.add(resource);
  }

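  async cleanup() {
    // allSettled so one failing close() doesn't stop the others
    const results = await Promise.allSettled(
      Array.from(this.resources).map(async (resource) => {
        if (resource.close) {
          return resource.close();
        }
        if (resource.destroy) {
          return resource.destroy();
        }
      })
    );

    for (const result of results) {
      if (result.status === "rejected") {
        logger.error("Resource cleanup failed", result.reason);
      }
    }

    this.resources.clear();
  }
}

const resourceManager = new ResourceManager();

// "exit" listeners run synchronously, so async cleanup cannot finish there.
// Handle termination signals instead and exit only after cleanup completes.
const shutdown = async () => {
  await resourceManager.cleanup();
  process.exit(0);
};

process.once("SIGINT", shutdown);
process.once("SIGTERM", shutdown);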

Common Performance Issues

Issue: Memory Leaks

Memory leaks often occur when event listeners are never removed, timers are never cleared, or references to large objects are kept longer than needed, which prevents the garbage collector from reclaiming them.
import { EventEmitter } from "events";

// ❌ Bad: Memory leak
class LeakyComponent {
  constructor(emitter: EventEmitter) {
    // Never cleared: runs (and keeps the process alive) forever
    setInterval(() => {
      // Do something
    }, 1000);

    // Never removed: every new instance adds another listener to the emitter
    emitter.on("data", () => {
      // Do something
    });
  }
}

// ✅ Good: Proper cleanup
class CleanComponent {
  private timer: NodeJS.Timeout;

  // Keep a reference so the exact same function can be removed later
  private onData = () => {
    // Do something
  };

  constructor(private emitter: EventEmitter) {
    this.timer = setInterval(() => {
      // Do something
    }, 1000);

    this.emitter.on("data", this.onData);
  }

  cleanup() {
    clearInterval(this.timer);
    this.emitter.off("data", this.onData);
  }
}
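When a component registers several listeners, removing each one by reference is easy to get wrong. `EventTarget.addEventListener` accepts an `AbortSignal` in its options, so a single `abort()` call removes every listener registered with that signal. A sketch using Node's global `EventTarget` and `AbortController`; `Subscriber` is a hypothetical class.

```typescript
// Hypothetical component: every listener it adds shares one AbortSignal.
class Subscriber {
  private controller = new AbortController();
  received = 0;

  constructor(target: EventTarget) {
    const { signal } = this.controller;
    target.addEventListener("data", () => this.received++, { signal });
    target.addEventListener("close", () => this.received++, { signal });
  }

  // One call removes all listeners registered with this controller's signal
  dispose(): void {
    this.controller.abort();
  }
}
```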

Issue: Blocking Operations

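CPU-bound work on the main thread blocks the event loop: no other requests, timers, or I/O callbacks run until it finishes.
// ❌ Bad: Blocking operation
const processData = (data: any[]) => {
  return data.map((item) => {
    // Synchronous CPU-bound work: nothing else runs until the loop finishes
    return heavyComputation(item);
  });
};

// ✅ Good: Non-blocking operation
const processDataAsync = async (data: any[]) => {
  // Only non-blocking if heavyComputationAsync really offloads the work
  // (e.g., to a worker thread or an external service); wrapping synchronous
  // code in an async function still runs it on the event loop
  return Promise.all(data.map((item) => heavyComputationAsync(item)));
};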
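Marking a function `async` does not move CPU-bound work off the event loop. One way to offload it in Node.js is a worker thread. A minimal sketch using the built-in `worker_threads` module with an inline (`eval: true`) worker; the summation is a stand-in for a hypothetical `heavyComputation`, and in production you would typically load the worker from its own file and reuse a pool of workers rather than start one per call.

```typescript
import { Worker } from "worker_threads";

// Stand-in for a CPU-bound job, written as plain JavaScript because an
// eval'd worker runs the string directly without TypeScript compilation
const workerSource = `
  const { parentPort, workerData } = require("worker_threads");
  let total = 0;
  for (let i = 0; i < workerData.n; i++) total += i;
  parentPort.postMessage(total);
`;

// Run the job on a separate thread; the main event loop stays free meanwhile
function sumInWorker(n: number): Promise<number> {
  return new Promise((resolve, reject) => {
    const worker = new Worker(workerSource, { eval: true, workerData: { n } });
    worker.once("message", resolve);
    worker.once("error", reject);
    worker.once("exit", (code) => {
      // Rejecting after a successful resolve is a no-op
      if (code !== 0) reject(new Error(`Worker exited with code ${code}`));
    });
  });
}
```

Starting a worker has a fixed cost, so this pays off only when each job is substantially more expensive than thread startup.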

Next Steps

Now that you understand performance optimization, you can:
  1. Implement Monitoring: Set up comprehensive performance monitoring
  2. Optimize Critical Paths: Focus on the most performance-critical parts of your application
  3. Add Caching: Implement caching strategies to reduce redundant operations
  4. Scale Efficiently: Design your system to handle increased load
You’ve now completed the comprehensive Databite SDK documentation! The documentation covers all the essential topics for building robust data integrations with Databite.