Skip to content

RFC-011: Per-Episode Metadata Document Generation

Abstract

Design and implement per-episode metadata document generation to capture comprehensive feed and episode information in structured JSON/YAML format. This enables search, analytics, integration, and archival use cases while maintaining backwards compatibility with existing workflows.

Problem Statement

Currently, podcast_scraper focuses on downloading transcripts but doesn't systematically capture and persist rich metadata about feeds and episodes. This metadata is valuable for:

  • Search and discovery: Finding episodes by guest names, topics, dates
  • Analytics: Understanding feed patterns, guest frequency, publication schedules
  • Integration: Enabling other tools to consume structured episode data
  • Database ingestion: Loading metadata directly into databases (PostgreSQL, MongoDB, Elasticsearch, ClickHouse) for querying, indexing, and analysis
  • Archival: Preserving complete episode context alongside transcripts
  • Future features: Episode categorization, recommendation systems, summarization

Without structured metadata, users must parse RSS feeds or transcripts manually, which is error-prone and doesn't scale. Additionally, loading metadata into databases requires custom transformation code, adding friction to data pipeline workflows.

Constraints & Assumptions

  • Metadata generation must be opt-in (default false) for backwards compatibility
  • Metadata files should be machine-readable (JSON) or human-readable (YAML)
  • Database-friendly schema: Metadata must be loadable into PostgreSQL (JSONB), MongoDB, Elasticsearch, and ClickHouse without transformation code
  • Unified format: Single JSON schema works across all target databases (no format variations needed)
  • Metadata generation should not significantly impact processing performance (<5% overhead)
  • Metadata schema must be versioned for future evolution
  • Metadata should integrate seamlessly with existing pipeline (RFC-001, RFC-004)
  • Metadata should leverage detected speaker names from RFC-010

Design & Implementation

1. Configuration

Add new configuration fields to config.Config:

generate_metadata: bool = False  # Opt-in for backwards compatibility
metadata_format: Literal["json", "yaml"] = "json"  # Default to JSON
metadata_subdirectory: Optional[str] = None  # None = same dir as transcripts, "metadata" = subdirectory
```yaml

- `--generate-metadata`: Enable metadata generation
- `--metadata-format`: Choose `json` or `yaml` (default: `json`)
- `--metadata-subdirectory`: Optional subdirectory name (default: same as transcripts)

### 2. Metadata Schema

Define Pydantic model for type safety and validation:

```python
from pydantic import BaseModel, Field, field_serializer
from datetime import datetime
from typing import List, Optional, Dict, Any

class FeedMetadata(BaseModel):
    """Feed-level metadata."""
    title: str
    url: str
    feed_id: str  # Stable unique identifier for database primary keys
    description: Optional[str] = None
    language: Optional[str] = None
    authors: List[str] = Field(default_factory=list)
    image_url: Optional[str] = None
    last_updated: Optional[datetime] = None

    @field_serializer('last_updated')
    def serialize_last_updated(self, value: Optional[datetime]) -> Optional[str]:
        """Serialize datetime as ISO 8601 string for database compatibility."""
        return value.isoformat() if value else None

class EpisodeMetadata(BaseModel):

```text

    """Episode-level metadata."""
    title: str
    description: Optional[str] = None
    published_date: Optional[datetime] = None
    guid: Optional[str] = None  # RSS GUID if available
    link: Optional[str] = None
    duration_seconds: Optional[int] = None
    episode_number: Optional[int] = None
    image_url: Optional[str] = None
    episode_id: str  # Stable unique identifier for database primary keys

```python

    @field_serializer('published_date')
    def serialize_published_date(self, value: Optional[datetime]) -> Optional[str]:
        """Serialize datetime as ISO 8601 string for database compatibility."""
        return value.isoformat() if value else None

```python

class TranscriptInfo(BaseModel):

```text

    """Transcript URL and type information."""
    url: str
    transcript_id: Optional[str] = None  # Optional stable identifier for tracking individual transcripts
    type: Optional[str] = None  # e.g., "text/plain", "text/vtt"
    language: Optional[str] = None

```python

class ContentMetadata(BaseModel):

```text

    """Content-related metadata."""
    transcript_urls: List[TranscriptInfo] = Field(default_factory=list)
    media_url: Optional[str] = None
    media_id: Optional[str] = None  # Optional stable identifier for media file
    media_type: Optional[str] = None
    transcript_file_path: Optional[str] = None
    transcript_source: Optional[Literal["direct_download", "whisper_transcription"]] = None
    whisper_model: Optional[str] = None
    detected_hosts: List[str] = Field(default_factory=list)
    detected_guests: List[str] = Field(default_factory=list)

```python

class ProcessingMetadata(BaseModel):

```text

    """Processing-related metadata."""
    processing_timestamp: datetime
    output_directory: str
    run_id: Optional[str] = None
    config_snapshot: Dict[str, Any] = Field(default_factory=dict)
    schema_version: str = "1.0.0"

```python

    @field_serializer('processing_timestamp')
    def serialize_processing_timestamp(self, value: datetime) -> str:
        """Serialize datetime as ISO 8601 string for database compatibility."""
        return value.isoformat()

```python
class EpisodeMetadataDocument(BaseModel):

```text

    """Complete episode metadata document.

```python

def generate_episode_metadata(
    feed: RssFeed,
    episode: Episode,
    content_metadata: ContentMetadata,
    cfg: Config,
    output_dir: str,
    run_suffix: Optional[str] = None,
) -> Optional[str]:
    """Generate metadata document for an episode.

    Returns:
        Path to generated metadata file, or None if generation skipped
    """

    # Build metadata document

    # Write to file (JSON or YAML)

```text

    # Return file path

```python

- Generate metadata after episode processing completes
- Pass detected speaker names from RFC-010
- Pass transcript file paths
- Pass Whisper model info (if applicable)

#### Episode Processor Integration (`episode_processor.py`)

- Capture transcript source (direct download vs Whisper)
- Capture Whisper model used
- Pass metadata to generation function

#### RSS Parser Integration (`rss_parser.py`)

- Extract additional feed metadata (image, last updated)
- Extract additional episode metadata (duration, episode number, image)

### 5. File Storage

**Default behavior** (same directory as transcripts):

- Metadata file: `<idx:04d> - <title_safe>.metadata.json`
- Stored alongside transcript file in same directory

**Optional subdirectory** (`metadata_subdirectory` set):

- Metadata file: `<metadata_subdirectory>/<idx:04d> - <title_safe>.metadata.json`
- Keeps metadata separate from transcripts

**Naming convention**:

- Match transcript filename base (without extension)
- Append `.metadata.json` or `.metadata.yaml`
- Respect `run_suffix` if present: `<base>_<run_suffix>.metadata.json`

### 6. Schema Versioning

- Include `schema_version` field in all metadata documents
- Start with version `1.0.0`
- Use semantic versioning:

  - **Major**: Breaking changes (structure changes, required field additions)
  - **Minor**: Non-breaking additions (optional fields)
  - **Patch**: Bug fixes, clarifications

### 7. Configuration Snapshot

Capture relevant configuration fields in metadata:

```python

config_snapshot = {
    "language": cfg.language,
    "whisper_model": cfg.whisper_model if cfg.transcribe_missing else None,
    "auto_speakers": cfg.auto_speakers,
    "screenplay": cfg.screenplay,
    "max_episodes": cfg.max_episodes,
}

```python

1. **Feed Metadata**: Extract from `RssFeed` object and RSS parsing
   - **Feed ID Generation**: Generate stable unique identifier from feed URL (see ID Generation Strategy below)
2. **Episode Metadata**: Extract from `Episode` object and RSS item parsing
   - **Episode ID Generation**: Generate stable unique identifier (see ID Generation Strategy below)
   - **RSS GUID**: Extract from RSS item `<guid>` tag if available
3. **Content Metadata**:
   - Transcript URLs: From `episode.transcript_urls`
   - Media URL: From `episode.media_url`
   - **Content IDs** (optional): Generate `transcript_id` and `media_id` if tracking content separately
   - Speaker names: From `TranscriptionJob.detected_speaker_names` or detection results
   - Transcript source: Track during processing (`direct_download` vs `whisper_transcription`)
   - Whisper model: From `cfg.whisper_model` if transcription used
4. **Processing Metadata**:
   - Timestamp: `datetime.now().isoformat()`
   - Output directory: From `cfg.output_dir` or derived path
   - Run ID: From `cfg.run_id` if set
   - Config snapshot: Selected fields from `cfg`

### ID Generation Strategy

Each metadata document must include stable, unique identifiers suitable for use as primary keys in databases:

1. **Feed ID** (`feed.feed_id`): Identifies the feed uniquely
2. **Episode ID** (`episode.episode_id`): Identifies the episode uniquely
3. **Content IDs** (optional): `transcript_id` and `media_id` for tracking individual content items

#### Feed ID Generation

The feed ID is generated from the feed URL (normalized):

```python

import hashlib
from urllib.parse import urlparse

def generate_feed_id(feed_url: str) -> str:
    """Generate stable unique identifier for feed.

    Args:
        feed_url: RSS feed URL

    Returns:
        Stable unique identifier string (format: sha256:<hex_digest>)
    """

    # Normalize feed URL (remove trailing slash, lowercase, remove query params/fragments)

    parsed = urlparse(feed_url)
    normalized = f"{parsed.scheme}://{parsed.netloc}{parsed.path}".rstrip('/').lower()

```text

    # Generate SHA-256 hash

```

#### Episode ID Generation

Episode ID generation follows this priority:

1. **RSS GUID** (if available): Use the RSS item's `<guid>` tag value directly
   - Most reliable source as it's explicitly provided by the feed
   - Already unique and stable across runs
   - Format: Use as-is (may be URL, UUID, or other format)

2. **Deterministic Hash** (fallback if no GUID):
   - Generate SHA-256 hash from stable components:
     - Feed URL (normalized)
     - Episode title (normalized)
     - Published date (ISO 8601 format, if available)
     - Episode link/URL (if available, as additional uniqueness factor)
   - Format: `sha256:<hex_digest>` (e.g., `sha256:a1b2c3d4...`)
   - Ensures same episode = same ID across runs
   - Collision-resistant for practical purposes

3. **Composite Key** (fallback if no published date):
   - Format: `feed:<normalized_feed_url>:episode:<normalized_title>:idx:<episode_number>`
   - Less ideal but provides uniqueness when other fields unavailable

**Implementation**:

```python

import hashlib
from urllib.parse import urlparse

def generate_episode_id(
    feed_url: str,
    episode_title: str,
    episode_guid: Optional[str] = None,
    published_date: Optional[datetime] = None,
    episode_link: Optional[str] = None,
    episode_number: Optional[int] = None,
) -> str:
    """Generate stable unique identifier for episode.

    Priority:
    1. RSS GUID if available
    2. Deterministic hash from feed URL + title + published_date + link
    3. Composite key as last resort

```text

    Returns:
        Stable unique identifier string
    """

```python

    # Build hash input from stable components

```python

**Transcript ID** (`transcript_id`):

- Generated from transcript URL (normalized)
- Format: `sha256:<hex_digest>` of normalized URL
- Useful for tracking transcript availability across episodes

**Media ID** (`media_id`):

- Generated from media URL (normalized)
- Format: `sha256:<hex_digest>` of normalized URL
- Useful for tracking media files across episodes

```python

def generate_content_id(content_url: str) -> str:
    """Generate stable unique identifier for content item (transcript or media).

    Args:
        content_url: URL of the content item

    Returns:
        Stable unique identifier string (format: sha256:<hex_digest>)
    """

    # Normalize URL (remove trailing slash, lowercase, remove query params/fragments)

    parsed = urlparse(content_url)
    normalized = f"{parsed.scheme}://{parsed.netloc}{parsed.path}".rstrip('/').lower()

    # Generate SHA-256 hash

    hash_digest = hashlib.sha256(normalized.encode('utf-8')).hexdigest()

```text

    return f"sha256:{hash_digest}"

```
#### Database Usage

**Feed ID**:

- **PostgreSQL**: Use `feed.feed_id` as PRIMARY KEY in feed-level tables
- **MongoDB**: Use `feed.feed_id` as `_id` in feed collections
- **Elasticsearch**: Use `feed.feed_id` as document `_id` in feed indices
- **ClickHouse**: Use `feed.feed_id` in ORDER BY clause for feed tables

**Episode ID**:

- **PostgreSQL**: Use `episode.episode_id` as PRIMARY KEY or UNIQUE constraint
- **MongoDB**: Use `episode.episode_id` as `_id` field (or create unique index)
- **Elasticsearch**: Use `episode.episode_id` as document `_id`
- **ClickHouse**: Use `episode.episode_id` as ORDER BY key or primary key

**Content IDs** (if used):

- **PostgreSQL**: Use `transcript_id`/`media_id` as PRIMARY KEY in content tables
- **MongoDB**: Use as `_id` in content collections
- **Elasticsearch**: Use as document `_id` in content indices
- **ClickHouse**: Use in ORDER BY clause for content tables

**Benefits**:

- Stable across runs (same feed/episode/content = same ID)
- Unique across feeds (includes feed URL in hash)
- Database-friendly (string format works in all databases)
- Deterministic (no random UUIDs, reproducible)
- Collision-resistant (SHA-256 provides sufficient entropy)
- Enables relational queries (join episodes to feeds, link content to episodes)

### File Writing

- Use `filesystem.write_file()` for consistency
- Validate metadata structure before writing (Pydantic validation)
- Handle encoding (UTF-8)
- Respect `--skip-existing` (check if metadata file exists)
- Respect `--dry-run` (log planned metadata without writing)

### Error Handling

- If metadata generation fails, log warning but don't fail episode processing
- If metadata validation fails, log error and skip generation
- If file write fails, log error but continue processing

## Testing Strategy

- Unit tests for metadata model validation
- Unit tests for JSON/YAML serialization
- Unit tests for file naming and path construction
- Integration tests for metadata generation in workflow
- Integration tests for `--skip-existing` behavior
- Integration tests for `--dry-run` mode
- Tests for schema versioning
- Tests for database compatibility (JSON structure, ISO 8601 dates, snake_case fields)

## Database Integration Design Principles

### Unified Format Strategy

**Decision**: Use a single JSON schema that works across all target databases without format variations.

**Rationale**:

- JSON is the universal format supported natively by PostgreSQL (JSONB), MongoDB, Elasticsearch, and ClickHouse
- Format variations add complexity and maintenance burden
- Database-specific optimizations can be handled at ingestion time (indexing, flattening) rather than at generation time

### Schema Design Principles

1. **Field Naming**: Use `snake_case` for all field names
   - Compatible with SQL databases (avoids keyword conflicts)
   - Works naturally in MongoDB and Elasticsearch
   - ClickHouse supports both snake_case and camelCase, but snake_case is more universal

2. **Date/Time Serialization**: All datetime fields serialized as ISO 8601 strings
   - Format: `"2025-01-15T10:30:00Z"` or `"2025-01-15T10:30:00+00:00"`
   - Easily parsed by all databases
   - Supports timezone-aware queries
   - Can be indexed and queried efficiently

3. **Nested Structures**: Logical grouping (feed, episode, content, processing)
   - **Document databases** (MongoDB, Elasticsearch): Nested objects work natively
   - **PostgreSQL JSONB**: Supports nested queries with `->` and `->>` operators
   - **ClickHouse**: JSON column type supports nested field access
   - **Relational databases**: Can be flattened if needed using views or ETL

4. **Array Handling**: Arrays for multi-value fields (hosts, guests, transcript URLs)
   - Native support in all target databases
   - Enables array queries (contains, intersection, etc.)

5. **Type Consistency**: Consistent data types across all fields
   - Strings: Always strings (no mixed types)
   - Numbers: Integers for counts/durations, floats only when needed
   - Booleans: Explicit boolean values (not strings like "true"/"false")
   - Nulls: Use `null` (not empty strings or missing fields)

### Database-Specific Considerations

#### PostgreSQL (JSONB)

- Nested queries: `metadata->'episode'->>'title'`
- Array queries: `metadata @> '{"content": {"detected_guests": ["John"]}}'`
- Indexing: GIN indexes on JSONB columns for fast queries
- Flattening: Can create views with flattened columns if needed
- **Primary Key**: Use `metadata->'episode'->>'episode_id'` as PRIMARY KEY or UNIQUE constraint

#### MongoDB

- Document structure matches MongoDB document model exactly
- Nested queries: `db.episodes.find({"content.detected_guests": "John"})`
- Indexing: Can index nested fields directly
- **Document ID**: Use `episode.episode_id` as `_id` field: `db.episodes.insertOne({_id: doc.episode.episode_id, ...doc})`
- No transformation needed

#### Elasticsearch

- JSON is native document format
- Nested objects can be mapped as nested type for better querying
- Arrays are automatically handled
- Full-text search on text fields (title, description)
- **Document ID**: Use `episode.episode_id` as document `_id` in bulk operations

#### ClickHouse

- JSON column type supports nested field access
- Can query nested fields: `metadata.content.detected_guests`
- Can create materialized columns for frequently queried nested fields
- Supports JSONEachRow format for bulk loading
- **Primary Key**: Use `metadata.episode.episode_id` in ORDER BY clause or as primary key

## Backwards Compatibility

- Feature is opt-in (default `false`), so existing workflows unaffected
- When disabled, zero performance impact
- Metadata files are additive (don't modify existing transcript files)
- Schema versioning allows future evolution without breaking consumers

## Performance Considerations

- Metadata generation should add <5% overhead
- Use efficient JSON/YAML serialization (Pydantic's built-in methods)
- Lazy evaluation: only generate when `generate_metadata=True`
- Batch file writes if possible (though per-episode is fine)

## Alternatives Considered

### SQLite Database

- **Pros**: Queryable, efficient for large datasets
- **Cons**: Additional dependency, harder to version control, overkill for current use case
- **Decision**: Rejected in favor of file-based approach for simplicity

### Single Metadata File Per Feed

- **Pros**: Single file to manage
- **Cons**: Harder to update incrementally, larger files, concurrency issues
- **Decision**: Rejected in favor of per-episode files for flexibility

### Metadata in Transcript Files

- **Pros**: Single file per episode
- **Cons**: Mixes content and metadata, harder to parse programmatically
- **Decision**: Rejected in favor of separate metadata files

## Rollout Plan

1. Create PRD-004 and RFC-011 documents
2. Review with stakeholders
3. Implement metadata generation module
4. Integrate into workflow
5. Add tests
6. Update documentation
7. Release as opt-in feature
8. Collect user feedback
9. Consider making default in future release

## Database Loading Examples

### Example: PostgreSQL JSONB

```sql

-- Create table
CREATE TABLE episode_metadata (
    id SERIAL PRIMARY KEY,
    episode_guid VARCHAR(255) UNIQUE,
    metadata JSONB NOT NULL,
    created_at TIMESTAMP DEFAULT NOW()
);

-- Create GIN index for fast queries
CREATE INDEX idx_metadata_gin ON episode_metadata USING GIN (metadata);

-- Load single file (using episode_id from metadata)
INSERT INTO episode_metadata (episode_id, metadata)
SELECT
    metadata->'episode'->>'episode_id' as episode_id,
    metadata::jsonb
FROM (SELECT '{"feed": {...}, "episode": {"episode_id": "sha256:abc123..."}, ...}'::jsonb as metadata) as m
ON CONFLICT (episode_id) DO UPDATE SET metadata = EXCLUDED.metadata;

-- Load from file (using COPY or external tool)
-- Note: PostgreSQL COPY doesn't support JSONB directly, use psql or application code

-- Query examples
SELECT metadata->'episode'->>'title' as title
FROM episode_metadata
WHERE metadata->'episode'->>'published_date' > '2025-01-01';

SELECT * FROM episode_metadata
WHERE metadata @> '{"content": {"detected_guests": ["John Doe"]}}';

SELECT metadata->'content'->'detected_hosts' as hosts
FROM episode_metadata
WHERE jsonb_array_length(metadata->'content'->'detected_hosts') > 0;

```javascript

// Load single document (using episode_id as _id)
const doc = {
  feed: {...},
  episode: {episode_id: "sha256:abc123...", ...},
  content: {...},
  processing: {...}
};
db.episodes.insertOne({_id: doc.episode.episode_id, ...doc});

// Bulk load from JSON file
db.episodes.insertMany(JSON.parse(fs.readFileSync('/path/to/metadata.json', 'utf8')));

// Create indexes
db.episodes.createIndex({"episode.published_date": 1});
db.episodes.createIndex({"content.detected_guests": 1});
db.episodes.createIndex({"content.detected_hosts": 1});

// Query examples
db.episodes.find({"content.detected_guests": "John Doe"});
db.episodes.find({"episode.published_date": {"$gte": "2025-01-01"}});
db.episodes.find({"content.detected_hosts": {"$in": ["Jane Host"]}});

```text

# Create index with mapping

PUT /episodes
{
  "mappings": {
    "properties": {
      "feed": {"type": "object"},
      "episode": {
        "type": "object",
        "properties": {
          "title": {"type": "text"},
          "published_date": {"type": "date"},
          "description": {"type": "text"}
        }
      },
      "content": {
        "type": "object",
        "properties": {
          "detected_hosts": {"type": "keyword"},
          "detected_guests": {"type": "keyword"}
        }
      }
    }
  }
}

# Bulk load metadata files (using episode_id as document _id)

# Format: {"index": {"_id": "episode_id"}}\n{"feed": {...}, "episode": {...}, ...}\n

curl -X POST "localhost:9200/episodes/_bulk" \
  -H 'Content-Type: application/x-ndjson' \
  --data-binary @metadata_bulk.json

# Query examples

GET /episodes/_search
{
  "query": {
    "match": {
      "content.detected_guests": "John Doe"
    }
  }
}

GET /episodes/_search
{
  "query": {
    "range": {
      "episode.published_date": {
        "gte": "2025-01-01"
      }
    }
  }
}

```text

-- Create table with JSON column, using episode_id as ordering key
CREATE TABLE episode_metadata (
    metadata JSON
) ENGINE = MergeTree()
ORDER BY (metadata.episode.episode_id);

-- Load from JSON file (JSONEachRow format)
INSERT INTO episode_metadata
SELECT * FROM file('/path/to/metadata.json', JSONEachRow);

-- Create materialized columns for frequently queried fields
ALTER TABLE episode_metadata
ADD COLUMN episode_title String MATERIALIZED metadata.episode.title,
ADD COLUMN published_date Date MATERIALIZED toDate(metadata.episode.published_date);

-- Query examples
SELECT metadata.episode.title
FROM episode_metadata
WHERE has(metadata.content.detected_guests, 'John Doe');

SELECT * FROM episode_metadata
WHERE toDate(metadata.episode.published_date) >= '2025-01-01';

```text

- Should metadata support incremental updates? (Decision: Regenerate on each run)
- Should metadata include transcript excerpts? (Decision: No, transcripts are separate files)
- Do we need database-specific format variations? (Decision: No, unified JSON with snake_case and ISO 8601 dates works universally across all target databases)

## References

- PRD-004: Per-Episode Metadata Document Generation
- RFC-010: Automatic Speaker Name Detection
- RFC-004: Filesystem Layout & Run Management
- RFC-001: Workflow Orchestration
- Pydantic documentation: <https://docs.pydantic.dev/>
- JSON Schema: <https://json-schema.org/>