Curie Shopping API
The Curie Shopping API is a comprehensive backend system built on AWS Lambda that powers all product data, search functionality, and AI-driven features across the Curie ecosystem. It provides scalable, serverless APIs for product management, 3D model serving, intelligent search, and marketplace integrations.
Overview
The Shopping API serves as the backbone of the Curie platform, handling everything from basic product queries to sophisticated AI-powered semantic search. Built as a collection of AWS Lambda functions with API Gateway routing, it provides high availability, automatic scaling, and cost-effective operation.
Core Services
- Product Data Management: Complete product catalog with 3D model metadata
- AI-Powered Search: Vector-based semantic search using Amazon Bedrock
- Marketplace Integrations: StockX scraping and external pricing APIs
- Real-Time Data Sync: DynamoDB streams to OpenSearch integration
- Internal Tools: Dashboard APIs and reconstruction pipeline management
Architecture
Technology Stack
Serverless Infrastructure
- AWS Lambda: Serverless compute for all API endpoints
- API Gateway: HTTP routing and request/response management
- DynamoDB: Primary product database with global secondary indexes
- OpenSearch: Search engine with vector similarity capabilities
AI & Machine Learning
- Amazon Bedrock: Foundation models for embeddings and text processing
- Titan Embeddings: Vector generation for semantic search
- Bedrock Agents: Orchestrated AI workflows for complex queries
Data Processing & Integration
- DynamoDB Streams: Real-time data synchronization
- AWS Step Functions: Complex workflow orchestration
- SQS & SNS: Asynchronous processing and notifications
External Integrations
- StockX API: Product scraping and marketplace data
- Pricing APIs: Third-party pricing comparison services
- Google Merchant: Product listing management
System Architecture
graph TB
subgraph "Client Applications"
DS[Destination Site]
DW[Destination World]
DB[Internal Dashboard]
end
subgraph "API Gateway"
AG[API Gateway Routes]
end
subgraph "Lambda Functions"
PROD[Product APIs]
SEARCH[Search APIs]
AI[AI/LLM APIs]
DASH[Dashboard APIs]
DATA[Data Jobs]
end
subgraph "AI Services"
BR[Bedrock Agent Runtime]
EMB[Titan Embeddings]
end
subgraph "Data Layer"
DDB[(DynamoDB)]
OS[(OpenSearch)]
S3[(S3 Storage)]
end
subgraph "External Services"
SX[StockX]
PA[Pricing APIs]
GM[Google Merchant]
end
DS --> AG
DW --> AG
DB --> AG
AG --> PROD
AG --> SEARCH
AG --> AI
AG --> DASH
PROD --> DDB
SEARCH --> OS
AI --> BR
AI --> EMB
DASH --> DDB
DATA --> SX
DATA --> PA
DATA --> GM
DDB --> S3
OS --> S3
Project Structure
curie-shopping-api/
├── ShoppingApi/
│ ├── DestinationSite/ # Consumer-facing APIs
│ │ ├── GetAllProducts/ # Product catalog
│ │ ├── GetSingleProduct/ # Individual product details
│ │ └── GetRelatedProducts/ # Product recommendations
│ ├── Dashboard/ # Internal dashboard APIs
│ │ ├── Get3DProducts/ # 3D model management
│ │ ├── GetReconstructionRuns/ # Pipeline monitoring
│ │ └── GetInternalProductDetails/ # Admin product data
│ └── LLM/ # AI-powered features
│ ├── ProcessUserPrompt/ # Natural language queries
│ └── GetProductsFromVectorIndex/ # Semantic search
├── DataJobs/ # Batch processing
│ ├── GatherRefImages/ # Reference image collection
│ └── ScrapeStockX/ # Marketplace data ingestion
├── DynamoStreamJobs/ # Real-time data sync
│ ├── StreamDynamoToOpenSearch/
│ └── StreamDynamoToOpenSearchVector/
└── utils/ # Shared utilities
├── enrichproduct.py # Product data enhancement
└── openaiapi.py # AI service integration
API Endpoints
Product Management APIs
Get All Products
GET /products
Purpose: Retrieves the paginated product catalog for the Destination Site
Consumer: Curie Destination Site
Features:
- Pagination with configurable limits
- Parallel data enrichment
- CloudFront URL generation
- Production model filtering
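The last two features can be pictured as a single pass over the DynamoDB items before they are returned. This is a minimal sketch, not the actual handler: the helper name `to_public_products`, the assumption that `3DReconstructionUrl` is stored as an S3 object key, and the reuse of the CloudFront domain from the caching example later in this document are all assumptions.

```python
from typing import Any, Dict, Iterable, List

# Assumption: this distribution (taken from the caching example) fronts the model bucket
CLOUDFRONT_BASE = "https://d1q72mmyyvs3z3.cloudfront.net"


def to_public_products(items: Iterable[Dict[str, Any]]) -> List[Dict[str, Any]]:
    """Keep production-grade products and expose their models via CloudFront.

    Hypothetical helper: assumes "3DReconstructionUrl" holds an S3 key that
    must be rewritten into a public URL.
    """
    public = []
    for item in items:
        # Production model filtering: skip items without a production-grade model
        if not item.get("hasProductionGradeModel"):
            continue
        product = dict(item)  # shallow copy so the source item is not mutated
        s3_key = product.get("3DReconstructionUrl")
        if s3_key:
            # CloudFront URL generation from the stored S3 key
            product["3DReconstructionUrl"] = f"{CLOUDFRONT_BASE}/{s3_key.lstrip('/')}"
        public.append(product)
    return public
```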
// Response structure
interface ProductsResponse {
products: Product[];
lastEvaluatedKey?: string;
total: number;
}
interface Product {
ID: string;
productName: string;
brand: string;
displayPrice: string;
"3DReconstructionUrl": string;
hasProductionGradeModel: boolean;
affiliateLinks: AffiliateLink[];
}
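DynamoDB returns `LastEvaluatedKey` as a dictionary, while the response above exposes `lastEvaluatedKey` as a string. One way to bridge the two is an opaque base64 token, sketched below under the assumption that the key attributes are strings (such as the `ID` partition key). The function names are hypothetical; the token is not signed, so a malformed value from a client should be mapped to HTTP 400.

```python
import base64
import json
from typing import Any, Dict, Optional


def encode_page_token(last_evaluated_key: Optional[Dict[str, Any]]) -> Optional[str]:
    """Turn DynamoDB's LastEvaluatedKey dict into an opaque URL-safe string."""
    if not last_evaluated_key:
        return None  # no more pages
    raw = json.dumps(last_evaluated_key, sort_keys=True)
    return base64.urlsafe_b64encode(raw.encode("utf-8")).decode("ascii")


def decode_page_token(token: Optional[str]) -> Optional[Dict[str, Any]]:
    """Turn a client-supplied token back into an ExclusiveStartKey dict."""
    if not token:
        return None  # first page
    raw = base64.urlsafe_b64decode(token.encode("ascii"))
    return json.loads(raw)
```

The decoded dict can be passed straight back to DynamoDB as `ExclusiveStartKey` on the next request.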
Get Single Product
GET /products/{id}
Purpose: Detailed product information for individual product pages
Consumer: Curie Destination Site (planned)
Features: Complete product metadata with pricing and availability
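A minimal sketch of the lookup behind this endpoint, assuming the table's partition key is `ID` (the attribute the stream processor also uses as the document ID) and that a missing product should map to HTTP 404. The function name is hypothetical, and `table` is passed in so the logic can be exercised without AWS.

```python
import json
from typing import Any, Dict


def get_single_product(table: Any, product_id: str) -> Dict[str, Any]:
    """Fetch one product by ID and build an API Gateway proxy response.

    `table` is expected to behave like a boto3 DynamoDB Table resource;
    any object with a compatible get_item method works.
    """
    result = table.get_item(Key={"ID": product_id})
    item = result.get("Item")  # "Item" is absent when no item has this key
    if item is None:
        return {"statusCode": 404, "body": json.dumps({"error": "Product not found"})}
    # default=str serializes DynamoDB Decimal values
    return {"statusCode": 200, "body": json.dumps(item, default=str)}
```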
Get Related Products
GET /products/{id}/related?num_related=6
Purpose: Finds related products by textual similarity (shared terms) using an OpenSearch More Like This (MLT) query
Consumer: Curie Destination Site
Algorithm: MLT query over weighted text fields
// OpenSearch query structure
const mltQuery = {
more_like_this: {
fields: [
"productName^3", // Highest weight
"description^2",
"stockXData.brand^2",
"stockXData.model",
"stockXData.categories.default.value"
],
like: [{ _id: productId }],
min_term_freq: 1,
max_query_terms: 12
}
};
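The query above references `productId` but leaves the number of results to defaults. A sketch of a Python helper that wraps the same MLT clause and maps `num_related` to the search `size`; the helper name and the 1 to 20 clamp are assumptions. MLT excludes the "liked" document from its own results by default, so the product itself is not returned as related.

```python
from typing import Any, Dict

# Field boosts copied from the query above; "^3" boosts matches in that field
MLT_FIELDS = [
    "productName^3",
    "description^2",
    "stockXData.brand^2",
    "stockXData.model",
    "stockXData.categories.default.value",
]


def build_related_products_query(product_id: str, num_related: int = 6) -> Dict[str, Any]:
    """Build a search body for GET /products/{id}/related (hypothetical helper)."""
    size = max(1, min(int(num_related), 20))  # clamp range is an assumption
    return {
        "size": size,
        "query": {
            "more_like_this": {
                "fields": MLT_FIELDS,
                # Use the stored product document as the source of "like" terms
                "like": [{"_id": product_id}],
                "min_term_freq": 1,
                "max_query_terms": 12,
            }
        },
    }
```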
AI-Powered Search APIs
Process User Prompt
POST /llm/process-user-prompt
Content-Type: application/json
{
"prompt": "I need comfortable running shoes under $150 for trail running"
}
Purpose: Natural language product search using Bedrock Agent flows
Features:
- Conversational query processing
- Multi-criteria product matching
- Parallel product enrichment
- Contextual understanding
# Bedrock Agent integration
response = bedrock_agent_runtime.invoke_flow(
flowIdentifier=BEDROCK_FLOW_ID,
flowAliasIdentifier=BEDROCK_FLOW_ALIAS_ID,
inputs=[{
"content": {"document": prompt},
"nodeName": "FlowInputNode",
"nodeOutputName": "document"
}]
)
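`invoke_flow` does not return the answer directly: results arrive as an event stream under `responseStream`, where each event is a dict keyed by its type (for example `flowOutputEvent` or `flowCompletionEvent`). A minimal sketch for collecting the output documents follows; the helper name is hypothetical, and treating any completion reason other than `SUCCESS` as an error is a simplifying assumption.

```python
from typing import Any, Dict, List


def collect_flow_outputs(response: Dict[str, Any]) -> List[Any]:
    """Gather documents emitted by the flow's output nodes."""
    outputs = []
    completion_reason = None
    for event in response["responseStream"]:
        if "flowOutputEvent" in event:
            # Each output node emits its result under content.document
            outputs.append(event["flowOutputEvent"]["content"]["document"])
        elif "flowCompletionEvent" in event:
            completion_reason = event["flowCompletionEvent"]["completionReason"]
    if completion_reason != "SUCCESS":
        raise RuntimeError(f"Flow did not complete successfully: {completion_reason}")
    return outputs
```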
Vector Similarity Search
POST /llm/vector-search
Content-Type: application/json
{
"query": "lightweight basketball shoes"
}
Purpose: Semantic similarity search using vector embeddings
Process:
- Generate query embeddings with Titan model
- Perform KNN search in OpenSearch vector index
- Return most similar products with similarity scores
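# Vector search implementation
import json

import boto3

bedrock = boto3.client("bedrock-runtime")
# `opensearch` is an opensearch-py client configured elsewhere

def vector_search(query_text: str, k: int = 10):
    # Generate embedding with Titan Text Embeddings V2
    embedding_response = bedrock.invoke_model(
        modelId="amazon.titan-embed-text-v2:0",
        contentType="application/json",
        accept="application/json",
        body=json.dumps({"inputText": query_text})
    )
    # invoke_model returns the payload as a stream under "body"
    payload = json.loads(embedding_response["body"].read())
    query_vector = payload["embedding"]
    # KNN search
    search_query = {
        "size": k,
        "query": {
            "knn": {
                "product_embedding": {
                    "vector": query_vector,
                    "k": k
                }
            }
        }
    }
    return opensearch.search(index="products_vector_index", body=search_query)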
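The KNN query only works against an index whose `product_embedding` field is mapped as a `knn_vector` with the same dimension as the embeddings. A sketch of such an index body, assuming Titan Text Embeddings V2's default 1024-dimensional output; the HNSW method, `faiss` engine, and `l2` space type are illustrative choices, not the documented settings of `products_vector_index`.

```python
from typing import Any, Dict

# Titan Text Embeddings V2 returns 1024-dimensional vectors unless another size is requested
TITAN_V2_DIMENSIONS = 1024


def vector_index_body(dimension: int = TITAN_V2_DIMENSIONS) -> Dict[str, Any]:
    """Settings and mappings for a k-NN vector index (hypothetical helper)."""
    return {
        "settings": {"index": {"knn": True}},  # enable the k-NN plugin for this index
        "mappings": {
            "properties": {
                "product_embedding": {
                    "type": "knn_vector",
                    "dimension": dimension,  # must match the embedding length
                    "method": {"name": "hnsw", "engine": "faiss", "space_type": "l2"},
                }
            }
        },
    }
```

With opensearch-py this could be applied once, for example via `opensearch.indices.create(index="products_vector_index", body=vector_index_body())`.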
Dashboard APIs
Get 3D Products
GET /3d-products?limit=100
Purpose: Product catalog for the internal dashboard, with 3D model metadata
Consumer: Curie Internal Dashboard
Features: Production status, reconstruction URLs, reference images
Get Reconstruction Runs
GET /reconstruction-runs?limit=50
Purpose: Pipeline monitoring data for dashboard analytics
Features:
- Paginated job history
- CloudWatch log integration
- Performance metrics (SSIM, MSE)
- Commit information and Docker tags
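For reference, the two image-quality metrics reported per run can be computed as below. This is a pure-Python sketch over flattened grayscale pixel lists, not the pipeline's own implementation. Standard SSIM averages the statistic over small sliding windows; computing it once over the whole image, as done here, is a simplification.

```python
from typing import Sequence


def mse(a: Sequence[float], b: Sequence[float]) -> float:
    """Mean squared error between two equally sized images (flattened)."""
    if len(a) != len(b) or not a:
        raise ValueError("images must be non-empty and the same size")
    return sum((x - y) ** 2 for x, y in zip(a, b)) / len(a)


def global_ssim(a: Sequence[float], b: Sequence[float], data_range: float = 255.0) -> float:
    """Single-window SSIM over the whole image (1.0 means identical)."""
    if len(a) != len(b) or not a:
        raise ValueError("images must be non-empty and the same size")
    n = len(a)
    c1 = (0.01 * data_range) ** 2  # standard K1 = 0.01 stabilizer
    c2 = (0.03 * data_range) ** 2  # standard K2 = 0.03 stabilizer
    mu_a, mu_b = sum(a) / n, sum(b) / n
    var_a = sum((x - mu_a) ** 2 for x in a) / n
    var_b = sum((y - mu_b) ** 2 for y in b) / n
    cov = sum((x - mu_a) * (y - mu_b) for x, y in zip(a, b)) / n
    return ((2 * mu_a * mu_b + c1) * (2 * cov + c2)) / (
        (mu_a ** 2 + mu_b ** 2 + c1) * (var_a + var_b + c2)
    )
```

Lower MSE and higher SSIM both indicate a rendered view closer to its reference image.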
Get Internal Product Details
GET /products/{id}/internal-details
Purpose: Comprehensive product data for dashboard management
Features:
- All 3D model versions
- Reference image collections
- Complete reconstruction history
- S3 file listings
Data Processing & Jobs
Real-Time Data Synchronization
DynamoDB to OpenSearch Streaming
# Stream processor for real-time search index updates
def lambda_handler(event, context):
    for record in event['Records']:
        if record['eventName'] in ['INSERT', 'MODIFY']:
            # NewImage is present only when the stream view type includes new images
            # Convert DynamoDB format to OpenSearch document
            document = convert_dynamodb_to_json(record['dynamodb']['NewImage'])
            # Index in both structured and dynamic indices
            opensearch.index(
                index="curie-products-2",
                id=document['ID'],
                body=document
            )
            opensearch.index(
                index="curie-products-dynamic",
                id=document['ID'],
                body=document
            )
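The handler relies on `convert_dynamodb_to_json`, which is not shown. boto3 ships `boto3.dynamodb.types.TypeDeserializer` for this job (it yields `Decimal` numbers); the dependency-free sketch below is a hypothetical stand-in that converts typed attribute values such as `{"S": "x"}` into plain JSON. Returning integral numbers as `int` and the rest as `float` is an assumption.

```python
from decimal import Decimal
from typing import Any, Dict


def _convert_value(attr: Dict[str, Any]) -> Any:
    """Convert one DynamoDB-typed attribute value to a plain Python value."""
    ((type_tag, value),) = attr.items()  # each attribute has exactly one type tag
    if type_tag in ("S", "B", "BOOL"):
        return value  # B arrives base64-encoded in stream events
    if type_tag == "N":
        # Numbers arrive as strings; keep integral values as int
        number = Decimal(value)
        return int(number) if number == number.to_integral_value() else float(number)
    if type_tag == "NULL":
        return None
    if type_tag == "M":
        return {k: _convert_value(v) for k, v in value.items()}
    if type_tag == "L":
        return [_convert_value(v) for v in value]
    if type_tag in ("SS", "BS"):
        return list(value)
    if type_tag == "NS":
        return [_convert_value({"N": v}) for v in value]
    raise ValueError(f"Unknown DynamoDB type tag: {type_tag}")


def convert_dynamodb_to_json(image: Dict[str, Dict[str, Any]]) -> Dict[str, Any]:
    """Flatten a stream record's NewImage into a plain document."""
    return {name: _convert_value(attr) for name, attr in image.items()}
```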
Vector Embedding Generation
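# Batch processing for vector embeddings
import boto3

# The resource API returns plain Python values; the low-level client would
# return typed maps such as {'BOOL': False}, which are always truthy
table = boto3.resource('dynamodb').Table('curie-products')

def generate_product_embeddings():
    scan_kwargs = {}
    while True:
        # Scan returns at most 1 MB per call, so follow LastEvaluatedKey
        page = table.scan(**scan_kwargs)
        for product in page['Items']:
            if product.get('hasProductionGradeModel'):
                # Aggregate text fields
                text_content = combine_product_text(product)
                # Generate embedding
                embedding = generate_bedrock_embedding(text_content)
                # Store in vector index
                opensearch.index(
                    index="products_vector_index",
                    id=product['ID'],
                    body={
                        "product_embedding": embedding,
                        "product_metadata": product
                    }
                )
        if 'LastEvaluatedKey' not in page:
            break
        scan_kwargs['ExclusiveStartKey'] = page['LastEvaluatedKey']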
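`combine_product_text` is not shown either. A hypothetical sketch: the list of fields that feed the embedding and the character cap are assumptions, while the nested category path is taken from the MLT query fields above.

```python
from typing import Any, Dict, List

# Hypothetical field list: which attributes feed the embedding is an assumption
TEXT_FIELDS = ["productName", "brand", "model", "description"]


def combine_product_text(product: Dict[str, Any], max_chars: int = 8000) -> str:
    """Join a product's descriptive fields into one labeled string for embedding."""
    parts: List[str] = []
    for field in TEXT_FIELDS:
        value = product.get(field)
        if value:  # skip missing and empty fields
            parts.append(f"{field}: {value}")
    # Category lives in the nested StockX data
    stockx = product.get("stockXData") or {}
    category = ((stockx.get("categories") or {}).get("default") or {}).get("value")
    if category:
        parts.append(f"category: {category}")
    # Conservative truncation to stay within the embedding model's input limit
    return "\n".join(parts)[:max_chars]
```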
External Data Integration
StockX Product Scraping
# StockX marketplace integration
class StockXScraper:
    def scrape_search_results(self, search_url: str, max_pages: int):
        """Discover new products from StockX search pages"""
        discovered_products = []
        for page in range(1, max_pages + 1):
            page_products = self.extract_product_data(search_url, page)
            for product in page_products:
                # Check if product exists in database
                existing = self.check_existing_product(product['url_key'])
                if not existing:
                    discovered_products.append(product)
        # Trigger individual product processing
        self.trigger_step_function(discovered_products)

    def scrape_individual_product(self, stockx_url_key: str):
        """Extract detailed product information"""
        product_data = self.extract_product_details(stockx_url_key)
        return self.create_product_record(product_data)
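`trigger_step_function` is not shown. Step Functions rejects execution input larger than 256 KB, so a long discovery run may need splitting across several executions. A sketch under the assumption that the state machine accepts a `{"products": [...]}` envelope; the helper name is hypothetical, and a single product larger than the limit would still produce an oversized payload.

```python
import json
from typing import Any, Dict, List

# Step Functions limits execution input to 256 KB
MAX_INPUT_BYTES = 256 * 1024


def batch_for_step_functions(products: List[Dict[str, Any]],
                             max_bytes: int = MAX_INPUT_BYTES) -> List[str]:
    """Split products into JSON payloads that each fit one execution input."""
    envelope = len(json.dumps({"products": []}))  # size of the empty wrapper
    payloads: List[str] = []
    batch: List[Dict[str, Any]] = []
    size = envelope
    for product in products:
        item_size = len(json.dumps(product))  # ASCII output, so chars == bytes
        extra = item_size + (2 if batch else 0)  # ", " separator between items
        if batch and size + extra > max_bytes:
            # Current batch is full: emit it and start a new one
            payloads.append(json.dumps({"products": batch}))
            batch, size, extra = [], envelope, item_size
        batch.append(product)
        size += extra
    if batch:
        payloads.append(json.dumps({"products": batch}))
    return payloads
```

Each payload could then be passed as the `input` string to the boto3 Step Functions client's `start_execution`.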
Reference Image Collection
# AI-powered reference image gathering
import hashlib

def gather_reference_images(product_names: list):
    for product_name in product_names:
        # Generate search queries using Perplexity API
        search_queries = generate_search_queries(product_name)
        for query in search_queries:
            # Search for high-quality product images
            image_urls = perplexity_image_search(query)
            # Download and validate images
            for url in image_urls:
                image_data = download_image(url)
                if validate_image_quality(image_data):
                    # Store in S3, keyed by a SHA-256 digest of the image bytes
                    # (assumed naming scheme; the bare builtin `hash` is a function)
                    image_hash = hashlib.sha256(image_data).hexdigest()
                    s3_key = f"curie-product-images/{product_name}/ref_images/{image_hash}.png"
                    upload_to_s3(image_data, s3_key)
                    # Record in reference images table
                    store_image_metadata(product_name, s3_key, url)
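`validate_image_quality` is not defined in the snippet. A hypothetical sketch that reads a PNG's dimensions from its IHDR header and rejects small images; accepting only PNG input and the 512-pixel threshold are assumptions, and real checks (blur, watermarks, background) would go further.

```python
import struct
from typing import Optional, Tuple

PNG_SIGNATURE = b"\x89PNG\r\n\x1a\n"


def png_dimensions(data: bytes) -> Optional[Tuple[int, int]]:
    """Return (width, height) from a PNG's IHDR chunk, or None if not a PNG."""
    # Layout: 8-byte signature, 4-byte chunk length, b"IHDR", width, height
    if len(data) < 24 or not data.startswith(PNG_SIGNATURE) or data[12:16] != b"IHDR":
        return None
    width, height = struct.unpack(">II", data[16:24])  # big-endian uint32s
    return width, height


def validate_image_quality(image_data: bytes, min_side: int = 512) -> bool:
    """Accept PNGs whose shorter side is at least min_side pixels."""
    dims = png_dimensions(image_data)
    return dims is not None and min(dims) >= min_side
```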
Development & Deployment
Lambda Function Structure
Each Lambda function follows a consistent structure:
# Standard Lambda function template
import boto3
import json
import logging
from decimal import Decimal

# Configure logging
logger = logging.getLogger()
logger.setLevel(logging.INFO)

# Initialize clients at module level for reuse
dynamodb = boto3.resource('dynamodb')
table = dynamodb.Table('curie-products')

class DecimalEncoder(json.JSONEncoder):
    """Handle DynamoDB Decimal types"""
    def default(self, obj):
        if isinstance(obj, Decimal):
            return str(obj)
        return super().default(obj)

def lambda_handler(event, context):
    try:
        # Extract parameters
        params = extract_parameters(event)
        # Business logic
        result = process_request(params)
        # Return formatted response
        return {
            'statusCode': 200,
            'headers': {
                'Content-Type': 'application/json',
                'Access-Control-Allow-Origin': '*'
            },
            'body': json.dumps(result, cls=DecimalEncoder)
        }
    except Exception as e:
        logger.error(f"Lambda error: {str(e)}")
        return {
            'statusCode': 500,
            'body': json.dumps({'error': str(e)})
        }
CI/CD Pipeline
# GitHub Actions deployment workflow
on:
  push:
    branches: [main]
jobs:
  deploy:
    runs-on: ubuntu-latest
    strategy:
      matrix:
        lambda:
          - name: GetAllProductsLambda
            files: [ShoppingApi/DestinationSite/GetAllProducts/lambda_get_all_products.py]
            requirements: ShoppingApi/DestinationSite/GetAllProducts/requirements.txt
          - name: ProcessUserPromptLambda
            files: [ShoppingApi/LLM/ProcessUserPrompt/lambda_process_user_prompt.py]
            requirements: ShoppingApi/LLM/ProcessUserPrompt/requirements.txt
    steps:
      - uses: actions/checkout@v4
      # AWS credentials must be configured before deploying
      # (for example with aws-actions/configure-aws-credentials)
      - name: Deploy Lambda
        run: |
          # Package function with dependencies
          pip install -r ${{ matrix.lambda.requirements }} -t package/
          # `files` is a list, so join it into space-separated paths
          cp ${{ join(matrix.lambda.files, ' ') }} package/
          # Zip in a subshell so the working directory stays at the repo root
          (cd package && zip -r ../function.zip .)
          # Deploy to AWS Lambda
          aws lambda update-function-code \
            --function-name ${{ matrix.lambda.name }} \
            --zip-file fileb://function.zip
Local Development
# Set up development environment
python -m venv venv
source venv/bin/activate
# Install dependencies
pip install -r requirements.txt
# Set environment variables
export AWS_PROFILE=curie-dev
export DYNAMODB_TABLE=curie-products
export OPENSEARCH_ENDPOINT=your-opensearch-domain
# Test individual functions
python -m pytest tests/test_product_apis.py
Performance Optimization
Caching Strategies
# Two in-process caches for API responses; module-level caches survive
# across warm invocations of the same Lambda execution environment
import functools
from typing import Any, Callable

from cachetools import TTLCache

# In-memory caching with TTL
api_cache = TTLCache(maxsize=1000, ttl=300)  # 5 minutes

@functools.lru_cache(maxsize=128)
def get_cloudfront_url(s3_key: str) -> str:
    """Cache CloudFront URL generation"""
    return f"https://d1q72mmyyvs3z3.cloudfront.net/{s3_key}"

def cached_api_call(cache_key: str, fetch_function: Callable[[], Any]) -> Any:
    """Return the cached result for cache_key, calling fetch_function on a miss"""
    if cache_key in api_cache:
        return api_cache[cache_key]
    result = fetch_function()
    api_cache[cache_key] = result
    return result
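`cached_api_call` is a helper that callers invoke explicitly. If a decorator form is preferred, a minimal sketch follows; the name `ttl_cached` is hypothetical, results are keyed by positional arguments only, and unlike `cachetools.TTLCache` the store has no `maxsize` bound. The injectable `clock` exists so expiry can be tested.

```python
import functools
import time
from typing import Any, Callable, Dict, Tuple


def ttl_cached(ttl_seconds: float, clock: Callable[[], float] = time.monotonic):
    """Memoize a function's results per positional-argument tuple for ttl_seconds."""
    def decorator(func: Callable[..., Any]) -> Callable[..., Any]:
        store: Dict[Tuple[Any, ...], Tuple[float, Any]] = {}

        @functools.wraps(func)
        def wrapper(*args: Any) -> Any:
            now = clock()
            hit = store.get(args)
            if hit is not None and now - hit[0] < ttl_seconds:
                return hit[1]  # fresh entry: skip the underlying call
            value = func(*args)
            store[args] = (now, value)  # record when this value was computed
            return value

        return wrapper
    return decorator
```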
Parallel Processing
# Concurrent product enrichment
import logging
from concurrent.futures import ThreadPoolExecutor, as_completed

logger = logging.getLogger()

def enrich_products_parallel(products: list, max_workers: int = 10):
    """Enrich multiple products concurrently (results arrive in completion order)"""
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        # Submit all enrichment tasks
        future_to_product = {
            executor.submit(enrich_single_product, product): product
            for product in products
        }
        enriched_products = []
        # as_completed only yields finished futures, so result() returns
        # immediately and a per-call timeout would have no effect
        for future in as_completed(future_to_product):
            try:
                enriched_products.append(future.result())
            except Exception as e:
                logger.error(f"Product enrichment failed: {e}")
                # Include original product as fallback
                enriched_products.append(future_to_product[future])
        return enriched_products
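Because `as_completed` yields futures as they finish, the function above can reorder the catalog. When the response must keep the input order, `executor.map` is a simpler alternative; in this sketch the enrichment function is passed in as a parameter (an assumption made for testability), and failures still fall back to the original product.

```python
import logging
from concurrent.futures import ThreadPoolExecutor
from typing import Any, Callable, Dict, List

logger = logging.getLogger(__name__)


def enrich_in_order(
    products: List[Dict[str, Any]],
    enrich: Callable[[Dict[str, Any]], Dict[str, Any]],
    max_workers: int = 10,
) -> List[Dict[str, Any]]:
    """Enrich concurrently, returning results in the same order as `products`."""
    def safe_enrich(product: Dict[str, Any]) -> Dict[str, Any]:
        try:
            return enrich(product)
        except Exception:
            logger.exception("Product enrichment failed")
            return product  # fall back to the unenriched product

    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        # executor.map yields results in submission order
        return list(executor.map(safe_enrich, products))
```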
Database Optimization
# Efficient DynamoDB queries with pagination
def paginated_query(table, index_name: str, key_condition, limit: int = 50):
    """Query an index page by page until `limit` items are collected.

    Returns the items and the LastEvaluatedKey to resume from (None when exhausted).
    """
    items = []
    last_key = None
    while len(items) < limit:
        query_params = {
            'IndexName': index_name,
            'KeyConditionExpression': key_condition,
            # Per-request cap of 100 items; DynamoDB's own limit is 1 MB of
            # data per response, not an item count
            'Limit': min(limit - len(items), 100)
        }
        if last_key:
            query_params['ExclusiveStartKey'] = last_key
        response = table.query(**query_params)
        items.extend(response['Items'])
        last_key = response.get('LastEvaluatedKey')
        if not last_key:
            break
    return items[:limit], last_key
Monitoring & Observability
CloudWatch Integration
# Structured JSON logging for CloudWatch Logs
import json
import logging
import time
from datetime import datetime, timezone

class APILogger:
    def __init__(self, function_name: str):
        self.function_name = function_name
        self.logger = logging.getLogger(function_name)
        self.logger.setLevel(logging.INFO)  # ensure INFO records are emitted

    def log_api_call(self, endpoint: str, params: dict, execution_time: float):
        """Structured logging for API calls"""
        log_data = {
            'timestamp': datetime.now(timezone.utc).isoformat(),
            'function': self.function_name,
            'endpoint': endpoint,
            'execution_time_ms': round(execution_time * 1000, 2),
            'parameters': params
        }
        self.logger.info(json.dumps(log_data, default=str))

    def log_error(self, error: Exception, context: dict = None):
        """Error logging with context"""
        error_data = {
            'timestamp': datetime.now(timezone.utc).isoformat(),
            'function': self.function_name,
            'error': str(error),
            'error_type': type(error).__name__,
            'context': context or {}
        }
        self.logger.error(json.dumps(error_data, default=str))

# Usage in Lambda functions
api_logger = APILogger('GetAllProducts')

def lambda_handler(event, context):
    start_time = time.time()
    try:
        result = process_request(event)
        execution_time = time.time() - start_time
        api_logger.log_api_call('GET /products', event, execution_time)
        return result
    except Exception as e:
        api_logger.log_error(e, {'event': event})
        raise
Performance Metrics
# Custom CloudWatch metrics
import functools
import time

import boto3

cloudwatch = boto3.client('cloudwatch')

def publish_custom_metrics(metric_name: str, value: float, unit: str = 'Count'):
    """Publish custom metrics to CloudWatch"""
    cloudwatch.put_metric_data(
        Namespace='Curie/ShoppingAPI',
        MetricData=[{
            'MetricName': metric_name,
            'Value': value,
            'Unit': unit,
            'Dimensions': [
                {'Name': 'Environment', 'Value': 'Production'},
                {'Name': 'Service', 'Value': 'ShoppingAPI'}
            ]
        }]
    )

# Track API performance (each call makes synchronous PutMetricData requests)
def track_api_performance(func):
    @functools.wraps(func)  # preserve the wrapped function's name and docstring
    def wrapper(*args, **kwargs):
        start_time = time.time()
        try:
            result = func(*args, **kwargs)
            execution_time = time.time() - start_time
            publish_custom_metrics(f'{func.__name__}_ExecutionTime', execution_time, 'Seconds')
            publish_custom_metrics(f'{func.__name__}_Success', 1)
            return result
        except Exception:
            publish_custom_metrics(f'{func.__name__}_Error', 1)
            raise
    return wrapper
Security & Best Practices
API Security
# Input validation and sanitization
import re
from typing import Any, Dict

UUID_PATTERN = re.compile(
    r'[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}', re.IGNORECASE
)

class RequestValidator:
    @staticmethod
    def validate_product_id(product_id: str) -> bool:
        """Validate UUID format for product IDs"""
        # fullmatch, unlike match with '$', also rejects a trailing newline
        return bool(UUID_PATTERN.fullmatch(product_id))

    @staticmethod
    def sanitize_search_query(query: str) -> str:
        """Sanitize user search input"""
        # Remove potentially harmful characters
        sanitized = re.sub(r'[<>\"\'%;()&+]', '', query)
        return sanitized.strip()[:500]  # Limit length

    @staticmethod
    def validate_pagination_params(params: Dict[str, Any]) -> Dict[str, Any]:
        """Validate and normalize pagination parameters (ValueError on non-integers)"""
        limit = max(1, min(int(params.get('limit', 50)), 100))  # 1 to 100 items
        offset = max(int(params.get('offset', 0)), 0)  # Non-negative
        return {'limit': limit, 'offset': offset}
Rate Limiting
# API Gateway usage plan throttling and quota configuration
rate_limiting_config = {
    "throttle": {
        "burstLimit": 1000,  # Token-bucket capacity: largest burst of requests allowed
        "rateLimit": 500     # Steady-state requests per second
    },
    "quota": {
        "limit": 10000,  # Requests allowed per period
        "period": "DAY"
    }
}
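API Gateway throttles with a token-bucket algorithm, which is why `burstLimit` is a bucket size rather than a concurrency cap. The class below illustrates those semantics only; it is not API Gateway's implementation, and the injectable clock is there for testing.

```python
import time
from typing import Callable


class TokenBucket:
    """Refill `rate` tokens per second, storing at most `burst` tokens."""

    def __init__(self, rate: float, burst: int, clock: Callable[[], float] = time.monotonic):
        self.rate = rate
        self.burst = burst
        self.clock = clock
        self.tokens = float(burst)  # start full so an initial burst is allowed
        self.updated = clock()

    def allow(self) -> bool:
        now = self.clock()
        # Refill in proportion to elapsed time, capped at the burst size
        self.tokens = min(self.burst, self.tokens + (now - self.updated) * self.rate)
        self.updated = now
        if self.tokens >= 1:
            self.tokens -= 1
            return True
        return False  # API Gateway answers 429 Too Many Requests in this case
```

With `rate=500, burst=1000`, a client can fire 1,000 requests at once, after which capacity returns at 500 requests per second.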
API Reference
Product APIs
| Endpoint | Method | Purpose | Consumer |
|---|---|---|---|
| /products | GET | Product catalog | Destination Site |
| /products/{id} | GET | Product details | Destination Site |
| /products/{id}/related | GET | Similar products | Destination Site |
| /3d-products | GET | 3D model catalog | Internal Dashboard |
| /3d-products | POST | Update model status | Internal Dashboard |
Search APIs
| Endpoint | Method | Purpose | Consumer |
|---|---|---|---|
| /llm/process-user-prompt | POST | Natural language search | All clients |
| /llm/vector-search | POST | Semantic similarity | AI features |
Dashboard APIs
| Endpoint | Method | Purpose | Consumer |
|---|---|---|---|
| /reconstruction-runs | GET | Pipeline monitoring | Internal Dashboard |
| /products/{id}/internal-details | GET | Admin product data | Internal Dashboard |
Data Models
// Core API response types
interface APIResponse<T> {
statusCode: number;
data: T;
pagination?: PaginationInfo;
error?: string;
}
interface PaginationInfo {
limit: number;
offset: number;
total: number;
hasMore: boolean;
lastEvaluatedKey?: string;
}
interface Product {
ID: string;
productName: string;
brand: string;
model: string;
displayPrice: string;
imageUrl: string;
"3DReconstructionUrl": string;
hasProductionGradeModel: boolean;
affiliateLinks: AffiliateLink[];
stockXData?: StockXData;
}
interface AffiliateLink {
shop_name: string;
price: string;
url: string;
priority: boolean;
}
Contributing
Development Workflow
- Create feature branch from main
- Implement Lambda function following established patterns
- Add comprehensive tests including unit and integration tests
- Update API documentation with new endpoints
- Test deployment in staging environment
- Submit pull request with detailed description
Adding New Lambda Functions
# Template for new Lambda function
"""
New Lambda Function: {FunctionName}
Purpose: {Description of what this function does}
Consumer: {Who uses this API}
"""
import json
import logging
from decimal import Decimal
from typing import Any, Dict

import boto3

# Standard setup
logger = logging.getLogger()
logger.setLevel(logging.INFO)


class ValidationError(Exception):
    """Raised for invalid request parameters; mapped to HTTP 400"""


class DecimalEncoder(json.JSONEncoder):
    """Handle DynamoDB Decimal types"""
    def default(self, obj):
        if isinstance(obj, Decimal):
            return str(obj)
        return super().default(obj)


def lambda_handler(event: Dict[str, Any], context) -> Dict[str, Any]:
    """
    Main Lambda handler function

    Args:
        event: API Gateway event object
        context: Lambda runtime context

    Returns:
        API Gateway response object
    """
    try:
        # Extract and validate parameters
        params = extract_parameters(event)
        validate_parameters(params)
        # Business logic
        result = process_request(params)
        # Return success response
        return success_response(result)
    except ValidationError as e:
        return error_response(400, str(e))
    except Exception as e:
        logger.error(f"Unexpected error: {str(e)}")
        return error_response(500, "Internal server error")


def extract_parameters(event: Dict[str, Any]) -> Dict[str, Any]:
    """Extract parameters from API Gateway event"""
    # Implementation here
    pass


def validate_parameters(params: Dict[str, Any]) -> None:
    """Validate request parameters; raise ValidationError on bad input"""
    # Implementation here
    pass


def process_request(params: Dict[str, Any]) -> Dict[str, Any]:
    """Main business logic"""
    # Implementation here
    pass


def success_response(data: Any) -> Dict[str, Any]:
    """Format success response"""
    return {
        'statusCode': 200,
        'headers': {
            'Content-Type': 'application/json',
            'Access-Control-Allow-Origin': '*'
        },
        'body': json.dumps(data, cls=DecimalEncoder)
    }


def error_response(status_code: int, message: str) -> Dict[str, Any]:
    """Format error response"""
    return {
        'statusCode': status_code,
        'headers': {
            'Content-Type': 'application/json',
            'Access-Control-Allow-Origin': '*'
        },
        'body': json.dumps({'error': message})
    }
Code Standards
- Python PEP 8 styling with black formatter
- Type hints for all function parameters and return values
- Comprehensive docstrings following Google style
- Error handling with appropriate HTTP status codes
- Logging for debugging and monitoring
- Input validation for all user-provided data
Ready to integrate with the Shopping API? This documentation covers all the backend services and APIs that power the Curie platform.