Custom Connectors

Custom connectors allow you to integrate with any API or service. This guide shows you how to build robust, production-ready connectors using @databite/build and @databite/flow.

Overview

A connector is a template that defines how to interact with an external service. It includes:
  • Authentication flows for secure access
  • Actions for performing operations
  • Syncs for data synchronization
  • Configuration schemas for user settings
  • Error handling and validation

Basic Connector Structure

Let’s start with a simple connector:
import { ConnectorBuilder } from "@databite/build";
import { FlowBuilder } from "@databite/flow";

// Define authentication flow
const authFlow = new FlowBuilder()
  .form("api_key_form", {
    title: "Enter API Key",
    description: "Please enter your API key to continue",
    fields: [
      {
        name: "api_key",
        type: "password",
        label: "API Key",
        required: true,
      },
    ],
    submitText: "Connect",
    action: "validate_api_key",
  })
  .http("validate_api_key", {
    method: "GET",
    url: "https://api.service.com/user",
    headers: {
      Authorization: "Bearer {{form.api_key}}",
    },
  })
  .transform("extract_user_info", {
    api_key: "{{form.api_key}}",
    user_id: "{{response.id}}",
    username: "{{response.username}}",
  })
  .build();

// Build the connector
const connector = new ConnectorBuilder()
  .identity({
    id: "custom-service",
    name: "Custom Service",
    description: "A custom connector for Custom Service API",
    version: "1.0.0",
    logo: "https://example.com/logo.png",
  })
  .configuration({
    api_key: {
      type: "string",
      label: "API Key",
      required: true,
      sensitive: true,
    },
    base_url: {
      type: "string",
      label: "Base URL",
      default: "https://api.service.com",
      required: true,
    },
  })
  .flows([authFlow])
  .actions([
    {
      id: "get_user",
      name: "Get User",
      description: "Retrieve user information",
      inputs: {
        user_id: {
          type: "string",
          label: "User ID",
          required: true,
        },
      },
      execute: async (inputs, context) => {
        const response = await fetch(
          `${context.config.base_url}/users/${inputs.user_id}`,
          {
            headers: {
              Authorization: `Bearer ${context.tokens.api_key}`,
            },
          }
        );
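        // fetch resolves on HTTP error statuses, so check response.ok explicitly
        if (!response.ok) {
          throw new Error(`HTTP ${response.status}: ${response.statusText}`);
        }
        return response.json();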
      },
    },
  ])
  .syncs([
    {
      id: "sync_users",
      name: "Sync Users",
      description: "Synchronize user data",
      source: {
        type: "api",
        url: "{{config.base_url}}/users",
        headers: {
          Authorization: "Bearer {{tokens.api_key}}",
        },
      },
      destination: {
        type: "database",
        table: "users",
      },
      transform: {
        id: "{{item.id}}",
        name: "{{item.name}}",
        email: "{{item.email}}",
        created_at: "{{item.created_at}}",
      },
    },
  ])
  .build();
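
The flow and sync definitions above rely on `{{...}}` placeholders such as `{{form.api_key}}` and `{{item.id}}`. This guide does not show how those placeholders are resolved, so the following is only a minimal sketch of dotted-path substitution. `renderTemplate` and `lookupPath` are hypothetical helpers written for illustration; they are not part of @databite/build or @databite/flow.

```typescript
// Hypothetical helpers for illustration only; not a @databite API.
type Scope = Record<string, unknown>;

// Walk a dotted path such as "response.id" through nested objects.
function lookupPath(scope: Scope, path: string): unknown {
  let current: unknown = scope;
  for (const key of path.split(".")) {
    if (typeof current !== "object" || current === null) {
      return undefined;
    }
    current = (current as Record<string, unknown>)[key];
  }
  return current;
}

// Replace each {{path}} with the matching value; unresolved paths become "".
function renderTemplate(template: string, scope: Scope): string {
  return template.replace(
    /\{\{\s*([\w.]+)\s*\}\}/g,
    (_match: string, path: string) => {
      const value = lookupPath(scope, path);
      return value === undefined || value === null ? "" : String(value);
    }
  );
}

const header = renderTemplate("Bearer {{form.api_key}}", {
  form: { api_key: "secret-123" },
});
console.log(header); // Bearer secret-123
```

Rendering unresolved paths as an empty string is one possible choice. A stricter variant could throw instead, so a typo in a placeholder is caught early.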

Advanced Connector Patterns

OAuth 2.0 Connector

import { ConnectorBuilder } from "@databite/build";
import { FlowBuilder } from "@databite/flow";

const oauthFlow = new FlowBuilder()
  .generic("oauth_start", {
    title: "Connect to Service",
    description: "Click the button below to authorize access",
    buttonText: "Authorize",
    action: "oauth_authorize",
  })
  .http("oauth_authorize", {
    method: "GET",
    url: "https://api.service.com/oauth/authorize",
    query: {
      client_id: "{{config.client_id}}",
      redirect_uri: "{{config.redirect_uri}}",
      response_type: "code",
      scope: "{{config.scopes}}",
    },
  })
  .http("oauth_callback", {
    method: "POST",
    url: "https://api.service.com/oauth/token",
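    body: {
      client_id: "{{config.client_id}}",
      client_secret: "{{config.client_secret}}",
      code: "{{query.code}}",
      // Must match the redirect_uri sent in the authorization request
      // (RFC 6749, section 4.1.3)
      redirect_uri: "{{config.redirect_uri}}",
      grant_type: "authorization_code",
    },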
  })
  .transform("extract_tokens", {
    access_token: "{{response.access_token}}",
    refresh_token: "{{response.refresh_token}}",
    expires_in: "{{response.expires_in}}",
  })
  .build();

const oauthConnector = new ConnectorBuilder()
  .identity({
    id: "oauth-service",
    name: "OAuth Service",
    description: "OAuth 2.0 enabled service connector",
  })
  .configuration({
    client_id: {
      type: "string",
      label: "Client ID",
      required: true,
    },
    client_secret: {
      type: "string",
      label: "Client Secret",
      required: true,
      sensitive: true,
    },
    redirect_uri: {
      type: "string",
      label: "Redirect URI",
      required: true,
    },
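    scopes: {
      type: "string",
      label: "Scopes",
      default: "read write",
      required: true,
    },
    // The create_post action below reads context.config.base_url
    base_url: {
      type: "string",
      label: "Base URL",
      default: "https://api.service.com",
      required: true,
    },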
  })
  .flows([oauthFlow])
  .actions([
    {
      id: "create_post",
      name: "Create Post",
      description: "Create a new post",
      inputs: {
        title: {
          type: "string",
          label: "Title",
          required: true,
        },
        content: {
          type: "string",
          label: "Content",
          required: true,
        },
      },
      execute: async (inputs, context) => {
        const response = await fetch(`${context.config.base_url}/posts`, {
          method: "POST",
          headers: {
            Authorization: `Bearer ${context.tokens.access_token}`,
            "Content-Type": "application/json",
          },
          body: JSON.stringify({
            title: inputs.title,
            content: inputs.content,
          }),
        });
        return response.json();
      },
    },
  ])
  .build();
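
The `oauth_authorize` step above sends `client_id`, `redirect_uri`, `response_type`, and `scope` as query parameters. As a rough sketch of what the resulting URL looks like, the hypothetical helper below builds it with the standard `URL` API. The `state` parameter is an addition that is not in the flow above; OAuth 2.0 recommends it as a guard against cross-site request forgery.

```typescript
// Hypothetical helper; the endpoint and parameter names mirror the flow above.
interface OAuthConfig {
  client_id: string;
  redirect_uri: string;
  scopes: string;
}

function buildAuthorizeUrl(config: OAuthConfig, state: string): string {
  const url = new URL("https://api.service.com/oauth/authorize");
  url.searchParams.set("client_id", config.client_id);
  url.searchParams.set("redirect_uri", config.redirect_uri);
  url.searchParams.set("response_type", "code");
  url.searchParams.set("scope", config.scopes);
  // Assumption: `state` is an opaque, unguessable value that the caller
  // stores and compares against the value returned on the callback.
  url.searchParams.set("state", state);
  return url.toString();
}

const authorizeUrl = buildAuthorizeUrl(
  {
    client_id: "abc",
    redirect_uri: "https://app.example.com/callback",
    scopes: "read write",
  },
  "random-state-value"
);
console.log(authorizeUrl);
```

`URLSearchParams` percent-encodes each value, which avoids broken URLs when a redirect URI or scope list contains reserved characters.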

Multi-Step Action Connector

const multiStepConnector = new ConnectorBuilder()
  .identity({
    id: "multi-step-service",
    name: "Multi-Step Service",
    description: "Service with complex multi-step operations",
  })
  .configuration({
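    api_key: {
      type: "string",
      label: "API Key",
      required: true,
      sensitive: true,
    },
    // The bulk_import action below reads context.config.base_url
    base_url: {
      type: "string",
      label: "Base URL",
      default: "https://api.service.com",
      required: true,
    },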
  })
  .actions([
    {
      id: "bulk_import",
      name: "Bulk Import",
      description: "Import multiple records in batches",
      inputs: {
        records: {
          type: "array",
          label: "Records to Import",
          required: true,
          schema: {
            type: "object",
            properties: {
              name: { type: "string" },
              email: { type: "string" },
              phone: { type: "string" },
            },
          },
        },
        batch_size: {
          type: "number",
          label: "Batch Size",
          default: 100,
          minimum: 1,
          maximum: 1000,
        },
      },
      execute: async (inputs, context) => {
        const { records, batch_size } = inputs;
        const results = [];

        // Process records in batches
        for (let i = 0; i < records.length; i += batch_size) {
          const batch = records.slice(i, i + batch_size);

          const response = await fetch(
            `${context.config.base_url}/bulk-import`,
            {
              method: "POST",
              headers: {
                Authorization: `Bearer ${context.tokens.api_key}`,
                "Content-Type": "application/json",
              },
              body: JSON.stringify({ records: batch }),
            }
          );

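          // Stop on the first failed batch so the totals below stay accurate
          if (!response.ok) {
            throw new Error(
              `Batch starting at record ${i} failed: HTTP ${response.status}`
            );
          }

          const result = await response.json();
          results.push(result);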
        }

        return {
          total_processed: records.length,
          batches: results.length,
          results,
        };
      },
    },
  ])
  .build();
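
The batching loop above can be factored into a small reusable function. The sketch below is a hypothetical `chunk` helper (the name is an assumption, not a @databite export) that also rejects a non-positive size. In the loop above, a `batch_size` of 0 would never advance `i`, so only the schema's `minimum: 1` rule stands between a bad input and an infinite loop.

```typescript
// Hypothetical helper: split an array into consecutive batches of `size`.
function chunk<T>(items: readonly T[], size: number): T[][] {
  if (!Number.isInteger(size) || size < 1) {
    throw new RangeError("size must be a positive integer");
  }
  const batches: T[][] = [];
  for (let i = 0; i < items.length; i += size) {
    batches.push(items.slice(i, i + size));
  }
  return batches;
}

const batches = chunk([1, 2, 3, 4, 5], 2);
console.log(batches); // [ [ 1, 2 ], [ 3, 4 ], [ 5 ] ]
```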

Data Synchronization Patterns

Incremental Sync with Timestamps

const incrementalSyncConnector = new ConnectorBuilder()
  .identity({
    id: "incremental-sync-service",
    name: "Incremental Sync Service",
    description: "Service with incremental data synchronization",
  })
  .configuration({
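    api_key: {
      type: "string",
      label: "API Key",
      required: true,
      sensitive: true,
    },
    // The sync source below reads {{config.base_url}}
    base_url: {
      type: "string",
      label: "Base URL",
      default: "https://api.service.com",
      required: true,
    },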
  })
  .syncs([
    {
      id: "sync_orders",
      name: "Sync Orders",
      description: "Synchronize orders with incremental updates",
      source: {
        type: "api",
        url: "{{config.base_url}}/orders",
        query: {
          updated_since: "{{last_sync_time}}",
          limit: 1000,
        },
        headers: {
          Authorization: "Bearer {{tokens.api_key}}",
        },
      },
      destination: {
        type: "database",
        table: "orders",
        mode: "upsert",
        key: "id",
      },
      transform: {
        id: "{{item.id}}",
        customer_id: "{{item.customer_id}}",
        total: "{{item.total}}",
        status: "{{item.status}}",
        updated_at: "{{item.updated_at}}",
      },
      schedule: {
        type: "interval",
        minutes: 15,
      },
    },
  ])
  .build();
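
The sync above passes `{{last_sync_time}}` as `updated_since`, which only works if that value advances after each run. This guide does not describe how the runtime maintains it, so the sketch below shows just one plausible approach: take the newest `updated_at` seen in a page of results. `nextSyncCursor` is a hypothetical helper, and it assumes timestamps that `Date.parse` can read, such as ISO 8601 strings.

```typescript
// Hypothetical helper; assumes ISO 8601 timestamps in `updated_at`.
interface Timestamped {
  updated_at: string;
}

// Return the newest updated_at among `items`, or the previous cursor
// when the page is empty or contains nothing newer.
function nextSyncCursor(
  previous: string | null,
  items: readonly Timestamped[]
): string | null {
  const previousTime = previous === null ? NaN : Date.parse(previous);
  let newest = Number.isNaN(previousTime)
    ? Number.NEGATIVE_INFINITY
    : previousTime;
  let cursor = previous;

  for (const item of items) {
    const time = Date.parse(item.updated_at);
    // Skip unparseable timestamps rather than corrupting the cursor.
    if (!Number.isNaN(time) && time > newest) {
      newest = time;
      cursor = item.updated_at;
    }
  }
  return cursor;
}

const cursor = nextSyncCursor("2024-01-01T00:00:00Z", [
  { updated_at: "2024-01-02T00:00:00Z" },
  { updated_at: "2023-12-31T00:00:00Z" },
]);
console.log(cursor); // 2024-01-02T00:00:00Z
```

Deriving the cursor from the records themselves, rather than from the local clock, avoids missing rows when the client and server clocks disagree.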

Multi-Table Sync with Dependencies

const multiTableSyncConnector = new ConnectorBuilder()
  .identity({
    id: "multi-table-sync-service",
    name: "Multi-Table Sync Service",
    description: "Service with multi-table synchronization",
  })
  .configuration({
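    api_key: {
      type: "string",
      label: "API Key",
      required: true,
      sensitive: true,
    },
    // Both sync sources below read {{config.base_url}}
    base_url: {
      type: "string",
      label: "Base URL",
      default: "https://api.service.com",
      required: true,
    },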
  })
  .syncs([
    {
      id: "sync_customers",
      name: "Sync Customers",
      description: "Synchronize customer data",
      source: {
        type: "api",
        url: "{{config.base_url}}/customers",
        headers: {
          Authorization: "Bearer {{tokens.api_key}}",
        },
      },
      destination: {
        type: "database",
        table: "customers",
      },
      transform: {
        id: "{{item.id}}",
        name: "{{item.name}}",
        email: "{{item.email}}",
        phone: "{{item.phone}}",
      },
    },
    {
      id: "sync_orders",
      name: "Sync Orders",
      description: "Synchronize orders after customers",
      dependsOn: ["sync_customers"],
      source: {
        type: "api",
        url: "{{config.base_url}}/orders",
        headers: {
          Authorization: "Bearer {{tokens.api_key}}",
        },
      },
      destination: {
        type: "database",
        table: "orders",
      },
      transform: {
        id: "{{item.id}}",
        customer_id: "{{item.customer_id}}",
        total: "{{item.total}}",
        status: "{{item.status}}",
      },
    },
  ])
  .build();
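
The `dependsOn` field above implies that syncs run in dependency order. How the runtime schedules them is not covered here; as an illustration, the sketch below orders sync IDs with a depth-first topological sort and reports unknown or circular dependencies. `orderSyncs` and `SyncDef` are hypothetical names.

```typescript
// Hypothetical types and helper; not a @databite API.
interface SyncDef {
  id: string;
  dependsOn?: string[];
}

// Return sync IDs so that every sync appears after its dependencies.
function orderSyncs(syncs: readonly SyncDef[]): string[] {
  const byId = new Map<string, SyncDef>();
  for (const sync of syncs) {
    byId.set(sync.id, sync);
  }

  const ordered: string[] = [];
  const state = new Map<string, "visiting" | "done">();

  const visit = (id: string): void => {
    const status = state.get(id);
    if (status === "done") return;
    if (status === "visiting") {
      throw new Error(`Circular dependency at "${id}"`);
    }
    const sync = byId.get(id);
    if (sync === undefined) {
      throw new Error(`Unknown sync "${id}"`);
    }
    state.set(id, "visiting");
    for (const dependency of sync.dependsOn ?? []) {
      visit(dependency);
    }
    state.set(id, "done");
    ordered.push(id);
  };

  for (const sync of syncs) {
    visit(sync.id);
  }
  return ordered;
}

console.log(
  orderSyncs([
    { id: "sync_orders", dependsOn: ["sync_customers"] },
    { id: "sync_customers" },
  ])
); // [ 'sync_customers', 'sync_orders' ]
```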

Error Handling and Validation

Robust Error Handling

const errorHandlingConnector = new ConnectorBuilder()
  .identity({
    id: "error-handling-service",
    name: "Error Handling Service",
    description: "Service with comprehensive error handling",
  })
  .configuration({
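    api_key: {
      type: "string",
      label: "API Key",
      required: true,
      sensitive: true,
    },
    // The safe_api_call action below reads context.config.base_url
    base_url: {
      type: "string",
      label: "Base URL",
      default: "https://api.service.com",
      required: true,
    },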
  })
  .actions([
    {
      id: "safe_api_call",
      name: "Safe API Call",
      description: "Make API call with error handling",
      inputs: {
        endpoint: {
          type: "string",
          label: "Endpoint",
          required: true,
        },
        method: {
          type: "string",
          label: "HTTP Method",
          enum: ["GET", "POST", "PUT", "DELETE"],
          default: "GET",
        },
      },
      execute: async (inputs, context) => {
        try {
          const response = await fetch(
            `${context.config.base_url}${inputs.endpoint}`,
            {
              method: inputs.method,
              headers: {
                Authorization: `Bearer ${context.tokens.api_key}`,
                "Content-Type": "application/json",
              },
            }
          );

          if (!response.ok) {
            throw new Error(`HTTP ${response.status}: ${response.statusText}`);
          }

          return await response.json();
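        } catch (error) {
          // Log error for debugging
          console.error("API call failed:", error);

          // Return structured error response. A caught value is `unknown`
          // in strict TypeScript, so narrow it before reading `.message`.
          return {
            success: false,
            error: error instanceof Error ? error.message : String(error),
            timestamp: new Date().toISOString(),
          };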
        }
      },
    },
  ])
  .build();
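
The action above returns the parsed JSON on success and a `{ success: false, ... }` object on failure, so callers have to tell the two apart by shape. One way to make that contract explicit is a discriminated union. The sketch below is a suggestion only; `ActionResult`, `toFailure`, and `summarize` are hypothetical names.

```typescript
// Hypothetical result types; not a @databite API.
interface Success<T> {
  success: true;
  data: T;
}

interface Failure {
  success: false;
  error: string;
  timestamp: string;
}

type ActionResult<T> = Success<T> | Failure;

// Convert any thrown value into the structured failure shape.
function toFailure(error: unknown, now: Date = new Date()): Failure {
  const message = error instanceof Error ? error.message : String(error);
  return { success: false, error: message, timestamp: now.toISOString() };
}

// The `success` flag lets TypeScript narrow to the right branch.
function summarize<T>(result: ActionResult<T>): string {
  return result.success ? "ok" : `failed: ${result.error}`;
}

const failure = toFailure(new Error("HTTP 500: Internal Server Error"));
console.log(summarize(failure)); // failed: HTTP 500: Internal Server Error
```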

Input Validation

const validationConnector = new ConnectorBuilder()
  .identity({
    id: "validation-service",
    name: "Validation Service",
    description: "Service with input validation",
  })
  .configuration({
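    api_key: {
      type: "string",
      label: "API Key",
      required: true,
      sensitive: true,
    },
    // The create_user action below reads context.config.base_url
    base_url: {
      type: "string",
      label: "Base URL",
      default: "https://api.service.com",
      required: true,
    },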
  })
  .actions([
    {
      id: "create_user",
      name: "Create User",
      description: "Create a new user with validation",
      inputs: {
        name: {
          type: "string",
          label: "Name",
          required: true,
          minLength: 2,
          maxLength: 100,
        },
        email: {
          type: "string",
          label: "Email",
          required: true,
          pattern: "^[^@]+@[^@]+\\.[^@]+$",
        },
        age: {
          type: "number",
          label: "Age",
          required: true,
          minimum: 18,
          maximum: 120,
        },
      },
      execute: async (inputs, context) => {
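        // Defensive check that mirrors the schema's `minimum: 18` rule, in case
        // the action is invoked directly without schema validation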
        if (inputs.age < 18) {
          throw new Error("User must be at least 18 years old");
        }

        const response = await fetch(`${context.config.base_url}/users`, {
          method: "POST",
          headers: {
            Authorization: `Bearer ${context.tokens.api_key}`,
            "Content-Type": "application/json",
          },
          body: JSON.stringify(inputs),
        });

        return response.json();
      },
    },
  ])
  .build();
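
The input schema above declares `minLength`, `maxLength`, `pattern`, `minimum`, and `maximum` rules. To show what checking them involves, here is a minimal sketch of a validator for that subset of rules. `validateInputs` and `FieldSchema` are hypothetical and do not describe how @databite/build validates inputs.

```typescript
// Hypothetical validator covering only the rules used in this guide.
interface FieldSchema {
  type: "string" | "number";
  required?: boolean;
  minLength?: number;
  maxLength?: number;
  pattern?: string;
  minimum?: number;
  maximum?: number;
}

function validateInputs(
  schema: Record<string, FieldSchema>,
  inputs: Record<string, unknown>
): string[] {
  const errors: string[] = [];
  for (const [name, rules] of Object.entries(schema)) {
    const value = inputs[name];
    if (value === undefined || value === null) {
      if (rules.required) errors.push(`${name} is required`);
      continue;
    }
    if (typeof value === "string" && rules.type === "string") {
      if (rules.minLength !== undefined && value.length < rules.minLength) {
        errors.push(`${name} is too short`);
      }
      if (rules.maxLength !== undefined && value.length > rules.maxLength) {
        errors.push(`${name} is too long`);
      }
      if (rules.pattern !== undefined && !new RegExp(rules.pattern).test(value)) {
        errors.push(`${name} has an invalid format`);
      }
    } else if (typeof value === "number" && rules.type === "number") {
      if (rules.minimum !== undefined && value < rules.minimum) {
        errors.push(`${name} must be at least ${rules.minimum}`);
      }
      if (rules.maximum !== undefined && value > rules.maximum) {
        errors.push(`${name} must be at most ${rules.maximum}`);
      }
    } else {
      errors.push(`${name} must be a ${rules.type}`);
    }
  }
  return errors;
}

const userSchema: Record<string, FieldSchema> = {
  name: { type: "string", required: true, minLength: 2, maxLength: 100 },
  email: { type: "string", required: true, pattern: "^[^@]+@[^@]+\\.[^@]+$" },
  age: { type: "number", required: true, minimum: 18, maximum: 120 },
};

console.log(validateInputs(userSchema, { name: "A", email: "not-an-email", age: 17 }));
// [ 'name is too short', 'email has an invalid format', 'age must be at least 18' ]
```

Collecting every problem into a list, instead of throwing on the first one, lets a form show all errors at once.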

Testing Custom Connectors

Unit Testing

import { ConnectorBuilder } from "@databite/build";

describe("Custom Connector", () => {
  let connector: any;

  beforeEach(() => {
    connector = new ConnectorBuilder()
      .identity({
        id: "test-service",
        name: "Test Service",
      })
      .configuration({
        api_key: {
          type: "string",
          label: "API Key",
          required: true,
        },
      })
      .actions([
        {
          id: "test_action",
          name: "Test Action",
          description: "Test action",
          inputs: {
            input: {
              type: "string",
              label: "Input",
              required: true,
            },
          },
          execute: async (inputs) => {
            return { result: inputs.input };
          },
        },
      ])
      .build();
  });

  it("should execute action successfully", async () => {
    const result = await connector.actions.test_action.execute(
      { input: "test" },
      { config: { api_key: "test_key" }, tokens: {} }
    );

    expect(result.result).toBe("test");
  });
});
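
The unit test above exercises an action that never touches the network. For actions that call `fetch`, one common approach is to pass the HTTP function in as a parameter so a test can substitute a fake. The sketch below assumes nothing about @databite; `makeGetUser`, `FetchLike`, and `fakeFetch` are hypothetical names, and the context shape copies the one used in the test above.

```typescript
// Minimal response and fetch shapes, so the sketch needs no DOM typings.
interface HttpResponse {
  ok: boolean;
  status: number;
  json(): Promise<unknown>;
}

type FetchLike = (
  url: string,
  init?: { headers?: Record<string, string> }
) => Promise<HttpResponse>;

interface ActionContext {
  config: { base_url: string };
  tokens: { api_key: string };
}

// Build an execute function around an injected fetch implementation.
function makeGetUser(fetchImpl: FetchLike) {
  return async (
    inputs: { user_id: string },
    context: ActionContext
  ): Promise<unknown> => {
    const response = await fetchImpl(
      `${context.config.base_url}/users/${inputs.user_id}`,
      { headers: { Authorization: `Bearer ${context.tokens.api_key}` } }
    );
    if (!response.ok) {
      throw new Error(`HTTP ${response.status}`);
    }
    return response.json();
  };
}

// A fake that records requested URLs and returns a canned user.
const requestedUrls: string[] = [];
const fakeFetch: FetchLike = async (url) => {
  requestedUrls.push(url);
  return { ok: true, status: 200, json: async () => ({ id: "42" }) };
};

const getUser = makeGetUser(fakeFetch);
```

In a test, `getUser` can then be awaited with a stub context, and `requestedUrls` can be inspected to confirm which endpoint was called.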

Integration Testing

describe("Integration Tests", () => {
  it("should handle real API calls", async () => {
    const connector = new ConnectorBuilder()
      .identity({
        id: "integration-test-service",
        name: "Integration Test Service",
      })
      .configuration({
        api_key: {
          type: "string",
          label: "API Key",
          required: true,
        },
      })
      .actions([
        {
          id: "get_posts",
          name: "Get Posts",
          description: "Get posts from JSONPlaceholder",
          inputs: {},
          execute: async (inputs, context) => {
            const response = await fetch(
              "https://jsonplaceholder.typicode.com/posts"
            );
            return response.json();
          },
        },
      ])
      .build();

    const result = await connector.actions.get_posts.execute(
      {},
      { config: { api_key: "test" }, tokens: {} }
    );

    expect(Array.isArray(result)).toBe(true);
    expect(result.length).toBeGreaterThan(0);
  });
});

Best Practices

Security Considerations

Always validate and sanitize user inputs to prevent security vulnerabilities.
const secureConnector = new ConnectorBuilder()
  .identity({
    id: "secure-service",
    name: "Secure Service",
  })
  .configuration({
    api_key: {
      type: "string",
      label: "API Key",
      required: true,
      sensitive: true,
    },
  })
  .actions([
    {
      id: "secure_query",
      name: "Secure Query",
      description: "Execute secure database query",
      inputs: {
        table: {
          type: "string",
          label: "Table Name",
          required: true,
          pattern: "^[a-zA-Z_][a-zA-Z0-9_]*$", // Only allow valid table names
        },
        limit: {
          type: "number",
          label: "Limit",
          default: 100,
          minimum: 1,
          maximum: 1000,
        },
      },
      execute: async (inputs, context) => {
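        // Table names cannot be bound as query parameters, so reject anything
        // that is not a plain identifier instead of silently rewriting it
        if (!/^[a-zA-Z_][a-zA-Z0-9_]*$/.test(inputs.table)) {
          throw new Error("Invalid table name");
        }
        const table = inputs.table;
        const limit = Math.min(Math.max(inputs.limit, 1), 1000);

        // Bind the limit as a parameter; only the validated identifier is
        // interpolated into the SQL text
        const query = `SELECT * FROM ${table} LIMIT ?`;
        const result = await context.database.query(query, [limit]);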

        return result;
      },
    },
  ])
  .build();
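
A pattern check like the one above confirms that a table name is syntactically valid, but it still lets callers read any table. A stricter option is an allow-list of known tables. The sketch below illustrates the idea; the table names and the `resolveTable` and `buildSelect` helpers are hypothetical.

```typescript
// Hypothetical allow-list; replace with the tables a connector may expose.
const ALLOWED_TABLES: ReadonlySet<string> = new Set([
  "users",
  "orders",
  "customers",
]);

// Return the table name only if it is explicitly allowed.
function resolveTable(
  requested: string,
  allowed: ReadonlySet<string> = ALLOWED_TABLES
): string {
  if (!allowed.has(requested)) {
    throw new Error(`Table "${requested}" is not allowed`);
  }
  return requested;
}

// Build the SQL text and its bound parameters separately.
function buildSelect(
  table: string,
  limit: number
): { sql: string; params: number[] } {
  // Fall back to 100 for non-finite input, then clamp to 1..1000.
  const requested = Number.isFinite(limit) ? Math.trunc(limit) : 100;
  const safeLimit = Math.min(Math.max(requested, 1), 1000);
  return {
    sql: `SELECT * FROM ${resolveTable(table)} LIMIT ?`,
    params: [safeLimit],
  };
}

console.log(buildSelect("users", 5000));
// { sql: 'SELECT * FROM users LIMIT ?', params: [ 1000 ] }
```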

Performance Optimization

Use caching and connection pooling to improve connector performance.
const optimizedConnector = new ConnectorBuilder()
  .identity({
    id: "optimized-service",
    name: "Optimized Service",
  })
  .configuration({
    api_key: {
      type: "string",
      label: "API Key",
      required: true,
    },
  })
  .actions([
    {
      id: "cached_query",
      name: "Cached Query",
      description: "Query with caching",
      inputs: {
        query: {
          type: "string",
          label: "Query",
          required: true,
        },
      },
      execute: async (inputs, context) => {
        // Check cache first
        const cacheKey = `query:${inputs.query}`;
        const cached = await context.cache.get(cacheKey);

        if (cached) {
          return cached;
        }

        // Execute query
        const result = await context.database.query(inputs.query);

        // Cache result for 5 minutes
        await context.cache.set(cacheKey, result, 300);

        return result;
      },
    },
  ])
  .build();
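
The action above calls `context.cache.get(key)` and `context.cache.set(key, value, ttlSeconds)`. For local testing, or to see what such a cache does, here is a minimal in-memory sketch with the same two methods. `TtlCache` is a hypothetical stand-in, synchronous for simplicity, and not the cache that the runtime provides.

```typescript
// Hypothetical in-memory cache with per-entry expiry.
class TtlCache<V> {
  private readonly entries = new Map<string, { value: V; expiresAt: number }>();
  private readonly now: () => number;

  // The clock is injectable so tests can control time.
  constructor(now: () => number = Date.now) {
    this.now = now;
  }

  get(key: string): V | undefined {
    const entry = this.entries.get(key);
    if (entry === undefined) {
      return undefined;
    }
    if (entry.expiresAt <= this.now()) {
      // Drop expired entries lazily, on read.
      this.entries.delete(key);
      return undefined;
    }
    return entry.value;
  }

  set(key: string, value: V, ttlSeconds: number): void {
    this.entries.set(key, {
      value,
      expiresAt: this.now() + ttlSeconds * 1000,
    });
  }
}

let fakeNow = 0;
const cache = new TtlCache<string>(() => fakeNow);
cache.set("query:users", "cached rows", 300);
console.log(cache.get("query:users")); // cached rows
fakeNow = 300 * 1000;
console.log(cache.get("query:users")); // undefined
```

Because `get` returns `undefined` on a miss, callers can compare against `undefined` rather than relying on truthiness, which would treat cached values such as `0` or `""` as misses.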

Common Issues and Solutions

Issue: Authentication Failures

Always implement proper error handling for authentication failures and provide clear error messages.
const authErrorConnector = new ConnectorBuilder()
  .identity({
    id: "auth-error-service",
    name: "Auth Error Service",
  })
  .configuration({
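    api_key: {
      type: "string",
      label: "API Key",
      required: true,
      sensitive: true,
    },
    // The authenticated_call action below reads context.config.base_url
    base_url: {
      type: "string",
      label: "Base URL",
      default: "https://api.service.com",
      required: true,
    },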
  })
  .actions([
    {
      id: "authenticated_call",
      name: "Authenticated Call",
      description: "Make authenticated API call",
      inputs: {},
      execute: async (inputs, context) => {
        const response = await fetch(`${context.config.base_url}/protected`, {
          headers: {
            Authorization: `Bearer ${context.tokens.api_key}`,
          },
        });

        // Map authentication and authorization failures to clear messages;
        // network errors from fetch propagate unchanged
        if (response.status === 401) {
          throw new Error("Authentication failed. Please verify your API key.");
        }

        if (response.status === 403) {
          throw new Error("Access denied. Please check your permissions.");
        }

        return response.json();
      },
    },
  ])
  .build();
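
Matching on error message text, as string checks like `error.message.includes(...)` do, is fragile because it breaks whenever the wording changes. A sturdier pattern is an error type that carries the HTTP status. The sketch below is one way to do that; `ConnectorHttpError`, `authErrorMessage`, and `assertAuthorized` are hypothetical names.

```typescript
// Hypothetical error type that keeps the HTTP status alongside the message.
class ConnectorHttpError extends Error {
  readonly status: number;

  constructor(status: number, message: string) {
    super(message);
    this.name = "ConnectorHttpError";
    this.status = status;
  }
}

// Map auth-related statuses to user-facing messages; null means "not an auth error".
function authErrorMessage(status: number): string | null {
  switch (status) {
    case 401:
      return "Authentication failed. Please verify your API key.";
    case 403:
      return "Access denied. Please check your permissions.";
    default:
      return null;
  }
}

// Throw a typed error for 401/403 and do nothing for other statuses.
function assertAuthorized(status: number): void {
  const message = authErrorMessage(status);
  if (message !== null) {
    throw new ConnectorHttpError(status, message);
  }
}

try {
  assertAuthorized(401);
} catch (error) {
  if (error instanceof ConnectorHttpError) {
    console.log(error.status, error.message);
  }
}
```

Callers can then branch on `error.status` (for example, to prompt for new credentials on 401) without parsing strings.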

Issue: Rate Limiting

const rateLimitedConnector = new ConnectorBuilder()
  .identity({
    id: "rate-limited-service",
    name: "Rate Limited Service",
  })
  .configuration({
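    api_key: {
      type: "string",
      label: "API Key",
      required: true,
      sensitive: true,
    },
    // The rate_limited_call action below reads context.config.base_url
    base_url: {
      type: "string",
      label: "Base URL",
      default: "https://api.service.com",
      required: true,
    },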
  })
  .actions([
    {
      id: "rate_limited_call",
      name: "Rate Limited Call",
      description: "Make API call with rate limiting",
      inputs: {},
      execute: async (inputs, context) => {
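        // Fixed window: at most 100 requests per 60 seconds.
        // The window start is stored with the count so the TTL is not
        // extended on every call. Note that get-then-set is not atomic.
        const rateLimitKey = `rate_limit:${context.tokens.api_key}`;
        const now = Date.now();
        const entry = (await context.cache.get(rateLimitKey)) || {
          count: 0,
          windowStart: now,
        };

        if (entry.count >= 100) {
          throw new Error("Rate limit exceeded. Please try again later.");
        }

        // Increment the counter, expiring the entry when the window ends
        const secondsLeft = Math.ceil(60 - (now - entry.windowStart) / 1000);
        await context.cache.set(
          rateLimitKey,
          { count: entry.count + 1, windowStart: entry.windowStart },
          Math.max(secondsLeft, 1)
        );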

        const response = await fetch(`${context.config.base_url}/data`, {
          headers: {
            Authorization: `Bearer ${context.tokens.api_key}`,
          },
        });

        return response.json();
      },
    },
  ])
  .build();
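
The counter logic above is an instance of a fixed-window rate limiter. Pulling it into a small class makes the window behavior easier to test in isolation. The sketch below is in-memory and single-process only; `FixedWindowLimiter` is a hypothetical name, and a shared cache would be needed to enforce a limit across several workers.

```typescript
// Hypothetical in-memory fixed-window rate limiter.
class FixedWindowLimiter {
  private readonly windows = new Map<
    string,
    { count: number; windowStart: number }
  >();
  private readonly limit: number;
  private readonly windowMs: number;
  private readonly now: () => number;

  constructor(limit: number, windowMs: number, now: () => number = Date.now) {
    if (limit < 1 || windowMs <= 0) {
      throw new RangeError("limit and windowMs must be positive");
    }
    this.limit = limit;
    this.windowMs = windowMs;
    this.now = now;
  }

  // Return true and count the request if the key is under its limit.
  tryAcquire(key: string): boolean {
    const current = this.now();
    const entry = this.windows.get(key);
    if (entry === undefined || current - entry.windowStart >= this.windowMs) {
      // Start a new window for this key.
      this.windows.set(key, { count: 1, windowStart: current });
      return true;
    }
    if (entry.count >= this.limit) {
      return false;
    }
    entry.count += 1;
    return true;
  }
}

let clock = 0;
const limiter = new FixedWindowLimiter(2, 60 * 1000, () => clock);
console.log(limiter.tryAcquire("conn-1")); // true
console.log(limiter.tryAcquire("conn-1")); // true
console.log(limiter.tryAcquire("conn-1")); // false
clock = 60 * 1000;
console.log(limiter.tryAcquire("conn-1")); // true
```

The example keys the limiter on a connection identifier rather than the API key itself, which keeps secrets out of cache keys and logs.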

Next Steps

Now that you understand custom connectors, you can:
  1. Build Complex Connectors: Create connectors with advanced features
  2. Implement Error Handling: Add robust error handling and recovery
  3. Optimize Performance: Improve connector performance and reliability
  4. Test Thoroughly: Implement comprehensive testing strategies
Continue to the Error Handling Guide to learn how to implement robust error handling in your connectors.