Curie Shopping API

The Curie Shopping API is a comprehensive backend system built on AWS Lambda that powers all product data, search functionality, and AI-driven features across the Curie ecosystem. It provides scalable, serverless APIs for product management, 3D model serving, intelligent search, and marketplace integrations.

Overview

The Shopping API serves as the backbone of the Curie platform, handling everything from basic product queries to sophisticated AI-powered semantic search. Built as a collection of AWS Lambda functions with API Gateway routing, it provides high availability, automatic scaling, and cost-effective operation.

Core Services

  • Product Data Management: Complete product catalog with 3D model metadata
  • AI-Powered Search: Vector-based semantic search using Amazon Bedrock
  • Marketplace Integrations: StockX scraping and external pricing APIs
  • Real-Time Data Sync: DynamoDB streams to OpenSearch integration
  • Internal Tools: Dashboard APIs and reconstruction pipeline management

Architecture

Technology Stack

Serverless Infrastructure

  • AWS Lambda: Serverless compute for all API endpoints
  • API Gateway: HTTP routing and request/response management
  • DynamoDB: Primary product database with global secondary indexes
  • OpenSearch: Search engine with vector similarity capabilities

AI & Machine Learning

  • Amazon Bedrock: Foundation models for embeddings and text processing
  • Titan Embeddings: Vector generation for semantic search
  • Bedrock Agents: Orchestrated AI workflows for complex queries

Data Processing & Integration

  • DynamoDB Streams: Real-time data synchronization
  • AWS Step Functions: Complex workflow orchestration
  • SQS & SNS: Asynchronous processing and notifications

External Integrations

  • StockX API: Product scraping and marketplace data
  • Pricing APIs: Third-party pricing comparison services
  • Google Merchant: Product listing management

System Architecture

```mermaid
graph TB
    subgraph "Client Applications"
        DS[Destination Site]
        DW[Destination World]
        DB[Internal Dashboard]
    end

    subgraph "API Gateway"
        AG[API Gateway Routes]
    end

    subgraph "Lambda Functions"
        PROD[Product APIs]
        SEARCH[Search APIs]
        AI[AI/LLM APIs]
        DASH[Dashboard APIs]
        DATA[Data Jobs]
    end

    subgraph "AI Services"
        BR[Bedrock Agent Runtime]
        EMB[Titan Embeddings]
    end

    subgraph "Data Layer"
        DDB[(DynamoDB)]
        OS[(OpenSearch)]
        S3[(S3 Storage)]
    end

    subgraph "External Services"
        SX[StockX]
        PA[Pricing APIs]
        GM[Google Merchant]
    end

    DS --> AG
    DW --> AG
    DB --> AG
    AG --> PROD
    AG --> SEARCH
    AG --> AI
    AG --> DASH

    PROD --> DDB
    SEARCH --> OS
    AI --> BR
    AI --> EMB
    DASH --> DDB
    DATA --> SX
    DATA --> PA
    DATA --> GM

    DDB --> S3
    OS --> S3
```

Project Structure

```
curie-shopping-api/
├── ShoppingApi/
│   ├── DestinationSite/                 # Consumer-facing APIs
│   │   ├── GetAllProducts/              # Product catalog
│   │   ├── GetSingleProduct/            # Individual product details
│   │   └── GetRelatedProducts/          # Product recommendations
│   ├── Dashboard/                       # Internal dashboard APIs
│   │   ├── Get3DProducts/               # 3D model management
│   │   ├── GetReconstructionRuns/       # Pipeline monitoring
│   │   └── GetInternalProductDetails/   # Admin product data
│   └── LLM/                             # AI-powered features
│       ├── ProcessUserPrompt/           # Natural language queries
│       └── GetProductsFromVectorIndex/  # Semantic search
├── DataJobs/                            # Batch processing
│   ├── GatherRefImages/                 # Reference image collection
│   └── ScrapeStockX/                    # Marketplace data ingestion
├── DynamoStreamJobs/                    # Real-time data sync
│   ├── StreamDynamoToOpenSearch/
│   └── StreamDynamoToOpenSearchVector/
└── utils/                               # Shared utilities
    ├── enrichproduct.py                 # Product data enhancement
    └── openaiapi.py                     # AI service integration
```

API Endpoints

Product Management APIs

Get All Products

GET /products

Purpose: Retrieves the paginated product catalog for the destination site
Consumer: Curie Destination Site
Features:

  • Pagination with configurable limits
  • Parallel data enrichment
  • CloudFront URL generation
  • Production model filtering

```typescript
// Response structure
interface ProductsResponse {
  products: Product[];
  lastEvaluatedKey?: string;
  total: number;
}

interface Product {
  ID: string;
  productName: string;
  brand: string;
  displayPrice: string;
  "3DReconstructionUrl": string;
  hasProductionGradeModel: boolean;
  affiliateLinks: AffiliateLink[];
}
```

Get Single Product

GET /products/{id}

Purpose: Detailed product information for individual product pages
Consumer: Curie Destination Site (planned)
Features: Complete product metadata with pricing and availability
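Behind API Gateway's proxy integration, the `{id}` path parameter arrives in the Lambda event under `pathParameters`. A minimal extraction sketch (the helper name is illustrative, not from the codebase):

```python
def extract_product_id(event: dict) -> str:
    """Pull the {id} path parameter from an API Gateway proxy event."""
    path_params = event.get('pathParameters') or {}
    product_id = path_params.get('id')
    if not product_id:
        raise ValueError('Missing product id in path')
    return product_id

# Example proxy event fragment as API Gateway delivers it
event = {'pathParameters': {'id': 'abc-123'}}
```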

Get Related Products

GET /products/{id}/related?num_related=6

Purpose: Finds semantically similar products using OpenSearch MLT
Consumer: Curie Destination Site
Algorithm: More Like This (MLT) query with weighted fields

```typescript
// OpenSearch query structure
const mltQuery = {
  more_like_this: {
    fields: [
      "productName^3",    // Highest weight
      "description^2",
      "stockXData.brand^2",
      "stockXData.model",
      "stockXData.categories.default.value"
    ],
    like: [{ _id: productId }],
    min_term_freq: 1,
    max_query_terms: 12
  }
};
```

AI-Powered Search APIs

Process User Prompt

```http
POST /llm/process-user-prompt
Content-Type: application/json

{
  "prompt": "I need comfortable running shoes under $150 for trail running"
}
```

Purpose: Natural language product search using Bedrock Agent flows
Features:

  • Conversational query processing
  • Multi-criteria product matching
  • Parallel product enrichment
  • Contextual understanding

```python
# Bedrock Agent integration
response = bedrock_agent_runtime.invoke_flow(
    flowIdentifier=BEDROCK_FLOW_ID,
    flowAliasIdentifier=BEDROCK_FLOW_ALIAS_ID,
    inputs=[{
        "content": {"document": prompt},
        "nodeName": "FlowInputNode",
        "nodeOutputName": "document"
    }]
)
```
Vector Search

```http
POST /llm/vector-search
Content-Type: application/json

{
  "query": "lightweight basketball shoes"
}
```

Purpose: Semantic similarity search using vector embeddings
Process:

  1. Generate query embeddings with Titan model
  2. Perform KNN search in OpenSearch vector index
  3. Return most similar products with similarity scores

```python
# Vector search implementation
def vector_search(query_text: str, k: int = 10):
    # Generate embedding
    embedding_response = bedrock.invoke_model(
        modelId="amazon.titan-embed-text-v2:0",
        body=json.dumps({"inputText": query_text})
    )
    # invoke_model returns a streaming body; parse it to get the vector
    query_vector = json.loads(embedding_response['body'].read())['embedding']

    # KNN search
    search_query = {
        "query": {
            "knn": {
                "product_embedding": {
                    "vector": query_vector,
                    "k": k
                }
            }
        }
    }

    return opensearch.search(index="products_vector_index", body=search_query)
```

Dashboard APIs

Get 3D Products

GET /3d-products?limit=100

Purpose: Product catalog for internal dashboard with 3D model metadata
Consumer: Curie Internal Dashboard
Features: Production status, reconstruction URLs, reference images

Get Reconstruction Runs

GET /reconstruction-runs?limit=50

Purpose: Pipeline monitoring data for dashboard analytics
Features:

  • Paginated job history
  • CloudWatch log integration
  • Performance metrics (SSIM, MSE)
  • Commit information and Docker tags

Get Internal Product Details

GET /products/{id}/internal-details

Purpose: Comprehensive product data for dashboard management
Features:

  • All 3D model versions
  • Reference image collections
  • Complete reconstruction history
  • S3 file listings

Data Processing & Jobs

Real-Time Data Synchronization

DynamoDB to OpenSearch Streaming

```python
# Stream processor for real-time search index updates
def lambda_handler(event, context):
    for record in event['Records']:
        if record['eventName'] in ['INSERT', 'MODIFY']:
            # Convert DynamoDB format to OpenSearch document
            document = convert_dynamodb_to_json(record['dynamodb']['NewImage'])

            # Index in both structured and dynamic indices
            opensearch.index(
                index="curie-products-2",
                id=document['ID'],
                body=document
            )
            opensearch.index(
                index="curie-products-dynamic",
                id=document['ID'],
                body=document
            )
```

Vector Embedding Generation

```python
# Batch processing for vector embeddings
def generate_product_embeddings():
    # Scan products table
    products = dynamodb.scan(TableName='curie-products')

    for product in products['Items']:
        if product.get('hasProductionGradeModel'):
            # Aggregate text fields
            text_content = combine_product_text(product)

            # Generate embedding
            embedding = generate_bedrock_embedding(text_content)

            # Store in vector index
            opensearch.index(
                index="products_vector_index",
                id=product['ID'],
                body={
                    "product_embedding": embedding,
                    "product_metadata": product
                }
            )
```
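`combine_product_text` is likewise referenced without a definition. One plausible sketch, assuming it simply joins the searchable text fields of a product record (field names taken from the Product model in this document):

```python
def combine_product_text(product: dict) -> str:
    """Join the text fields used for embedding into a single string."""
    fields = [
        product.get('productName'),
        product.get('brand'),
        product.get('model'),
        product.get('description'),
    ]
    # Drop missing fields; join with periods so sentence boundaries survive
    return '. '.join(str(f) for f in fields if f)
```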

External Data Integration

StockX Product Scraping

```python
# StockX marketplace integration
class StockXScraper:
    def scrape_search_results(self, search_url: str, max_pages: int):
        """Discover new products from StockX search pages"""
        discovered_products = []

        for page in range(1, max_pages + 1):
            page_products = self.extract_product_data(search_url, page)

            for product in page_products:
                # Check if product exists in database
                existing = self.check_existing_product(product['url_key'])
                if not existing:
                    discovered_products.append(product)

        # Trigger individual product processing
        self.trigger_step_function(discovered_products)

    def scrape_individual_product(self, stockx_url_key: str):
        """Extract detailed product information"""
        product_data = self.extract_product_details(stockx_url_key)
        return self.create_product_record(product_data)
```

Reference Image Collection

```python
# AI-powered reference image gathering
import hashlib

def gather_reference_images(product_names: list):
    for product_name in product_names:
        # Generate search queries using Perplexity API
        search_queries = generate_search_queries(product_name)

        for query in search_queries:
            # Search for high-quality product images
            image_urls = perplexity_image_search(query)

            # Download and validate images
            for url in image_urls:
                image_data = download_image(url)
                if validate_image_quality(image_data):
                    # Store in S3 under a content-hash key
                    image_hash = hashlib.sha256(image_data).hexdigest()
                    s3_key = f"curie-product-images/{product_name}/ref_images/{image_hash}.png"
                    upload_to_s3(image_data, s3_key)

                    # Record in reference images table
                    store_image_metadata(product_name, s3_key, url)
```
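`validate_image_quality` is referenced but not defined here. Purely as an illustration, a cheap pre-filter might reject tiny payloads and non-image content by checking size and magic bytes before any real quality scoring:

```python
def validate_image_quality(image_data: bytes, min_bytes: int = 10_000) -> bool:
    """Cheap pre-filter: reject tiny files and non-PNG/JPEG payloads."""
    if not image_data or len(image_data) < min_bytes:
        return False
    is_png = image_data.startswith(b'\x89PNG\r\n\x1a\n')   # PNG signature
    is_jpeg = image_data.startswith(b'\xff\xd8\xff')        # JPEG SOI marker
    return is_png or is_jpeg
```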

Development & Deployment

Lambda Function Structure

Each Lambda function follows a consistent structure:

```python
# Standard Lambda function template
import boto3
import json
import logging
from decimal import Decimal

# Configure logging
logger = logging.getLogger()
logger.setLevel(logging.INFO)

# Initialize clients at module level for reuse
dynamodb = boto3.resource('dynamodb')
table = dynamodb.Table('curie-products')

class DecimalEncoder(json.JSONEncoder):
    """Handle DynamoDB Decimal types"""
    def default(self, obj):
        if isinstance(obj, Decimal):
            return str(obj)
        return super().default(obj)

def lambda_handler(event, context):
    try:
        # Extract parameters
        params = extract_parameters(event)

        # Business logic
        result = process_request(params)

        # Return formatted response
        return {
            'statusCode': 200,
            'headers': {
                'Content-Type': 'application/json',
                'Access-Control-Allow-Origin': '*'
            },
            'body': json.dumps(result, cls=DecimalEncoder)
        }
    except Exception as e:
        logger.error(f"Lambda error: {str(e)}")
        return {
            'statusCode': 500,
            'body': json.dumps({'error': str(e)})
        }
```

CI/CD Pipeline

```yaml
# GitHub Actions deployment workflow
on:
  push:
    branches: [main]

jobs:
  deploy:
    runs-on: ubuntu-latest
    strategy:
      matrix:
        lambda:
          - name: GetAllProductsLambda
            files: [ShoppingApi/DestinationSite/GetAllProducts/lambda_get_all_products.py]
            requirements: ShoppingApi/DestinationSite/GetAllProducts/requirements.txt

          - name: ProcessUserPromptLambda
            files: [ShoppingApi/LLM/ProcessUserPrompt/lambda_process_user_prompt.py]
            requirements: ShoppingApi/LLM/ProcessUserPrompt/requirements.txt

    steps:
      - uses: actions/checkout@v4

      - name: Deploy Lambda
        run: |
          # Package function with dependencies
          pip install -r ${{ matrix.lambda.requirements }} -t package/
          cp ${{ join(matrix.lambda.files, ' ') }} package/
          cd package && zip -r ../function.zip .

          # Deploy to AWS Lambda
          aws lambda update-function-code \
            --function-name ${{ matrix.lambda.name }} \
            --zip-file fileb://function.zip
```

Local Development

```bash
# Set up development environment
python -m venv venv
source venv/bin/activate

# Install dependencies
pip install -r requirements.txt

# Set environment variables
export AWS_PROFILE=curie-dev
export DYNAMODB_TABLE=curie-products
export OPENSEARCH_ENDPOINT=your-opensearch-domain

# Test individual functions
python -m pytest tests/test_product_apis.py
```

Performance Optimization

Caching Strategies

```python
# Multi-level caching for API responses
from cachetools import TTLCache
import functools

# In-memory caching with TTL
api_cache = TTLCache(maxsize=1000, ttl=300)  # 5 minutes

@functools.lru_cache(maxsize=128)
def get_cloudfront_url(s3_key: str) -> str:
    """Cache CloudFront URL generation"""
    return f"https://d1q72mmyyvs3z3.cloudfront.net/{s3_key}"

def cached_api_call(cache_key: str, fetch_function: callable):
    """Fetch-through cache helper"""
    if cache_key in api_cache:
        return api_cache[cache_key]

    result = fetch_function()
    api_cache[cache_key] = result
    return result
```
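To see the fetch-through behavior in isolation, the same pattern can be exercised with a plain dict standing in for the `TTLCache` (a self-contained demo, not production code):

```python
# Plain dict in place of TTLCache so the demo has no dependencies
api_cache = {}

def cached_api_call(cache_key, fetch_function):
    if cache_key in api_cache:
        return api_cache[cache_key]
    result = fetch_function()
    api_cache[cache_key] = result
    return result

calls = {'count': 0}

def fetch_products():
    # Stand-in for a DynamoDB or OpenSearch fetch
    calls['count'] += 1
    return ['product-1', 'product-2']

first = cached_api_call('products:page1', fetch_products)
second = cached_api_call('products:page1', fetch_products)
# fetch_products ran once; the second call was served from cache
```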

Parallel Processing

```python
# Concurrent product enrichment
from concurrent.futures import ThreadPoolExecutor, as_completed

def enrich_products_parallel(products: list, max_workers: int = 10):
    """Enrich multiple products concurrently"""
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        # Submit all enrichment tasks
        future_to_product = {
            executor.submit(enrich_single_product, product): product
            for product in products
        }

        enriched_products = []
        for future in as_completed(future_to_product):
            try:
                enriched_product = future.result(timeout=30)
                enriched_products.append(enriched_product)
            except Exception as e:
                logger.error(f"Product enrichment failed: {e}")
                # Include original product as fallback
                enriched_products.append(future_to_product[future])

        return enriched_products
```

Database Optimization

```python
# Efficient DynamoDB queries with pagination
def paginated_query(table, index_name: str, key_condition, limit: int = 50):
    """Paginate large DynamoDB queries"""
    items = []
    last_key = None

    while len(items) < limit:
        query_params = {
            'IndexName': index_name,
            'KeyConditionExpression': key_condition,
            'Limit': min(limit - len(items), 100)  # cap the page size per request
        }

        if last_key:
            query_params['ExclusiveStartKey'] = last_key

        response = table.query(**query_params)
        items.extend(response['Items'])

        last_key = response.get('LastEvaluatedKey')
        if not last_key:
            break

    return items[:limit], last_key
```

Monitoring & Observability

CloudWatch Integration

```python
# Comprehensive logging and metrics
import json
import logging
import time
from datetime import datetime

class APILogger:
    def __init__(self, function_name: str):
        self.function_name = function_name
        self.logger = logging.getLogger(function_name)

    def log_api_call(self, endpoint: str, params: dict, execution_time: float):
        """Structured logging for API calls"""
        log_data = {
            'timestamp': datetime.utcnow().isoformat(),
            'function': self.function_name,
            'endpoint': endpoint,
            'execution_time_ms': round(execution_time * 1000, 2),
            'parameters': params
        }
        self.logger.info(json.dumps(log_data))

    def log_error(self, error: Exception, context: dict = None):
        """Error logging with context"""
        error_data = {
            'timestamp': datetime.utcnow().isoformat(),
            'function': self.function_name,
            'error': str(error),
            'error_type': type(error).__name__,
            'context': context or {}
        }
        self.logger.error(json.dumps(error_data))

# Usage in Lambda functions
api_logger = APILogger('GetAllProducts')

def lambda_handler(event, context):
    start_time = time.time()
    try:
        result = process_request(event)
        execution_time = time.time() - start_time
        api_logger.log_api_call('GET /products', event, execution_time)
        return result
    except Exception as e:
        api_logger.log_error(e, {'event': event})
        raise
```

Performance Metrics

```python
# Custom CloudWatch metrics
import time
import boto3

cloudwatch = boto3.client('cloudwatch')

def publish_custom_metrics(metric_name: str, value: float, unit: str = 'Count'):
    """Publish custom metrics to CloudWatch"""
    cloudwatch.put_metric_data(
        Namespace='Curie/ShoppingAPI',
        MetricData=[{
            'MetricName': metric_name,
            'Value': value,
            'Unit': unit,
            'Dimensions': [
                {'Name': 'Environment', 'Value': 'Production'},
                {'Name': 'Service', 'Value': 'ShoppingAPI'}
            ]
        }]
    )

# Track API performance
def track_api_performance(func):
    def wrapper(*args, **kwargs):
        start_time = time.time()
        try:
            result = func(*args, **kwargs)
            execution_time = time.time() - start_time
            publish_custom_metrics(f'{func.__name__}_ExecutionTime', execution_time, 'Seconds')
            publish_custom_metrics(f'{func.__name__}_Success', 1)
            return result
        except Exception:
            publish_custom_metrics(f'{func.__name__}_Error', 1)
            raise
    return wrapper
```

Security & Best Practices

API Security

```python
# Input validation and sanitization
from typing import Dict, Any
import re

class RequestValidator:
    @staticmethod
    def validate_product_id(product_id: str) -> bool:
        """Validate UUID format for product IDs"""
        uuid_pattern = r'^[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}$'
        return bool(re.match(uuid_pattern, product_id, re.IGNORECASE))

    @staticmethod
    def sanitize_search_query(query: str) -> str:
        """Sanitize user search input"""
        # Remove potentially harmful characters
        sanitized = re.sub(r'[<>\"\'%;()&+]', '', query)
        return sanitized.strip()[:500]  # Limit length

    @staticmethod
    def validate_pagination_params(params: Dict[str, Any]) -> Dict[str, Any]:
        """Validate and normalize pagination parameters"""
        limit = min(int(params.get('limit', 50)), 100)  # Max 100 items
        offset = max(int(params.get('offset', 0)), 0)   # Non-negative
        return {'limit': limit, 'offset': offset}
```
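A standalone run of the same checks (mirroring `validate_product_id` and `sanitize_search_query` above, outside the class for brevity):

```python
import re

UUID_PATTERN = re.compile(
    r'^[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}$',
    re.IGNORECASE,
)

def is_valid_product_id(product_id: str) -> bool:
    """True only for canonical 8-4-4-4-12 UUID strings."""
    return bool(UUID_PATTERN.match(product_id))

def sanitize_search_query(query: str) -> str:
    """Strip risky characters, trim whitespace, cap length at 500."""
    return re.sub(r'[<>\"\'%;()&+]', '', query).strip()[:500]
```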

Rate Limiting

```python
# API Gateway rate limiting configuration (usage plan values)
rate_limiting_config = {
    "throttle": {
        "burstLimit": 1000,  # Max concurrent requests
        "rateLimit": 500     # Requests per second
    },
    "quota": {
        "limit": 10000,      # Daily request limit
        "period": "DAY"
    }
}
```
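API Gateway enforces these limits server-side, but the semantics — tokens refilled at `rateLimit` per second into a bucket of `burstLimit` capacity — can be sketched as a token bucket (illustrative only, not part of the codebase):

```python
import time

class TokenBucket:
    """Token bucket mirroring API Gateway throttling semantics."""
    def __init__(self, rate: float, burst: int):
        self.rate = rate          # tokens refilled per second
        self.burst = burst        # bucket capacity
        self.tokens = float(burst)
        self.last = time.monotonic()

    def allow(self) -> bool:
        now = time.monotonic()
        # Refill proportionally to elapsed time, capped at the burst size
        self.tokens = min(self.burst, self.tokens + (now - self.last) * self.rate)
        self.last = now
        if self.tokens >= 1:
            self.tokens -= 1
            return True
        return False
```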

API Reference

Product APIs

| Endpoint | Method | Purpose | Consumer |
| --- | --- | --- | --- |
| `/products` | GET | Product catalog | Destination Site |
| `/products/{id}` | GET | Product details | Destination Site |
| `/products/{id}/related` | GET | Similar products | Destination Site |
| `/3d-products` | GET | 3D model catalog | Internal Dashboard |
| `/3d-products` | POST | Update model status | Internal Dashboard |

Search APIs

| Endpoint | Method | Purpose | Consumer |
| --- | --- | --- | --- |
| `/llm/process-user-prompt` | POST | Natural language search | All clients |
| `/llm/vector-search` | POST | Semantic similarity | AI features |

Dashboard APIs

| Endpoint | Method | Purpose | Consumer |
| --- | --- | --- | --- |
| `/reconstruction-runs` | GET | Pipeline monitoring | Internal Dashboard |
| `/products/{id}/internal-details` | GET | Admin product data | Internal Dashboard |

Data Models

```typescript
// Core API response types
interface APIResponse<T> {
  statusCode: number;
  data: T;
  pagination?: PaginationInfo;
  error?: string;
}

interface PaginationInfo {
  limit: number;
  offset: number;
  total: number;
  hasMore: boolean;
  lastEvaluatedKey?: string;
}

interface Product {
  ID: string;
  productName: string;
  brand: string;
  model: string;
  displayPrice: string;
  imageUrl: string;
  "3DReconstructionUrl": string;
  hasProductionGradeModel: boolean;
  affiliateLinks: AffiliateLink[];
  stockXData?: StockXData;
}

interface AffiliateLink {
  shop_name: string;
  price: string;
  url: string;
  priority: boolean;
}
```

Contributing

Development Workflow

  1. Create feature branch from main
  2. Implement Lambda function following established patterns
  3. Add comprehensive tests including unit and integration tests
  4. Update API documentation with new endpoints
  5. Test deployment in staging environment
  6. Submit pull request with detailed description

Adding New Lambda Functions

```python
# Template for new Lambda function
"""
New Lambda Function: {FunctionName}
Purpose: {Description of what this function does}
Consumer: {Who uses this API}
"""

import boto3
import json
import logging
from decimal import Decimal
from typing import Dict, Any

# Standard setup
logger = logging.getLogger()
logger.setLevel(logging.INFO)

class ValidationError(Exception):
    """Raised when request parameters fail validation"""

class DecimalEncoder(json.JSONEncoder):
    """Handle DynamoDB Decimal types"""
    def default(self, obj):
        if isinstance(obj, Decimal):
            return str(obj)
        return super().default(obj)

def lambda_handler(event: Dict[str, Any], context) -> Dict[str, Any]:
    """
    Main Lambda handler function

    Args:
        event: API Gateway event object
        context: Lambda runtime context

    Returns:
        API Gateway response object
    """
    try:
        # Extract and validate parameters
        params = extract_parameters(event)
        validate_parameters(params)

        # Business logic
        result = process_request(params)

        # Return success response
        return success_response(result)

    except ValidationError as e:
        return error_response(400, str(e))
    except Exception as e:
        logger.error(f"Unexpected error: {str(e)}")
        return error_response(500, "Internal server error")

def extract_parameters(event: Dict[str, Any]) -> Dict[str, Any]:
    """Extract parameters from the API Gateway event"""
    # Implementation here
    raise NotImplementedError

def validate_parameters(params: Dict[str, Any]) -> None:
    """Validate request parameters; raise ValidationError on failure"""
    # Implementation here
    raise NotImplementedError

def process_request(params: Dict[str, Any]) -> Dict[str, Any]:
    """Main business logic"""
    # Implementation here
    raise NotImplementedError

def success_response(data: Any) -> Dict[str, Any]:
    """Format success response"""
    return {
        'statusCode': 200,
        'headers': {
            'Content-Type': 'application/json',
            'Access-Control-Allow-Origin': '*'
        },
        'body': json.dumps(data, cls=DecimalEncoder)
    }

def error_response(status_code: int, message: str) -> Dict[str, Any]:
    """Format error response"""
    return {
        'statusCode': status_code,
        'headers': {
            'Content-Type': 'application/json',
            'Access-Control-Allow-Origin': '*'
        },
        'body': json.dumps({'error': message})
    }
```

Code Standards

  • Python PEP 8 styling with black formatter
  • Type hints for all function parameters and return values
  • Comprehensive docstrings following Google style
  • Error handling with appropriate HTTP status codes
  • Logging for debugging and monitoring
  • Input validation for all user-provided data

Ready to integrate with the Shopping API? This documentation covers all the backend services and APIs that power the Curie platform.