Security Best Practices

Verify Webhook Signatures

1

Obtain your public key

In Monitor, click “Webhook Secret” to copy the public key you’ll need to verify incoming webhook signatures.

2

Implement Verification

Verify every webhook signature before processing:

# Python example
import base64
from Crypto.PublicKey import RSA
from Crypto.Hash import SHA256
from Crypto.Signature import pkcs1_15

def verify_webhook_signature(request_body, created_at, signature, public_key):
    try:
        # Concatenate body + timestamp
        message = request_body + created_at
        
        # Create hash
        hash_obj = SHA256.new(message.encode())
        
        # Verify signature
        key = RSA.import_key(public_key)
        pkcs1_15.new(key).verify(hash_obj, base64.b64decode(signature))
        return True
    except Exception:
        return False
3

Handle Verification Failures

Reject invalid signatures immediately:

def process_webhook(request):
    if not verify_webhook_signature(
        request.body, 
        request.json['created_at'], 
        request.headers['Signature'], 
        public_key
    ):
        return {"error": "Invalid signature"}, 401
    
    # Process valid webhook
    return process_event(request.json)

Network Security

Restrict Access by IP Address:

Configure your firewall to only accept webhooks from Orum’s IP addresses:

Sandbox:    34.232.246.7
Production: 34.231.148.64, 3.234.102.51, 54.221.230.177

Use HTTPS Only:

  • Always use HTTPS URLs for webhook endpoints
  • Ensure valid SSL certificates
  • Regularly renew SSL certificates before expiration

Avoid Authentication Requirements:

  • Don’t require API keys or authentication tokens
  • Webhook endpoints should be publicly accessible
  • Use signature verification instead of authentication

Reliability Best Practices

Implement Idempotent Processing

Handle Duplicate Events Gracefully:

Orum may retry webhook deliveries, so implement idempotent processing:

def process_webhook_event(event_data):
    event_id = event_data['event_id']
    
    # Check if event was already processed
    if is_event_processed(event_id):
        return {"status": "already_processed"}, 200
    
    # Process the event
    result = handle_event(event_data)
    
    # Mark event as processed
    mark_event_processed(event_id)
    
    return {"status": "success"}, 200

Use Event IDs for Deduplication:

  • Store processed event IDs in your database
  • Check for duplicates before processing
  • Include event ID in your processing logs

Respond Quickly

Return HTTP Response Within 30 Seconds:

import asyncio
from concurrent.futures import ThreadPoolExecutor

def webhook_handler(request):
    # Respond immediately
    event_data = request.json
    
    # Queue for async processing
    executor.submit(process_webhook_async, event_data)
    
    return {"status": "received"}, 200

def process_webhook_async(event_data):
    # Time-consuming processing happens here
    # Database updates, external API calls, etc.
    pass

Implement Asynchronous Processing:

  • Use message queues (Redis, RabbitMQ, SQS) for complex processing
  • Respond to webhook immediately, process later
  • Implement retry logic for failed background processing

Handle Webhook Failures Gracefully

Implement Retry Logic for Your Processing:

import time
from functools import wraps

def retry_on_failure(max_retries=3, delay=1):
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            for attempt in range(max_retries):
                try:
                    return func(*args, **kwargs)
                except Exception as e:
                    if attempt == max_retries - 1:
                        raise e
                    time.sleep(delay * (2 ** attempt))  # Exponential backoff
            return None
        return wrapper
    return decorator

@retry_on_failure(max_retries=3)
def update_database(event_data):
    # Database operation that might fail
    pass

Log All Webhook Processing:

  • Log successful and failed webhook processing
  • Include event IDs, timestamps, and error details
  • Monitor logs for patterns and issues

Performance Best Practices

Optimize Webhook Processing

Minimize Processing Time:

  • Validate webhook signature first
  • Perform minimal processing in the webhook handler
  • Use background jobs for complex operations
  • Cache frequently accessed data

Example Optimized Handler:

def webhook_handler(request):
    start_time = time.time()
    
    # Quick validation
    if not verify_signature(request):
        return {"error": "Invalid signature"}, 401
    
    event_data = request.json
    
    # Quick database lookup
    if is_duplicate_event(event_data['event_id']):
        return {"status": "duplicate"}, 200
    
    # Queue for background processing
    queue_event_processing(event_data)
    
    # Log processing time
    processing_time = time.time() - start_time
    logger.info(f"Webhook processed in {processing_time:.3f}s")
    
    return {"status": "queued"}, 200

Scale for High Volume

Design for Concurrent Processing:

  • Use connection pooling for database connections
  • Implement proper error handling for database timeouts
  • Consider using read replicas for webhook processing queries

Monitor Performance Metrics:

  • Track webhook processing times
  • Monitor queue depths and processing delays
  • Set up alerts for performance degradation

Event Handling Best Practices

Subscribe to Relevant Events Only

Choose Specific Event Types:

# Good: Subscribe to specific events you need
subscribed_events = [
    "transfer_updated",
    "person_verified",
    "business_rejected"
]

# Avoid: Subscribing to all events if you only need a few

Filter Events at Processing Time:

def process_webhook(event_data):
    event_type = event_data['event_type']
    
    if event_type == "transfer_updated":
        return handle_transfer_update(event_data)
    elif event_type == "person_verified":
        return handle_person_verification(event_data)
    else:
        logger.info(f"Ignoring event type: {event_type}")
        return {"status": "ignored"}, 200

Handle Event Order Correctly

Don’t Assume Event Order:

Orum doesn’t guarantee webhook delivery order, so design your processing accordingly:

def handle_transfer_update(event_data):
    transfer = event_data['event_data']['transfer']
    transfer_id = transfer['id']
    new_status = transfer['status']
    updated_at = transfer['updated_at']
    
    # Check if this is the latest update
    current_transfer = get_transfer_from_db(transfer_id)
    
    if current_transfer and current_transfer['updated_at'] > updated_at:
        logger.info(f"Ignoring older update for transfer {transfer_id}")
        return {"status": "outdated"}, 200
    
    # Process the update
    update_transfer_status(transfer_id, new_status, updated_at)
    return {"status": "processed"}, 200

Implement Proper Error Handling

Return Appropriate HTTP Status Codes:

def webhook_handler(request):
    try:
        # Webhook processing logic
        result = process_webhook(request.json)
        return {"status": "success"}, 200
        
    except ValidationError as e:
        # Client error - don't retry
        logger.warning(f"Validation error: {e}")
        return {"error": "Invalid webhook data"}, 400
        
    except DatabaseConnectionError as e:
        # Server error - retry
        logger.error(f"Database error: {e}")
        return {"error": "Processing error"}, 500
        
    except Exception as e:
        # Unknown error - retry
        logger.error(f"Unexpected error: {e}")
        return {"error": "Internal error"}, 500

Status Code Guidelines:

  • 2xx: Successfully processed (won’t be retried)
  • 4xx/5xx: Failed to process (will be retried)

Monitoring and Observability

Set Up Comprehensive Logging

Log Key Information:

import logging
import json

logger = logging.getLogger('webhook_handler')

def webhook_handler(request):
    event_data = request.json
    
    logger.info(
        "Webhook received",
        extra={
            "event_id": event_data.get('event_id'),
            "event_type": event_data.get('event_type'),
            "timestamp": event_data.get('created_at'),
            "source_ip": request.remote_addr
        }
    )
    
    # Process webhook...
    
    logger.info(
        "Webhook processed successfully",
        extra={
            "event_id": event_data.get('event_id'),
            "processing_time": processing_time
        }
    )

Monitor Webhook Health

Track Key Metrics:

  • Webhook processing success rate
  • Average processing time
  • Queue depth (for async processing)
  • Error rates by type

Set Up Alerts:

  • High error rates
  • Processing delays
  • Signature verification failures
  • Endpoint downtime

Use Monitor Tools

Leverage Built-in Monitoring:

  • Review delivery logs regularly
  • Monitor delivery success rates
  • Use test functionality to validate changes
  • Check webhook status and configuration

Development and Testing Best Practices

Local Development Setup

Use ngrok for Local Testing:

# Install ngrok
npm install -g ngrok

# Expose local webhook endpoint
ngrok http 3000

# Use the HTTPS URL for webhook configuration
# https://abc123.ngrok.io/webhook

Test with Realistic Data:

  • Use Monitor test functionality
  • Test with various event types you subscribe to
  • Verify signature verification works correctly
  • Test error scenarios and edge cases

Staging Environment Testing

Deploy to Staging First:

  • Test webhook configuration in staging environment
  • Use sandbox webhook IP addresses
  • Verify all event types are handled correctly
  • Test failure scenarios and recovery

Performance Testing:

  • Load test webhook endpoints
  • Verify handling of concurrent requests
  • Test with high volume of events
  • Monitor resource usage under load

Production Deployment Checklist