Перейти к содержанию

Репозитории (Repositories)

Слой доступа к данным.

app.repositories.parsing.ParsingRepository

Source code in app/repositories/parsing.py
class ParsingRepository:
    """Data-access layer for parsing sources and external-category mappings."""

    def __init__(self, session: AsyncSession):
        self.session = session

    async def get_due_sources(self, limit: int = 10) -> Sequence[ParsingSource]:
        """Return up to `limit` active sources that are due for syncing.

        Rows are locked with FOR UPDATE SKIP LOCKED so that concurrent
        workers never claim the same source. Higher-priority and
        longest-overdue sources come first.
        """
        stmt = (
            select(ParsingSource)
            .where(
                and_(
                    ParsingSource.is_active.is_(True),
                    ParsingSource.next_sync_at <= func.now()
                )
            )
            .order_by(ParsingSource.priority.desc(), ParsingSource.next_sync_at.asc())
            .limit(limit)
            .with_for_update(skip_locked=True)
        )
        result = await self.session.execute(stmt)
        return result.scalars().all()

    async def update_source_stats(self, source_id: int, stats: dict):
        """Record a completed sync: stamp timestamps, reschedule, store stats.

        Commits the session (which also releases any row lock taken by
        `get_due_sources`). An unknown `source_id` is silently ignored.
        """
        stmt = select(ParsingSource).where(ParsingSource.id == source_id)
        result = await self.session.execute(stmt)
        source = result.scalar_one_or_none()

        if source:
            source.last_synced_at = func.now()
            # Compute the next run on the DB clock as well. The original used
            # naive datetime.now(), mixing the client clock/timezone with the
            # server-side func.now() used everywhere else.
            source.next_sync_at = func.now() + timedelta(hours=source.refresh_interval_hours)
            # Reassign a fresh dict rather than mutating in place: plain JSON
            # columns do not track in-place mutation, so the updated stats
            # could silently be dropped at commit time.
            source.config = {**(source.config or {}), "last_stats": stats}

        await self.session.commit()

    async def get_or_create_category_maps(self, names: List[str]) -> List[CategoryMap]:
        """Return CategoryMap rows for `names`, creating any that are missing.

        Creation uses a bulk INSERT ... ON CONFLICT DO NOTHING so concurrent
        callers cannot fail on duplicates; after inserting, rows are
        re-fetched so the returned objects include the newly created ones.
        """
        if not names:
            return []

        # Bulk-fetch the mappings that already exist.
        stmt = select(CategoryMap).where(CategoryMap.external_name.in_(names))
        result = await self.session.execute(stmt)
        existing = {m.external_name: m for m in result.scalars().all()}

        new_names = [n for n in names if n not in existing]
        if new_names:
            # Bulk-insert the missing ones; raced inserts are ignored.
            insert_stmt = insert(CategoryMap).values([
                {"external_name": name, "internal_category_id": None}
                for name in new_names
            ]).on_conflict_do_nothing()
            await self.session.execute(insert_stmt)

            # Re-fetch so the result includes the freshly inserted rows.
            stmt = select(CategoryMap).where(CategoryMap.external_name.in_(names))
            result = await self.session.execute(stmt)
            return list(result.scalars().all())

        return list(existing.values())

    async def get_unmapped_categories(self, limit: int = 100) -> Sequence[CategoryMap]:
        """Return external categories not yet linked to an internal Gifty category."""
        stmt = (
            select(CategoryMap)
            .where(CategoryMap.internal_category_id.is_(None))
            .limit(limit)
        )
        result = await self.session.execute(stmt)
        return result.scalars().all()

    async def update_category_mappings(self, mappings: List[dict]) -> int:
        """Bulk-link external categories to internal ones and commit.

        mappings: [{"external_name": "...", "internal_category_id": 123}, ...]
        Returns the number of rows actually updated; unknown external names
        contribute 0 (the original counted every statement as 1).
        """
        if not mappings:
            return 0

        updated = 0
        for m in mappings:
            stmt = (
                update(CategoryMap)
                .where(CategoryMap.external_name == m["external_name"])
                .values(internal_category_id=m["internal_category_id"])
            )
            result = await self.session.execute(stmt)
            # rowcount reflects matched rows, so missing names don't inflate
            # the reported total.
            updated += result.rowcount

        await self.session.commit()
        return updated

Functions

get_unmapped_categories(limit=100) async

Возвращает категории, у которых еще нет привязки к внутренней категории Gifty.

Source code in app/repositories/parsing.py
async def get_unmapped_categories(self, limit: int = 100) -> Sequence[CategoryMap]:
    """Return external categories not yet linked to an internal Gifty category."""
    query = (
        select(CategoryMap)
        .where(CategoryMap.internal_category_id.is_(None))
        .limit(limit)
    )
    rows = await self.session.execute(query)
    return rows.scalars().all()

update_category_mappings(mappings) async

Массово обновляет привязки внешних категорий к внутренним. mappings: [{"external_name": "...", "internal_category_id": 123}, ...]

Source code in app/repositories/parsing.py
async def update_category_mappings(self, mappings: List[dict]) -> int:
    """Bulk-link external categories to internal ones and commit.

    mappings: [{"external_name": "...", "internal_category_id": 123}, ...]
    Returns the number of rows actually updated; unknown external names
    contribute 0 (the original counted every statement as 1).
    """
    if not mappings:
        return 0

    updated = 0
    for m in mappings:
        stmt = (
            update(CategoryMap)
            .where(CategoryMap.external_name == m["external_name"])
            .values(internal_category_id=m["internal_category_id"])
        )
        result = await self.session.execute(stmt)
        # rowcount reflects matched rows, so missing names don't inflate
        # the reported total.
        updated += result.rowcount

    await self.session.commit()
    return updated

app.repositories.catalog.PostgresCatalogRepository

Bases: CatalogRepository

Source code in app/repositories/catalog.py
class PostgresCatalogRepository(CatalogRepository):
    """Postgres-backed catalog repository: products, embeddings, LLM scores."""

    def __init__(self, session: AsyncSession):
        self.session = session

    async def upsert_products(self, products: list[dict]) -> int:
        """Bulk upsert product rows keyed by gift_id; returns affected row count.

        `products` must be dicts matching Product model fields.
        """
        if not products:
            return 0

        stmt = insert(Product).values(products)

        # On conflict, update everything except created_at (and the key itself).
        update_dict = {
            col.name: col
            for col in stmt.excluded
            if col.name not in ("created_at", "gift_id")
        }

        stmt = stmt.on_conflict_do_update(
            index_elements=[Product.gift_id],
            set_=update_dict
        )

        result = await self.session.execute(stmt)
        return result.rowcount

    async def mark_inactive_except(self, seen_ids: set[str]) -> int:
        """
        Mark all products NOT in the provided set of gift_ids as inactive.
        Used for soft-delete during full sync. Returns the number of rows
        deactivated.
        """
        if not seen_ids:
            # An empty set most likely means the sync failed upstream;
            # refuse to deactivate the entire catalog.
            return 0

        stmt = (
            update(Product)
            .where(Product.gift_id.notin_(seen_ids))
            .where(Product.is_active.is_(True))
            .values(is_active=False, updated_at=func.now())
        )

        result = await self.session.execute(stmt)
        return result.rowcount

    async def get_active_products_count(self) -> int:
        """Return the number of active products (0 when the table is empty)."""
        query = select(func.count(Product.gift_id)).where(Product.is_active.is_(True))
        result = await self.session.execute(query)
        return result.scalar() or 0

    async def get_products_without_embeddings(self, model_version: str, limit: int = 100) -> list[Product]:
        """
        Fetch active products lacking an up-to-date embedding for `model_version`:
        either no embedding row exists for that model, or its content_hash is stale.
        """
        from sqlalchemy.orm import aliased
        from sqlalchemy import and_, or_
        from app.models import ProductEmbedding

        p = aliased(Product)
        pe = aliased(ProductEmbedding)

        # Fix: the model_version filter must live in the JOIN condition. The
        # original joined on gift_id alone and tested pe.model_version in the
        # WHERE, so a product already embedded for this model (but also for
        # another model) matched forever and could be returned repeatedly.
        stmt = (
            select(p)
            .outerjoin(
                pe,
                and_(p.gift_id == pe.gift_id, pe.model_version == model_version),
            )
            .where(
                and_(
                    p.is_active.is_(True),
                    or_(
                        pe.gift_id.is_(None),          # no embedding for this model
                        pe.content_hash != p.content_hash,  # embedding is stale
                    )
                )
            )
            .limit(limit)
        )

        result = await self.session.execute(stmt)
        return list(result.scalars().all())

    async def save_embeddings(self, embeddings: list[dict]) -> int:
        """
        Upsert product embeddings; returns affected row count.
        `embeddings` must contain dicts matching the ProductEmbedding model.
        """
        if not embeddings:
            return 0

        stmt = insert(ProductEmbedding).values(embeddings)

        update_dict = {
            "embedding": stmt.excluded.embedding,  # assumes a vector/list is passed
            "content_hash": stmt.excluded.content_hash,
            "embedded_at": func.now(),
            # Fix: use the DB clock here too; the original mixed naive
            # client-side datetime.now() with server-side func.now().
            "updated_at": func.now(),
        }

        stmt = stmt.on_conflict_do_update(
            constraint="pk_product_embeddings",  # primary key constraint name
            set_=update_dict
        )

        try:
            result = await self.session.execute(stmt)
            return result.rowcount
        except Exception as e:
            logger.error(f"Failed to upsert embeddings. Batch size: {len(embeddings)}. Error: {type(e).__name__}: {e}")
            raise

    async def search_similar_products(self, embedding: list[float], limit: int = 10, min_similarity: float = 0.0, is_active_only: bool = True) -> list[Product]:
        """Vector search ordered by cosine similarity (closest first).

        pgvector's cosine_distance returns a distance in [0, 2], where 0 is
        identical; similarity = 1 - distance, so smaller distance sorts first.
        """
        distance = ProductEmbedding.embedding.cosine_distance(embedding)

        stmt = (
            select(Product)
            .join(ProductEmbedding, Product.gift_id == ProductEmbedding.gift_id)
        )

        if is_active_only:
            stmt = stmt.where(Product.is_active.is_(True))

        if min_similarity > 0.0:
            # Fix: min_similarity was accepted but never applied. Only filter
            # when the caller asks for a cutoff, so the default behaviour
            # (no similarity threshold) is unchanged.
            stmt = stmt.where(distance <= 1.0 - min_similarity)

        stmt = stmt.order_by(distance).limit(limit)

        result = await self.session.execute(stmt)
        return list(result.scalars().all())

    async def get_products_without_llm_score(self, limit: int = 100) -> list[Product]:
        """
        Fetch active products that don't have an LLM gift score yet.
        """
        stmt = (
            select(Product)
            .where(
                and_(
                    Product.is_active.is_(True),
                    Product.llm_gift_score.is_(None)
                )
            )
            .order_by(Product.updated_at.desc())  # most recently touched first
            .limit(limit)
        )

        result = await self.session.execute(stmt)
        return list(result.scalars().all())

    async def save_llm_scores(self, scores: list[dict]) -> int:
        """
        Update product rows with LLM scores and reasoning.
        scores: [{'gift_id': str, 'llm_gift_score': float, 'llm_gift_reasoning': str, ...}, ...]
        Returns the number of rows actually updated. Does NOT commit —
        the caller owns the transaction.
        """
        if not scores:
            return 0

        # Per-row UPDATEs are fine for small scoring batches; a temp-table
        # bulk update would only pay off for much larger ones.
        updated = 0
        for score_data in scores:
            # Fix: work on a copy — the original pop() mutated the caller's
            # dicts, removing gift_id as a side effect.
            data = dict(score_data)
            gift_id = data.pop("gift_id")
            data["llm_scored_at"] = func.now()

            stmt = (
                update(Product)
                .where(Product.gift_id == gift_id)
                .values(**data)
            )
            result = await self.session.execute(stmt)
            updated += result.rowcount

        return updated

Functions

get_products_without_embeddings(model_version, limit=100) async

Fetch products that do not have an embedding for the specified model_version. We check if product_embeddings entry exists OR if content_hash doesn't match.

Source code in app/repositories/catalog.py
async def get_products_without_embeddings(self, model_version: str, limit: int = 100) -> list[Product]:
    """
    Fetch active products lacking an up-to-date embedding for `model_version`:
    either no embedding row exists for that model, or its content_hash is stale.
    """
    from sqlalchemy.orm import aliased
    from sqlalchemy import and_, or_
    from app.models import ProductEmbedding

    p = aliased(Product)
    pe = aliased(ProductEmbedding)

    # Fix: the model_version filter must live in the JOIN condition. The
    # original joined on gift_id alone and tested pe.model_version in the
    # WHERE, so a product already embedded for this model (but also for
    # another model) matched forever and could be returned repeatedly.
    stmt = (
        select(p)
        .outerjoin(
            pe,
            and_(p.gift_id == pe.gift_id, pe.model_version == model_version),
        )
        .where(
            and_(
                p.is_active.is_(True),
                or_(
                    pe.gift_id.is_(None),          # no embedding for this model
                    pe.content_hash != p.content_hash,  # embedding is stale
                )
            )
        )
        .limit(limit)
    )

    result = await self.session.execute(stmt)
    return list(result.scalars().all())

get_products_without_llm_score(limit=100) async

Fetch products that don't have an LLM gift score yet.

Source code in app/repositories/catalog.py
async def get_products_without_llm_score(self, limit: int = 100) -> list[Product]:
    """Return active products that have not been scored by the LLM yet."""
    query = (
        select(Product)
        .where(Product.is_active.is_(True))
        .where(Product.llm_gift_score.is_(None))
        .order_by(Product.updated_at.desc())  # most recently touched first
        .limit(limit)
    )
    rows = await self.session.execute(query)
    return list(rows.scalars().all())

mark_inactive_except(seen_ids) async

Mark all products NOT in the provided set of gift_ids as inactive. Used for soft-delete during full sync.

Source code in app/repositories/catalog.py
async def mark_inactive_except(self, seen_ids: set[str]) -> int:
    """Soft-delete every active product whose gift_id is not in `seen_ids`.

    Used during a full sync; returns the number of rows deactivated.
    """
    if not seen_ids:
        # An empty set most likely means the sync failed upstream;
        # refuse to deactivate the entire catalog.
        return 0

    deactivate = (
        update(Product)
        .where(Product.gift_id.notin_(seen_ids), Product.is_active.is_(True))
        .values(is_active=False, updated_at=func.now())
    )
    outcome = await self.session.execute(deactivate)
    return outcome.rowcount

save_embeddings(embeddings) async

Upsert product embeddings. embeddings list should contain dicts matching ProductEmbedding model.

Source code in app/repositories/catalog.py
async def save_embeddings(self, embeddings: list[dict]) -> int:
    """
    Upsert product embeddings; returns affected row count.
    `embeddings` must contain dicts matching the ProductEmbedding model.
    """
    if not embeddings:
        return 0

    stmt = insert(ProductEmbedding).values(embeddings)

    update_dict = {
        "embedding": stmt.excluded.embedding,  # assumes a vector/list is passed
        "content_hash": stmt.excluded.content_hash,
        "embedded_at": func.now(),
        # Fix: use the DB clock here too; the original mixed naive
        # client-side datetime.now() with server-side func.now().
        "updated_at": func.now(),
    }

    stmt = stmt.on_conflict_do_update(
        constraint="pk_product_embeddings",  # primary key constraint name
        set_=update_dict
    )

    try:
        result = await self.session.execute(stmt)
        return result.rowcount
    except Exception as e:
        logger.error(f"Failed to upsert embeddings. Batch size: {len(embeddings)}. Error: {type(e).__name__}: {e}")
        raise

save_llm_scores(scores) async

Update product rows with LLM scores and reasoning. scores list should contain dicts: {'gift_id': str, 'llm_gift_score': float, 'llm_gift_reasoning': str, ...}

Source code in app/repositories/catalog.py
async def save_llm_scores(self, scores: list[dict]) -> int:
    """
    Update product rows with LLM scores and reasoning.
    scores: [{'gift_id': str, 'llm_gift_score': float, 'llm_gift_reasoning': str, ...}, ...]
    Returns the number of rows actually updated. Does NOT commit —
    the caller owns the transaction.
    """
    if not scores:
        return 0

    # Per-row UPDATEs are fine for small scoring batches; a temp-table
    # bulk update would only pay off for much larger ones.
    updated = 0
    for score_data in scores:
        # Fix: work on a copy — the original pop() mutated the caller's
        # dicts, removing gift_id as a side effect.
        data = dict(score_data)
        gift_id = data.pop("gift_id")
        data["llm_scored_at"] = func.now()

        stmt = (
            update(Product)
            .where(Product.gift_id == gift_id)
            .values(**data)
        )
        result = await self.session.execute(stmt)
        updated += result.rowcount

    return updated

repositories.recommendations.create_quiz_run(db, *, user_id, anon_id, answers_json) async

Source code in repositories/recommendations.py
async def create_quiz_run(
    db: AsyncSession,
    *,
    user_id: Optional[UUID],
    anon_id: Optional[str],
    answers_json: dict[str, Any],
) -> QuizRun:
    """Persist a new quiz run and return it with DB-generated fields loaded."""
    run = QuizRun(user_id=user_id, anon_id=anon_id, answers_json=answers_json)
    db.add(run)
    await db.commit()
    # Reload so server-side defaults (id, timestamps) are populated.
    await db.refresh(run)
    return run

repositories.recommendations.log_event(db, event_name, *, user_id=None, anon_id=None, quiz_run_id=None, recommendation_run_id=None, gift_id=None, payload=None) async

Source code in repositories/recommendations.py
async def log_event(
    db: AsyncSession,
    event_name: str,
    *,
    user_id: Optional[UUID] = None,
    anon_id: Optional[str] = None,
    quiz_run_id: Optional[UUID] = None,
    recommendation_run_id: Optional[UUID] = None,
    gift_id: Optional[str] = None,
    payload: Optional[dict[str, Any]] = None,
) -> None:
    """Append a single analytics event row and commit immediately."""
    record = Event(
        event_name=event_name,
        user_id=user_id,
        anon_id=anon_id,
        quiz_run_id=quiz_run_id,
        recommendation_run_id=recommendation_run_id,
        gift_id=gift_id,
        payload=payload,
    )
    db.add(record)
    await db.commit()