Python SDK Guide

The MediaConvert.io Python SDK provides a convenient way to integrate media conversion into your Python applications. This guide covers installation, configuration, and common usage patterns.

Installation

Install via pip:

bash
pip install mediaconvert-io

Or install it together with optional extras:

bash
# Quote the requirement: shells such as zsh treat the unquoted [...] as a
# glob pattern and abort with "no matches found".

# With async support
pip install "mediaconvert-io[async]"

# With AWS S3 helpers
pip install "mediaconvert-io[s3]"

# All extras
pip install "mediaconvert-io[all]"

Quick Start

Basic Configuration

python
import os

import mediaconvert
from mediaconvert import ConversionJob, Account, MediaConvertError

# Configure the module-level client.
# The token is read from the environment so it never has to appear in source;
# os.environ[...] raises KeyError right away when the variable is not set.
mediaconvert.configure(
    api_token=os.environ['MEDIACONVERT_API_TOKEN'],
    api_base_url='https://mediaconvert.io/api/v1',
    timeout=30,
    retries=3
)

# Alternative: Initialize client directly
client = mediaconvert.Client(
    api_token=os.environ['MEDIACONVERT_API_TOKEN']
)

Your First Conversion

python
import os
import time

# Create a conversion job
# NOTE: the '?...' at the end of both URLs is a placeholder for the
# presigned-URL query string; substitute real URLs before running this.
job = ConversionJob.create(
    job_type='video',
    input_url='https://bucket.s3.amazonaws.com/input.mp4?...',
    output_url='https://bucket.s3.amazonaws.com/output.webm?...',
    input_format='mp4',
    output_format='webm',
    input_size_bytes=52428800,  # 50 * 1024 * 1024 bytes
    webhook_url='https://your-app.com/webhooks/mediaconvert'
)

print(f"Job created: {job.id}")
print(f"Status: {job.status}")
# New micro-precision API (recommended)
# Dividing the micros value by 1_000_000 yields euros, shown with six decimals.
print(f"Estimated cost: €{job.estimated_cost_micros / 1_000_000:.6f}")
# Or using backward-compatible cents field
# Dividing the cents value by 100 yields euros, shown with two decimals.
print(f"Estimated cost: €{job.estimated_cost_cents / 100:.2f}")

Core Classes

mediaconvert.ConversionJob

The main class for managing conversion jobs:

python
# Create a job
# input_presigned_url, output_presigned_url and file_size_bytes are
# placeholders that the calling code must define; this snippet does not.
job = ConversionJob.create(
    job_type='video',
    input_url=input_presigned_url,
    output_url=output_presigned_url,
    input_format='mp4',
    output_format='webm',
    input_size_bytes=file_size_bytes
)

# Check job status
# reload() is called before reading status/progress so the values are current
# (presumably it re-fetches the job from the API - confirm in the SDK reference).
job.reload()
print(f"Status: {job.status}")
print(f"Progress: {job.progress_percentage}%")

# Wait for completion with callback
def progress_callback(current_job):
    """Print the progress percentage of the job passed to the callback."""
    print(f"Progress: {current_job.progress_percentage}%")

job.wait_until_complete(callback=progress_callback)

# Access results
print(f"Processing time: {job.processing_time_seconds}s")
print(f"Output size: {job.output_size_bytes} bytes")
# New micro-precision API (recommended)
print(f"Total cost: €{job.cost_micros / 1_000_000:.6f}")
# Or using backward-compatible cents field
print(f"Total cost: €{job.cost_cents / 100:.2f}")

mediaconvert.Account

Manage your account information:

python
# Get account details
account = Account.current()
print(f"Email: {account.email}")
print(f"Tier: {account.tier}")
# The balance is exposed in cents; dividing by 100 prints it in euros.
print(f"Credits: €{account.credits_balance_cents / 100:.2f}")

# Check usage
# usage_summary() is indexed like a mapping below; the three keys used here
# are the only ones this snippet relies on.
usage = account.usage_summary()
print(f"Jobs this month: {usage['jobs_count']}")
print(f"Data processed: {usage['total_mb_processed']} MB")
# Unlike the balance above, the total cost is reported in micros.
print(f"Total spent: €{usage['total_cost_micros'] / 1_000_000:.6f}")

Django Integration

Settings Configuration

python
# settings.py
import os

import mediaconvert

# SDK settings, using the upper-case key style of Django settings modules.
# The token and webhook secret come from the environment; .get() leaves them
# as None when the variable is not set.
MEDIACONVERT = {
    'API_TOKEN': os.environ.get('MEDIACONVERT_API_TOKEN'),
    'API_BASE_URL': 'https://mediaconvert.io/api/v1',
    'WEBHOOK_SECRET': os.environ.get('MEDIACONVERT_WEBHOOK_SECRET'),
    'TIMEOUT': 30,
    'RETRIES': 3,
}

# Configure at startup
# mediaconvert.configure() is called with lower-case keyword arguments
# everywhere else in this guide (api_token, api_base_url, webhook_secret,
# timeout, retries), so the setting names are lower-cased before unpacking;
# the upper-case keys would not match those parameter names.
mediaconvert.configure(
    **{name.lower(): value for name, value in MEDIACONVERT.items()}
)

Celery Integration

python
# tasks.py
from celery import shared_task
from mediaconvert import ConversionJob
from mediaconvert.exceptions import (
    AuthenticationError,
    InsufficientCreditsError,
    ValidationError,
    ConversionError,
    RateLimitError,
    APIError
)
import time

@shared_task(bind=True, max_retries=3)
def convert_media_task(self, user_id, input_url, output_url, options=None):
    """Celery task that creates a conversion job and records it locally.

    Args:
        user_id: ID of the requesting user; stored on the local record and
            embedded in the webhook URL.
        input_url: URL the source media is read from.
        output_url: URL the converted media is written to.
        options: Optional dict of extra keyword arguments forwarded to
            ``ConversionJob.create`` (for example ``job_type``).

    Returns:
        A dict with the created job's ``job_id`` and ``status``.

    Raises:
        InsufficientCreditsError: re-raised after the user notification task
            has been queued; never retried.
        APIError, ConversionError: re-raised once the retry budget is used up.
    """
    options = options or {}

    # Only the API call sits inside the try so the handlers below deal with
    # SDK errors and nothing else.
    try:
        job = ConversionJob.create(
            input_url=input_url,
            output_url=output_url,
            webhook_url=f"https://yourapp.com/webhooks/mediaconvert/{user_id}/",
            **options
        )

    except InsufficientCreditsError as e:
        # Don't retry - notify user to add credits.
        # This clause comes before the generic APIError one: the exception
        # hierarchy is not visible here, and if InsufficientCreditsError
        # derives from APIError a later clause would never be reached and the
        # task would be retried pointlessly.
        from myapp.tasks import notify_insufficient_credits
        notify_insufficient_credits.delay(user_id, e.required_credits, e.available_credits)
        raise

    except RateLimitError as e:
        # Retry after rate limit delay
        raise self.retry(countdown=e.retry_after, exc=e)

    except (APIError, ConversionError) as e:
        # Retry on transient errors, doubling the delay each time
        # (60s, 120s, 240s, ...).
        if self.request.retries < self.max_retries:
            raise self.retry(countdown=60 * (2 ** self.request.retries), exc=e)
        raise

    # Store job reference in your database
    from myapp.models import MediaConversion
    MediaConversion.objects.create(
        user_id=user_id,
        external_id=job.id,
        status=job.status,
        input_url=input_url,
        output_url=output_url
    )

    return {'job_id': job.id, 'status': job.status}

# Usage
# current_user, input_presigned_url and output_presigned_url are placeholders
# provided by the calling code. The options dict is unpacked into
# ConversionJob.create() by the task.
convert_media_task.delay(
    user_id=current_user.id,
    input_url=input_presigned_url,
    output_url=output_presigned_url,
    options={'job_type': 'video', 'output_format': 'webm'}
)

Django Model Integration

python
# models.py
from django.db import models
from django.contrib.auth.models import User
from mediaconvert import ConversionJob
import uuid

class MediaConversion(models.Model):
    """Local database record mirroring one MediaConvert job for a user."""

    # (stored value, human-readable label) pairs accepted by `status`.
    STATUS_CHOICES = [
        ('pending', 'Pending'),
        ('processing', 'Processing'),
        ('completed', 'Completed'),
        ('failed', 'Failed'),
    ]

    # Random UUID primary key instead of an auto-incrementing integer.
    id = models.UUIDField(primary_key=True, default=uuid.uuid4, editable=False)
    # Deleting the user deletes their conversions (CASCADE).
    user = models.ForeignKey(User, on_delete=models.CASCADE, related_name='media_conversions')
    # Job identifier used to look the job up through ConversionJob.get().
    external_id = models.CharField(max_length=255, unique=True)
    status = models.CharField(max_length=20, choices=STATUS_CHOICES, default='pending')
    input_url = models.URLField()
    output_url = models.URLField()
    progress_percentage = models.IntegerField(default=0)
    # Cost in micros (see cost_euros); NULL until a value has been fetched.
    cost_micros = models.BigIntegerField(null=True, blank=True)
    processing_time_seconds = models.IntegerField(null=True, blank=True)
    error_message = models.TextField(blank=True)
    created_at = models.DateTimeField(auto_now_add=True)
    updated_at = models.DateTimeField(auto_now=True)

    def refresh_from_api(self):
        """Fetch latest status from MediaConvert API

        Copies status, progress, cost, processing time and error message from
        the remote job onto this record, saves it and returns the remote job.
        On any exception the record is saved as 'failed' with the exception
        text as its error message, and the exception is re-raised.
        """
        try:
            job = ConversionJob.get(self.external_id)
            self.status = job.status
            # Missing values are normalised to 0 / '' for the non-nullable fields.
            self.progress_percentage = job.progress_percentage or 0
            self.cost_micros = job.cost_micros
            self.processing_time_seconds = job.processing_time_seconds
            self.error_message = job.error_message or ''
            self.save()
            return job
        except Exception as e:
            # NOTE(review): this also fires when only the status lookup (or the
            # save above) failed, so a temporary API or database problem marks
            # the conversion itself as 'failed' - confirm that is intended.
            self.error_message = str(e)
            self.status = 'failed'
            self.save()
            raise

    @property
    def cost_euros(self):
        """Get cost in euros with micro precision"""
        # Both None and 0 fall through to the literal 0.
        return self.cost_micros / 1_000_000 if self.cost_micros else 0

    def __str__(self):
        return f"{self.user.username} - {self.external_id} ({self.status})"

Flask Integration

Application Factory Pattern

python
# app.py
from flask import Flask, request, jsonify
from mediaconvert import ConversionJob
import mediaconvert

def create_app(config=None):
    """Application factory: build the Flask app and configure the SDK.

    Args:
        config: Optional mapping of settings applied on top of the
            environment-derived defaults (handy for tests).

    Returns:
        The Flask application.
    """
    import os  # local import: the import lines of this snippet do not include os

    app = Flask(__name__)

    # A freshly created Flask app has no MEDIACONVERT_* entries, so reading
    # them below would raise KeyError. Seed them from the environment first.
    app.config.from_mapping(
        MEDIACONVERT_API_TOKEN=os.environ.get('MEDIACONVERT_API_TOKEN'),
        MEDIACONVERT_WEBHOOK_SECRET=os.environ.get('MEDIACONVERT_WEBHOOK_SECRET'),
    )
    if config:
        app.config.update(config)

    # Configure MediaConvert
    mediaconvert.configure(
        api_token=app.config['MEDIACONVERT_API_TOKEN'],
        webhook_secret=app.config['MEDIACONVERT_WEBHOOK_SECRET']
    )

    # NOTE(review): the `api` blueprint from routes.py is not registered here;
    # add app.register_blueprint(...) once its import path is settled.
    return app

# routes.py
from flask import Blueprint, request, jsonify
from mediaconvert import ConversionJob
from mediaconvert.exceptions import MediaConvertError

api = Blueprint('api', __name__)

@api.route('/convert', methods=['POST'])
def start_conversion():
    """Create a conversion job from the JSON request body.

    Expects a JSON object with ``job_type``, ``input_url``, ``output_url``,
    ``input_format`` and ``output_format``; ``input_size_bytes`` is optional.

    Returns:
        201 with the job id, status and estimated cost on success;
        400 with an ``error`` message when the body is not a JSON object,
        a required field is missing, or the SDK raises MediaConvertError.
    """
    # Local import: url_for is not part of this module's flask import line.
    from flask import url_for

    # silent=True returns None instead of aborting when the body is not valid JSON.
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        return jsonify({'error': 'Request body must be a JSON object'}), 400

    # Reject incomplete requests up front instead of failing with KeyError (HTTP 500).
    required_fields = ('job_type', 'input_url', 'output_url', 'input_format', 'output_format')
    missing = [field for field in required_fields if field not in data]
    if missing:
        return jsonify({'error': f"Missing required fields: {', '.join(missing)}"}), 400

    try:
        job = ConversionJob.create(
            job_type=data['job_type'],
            input_url=data['input_url'],
            output_url=data['output_url'],
            input_format=data['input_format'],
            output_format=data['output_format'],
            input_size_bytes=data.get('input_size_bytes'),
            webhook_url=url_for('api.webhook', _external=True)
        )
    except MediaConvertError as e:
        return jsonify({'error': str(e)}), 400

    return jsonify({
        'job_id': job.id,
        'status': job.status,
        'estimated_cost_micros': job.estimated_cost_micros
    }), 201

@api.route('/webhook', methods=['POST'])
def webhook():
    """Receive a MediaConvert webhook and dispatch on the reported status.

    Returns:
        401 when the signature header is missing or does not verify,
        400 when the body is not a JSON object with ``job_id`` and ``status``,
        200 otherwise (including statuses other than completed/failed).
    """
    # Local import: routes.py does not import the mediaconvert module itself.
    import mediaconvert

    # Verify signature against the raw request bytes. A missing header is
    # rejected without calling the verifier.
    signature = request.headers.get('X-MediaConvert-Signature')
    if not signature or not mediaconvert.verify_webhook_signature(request.data, signature):
        return '', 401

    data = request.get_json(silent=True)
    if not isinstance(data, dict) or 'job_id' not in data or 'status' not in data:
        return '', 400

    # Process webhook
    job_id = data['job_id']
    status = data['status']

    # handle_completed_job / handle_failed_job are application hooks that the
    # surrounding project has to provide.
    if status == 'completed':
        handle_completed_job(job_id, data)
    elif status == 'failed':
        # The message is treated as optional so a payload without it is still handled.
        handle_failed_job(job_id, data.get('error_message', ''))

    return '', 200

Error Handling

Comprehensive error handling patterns:

python
from mediaconvert.exceptions import (
    AuthenticationError,
    InsufficientCreditsError,
    ValidationError,
    ConversionError,
    RateLimitError,
    APIError,
    NetworkError,
    TimeoutError
)

def robust_conversion(input_url, output_url, max_retries=3):
    """Robust conversion with comprehensive error handling

    Creates one mp4 -> webm video job and waits for it, retrying on rate
    limits, network/timeout errors and 5xx API errors.

    Args:
        input_url: URL the source media is read from.
        output_url: URL the converted media is written to.
        max_retries: Number of retries after the first attempt.

    Returns:
        A dict with ``success``. On success it also carries ``job_id``,
        ``cost_micros`` and ``processing_time``. On failure it carries
        ``error`` and ``retry`` (whether trying again later can help), plus
        error-specific details.
    """
    import time  # local import: this snippet's import block only brings in the exceptions

    # Kept across attempts: once a job exists, a retry resumes waiting on it
    # instead of submitting (and paying for) a second identical job.
    job = None

    for attempt in range(max_retries + 1):
        try:
            if job is None:
                job = ConversionJob.create(
                    job_type='video',
                    input_url=input_url,
                    output_url=output_url,
                    input_format='mp4',
                    output_format='webm'
                )

            # Wait for completion with timeout
            job.wait_until_complete(timeout=3600)  # 1 hour timeout

            return {
                'success': True,
                'job_id': job.id,
                'cost_micros': job.cost_micros,
                'processing_time': job.processing_time_seconds
            }

        except AuthenticationError:
            return {
                'success': False,
                'error': 'Authentication failed - check your API token',
                'retry': False
            }

        except InsufficientCreditsError as e:
            return {
                'success': False,
                'error': f'Need €{e.required_credits / 1_000_000:.6f} but only have €{e.available_credits / 1_000_000:.6f}',
                'retry': False,
                'required_credits_micros': e.required_credits,
                'available_credits_micros': e.available_credits
            }

        except ValidationError as e:
            return {
                'success': False,
                'error': f'Validation failed: {", ".join(e.errors)}',
                'retry': False
            }

        except ConversionError as e:
            # NOTE(review): assumes the SDK exceptions expose `.message` - confirm.
            return {
                'success': False,
                'error': f'Conversion failed: {e.message}',
                'retry': False
            }

        except RateLimitError as e:
            if attempt < max_retries:
                print(f"Rate limited. Waiting {e.retry_after} seconds...")
                time.sleep(e.retry_after)
                continue
            else:
                return {
                    'success': False,
                    'error': f'Rate limited after {max_retries} attempts',
                    # Every other failure result carries 'retry'; a rate
                    # limit is temporary, so a later retry can succeed.
                    'retry': True,
                    'retry_after': e.retry_after
                }

        except (NetworkError, TimeoutError) as e:
            if attempt < max_retries:
                wait_time = 2 ** attempt  # Exponential backoff
                print(f"Network error. Retrying in {wait_time} seconds...")
                time.sleep(wait_time)
                continue
            else:
                return {
                    'success': False,
                    'error': f'Network error after {max_retries} attempts: {e.message}',
                    'retry': True
                }

        except APIError as e:
            if attempt < max_retries and e.status_code >= 500:  # Server errors
                wait_time = 2 ** attempt
                print(f"Server error. Retrying in {wait_time} seconds...")
                time.sleep(wait_time)
                continue
            else:
                return {
                    'success': False,
                    'error': f'API error: {e.message} (Status: {e.status_code})',
                    'retry': e.status_code >= 500
                }

    # Only reached when the loop body never ran (negative max_retries).
    return {
        'success': False,
        'error': 'Max retries exceeded',
        'retry': True
    }

# Usage
# input_url and output_url are placeholders supplied by the calling code.
result = robust_conversion(input_url, output_url)
if result['success']:
    # NOTE(review): assumes cost_micros is populated on a completed job; the
    # batch example in this guide guards it with `or 0` - confirm.
    print(f"Conversion completed! Cost: €{result['cost_micros'] / 1_000_000:.6f}")
else:
    print(f"Conversion failed: {result['error']}")
    # .get() because not every failure result is guaranteed to carry 'retry'.
    if result.get('retry'):
        print("You can retry this operation")

Batch Processing

Process multiple files efficiently:

python
import asyncio
from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import List, Dict, Any

class BatchProcessor:
    """Collects conversion jobs and waits for all of them on a thread pool."""

    def __init__(self, max_workers=5):
        self.jobs = []
        self.max_workers = max_workers

    def add_job(self, input_url: str, output_url: str, **options) -> ConversionJob:
        """Create a conversion job, remember it in the batch and return it."""
        created = ConversionJob.create(input_url=input_url, output_url=output_url, **options)
        self.jobs.append(created)
        return created

    def wait_for_completion(self, timeout=None) -> Dict[str, List]:
        """Block until every job in the batch has finished waiting.

        Each job is waited on in a worker thread with the given timeout.
        Returns a dict with 'completed' (jobs whose wait returned) and
        'failed' (dicts holding the job and the exception its wait raised),
        both in the order the waits finished.
        """

        def outcome_of(job):
            # Pair the job with the exception its wait raised, or None.
            try:
                job.wait_until_complete(timeout=timeout)
            except Exception as exc:
                return job, exc
            return job, None

        completed, failed = [], []

        with ThreadPoolExecutor(max_workers=self.max_workers) as pool:
            pending = [pool.submit(outcome_of, job) for job in self.jobs]

            for finished in as_completed(pending):
                job, error = finished.result()
                if error is None:
                    completed.append(job)
                else:
                    failed.append({'job': job, 'error': error})

        return {'completed': completed, 'failed': failed}

# Async version
class AsyncBatchProcessor:
    """Asyncio counterpart of the batch processor, bounded by a semaphore."""

    def __init__(self, max_concurrent=5):
        self.jobs = []
        self.max_concurrent = max_concurrent

    async def add_job(self, input_url: str, output_url: str, **options) -> ConversionJob:
        """Create a job through the async API, remember it and return it."""
        # Use async client if available
        created = await ConversionJob.acreate(input_url=input_url, output_url=output_url, **options)
        self.jobs.append(created)
        return created

    async def wait_for_completion(self, timeout=None) -> Dict[str, List]:
        """Await every job in the batch, at most max_concurrent at a time.

        Returns a dict with 'completed' (jobs whose wait returned) and
        'failed' (dicts holding the job and the exception its wait raised),
        both in the order the jobs were added.
        """
        gate = asyncio.Semaphore(self.max_concurrent)

        async def outcome_of(job):
            # Pair the job with the exception its wait raised, or None.
            async with gate:
                try:
                    await job.await_completion(timeout=timeout)
                except Exception as exc:
                    return job, exc
                return job, None

        outcomes = await asyncio.gather(*[outcome_of(job) for job in self.jobs])

        completed, failed = [], []
        for job, error in outcomes:
            if error is None:
                completed.append(job)
            else:
                failed.append({'job': job, 'error': error})

        return {'completed': completed, 'failed': failed}

# Usage examples
def batch_example():
    """Queue three conversions, wait for them and print a summary with the total cost."""
    processor = BatchProcessor(max_workers=3)

    # (input, output, target format) for every file in the batch
    conversions = (
        ('s3://bucket/input1.mp4', 's3://bucket/output1.webm', 'webm'),
        ('s3://bucket/input2.mp4', 's3://bucket/output2.mp4', 'mp4'),
        ('s3://bucket/input3.jpg', 's3://bucket/output3.webp', 'webp'),
    )
    for source, target, fmt in conversions:
        processor.add_job(source, target, output_format=fmt)

    # Wait for the whole batch (timeout passed to each job's wait)
    results = processor.wait_for_completion(timeout=3600)
    print(f"{len(results['completed'])} completed, {len(results['failed'])} failed")

    # Add up the cost of the completed jobs; a missing cost counts as 0
    total_cost_micros = 0
    for job in results['completed']:
        total_cost_micros += job.cost_micros or 0
    print(f"Total cost: €{total_cost_micros / 1_000_000:.6f}")

# Async usage
async def async_batch_example():
    """Queue two conversions through the async processor and report how many completed."""
    processor = AsyncBatchProcessor(max_concurrent=5)

    # Jobs are created one after another, in this order
    conversions = (
        ('s3://bucket/input1.mp4', 's3://bucket/output1.webm'),
        ('s3://bucket/input2.mp4', 's3://bucket/output2.mp4'),
    )
    for source, target in conversions:
        await processor.add_job(source, target)

    # Wait for the whole batch, without a timeout
    results = await processor.wait_for_completion()
    print(f"Async batch completed: {len(results['completed'])} successful")

S3 Integration Helpers

Work with AWS S3 presigned URLs:

python
import boto3
from botocore.exceptions import ClientError
from urllib.parse import urlparse

class S3Helper:
    """Small wrapper around a boto3 S3 client bound to one bucket."""

    def __init__(self, bucket_name, region='us-east-1'):
        """Create an S3 client for `region` and remember the bucket name."""
        self.bucket_name = bucket_name
        self.s3_client = boto3.client('s3', region_name=region)

    def generate_presigned_url(self, key: str, method: str = 'get_object', expires_in: int = 3600) -> str:
        """Generate presigned URL for S3 object

        Args:
            key: Object key inside the bucket.
            method: 'put_object' for an upload URL; any other value yields a
                'get_object' (download) URL.
            expires_in: Passed to boto3 as ExpiresIn.

        Raises:
            Exception: wrapping the botocore ClientError, which stays
                available as ``__cause__``.
        """
        # Both operations take the same parameters, so only the name differs.
        operation = 'put_object' if method == 'put_object' else 'get_object'
        try:
            return self.s3_client.generate_presigned_url(
                operation,
                Params={'Bucket': self.bucket_name, 'Key': key},
                ExpiresIn=expires_in
            )
        except ClientError as e:
            raise Exception(f"Failed to generate presigned URL: {e}") from e

    def get_object_size(self, key: str) -> int:
        """Get size of S3 object in bytes

        Raises:
            Exception: wrapping the botocore ClientError from head_object.
        """
        try:
            response = self.s3_client.head_object(Bucket=self.bucket_name, Key=key)
            return response['ContentLength']
        except ClientError as e:
            raise Exception(f"Failed to get object size: {e}") from e

    def upload_file(self, local_path: str, key: str) -> str:
        """Upload file to S3 and return presigned URL

        Raises:
            Exception: when the upload fails, wrapping the underlying error.
        """
        # boto3's managed upload_file reports failures as S3UploadFailedError
        # rather than ClientError, so both are caught. Imported locally
        # because the file's import block does not include it.
        from boto3.exceptions import S3UploadFailedError

        try:
            self.s3_client.upload_file(local_path, self.bucket_name, key)
        except (ClientError, S3UploadFailedError) as e:
            raise Exception(f"Failed to upload file: {e}") from e

        # generate_presigned_url does its own error wrapping.
        return self.generate_presigned_url(key)

def convert_from_s3(bucket_name, input_key, output_key, **options):
    """Start a conversion between two objects of one bucket and return the job.

    Extra keyword arguments are forwarded to ConversionJob.create.
    """
    helper = S3Helper(bucket_name)

    # Same order as always: size lookup, download URL, then upload URL.
    source_size = helper.get_object_size(input_key)
    source_url = helper.generate_presigned_url(input_key, 'get_object')
    target_url = helper.generate_presigned_url(output_key, 'put_object')

    return ConversionJob.create(
        input_url=source_url,
        output_url=target_url,
        input_size_bytes=source_size,
        **options
    )

# Usage
# bucket_name, input_key and output_key are consumed by convert_from_s3;
# the remaining keyword arguments are forwarded to ConversionJob.create().
job = convert_from_s3(
    bucket_name='my-media-bucket',
    input_key='videos/input.mp4',
    output_key='videos/output.webm',
    job_type='video',
    input_format='mp4',
    output_format='webm'
)

print(f"Conversion started: {job.id}")
# Called without a timeout here.
job.wait_until_complete()
print(f"Conversion completed! Cost: €{job.cost_micros / 1_000_000:.6f}")

Testing

Test Helpers and Mocking

python
import unittest
from unittest.mock import patch, MagicMock
import mediaconvert
from mediaconvert import ConversionJob

class MediaConvertTestHelper:
    """Factory helpers that build ConversionJob-shaped mocks for tests."""

    @staticmethod
    def mock_successful_job():
        """Return a mock of a job that completed successfully."""
        return MagicMock(
            spec=ConversionJob,
            id='job_test123',
            status='completed',
            progress_percentage=100,
            cost_micros=150_000,  # €0.15
            processing_time_seconds=45,
        )

    @staticmethod
    def mock_failed_job():
        """Return a mock of a job that failed with an error message."""
        return MagicMock(
            spec=ConversionJob,
            id='job_test456',
            status='failed',
            error_message='Unsupported input format',
        )

class TestMediaConversion(unittest.TestCase):
    """Example unit tests for code that creates conversion jobs."""

    def setUp(self):
        # Point the SDK at a test endpoint with a dummy token before each test.
        mediaconvert.configure(
            api_token='test_token',
            api_base_url='https://api.test.mediaconvert.io/v1'
        )

    @patch('mediaconvert.ConversionJob.create')
    def test_successful_conversion(self, mock_create):
        """A patched create() hands back the prepared successful job."""
        # Arrange
        mock_create.return_value = MediaConvertTestHelper.mock_successful_job()

        # Act
        job = ConversionJob.create(
            job_type='video',
            input_url='https://example.com/input.mp4',
            output_url='https://example.com/output.webm'
        )

        # Assert
        self.assertEqual(job.status, 'completed')
        self.assertEqual(job.cost_micros, 150_000)
        mock_create.assert_called_once()

    # NOTE(review): patching requests.post assumes the SDK issues its HTTP
    # calls through that function - confirm against the SDK implementation.
    @patch('requests.post')
    def test_api_error_handling(self, mock_post):
        """A 402 insufficient-credits response surfaces as InsufficientCreditsError."""
        # Arrange - Mock insufficient credits response
        mock_response = MagicMock()
        mock_response.status_code = 402
        mock_response.json.return_value = {
            'error': 'insufficient_credits',
            'required_credits_micros': 200_000,
            'available_credits_micros': 50_000
        }
        mock_post.return_value = mock_response

        # Act & Assert
        with self.assertRaises(mediaconvert.InsufficientCreditsError) as cm:
            ConversionJob.create(
                job_type='video',
                input_url='https://example.com/input.mp4',
                output_url='https://example.com/output.webm'
            )

        self.assertEqual(cm.exception.required_credits, 200_000)
        self.assertEqual(cm.exception.available_credits, 50_000)

# pytest example
import pytest
from unittest.mock import patch

@pytest.fixture
def mock_mediaconvert():
    """Patch mediaconvert.configure for the duration of a test and yield the mock."""
    with patch('mediaconvert.configure') as mock_config:
        yield mock_config

def test_batch_processing(mock_mediaconvert):
    """Jobs added through add_job are tracked on the processor."""
    conversions = (
        ('input1.mp4', 'output1.webm'),
        ('input2.mp4', 'output2.webm'),
    )
    processor = BatchProcessor()

    # Job creation is patched so no request leaves the test
    with patch('mediaconvert.ConversionJob.create') as mock_create:
        mock_create.return_value = MediaConvertTestHelper.mock_successful_job()

        for source, target in conversions:
            processor.add_job(source, target)

        assert len(processor.jobs) == 2

Configuration and Best Practices

Environment Configuration

python
# config.py
import os
from typing import Optional

class MediaConvertConfig:
    """SDK settings read from MEDIACONVERT_* environment variables."""

    def __init__(self):
        """Read every setting from the environment, then require an API token.

        Raises ValueError when MEDIACONVERT_API_TOKEN is unset or empty (or
        when a numeric variable does not parse as an integer).
        """
        env = os.environ.get

        # String settings
        self.api_token: str = env('MEDIACONVERT_API_TOKEN', '')
        self.api_base_url: str = env('MEDIACONVERT_API_BASE_URL', 'https://mediaconvert.io/api/v1')
        self.webhook_secret: str = env('MEDIACONVERT_WEBHOOK_SECRET', '')

        # Integer settings, parsed from their string form
        self.timeout: int = int(env('MEDIACONVERT_TIMEOUT', '30'))
        self.retries: int = int(env('MEDIACONVERT_RETRIES', '3'))
        self.max_file_size_mb: int = int(env('MEDIACONVERT_MAX_FILE_SIZE_MB', '1000'))

        # Validation
        if self.api_token:
            return
        raise ValueError("MEDIACONVERT_API_TOKEN environment variable is required")

    def configure_client(self):
        """Apply these settings to the global mediaconvert client."""
        import mediaconvert

        settings = {
            'api_token': self.api_token,
            'api_base_url': self.api_base_url,
            'webhook_secret': self.webhook_secret,
            'timeout': self.timeout,
            'retries': self.retries,
        }
        mediaconvert.configure(**settings)

# Initialize configuration
# Constructing the config raises ValueError when MEDIACONVERT_API_TOKEN is
# missing, so a misconfigured deployment fails here rather than on first use.
config = MediaConvertConfig()
config.configure_client()

Logging Configuration

python
import logging
import mediaconvert

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)

# Enable mediaconvert debug logging
# Only the 'mediaconvert' logger is lowered to DEBUG; the root level set
# above stays at INFO.
mediaconvert_logger = logging.getLogger('mediaconvert')
mediaconvert_logger.setLevel(logging.DEBUG)

# Custom logging handler
class MediaConvertHandler(logging.Handler):
    """Forward records at ERROR level or above to a monitoring hook."""

    def emit(self, record):
        if record.levelno >= logging.ERROR:
            # Send ERROR-and-above records to the monitoring service.
            # send_to_monitoring is an application hook that this snippet
            # does not define.
            send_to_monitoring(record.getMessage())

mediaconvert_logger.addHandler(MediaConvertHandler())

Performance and Optimization

Connection Pooling

python
import os

import mediaconvert
import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry

# Configure session with connection pooling
session = requests.Session()

# Retry strategy
# `allowed_methods` is the current name of the former `method_whitelist`
# argument, which urllib3 2.x no longer accepts (requires urllib3 >= 1.26).
# NOTE(review): POST is not idempotent - automatically retrying a request that
# creates a job may submit it twice. Drop "POST" here unless the API
# deduplicates requests.
retry_strategy = Retry(
    total=3,
    status_forcelist=[429, 500, 502, 503, 504],
    allowed_methods=["HEAD", "GET", "OPTIONS", "POST"],
    backoff_factor=1
)

# Connection pooling
adapter = HTTPAdapter(
    pool_connections=10,
    pool_maxsize=10,
    max_retries=retry_strategy
)

# The same adapter serves both schemes.
session.mount("https://", adapter)
session.mount("http://", adapter)

# Use with mediaconvert
mediaconvert.configure(
    api_token=os.environ['MEDIACONVERT_API_TOKEN'],
    session=session  # Use custom session
)

Memory-Efficient Processing

python
def process_large_batch(file_list, batch_size=10):
    """Yield batch results for file_list, working through batch_size files at a time.

    Each entry of file_list is a dict with 'input_url', 'output_url' and an
    optional 'options' dict of extra job arguments. One result dict is
    yielded per batch.
    """
    for start in range(0, len(file_list), batch_size):
        chunk = file_list[start:start + batch_size]

        # A fresh processor per batch, never with more workers than files
        worker_count = min(5, len(chunk))
        processor = BatchProcessor(max_workers=worker_count)

        for entry in chunk:
            extra = entry.get('options', {})
            processor.add_job(
                input_url=entry['input_url'],
                output_url=entry['output_url'],
                **extra
            )

        # Wait for the batch and hand its results to the caller
        results = processor.wait_for_completion()
        yield results

        # Drop the processor before the next batch to free memory
        del processor

Next Steps

Support