Security Best Practices
Verify Webhook Signatures
Obtain your public key
In Monitor, click “Webhook Secret” to copy the public key you’ll need to verify incoming webhook signatures.
Implement Verification
Verify every webhook signature before processing:
# Python example (requires the PyCryptodome package)
import base64
from Crypto.PublicKey import RSA
from Crypto.Hash import SHA256
from Crypto.Signature import pkcs1_15

def verify_webhook_signature(request_body, created_at, signature, public_key):
    # request_body must be the raw request body as a str, exactly as received
    try:
        # Concatenate body + timestamp
        message = request_body + created_at
        # Create hash
        hash_obj = SHA256.new(message.encode())
        # Verify signature (verify() raises ValueError if it does not match)
        key = RSA.import_key(public_key)
        pkcs1_15.new(key).verify(hash_obj, base64.b64decode(signature))
        return True
    except Exception:
        # Treat malformed keys, bad base64, and signature mismatches as failures
        return False
Handle Verification Failures
Reject invalid signatures immediately:
def process_webhook(request):
    if not verify_webhook_signature(
        request.body,
        request.json['created_at'],
        request.headers['Signature'],
        public_key
    ):
        return {"error": "Invalid signature"}, 401
    # Process valid webhook
    return process_event(request.json)
Network Security
Restrict Access by IP Address:
Configure your firewall to accept webhooks only from Orum’s IP addresses:
Sandbox: 34.232.246.7
Production: 34.231.148.64, 3.234.102.51, 54.221.230.177
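If a firewall rule is not available, the same restriction can be approximated in application code. The following is a minimal sketch that uses only the addresses listed above; the names `ORUM_WEBHOOK_IPS` and `is_orum_ip` are illustrative and not part of any Orum SDK.

```python
import ipaddress

# Addresses copied from the list above; keep them in sync with Orum's documentation.
ORUM_WEBHOOK_IPS = {
    "sandbox": {"34.232.246.7"},
    "production": {"34.231.148.64", "3.234.102.51", "54.221.230.177"},
}


def is_orum_ip(remote_addr, environment="production"):
    """Return True if remote_addr is one of the listed addresses for the environment."""
    try:
        candidate = ipaddress.ip_address(remote_addr)
    except ValueError:
        # Not a parseable IP address at all
        return False
    allowed = {ipaddress.ip_address(ip) for ip in ORUM_WEBHOOK_IPS[environment]}
    return candidate in allowed
```

If the endpoint sits behind a load balancer or reverse proxy, the address seen by the application is likely the proxy's, so this check would need to use whatever client address the proxy forwards.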
Use HTTPS Only:
Always use HTTPS URLs for webhook endpoints
Ensure valid SSL certificates
Renew SSL certificates before they expire
Avoid Authentication Requirements:
Don’t require API keys or authentication tokens
Webhook endpoints should be publicly accessible
Use signature verification instead of authentication
Reliability Best Practices
Implement Idempotent Processing
Handle Duplicate Events Gracefully:
Orum may retry webhook deliveries, so implement idempotent processing:
def process_webhook_event(event_data):
    event_id = event_data['event_id']
    # Check if event was already processed
    if is_event_processed(event_id):
        return {"status": "already_processed"}, 200
    # Process the event
    result = handle_event(event_data)
    # Mark event as processed
    mark_event_processed(event_id)
    return {"status": "success"}, 200
Use Event IDs for Deduplication:
Store processed event IDs in your database
Check for duplicates before processing
Include event ID in your processing logs
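One possible way to store processed event IDs is a table with a unique key on the event ID, so that the duplicate check and the write happen in a single statement. The sketch below uses SQLite from the standard library purely for illustration; the table name `processed_events` and the helpers `open_event_store` and `claim_event` are assumptions, not part of Orum's API.

```python
import sqlite3


def open_event_store(path=":memory:"):
    """Create (if needed) a table holding the IDs of processed events."""
    conn = sqlite3.connect(path)
    conn.execute(
        "CREATE TABLE IF NOT EXISTS processed_events (event_id TEXT PRIMARY KEY)"
    )
    return conn


def claim_event(conn, event_id):
    """Record event_id; return True only the first time this ID is seen."""
    with conn:  # commits on success, rolls back on error
        cursor = conn.execute(
            "INSERT OR IGNORE INTO processed_events (event_id) VALUES (?)",
            (event_id,),
        )
    # rowcount is 1 when the row was inserted and 0 when it already existed
    return cursor.rowcount == 1
```

Claiming the ID before processing avoids two concurrent deliveries both passing the duplicate check, at the cost that a crash between claiming and processing would need separate recovery.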
Respond Quickly
Return HTTP Response Within 30 Seconds:
from concurrent.futures import ThreadPoolExecutor

# Shared pool for background work; size it for your workload
executor = ThreadPoolExecutor(max_workers=4)

def webhook_handler(request):
    event_data = request.json
    # Queue for async processing
    executor.submit(process_webhook_async, event_data)
    # Respond immediately
    return {"status": "received"}, 200

def process_webhook_async(event_data):
    # Time-consuming processing happens here
    # Database updates, external API calls, etc.
    pass
Implement Asynchronous Processing:
Use message queues (Redis, RabbitMQ, SQS) for complex processing
Respond to webhook immediately, process later
Implement retry logic for failed background processing
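The respond-now, process-later pattern can be sketched with an in-process queue standing in for Redis, RabbitMQ, or SQS. Everything here (`event_queue`, `worker`, `handle_event`, the `processed` list) is a hypothetical stand-in for illustration; a production setup would use a durable queue so events survive a restart.

```python
import queue
import threading

event_queue = queue.Queue()
processed = []  # stands in for real side effects (database writes, API calls)


def handle_event(event_data):
    """Hypothetical slow handler; here it only records the event ID."""
    processed.append(event_data["event_id"])


def worker():
    """Pull events off the queue until a None sentinel arrives."""
    while True:
        event_data = event_queue.get()
        try:
            if event_data is None:
                return
            handle_event(event_data)
        except Exception as exc:
            # A real worker would log and re-queue or dead-letter the event
            print(f"processing failed: {exc}")
        finally:
            event_queue.task_done()


def webhook_handler(event_data):
    """Enqueue the event and acknowledge immediately."""
    event_queue.put(event_data)
    return {"status": "received"}, 200
```

A worker would typically be started once at application startup, for example with `threading.Thread(target=worker, daemon=True).start()`.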
Handle Webhook Failures Gracefully
Implement Retry Logic for Your Processing:
import time
from functools import wraps

def retry_on_failure(max_retries=3, delay=1):
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            for attempt in range(max_retries):
                try:
                    return func(*args, **kwargs)
                except Exception:
                    if attempt == max_retries - 1:
                        # Out of attempts: re-raise with the original traceback
                        raise
                    time.sleep(delay * (2 ** attempt))  # Exponential backoff
            return None
        return wrapper
    return decorator

@retry_on_failure(max_retries=3)
def update_database(event_data):
    # Database operation that might fail
    pass
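To make the backoff arithmetic concrete, the waits produced by the decorator above can be computed directly. The helper `backoff_delays` below is illustrative only and mirrors the expression `delay * (2 ** attempt)`; no sleep follows the final attempt because the exception is re-raised instead.

```python
def backoff_delays(max_retries=3, delay=1):
    """Seconds slept after each failed attempt except the last."""
    return [delay * (2 ** attempt) for attempt in range(max_retries - 1)]


print(backoff_delays(3, 1))    # [1, 2]
print(backoff_delays(5, 0.5))  # [0.5, 1.0, 2.0, 4.0]
```

With the defaults, a call that keeps failing therefore waits 1 second, then 2 seconds, and raises on the third failure.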
Log All Webhook Processing:
Log successful and failed webhook processing
Include event IDs, timestamps, and error details
Monitor logs for patterns and issues
Optimize Webhook Processing
Minimize Processing Time:
Validate webhook signature first
Perform minimal processing in the webhook handler
Use background jobs for complex operations
Cache frequently accessed data
Example Optimized Handler:
import logging
import time

logger = logging.getLogger('webhook_handler')

def webhook_handler(request):
    start_time = time.time()
    # Quick validation
    if not verify_signature(request):
        return {"error": "Invalid signature"}, 401
    event_data = request.json
    # Quick database lookup
    if is_duplicate_event(event_data['event_id']):
        return {"status": "duplicate"}, 200
    # Queue for background processing
    queue_event_processing(event_data)
    # Log processing time
    processing_time = time.time() - start_time
    logger.info(f"Webhook processed in {processing_time:.3f}s")
    return {"status": "queued"}, 200
Scale for High Volume
Design for Concurrent Processing:
Use connection pooling for database connections
Implement proper error handling for database timeouts
Consider using read replicas for webhook processing queries
Monitor Performance Metrics:
Track webhook processing times
Monitor queue depths and processing delays
Set up alerts for performance degradation
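As a rough illustration of the metrics above, processing times and outcomes can be kept in a rolling window and compared against thresholds. The class `WebhookMetrics` and its default thresholds are assumptions made for this sketch; most deployments would export these values to an existing monitoring system instead.

```python
from collections import deque
from statistics import mean


class WebhookMetrics:
    """Rolling window of recent webhook outcomes (in-memory, single process)."""

    def __init__(self, window=1000):
        self.samples = deque(maxlen=window)  # (duration_seconds, succeeded)

    def record(self, duration_seconds, succeeded):
        self.samples.append((duration_seconds, succeeded))

    def average_duration(self):
        return mean(d for d, _ in self.samples) if self.samples else 0.0

    def error_rate(self):
        if not self.samples:
            return 0.0
        failures = sum(1 for _, ok in self.samples if not ok)
        return failures / len(self.samples)

    def should_alert(self, max_error_rate=0.05, max_average_seconds=1.0):
        """True when either hypothetical threshold is exceeded."""
        return (
            self.error_rate() > max_error_rate
            or self.average_duration() > max_average_seconds
        )
```

A handler would call `record(...)` once per webhook, and a periodic job could check `should_alert()`.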
Event Handling Best Practices
Subscribe to Relevant Events Only
Choose Specific Event Types:
# Good: Subscribe to specific events you need
subscribed_events = [
    "transfer_updated",
    "person_verified",
    "business_rejected"
]
# Avoid: Subscribing to all events if you only need a few
Filter Events at Processing Time:
def process_webhook(event_data):
    event_type = event_data['event_type']
    if event_type == "transfer_updated":
        return handle_transfer_update(event_data)
    elif event_type == "person_verified":
        return handle_person_verification(event_data)
    else:
        logger.info(f"Ignoring event type: {event_type}")
        return {"status": "ignored"}, 200
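As the number of subscribed event types grows, the if/elif chain can be replaced with a lookup table. This is one possible arrangement rather than a required one; the handler bodies below are placeholders, and `EVENT_HANDLERS` and `dispatch_event` are names invented for the sketch.

```python
def handle_transfer_update(event_data):
    # Placeholder body for illustration
    return {"status": "transfer handled"}, 200


def handle_person_verification(event_data):
    # Placeholder body for illustration
    return {"status": "person handled"}, 200


# Map each subscribed event type to its handler
EVENT_HANDLERS = {
    "transfer_updated": handle_transfer_update,
    "person_verified": handle_person_verification,
}


def dispatch_event(event_data):
    """Route an event to its handler, ignoring types with no handler."""
    handler = EVENT_HANDLERS.get(event_data.get("event_type"))
    if handler is None:
        return {"status": "ignored"}, 200
    return handler(event_data)
```

Adding support for a new event type then means adding one entry to the table.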
Handle Event Order Correctly
Don’t Assume Event Order:
Orum doesn’t guarantee webhook delivery order, so design your processing accordingly:
def handle_transfer_update(event_data):
    transfer = event_data['event_data']['transfer']
    transfer_id = transfer['id']
    new_status = transfer['status']
    updated_at = transfer['updated_at']
    # Check if this is the latest update
    current_transfer = get_transfer_from_db(transfer_id)
    if current_transfer and current_transfer['updated_at'] > updated_at:
        logger.info(f"Ignoring older update for transfer {transfer_id}")
        return {"status": "outdated"}, 200
    # Process the update
    update_transfer_status(transfer_id, new_status, updated_at)
    return {"status": "processed"}, 200
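The comparison above depends on how `updated_at` is represented. Assuming it arrives as an ISO 8601 string (an assumption; check the actual payload format), comparing parsed datetimes is safer than comparing raw strings, which can give the wrong answer when UTC offsets differ. The helpers `parse_timestamp` and `is_stale` are illustrative names.

```python
from datetime import datetime, timezone


def parse_timestamp(value):
    """Parse an ISO 8601 string into an aware datetime (UTC assumed if no offset)."""
    if value.endswith("Z"):
        # Older Python versions do not accept a trailing "Z" in fromisoformat
        value = value[:-1] + "+00:00"
    parsed = datetime.fromisoformat(value)
    if parsed.tzinfo is None:
        parsed = parsed.replace(tzinfo=timezone.utc)
    return parsed


def is_stale(incoming_updated_at, stored_updated_at):
    """True if the incoming update is older than what is already stored."""
    return parse_timestamp(incoming_updated_at) < parse_timestamp(stored_updated_at)
```

For example, `is_stale("2024-01-01T12:00:00+02:00", "2024-01-01T10:30:00Z")` is true because the first value is 10:00 UTC, even though it sorts later as a plain string.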
Implement Proper Error Handling
Return Appropriate HTTP Status Codes:
def webhook_handler(request):
    try:
        # Webhook processing logic
        result = process_webhook(request.json)
        return {"status": "success"}, 200
    except ValidationError as e:
        # Client error: the payload is invalid. Note that, per the status code
        # guidelines below, 4xx responses are retried just like 5xx responses.
        logger.warning(f"Validation error: {e}")
        return {"error": "Invalid webhook data"}, 400
    except DatabaseConnectionError as e:
        # Server error - retry
        logger.error(f"Database error: {e}")
        return {"error": "Processing error"}, 500
    except Exception as e:
        # Unknown error - retry
        logger.error(f"Unexpected error: {e}")
        return {"error": "Internal error"}, 500
Status Code Guidelines:
2xx: Successfully processed (won’t be retried)
4xx/5xx: Failed to process (will be retried)
Monitoring and Observability
Set Up Comprehensive Logging
Log Key Information:
import logging
import time

logger = logging.getLogger('webhook_handler')

def webhook_handler(request):
    start_time = time.time()
    event_data = request.json
    logger.info(
        "Webhook received",
        extra={
            "event_id": event_data.get('event_id'),
            "event_type": event_data.get('event_type'),
            "timestamp": event_data.get('created_at'),
            "source_ip": request.remote_addr
        }
    )
    # Process webhook...
    processing_time = time.time() - start_time
    logger.info(
        "Webhook processed successfully",
        extra={
            "event_id": event_data.get('event_id'),
            "processing_time": processing_time
        }
    )
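Fields passed through `extra` are attached to the log record but are not printed by Python's default formatter. One way to surface them is a small JSON formatter, sketched below; the class name `JsonFormatter` and the choice of fields are assumptions for illustration.

```python
import json
import logging


class JsonFormatter(logging.Formatter):
    """Render each record as one JSON object, including selected `extra` fields."""

    EXTRA_FIELDS = ("event_id", "event_type", "timestamp", "source_ip", "processing_time")

    def format(self, record):
        payload = {"level": record.levelname, "message": record.getMessage()}
        for field in self.EXTRA_FIELDS:
            # Fields passed via `extra` become attributes on the record
            if hasattr(record, field):
                payload[field] = getattr(record, field)
        return json.dumps(payload)


handler = logging.StreamHandler()
handler.setFormatter(JsonFormatter())
logger = logging.getLogger("webhook_handler")
logger.addHandler(handler)
logger.setLevel(logging.INFO)
```

With this in place, each log line is a single JSON object that log aggregation tools can index by `event_id`.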
Monitor Webhook Health
Track Key Metrics:
Webhook processing success rate
Average processing time
Queue depth (for async processing)
Error rates by type
Set Up Alerts:
High error rates
Processing delays
Signature verification failures
Endpoint downtime
Leverage Built-in Monitoring:
Review delivery logs regularly
Monitor delivery success rates
Use test functionality to validate changes
Check webhook status and configuration
Development and Testing Best Practices
Local Development Setup
Use ngrok for Local Testing:
# Install ngrok
npm install -g ngrok
# Expose local webhook endpoint
ngrok http 3000
# Use the HTTPS URL for webhook configuration
# https://abc123.ngrok.io/webhook
Test with Realistic Data:
Use Monitor test functionality
Test with various event types you subscribe to
Confirm that signature verification works correctly
Test error scenarios and edge cases
Staging Environment Testing
Deploy to Staging First:
Test webhook configuration in staging environment
Use sandbox webhook IP addresses
Verify all event types are handled correctly
Test failure scenarios and recovery
Performance Testing:
Load test webhook endpoints
Verify handling of concurrent requests
Test with high volume of events
Monitor resource usage under load
Production Deployment Checklist
Security:
Webhook signature verification implemented
HTTPS endpoint with valid SSL certificate
IP address restrictions configured
No authentication requirements on webhook endpoint
Reliability:
Idempotent processing implemented
Response time under 30 seconds
Proper HTTP status codes returned
Error handling and logging in place
Performance:
Asynchronous processing for complex operations
Database connection pooling configured
Monitoring and alerting set up
Load testing completed
Testing:
All subscribed event types tested
Signature verification tested
Error scenarios tested
Recovery procedures documented