Table of Contents
- Python SDK Guide
- Installation
- With async support
- With AWS S3 helpers
- All extras
- Quick Start
- Basic Configuration
- Configure the client
- Alternative: Initialize client directly
- Your First Conversion
- Create a conversion job
- New micro-precision API (recommended)
- Or using backward-compatible cents field
- Core Classes
- MediaConvert.ConversionJob
- Create a job
- Check job status
- Wait for completion with callback
- Access results
- New micro-precision API (recommended)
- Or using backward-compatible cents field
- MediaConvert.Account
- Get account details
- Check usage
- Django Integration
- Settings Configuration
- settings.py
- Configure at startup
- Celery Integration
- tasks.py
- Usage
- Django Model Integration
- models.py
- Flask Integration
- Application Factory Pattern
- app.py
- routes.py
- Error Handling
- Usage
- Batch Processing
- Async version
- Usage examples
- Async usage
- S3 Integration Helpers
- Usage
- Testing
- Test Helpers and Mocking
- pytest example
- Configuration and Best Practices
- Environment Configuration
- config.py
- Initialize configuration
- Logging Configuration
- Configure logging
- Enable mediaconvert debug logging
- Custom logging handler
- Performance and Optimization
- Connection Pooling
- Configure session with connection pooling
- Retry strategy
- Connection pooling
- Use with mediaconvert
- Memory-Efficient Processing
- Next Steps
- Support
Python SDK Guide
The MediaConvert.io Python SDK provides a convenient way to integrate media conversion into your Python applications. This guide covers installation, configuration, and common usage patterns.
Installation
Install via pip:
bash
pip install mediaconvert-io
Or with specific extras:
bash
# Quote the requirement: shells such as zsh treat [..] as a glob pattern
# and fail with "no matches found" when it is left unquoted.
# With async support
pip install "mediaconvert-io[async]"
# With AWS S3 helpers
pip install "mediaconvert-io[s3]"
# All extras
pip install "mediaconvert-io[all]"
Quick Start
Basic Configuration
python
import os  # required: the API token is read from the environment below

import mediaconvert
from mediaconvert import ConversionJob, Account, MediaConvertError

# Configure the client
mediaconvert.configure(
    api_token=os.environ['MEDIACONVERT_API_TOKEN'],  # KeyError if the variable is unset
    api_base_url='https://mediaconvert.io/api/v1',
    timeout=30,
    retries=3,
)

# Alternative: Initialize client directly
client = mediaconvert.Client(
    api_token=os.environ['MEDIACONVERT_API_TOKEN'],
)
Your First Conversion
python
import os
import time

# Create a conversion job. The request is described as a mapping first so the
# individual parameters are easy to read and tweak, then passed as keywords.
conversion_request = {
    'job_type': 'video',
    'input_url': 'https://bucket.s3.amazonaws.com/input.mp4?...',
    'output_url': 'https://bucket.s3.amazonaws.com/output.webm?...',
    'input_format': 'mp4',
    'output_format': 'webm',
    'input_size_bytes': 52428800,
    'webhook_url': 'https://your-app.com/webhooks/mediaconvert',
}
job = ConversionJob.create(**conversion_request)

print(f"Job created: {job.id}")
print(f"Status: {job.status}")

# New micro-precision API (recommended): one euro is 1_000_000 micros
print(f"Estimated cost: €{job.estimated_cost_micros / 1_000_000:.6f}")

# Or using backward-compatible cents field
print(f"Estimated cost: €{job.estimated_cost_cents / 100:.2f}")
Core Classes
MediaConvert.ConversionJob
The main class for managing conversion jobs:
python
# Create a job
job = ConversionJob.create(
    job_type='video',
    input_url=input_presigned_url,
    output_url=output_presigned_url,
    input_format='mp4',
    output_format='webm',
    input_size_bytes=file_size_bytes,
)

# Check job status
job.reload()
print(f"Status: {job.status}")
print(f"Progress: {job.progress_percentage}%")


# Wait for completion with callback
def progress_callback(current_job):
    """Print the progress of the job passed to the callback."""
    print(f"Progress: {current_job.progress_percentage}%")


job.wait_until_complete(callback=progress_callback)

# Access results
print(f"Processing time: {job.processing_time_seconds}s")
print(f"Output size: {job.output_size_bytes} bytes")

# New micro-precision API (recommended)
print(f"Total cost: €{job.cost_micros / 1_000_000:.6f}")

# Or using backward-compatible cents field
print(f"Total cost: €{job.cost_cents / 100:.2f}")
MediaConvert.Account
Manage your account information:
python
# Get account details
account = Account.current()
print(f"Email: {account.email}")
print(f"Tier: {account.tier}")
credits_euros = account.credits_balance_cents / 100  # balance is reported in cents
print(f"Credits: €{credits_euros:.2f}")

# Check usage
usage = account.usage_summary()
print(f"Jobs this month: {usage['jobs_count']}")
print(f"Data processed: {usage['total_mb_processed']} MB")
spent_euros = usage['total_cost_micros'] / 1_000_000  # micros -> euros
print(f"Total spent: €{spent_euros:.6f}")
Django Integration
Settings Configuration
python
# settings.py
import os

import mediaconvert

# Django-style (upper-case) settings for the MediaConvert SDK.
MEDIACONVERT = {
    'API_TOKEN': os.environ.get('MEDIACONVERT_API_TOKEN'),
    'API_BASE_URL': 'https://mediaconvert.io/api/v1',
    'WEBHOOK_SECRET': os.environ.get('MEDIACONVERT_WEBHOOK_SECRET'),
    'TIMEOUT': 30,
    'RETRIES': 3,
}

# Configure at startup.
# Every other call to configure() in this guide uses lower-case keyword names
# (api_token, api_base_url, webhook_secret, timeout, retries), so the
# upper-case settings keys are lower-cased before being unpacked.
mediaconvert.configure(
    **{name.lower(): value for name, value in MEDIACONVERT.items()}
)
Celery Integration
python
# tasks.py
import time

from celery import shared_task
from mediaconvert import ConversionJob
from mediaconvert.exceptions import (
    AuthenticationError,
    InsufficientCreditsError,
    ValidationError,
    ConversionError,
    RateLimitError,
    APIError
)


@shared_task(bind=True, max_retries=3)
def convert_media_task(self, user_id, input_url, output_url, options=None):
    """Create a conversion job and record it for the given user.

    Args:
        user_id: Owner of the conversion; also embedded in the webhook URL.
        input_url: URL the service reads the source media from.
        output_url: URL the service writes the converted media to.
        options: Optional extra keyword arguments forwarded to
            ``ConversionJob.create`` (for example ``job_type``).

    Returns:
        A dict with the created job's ``job_id`` and ``status``.

    Raises:
        InsufficientCreditsError: Re-raised without retrying after the user
            has been notified.
        APIError, ConversionError: Re-raised once the retry budget is spent.
    """
    options = options or {}
    try:
        job = ConversionJob.create(
            input_url=input_url,
            output_url=output_url,
            webhook_url=f"https://yourapp.com/webhooks/mediaconvert/{user_id}/",
            **options
        )

        # Store job reference in your database.
        # Imported here rather than at module level, as in the original sample.
        from myapp.models import MediaConversion
        MediaConversion.objects.create(
            user_id=user_id,
            external_id=job.id,
            status=job.status,
            input_url=input_url,
            output_url=output_url
        )
        return {'job_id': job.id, 'status': job.status}
    except RateLimitError as e:
        # Retry after rate limit delay
        raise self.retry(countdown=e.retry_after, exc=e)
    except InsufficientCreditsError as e:
        # Handled before the broader APIError clause so that it can never be
        # swallowed by the retry branch should it derive from APIError.
        # Don't retry - notify user to add credits
        from myapp.tasks import notify_insufficient_credits
        notify_insufficient_credits.delay(user_id, e.required_credits, e.available_credits)
        raise
    except (APIError, ConversionError) as e:
        # Retry on transient errors with exponential backoff: 60s, 120s, 240s
        if self.request.retries < self.max_retries:
            raise self.retry(countdown=60 * (2 ** self.request.retries), exc=e)
        raise


# Usage
convert_media_task.delay(
    user_id=current_user.id,
    input_url=input_presigned_url,
    output_url=output_presigned_url,
    options={'job_type': 'video', 'output_format': 'webm'}
)
Django Model Integration
python
# models.py
import uuid

from django.db import models
from django.contrib.auth.models import User
from mediaconvert import ConversionJob


class MediaConversion(models.Model):
    """Local record of a MediaConvert job, keyed by the remote job id."""

    STATUS_CHOICES = [
        ('pending', 'Pending'),
        ('processing', 'Processing'),
        ('completed', 'Completed'),
        ('failed', 'Failed'),
    ]

    id = models.UUIDField(primary_key=True, default=uuid.uuid4, editable=False)
    user = models.ForeignKey(User, on_delete=models.CASCADE, related_name='media_conversions')
    # Identifier of the job on the MediaConvert side (ConversionJob.id).
    external_id = models.CharField(max_length=255, unique=True)
    status = models.CharField(max_length=20, choices=STATUS_CHOICES, default='pending')
    input_url = models.URLField()
    output_url = models.URLField()
    progress_percentage = models.IntegerField(default=0)
    # Cost in millionths of a euro; see cost_euros for the converted value.
    cost_micros = models.BigIntegerField(null=True, blank=True)
    processing_time_seconds = models.IntegerField(null=True, blank=True)
    error_message = models.TextField(blank=True)
    created_at = models.DateTimeField(auto_now_add=True)
    updated_at = models.DateTimeField(auto_now=True)

    def refresh_from_api(self):
        """Fetch latest status from MediaConvert API and persist it.

        Returns:
            The ``ConversionJob`` fetched from the API.

        Raises:
            Exception: Whatever the lookup or save raised, after the record
                has been marked as failed.
        """
        try:
            job = ConversionJob.get(self.external_id)
            self.status = job.status
            self.progress_percentage = job.progress_percentage or 0
            self.cost_micros = job.cost_micros
            self.processing_time_seconds = job.processing_time_seconds
            self.error_message = job.error_message or ''
            self.save()
            return job
        except Exception as e:
            # NOTE(review): this also marks the record 'failed' when only the
            # status lookup failed (e.g. a network error), not the conversion
            # itself - confirm that is intended.
            self.error_message = str(e)
            self.status = 'failed'
            self.save()
            raise

    @property
    def cost_euros(self):
        """Get cost in euros with micro precision (0 when no cost is recorded)."""
        return self.cost_micros / 1_000_000 if self.cost_micros else 0

    def __str__(self):
        return f"{self.user.username} - {self.external_id} ({self.status})"
Flask Integration
Application Factory Pattern
python
# app.py
import os

from flask import Flask, request, jsonify
from mediaconvert import ConversionJob
import mediaconvert


def create_app():
    """Build the Flask application and configure the MediaConvert SDK.

    Returns:
        The configured ``Flask`` application.

    Raises:
        KeyError: If ``MEDIACONVERT_API_TOKEN`` is not set in the environment.
    """
    app = Flask(__name__)

    # A freshly created app has no MEDIACONVERT_* keys, so they must be
    # loaded before they are read below; here they come from the environment.
    app.config.from_mapping(
        MEDIACONVERT_API_TOKEN=os.environ['MEDIACONVERT_API_TOKEN'],
        MEDIACONVERT_WEBHOOK_SECRET=os.environ.get('MEDIACONVERT_WEBHOOK_SECRET'),
    )

    # Configure MediaConvert
    mediaconvert.configure(
        api_token=app.config['MEDIACONVERT_API_TOKEN'],
        webhook_secret=app.config['MEDIACONVERT_WEBHOOK_SECRET']
    )
    return app
# routes.py
import mediaconvert  # needed for verify_webhook_signature below
from flask import Blueprint, request, jsonify, url_for
from mediaconvert import ConversionJob
from mediaconvert.exceptions import MediaConvertError

api = Blueprint('api', __name__)

# Fields that start_conversion reads unconditionally from the request body.
_REQUIRED_FIELDS = ('job_type', 'input_url', 'output_url', 'input_format', 'output_format')


@api.route('/convert', methods=['POST'])
def start_conversion():
    """Create a conversion job from the JSON request body.

    Returns:
        201 with ``job_id``, ``status`` and ``estimated_cost_micros`` on
        success; 400 with an ``error`` message when the body is incomplete or
        the SDK rejects the request.
    """
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        data = {}  # missing or non-object body: reported as missing fields below
    missing = [name for name in _REQUIRED_FIELDS if name not in data]
    if missing:
        return jsonify({'error': f"Missing required fields: {', '.join(missing)}"}), 400

    try:
        job = ConversionJob.create(
            job_type=data['job_type'],
            input_url=data['input_url'],
            output_url=data['output_url'],
            input_format=data['input_format'],
            output_format=data['output_format'],
            input_size_bytes=data.get('input_size_bytes'),
            webhook_url=url_for('api.webhook', _external=True)
        )
    except MediaConvertError as e:
        return jsonify({'error': str(e)}), 400

    return jsonify({
        'job_id': job.id,
        'status': job.status,
        'estimated_cost_micros': job.estimated_cost_micros
    }), 201


@api.route('/webhook', methods=['POST'])
def webhook():
    """Receive job notifications and dispatch on the reported status.

    Returns:
        401 when the signature header is absent or does not verify,
        otherwise 200.
    """
    # Verify signature against the raw request body
    signature = request.headers.get('X-MediaConvert-Signature')
    if not signature or not mediaconvert.verify_webhook_signature(request.data, signature):
        return '', 401

    data = request.get_json()

    # Process webhook
    job_id = data['job_id']
    status = data['status']
    if status == 'completed':
        handle_completed_job(job_id, data)
    elif status == 'failed':
        # .get(): a failure payload without a message must not turn into a 500
        handle_failed_job(job_id, data.get('error_message'))
    return '', 200
Error Handling
Comprehensive error handling patterns:
python
import time

from mediaconvert import ConversionJob
from mediaconvert.exceptions import (
    AuthenticationError,
    InsufficientCreditsError,
    ValidationError,
    ConversionError,
    RateLimitError,
    APIError,
    NetworkError,
    # Aliased so the SDK class does not shadow Python's builtin TimeoutError.
    TimeoutError as MediaConvertTimeoutError,
)


def _failure(error, retry=None, **extra):
    """Build a failure result; ``retry`` is included only when given."""
    result = {'success': False, 'error': error}
    if retry is not None:
        result['retry'] = retry
    result.update(extra)
    return result


def robust_conversion(input_url, output_url, max_retries=3):
    """Robust conversion with comprehensive error handling.

    Creates an mp4 -> webm video job and waits up to one hour for it.
    Rate-limit, network/timeout and 5xx API errors are retried up to
    ``max_retries`` times; every other error is reported immediately.

    Args:
        input_url: URL of the source media.
        output_url: URL the converted media is written to.
        max_retries: Number of retries after the first attempt.

    Returns:
        On success ``{'success': True, 'job_id', 'cost_micros',
        'processing_time'}``; on failure ``{'success': False, 'error', ...}``
        with a ``retry`` flag saying whether trying again later may help.
    """
    for attempt in range(max_retries + 1):
        try:
            job = ConversionJob.create(
                job_type='video',
                input_url=input_url,
                output_url=output_url,
                input_format='mp4',
                output_format='webm'
            )
            # Wait for completion with timeout
            job.wait_until_complete(timeout=3600)  # 1 hour timeout
            return {
                'success': True,
                'job_id': job.id,
                'cost_micros': job.cost_micros,
                'processing_time': job.processing_time_seconds
            }
        except AuthenticationError:
            return _failure('Authentication failed - check your API token', retry=False)
        except InsufficientCreditsError as e:
            return _failure(
                f'Need €{e.required_credits / 1_000_000:.6f} but only have €{e.available_credits / 1_000_000:.6f}',
                retry=False,
                required_credits_micros=e.required_credits,
                available_credits_micros=e.available_credits,
            )
        except ValidationError as e:
            return _failure(f'Validation failed: {", ".join(e.errors)}', retry=False)
        except ConversionError as e:
            return _failure(f'Conversion failed: {e.message}', retry=False)
        except RateLimitError as e:
            if attempt < max_retries:
                print(f"Rate limited. Waiting {e.retry_after} seconds...")
                time.sleep(e.retry_after)
                continue
            # 'retry' added so this result has the same keys as the others;
            # a rate limit is transient, so retrying later is reasonable.
            return _failure(
                f'Rate limited after {max_retries} attempts',
                retry=True,
                retry_after=e.retry_after,
            )
        except (NetworkError, MediaConvertTimeoutError) as e:
            if attempt < max_retries:
                wait_time = 2 ** attempt  # Exponential backoff
                print(f"Network error. Retrying in {wait_time} seconds...")
                time.sleep(wait_time)
                continue
            return _failure(
                f'Network error after {max_retries} attempts: {e.message}',
                retry=True,
            )
        except APIError as e:
            if attempt < max_retries and e.status_code >= 500:  # Server errors
                wait_time = 2 ** attempt
                print(f"Server error. Retrying in {wait_time} seconds...")
                time.sleep(wait_time)
                continue
            return _failure(
                f'API error: {e.message} (Status: {e.status_code})',
                retry=e.status_code >= 500,
            )

    # Only reached when the loop body never ran (negative max_retries).
    return _failure('Max retries exceeded', retry=True)
# Usage
result = robust_conversion(input_url, output_url)
if result['success']:
    print(f"Conversion completed! Cost: €{result['cost_micros'] / 1_000_000:.6f}")
else:
    print(f"Conversion failed: {result['error']}")
    # 'retry' is absent from some failure results, hence .get()
    if result.get('retry'):
        print("You can retry this operation")
Batch Processing
Process multiple files efficiently:
python
import asyncio
from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import List, Dict, Any
class BatchProcessor:
    """Create conversion jobs and wait for all of them using a thread pool."""

    def __init__(self, max_workers=5):
        """Remember the pool size and start with an empty job list."""
        self.max_workers = max_workers
        self.jobs = []

    def add_job(self, input_url: str, output_url: str, **options) -> "ConversionJob":
        """Add a job to the batch.

        Extra keyword arguments are forwarded to ``ConversionJob.create``.
        Returns the created job, which is also appended to ``self.jobs``.
        """
        job = ConversionJob.create(
            input_url=input_url,
            output_url=output_url,
            **options
        )
        self.jobs.append(job)
        return job

    def wait_for_completion(self, timeout=None) -> Dict[str, List]:
        """Wait for all jobs to complete using thread pool.

        Args:
            timeout: Passed to each job's ``wait_until_complete``.

        Returns:
            ``{'completed': [job, ...], 'failed': [{'job', 'error'}, ...]}``,
            in the order the waits finished.
        """
        completed = []
        failed = []

        def wait_for_job(job):
            # Any exception is captured so one bad job cannot abort the batch.
            try:
                job.wait_until_complete(timeout=timeout)
                return {'status': 'completed', 'job': job}
            except Exception as e:
                return {'status': 'failed', 'job': job, 'error': e}

        with ThreadPoolExecutor(max_workers=self.max_workers) as executor:
            futures = [executor.submit(wait_for_job, job) for job in self.jobs]
            for future in as_completed(futures):
                result = future.result()
                if result['status'] == 'completed':
                    completed.append(result['job'])
                else:
                    failed.append({
                        'job': result['job'],
                        'error': result['error']
                    })
        return {'completed': completed, 'failed': failed}
# Async version
class AsyncBatchProcessor:
    """Asyncio counterpart of the batch processor with bounded concurrency."""

    def __init__(self, max_concurrent=5):
        """Remember the concurrency limit and start with an empty job list."""
        self.max_concurrent = max_concurrent
        self.jobs = []

    async def add_job(self, input_url: str, output_url: str, **options) -> "ConversionJob":
        """Add a job to the batch.

        Extra keyword arguments are forwarded to ``ConversionJob.acreate``.
        Returns the created job, which is also appended to ``self.jobs``.
        """
        # Use async client if available
        job = await ConversionJob.acreate(
            input_url=input_url,
            output_url=output_url,
            **options
        )
        self.jobs.append(job)
        return job

    async def wait_for_completion(self, timeout=None) -> Dict[str, List]:
        """Wait for all jobs to complete asynchronously.

        At most ``max_concurrent`` waits run at once.

        Args:
            timeout: Passed to each job's ``await_completion``.

        Returns:
            ``{'completed': [job, ...], 'failed': [{'job', 'error'}, ...]}``,
            each list in the order the jobs were added.
        """
        semaphore = asyncio.Semaphore(self.max_concurrent)

        async def wait_for_job(job):
            async with semaphore:
                # Any exception is captured so one bad job cannot abort the batch.
                try:
                    await job.await_completion(timeout=timeout)
                    return {'status': 'completed', 'job': job}
                except Exception as e:
                    return {'status': 'failed', 'job': job, 'error': e}

        tasks = [wait_for_job(job) for job in self.jobs]
        results = await asyncio.gather(*tasks)
        completed = [r['job'] for r in results if r['status'] == 'completed']
        failed = [{'job': r['job'], 'error': r['error']} for r in results if r['status'] == 'failed']
        return {'completed': completed, 'failed': failed}
# Usage examples
def batch_example():
    """Queue three conversions, wait for them and print a cost summary."""
    processor = BatchProcessor(max_workers=3)

    # Add multiple jobs; the processor keeps track of them in processor.jobs
    processor.add_job('s3://bucket/input1.mp4', 's3://bucket/output1.webm', output_format='webm')
    processor.add_job('s3://bucket/input2.mp4', 's3://bucket/output2.mp4', output_format='mp4')
    processor.add_job('s3://bucket/input3.jpg', 's3://bucket/output3.webp', output_format='webp')

    # Process all jobs
    results = processor.wait_for_completion(timeout=3600)
    print(f"{len(results['completed'])} completed, {len(results['failed'])} failed")

    # Calculate total cost; a job without a reported cost counts as 0
    total_cost_micros = sum(job.cost_micros or 0 for job in results['completed'])
    print(f"Total cost: €{total_cost_micros / 1_000_000:.6f}")
# Async usage
async def async_batch_example():
    """Queue two conversions asynchronously and report how many succeeded."""
    processor = AsyncBatchProcessor(max_concurrent=5)

    # Add jobs asynchronously
    await processor.add_job('s3://bucket/input1.mp4', 's3://bucket/output1.webm')
    await processor.add_job('s3://bucket/input2.mp4', 's3://bucket/output2.mp4')

    # Wait for completion
    results = await processor.wait_for_completion()
    print(f"Async batch completed: {len(results['completed'])} successful")
S3 Integration Helpers
Work with AWS S3 presigned URLs:
python
import boto3
from botocore.exceptions import ClientError
from urllib.parse import urlparse
class S3Helper:
    """Small wrapper around a boto3 S3 client bound to one bucket."""

    def __init__(self, bucket_name, region='us-east-1'):
        """Create an S3 client for ``region`` and remember ``bucket_name``."""
        self.bucket_name = bucket_name
        self.s3_client = boto3.client('s3', region_name=region)

    def generate_presigned_url(self, key: str, method: str = 'get_object', expires_in: int = 3600) -> str:
        """Generate presigned URL for S3 object.

        Args:
            key: Object key inside the bucket.
            method: ``'put_object'`` for an upload URL; any other value
                produces a ``'get_object'`` (download) URL.
            expires_in: Value passed to boto3 as ``ExpiresIn``.

        Raises:
            Exception: When boto3 reports a ``ClientError``.
        """
        operation = 'put_object' if method == 'put_object' else 'get_object'
        try:
            return self.s3_client.generate_presigned_url(
                operation,
                Params={'Bucket': self.bucket_name, 'Key': key},
                ExpiresIn=expires_in
            )
        except ClientError as e:
            raise Exception(f"Failed to generate presigned URL: {e}") from e

    def get_object_size(self, key: str) -> int:
        """Get size of S3 object in bytes.

        Raises:
            Exception: When boto3 reports a ``ClientError``.
        """
        try:
            response = self.s3_client.head_object(Bucket=self.bucket_name, Key=key)
        except ClientError as e:
            raise Exception(f"Failed to get object size: {e}") from e
        return response['ContentLength']

    def upload_file(self, local_path: str, key: str) -> str:
        """Upload file to S3 and return presigned URL.

        Raises:
            Exception: When the upload fails or the URL cannot be generated.
        """
        try:
            self.s3_client.upload_file(local_path, self.bucket_name, key)
        except (ClientError, boto3.exceptions.S3UploadFailedError) as e:
            # boto3's managed upload wraps client errors in
            # S3UploadFailedError, so ClientError alone misses upload failures.
            raise Exception(f"Failed to upload file: {e}") from e
        return self.generate_presigned_url(key)
def convert_from_s3(bucket_name, input_key, output_key, **options):
    """Complete S3-to-S3 conversion workflow.

    Looks up the input object's size, presigns a download URL for the input
    and an upload URL for the output, then creates the conversion job.

    Args:
        bucket_name: Bucket holding both the input and the output object.
        input_key: Key of the existing source object.
        output_key: Key the converted object is written to.
        **options: Forwarded to ``ConversionJob.create`` (e.g. ``job_type``).

    Returns:
        The created ``ConversionJob``.
    """
    s3_helper = S3Helper(bucket_name)

    # Get input file size
    input_size_bytes = s3_helper.get_object_size(input_key)

    # Generate presigned URLs
    input_url = s3_helper.generate_presigned_url(input_key, 'get_object')
    output_url = s3_helper.generate_presigned_url(output_key, 'put_object')

    # Create conversion job
    job = ConversionJob.create(
        input_url=input_url,
        output_url=output_url,
        input_size_bytes=input_size_bytes,
        **options
    )
    return job


# Usage
job = convert_from_s3(
    bucket_name='my-media-bucket',
    input_key='videos/input.mp4',
    output_key='videos/output.webm',
    job_type='video',
    input_format='mp4',
    output_format='webm'
)
print(f"Conversion started: {job.id}")
job.wait_until_complete()
print(f"Conversion completed! Cost: €{job.cost_micros / 1_000_000:.6f}")
Testing
Test Helpers and Mocking
python
import unittest
from unittest.mock import patch, MagicMock
import mediaconvert
from mediaconvert import ConversionJob
class MediaConvertTestHelper:
    """Factories for ``ConversionJob`` stand-ins used in tests."""

    @staticmethod
    def mock_successful_job():
        """Mock a successful conversion job"""
        # spec= restricts the mock to attributes that ConversionJob has
        job = MagicMock(spec=ConversionJob)
        job.id = 'job_test123'
        job.status = 'completed'
        job.progress_percentage = 100
        job.cost_micros = 150_000  # €0.15
        job.processing_time_seconds = 45
        return job

    @staticmethod
    def mock_failed_job():
        """Mock a failed conversion job"""
        job = MagicMock(spec=ConversionJob)
        job.id = 'job_test456'
        job.status = 'failed'
        job.error_message = 'Unsupported input format'
        return job
class TestMediaConversion(unittest.TestCase):
    """Unit tests that exercise the SDK without contacting the real API."""

    def setUp(self):
        mediaconvert.configure(
            api_token='test_token',
            api_base_url='https://api.test.mediaconvert.io/v1'
        )

    @patch('mediaconvert.ConversionJob.create')
    def test_successful_conversion(self, mock_create):
        # Arrange
        mock_create.return_value = MediaConvertTestHelper.mock_successful_job()

        # Act
        job = ConversionJob.create(
            job_type='video',
            input_url='https://example.com/input.mp4',
            output_url='https://example.com/output.webm'
        )

        # Assert
        self.assertEqual(job.status, 'completed')
        self.assertEqual(job.cost_micros, 150_000)
        mock_create.assert_called_once()

    @patch('requests.post')
    def test_api_error_handling(self, mock_post):
        # Imported from mediaconvert.exceptions, as the other samples do.
        from mediaconvert.exceptions import InsufficientCreditsError

        # Arrange - Mock insufficient credits response
        # NOTE(review): assumes the SDK issues the request via requests.post
        # and maps a 402 response to InsufficientCreditsError - confirm.
        mock_response = MagicMock()
        mock_response.status_code = 402
        mock_response.json.return_value = {
            'error': 'insufficient_credits',
            'required_credits_micros': 200_000,
            'available_credits_micros': 50_000
        }
        mock_post.return_value = mock_response

        # Act & Assert
        with self.assertRaises(InsufficientCreditsError) as cm:
            ConversionJob.create(
                job_type='video',
                input_url='https://example.com/input.mp4',
                output_url='https://example.com/output.webm'
            )
        self.assertEqual(cm.exception.required_credits, 200_000)
        self.assertEqual(cm.exception.available_credits, 50_000)
# pytest example
import pytest
from unittest.mock import patch


@pytest.fixture
def mock_mediaconvert():
    """Patch ``mediaconvert.configure`` for the duration of a test."""
    with patch('mediaconvert.configure') as mock_config:
        yield mock_config


def test_batch_processing(mock_mediaconvert):
    """Jobs added through the processor are tracked in ``processor.jobs``."""
    processor = BatchProcessor()

    # Add mock jobs
    with patch('mediaconvert.ConversionJob.create') as mock_create:
        mock_create.return_value = MediaConvertTestHelper.mock_successful_job()
        processor.add_job('input1.mp4', 'output1.webm')
        processor.add_job('input2.mp4', 'output2.webm')

    assert len(processor.jobs) == 2
Configuration and Best Practices
Environment Configuration
python
# config.py
import os
from typing import Optional
class MediaConvertConfig:
    """MediaConvert settings read from ``MEDIACONVERT_*`` environment variables."""

    def __init__(self):
        """Read all settings from the environment.

        Raises:
            ValueError: If ``MEDIACONVERT_API_TOKEN`` is missing or empty, or
                if a numeric setting is not a valid integer.
        """
        self.api_token: str = os.environ.get('MEDIACONVERT_API_TOKEN', '')
        # Validation comes first so a missing token is reported before any
        # other setting is parsed.
        if not self.api_token:
            raise ValueError("MEDIACONVERT_API_TOKEN environment variable is required")

        self.api_base_url: str = os.environ.get('MEDIACONVERT_API_BASE_URL', 'https://mediaconvert.io/api/v1')
        self.webhook_secret: str = os.environ.get('MEDIACONVERT_WEBHOOK_SECRET', '')
        self.timeout: int = self._int_env('MEDIACONVERT_TIMEOUT', '30')
        self.retries: int = self._int_env('MEDIACONVERT_RETRIES', '3')
        self.max_file_size_mb: int = self._int_env('MEDIACONVERT_MAX_FILE_SIZE_MB', '1000')

    @staticmethod
    def _int_env(name, default):
        """Return the integer value of variable ``name``, naming it on error."""
        raw = os.environ.get(name, default)
        try:
            return int(raw)
        except ValueError as e:
            raise ValueError(f"{name} must be an integer, got {raw!r}") from e

    def configure_client(self):
        """Configure the global mediaconvert client"""
        import mediaconvert
        mediaconvert.configure(
            api_token=self.api_token,
            api_base_url=self.api_base_url,
            webhook_secret=self.webhook_secret,
            timeout=self.timeout,
            retries=self.retries
        )
# Initialize configuration
# Constructing MediaConvertConfig raises ValueError when
# MEDIACONVERT_API_TOKEN is unset; configure_client() then passes the
# collected values to mediaconvert.configure().
config = MediaConvertConfig()
config.configure_client()
Logging Configuration
python
import logging
import mediaconvert
# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)

# Enable mediaconvert debug logging
mediaconvert_logger = logging.getLogger('mediaconvert')
mediaconvert_logger.setLevel(logging.DEBUG)


# Custom logging handler
class MediaConvertHandler(logging.Handler):
    """Forward records at ERROR level or above to the monitoring service."""

    def emit(self, record):
        """Send the formatted message of ``record`` if it is an error."""
        if record.levelno < logging.ERROR:
            return
        try:
            # Send critical errors to monitoring service
            send_to_monitoring(record.getMessage())
        except Exception:
            # A failing monitoring call must not break the code that logged;
            # defer to the logging module's standard error handling.
            self.handleError(record)


mediaconvert_logger.addHandler(MediaConvertHandler())
Performance and Optimization
Connection Pooling
python
import os

import mediaconvert
import requests
from requests.adapters import HTTPAdapter
# Import from urllib3 directly; requests.packages is only a legacy alias.
from urllib3.util.retry import Retry

# Configure session with connection pooling
session = requests.Session()

# Retry strategy
# `allowed_methods` replaces `method_whitelist`, which was deprecated in
# urllib3 1.26 and removed in 2.0.
# NOTE(review): POST is not idempotent - retrying it after a 5xx could
# create the same job twice. Confirm this is acceptable before keeping it.
retry_strategy = Retry(
    total=3,
    status_forcelist=[429, 500, 502, 503, 504],
    allowed_methods=["HEAD", "GET", "OPTIONS", "POST"],
    backoff_factor=1
)

# Connection pooling
adapter = HTTPAdapter(
    pool_connections=10,
    pool_maxsize=10,
    max_retries=retry_strategy
)
session.mount("https://", adapter)
session.mount("http://", adapter)

# Use with mediaconvert
mediaconvert.configure(
    api_token=os.environ['MEDIACONVERT_API_TOKEN'],
    session=session  # Use custom session
)
Memory-Efficient Processing
python
def process_large_batch(file_list, batch_size=10):
    """Process large number of files in batches to manage memory.

    Args:
        file_list: Sequence of dicts with ``input_url`` and ``output_url``
            keys and an optional ``options`` dict of extra job arguments.
        batch_size: Number of files submitted per batch; must be at least 1.

    Yields:
        The ``wait_for_completion()`` result of each batch, in order.

    Raises:
        ValueError: If ``batch_size`` is smaller than 1 (raised when the
            generator is first advanced).
    """
    if batch_size < 1:
        raise ValueError("batch_size must be at least 1")

    for i in range(0, len(file_list), batch_size):
        batch = file_list[i:i + batch_size]

        # Process batch; never ask for more workers than there are files
        processor = BatchProcessor(max_workers=min(5, len(batch)))
        for file_info in batch:
            processor.add_job(
                input_url=file_info['input_url'],
                output_url=file_info['output_url'],
                **file_info.get('options', {})
            )

        # Wait for batch completion
        results = processor.wait_for_completion()

        # Process results
        yield results

        # Clear processor to free memory before the next batch is built
        del processor
Next Steps
- JavaScript SDK Guide - Node.js and browser integration
- cURL Examples - Direct API usage
- API Reference - Complete API documentation
- Webhook Guide - Real-time notifications
Support
- PyPI Package: pypi.org/project/mediaconvert-io/
- GitHub Repository: github.com/mediaconvert-io/python-sdk
- Documentation: docs.mediaconvert.io/python
- Email Support: support@mediaconvert.io