Custom Connectors
Custom connectors allow you to integrate with any API or service. This guide shows you how to build robust, production-ready connectors using `@databite/build` and `@databite/flow`.
Overview
A connector is a template that defines how to interact with an external service. It includes:
- Authentication flows for secure access
- Actions for performing operations
- Syncs for data synchronization
- Configuration schemas for user settings
- Error handling and validation
Basic Connector Structure
Let’s start with a simple connector:
import { ConnectorBuilder } from "@databite/build";
import { FlowBuilder } from "@databite/flow";
// Authentication flow: an API-key form, an HTTP request that sends the
// submitted key as a bearer token, and a transform that keeps the key together
// with two fields taken from the response.

// Form step. Its `action` value names the HTTP step declared below.
const apiKeyFormStep = {
  title: "Enter API Key",
  description: "Please enter your API key to continue",
  fields: [
    { name: "api_key", type: "password", label: "API Key", required: true },
  ],
  submitText: "Connect",
  action: "validate_api_key",
};

// HTTP step: GET the user endpoint with the key from the form.
const validateApiKeyStep = {
  method: "GET",
  url: "https://api.service.com/user",
  headers: { Authorization: "Bearer {{form.api_key}}" },
};

// Transform step: template expressions read from the form and the response.
const userInfoMapping = {
  api_key: "{{form.api_key}}",
  user_id: "{{response.id}}",
  username: "{{response.username}}",
};

const authFlow = new FlowBuilder()
  .form("api_key_form", apiKeyFormStep)
  .http("validate_api_key", validateApiKeyStep)
  .transform("extract_user_info", userInfoMapping)
  .build();
// Build the connector: identity, configuration schema, the auth flow,
// one action ("get_user") and one sync ("sync_users").
const connector = new ConnectorBuilder()
  .identity({
    id: "custom-service",
    name: "Custom Service",
    description: "A custom connector for Custom Service API",
    version: "1.0.0",
    logo: "https://example.com/logo.png",
  })
  .configuration({
    api_key: {
      type: "string",
      label: "API Key",
      required: true,
      sensitive: true,
    },
    base_url: {
      type: "string",
      label: "Base URL",
      default: "https://api.service.com",
      required: true,
    },
  })
  .flows([authFlow])
  .actions([
    {
      id: "get_user",
      name: "Get User",
      description: "Retrieve user information",
      inputs: {
        user_id: {
          type: "string",
          label: "User ID",
          required: true,
        },
      },
      /**
       * Fetches one user from `<base_url>/users/<user_id>`.
       *
       * @param {{ user_id: string }} inputs - Action inputs.
       * @param {{ config: { base_url: string }, tokens: { api_key: string } }} context
       *   Values read by this action: the configured base URL and the API key token.
       * @returns {Promise<unknown>} The parsed JSON body of the response.
       * @throws {Error} When the service responds with a non-2xx status.
       */
      execute: async (inputs, context) => {
        // Encode the ID so characters such as "/", "?" or "#" in user input
        // cannot change the path or query of the request.
        const userId = encodeURIComponent(inputs.user_id);
        const response = await fetch(
          `${context.config.base_url}/users/${userId}`,
          {
            headers: {
              Authorization: `Bearer ${context.tokens.api_key}`,
            },
          }
        );
        // fetch() only rejects on network failure; surface HTTP errors
        // instead of returning an error payload as if it were a user.
        if (!response.ok) {
          throw new Error(`HTTP ${response.status}: ${response.statusText}`);
        }
        return response.json();
      },
    },
  ])
  .syncs([
    {
      id: "sync_users",
      name: "Sync Users",
      description: "Synchronize user data",
      // Where the records come from.
      source: {
        type: "api",
        url: "{{config.base_url}}/users",
        headers: {
          Authorization: "Bearer {{tokens.api_key}}",
        },
      },
      // Where the records are written.
      destination: {
        type: "database",
        table: "users",
      },
      // Per-item field mapping, expressed as templates over `item`.
      transform: {
        id: "{{item.id}}",
        name: "{{item.name}}",
        email: "{{item.email}}",
        created_at: "{{item.created_at}}",
      },
    },
  ])
  .build();
Advanced Connector Patterns
OAuth 2.0 Connector
Copy
Ask AI
import { ConnectorBuilder } from "@databite/build";
import { FlowBuilder } from "@databite/flow";
// OAuth 2.0 authorization-code flow: a prompt step, the authorization
// request, the code-for-token exchange, and a transform that keeps the tokens.
// NOTE(review): no `state` parameter is sent with the authorization request.
// RFC 6749 §10.12 recommends one for CSRF protection — confirm whether the
// flow runtime adds it automatically.
const oauthFlow = new FlowBuilder()
  .generic("oauth_start", {
    title: "Connect to Service",
    description: "Click the button below to authorize access",
    buttonText: "Authorize",
    action: "oauth_authorize",
  })
  .http("oauth_authorize", {
    method: "GET",
    url: "https://api.service.com/oauth/authorize",
    query: {
      client_id: "{{config.client_id}}",
      redirect_uri: "{{config.redirect_uri}}",
      response_type: "code",
      scope: "{{config.scopes}}",
    },
  })
  .http("oauth_callback", {
    method: "POST",
    url: "https://api.service.com/oauth/token",
    body: {
      client_id: "{{config.client_id}}",
      client_secret: "{{config.client_secret}}",
      code: "{{query.code}}",
      // RFC 6749 §4.1.3: when redirect_uri was part of the authorization
      // request (it is, above), the token request must repeat the same value.
      redirect_uri: "{{config.redirect_uri}}",
      grant_type: "authorization_code",
    },
  })
  .transform("extract_tokens", {
    access_token: "{{response.access_token}}",
    refresh_token: "{{response.refresh_token}}",
    expires_in: "{{response.expires_in}}",
  })
  .build();
// Connector that authenticates through the OAuth flow and exposes a
// single "create_post" action.
const oauthConnector = new ConnectorBuilder()
  .identity({
    id: "oauth-service",
    name: "OAuth Service",
    description: "OAuth 2.0 enabled service connector",
  })
  .configuration({
    client_id: {
      type: "string",
      label: "Client ID",
      required: true,
    },
    client_secret: {
      type: "string",
      label: "Client Secret",
      required: true,
      sensitive: true,
    },
    redirect_uri: {
      type: "string",
      label: "Redirect URI",
      required: true,
    },
    scopes: {
      type: "string",
      label: "Scopes",
      default: "read write",
      required: true,
    },
    // The action below builds its URL from config.base_url, so the setting
    // has to be part of the schema.
    base_url: {
      type: "string",
      label: "Base URL",
      default: "https://api.service.com",
      required: true,
    },
  })
  .flows([oauthFlow])
  .actions([
    {
      id: "create_post",
      name: "Create Post",
      description: "Create a new post",
      inputs: {
        title: {
          type: "string",
          label: "Title",
          required: true,
        },
        content: {
          type: "string",
          label: "Content",
          required: true,
        },
      },
      /**
       * POSTs a new post to `<base_url>/posts` as JSON.
       *
       * @param {{ title: string, content: string }} inputs - Post fields.
       * @param {{ config: { base_url: string }, tokens: { access_token: string } }} context
       *   Values read by this action.
       * @returns {Promise<unknown>} The parsed JSON body of the response.
       * @throws {Error} When the service responds with a non-2xx status.
       */
      execute: async (inputs, context) => {
        const response = await fetch(`${context.config.base_url}/posts`, {
          method: "POST",
          headers: {
            Authorization: `Bearer ${context.tokens.access_token}`,
            "Content-Type": "application/json",
          },
          body: JSON.stringify({
            title: inputs.title,
            content: inputs.content,
          }),
        });
        // fetch() resolves for 4xx/5xx too; report those as failures.
        if (!response.ok) {
          throw new Error(`HTTP ${response.status}: ${response.statusText}`);
        }
        return response.json();
      },
    },
  ])
  .build();
Multi-Step Action Connector
Copy
Ask AI
// Bounds for the bulk_import batch size; shared by the input schema and the
// runtime check so the two cannot drift apart.
const BULK_IMPORT_DEFAULT_BATCH_SIZE = 100;
const BULK_IMPORT_MIN_BATCH_SIZE = 1;
const BULK_IMPORT_MAX_BATCH_SIZE = 1000;

// Connector whose single action uploads records in sequential batches.
const multiStepConnector = new ConnectorBuilder()
  .identity({
    id: "multi-step-service",
    name: "Multi-Step Service",
    description: "Service with complex multi-step operations",
  })
  .configuration({
    api_key: {
      type: "string",
      label: "API Key",
      required: true,
      sensitive: true,
    },
    // The action builds its URL from config.base_url, so declare it.
    base_url: {
      type: "string",
      label: "Base URL",
      default: "https://api.service.com",
      required: true,
    },
  })
  .actions([
    {
      id: "bulk_import",
      name: "Bulk Import",
      description: "Import multiple records in batches",
      inputs: {
        records: {
          type: "array",
          label: "Records to Import",
          required: true,
          schema: {
            type: "object",
            properties: {
              name: { type: "string" },
              email: { type: "string" },
              phone: { type: "string" },
            },
          },
        },
        batch_size: {
          type: "number",
          label: "Batch Size",
          default: BULK_IMPORT_DEFAULT_BATCH_SIZE,
          minimum: BULK_IMPORT_MIN_BATCH_SIZE,
          maximum: BULK_IMPORT_MAX_BATCH_SIZE,
        },
      },
      /**
       * Sends `records` to `<base_url>/bulk-import` in consecutive batches.
       *
       * @param {{ records: object[], batch_size?: number }} inputs
       *   `batch_size` falls back to 100 when omitted.
       * @param {{ config: { base_url: string }, tokens: { api_key: string } }} context
       *   Values read by this action.
       * @returns {Promise<{ total_processed: number, batches: number, results: unknown[] }>}
       *   Record count, batch count and the parsed response of every batch.
       * @throws {TypeError} When `records` is not an array.
       * @throws {RangeError} When `batch_size` is not an integer in [1, 1000].
       * @throws {Error} When any batch request returns a non-2xx status.
       */
      execute: async (inputs, context) => {
        // Apply the default here as well: an undefined step would turn the
        // loop counter into NaN and a zero/negative step would never finish.
        const {
          records,
          batch_size: batchSize = BULK_IMPORT_DEFAULT_BATCH_SIZE,
        } = inputs;
        if (!Array.isArray(records)) {
          throw new TypeError("records must be an array");
        }
        if (
          !Number.isInteger(batchSize) ||
          batchSize < BULK_IMPORT_MIN_BATCH_SIZE ||
          batchSize > BULK_IMPORT_MAX_BATCH_SIZE
        ) {
          throw new RangeError(
            `batch_size must be an integer between ${BULK_IMPORT_MIN_BATCH_SIZE} and ${BULK_IMPORT_MAX_BATCH_SIZE}`
          );
        }

        const results = [];
        // Batches are sent one after another, in input order.
        for (let start = 0; start < records.length; start += batchSize) {
          const batch = records.slice(start, start + batchSize);
          const response = await fetch(
            `${context.config.base_url}/bulk-import`,
            {
              method: "POST",
              headers: {
                Authorization: `Bearer ${context.tokens.api_key}`,
                "Content-Type": "application/json",
              },
              body: JSON.stringify({ records: batch }),
            }
          );
          // Stop at the first failed batch rather than counting it as processed.
          if (!response.ok) {
            throw new Error(
              `Bulk import failed for batch ${results.length + 1} (HTTP ${response.status}: ${response.statusText})`
            );
          }
          results.push(await response.json());
        }

        return {
          total_processed: records.length,
          batches: results.length,
          results,
        };
      },
    },
  ])
  .build();
Data Synchronization Patterns
Incremental Sync with Timestamps
Copy
Ask AI
// Connector with one sync ("sync_orders") that requests only orders updated
// since the previous run and upserts them by `id` every 15 minutes.
const incrementalSyncConnector = new ConnectorBuilder()
  .identity({
    id: "incremental-sync-service",
    name: "Incremental Sync Service",
    description: "Service with incremental data synchronization",
  })
  .configuration({
    api_key: {
      type: "string",
      label: "API Key",
      required: true,
      sensitive: true,
    },
    // The sync source URL is templated on config.base_url, so declare it.
    base_url: {
      type: "string",
      label: "Base URL",
      default: "https://api.service.com",
      required: true,
    },
  })
  .syncs([
    {
      id: "sync_orders",
      name: "Sync Orders",
      description: "Synchronize orders with incremental updates",
      source: {
        type: "api",
        url: "{{config.base_url}}/orders",
        query: {
          // NOTE(review): confirm what the runtime substitutes for
          // last_sync_time on the very first run.
          updated_since: "{{last_sync_time}}",
          limit: 1000,
        },
        headers: {
          Authorization: "Bearer {{tokens.api_key}}",
        },
      },
      destination: {
        type: "database",
        table: "orders",
        // Upsert keyed on `id`.
        mode: "upsert",
        key: "id",
      },
      // Per-item field mapping, expressed as templates over `item`.
      transform: {
        id: "{{item.id}}",
        customer_id: "{{item.customer_id}}",
        total: "{{item.total}}",
        status: "{{item.status}}",
        updated_at: "{{item.updated_at}}",
      },
      schedule: {
        type: "interval",
        minutes: 15,
      },
    },
  ])
  .build();
Multi-Table Sync with Dependencies
Copy
Ask AI
// Connector with two syncs: customers first, then orders, which declares a
// dependency on the customers sync through `dependsOn`.
const multiTableSyncConnector = new ConnectorBuilder()
  .identity({
    id: "multi-table-sync-service",
    name: "Multi-Table Sync Service",
    description: "Service with multi-table synchronization",
  })
  .configuration({
    api_key: {
      type: "string",
      label: "API Key",
      required: true,
      sensitive: true,
    },
    // Both sync source URLs are templated on config.base_url, so declare it.
    base_url: {
      type: "string",
      label: "Base URL",
      default: "https://api.service.com",
      required: true,
    },
  })
  .syncs([
    {
      id: "sync_customers",
      name: "Sync Customers",
      description: "Synchronize customer data",
      source: {
        type: "api",
        url: "{{config.base_url}}/customers",
        headers: {
          Authorization: "Bearer {{tokens.api_key}}",
        },
      },
      destination: {
        type: "database",
        table: "customers",
      },
      transform: {
        id: "{{item.id}}",
        name: "{{item.name}}",
        email: "{{item.email}}",
        phone: "{{item.phone}}",
      },
    },
    {
      id: "sync_orders",
      name: "Sync Orders",
      description: "Synchronize orders after customers",
      // References the id of the customers sync declared above.
      dependsOn: ["sync_customers"],
      source: {
        type: "api",
        url: "{{config.base_url}}/orders",
        headers: {
          Authorization: "Bearer {{tokens.api_key}}",
        },
      },
      destination: {
        type: "database",
        table: "orders",
      },
      transform: {
        id: "{{item.id}}",
        customer_id: "{{item.customer_id}}",
        total: "{{item.total}}",
        status: "{{item.status}}",
      },
    },
  ])
  .build();
Error Handling and Validation
Robust Error Handling
Copy
Ask AI
// HTTP methods accepted by safe_api_call; shared by the input schema and the
// runtime check.
const SAFE_API_CALL_METHODS = ["GET", "POST", "PUT", "DELETE"];

// Connector whose action never throws: failures are logged and returned as a
// structured `{ success: false, error, timestamp }` object.
const errorHandlingConnector = new ConnectorBuilder()
  .identity({
    id: "error-handling-service",
    name: "Error Handling Service",
    description: "Service with comprehensive error handling",
  })
  .configuration({
    api_key: {
      type: "string",
      label: "API Key",
      required: true,
      sensitive: true,
    },
    // The action builds its URL from config.base_url, so declare it.
    base_url: {
      type: "string",
      label: "Base URL",
      default: "https://api.service.com",
      required: true,
    },
  })
  .actions([
    {
      id: "safe_api_call",
      name: "Safe API Call",
      description: "Make API call with error handling",
      inputs: {
        endpoint: {
          type: "string",
          label: "Endpoint",
          required: true,
        },
        method: {
          type: "string",
          label: "HTTP Method",
          enum: SAFE_API_CALL_METHODS,
          default: "GET",
        },
      },
      /**
       * Calls `<base_url><endpoint>` with the API key as a bearer token.
       *
       * @param {{ endpoint: string, method?: string }} inputs
       *   `endpoint` must be a path starting with "/"; `method` falls back to "GET".
       * @param {{ config: { base_url: string }, tokens: { api_key: string } }} context
       *   Values read by this action.
       * @returns {Promise<unknown>} The parsed JSON body on success, otherwise
       *   `{ success: false, error: string, timestamp: string }`.
       */
      execute: async (inputs, context) => {
        try {
          const { endpoint, method = "GET" } = inputs;
          if (!SAFE_API_CALL_METHODS.includes(method)) {
            throw new Error(`Unsupported HTTP method: ${method}`);
          }
          // Without a leading "/", an endpoint such as "@other.host/x" would
          // be glued onto the host part of base_url and the bearer token
          // would be sent to a different server.
          if (typeof endpoint !== "string" || !endpoint.startsWith("/")) {
            throw new Error('Endpoint must be a path starting with "/"');
          }

          const response = await fetch(
            `${context.config.base_url}${endpoint}`,
            {
              method,
              headers: {
                Authorization: `Bearer ${context.tokens.api_key}`,
                "Content-Type": "application/json",
              },
            }
          );
          if (!response.ok) {
            throw new Error(`HTTP ${response.status}: ${response.statusText}`);
          }
          return await response.json();
        } catch (error) {
          // Log error for debugging
          console.error("API call failed:", error);
          // Return structured error response
          return {
            success: false,
            error: error.message,
            timestamp: new Date().toISOString(),
          };
        }
      },
    },
  ])
  .build();
Input Validation
Copy
Ask AI
// Connector whose "create_user" action declares schema constraints and also
// re-checks the age at runtime before calling the API.
const validationConnector = new ConnectorBuilder()
  .identity({
    id: "validation-service",
    name: "Validation Service",
    description: "Service with input validation",
  })
  .configuration({
    api_key: {
      type: "string",
      label: "API Key",
      required: true,
      sensitive: true,
    },
    // The action builds its URL from config.base_url, so declare it.
    base_url: {
      type: "string",
      label: "Base URL",
      default: "https://api.service.com",
      required: true,
    },
  })
  .actions([
    {
      id: "create_user",
      name: "Create User",
      description: "Create a new user with validation",
      inputs: {
        name: {
          type: "string",
          label: "Name",
          required: true,
          minLength: 2,
          maxLength: 100,
        },
        email: {
          type: "string",
          label: "Email",
          required: true,
          pattern: "^[^@]+@[^@]+\\.[^@]+$",
        },
        age: {
          type: "number",
          label: "Age",
          required: true,
          minimum: 18,
          maximum: 120,
        },
      },
      /**
       * POSTs a new user to `<base_url>/users`.
       *
       * @param {{ name: string, email: string, age: number }} inputs
       *   Only these three declared fields are forwarded to the API.
       * @param {{ config: { base_url: string }, tokens: { api_key: string } }} context
       *   Values read by this action.
       * @returns {Promise<unknown>} The parsed JSON body of the response.
       * @throws {Error} When `age` is not a finite number or is below 18, or
       *   when the service responds with a non-2xx status.
       */
      execute: async (inputs, context) => {
        const { name, email, age } = inputs;

        // Additional validation. A bare `age < 18` comparison lets NaN and
        // other non-numeric values through, so check the type first.
        if (typeof age !== "number" || !Number.isFinite(age)) {
          throw new Error("Age must be a number");
        }
        if (age < 18) {
          throw new Error("User must be at least 18 years old");
        }

        const response = await fetch(`${context.config.base_url}/users`, {
          method: "POST",
          headers: {
            Authorization: `Bearer ${context.tokens.api_key}`,
            "Content-Type": "application/json",
          },
          // Send only the declared fields so unexpected properties on
          // `inputs` are not forwarded to the API.
          body: JSON.stringify({ name, email, age }),
        });
        if (!response.ok) {
          throw new Error(`HTTP ${response.status}: ${response.statusText}`);
        }
        return response.json();
      },
    },
  ])
  .build();
Testing Custom Connectors
Unit Testing
Copy
Ask AI
import { ConnectorBuilder } from "@databite/build";
describe("Custom Connector", () => {
  let connector: any;

  // Builds a fresh connector with one action that echoes its `input` value
  // back as `{ result }`.
  const buildTestConnector = () => {
    const echoAction = {
      id: "test_action",
      name: "Test Action",
      description: "Test action",
      inputs: {
        input: {
          type: "string",
          label: "Input",
          required: true,
        },
      },
      execute: async (inputs) => ({ result: inputs.input }),
    };

    return new ConnectorBuilder()
      .identity({
        id: "test-service",
        name: "Test Service",
      })
      .configuration({
        api_key: {
          type: "string",
          label: "API Key",
          required: true,
        },
      })
      .actions([echoAction])
      .build();
  };

  // Rebuild before every test so no state carries over between tests.
  beforeEach(() => {
    connector = buildTestConnector();
  });

  it("should execute action successfully", async () => {
    const actionInputs = { input: "test" };
    const actionContext = { config: { api_key: "test_key" }, tokens: {} };

    const outcome = await connector.actions.test_action.execute(
      actionInputs,
      actionContext
    );

    expect(outcome.result).toBe("test");
  });
});
Integration Testing
Copy
Ask AI
describe("Integration Tests", () => {
  it("should handle real API calls", async () => {
    // Action under test: fetches the public JSONPlaceholder posts list.
    // The test performs a real network request.
    const getPostsAction = {
      id: "get_posts",
      name: "Get Posts",
      description: "Get posts from JSONPlaceholder",
      inputs: {},
      execute: async (inputs, context) => {
        const postsResponse = await fetch(
          "https://jsonplaceholder.typicode.com/posts"
        );
        return postsResponse.json();
      },
    };

    const connector = new ConnectorBuilder()
      .identity({
        id: "integration-test-service",
        name: "Integration Test Service",
      })
      .configuration({
        api_key: {
          type: "string",
          label: "API Key",
          required: true,
        },
      })
      .actions([getPostsAction])
      .build();

    const testContext = { config: { api_key: "test" }, tokens: {} };
    const posts = await connector.actions.get_posts.execute({}, testContext);

    // The endpoint is expected to return a non-empty JSON array.
    expect(Array.isArray(posts)).toBe(true);
    expect(posts.length).toBeGreaterThan(0);
  });
});
Best Practices
Security Considerations
Always validate and sanitize user inputs to prevent security vulnerabilities.
Copy
Ask AI
// Identifier rule for table names; the same pattern string is used in the
// input schema and for the runtime check.
const SECURE_QUERY_TABLE_PATTERN = "^[a-zA-Z_][a-zA-Z0-9_]*$";
const SECURE_QUERY_TABLE_REGEX = new RegExp(SECURE_QUERY_TABLE_PATTERN);
const SECURE_QUERY_DEFAULT_LIMIT = 100;
const SECURE_QUERY_MIN_LIMIT = 1;
const SECURE_QUERY_MAX_LIMIT = 1000;

// Connector whose action reads rows from a caller-named table with a bounded
// LIMIT.
const secureConnector = new ConnectorBuilder()
  .identity({
    id: "secure-service",
    name: "Secure Service",
  })
  .configuration({
    api_key: {
      type: "string",
      label: "API Key",
      required: true,
      sensitive: true,
    },
  })
  .actions([
    {
      id: "secure_query",
      name: "Secure Query",
      description: "Execute secure database query",
      inputs: {
        table: {
          type: "string",
          label: "Table Name",
          required: true,
          pattern: SECURE_QUERY_TABLE_PATTERN, // Only allow valid table names
        },
        limit: {
          type: "number",
          label: "Limit",
          default: SECURE_QUERY_DEFAULT_LIMIT,
          minimum: SECURE_QUERY_MIN_LIMIT,
          maximum: SECURE_QUERY_MAX_LIMIT,
        },
      },
      /**
       * Runs `SELECT * FROM <table> LIMIT ?` through `context.database.query`.
       *
       * @param {{ table: string, limit?: number }} inputs
       *   `limit` falls back to 100 and is clamped to the range [1, 1000].
       * @param {{ database: { query: Function } }} context
       *   Provides the `query(sql, params)` function used here.
       * @returns {Promise<unknown>} Whatever `context.database.query` resolves to.
       * @throws {Error} When `table` is not a valid identifier or `limit` is
       *   not a finite number.
       */
      execute: async (inputs, context) => {
        const { table, limit = SECURE_QUERY_DEFAULT_LIMIT } = inputs;

        // Reject bad names instead of stripping characters: stripping turns
        // a malicious value into a different (possibly real) table name.
        if (typeof table !== "string" || !SECURE_QUERY_TABLE_REGEX.test(table)) {
          throw new Error(`Invalid table name: ${JSON.stringify(table)}`);
        }
        if (typeof limit !== "number" || !Number.isFinite(limit)) {
          throw new Error("limit must be a finite number");
        }
        const boundedLimit = Math.min(
          Math.max(Math.trunc(limit), SECURE_QUERY_MIN_LIMIT),
          SECURE_QUERY_MAX_LIMIT
        );

        // The table name is interpolated after validation; the limit is
        // passed as a bound parameter.
        const query = `SELECT * FROM ${table} LIMIT ?`;
        return context.database.query(query, [boundedLimit]);
      },
    },
  ])
  .build();
Performance Optimization
Use caching and connection pooling to improve connector performance.
Copy
Ask AI
// How long a query result stays cached (passed as the third argument of
// cache.set).
const CACHED_QUERY_TTL_SECONDS = 300;

// Connector whose action serves query results from the cache when present
// and stores fresh results for five minutes.
const optimizedConnector = new ConnectorBuilder()
  .identity({
    id: "optimized-service",
    name: "Optimized Service",
  })
  .configuration({
    api_key: {
      type: "string",
      label: "API Key",
      required: true,
      // Marked sensitive like the API keys in the other connectors.
      sensitive: true,
    },
  })
  .actions([
    {
      id: "cached_query",
      name: "Cached Query",
      description: "Query with caching",
      inputs: {
        query: {
          type: "string",
          label: "Query",
          required: true,
        },
      },
      /**
       * Returns the cached result for `inputs.query`, or runs the query and
       * caches its result.
       *
       * @param {{ query: string }} inputs - The query text, also used as cache key.
       * @param {{ cache: { get: Function, set: Function }, database: { query: Function } }} context
       *   Provides the cache and database functions used here.
       * @returns {Promise<unknown>} The cached or freshly queried result.
       */
      execute: async (inputs, context) => {
        // Check cache first
        const cacheKey = `query:${inputs.query}`;
        const cached = await context.cache.get(cacheKey);
        // Compare against null/undefined rather than truthiness so that
        // falsy results (0, "", false) still count as cache hits.
        // Assumes cache.get resolves to null or undefined on a miss — TODO confirm.
        if (cached !== undefined && cached !== null) {
          return cached;
        }

        // Execute query
        // NOTE(review): the query text comes straight from action inputs and
        // is executed as-is; confirm that callers of this action are trusted.
        const result = await context.database.query(inputs.query);

        // Cache result for 5 minutes
        await context.cache.set(cacheKey, result, CACHED_QUERY_TTL_SECONDS);
        return result;
      },
    },
  ])
  .build();
Common Issues and Solutions
Issue: Authentication Failures
Always implement proper error handling for authentication failures and provide
clear error messages.
Copy
Ask AI
// Connector whose action turns authentication and authorization failures
// into clear error messages.
const authErrorConnector = new ConnectorBuilder()
  .identity({
    id: "auth-error-service",
    name: "Auth Error Service",
  })
  .configuration({
    api_key: {
      type: "string",
      label: "API Key",
      required: true,
      // Marked sensitive like the API keys in the other connectors.
      sensitive: true,
    },
    // The action builds its URL from config.base_url, so declare it.
    base_url: {
      type: "string",
      label: "Base URL",
      default: "https://api.service.com",
      required: true,
    },
  })
  .actions([
    {
      id: "authenticated_call",
      name: "Authenticated Call",
      description: "Make authenticated API call",
      inputs: {},
      /**
       * Calls `<base_url>/protected` with the API key as a bearer token.
       *
       * @param {object} inputs - Unused; the action declares no inputs.
       * @param {{ config: { base_url: string }, tokens: { api_key: string } }} context
       *   Values read by this action.
       * @returns {Promise<unknown>} The parsed JSON body of the response.
       * @throws {Error} "Authentication failed. Please verify your API key."
       *   on 401, "Access denied. Please check your permissions." on 403, and
       *   an `HTTP <status>: <statusText>` error for any other non-2xx status.
       *   Network errors from fetch propagate unchanged.
       */
      execute: async (inputs, context) => {
        const response = await fetch(`${context.config.base_url}/protected`, {
          headers: {
            Authorization: `Bearer ${context.tokens.api_key}`,
          },
        });

        // Map each status directly to its final message; no intermediate
        // error is thrown and re-caught by matching on its text.
        if (response.status === 401) {
          throw new Error("Authentication failed. Please verify your API key.");
        }
        if (response.status === 403) {
          throw new Error("Access denied. Please check your permissions.");
        }
        if (!response.ok) {
          throw new Error(`HTTP ${response.status}: ${response.statusText}`);
        }
        return response.json();
      },
    },
  ])
  .build();
Issue: Rate Limiting
Copy
Ask AI
// Client-side limit: at most 100 requests per fixed 60-second window.
const RATE_LIMIT_MAX_REQUESTS = 100;
const RATE_LIMIT_WINDOW_SECONDS = 60;

// Connector whose action counts its own calls in the cache and refuses to
// call the API once the per-window budget is used up.
const rateLimitedConnector = new ConnectorBuilder()
  .identity({
    id: "rate-limited-service",
    name: "Rate Limited Service",
  })
  .configuration({
    api_key: {
      type: "string",
      label: "API Key",
      required: true,
      // Marked sensitive like the API keys in the other connectors.
      sensitive: true,
    },
    // The action builds its URL from config.base_url, so declare it.
    base_url: {
      type: "string",
      label: "Base URL",
      default: "https://api.service.com",
      required: true,
    },
  })
  .actions([
    {
      id: "rate_limited_call",
      name: "Rate Limited Call",
      description: "Make API call with rate limiting",
      inputs: {},
      /**
       * Calls `<base_url>/data` unless 100 calls were already counted in the
       * current 60-second window.
       *
       * @param {object} inputs - Unused; the action declares no inputs.
       * @param {{ config: { base_url: string }, tokens: { api_key: string }, cache: { get: Function, set: Function } }} context
       *   Values and cache functions used by this action.
       * @returns {Promise<unknown>} The parsed JSON body of the response.
       * @throws {Error} "Rate limit exceeded. Please try again later." when
       *   the window budget is used up, or an `HTTP <status>` error when the
       *   service responds with a non-2xx status.
       */
      execute: async (inputs, context) => {
        // Check rate limit
        // The window number is part of the key, so every window starts from
        // zero. With a single key, each cache.set would push the expiry
        // forward and the counter would never reset under steady traffic.
        const windowId = Math.floor(
          Date.now() / (RATE_LIMIT_WINDOW_SECONDS * 1000)
        );
        // NOTE(review): the raw API key becomes part of the cache key;
        // consider a non-secret connection identifier if one is available.
        const rateLimitKey = `rate_limit:${context.tokens.api_key}:${windowId}`;
        const currentCount = (await context.cache.get(rateLimitKey)) || 0;
        if (currentCount >= RATE_LIMIT_MAX_REQUESTS) {
          // 100 requests per minute
          throw new Error("Rate limit exceeded. Please try again later.");
        }

        // Increment counter
        // NOTE(review): get-then-set is not atomic; concurrent calls may
        // undercount unless the cache offers an atomic increment.
        await context.cache.set(
          rateLimitKey,
          currentCount + 1,
          RATE_LIMIT_WINDOW_SECONDS
        );

        const response = await fetch(`${context.config.base_url}/data`, {
          headers: {
            Authorization: `Bearer ${context.tokens.api_key}`,
          },
        });
        if (!response.ok) {
          throw new Error(`HTTP ${response.status}: ${response.statusText}`);
        }
        return response.json();
      },
    },
  ])
  .build();
Next Steps
Now that you understand custom connectors, you can:
- Build Complex Connectors: Create connectors with advanced features
- Implement Error Handling: Add robust error handling and recovery
- Optimize Performance: Improve connector performance and reliability
- Test Thoroughly: Implement comprehensive testing strategies