create_db

gffbase.create_db.create_db

create_db(data, dbfn, *, id_spec=None, force: bool = False, verbose: bool = False, checklines: int = 10, from_string: bool = False, force_gff: bool = False, force_dialect_check: bool = False, merge_strategy: str = 'error', force_merge_fields=None, transform=None, gtf_transcript_key: str = 'transcript_id', gtf_gene_key: str = 'gene_id', gtf_subfeature: str = 'exon', disable_infer_genes: bool = False, disable_infer_transcripts: bool = False, infer_gene_extent: bool = True, keep_order: bool = False, text_factory=str, pragmas: Optional[dict] = None, sort_attribute_values: bool = False, dialect: Optional[dict] = None, _keep_tempfiles: bool = False, **kwargs) -> FeatureDB

Create a database from a GFF3/GTF source.

For Phase 5 the actively honored parameters are data, dbfn, force, checklines, from_string, disable_infer_genes, disable_infer_transcripts, and gtf_subfeature. The rest are accepted for signature compatibility and will be wired up in Phase 6.
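
A minimal usage sketch of this Phase 5 surface. The GFF3 snippet and output path are illustrative, and importing create_db from the package root is an assumption (the docs only show the module path gffbase.create_db):

from gffbase import create_db

gff3 = (
    "##gff-version 3\n"
    "chr1\t.\tgene\t100\t500\t.\t+\t.\tID=gene1\n"
    "chr1\t.\tmRNA\t100\t500\t.\t+\t.\tID=tx1;Parent=gene1\n"
)

# from_string=True materializes the text to a temp file before parsing;
# force=True overwrites an existing database at dbfn.
db = create_db(gff3, dbfn="example.db", from_string=True, force=True)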

Source code in python/gffbase/create_db.py
def create_db(
    data,
    dbfn,
    *,
    id_spec=None,
    force: bool = False,
    verbose: bool = False,
    checklines: int = 10,
    from_string: bool = False,
    force_gff: bool = False,
    force_dialect_check: bool = False,
    merge_strategy: str = "error",
    force_merge_fields=None,
    transform=None,
    gtf_transcript_key: str = "transcript_id",
    gtf_gene_key: str = "gene_id",
    gtf_subfeature: str = "exon",
    disable_infer_genes: bool = False,
    disable_infer_transcripts: bool = False,
    infer_gene_extent: bool = True,
    keep_order: bool = False,
    text_factory=str,
    pragmas: Optional[dict] = None,
    sort_attribute_values: bool = False,
    dialect: Optional[dict] = None,
    _keep_tempfiles: bool = False,
    **kwargs,
) -> FeatureDB:
    """Create a database from a GFF3/GTF source.

    For Phase 5 the actively honored parameters are ``data``, ``dbfn``,
    ``force``, ``checklines``, ``from_string``, ``disable_infer_genes``,
    ``disable_infer_transcripts``, and ``gtf_subfeature``. The rest are
    accepted for signature compatibility and will be wired up in Phase 6.
    """
    cleanup_path: Optional[str] = None
    if from_string:
        # Materialize to a temp file so the parser can mmap-style read it.
        tmp = tempfile.NamedTemporaryFile(
            mode="w", suffix=".gff3", delete=False, encoding="utf-8"
        )
        tmp.write(data)
        tmp.close()
        path = tmp.name
        cleanup_path = path
    else:
        path = data

    try:
        con, stats = _ingest.from_file(
            path,
            dbfn=dbfn,
            force=force,
            disable_infer_genes=disable_infer_genes,
            disable_infer_transcripts=disable_infer_transcripts,
            gtf_subfeature=gtf_subfeature,
        )
    finally:
        if cleanup_path and not _keep_tempfiles:
            try:
                os.unlink(cleanup_path)
            except OSError:
                pass

    db = FeatureDB((con, stats), keep_order=keep_order,
                   sort_attribute_values=sort_attribute_values,
                   text_factory=text_factory, pragmas=pragmas)
    return db

gffbase.ingest.from_file

Lower-level ingestion entrypoint used by create_db. Returns the raw (duckdb.DuckDBPyConnection, IngestStats) pair.

gffbase.ingest.from_file

from_file(path: str, dbfn: str = ':memory:', *, force: bool = False, batch_size: int = DEFAULT_BATCH_SIZE, max_depth: int = DEFAULT_MAX_DEPTH, disable_infer_genes: bool = False, disable_infer_transcripts: bool = False, gtf_subfeature: str = 'exon', engine: Optional[str] = 'auto', build_rtree: bool = True) -> Tuple[duckdb.DuckDBPyConnection, IngestStats]

Ingest a GFF3 or GTF file into a DuckDB database.

Returns the open connection plus an IngestStats summary. The connection is the canonical handle the (Phase 5) FeatureDB will wrap.
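
For example (the file name is illustrative):

from gffbase.ingest import from_file

# dbfn defaults to ":memory:", so this builds an in-memory DuckDB database.
con, stats = from_file("annotation.gff3")
print(stats.fmt, stats.n_features_raw, stats.rtree_built)

# The connection is an ordinary DuckDB handle, so the ingested tables are
# directly queryable:
n = con.execute("SELECT COUNT(*) FROM features").fetchone()[0]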

Source code in python/gffbase/ingest.py
def from_file(
    path: str,
    dbfn: str = ":memory:",
    *,
    force: bool = False,
    batch_size: int = DEFAULT_BATCH_SIZE,
    max_depth: int = DEFAULT_MAX_DEPTH,
    disable_infer_genes: bool = False,
    disable_infer_transcripts: bool = False,
    gtf_subfeature: str = "exon",
    engine: Optional[str] = "auto",
    build_rtree: bool = True,
) -> Tuple[duckdb.DuckDBPyConnection, IngestStats]:
    """Ingest a GFF3 or GTF file into a DuckDB database.

    Returns the open connection plus an `IngestStats` summary. The connection
    is the canonical handle the (Phase 5) `FeatureDB` will wrap.
    """
    if dbfn != ":memory:" and os.path.exists(dbfn):
        if not force:
            raise ValueError(
                f"{dbfn} already exists. Pass force=True to overwrite."
            )
        os.unlink(dbfn)

    con = duckdb.connect(dbfn)
    _apply_pragmas(con)
    con.execute(DDL)

    # Phase 19: load the spatial extension UPFRONT (was: lazy after bulk
    # load). This lets us widen the `features` schema to include `bbox`
    # and stamp the R-tree envelope inline during the Arrow batch INSERT,
    # eliminating two full-table UPDATE passes that used to dominate
    # ingest wall time.
    has_spatial = (
        build_rtree
        and not _rtree_disabled_by_env()
        and _try_load_spatial(con)
    )
    if has_spatial:
        con.execute("ALTER TABLE features ADD COLUMN IF NOT EXISTS bbox GEOMETRY")

    # Drive the parser.
    it = _parser.parse_gff(path, engine=engine)
    seqid_to_y: dict = {}
    builder = _ArrowBatchBuilder(seqid_to_y, has_spatial=has_spatial)
    autoinc: dict = {}
    # NCBI RefSeq emits multiple GFF3 rows that share an `ID=cds-…` (a CDS
    # is "split" across exon-segments). Our schema has `id` as a primary
    # key, so we mimic gffutils' `merge_strategy="create_unique"`: track
    # how many times we've seen each base id and append `__N` (N >= 2)
    # when needed. The first occurrence keeps the bare id.
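    # Example: an ID seen three times yields cds-XYZ, cds-XYZ__2, cds-XYZ__3.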
    id_counts: dict = {}
    duplicate_pairs: list = []  # (base_id, new_id)
    file_order = 0
    n_raw = 0
    # Resolve the dialect format ONCE — calling `it.dialect()` per record
    # is a Rust↔Python boundary cost we shouldn't pay 5 M times. The
    # parser commits to a dialect during the peek phase, so the value is
    # stable from the first yielded record onward.
    _fmt_cache: Optional[str] = None

    for feat in it:
        file_order += 1
        n_raw += 1
        if _fmt_cache is None:
            _fmt_cache = _dialect_fmt_safe(it)
        fid = _derive_id(feat, _fmt_cache, autoinc)
        seen = id_counts.get(fid, 0)
        if seen:
            new_fid = f"{fid}__{seen + 1}"
            duplicate_pairs.append((fid, new_fid))
            id_counts[fid] = seen + 1
            fid = new_fid
        else:
            id_counts[fid] = 1
        builder.append(fid, feat, file_order)
        if len(builder) >= batch_size:
            builder.flush_into(con)
    builder.flush_into(con)

    # Record duplicate-id remappings in the `duplicates` table (informational;
    # the table has been part of the schema since Phase 5).
    if duplicate_pairs:
        dup_tbl = pa.table({
            "original_id": [b for b, _ in duplicate_pairs],
            "new_id":      [n for _, n in duplicate_pairs],
        })
        con.register("__staging_dups", dup_tbl)
        con.execute(
            "INSERT INTO duplicates (original_id, new_id) "
            "SELECT original_id, new_id FROM __staging_dups"
        )
        con.unregister("__staging_dups")

    dialect = it.dialect()
    directives = list(it.directives())
    fmt = (dialect or {}).get("fmt", "gff3")

    # Persist directives in one shot (set-based).
    if directives:
        dir_table = pa.table({"directive": directives})
        con.register("__staging_directives", dir_table)
        con.execute("INSERT INTO directives (directive) SELECT directive FROM __staging_directives")
        con.unregister("__staging_directives")

    # Set-based normalization passes.
    n_synth_t = 0
    n_synth_g = 0

    if fmt == "gtf":
        if not disable_infer_transcripts:
            n_synth_t = _synthesize_transcripts(con, gtf_subfeature)
        if not disable_infer_genes:
            n_synth_g = _synthesize_genes(con, gtf_subfeature)
        con.execute(EDGES_FROM_GTF)
        # GTF synthesis inserts new rows without seqid_y / bbox set. Patch
        # them up in a single targeted UPDATE (touches only synthesized
        # rows; roughly 4-9% of features at GENCODE scale).
        if has_spatial:
            con.execute(
                "UPDATE features "
                "SET seqid_y = m.seqid_y, "
                "    bbox = ST_MakeEnvelope("
                "        features.start, m.seqid_y, "
                "        features.\"end\", m.seqid_y + 1) "
                "FROM seqid_map m "
                "WHERE features.seqid = m.seqid AND features.seqid_y IS NULL"
            )
        else:
            con.execute(
                "UPDATE features SET seqid_y = m.seqid_y "
                "FROM seqid_map m "
                "WHERE features.seqid = m.seqid AND features.seqid_y IS NULL"
            )
    else:
        con.execute(EDGES_FROM_PARENT)

    # Closure via recursive CTE.
    con.execute(CLOSURE_RECURSIVE_CTE, [max_depth])

    # Indexes — only after all data is materialized.
    con.execute(POST_LOAD_INDEXES)

    # Optional R-tree. Phase 19: when spatial is loaded, this is now a
    # single CREATE INDEX over the bbox column we already populated
    # inline during the Arrow batch INSERTs (no UPDATE pass).
    rtree_built = False
    if has_spatial:
        rtree_built = _finalize_rtree(con, seqid_to_y)

    # SQLite-compat views (must run after closure has been populated).
    con.execute(COMPAT_VIEWS_SQL)

    # Stats.
    n_attributes = con.execute("SELECT COUNT(*) FROM attributes").fetchone()[0]
    n_edges = con.execute("SELECT COUNT(*) FROM edges").fetchone()[0]
    n_closure = con.execute("SELECT COUNT(*) FROM closure").fetchone()[0]

    # Meta — record dialect, fmt, and the rtree availability so a re-opened
    # DB can route queries correctly without probing.
    _write_meta(con, dialect, fmt, rtree_built=rtree_built, max_depth=max_depth)

    return con, IngestStats(
        n_features_raw=n_raw,
        n_features_synthetic_transcripts=n_synth_t,
        n_features_synthetic_genes=n_synth_g,
        n_attributes=n_attributes,
        n_edges=n_edges,
        n_closure_rows=n_closure,
        rtree_built=rtree_built,
        fmt=fmt,
        dialect=dialect,
        directives=directives,
    )
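
The bbox column in the listing encodes each 1-D interval as a 2-D envelope: start/end on the x-axis and one integer lane per seqid (seqid_y) on the y-axis, so a single spatial R-tree can serve per-chromosome overlap queries. Below is a standalone sketch of that idea against plain DuckDB; the table and column names are illustrative, not gffbase's schema, and it assumes a DuckDB build whose spatial extension supports R-tree indexes:

import duckdb

con = duckdb.connect()
con.install_extension("spatial")  # one-time download
con.load_extension("spatial")

# Each interval becomes a thin rectangle in its seqid's integer lane.
con.execute("""
    CREATE TABLE iv AS
    SELECT id, seqid_y, s, e,
           ST_MakeEnvelope(s, seqid_y, e, seqid_y + 1) AS bbox
    FROM (VALUES ('a', 0, 100, 200),   -- lane 0, e.g. chr1
                 ('b', 0, 150, 300),   -- lane 0
                 ('c', 1, 100, 200))   -- lane 1, e.g. chr2
         t(id, seqid_y, s, e)
""")
con.execute("CREATE INDEX iv_rtree ON iv USING RTREE (bbox)")

# Overlap query for 180..250 on lane 0. The probe envelope stays strictly
# inside the lane (y in 0.25..0.75) so rectangles in touching neighbour
# lanes do not register as boundary intersections.
hits = con.execute("""
    SELECT id FROM iv
    WHERE ST_Intersects(bbox, ST_MakeEnvelope(180, 0.25, 250, 0.75))
    ORDER BY id
""").fetchall()
print(hits)  # [('a',), ('b',)]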

gffbase.ingest.IngestStats

gffbase.ingest.IngestStats dataclass

IngestStats(n_features_raw: int = 0, n_features_synthetic_transcripts: int = 0, n_features_synthetic_genes: int = 0, n_attributes: int = 0, n_edges: int = 0, n_closure_rows: int = 0, rtree_built: bool = False, fmt: str = 'gff3', dialect: dict = None, directives: List[str] = None)

Reported back to the caller for benchmarking and tests.
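
A test-style sketch of how these counters might be used (the file name and the specific expectations are illustrative):

from gffbase.ingest import from_file

con, stats = from_file("annotation.gtf")
assert stats.fmt == "gtf"
assert stats.n_features_raw > 0

# With inference disabled, the synthetic counters stay at zero (see the
# disable_infer_* branches in the from_file listing above).
_, stats2 = from_file("annotation.gtf", disable_infer_genes=True,
                      disable_infer_transcripts=True)
assert stats2.n_features_synthetic_transcripts == 0
assert stats2.n_features_synthetic_genes == 0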