
Core API

This is the primary public API for podcast_scraper. Use these functions for programmatic access.

Quick Start

import podcast_scraper

# Create configuration
cfg = podcast_scraper.Config(
    rss="https://example.com/feed.xml",
    output_dir="./transcripts",
    max_episodes=10
)

# Run the pipeline
count, summary = podcast_scraper.run_pipeline(cfg)
print(f"Downloaded {count} transcripts: {summary}")

API Reference

run_pipeline

run_pipeline(cfg: Config) -> Tuple[int, str]

Execute the main podcast scraping pipeline.

This is the primary entry point for programmatic use of podcast_scraper. It orchestrates the complete workflow from RSS feed fetching to transcript generation and optional metadata/summarization.

The pipeline executes the following stages:

  1. Set up output directory (with optional run ID subdirectory)
  2. Fetch and parse RSS feed
  3. Detect speakers (if auto-detection enabled)
  4. Process episodes concurrently:
     • Download published transcripts
     • Or queue media for Whisper transcription
  5. Transcribe queued media files sequentially (if Whisper enabled)
  6. Generate metadata documents (if enabled)
  7. Generate episode summaries (if enabled)
  8. Clean up temporary files
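
The two-phase shape described in the stage list (concurrent episode processing that either saves a published transcript or queues media, followed by sequential transcription of the queue) can be sketched roughly as follows. This is an illustration only, not the library's implementation: `Episode`, `process_episode`, `transcribe`, and `run_stages` are hypothetical stand-ins, and no network or Whisper calls are made.

```python
from concurrent.futures import ThreadPoolExecutor
from dataclasses import dataclass
from typing import List, Optional, Tuple


@dataclass
class Episode:
    """Hypothetical stand-in for a parsed feed entry."""
    title: str
    transcript_url: Optional[str] = None  # None means no published transcript


def process_episode(ep: Episode) -> Tuple[str, str]:
    """Return ("saved", title) if a transcript is published, else ("queued", title)."""
    if ep.transcript_url is not None:
        return ("saved", ep.title)   # a real pipeline would download here
    return ("queued", ep.title)      # deferred to the sequential transcription phase


def transcribe(title: str) -> str:
    """Placeholder for a speech-to-text call."""
    return f"transcript of {title}"


def run_stages(episodes: List[Episode], workers: int = 4) -> Tuple[List[str], List[str]]:
    # Concurrent phase: map() preserves the input order of episodes.
    with ThreadPoolExecutor(max_workers=workers) as pool:
        results = list(pool.map(process_episode, episodes))
    saved = [title for status, title in results if status == "saved"]
    queue = [title for status, title in results if status == "queued"]
    # Sequential phase: transcribe queued media one at a time.
    transcribed = [transcribe(title) for title in queue]
    return saved, transcribed


episodes = [
    Episode("ep1", "https://example.com/ep1.vtt"),
    Episode("ep2"),
    Episode("ep3", "https://example.com/ep3.vtt"),
]
saved, transcribed = run_stages(episodes)
```

Keeping transcription out of the thread pool means only one heavyweight model call runs at a time, which is one plausible reason for the split described above.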

Parameters:

Name Type Description Default
cfg Config

Configuration object with all pipeline settings. See Config for available options. Download resilience (HTTP/RSS urllib3 retries, optional episode-level retries, and optional Issue #522 fair-HTTP fields) is controlled via http_retry_*, rss_retry_*, episode_retry_*, host_*, circuit_breaker_*, rss_conditional_get, and rss_cache_dir (defaults and CLI flags are documented in CONFIGURATION.md / CLI.md).

required

Returns:

Type Description
Tuple[int, str]

Tuple[int, str]: A tuple containing:

  • count (int): Number of episodes processed (transcripts saved or planned)
  • summary (str): Human-readable summary message describing the run

Raises:

Type Description
RuntimeError

If output directory cleanup fails when clean_output=True

ValueError

If RSS URL is invalid or feed cannot be parsed

FileNotFoundError

If configuration file references missing files

OSError

If file system operations fail
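
A caller that wants to turn the exceptions listed above into exit codes might wrap the call along these lines. This is a minimal sketch under stated assumptions: `run_safely` and the exit-code values are hypothetical and not part of podcast_scraper, and the `fake_*` functions stand in for `run_pipeline` so the example runs without network access.

```python
from typing import Any, Callable, Tuple


def run_safely(run: Callable[[Any], Tuple[int, str]], cfg: Any) -> Tuple[int, str]:
    """Call a pipeline entry point and map the documented exceptions to an exit code.

    `run` is expected to behave like run_pipeline; the exit codes are arbitrary.
    """
    try:
        count, summary = run(cfg)
    except FileNotFoundError as exc:
        # Must come before OSError: FileNotFoundError is a subclass of OSError.
        return 3, f"missing file: {exc}"
    except ValueError as exc:
        return 2, f"invalid feed or configuration: {exc}"
    except OSError as exc:
        return 4, f"file system error: {exc}"
    except RuntimeError as exc:
        return 5, f"runtime error: {exc}"
    return 0, f"{count} episodes: {summary}"


# Stand-in callables so the sketch runs without the real pipeline.
def fake_ok(cfg: Any) -> Tuple[int, str]:
    return 10, "Processed 10/50 episodes"


def fake_bad_feed(cfg: Any) -> Tuple[int, str]:
    raise ValueError("feed cannot be parsed")


def fake_missing(cfg: Any) -> Tuple[int, str]:
    raise FileNotFoundError("speakers.txt")


ok = run_safely(fake_ok, cfg=None)
bad = run_safely(fake_bad_feed, cfg=None)
missing = run_safely(fake_missing, cfg=None)
```

In real use the first argument would be `run_pipeline` and `cfg` a `Config` instance.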

Example

from podcast_scraper import Config, run_pipeline

cfg = Config(
    rss="https://example.com/feed.xml",
    output_dir="./transcripts",
    max_episodes=10
)
count, summary = run_pipeline(cfg)
print(f"Downloaded {count} transcripts: {summary}")
# Example output: Downloaded 10 transcripts: Processed 10/50 episodes

Example with Whisper transcription

cfg = Config(
    rss="https://example.com/feed.xml",
    transcribe_missing=True,
    whisper_model="base",
    screenplay=True,
    num_speakers=2
)
count, summary = run_pipeline(cfg)

Note

For non-interactive use (daemons, services), consider using the service.run() function instead, which provides structured error handling and return values.

See Also
  • Config: Configuration model with all available options
  • service.run(): Service API with structured error handling
  • load_config_file(): Load configuration from JSON/YAML file
Source code in src/podcast_scraper/workflow/orchestration.py
def run_pipeline(cfg: config.Config) -> Tuple[int, str]:
    """Execute the main podcast scraping pipeline.

    This is the primary entry point for programmatic use of podcast_scraper. It orchestrates
    the complete workflow from RSS feed fetching to transcript generation and optional
    metadata/summarization.

    The pipeline executes the following stages:

    1. Setup output directory (with optional run ID subdirectory)
    2. Fetch and parse RSS feed
    3. Detect speakers (if auto-detection enabled)
    4. Process episodes concurrently:
       - Download published transcripts
       - Or queue media for Whisper transcription
    5. Transcribe queued media files sequentially (if Whisper enabled)
    6. Generate metadata documents (if enabled)
    7. Generate episode summaries (if enabled)
    8. Clean up temporary files

    Args:
        cfg: Configuration object with all pipeline settings. See `Config` for available options.
            Download resilience (HTTP/RSS urllib3 retries, optional episode-level retries,
            and optional Issue #522 fair-HTTP fields) is controlled via ``http_retry_*``,
            ``rss_retry_*``, ``episode_retry_*``, ``host_*``, ``circuit_breaker_*``,
            ``rss_conditional_get``, and ``rss_cache_dir`` (defaults and CLI flags are
            documented in CONFIGURATION.md / CLI.md).

    Returns:
        Tuple[int, str]: A tuple containing:

            - count (int): Number of episodes processed (transcripts saved or planned)
            - summary (str): Human-readable summary message describing the run

    Raises:
        RuntimeError: If output directory cleanup fails when `clean_output=True`
        ValueError: If RSS URL is invalid or feed cannot be parsed
        FileNotFoundError: If configuration file references missing files
        OSError: If file system operations fail

    Example:
        >>> from podcast_scraper import Config, run_pipeline
        >>>
        >>> cfg = Config(
        ...     rss="https://example.com/feed.xml",
        ...     output_dir="./transcripts",
        ...     max_episodes=10
        ... )
        >>> count, summary = run_pipeline(cfg)
        >>> print(f"Downloaded {count} transcripts: {summary}")
        Downloaded 10 transcripts: Processed 10/50 episodes

    Example with Whisper transcription:
        >>> cfg = Config(
        ...     rss="https://example.com/feed.xml",
        ...     transcribe_missing=True,
        ...     whisper_model="base",
        ...     screenplay=True,
        ...     num_speakers=2
        ... )
        >>> count, summary = run_pipeline(cfg)

    Example with metadata and summaries:
        >>> cfg = Config(
        ...     rss="https://example.com/feed.xml",
        ...     generate_metadata=True,
        ...     generate_summaries=True
        ... )
        >>> count, summary = run_pipeline(cfg)

    Note:
        For non-interactive use (daemons, services), consider using the `service.run()`
        function instead, which provides structured error handling and return values.

    See Also:
        - `Config`: Configuration model with all available options
        - `service.run()`: Service API with structured error handling
        - `load_config_file()`: Load configuration from JSON/YAML file
    """
    # GitHub #562: reset gates before setup (setup is outside try/finally below).
    try:
        config.reset_screenplay_issue_562_gates()
    except Exception:  # pragma: no cover - defensive import/cleanup
        logger.debug("reset_screenplay_issue_562_gates (startup) failed", exc_info=True)

    # Step 1: Setup pipeline environment
    effective_output_dir, run_suffix, full_config_string, pipeline_metrics = (
        _setup_pipeline_environment(cfg)
    )

    # GitHub #557: structured incident log (episode/feed scope); default beside run artifacts.
    if not (cfg.incident_log_path or "").strip():
        cfg = cfg.model_copy(
            update={
                "incident_log_path": str(Path(effective_output_dir) / "corpus_incidents.jsonl"),
            }
        )

    monitor_proc: Optional[Any] = None
    py_spy_stop: Optional[Callable[[], None]] = None
    if cfg.monitor:
        from ..monitor.py_spy_listener import start_py_spy_stdin_listener
        from ..monitor.runner import start_monitor_subprocess

        monitor_proc = start_monitor_subprocess(
            pipeline_pid=os.getpid(),
            output_dir=effective_output_dir,
        )
        py_spy_stop = start_py_spy_stdin_listener(
            output_dir=effective_output_dir,
            enabled=True,
        )

    try:
        from ..gi.deps import validate_gil_grounding_dependencies

        validate_gil_grounding_dependencies(cfg)

        # Initialize JSONL emitter if enabled
        jsonl_emitter = _setup_jsonl_emitter(cfg, effective_output_dir, pipeline_metrics)

        # Step 1.5: Preload ML models if configured
        wf_stages.setup.preload_ml_models_if_needed(cfg)

        # Step 1.6: Create all providers once (singleton pattern per run)
        # Providers are created here and passed to stages to avoid redundant initialization
        transcription_provider, speaker_detector, summary_provider = _create_all_providers(cfg)

        # Step 1.7-1.8: Setup logging and device tracking
        _setup_logging_and_devices(
            cfg, transcription_provider, speaker_detector, summary_provider, pipeline_metrics
        )

        # Step 1.5: Create run manifest
        run_manifest = _create_run_manifest(cfg, effective_output_dir)

        # Step 2-4: Fetch and prepare episodes
        maybe_update_pipeline_status(cfg, effective_output_dir, stage="rss_feed_fetch")
        feed, rss_bytes, feed_metadata, episodes = _fetch_and_prepare_episodes(
            cfg, pipeline_metrics
        )

        # Step 5-6.5: Setup pipeline resources
        normalizing_start, host_detection_result, transcription_resources, processing_resources = (
            _setup_pipeline_resources(
                cfg,
                feed,
                episodes,
                effective_output_dir,
                transcription_provider,
                speaker_detector,
                pipeline_metrics,
            )
        )

        # Wrap processing + finalize: JSONL must stay open until _finalize_pipeline
        # calls emit_run_finished (see _finalize_emit_and_save). Closing the emitter in
        # the inner finally was too early and broke run_finished emission.
        interim_checkpoint_manager = _InterimCheckpointManager(
            cfg=cfg,
            output_dir=effective_output_dir,
            pipeline_metrics=pipeline_metrics,
        )
        try:
            try:
                saved = _process_episodes_with_threading(
                    cfg=cfg,
                    episodes=episodes,
                    feed=feed,
                    effective_output_dir=effective_output_dir,
                    run_suffix=run_suffix,
                    feed_metadata=feed_metadata,
                    host_detection_result=host_detection_result,
                    transcription_resources=transcription_resources,
                    processing_resources=processing_resources,
                    pipeline_metrics=pipeline_metrics,
                    summary_provider=summary_provider,
                    transcription_provider=transcription_provider,
                    normalizing_start=normalizing_start,
                    interim_checkpoint_manager=interim_checkpoint_manager,
                )

            finally:
                interim_checkpoint_manager.stop()
                # Step 9.5: Unload models to free memory
                _cleanup_providers(transcription_resources, summary_provider)

            # Step 10-15: Finalize pipeline (metrics, JSONL run_finished, index, …)
            result = _finalize_pipeline(
                cfg=cfg,
                saved=saved,
                transcription_resources=transcription_resources,
                effective_output_dir=effective_output_dir,
                run_suffix=run_suffix,
                pipeline_metrics=pipeline_metrics,
                episodes=episodes,
                jsonl_emitter=jsonl_emitter,
                run_manifest=run_manifest,
                summary_provider=summary_provider,
                transcription_provider=transcription_provider,
            )
        except BaseException:
            if jsonl_emitter is not None:
                try:
                    jsonl_emitter.__exit__(None, None, None)
                except Exception:
                    pass
            raise

        maybe_update_pipeline_status(cfg, effective_output_dir, stage="done")
        return result
    finally:
        # GitHub #562: allow coercion INFO + screenplay warnings on the next Config / run.
        try:
            config.reset_screenplay_issue_562_gates()
        except Exception:  # pragma: no cover - defensive import/cleanup
            logger.debug("reset_screenplay_issue_562_gates failed", exc_info=True)
        if py_spy_stop is not None:
            py_spy_stop()
        if monitor_proc is not None:
            monitor_proc.join(timeout=30)
            if monitor_proc.is_alive():
                monitor_proc.terminate()
                monitor_proc.join(timeout=5)

load_config_file

load_config_file(path: str) -> Dict[str, Any]

Load configuration from a JSON or YAML file.

This function reads a configuration file and returns a dictionary of configuration values. The file format is auto-detected from the file extension (.json, .yaml, or .yml).

The returned dictionary can be unpacked into the Config constructor to create a configuration object.

Parameters:

Name Type Description Default
path str

Path to configuration file (JSON or YAML). Supports tilde expansion for home directory (e.g., "~/config.yaml").

required

Returns:

Type Description
Dict[str, Any]

Dict[str, Any]: Dictionary containing configuration values from the file. Keys correspond to Config field names (using aliases where applicable).

Raises:

Type Description
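ValueError

If any of the following occur:

  • Config path is empty
  • Config path cannot be resolved
  • Config file does not exist
  • File extension is not .json, .yaml, or .yml
  • File cannot be read due to permissions or I/O errors (the underlying OSError is caught and re-raised as ValueError)
  • JSON parsing fails
  • YAML parsing fails
  • Top-level content is not a mapping/object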
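
Because the source shown on this page reports its failure modes as ValueError, a caller can handle a rejected config file in one place. The sketch below illustrates that pattern. `load_or_none` and `json_only_loader` are hypothetical helpers: the latter is a JSON-only stand-in that mirrors the documented error behavior so the example runs without podcast_scraper or PyYAML installed.

```python
import json
import tempfile
from pathlib import Path
from typing import Any, Callable, Dict, Optional


def load_or_none(loader: Callable[[str], Dict[str, Any]], path: str) -> Optional[Dict[str, Any]]:
    """Return the loaded mapping, or None if the loader rejects the file."""
    try:
        return loader(path)
    except ValueError as exc:
        print(f"config rejected: {exc}")
        return None


def json_only_loader(path: str) -> Dict[str, Any]:
    """Hypothetical JSON-only stand-in mirroring the documented ValueError behavior."""
    if not path:
        raise ValueError("Config path cannot be empty")
    resolved = Path(path).expanduser()
    if not resolved.exists():
        raise ValueError(f"Config file not found: {resolved}")
    if resolved.suffix.lower() != ".json":
        raise ValueError(f"Unsupported config file type: {resolved.suffix}")
    try:
        data = json.loads(resolved.read_text(encoding="utf-8"))
    except json.JSONDecodeError as exc:
        raise ValueError(f"Invalid JSON config file {resolved}: {exc}") from exc
    if not isinstance(data, dict):
        raise ValueError("Config file must contain a mapping/object at the top level")
    return data


with tempfile.TemporaryDirectory() as tmp:
    good = Path(tmp) / "config.json"
    good.write_text('{"rss": "https://example.com/feed.xml", "max_episodes": 50}', encoding="utf-8")
    broken = Path(tmp) / "broken.json"
    broken.write_text("{not json", encoding="utf-8")

    loaded = load_or_none(json_only_loader, str(good))
    rejected = load_or_none(json_only_loader, str(broken))
    missing = load_or_none(json_only_loader, str(Path(tmp) / "absent.json"))
```

With the real library, `load_config_file` would be passed as the `loader` argument.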

Example

from podcast_scraper import Config, load_config_file, run_pipeline

# Load from YAML file
config_dict = load_config_file("config.yaml")
cfg = Config(**config_dict)
count, summary = run_pipeline(cfg)

Example with JSON

config_dict = load_config_file("config.json")
cfg = Config(**config_dict)

Example with direct usage

from podcast_scraper import load_config_file, service

# Service API provides load_config_file convenience
result = service.run_from_config_file("config.yaml")

Supported Formats

JSON (.json):

{
  "rss": "https://example.com/feed.xml",
  "output_dir": "./transcripts",
  "max_episodes": 50
}

YAML (.yaml, .yml):

rss: https://example.com/feed.xml
output_dir: ./transcripts
max_episodes: 50

Note
  • Field aliases are supported (e.g., both "rss" and "rss_url" work)
  • See Config documentation for all available configuration options
  • Configuration files should not contain sensitive data (API keys, passwords)
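
One way to keep secrets out of the config file is to merge them into the loaded dictionary from the environment before constructing `Config`. The sketch below assumes this approach. `with_env_secret` is a hypothetical helper, and the field name `api_key` and variable name `EXAMPLE_API_KEY` are placeholders, not documented `Config` fields or environment variables.

```python
import os
from typing import Any, Dict, Mapping


def with_env_secret(
    config_dict: Dict[str, Any],
    field: str,
    env_var: str,
    environ: Mapping[str, str] = os.environ,
) -> Dict[str, Any]:
    """Return a copy of config_dict with `field` filled from `env_var` if it is set."""
    merged = dict(config_dict)  # leave the caller's dict untouched
    value = environ.get(env_var)
    if value:
        merged[field] = value
    return merged


# Values as they might come from a config file (no secrets stored in it).
from_file = {"rss": "https://example.com/feed.xml", "max_episodes": 50}

# "api_key" and "EXAMPLE_API_KEY" are hypothetical names for illustration.
fake_env = {"EXAMPLE_API_KEY": "s3cret"}
merged = with_env_secret(from_file, "api_key", "EXAMPLE_API_KEY", environ=fake_env)
unchanged = with_env_secret(from_file, "api_key", "EXAMPLE_API_KEY", environ={})
```

The merged dictionary can then be unpacked into the constructor in the same way as a dictionary loaded directly from a file.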

See Also
  • Config: Configuration model and field documentation
  • service.run_from_config_file(): Direct service API from config file
  • Configuration examples: config/examples/config.example.json, config/examples/config.example.yaml
Source code in src/podcast_scraper/config.py
def load_config_file(
    path: str,
) -> Dict[str, Any]:  # noqa: C901 - file parsing handles multiple formats
    """Load configuration from a JSON or YAML file.

    This function reads a configuration file and returns a dictionary of configuration values.
    The file format is auto-detected from the file extension (`.json`, `.yaml`, or `.yml`).

    The returned dictionary can be unpacked into the `Config` constructor to create a
    configuration object.

    Args:
        path: Path to configuration file (JSON or YAML). Supports tilde expansion for
              home directory (e.g., "~/config.yaml").

    Returns:
        Dict[str, Any]: Dictionary containing configuration values from the file.
            Keys correspond to `Config` field names (using aliases where applicable).

    Raises:
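        ValueError: If any of the following occur:

            - Config path is empty
            - Config path cannot be resolved
            - Config file does not exist
            - File extension is not `.json`, `.yaml`, or `.yml`
            - File cannot be read due to permissions or I/O errors
              (the underlying OSError is caught and re-raised as ValueError)
            - JSON parsing fails
            - YAML parsing fails
            - Top-level content is not a mapping/object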

    Example:
        >>> from podcast_scraper import Config, load_config_file, run_pipeline
        >>>
        >>> # Load from YAML file
        >>> config_dict = load_config_file("config.yaml")
        >>> cfg = Config(**config_dict)
        >>> count, summary = run_pipeline(cfg)

    Example with JSON:
        >>> config_dict = load_config_file("config.json")
        >>> cfg = Config(**config_dict)

    Example with direct usage:
        >>> from podcast_scraper import load_config_file, service
        >>>
        >>> # Service API provides load_config_file convenience
        >>> result = service.run_from_config_file("config.yaml")

    Supported Formats:
        **JSON** (`.json`):

            {
              "rss": "https://example.com/feed.xml",
              "output_dir": "./transcripts",
              "max_episodes": 50
            }

        **YAML** (`.yaml`, `.yml`):

            rss: https://example.com/feed.xml
            output_dir: ./transcripts
            max_episodes: 50

    Note:
        - Field aliases are supported (e.g., both "rss" and "rss_url" work)
        - See `Config` documentation for all available configuration options
        - Configuration files should not contain sensitive data (API keys, passwords)

    See Also:
        - `Config`: Configuration model and field documentation
        - `service.run_from_config_file()`: Direct service API from config file
        - Configuration examples: `config/examples/config.example.json`,
          `config/examples/config.example.yaml`
    """
    if not path:
        raise ValueError("Config path cannot be empty")

    cfg_path = Path(path).expanduser()
    try:
        resolved = cfg_path.resolve()
    except (OSError, RuntimeError) as exc:
        raise ValueError(f"Invalid config path: {path} ({exc})") from exc

    if not resolved.exists():
        raise ValueError(f"Config file not found: {resolved}")

    suffix = resolved.suffix.lower()
    try:
        text = resolved.read_text(encoding="utf-8")
    except OSError as exc:
        raise ValueError(f"Failed to read config file {resolved}: {exc}") from exc

    if suffix == ".json":
        try:
            data = json.loads(text)
        except json.JSONDecodeError as exc:
            raise ValueError(f"Invalid JSON config file {resolved}: {exc}") from exc
    elif suffix in (".yaml", ".yml"):
        try:
            data = yaml.safe_load(text)
        except yaml.YAMLError as exc:  # type: ignore[attr-defined]
            raise ValueError(f"Invalid YAML config file {resolved}: {exc}") from exc
    else:
        raise ValueError(f"Unsupported config file type: {resolved.suffix}")

    if not isinstance(data, dict):
        raise ValueError("Config file must contain a mapping/object at the top level")

    return data

Package Information

Versioning

__version__ module-attribute

__version__ = '2.6.0'

__api_version__ module-attribute

__api_version__ = __version__
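
Client code that depends on a particular API level could guard on the version string, for example as sketched below. This assumes the version follows a plain MAJOR.MINOR.PATCH scheme, which this page does not state explicitly. `parse_version` and `is_compatible` are hypothetical helpers, and the literal string stands in for `podcast_scraper.__api_version__`.

```python
from typing import Tuple


def parse_version(version: str) -> Tuple[int, int, int]:
    """Parse a plain 'MAJOR.MINOR.PATCH' string into a tuple of ints."""
    major, minor, patch = version.split(".")
    return int(major), int(minor), int(patch)


def is_compatible(installed: str, required_major: int, min_minor: int = 0) -> bool:
    """True if `installed` has the required major version and at least min_minor."""
    major, minor, _ = parse_version(installed)
    return major == required_major and minor >= min_minor


# The value shown above; normally read from podcast_scraper.__api_version__.
api_version = "2.6.0"
compatible = is_compatible(api_version, required_major=2, min_minor=5)
needs_v3 = is_compatible(api_version, required_major=3)
```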