Performance Optimization
Performance optimization is crucial for building scalable and efficient data integrations. This guide shows you how to optimize the performance of your Databite connectors, flows, and integrations.
Overview
Performance optimization in Databite involves:
- Caching Strategies for reducing API calls
- Connection Pooling for database efficiency
- Batch Processing for large datasets
- Memory Management to prevent leaks
- Async Operations for better concurrency
- Resource Optimization for cost efficiency
Caching Strategies
HTTP Response Caching
Copy
Ask AI
/**
 * In-memory cache with a per-entry time-to-live.
 *
 * Values and their expiry timestamps live in two parallel maps that share the
 * same keys. Expired entries are evicted lazily, the next time they are read.
 */
class ResponseCache {
  private cache: Map<string, any> = new Map();
  private ttl: Map<string, number> = new Map();
  private defaultTTL: number = 300000; // 5 minutes

  /**
   * Stores `value` under `key`.
   *
   * @param ttl Lifetime in milliseconds, counted from now.
   */
  set(key: string, value: any, ttl: number = this.defaultTTL) {
    this.cache.set(key, value);
    this.ttl.set(key, Date.now() + ttl);
  }

  /**
   * Returns the cached value, or `null` when the key is absent or expired.
   *
   * Falsy values such as `0`, `""` and `false` are legitimate cache contents
   * and are returned as stored; only a missing entry maps to `null`.
   */
  get(key: string): any | null {
    const expiry = this.ttl.get(key);
    if (expiry !== undefined && Date.now() > expiry) {
      this.delete(key);
      return null;
    }
    return this.cache.get(key) ?? null;
  }

  /** Removes the value and its expiry timestamp. */
  delete(key: string) {
    this.cache.delete(key);
    this.ttl.delete(key);
  }

  /** Drops every entry. */
  clear() {
    this.cache.clear();
    this.ttl.clear();
  }

  /**
   * Builds a cache key from a URL and an options object.
   *
   * Top-level option keys are sorted so that the same options given in a
   * different order produce the same key. Nested objects are serialized in
   * their own key order.
   */
  generateKey(url: string, options: any = {}): string {
    const sortedOptions = Object.keys(options)
      .sort()
      .reduce((result, key) => {
        result[key] = options[key];
        return result;
      }, {} as any);
    return `${url}:${JSON.stringify(sortedOptions)}`;
  }
}
const responseCache = new ResponseCache();

// Cached connector: answers repeated requests for the same endpoint from
// `responseCache` and only calls the remote API on a cache miss.
const cachedConnector = new ConnectorBuilder()
  .identity({
    id: "cached-connector",
    name: "Cached Connector",
  })
  .actions([
    {
      id: "get_data",
      name: "Get Data",
      description: "Get data with caching",
      inputs: {
        endpoint: {
          type: "string",
          label: "Endpoint",
          required: true,
        },
      },
      execute: async (inputs, context) => {
        const url = `https://api.service.com${inputs.endpoint}`;

        // The key combines the URL with the caller's tokens, so entries are
        // kept separately per credential set.
        const cacheKey = responseCache.generateKey(url, {
          headers: context.tokens,
        });

        // Check cache first. Compare against null so that a cached falsy
        // payload still counts as a hit.
        const cached = responseCache.get(cacheKey);
        if (cached !== null) {
          // Log the URL rather than the cache key: the key embeds the tokens
          // and must not end up in log output.
          logger.debug("Cache hit", { url });
          return cached;
        }

        // Fetch from API
        const response = await fetch(url, {
          headers: {
            Authorization: `Bearer ${context.tokens.api_key}`,
          },
        });
        if (!response.ok) {
          throw new Error(`HTTP ${response.status}: ${response.statusText}`);
        }
        const data = await response.json();

        // Cache the response
        responseCache.set(cacheKey, data, 300000); // 5 minutes
        logger.debug("Cache miss", { url });
        return data;
      },
    },
  ])
  .build();
Database Query Caching
Copy
Ask AI
/**
 * Caches query results in memory.
 *
 * Entries are keyed by the query text plus the JSON-serialized parameters and
 * carry an absolute expiry timestamp.
 */
class DatabaseCache {
  private cache: Map<string, any> = new Map();
  private queryCache: Map<string, any> = new Map();

  /**
   * Returns the cached result for `query` + `params` while it is still fresh;
   * otherwise runs the query and caches its result.
   *
   * @param ttl Lifetime of a newly cached result, in milliseconds.
   */
  async executeQuery(
    query: string,
    params: any[] = [],
    ttl: number = 300000
  ): Promise<any> {
    const key = this.generateQueryKey(query, params);
    const entry = this.queryCache.get(key);

    // Miss: nothing stored yet, or the stored entry is no longer fresh.
    if (!entry || !(Date.now() < entry.expiry)) {
      const rows = await this.executeQueryDirect(query, params);
      this.queryCache.set(key, { data: rows, expiry: Date.now() + ttl });
      logger.debug("Query cache miss", { cacheKey: key });
      return rows;
    }

    logger.debug("Query cache hit", { cacheKey: key });
    return entry.data;
  }

  /** Combines the query text and its serialized parameters into one key. */
  private generateQueryKey(query: string, params: any[]): string {
    return query + ":" + JSON.stringify(params);
  }

  /** Placeholder for the real database call; currently returns no rows. */
  private async executeQueryDirect(query: string, params: any[]): Promise<any> {
    // Implementation depends on your database driver
    return [];
  }

  /** Forgets every cached query result. */
  clearQueryCache() {
    this.queryCache.clear();
  }
}

const dbCache = new DatabaseCache();
Connection Pooling
Database Connection Pooling
Copy
Ask AI
import { Pool } from "pg";
/**
 * Thin wrapper around a `pg` connection pool that offers single queries,
 * transactions and basic pool statistics.
 */
class DatabasePool {
  private pool: Pool;
  private maxConnections: number = 10;
  private minConnections: number = 2;
  private connectionTimeout: number = 30000;

  /**
   * @param config Connection settings, spread into the pool options. The
   *   sizing and timeout options below override same-named keys in `config`.
   */
  constructor(config: any) {
    this.pool = new Pool({
      ...config,
      max: this.maxConnections,
      min: this.minConnections,
      connectionTimeoutMillis: this.connectionTimeout,
      idleTimeoutMillis: 30000,
      // NOTE(review): confirm the installed pg version recognizes this option.
      acquireTimeoutMillis: 30000,
    });
    this.pool.on("error", (err) => {
      logger.error("Database pool error", err);
    });
  }

  /** Runs one statement on a pooled client and returns the result rows. */
  async query(text: string, params: any[] = []): Promise<any> {
    const client = await this.pool.connect();
    try {
      const result = await client.query(text, params);
      return result.rows;
    } finally {
      client.release();
    }
  }

  /**
   * Runs `callback` between BEGIN and COMMIT on a dedicated client.
   *
   * If the callback or the COMMIT fails, a ROLLBACK is issued and the original
   * error is rethrown. The client is released in every case.
   */
  async transaction<T>(callback: (client: any) => Promise<T>): Promise<T> {
    const client = await this.pool.connect();
    try {
      await client.query("BEGIN");
      const result = await callback(client);
      await client.query("COMMIT");
      return result;
    } catch (error) {
      try {
        await client.query("ROLLBACK");
      } catch (rollbackError) {
        // A failed rollback must not hide the error that caused it.
        logger.error("Transaction rollback failed", rollbackError);
      }
      throw error;
    } finally {
      client.release();
    }
  }

  /** Shuts the pool down. */
  async close(): Promise<void> {
    await this.pool.end();
  }

  /** Snapshot of the pool's total, idle and waiting counters. */
  getPoolStats() {
    return {
      totalCount: this.pool.totalCount,
      idleCount: this.pool.idleCount,
      waitingCount: this.pool.waitingCount,
    };
  }
}
// Shared pool configured from the environment. Environment variables are
// always strings, so the port is converted to a number; when DB_PORT is unset
// the key is left undefined.
const dbPool = new DatabasePool({
  host: process.env.DB_HOST,
  port: process.env.DB_PORT ? Number(process.env.DB_PORT) : undefined,
  database: process.env.DB_NAME,
  user: process.env.DB_USER,
  password: process.env.DB_PASSWORD,
});
HTTP Connection Pooling
Copy
Ask AI
import { Agent } from "https";
/**
 * Holds one keep-alive HTTPS agent and passes it along with every request.
 */
class HTTPPool {
  private agent: Agent;
  private maxSockets: number = 10;
  private keepAlive: boolean = true;

  constructor() {
    const { maxSockets, keepAlive } = this;
    this.agent = new Agent({ maxSockets, keepAlive, timeout: 30000 });
  }

  /**
   * Calls `fetch` with the caller's options plus this pool's agent.
   *
   * NOTE(review): whether the `agent` option has any effect depends on the
   * `fetch` implementation in scope — confirm it is supported.
   */
  async request(url: string, options: any = {}): Promise<Response> {
    const init = { ...options, agent: this.agent };
    return fetch(url, init);
  }

  /** Reports the settings the agent was created with. */
  getStats() {
    const { maxSockets, keepAlive } = this;
    return { maxSockets, keepAlive };
  }
}

const httpPool = new HTTPPool();
Batch Processing
Batch API Calls
Copy
Ask AI
/**
 * Collects items into named batches and hands each batch to a processor,
 * either when the batch is full or after a timeout.
 */
class BatchProcessor {
  private batchSize: number = 100;
  private batchTimeout: number = 1000; // 1 second
  private batches: Map<string, any[]> = new Map();
  private timers: Map<string, NodeJS.Timeout> = new Map();

  /**
   * Adds `item` to the batch named `batchId`.
   *
   * @returns The processor's result when this item filled the batch and
   *   triggered processing; otherwise `undefined`.
   */
  async addToBatch(
    batchId: string,
    item: any,
    processor: (items: any[]) => Promise<any>
  ): Promise<any> {
    let batch = this.batches.get(batchId);
    if (!batch) {
      batch = [];
      this.batches.set(batchId, batch);
    }
    batch.push(item);

    // Process batch if it's full
    if (batch.length >= this.batchSize) {
      return this.processBatch(batchId, processor);
    }

    // Set timer for batch timeout
    if (!this.timers.has(batchId)) {
      const timer = setTimeout(() => {
        // Nobody awaits this flush. The failure is already logged inside
        // processBatch, so swallow it here to avoid an unhandled rejection.
        this.processBatch(batchId, processor).catch(() => {});
      }, this.batchTimeout);
      this.timers.set(batchId, timer);
    }
    return undefined;
  }

  /**
   * Processes everything currently queued under `batchId`.
   *
   * The queued items are detached before the processor is awaited, so items
   * added while it runs go into a fresh batch instead of being dropped. If the
   * processor fails, the detached items are put back in front of any newer
   * ones and the error is rethrown.
   */
  private async processBatch(
    batchId: string,
    processor: (items: any[]) => Promise<any>
  ): Promise<any> {
    const items = this.batches.get(batchId);
    if (!items || items.length === 0) {
      return;
    }

    // Clear timer
    const timer = this.timers.get(batchId);
    if (timer) {
      clearTimeout(timer);
      this.timers.delete(batchId);
    }

    // Detach the batch before yielding to the event loop
    this.batches.set(batchId, []);

    try {
      const result = await processor([...items]);
      logger.debug("Batch processed", {
        batchId,
        batchSize: items.length,
      });
      return result;
    } catch (error) {
      const arrivedMeanwhile = this.batches.get(batchId) ?? [];
      this.batches.set(batchId, [...items, ...arrivedMeanwhile]);
      logger.error("Batch processing failed", error);
      throw error;
    }
  }

  /** Processes every non-empty batch with the given processor. */
  async flushAll(processor: (items: any[]) => Promise<any>): Promise<void> {
    const promises = Array.from(this.batches.keys()).map((batchId) =>
      this.processBatch(batchId, processor)
    );
    await Promise.all(promises);
  }
}

const batchProcessor = new BatchProcessor();
// Usage in connector
const batchConnector = new ConnectorBuilder()
  .identity({
    id: "batch-connector",
    name: "Batch Connector",
  })
  .actions([
    {
      id: "batch_create",
      name: "Batch Create",
      description: "Create multiple items in batches",
      inputs: {
        items: {
          type: "array",
          label: "Items to Create",
          required: true,
        },
      },
      execute: async (inputs, context) => {
        // One processor shared by the per-item batching and the final flush.
        const createItems = async (items: any[]) => {
          const response = await fetch("https://api.service.com/items", {
            method: "POST",
            headers: {
              Authorization: `Bearer ${context.tokens.api_key}`,
              "Content-Type": "application/json",
            },
            body: JSON.stringify({ items }),
          });
          // Surface HTTP failures instead of returning the error body as if
          // it were a successful result.
          if (!response.ok) {
            throw new Error(`HTTP ${response.status}: ${response.statusText}`);
          }
          return response.json();
        };

        // Only truthy return values from addToBatch are collected.
        const results = [];
        for (const item of inputs.items) {
          const result = await batchProcessor.addToBatch(
            "create_items",
            item,
            createItems
          );
          if (result) {
            results.push(result);
          }
        }

        // Flush remaining items. The flush is awaited for its side effect;
        // its outcome is not added to `results`.
        await batchProcessor.flushAll(createItems);
        return results;
      },
    },
  ])
  .build();
Database Batch Operations
Copy
Ask AI
/**
 * Buffers records per table and operation and writes them to the database in
 * batches through `dbPool`.
 *
 * Table and column names are interpolated directly into the SQL text, so they
 * must come from trusted code. Record values are always passed as parameters.
 */
class DatabaseBatch {
  private batchSize: number = 1000;
  private batches: Map<string, any[]> = new Map();

  /**
   * Queues `record` for `operation` on `tableName`. When the queue for that
   * pair reaches the batch size, it is written out immediately.
   */
  async addToBatch(
    tableName: string,
    record: any,
    operation: "insert" | "update" | "delete" = "insert"
  ): Promise<void> {
    const batchKey = `${tableName}_${operation}`;
    let batch = this.batches.get(batchKey);
    if (!batch) {
      batch = [];
      this.batches.set(batchKey, batch);
    }
    batch.push(record);

    // Process batch if it's full
    if (batch.length >= this.batchSize) {
      await this.flushBatch(batchKey, tableName, operation);
    }
  }

  /**
   * Writes out one queued batch.
   *
   * The batch is removed from the queue before it is processed, so it is never
   * sent twice. If processing fails, the records are put back in front of
   * anything queued meanwhile and the error is rethrown.
   */
  private async flushBatch(
    batchKey: string,
    tableName: string,
    operation: string
  ): Promise<void> {
    const records = this.batches.get(batchKey);
    if (!records || records.length === 0) {
      return;
    }
    this.batches.delete(batchKey);
    try {
      await this.processBatch(tableName, operation, records);
    } catch (error) {
      const queuedMeanwhile = this.batches.get(batchKey) ?? [];
      this.batches.set(batchKey, [...records, ...queuedMeanwhile]);
      throw error;
    }
  }

  /** Dispatches `records` to the handler for `operation` and logs the outcome. */
  private async processBatch(
    tableName: string,
    operation: string,
    records: any[]
  ): Promise<void> {
    try {
      switch (operation) {
        case "insert":
          await this.batchInsert(tableName, records);
          break;
        case "update":
          await this.batchUpdate(tableName, records);
          break;
        case "delete":
          await this.batchDelete(tableName, records);
          break;
        default:
          // Refuse to report success for an operation nothing handled.
          throw new Error(`Unsupported batch operation: ${operation}`);
      }
      logger.debug("Batch processed", {
        tableName,
        operation,
        recordCount: records.length,
      });
    } catch (error) {
      logger.error("Batch processing failed", error);
      throw error;
    }
  }

  /**
   * Inserts all records with a single multi-row INSERT. The column list is
   * taken from the first record and applied to every record.
   */
  private async batchInsert(tableName: string, records: any[]): Promise<void> {
    if (records.length === 0) {
      return;
    }
    const columns = Object.keys(records[0]);
    const values = records.map((record) => columns.map((col) => record[col]));
    // Placeholders are numbered row by row: ($1, $2), ($3, $4), ...
    const placeholders = values
      .map(
        (_, i) =>
          `(${columns
            .map((_, j) => `$${i * columns.length + j + 1}`)
            .join(", ")})`
      )
      .join(", ");
    const query = `
      INSERT INTO ${tableName} (${columns.join(", ")})
      VALUES ${placeholders}
    `;
    const flatValues = values.flat();
    await dbPool.query(query, flatValues);
  }

  private async batchUpdate(tableName: string, records: any[]): Promise<void> {
    // Implementation for batch update
    // This would depend on your specific requirements
  }

  /** Deletes the rows whose `id` matches any of the records' `id` values. */
  private async batchDelete(tableName: string, records: any[]): Promise<void> {
    if (records.length === 0) {
      return;
    }
    const ids = records.map((record) => record.id);
    const placeholders = ids.map((_, i) => `$${i + 1}`).join(", ");
    const query = `DELETE FROM ${tableName} WHERE id IN (${placeholders})`;
    await dbPool.query(query, ids);
  }

  /** Writes out every queued batch. */
  async flushAll(): Promise<void> {
    const promises = Array.from(this.batches.keys()).map((batchKey) => {
      // The operation is the segment after the LAST underscore; the table
      // name itself may contain underscores.
      const separator = batchKey.lastIndexOf("_");
      return this.flushBatch(
        batchKey,
        batchKey.slice(0, separator),
        batchKey.slice(separator + 1)
      );
    });
    await Promise.all(promises);
  }
}

const dbBatch = new DatabaseBatch();
Memory Management
Memory-Efficient Data Processing
Copy
Ask AI
/**
 * Processes large inputs in bounded chunks instead of loading everything at
 * once.
 */
class MemoryEfficientProcessor {
  private chunkSize: number = 1000;
  private maxMemoryUsage: number = 100 * 1024 * 1024; // 100MB

  /**
   * Pulls items from `dataSource`, groups them into chunks and passes each
   * chunk to `processor`.
   *
   * A chunk is closed when it reaches the chunk size or when the estimated
   * size of its items reaches the memory limit, whichever happens first.
   *
   * @returns One processor result per chunk, in processing order.
   */
  async processLargeDataset(
    dataSource: AsyncGenerator<any>,
    processor: (chunk: any[]) => Promise<any>
  ): Promise<any[]> {
    const results: any[] = [];
    let currentChunk: any[] = [];
    let memoryUsage = 0;

    for await (const item of dataSource) {
      currentChunk.push(item);
      memoryUsage += this.estimateMemoryUsage(item);

      // Process chunk if it's full or memory usage is high
      if (
        currentChunk.length >= this.chunkSize ||
        memoryUsage >= this.maxMemoryUsage
      ) {
        const chunkResult = await processor(currentChunk);
        results.push(chunkResult);
        // Clear chunk and reset memory usage
        currentChunk = [];
        memoryUsage = 0;
      }
    }

    // Process remaining items
    if (currentChunk.length > 0) {
      const chunkResult = await processor(currentChunk);
      results.push(chunkResult);
    }
    return results;
  }

  /**
   * Rough size estimate: twice the length of the item's JSON serialization.
   *
   * Items that JSON cannot represent count as 0 instead of aborting the run:
   * `JSON.stringify` returns `undefined` for `undefined` and functions, and
   * throws for circular structures and BigInt values.
   */
  private estimateMemoryUsage(item: any): number {
    let serialized: string | undefined;
    try {
      serialized = JSON.stringify(item);
    } catch {
      return 0;
    }
    return serialized === undefined ? 0 : serialized.length * 2;
  }

  /**
   * Reads `inputStream` chunk by chunk, decodes and JSON-parses each chunk,
   * and passes the parsed value to `processor`.
   *
   * NOTE(review): assumes every chunk is one complete JSON document — confirm
   * against the stream's producer.
   */
  async streamProcess(
    inputStream: ReadableStream,
    processor: (item: any) => Promise<any>
  ): Promise<void> {
    const reader = inputStream.getReader();
    try {
      while (true) {
        const { done, value } = await reader.read();
        if (done) break;
        const item = JSON.parse(new TextDecoder().decode(value));
        await processor(item);
      }
    } finally {
      // Release the lock even when parsing or processing throws.
      reader.releaseLock();
    }
  }
}

const memoryProcessor = new MemoryEfficientProcessor();
Garbage Collection Optimization
Copy
Ask AI
/**
 * Periodically triggers garbage collection (when the runtime exposes it) and
 * logs memory usage.
 */
class GarbageCollectionOptimizer {
  private weakRefs: WeakMap<any, any> = new WeakMap();
  private cleanupInterval: number = 60000; // 1 minute
  private timer: NodeJS.Timeout;

  constructor() {
    this.timer = setInterval(() => {
      this.cleanup();
    }, this.cleanupInterval);
    // A monitoring timer should not keep the process alive on its own.
    this.timer.unref();
  }

  /** Stops the periodic cleanup. Safe to call more than once. */
  dispose(): void {
    clearInterval(this.timer);
  }

  /** Associates `metadata` with `obj` in a WeakMap keyed by `obj`. */
  createWeakReference(obj: any, metadata: any) {
    this.weakRefs.set(obj, metadata);
  }

  private cleanup() {
    // Force garbage collection if available
    if (global.gc) {
      global.gc();
    }
    // Log memory usage
    const memoryUsage = process.memoryUsage();
    logger.debug("Memory usage", memoryUsage);
  }

  /**
   * Returns a new array with the same elements in the same order.
   *
   * The elements pass through a Map keyed by their index. Holes in a sparse
   * input are skipped by `forEach`, so the result is dense.
   */
  optimizeForGC(data: any[]): any[] {
    const optimized = new Map();
    data.forEach((item, index) => {
      optimized.set(index, item);
    });
    return Array.from(optimized.values());
  }
}

const gcOptimizer = new GarbageCollectionOptimizer();
Async Operations
Concurrent Processing
Copy
Ask AI
/**
 * Runs async work over a list of items while capping how many run at once.
 */
class ConcurrentProcessor {
  private maxConcurrency: number = 10;
  private semaphore: Semaphore;

  constructor(maxConcurrency: number = 10) {
    this.maxConcurrency = maxConcurrency;
    this.semaphore = new Semaphore(maxConcurrency);
  }

  /**
   * Processes every item, each one holding a semaphore permit while its
   * processor runs. Results keep the order of `items`.
   */
  async processConcurrently<T, R>(
    items: T[],
    processor: (item: T) => Promise<R>
  ): Promise<R[]> {
    const runWithPermit = async (item: T): Promise<R> => {
      const release = await this.semaphore.acquire();
      try {
        return await processor(item);
      } finally {
        // Give the permit back whether the processor succeeded or failed.
        release();
      }
    };
    return Promise.all(items.map(runWithPermit));
  }

  /**
   * Splits `items` into consecutive slices of `batchSize` and processes one
   * slice at a time; the next slice starts after the previous one finished.
   */
  async processInBatches<T, R>(
    items: T[],
    processor: (item: T) => Promise<R>,
    batchSize: number = this.maxConcurrency
  ): Promise<R[]> {
    const collected = [];
    let offset = 0;
    while (offset < items.length) {
      const slice = items.slice(offset, offset + batchSize);
      const sliceResults = await this.processConcurrently(slice, processor);
      collected.push(...sliceResults);
      offset += batchSize;
    }
    return collected;
  }
}

/**
 * Counting semaphore. `acquire` resolves with a function that returns the
 * permit; callers wait in FIFO order when no permit is free.
 */
class Semaphore {
  private permits: number;
  private waiting: Array<() => void> = [];

  constructor(permits: number) {
    this.permits = permits;
  }

  async acquire(): Promise<() => void> {
    return new Promise((resolve) => {
      const grant = () => {
        this.permits--;
        resolve(() => this.release());
      };
      if (this.permits > 0) {
        grant();
      } else {
        this.waiting.push(grant);
      }
    });
  }

  private release(): void {
    this.permits++;
    // Hand the freed permit to the longest-waiting caller, if any.
    this.waiting.shift()?.();
  }
}

const concurrentProcessor = new ConcurrentProcessor(10);
Promise Pool
Copy
Ask AI
/**
 * Runs queued async tasks with at most `maxConcurrency` in flight.
 */
class PromisePool {
  private maxConcurrency: number;
  private running: Set<Promise<any>> = new Set();
  private queue: Array<() => Promise<any>> = [];

  constructor(maxConcurrency: number = 10) {
    this.maxConcurrency = maxConcurrency;
  }

  /**
   * Queues `task` and returns a promise that settles with the task's own
   * outcome once the pool has run it.
   */
  async add<T>(task: () => Promise<T>): Promise<T> {
    return new Promise((resolve, reject) => {
      // The queued wrapper never rejects; it forwards the outcome to the
      // caller's promise instead.
      this.queue.push(async () => {
        try {
          const result = await task();
          resolve(result);
        } catch (error) {
          reject(error);
        }
      });
      this.process();
    });
  }

  /** Starts the next queued task if a slot is free. */
  private async process(): Promise<void> {
    if (this.running.size >= this.maxConcurrency || this.queue.length === 0) {
      return;
    }
    const task = this.queue.shift();
    if (!task) return;
    const promise = task().finally(() => {
      this.running.delete(promise);
      this.process();
    });
    this.running.add(promise);
  }

  /**
   * Resolves once every task has finished, including tasks that were still
   * queued when this was called.
   */
  async waitForAll(): Promise<void> {
    while (true) {
      if (this.running.size === 0) {
        if (this.queue.length === 0) return;
        void this.process();
        // Nothing could be started (for example maxConcurrency <= 0): stop
        // rather than spin forever.
        if (this.running.size === 0) return;
      }
      // Finished tasks start queued ones, so re-check after each wave.
      await Promise.all(Array.from(this.running));
    }
  }
}

const promisePool = new PromisePool(10);
Resource Optimization
Resource Monitoring
Copy
Ask AI
/**
 * Samples process memory and CPU usage on a fixed interval and keeps the most
 * recent sample.
 */
class ResourceMonitor {
  private metrics: Map<string, any> = new Map();
  private monitoringInterval: number = 5000; // 5 seconds
  private timer: NodeJS.Timeout;

  constructor() {
    // Take a first sample right away so the getters have data before the
    // first interval tick.
    this.collectMetrics();
    this.timer = setInterval(() => {
      this.collectMetrics();
    }, this.monitoringInterval);
    // A monitoring timer should not keep the process alive on its own.
    this.timer.unref();
  }

  /** Stops sampling. Safe to call more than once. */
  stop(): void {
    clearInterval(this.timer);
  }

  private collectMetrics() {
    const memoryUsage = process.memoryUsage();
    const cpuUsage = process.cpuUsage();
    this.metrics.set("memory", {
      rss: memoryUsage.rss,
      heapTotal: memoryUsage.heapTotal,
      heapUsed: memoryUsage.heapUsed,
      external: memoryUsage.external,
    });
    this.metrics.set("cpu", {
      user: cpuUsage.user,
      system: cpuUsage.system,
    });
    this.metrics.set("timestamp", new Date().toISOString());
  }

  /** Latest sample as a plain object with `memory`, `cpu` and `timestamp`. */
  getMetrics(): any {
    return Object.fromEntries(this.metrics);
  }

  getMemoryUsage(): any {
    return this.metrics.get("memory");
  }

  getCpuUsage(): any {
    return this.metrics.get("cpu");
  }

  /**
   * Whether the sampled `heapUsed` exceeds `threshold` as a fraction of the
   * machine's total memory. Returns `false` when no sample is available.
   */
  isMemoryUsageHigh(threshold: number = 0.8): boolean {
    const memory = this.getMemoryUsage();
    if (!memory) {
      return false;
    }
    const totalMemory = require("os").totalmem();
    return memory.heapUsed / totalMemory > threshold;
  }
}

const resourceMonitor = new ResourceMonitor();
Cost Optimization
Copy
Ask AI
/**
 * Tracks API call costs and collapses groups of similar requests.
 */
class CostOptimizer {
  private apiCallCosts: Map<string, number> = new Map();
  private totalCost: number = 0;

  /**
   * Records one call to `endpoint`. Costs accumulate per endpoint, so the
   * per-endpoint figures always add up to the total.
   */
  trackApiCall(endpoint: string, cost: number) {
    const previous = this.apiCallCosts.get(endpoint) ?? 0;
    this.apiCallCosts.set(endpoint, previous + cost);
    this.totalCost += cost;
  }

  /** Sum of all recorded costs. */
  getTotalCost(): number {
    return this.totalCost;
  }

  /** Accumulated cost per endpoint as a plain object. */
  getCostByEndpoint(): any {
    return Object.fromEntries(this.apiCallCosts);
  }

  /**
   * Groups requests that share method and endpoint and returns one request
   * per group, in order of first appearance.
   */
  optimizeApiCalls(requests: any[]): any[] {
    // Group similar requests
    const grouped = this.groupSimilarRequests(requests);
    // Optimize each group
    return grouped.map((group) => this.optimizeGroup(group));
  }

  /** Buckets requests by `method:endpoint`. */
  private groupSimilarRequests(requests: any[]): any[][] {
    const groups = new Map<string, any[]>();
    requests.forEach((request) => {
      const key = `${request.method}:${request.endpoint}`;
      if (!groups.has(key)) {
        groups.set(key, []);
      }
      groups.get(key)!.push(request);
    });
    return Array.from(groups.values());
  }

  /** Simplified placeholder: keeps only the first request of the group. */
  private optimizeGroup(group: any[]): any {
    // Implement group optimization logic
    return group[0]; // Simplified
  }
}

const costOptimizer = new CostOptimizer();
Testing Performance
Performance Testing
Copy
Ask AI
// Performance test suite: checks throughput under concurrency, heap growth,
// and the speed-up from caching. The thresholds are wall-clock and heap
// measurements, so they depend on the machine running the tests.
describe("Performance Tests", () => {
  let connector: any;
  // Created fresh for every test; the tests below do not read it.
  let performanceMonitor: PerformanceMonitor;

  beforeEach(() => {
    connector = createTestConnector();
    performanceMonitor = new PerformanceMonitor();
  });

  it("should handle concurrent requests efficiently", async () => {
    const requests = Array(100)
      .fill(null)
      .map((_, i) => ({
        input: `test_${i}`,
      }));
    const startTime = performance.now();
    const results = await concurrentProcessor.processConcurrently(
      requests,
      (request) => connector.actions.test_action.execute(request)
    );
    const endTime = performance.now();
    const duration = endTime - startTime;
    expect(results).toHaveLength(100);
    expect(duration).toBeLessThan(5000); // Should complete within 5 seconds
  });

  it("should use memory efficiently", async () => {
    const initialMemory = process.memoryUsage().heapUsed;
    await connector.actions.memory_intensive_action.execute({
      data: generateLargeDataset(10000),
    });
    const finalMemory = process.memoryUsage().heapUsed;
    const memoryIncrease = finalMemory - initialMemory;
    expect(memoryIncrease).toBeLessThan(50 * 1024 * 1024); // Less than 50MB increase
  });

  it("should cache responses effectively", async () => {
    // First call should miss cache
    const start1 = performance.now();
    await connector.actions.cached_action.execute({});
    const duration1 = performance.now() - start1;
    // Second call should hit cache
    const start2 = performance.now();
    await connector.actions.cached_action.execute({});
    const duration2 = performance.now() - start2;
    expect(duration2).toBeLessThan(duration1);
    expect(duration2).toBeLessThan(100); // Should be very fast
  });
});
Best Practices
Performance Guidelines
Always profile your code before optimizing. Measure first, then optimize based
on actual performance data.
Copy
Ask AI
// ✅ Good: Profile before optimizing
/**
 * Runs the connector's `test_action` once and logs how long it took, how much
 * the heap grew, and the size of the JSON-serialized result.
 *
 * @returns The action's result, unchanged.
 */
const profileConnector = async (connector: any) => {
  const startTime = performance.now();
  const startMemory = process.memoryUsage();
  const result = await connector.actions.test_action.execute({});
  const endTime = performance.now();
  const endMemory = process.memoryUsage();

  // JSON.stringify returns undefined for results such as `undefined`; report
  // size 0 for those instead of throwing after the action already succeeded.
  const serialized = JSON.stringify(result);
  logger.info("Performance metrics", {
    duration: endTime - startTime,
    memoryUsed: endMemory.heapUsed - startMemory.heapUsed,
    resultSize: serialized === undefined ? 0 : serialized.length,
  });
  return result;
};
Resource Management
Always clean up resources to prevent memory leaks and ensure optimal
performance.
Copy
Ask AI
// ✅ Good: Proper resource cleanup
/**
 * Registry of resources that must be released together.
 */
class ResourceManager {
  private resources: Set<any> = new Set();

  /** Registers a resource for later cleanup. */
  addResource(resource: any) {
    this.resources.add(resource);
  }

  /**
   * Releases every registered resource by calling its `close()` or, failing
   * that, its `destroy()`. Resources with neither method are skipped.
   *
   * Every resource gets its cleanup attempt even if another one fails, and
   * the registry is emptied afterwards. If any cleanup failed, the first
   * failure is rethrown once all attempts have settled.
   */
  async cleanup() {
    const pending = Array.from(this.resources);
    const outcomes = await Promise.allSettled(
      // The async wrapper turns a synchronous throw from close()/destroy()
      // into a rejection, so it cannot abort the remaining cleanups.
      pending.map(async (resource) => {
        if (resource.close) {
          return resource.close();
        }
        if (resource.destroy) {
          return resource.destroy();
        }
        return undefined;
      })
    );
    this.resources.clear();

    const failure = outcomes.find(
      (outcome): outcome is PromiseRejectedResult => outcome.status === "rejected"
    );
    if (failure) {
      throw failure.reason;
    }
  }
}

const resourceManager = new ResourceManager();
// Cleanup on process exit
// An "exit" listener cannot wait for asynchronous work, so only the
// synchronous part of each close()/destroy() call takes effect here. This is
// a last-resort attempt; the SIGINT handler below performs the full cleanup.
process.on("exit", () => {
  resourceManager.cleanup().catch(() => {});
});

// On Ctrl+C, wait for cleanup to finish before exiting. Calling
// process.exit() right away would end the process before the asynchronous
// cleanup had a chance to complete.
process.on("SIGINT", () => {
  resourceManager.cleanup().then(
    () => process.exit(0),
    (error) => {
      console.error("Resource cleanup failed", error);
      process.exit(1);
    }
  );
});
Common Performance Issues
Issue: Memory Leaks
Memory leaks often occur due to event listeners not being removed, timers not
being cleared, or large objects not being garbage collected.
Copy
Ask AI
// ❌ Bad: Memory leak
// Counter-example: starts an interval and registers a listener, but offers no
// way to stop or remove either of them.
class LeakyComponent {
  private timer: NodeJS.Timeout;
  private listeners: Array<() => void> = [];

  constructor() {
    const tick = () => {
      // Do something
    };
    this.timer = setInterval(tick, 1000);

    const listener = () => {
      // Do something
    };
    this.listeners.push(listener);
  }
}
// ✅ Good: Proper cleanup
// Same setup as the leaky example, plus a cleanup() that stops the interval
// and drops the registered listeners.
class CleanComponent {
  private timer: NodeJS.Timeout;
  private listeners: Array<() => void> = [];

  constructor() {
    const tick = () => {
      // Do something
    };
    this.timer = setInterval(tick, 1000);

    const listener = () => {
      // Do something
    };
    this.listeners.push(listener);
  }

  /** Stops the interval and forgets all listeners. */
  cleanup() {
    clearInterval(this.timer);
    this.listeners = [];
  }
}
Issue: Blocking Operations
Copy
Ask AI
// ❌ Bad: Blocking operation
// Counter-example: every item is computed synchronously, one after another,
// before anything is returned.
const processData = (data: any[]) =>
  data.map((item) => {
    // Synchronous operation that blocks
    const computed = heavyComputation(item);
    return computed;
  });
// ✅ Good: Non-blocking operation
// Starts the async computation for every item up front and resolves with all
// results, in input order, once each one has finished.
const processDataAsync = async (data: any[]) => {
  const pending = data.map(async (item) => {
    // Asynchronous operation that doesn't block
    return heavyComputationAsync(item);
  });
  return Promise.all(pending);
};
Next Steps
Now that you understand performance optimization, you can:
- Implement Monitoring: Set up comprehensive performance monitoring
- Optimize Critical Paths: Focus on the most performance-critical parts of your application
- Add Caching: Implement caching strategies to reduce redundant operations
- Scale Efficiently: Design your system to handle increased load