Skip to content

Data Module Reference

This section provides a detailed API reference for the datarec.data package, which defines the core dataset abstraction, dataset builders, and supporting utilities.

On This Page

Core Data Utilities

Utilities shared across the data layer (encoders, helpers, characteristics helpers).

Encoder

A simple encoder class to encode and decode IDs.

Source code in datarec/data/utils.py
class Encoder:
    """
    Two-way translator between public IDs and private integer IDs.

    The forward mapping (public ID -> integer) is kept in ``self.encoding``;
    the reverse mapping is rebuilt on every ``decode`` call.
    """

    def __init__(self):
        # public ID -> private integer ID; an empty dict means "not encoded yet"
        self.encoding = {}

    def is_encoded(self) -> bool:
        """
        Tell whether an encoding is currently stored.

        Returns:
            (bool): True when the encoding dictionary holds at least one entry.
        """
        if self.encoding:
            return True
        return False

    def build_encoding(self, lst: list, offset: int=0) -> None:
        """
        Build the encoding by numbering the public IDs consecutively.

        Args:
            lst (list): Public IDs, numbered in the order given.
            offset (int): Integer assigned to the first public ID.

        Raises:
            ValueError: If an encoding is already stored.
        """
        if self.is_encoded():
            raise ValueError('Encoding dictionary is not empty. Please, reset it before building a new encoding.')
        private_ids = range(offset, offset + len(lst))
        self.encoding = {public_id: private_id for public_id, private_id in zip(lst, private_ids)}

    def reset_encoding(self) -> None:
        """
        Drop the stored encoding.
        """
        self.encoding = {}

    def change_offset(self, offset: int) -> None:
        """
        Shift every private ID so that the smallest one equals ``offset``.

        Args:
            offset (int): The new smallest private ID.

        Raises:
            ValueError: If no encoding is stored.
        """
        if not self.encoding:
            raise ValueError('Encoding dictionary is empty. Please, build the encoding first.')
        shift = offset - min(list(self.encoding.values()))
        shifted = {}
        for public_id, private_id in self.encoding.items():
            shifted[public_id] = private_id + shift
        self.encoding = shifted

    def apply_encoding(self, encoding: dict) -> None:
        """
        Adopt an externally built encoding dictionary.

        The dictionary is stored as-is (no copy is made).

        Args:
            encoding (dict): Mapping from public IDs to private IDs.

        Raises:
            ValueError: If an encoding is already stored.
        """
        if self.is_encoded():
            raise ValueError('Encoding dictionary is not empty. Please, reset it before applying a new encoding.')
        self.encoding = encoding

    def encode(self, lst: list) -> list:
        """
        Translate public IDs into private integer IDs.

        Args:
            lst (list): Public IDs to translate.

        Returns:
            (list): The matching private IDs, in the same order.

        Raises:
            ValueError: If no encoding is stored.
            KeyError: If a public ID is not part of the encoding.
        """
        ready = self.is_encoded()
        if ready is False:
            raise ValueError('Encoding dictionary is empty. Please, build the encoding first.')
        encoded = []
        for public_id in lst:
            encoded.append(self.encoding[public_id])
        return encoded

    def decode(self, lst: list) -> list:
        """
        Translate private integer IDs back into public IDs.

        The reverse mapping is derived from the stored encoding; a warning is
        printed when two public IDs share a private ID, because the reverse
        mapping then loses entries.

        Args:
            lst (list): Private IDs to translate.

        Returns:
            (list): The matching public IDs, in the same order.

        Raises:
            ValueError: If no encoding is stored.
            KeyError: If a private ID is not part of the encoding.
        """
        ready = self.is_encoded()
        if ready is False:
            raise ValueError('Encoding dictionary is empty. Please, build the encoding first.')
        decoder = {}
        for public_id, private_id in self.encoding.items():
            decoder[private_id] = public_id
        if len(decoder) != len(self.encoding):
            print('WARNING: the ID encoding could be incorrect. Please, check your data.')
        decoded = []
        for private_id in lst:
            decoded.append(decoder[private_id])
        return decoded

is_encoded()

Checks if the encoding dictionary is not empty.

Returns:

Type Description
bool

True if the encoding dictionary is not empty, False otherwise.

Source code in datarec/data/utils.py
def is_encoded(self) -> bool:
    """
    Tell whether an encoding is currently stored.

    Returns:
        (bool): True when the encoding dictionary holds at least one entry.
    """
    if self.encoding:
        return True
    return False

build_encoding(lst, offset=0)

Encodes a list of public IDs into integer IDs.

Parameters:

Name Type Description Default
lst list

A list of public IDs.

required
offset int

The starting integer for the private IDs.

0
Source code in datarec/data/utils.py
def build_encoding(self, lst: list, offset: int=0) -> None:
    """
    Build the encoding by numbering the public IDs consecutively.

    Args:
        lst (list): Public IDs, numbered in the order given.
        offset (int): Integer assigned to the first public ID.

    Raises:
        ValueError: If an encoding is already stored.
    """
    if self.is_encoded():
        raise ValueError('Encoding dictionary is not empty. Please, reset it before building a new encoding.')
    private_ids = range(offset, offset + len(lst))
    self.encoding = {public_id: private_id for public_id, private_id in zip(lst, private_ids)}

reset_encoding()

Resets the encoding dictionary.

Source code in datarec/data/utils.py
def reset_encoding(self) -> None:
    """
    Drop the stored encoding, leaving an empty dictionary in its place.
    """
    self.encoding = {}

change_offset(offset)

Changes the offset of the current encoding.

Parameters:

Name Type Description Default
offset int

The new starting integer for the private IDs.

required
Source code in datarec/data/utils.py
def change_offset(self, offset: int) -> None:
    """
    Shift every private ID so that the smallest one equals ``offset``.

    Args:
        offset (int): The new smallest private ID.

    Raises:
        ValueError: If no encoding is stored.
    """
    if not self.encoding:
        raise ValueError('Encoding dictionary is empty. Please, build the encoding first.')
    shift = offset - min(list(self.encoding.values()))
    shifted = {}
    for public_id, private_id in self.encoding.items():
        shifted[public_id] = private_id + shift
    self.encoding = shifted

apply_encoding(encoding)

Applies an external encoding dictionary.

Parameters:

Name Type Description Default
encoding dict

A dictionary encoding IDs.

required
Source code in datarec/data/utils.py
def apply_encoding(self, encoding: dict) -> None:
    """
    Adopt an externally built encoding dictionary.

    The dictionary is stored as-is (no copy is made).

    Args:
        encoding (dict): Mapping from public IDs to private IDs.

    Raises:
        ValueError: If an encoding is already stored.
    """
    already_encoded = self.is_encoded()
    if already_encoded:
        raise ValueError('Encoding dictionary is not empty. Please, reset it before applying a new encoding.')
    self.encoding = encoding

encode(lst)

Encodes a list of public IDs into integer IDs using the built encoding.

Parameters:

Name Type Description Default
lst list

A list of public IDs.

required

Returns:

Type Description
list

A list of encoded integer IDs.

Source code in datarec/data/utils.py
def encode(self, lst: list) -> list:
    """
    Translate public IDs into private integer IDs.

    Args:
        lst (list): Public IDs to translate.

    Returns:
        (list): The matching private IDs, in the same order.

    Raises:
        ValueError: If no encoding is stored.
        KeyError: If a public ID is not part of the encoding.
    """
    ready = self.is_encoded()
    if ready is False:
        raise ValueError('Encoding dictionary is empty. Please, build the encoding first.')
    encoded = []
    for public_id in lst:
        encoded.append(self.encoding[public_id])
    return encoded

decode(lst)

Decodes a list of private integer IDs back to public IDs, using a reverse mapping derived from the stored encoding.

Parameters:

Name Type Description Default
lst list

A list of private IDs.

required

Returns:

Type Description
list

A list of decoded public IDs.

Source code in datarec/data/utils.py
def decode(self, lst: list) -> list:
    """
    Translate private integer IDs back into public IDs.

    The reverse mapping is derived from the stored encoding; a warning is
    printed when two public IDs share a private ID, because the reverse
    mapping then loses entries.

    Args:
        lst (list): Private IDs to translate.

    Returns:
        (list): The matching public IDs, in the same order.

    Raises:
        ValueError: If no encoding is stored.
        KeyError: If a private ID is not part of the encoding.
    """
    ready = self.is_encoded()
    if ready is False:
        raise ValueError('Encoding dictionary is empty. Please, build the encoding first.')
    decoder = {}
    for public_id, private_id in self.encoding.items():
        decoder[private_id] = public_id
    if len(decoder) != len(self.encoding):
        print('WARNING: the ID encoding could be incorrect. Please, check your data.')
    decoded = []
    for private_id in lst:
        decoded.append(decoder[private_id])
    return decoded

IncrementalEncoder

Streaming-friendly encoder that assigns integer IDs incrementally.

Preserves reproducibility by keeping both forward (public -> int) and reverse (int -> public) mappings without requiring the full list upfront.

Source code in datarec/data/utils.py
class IncrementalEncoder:
    """
    Encoder that hands out integer IDs one key at a time.

    Keys do not need to be known upfront: each unseen key receives the next
    free integer, starting at ``offset``. Both directions are stored, a dict
    for public -> int and a list for int -> public.
    """

    def __init__(self, offset: int = 0):
        self.offset = offset
        # public key -> assigned integer id
        self._forward: dict = {}
        # position (id - offset) -> public key
        self._reverse: list = []

    def __len__(self):
        return len(self._forward)

    def encode_one(self, key):
        """
        Return the id of ``key``, assigning the next free id on first sight.
        """
        if key not in self._forward:
            new_id = self.offset + len(self._forward)
            self._forward[key] = new_id
            self._reverse.append(key)
            return new_id
        return self._forward[key]

    def encode_many(self, iterable):
        """
        Encode every key of ``iterable`` and collect the ids in a list.
        """
        ids = []
        for key in iterable:
            ids.append(self.encode_one(key))
        return ids

    def decode_one(self, idx: int):
        """
        Return the key that was assigned id ``idx``.

        Raises:
            KeyError: If ``idx`` was never assigned.
        """
        pos = idx - self.offset
        known = self._reverse
        if pos >= len(known) or pos < 0:
            raise KeyError(f"Id {idx} not in encoder")
        return known[pos]

    def decode_many(self, iterable):
        """
        Decode every id of ``iterable`` and collect the keys in a list.
        """
        keys = []
        for idx in iterable:
            keys.append(self.decode_one(idx))
        return keys

    @property
    def forward(self):
        return self._forward

    @property
    def reverse(self):
        return self._reverse

encode_one(key)

Encode a single key, creating a new id if unseen.

Source code in datarec/data/utils.py
def encode_one(self, key):
    """
    Return the id of ``key``, assigning the next free id on first sight.
    """
    if key not in self._forward:
        new_id = self.offset + len(self._forward)
        self._forward[key] = new_id
        self._reverse.append(key)
        return new_id
    return self._forward[key]

encode_many(iterable)

Encode an iterable of keys, returning a list of int ids.

Source code in datarec/data/utils.py
def encode_many(self, iterable):
    """
    Encode every key of ``iterable`` and collect the ids in a list.
    """
    ids = []
    for key in iterable:
        ids.append(self.encode_one(key))
    return ids

decode_one(idx)

Decode a single id back to the original key.

Source code in datarec/data/utils.py
def decode_one(self, idx: int):
    """
    Return the key that was assigned id ``idx``.

    Raises:
        KeyError: If ``idx`` was never assigned.
    """
    pos = idx - self.offset
    known = self._reverse
    if pos >= len(known) or pos < 0:
        raise KeyError(f"Id {idx} not in encoder")
    return known[pos]

decode_many(iterable)

Decode an iterable of ids back to original keys.

Source code in datarec/data/utils.py
def decode_many(self, iterable):
    """
    Decode every id of ``iterable`` and collect the keys in a list.
    """
    keys = []
    for idx in iterable:
        keys.append(self.decode_one(idx))
    return keys

set_column_name(columns, value, rename=True, default_name=None)

Identifies a column by its name or index and optionally renames it.

This utility function provides a flexible way to handle DataFrame columns. It can find a column based on its current name (string) or its position (integer). If rename is True, it replaces the found column name in the list of columns with a default_name.

Parameters:

Name Type Description Default
columns list

The list of current column names in the DataFrame.

required
value Union[str, int]

The identifier for the column, either its name or its integer index.

required
rename bool

If True, the identified column's name is changed to default_name in the returned list. Defaults to True.

True
default_name str

The new name for the column if rename is True. Defaults to None.

None

Returns:

Type Description
tuple[list, str]

A tuple containing: (1) the (potentially modified) list of column names; (2) the final name of the selected column (either the original name or default_name if renamed).

Raises:

Type Description
ValueError

If the value is not a valid column name or index, or if it is not a string or integer.

Source code in datarec/data/utils.py
def set_column_name(columns: list, value: Union[str, int], rename=True, default_name=None) -> tuple[list, str]:
    """
    Identifies a column by its name or index and optionally renames it.

    The column is looked up either by its current name (string) or by its
    position (integer). An integer that is itself one of the column labels is
    treated as a label, not as a position. When `rename` is True, the entry at
    the resolved position is replaced by `default_name` in the returned list.

    The resolved position is what gets renamed, so a positional lookup works
    correctly even when several columns share the same name.

    Args:
        columns (list): The list of current column names in the DataFrame.
            The input is copied; the caller's list is never mutated.
        value (Union[str, int]): The identifier for the column, either its name
            or its integer index.
        rename (bool, optional): If True, the identified column's name is
            changed to `default_name` in the returned list. Defaults to True.
        default_name (str, optional): The new name for the column if `rename` is
            True. Defaults to None.

    Returns:
        (tuple[list, str]): A tuple containing:
            - The (potentially modified) list of column names.
            - The final name of the selected column (either the original or the
              `default_name` if renamed).

    Raises:
        ValueError: If the `value` is not a valid column name or index, or if
            it is not a string or integer.
    """
    columns = list(columns)

    if isinstance(value, str):
        if value not in columns:
            raise ValueError(f'column \'{value}\' is not a valid column name.')
        selected_column = value
        position = columns.index(value)

    elif isinstance(value, int):
        if value in columns:
            # the integer is an actual column label: label lookup wins over position
            selected_column = value
            position = columns.index(value)
        else:
            if value not in range(len(columns)):
                raise ValueError(f'column int \'{value}\' is out of range ({len(columns)} columns).')
            selected_column = columns[value]
            # keep the requested position: searching by name would hit the
            # first duplicate instead of the column the caller pointed at
            position = value
    else:
        raise ValueError(f'column value must be either a string (column name) or an integer (column index). Got {type(value)} instead.')

    if rename is True:
        columns[position] = default_name
        return columns, default_name

    return columns, selected_column

quartiles(count)

Assigns quartile indices (0-3) to items based on their frequency counts.

The function divides the input values into four quartiles using the median and quantiles. Each item is assigned an integer: 0 = long tail (lowest quartile), 1 = common, 2 = popular, 3 = most popular (highest quartile).

Parameters:

Name Type Description Default
count dict

A dictionary mapping items to numeric counts or frequencies.

required

Returns:

Type Description
dict

A dictionary mapping each item to its quartile index (0-3).

Source code in datarec/data/utils.py
def quartiles(count: dict):
    """
    Assigns quartile indices (0-3) to items based on their frequency counts.

    The cut points are the three quartiles returned by
    `statistics.quantiles`. Each item is assigned an integer:
        0: long tail (value <= first quartile)
        1: common (value <= median)
        2: popular (value <= third quartile)
        3: most popular (value above the third quartile)

    Inputs with fewer than two items cannot be split into quartiles: an empty
    dictionary yields an empty result and a single item is assigned index 0.

    Args:
        count (dict): A dictionary mapping items to numeric counts or frequencies.

    Returns:
        (dict): A dictionary mapping each item to its quartile index (0-3).
    """
    # statistics.quantiles raises on empty input, and on a single data point
    # before Python 3.13. From 3.13 a single point equals every cut point,
    # which yields index 0, so answer both cases directly.
    if len(count) < 2:
        return {k: 0 for k in count}

    q1, q2, q3 = statistics.quantiles(count.values())

    def assign(value):
        # cut points are non-decreasing, so the first one not exceeded decides
        if value <= q1:
            return 0
        if value <= q2:
            return 1
        if value <= q3:
            return 2
        return 3

    return {k: assign(f) for k, f in count.items()}

popularity(quartiles)

Categorizes items based on their quartile indices.

Converts quartile indices (0-3) into descriptive popularity categories: 0 -> 'long tail', 1 -> 'common', 2 -> 'popular', 3 -> 'most popular'.

Parameters:

Name Type Description Default
quartiles dict

A dictionary mapping items to quartile indices (0-3).

required

Returns:

Type Description
dict

A dictionary mapping each popularity category to a list of items.

Source code in datarec/data/utils.py
def popularity(quartiles: dict):
    """
    Groups items into named popularity buckets according to their quartile index.

    Index-to-bucket correspondence:
        0 -> 'long tail'
        1 -> 'common'
        2 -> 'popular'
        3 -> 'most popular'

    Args:
        quartiles (dict): A dictionary mapping items to quartile indices (0-3).

    Returns:
        (dict): A dictionary with one key per bucket name (always all four,
            ordered from 'most popular' down to 'long tail'), each holding the
            list of items that fell into it.
    """
    label_of = {0: 'long tail', 1: 'common', 2: 'popular', 3: 'most popular'}

    # buckets are created from the highest index down so the result keeps
    # the 'most popular' -> 'long tail' key order
    buckets = {label_of[index]: [] for index in (3, 2, 1, 0)}

    for item, index in quartiles.items():
        label = label_of[index]
        buckets[label].append(item)

    return buckets

verify_checksum(file_path, checksum)

Verifies the MD5 checksum of a file.

This function computes the MD5 hash of the file at the given path and compares it to the expected checksum. If the file does not exist, a FileNotFoundError is raised. If the checksum does not match, a RuntimeError is raised indicating possible corruption or version mismatch.

Parameters:

Name Type Description Default
file_path str

The path to the file to verify.

required
checksum str

The expected MD5 checksum.

required

Raises:

Type Description
FileNotFoundError

If the specified file does not exist.

RuntimeError

If the computed checksum does not match the expected value.

Source code in datarec/data/utils.py
def verify_checksum(file_path: str, checksum: str) -> None:
    """
    Verifies the MD5 checksum of a file.

    This function computes the MD5 hash of the file at the given path and
    compares it to the expected checksum. The comparison ignores letter case
    and surrounding whitespace in the expected value, since hexadecimal
    digests are case-insensitive. A confirmation line is printed on success.

    Args:
        file_path (str): The path to the file to verify.
        checksum (str): The expected MD5 checksum (hexadecimal string).

    Raises:
        FileNotFoundError: If the specified file does not exist.
        RuntimeError: If the computed checksum does not match the expected value.
    """

    if not os.path.isfile(file_path):
        raise FileNotFoundError(f"File '{file_path}' not found.")

    md5 = hashlib.md5()
    with open(file_path, "rb") as f:
        # read in 64 KiB chunks so large files never have to fit in memory
        for chunk in iter(lambda: f.read(65536), b""):
            md5.update(chunk)

    digest = md5.hexdigest()
    # hexdigest() is lower-case; normalise the expected value the same way
    expected = checksum.strip().lower()
    if digest != expected:
        raise RuntimeError(f"Checksum mismatch for '{file_path}': expected {checksum}, but got {digest}. "
                           f"The file may be corrupted or a new version has been downloaded.")

    print('Checksum verified.')

Dataset Builders

Dataset builder used by the registry to prepare and load resources.

RegisteredDataset

Source code in datarec/data/datarec_builder.py
class RegisteredDataset:
    """
    Handle for one version of a dataset described by a registry configuration.

    An instance holds the dataset configuration, the sources and resources
    built from it, the set of resources prepared so far and, when available,
    precomputed dataset characteristics.
    """

    def __init__(self, dataset_name: str, version: str, folder=None):
        """
        Build the dataset handle from the registry configuration.

        Args:
            dataset_name (str): Name of the dataset.
            version (str): Version of the dataset.
            folder (str | None): Optional output folder override.
        """
        self.dataset_name = dataset_name
        self.version = version
        self.output_folder = self.find_output_folder(folder=folder)
        self.config = load_dataset_config(dataset_name=self.dataset_name,
                                          dataset_version=self.version)

        self._setup_resources()
        self.registry_url = None

        # load precomputed stats if available
        self.characteristics = read_char_file(self.dataset_name, self.version)

    def _setup_resources(self) -> None:
        """Build sources/resources from ``self.config`` and link them together."""
        # sets sources and resources data structures
        self.sources = set_sources(self.config, folder=self.output_folder)
        self.resources = set_resources(self.config)

        # links resources with sources and dataset info
        for resource in self.resources.values():
            resource.link_source(self.sources)
            resource.assign_dataset_info(self.dataset_name, self.version)
            resource.output_folder = self.output_folder

        # keeps track of the resources prepared so far
        self.prepared_resources = {}

    @classmethod
    def from_url(cls, url: str, folder=None):
        """
        Build a RegisteredDataset from a remote registry YAML.

        ``__init__`` is bypassed on purpose: the configuration comes from
        ``url`` and no characteristics file is read (``characteristics`` is
        left empty).

        Args:
            url (str): URL to a registry version YAML.
            folder (str | None): Optional output folder override.

        Returns:
            RegisteredDataset: An instance backed by the remote registry config.

        Raises:
            ValueError: If the remote config lacks 'dataset_name' or 'version'.
        """
        config = load_dataset_config_from_url(url)
        dataset_name = config.get("dataset_name")
        version = config.get("version")
        if dataset_name is None or version is None:
            raise ValueError("Remote registry config must include 'dataset_name' and 'version'.")

        instance = cls.__new__(cls)
        instance.dataset_name = dataset_name
        instance.version = version
        instance.output_folder = instance.find_output_folder(folder=folder)
        instance.config = config

        instance._setup_resources()
        instance.registry_url = url
        instance.characteristics = {}
        return instance

    def _get_characteristic(self, name: str):
        """Return the stored characteristic ``name``, or warn and return None when absent."""
        if not isinstance(self.characteristics, dict) or name not in self.characteristics:
            warnings.warn(
                f"Characteristic '{name}' not available for {self.dataset_name} {self.version}.",
                RuntimeWarning,
            )
            return None
        return self.characteristics.get(name)

    @property
    def n_users(self):
        return self._get_characteristic("n_users")

    @property
    def n_items(self):
        return self._get_characteristic("n_items")

    @property
    def n_interactions(self):
        return self._get_characteristic("n_interactions")

    @property
    def space_size(self):
        return self._get_characteristic("space_size")

    @property
    def space_size_log(self):
        return self._get_characteristic("space_size_log")

    @property
    def shape(self):
        return self._get_characteristic("shape")

    @property
    def shape_log(self):
        return self._get_characteristic("shape_log")

    @property
    def density(self):
        return self._get_characteristic("density")

    @property
    def density_log(self):
        return self._get_characteristic("density_log")

    @property
    def gini_item(self):
        return self._get_characteristic("gini_item")

    @property
    def gini_user(self):
        return self._get_characteristic("gini_user")

    @property
    def ratings_per_user(self):
        return self._get_characteristic("ratings_per_user")

    @property
    def ratings_per_item(self):
        return self._get_characteristic("ratings_per_item")

    def find_resource_by_constraints(
        self,
        *,
        only_required: bool = True,
        resource_types: Union[Collection[str], str, None] = None,
        resource_names: Union[Collection[str], str, None] = None):
        """
        Select the registered resources that satisfy every given constraint.

        Args:
            only_required (bool): Keep only resources marked as required when True.
            resource_types (Collection[str] | str | None): Resource types to keep; all when None.
            resource_names (Collection[str] | str | None): Resource names to keep; all when None.

        Returns:
            dict[str, Any]: Map from resource name to resource object (possibly empty).

        Raises:
            ValueError: If the dataset has no registered resources at all.
        """
        collection = self.resources
        if len(collection) == 0:
            raise ValueError('No prepared resource found. Use the method .prepare to prepare the dataset resources.')

        # a single string is shorthand for a one-element collection
        if isinstance(resource_types, str):
            resource_types = [resource_types]

        if isinstance(resource_names, str):
            resource_names = [resource_names]

        selected: dict[str, Any] = {}
        for res_name, res in collection.items():
            # filter by required attribute
            if only_required and not res.required:
                continue
            # filter by resource type
            if resource_types is not None and res.type not in resource_types:
                continue
            # filter by resource name
            if resource_names is not None and res_name not in resource_names:
                continue

            selected[res_name] = res

        return selected

    def prepare(
        self,
        *,
        only_required: bool = True,
        use_cache: bool = True,
        resource_types: Union[Collection[str], str, None] = None,
        resource_names: Union[Collection[str], str, None] = None):
        """
        Prepare (download and decompress) all selected resources.

        Resources that were already prepared through this instance are skipped.
        Every newly prepared resource is recorded in ``self.prepared_resources``.

        Args:
            only_required (bool): Prepare only resources marked as required when True.
            use_cache (bool): Use cached resources when True.
            resource_types (Collection[str] | str | None): Resource types to include; all when None.
            resource_names (Collection[str] | str | None): Specific resource names to include; all when None.
        Returns:
            (None): None
        """

        selected = self.find_resource_by_constraints(
            only_required=only_required,
            resource_names=resource_names,
            resource_types=resource_types)

        if len(selected) == 0:
            print('No resource found with the given requirements.')

        for res_name, res in selected.items():
            if res_name in self.prepared_resources:
                print(f"Resource {res_name} was already prepared.")
                continue
            res.prepare(use_cache=use_cache)
            print(f"Resource {res_name} ready")
            self.prepared_resources[res_name] = res

    def prepare_interactions(
        self,
        *,
        only_required: bool = True,
        use_cache: bool = True,
        resource_names: Union[Collection[str], str, None] = None):
        """
        Prepare (download and decompress) the resources of type 'interactions'.

        Args:
            only_required (bool): Prepare only resources marked as required when True.
            use_cache (bool): Use cached resources when True.
            resource_names (Collection[str] | str | None): Specific resource names to include; all when None.
        Returns:
            (None): None
        """
        self.prepare(
            resource_types='interactions',
            only_required=only_required,
            use_cache=use_cache,
            resource_names=resource_names)

    def load(
        self, *,
        use_cache: bool = False,
        to_cache: bool = False,
        resource_name: Union[str, None] = None,
        resource_type: Union[str, None] = 'interactions',
        only_required: bool = False) -> "DataRec":
        """
        Load the dataset into a DataRec object.

        Exactly one resource must match the given constraints, it must be of
        type 'interactions', and it must already be prepared.

        Args:
            use_cache (bool): Load from cache when True.
            to_cache (bool): Save to cache when True.
            resource_name (str | None): Specific resource name to load; any name when None.
            resource_type (str | None): Resource type to load; any type when None.
            only_required (bool): When True, consider only resources marked as required.
        Returns:
            DataRec: The loaded dataset.
        Raises:
            ValueError: If zero or several resources match, if the matching
                resource is not of type 'interactions', or if it has not been
                prepared yet.
        """
        resource = self.find_resource_by_constraints(
            only_required=only_required,
            resource_names=resource_name,
            resource_types=resource_type)
        if len(resource) > 1:
            raise ValueError(
                f'More than one resource match the name: {resource_name} and type: {resource_type}.\n'
                f'Resources found: {list(resource)}. Please, select one by using the resource_name attribute.')
        if len(resource) == 0:
            raise ValueError(f'No resource found that matches the name: {resource_name} and type: {resource_type}')

        res_name, res = next(iter(resource.items()))
        if res.type != 'interactions':
            raise ValueError(f'DataRec does not support load method for resources with \'{res.type}\' type. '
                             f'This version of DataRec supports only \'interactions\' type.')
        # explicit comparison kept: the type of `prepared` is defined outside this class
        if res.prepared == True:
            return res.load(use_cache=use_cache, to_cache=to_cache)
        raise ValueError(f'Resource \'{res_name}\' must be prepared before loading it. Try calling .prepare method before.')

    def prepare_and_load(self) -> "DataRec":
        """
        A convenience method that runs the full prepare and load pipeline.

        Both steps run with their default arguments.

        Returns:
            (DataRec): The fully prepared and loaded dataset.
        """
        self.prepare()
        return self.load()

    def prepare_content(self, ctype: str = 'all'):
        """
        Prepares content resources of the specified type.

        Each prepared resource is added to ``self.prepared_resources`` under
        its ``resource_name``; entries already present are preserved.

        Args:
            ctype (str): The type of content to prepare. Use 'all' to prepare all content types.
        Raises:
            AssertionError: If the specified content type is not found.
        Returns:
            (None): None
        """
        resources = find_resource_by_type(self.resources, 'content')
        available = set(resources.keys())
        # raised explicitly so the check survives `python -O`
        if ctype != 'all' and ctype not in available:
            raise AssertionError(f"Content type {ctype} not found. Available content types: {available}")

        if ctype == 'all':
            for res in resources.values():
                res_name = res.resource_name
                if res_name in self.prepared_resources:
                    continue
                # add to the registry instead of replacing it
                self.prepared_resources[res_name] = res.prepare()
        else:
            res = resources[ctype]
            self.prepared_resources[res.resource_name] = res.prepare()

    def download(self) -> List[str]:
        """
        Downloads the raw dataset files of every source.

        Returns:
            (List[str]): One entry per source, holding whatever that source's
                ``download()`` returned.
        """
        return [source.download() for source in self.sources.values()]

    def find_output_folder(self, folder=None) -> str:
        """
        Find the output folder for the given dataset and version.

        Args:
            folder (str): Explicit output folder path. When given, the raw-data
                subfolder is appended and the result is made absolute.
        Returns:
            (str): The output folder path.
        """
        if folder:
            return os.path.abspath(os.path.join(folder, RAW_DATA_FOLDER))
        return os.path.join(dataset_raw_directory(self.dataset_name, self.version))

    def free_cache(self,
                   *,
                   resource_types: Union[Collection[str], str, None] = None,
                   resource_names: Union[Collection[str], str, None] = None):
        """
        Free the cache of the selected resources.

        NOTE(review): the selection relies on the default ``only_required=True``,
        so optional resources are never touched here. Confirm this is intended.

        Args:
            resource_types (Collection[str] | str | None): Resource types to include; all when None.
            resource_names (Collection[str] | str | None): Specific resource names to include; all when None.
        """
        selected = self.find_resource_by_constraints(
            resource_names=resource_names,
            resource_types=resource_types)
        for res in selected.values():
            res.free_cache()

from_url(url, folder=None) classmethod

Build a RegisteredDataset from a remote registry YAML.

Parameters:

Name Type Description Default
url str

URL to a registry version YAML.

required
folder str | None

Optional output folder override.

None

Returns:

Name Type Description
RegisteredDataset

An instance backed by the remote registry config.

Source code in datarec/data/datarec_builder.py
@classmethod
def from_url(cls, url: str, folder=None):
    """
    Create a RegisteredDataset whose configuration is fetched from a URL.

    The instance is allocated without running ``__init__``; every attribute
    is filled in here, and ``characteristics`` is left as an empty dict.

    Args:
        url (str): URL to a registry version YAML.
        folder (str | None): Optional output folder override.

    Returns:
        RegisteredDataset: An instance backed by the remote registry config.

    Raises:
        ValueError: If the remote config lacks 'dataset_name' or 'version'.
    """
    remote_cfg = load_dataset_config_from_url(url)
    name = remote_cfg.get("dataset_name")
    release = remote_cfg.get("version")
    if name is None or release is None:
        raise ValueError("Remote registry config must include 'dataset_name' and 'version'.")

    dataset = cls.__new__(cls)
    dataset.dataset_name = name
    dataset.version = release
    dataset.output_folder = dataset.find_output_folder(folder=folder)
    dataset.config = remote_cfg

    # build sources and resources, then wire every resource to them
    dataset.sources = set_sources(dataset.config, folder=dataset.output_folder)
    dataset.resources = set_resources(dataset.config)
    for res in dataset.resources.values():
        res.link_source(dataset.sources)
        res.assign_dataset_info(dataset.dataset_name, dataset.version)
        res.output_folder = dataset.output_folder

    dataset.prepared_resources = {}
    dataset.registry_url = url
    dataset.characteristics = {}
    return dataset

prepare(*, only_required=True, use_cache=True, resource_types=None, resource_names=None)

Prepare (download and decompress) all selected resources. Args: only_required (bool): Prepare only resources marked as required when True. use_cache (bool): Use cached resources when True. resource_types (Collection[str] | str | None): Resource types to include; all when None. resource_names (Collection[str] | str | None): Specific resource names to include; all when None. Returns: dict[str, Any]: Map from resource name to the prepared artifact (e.g. local path).

Source code in datarec/data/datarec_builder.py
def prepare(
    self,
    *,
    only_required: bool = True,
    use_cache: bool = True,
    resource_types: Union[Collection[str], str, None] = None,
    resource_names: Union[Collection[str], str, None] = None):
    """
    Prepare (download and decompress) all selected resources.

    Resources already recorded in `self.prepared_resources` are skipped.
    Newly prepared ones are added to that registry.

    Args:
        only_required (bool): Prepare only resources marked as required when True.
        use_cache (bool): Use cached resources when True.
        resource_types (Collection[str] | str | None): Resource types to include; all when None.
        resource_names (Collection[str] | str | None): Specific resource names to include; all when None.
    Returns:
        dict[str, Any]: Map from each selected resource name to its entry in
            `self.prepared_resources` (this method stores the resource object
            itself). Empty when no resource matches.
    """

    selected = self.find_resource_by_constraints(
        only_required=only_required,
        resource_names=resource_names,
        resource_types=resource_types)

    if not selected:
        print('No resource found with the given requirements.')

    for res_name, res in selected.items():
        if res_name in self.prepared_resources:
            print(f"Resource {res_name} was already prepared.")
            continue
        res.prepare(use_cache=use_cache)
        print(f"Resource {res_name} ready")
        self.prepared_resources[res_name] = res

    # Every selected name is in the registry at this point: it was either
    # already there or has just been added by the loop above.
    return {res_name: self.prepared_resources[res_name] for res_name in selected}

prepare_interactions(*, only_required=True, use_cache=True, resource_names=None)

Prepare (download and decompress) interaction resources. Args: only_required (bool): Prepare only resources marked as required when True. use_cache (bool): Use cached resources when True. resource_names (Collection[str] | str | None): Specific resource names to include; all when None. Returns: dict[str, Any]: Map from resource name to the prepared artifact (e.g. local path).

Source code in datarec/data/datarec_builder.py
def prepare_interactions(
    self,
    *,
    only_required: bool = True,
    use_cache: bool = True,
    resource_names: Union[Collection[str], str, None] = None):
    """
    Prepare (download and decompress) resources of type 'interactions'.

    Thin wrapper around `prepare` with `resource_types` fixed to 'interactions'.

    Args:
        only_required (bool): Prepare only resources marked as required when True.
        use_cache (bool): Use cached resources when True.
        resource_names (Collection[str] | str | None): Specific resource names to include; all when None.
    Returns:
        Whatever `prepare` returns for the selected interaction resources.
    """
    return self.prepare(
        resource_types='interactions',
        only_required=only_required,
        use_cache=use_cache,
        resource_names=resource_names)

load(*, use_cache=False, to_cache=False, resource_name=None, resource_type='interactions', only_required=False)

Load the dataset into a DataRec object. Args: use_cache (bool): Load from cache when True. to_cache (bool): Save to cache when True. resource_name (str | None): Specific resource name to load; all when None. resource_type (str | None): Resource type to load; all when None. only_required (bool): When True, consider only resources marked as required. Returns: DataRec: The loaded dataset.

Source code in datarec/data/datarec_builder.py
def load(
    self, *,
    use_cache: bool = False,
    to_cache: bool = False,
    resource_name: Union[str, None] = None,
    resource_type: Union[str, None] = 'interactions',
    only_required: bool = False) -> DataRec:
    """
    Load exactly one prepared 'interactions' resource into a DataRec object.

    Args:
        use_cache (bool): Load from cache when True.
        to_cache (bool): Save to cache when True.
        resource_name (str | None): Specific resource name to load; all when None.
        resource_type (str | None): Resource type to load; all when None.
        only_required (bool): When True, consider only resources marked as required.
    Returns:
        DataRec: The loaded dataset.
    Raises:
        ValueError: If no resource or more than one resource matches, if the
            matched resource is not of type 'interactions', or if it has not
            been prepared yet.
    """
    resource = self.find_resource_by_constraints(
        only_required=only_required,
        resource_names=resource_name,
        resource_types=resource_type)

    if len(resource) == 0:
        raise ValueError(f'No resource found that matches the name: {resource_name} and type: {resource_type}')
    if len(resource) > 1:
        # list(resource) renders the matching names; the bare `resource.keys`
        # attribute would render a bound-method repr instead.
        raise ValueError(
            f'More than one resource match the name: {resource_name} and type: {resource_type}.\n'
            f'Resources found: {list(resource)}. Please, select one by using the resource_name attribute.')

    res_name, res = next(iter(resource.items()))
    if res.type != 'interactions':
        raise ValueError(
            f"DataRec does not support load method for resources with '{res.type}' type. "
            f"This version of DataRec supports only 'interactions' type.")
    if not res.prepared:
        raise ValueError(
            f"Resource '{res_name}' must be prepared before loading it. Try calling .prepare method before.")
    return res.load(use_cache=use_cache, to_cache=to_cache)

prepare_and_load()

A convenience method that runs the full prepare and load pipeline.

Returns:

Type Description
DataRec

The fully prepared and loaded dataset.

Source code in datarec/data/datarec_builder.py
def prepare_and_load(self) -> DataRec:
    """
    Run `prepare()` and then `load()`, both with their default arguments.

    Returns:
        (DataRec): The value returned by `load()`.
    """
    self.prepare()
    loaded = self.load()
    return loaded

prepare_content(ctype='all')

Prepares content resources of the specified type. Args: ctype (str): The type of content to prepare. Use 'all' to prepare all content types. Raises: AssertionError: If the specified content type is not found. Returns: (None): None

Source code in datarec/data/datarec_builder.py
def prepare_content(self, ctype:str='all'):
    """
    Prepares content resources of the specified type.

    Each prepared resource is added to `self.prepared_resources` under its
    `resource_name`; entries that are already registered are kept.

    Args:
        ctype (str): The type of content to prepare. Use 'all' to prepare all content types.
    Raises:
        AssertionError: If the specified content type is not found.
    Returns:
        (None): None
    """
    resources = find_resource_by_type(self.resources, 'content')
    ctypes = set(resources.keys())
    # Kept as an assert because AssertionError is the documented failure mode.
    assert ctype in ctypes or ctype == 'all', f"Content type {ctype} not found. Available content types: {ctypes}"
    if ctype == 'all':
        for res in resources.values():
            res_name = res.resource_name
            if res_name in self.prepared_resources:
                continue
            # Add to the registry; rebinding the attribute to a fresh dict
            # would drop every resource prepared earlier.
            self.prepared_resources[res_name] = res.prepare()
    else:
        res = resources[ctype]
        self.prepared_resources[res.resource_name] = res.prepare()

download()

Downloads the raw dataset files. Returns: (List[str]): The paths to the downloaded files, one entry per source.

Source code in datarec/data/datarec_builder.py
def download(self) -> List[str]:
    """
    Download the raw files of every source registered on this dataset.

    Returns:
        (List[str]): One entry per source in `self.sources`, in iteration
            order, holding whatever that source's `download()` returned.
    """
    return [source.download() for source in self.sources.values()]

find_output_folder(folder=None)

Find the output folder for the given dataset and version. Args: folder (str): Explicit output folder path. Returns: (str): The output folder path.

Source code in datarec/data/datarec_builder.py
def find_output_folder(self, folder=None) -> str:
    """
    Resolve the folder where the raw data of this dataset is stored.

    Args:
        folder (str): Explicit output folder path. When falsy, the
            location is derived from the dataset name and version.
    Returns:
        (str): The absolute path of `folder` joined with `RAW_DATA_FOLDER`
            when `folder` is given, otherwise the value produced by
            `dataset_raw_directory` for this dataset name and version.
    """
    if not folder:
        default_dir = dataset_raw_directory(self.dataset_name, self.version)
        return os.path.join(default_dir)
    raw_dir = os.path.join(folder, RAW_DATA_FOLDER)
    return os.path.abspath(raw_dir)

DataRec and Data Wrappers

Core dataset container and helpers.

DataRec

Core data structure for recommendation datasets in the DataRec framework.

This class wraps a Pandas DataFrame and standardizes common columns (user, item, rating, timestamp) to provide a consistent interface for recommendation tasks. It supports data preprocessing, user/item remapping (public vs private IDs), frequency analysis, sparsity/density metrics, Gini coefficients, and conversion into PyTorch datasets for training.

Source code in datarec/data/dataset.py
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
class DataRec:

    """
    Core data structure for recommendation datasets in the DataRec framework.

    This class wraps a Pandas DataFrame and standardizes common columns
    (user, item, rating, timestamp) to provide a consistent interface
    for recommendation tasks. It supports data preprocessing, 
    user/item remapping (public vs private IDs), frequency analysis, 
    sparsity/density metrics, Gini coefficients, and conversion into 
    PyTorch datasets for training.
    """

    def __init__(
            self,
            rawdata: RawData = None,
            copy: bool = False,
            dataset_name: str = 'datarec',
            version_name: str = 'no_version_provided',
            *args,
            **kwargs
    ):
        """
        Initializes the DataRec object.

        Args:
            rawdata (RawData): The input dataset wrapped in a RawData object.
                If None, the DataRec is initialized empty.
            copy (bool): Whether to copy the input DataFrame to avoid
                modifying the original RawData.
            dataset_name (str): A name to identify the dataset.
            version_name (str): A version identifier for the dataset.
            *args: Accepted for call compatibility; not used by this constructor.
            **kwargs: Accepted for call compatibility; not used by this
                constructor (keywords such as `pipeline` or `registry_dataset`
                are ignored).
        """
        self.path = None
        self._data = None
        self.dataset_name = dataset_name
        self.version_name = version_name

        rawdata_step = None
        if rawdata is not None:
            if copy:
                self._data: pd.DataFrame = rawdata.data.copy()
            else:
                self._data: pd.DataFrame = rawdata.data

            if rawdata.pipeline_step is not None:
                rawdata_step = rawdata.pipeline_step

        # PIPELINE INITIALIZATION
        # Every DataRec starts from a fresh Pipeline; the origin is derived
        # from the RawData pipeline step (if any) right below.
        self.pipeline = Pipeline()
        self._origin: str = "unknown"

        if rawdata_step:
            if rawdata_step.name == "load":
                self.set_origin_registry()
            elif rawdata_step.name == "read":
                self.set_origin_file(rawdata_step)
            else:
                warnings.warn(f"Unrecognized pipeline step '{rawdata_step.name}' in RawData. Setting origin to 'unknown'.")

        # ------------------------------------
        # --------- STANDARD COLUMNS ---------
        # if a column is None it means that the DataRec does not have that information
        self.__assigned_columns = []

        self._user_col = None
        self._item_col = None
        self._rating_col = None
        self._timestamp_col = None

        if rawdata:
            self.set_columns(rawdata)

        # map users and items with a 0-indexed mapping
        self.user_id_encoder = Encoder()
        self.item_id_encoder = Encoder()

        # Apply external encoders (e.g., from streaming readers) if provided
        if rawdata is not None:
            if getattr(rawdata, "user_encoder", None):
                self.user_id_encoder.apply_encoding(rawdata.user_encoder)
            if getattr(rawdata, "item_encoder", None):
                self.item_id_encoder.apply_encoding(rawdata.item_encoder)

        # ------------------------------
        # --------- PROPERTIES ---------
        self._sorted_users = None
        self._sorted_items = None

        self._n_users = None
        self._n_items = None
        self._transactions = None

        self.characteristics = CharacteristicAccessor(self)

        # NOTE: `_origin` is intentionally not reassigned here. It already
        # defaults to "unknown" above, and resetting it at the end would
        # discard the origin recorded by set_origin_registry/set_origin_file.


    def __str__(self):
        """
        Render the wrapped data as a string.

        Returns:
            (str): The string form of `self.data`.
        """
        return str(self.data)

    def __repr__(self):
        """
        Render the official representation of the wrapped data.

        Returns:
            (str): The `repr` of `self.data`.
        """
        return repr(self.data)

    def _repr_html_(self):
        """
        Returns an HTML representation of the internal DataFrame for rich displays.

        Delegates to the `_repr_html_` method of `self.data`.
        """
        return self.data._repr_html_()

    def __len__(self):
        """
        Returns the total number of samples in the dataset.

        Returns:
            (int): `len(self.data)`, i.e. the number of rows of the wrapped data.
        """
        return len(self.data)

    def set_columns(self, rawdata):
        """
        Register the columns declared by a RawData object and reorder the data.

        For each of user, item, rating and timestamp (in that order), when the
        corresponding RawData attribute is not None it is assigned through the
        matching `*_col` property and the resulting column name is recorded.
        The internal data is then restricted to the recorded columns.

        Args:
            rawdata (RawData): A RawData object containing the column names for
                user, item, rating, and timestamp.
        """
        for field in ('user', 'item', 'rating', 'timestamp'):
            source_column = getattr(rawdata, field)
            if source_column is None:
                continue
            col_attr = f'{field}_col'
            # Goes through the property setter of user_col/item_col/...
            setattr(self, col_attr, source_column)
            self.__assigned_columns.append(getattr(self, col_attr))

        # re-order columns
        self._data = self.data[self.__assigned_columns]

    def reset(self):
        """
        Reset cached statistics and assigned columns of the DataRec object.

        Empties the list of assigned columns and clears the lazily computed
        values (`sorted_users`, `sorted_items`, `transactions` and the user/item
        counters) so they are recomputed from the current data on next access.
        It is called by the `data` setter when the underlying data is changed.
        """
        self.__assigned_columns = []

        # Cached values are only valid for the data they were computed from.
        self._sorted_users = None
        self._sorted_items = None
        self._n_users = None
        self._n_items = None
        self._transactions = None

    @property
    def data(self) -> pd.DataFrame:
        """
        The underlying pandas DataFrame holding the interaction data.
        """
        return self._data

    @data.setter
    def data(self, value: RawData):
        """
        Sets the internal DataFrame from a RawData object and resets stats.

        Args:
            value (RawData | None): The new data. None empties the DataRec:
                the internal DataFrame becomes an empty DataFrame and no
                standard column is assigned.

        Raises:
            ValueError: If `value` is neither a RawData object nor None.
        """
        if value is not None and not isinstance(value, RawData):
            raise ValueError(f'Data must be RawData or None if empty. Found {type(value)}')

        self.reset()
        # Column names describe the previous data; a None column means the
        # DataRec does not have that information.
        self._user_col = None
        self._item_col = None
        self._rating_col = None
        self._timestamp_col = None

        if value is None:
            # An empty DataFrame has no `.data` attribute and no column
            # metadata, so it cannot go through the RawData path below.
            self._data = pd.DataFrame()
            return

        self._data = value.data
        self.set_columns(value)

    @property
    def user_col(self):
        """
        The name of the user ID column (None when no user column is assigned).
        """
        return self._user_col

    @user_col.setter
    def user_col(self, value: Union[str, int]):
        """
        Sets the user column by delegating to `set_user_col(value, rename=True)`.

        Args:
            value (Union[str, int]): The current name or index of the user column.
        """
        self.set_user_col(value, rename=True)

    def set_user_col(self, value: Union[str, int] = DATAREC_USER_COL, rename=True):
        """
        Resolve the user column through `set_column_name` and store the result.

        The column list returned by `set_column_name` replaces the DataFrame
        columns, and the returned name becomes the user column name.

        Args:
            value (Union[str, int]): The current name or index of the user column.
            rename (bool): If True, renames the column to the standard internal name.
        """
        current_columns = list(self.data.columns)
        updated_columns, resolved_name = set_column_name(columns=current_columns,
                                                         value=value,
                                                         default_name=DATAREC_USER_COL,
                                                         rename=rename)
        self.data.columns = updated_columns
        self._user_col = resolved_name

    @property
    def item_col(self):
        """
        The name of the item ID column (None when no item column is assigned).
        """
        return self._item_col

    @item_col.setter
    def item_col(self, value: Union[str, int]):
        """
        Sets the item column by delegating to `set_item_col(value, rename=True)`.

        Args:
            value (Union[str, int]): The current name or index of the item column.
        """
        self.set_item_col(value, rename=True)

    def set_item_col(self, value: Union[str, int] = DATAREC_ITEM_COL, rename=True):
        """
        Resolve the item column through `set_column_name` and store the result.

        The column list returned by `set_column_name` replaces the DataFrame
        columns, and the returned name becomes the item column name.

        Args:
            value (Union[str, int]): The current name or index of the item column.
            rename (bool): If True, renames the column to the standard internal name.
        """
        current_columns = list(self.data.columns)
        updated_columns, resolved_name = set_column_name(columns=current_columns,
                                                         value=value,
                                                         default_name=DATAREC_ITEM_COL,
                                                         rename=rename)
        self.data.columns = updated_columns
        self._item_col = resolved_name

    @property
    def rating_col(self):
        """
        The name of the rating column (None when no rating column is assigned).
        """
        return self._rating_col

    @rating_col.setter
    def rating_col(self, value: Union[str, int]):
        """
        Sets the rating column by delegating to `set_rating_col(value, rename=True)`.

        Args:
            value (Union[str, int]): The current name or index of the rating column.
        """
        self.set_rating_col(value, rename=True)

    def set_rating_col(self, value: Union[str, int] = DATAREC_RATING_COL, rename=True):
        """
        Resolve the rating column through `set_column_name` and store the result.

        The column list returned by `set_column_name` replaces the DataFrame
        columns, and the returned name becomes the rating column name.

        Args:
            value (Union[str, int]): The current name or index of the rating column.
            rename (bool): If True, renames the column to the standard internal name.
        """
        current_columns = list(self.data.columns)
        updated_columns, resolved_name = set_column_name(columns=current_columns,
                                                         value=value,
                                                         default_name=DATAREC_RATING_COL,
                                                         rename=rename)
        self.data.columns = updated_columns
        self._rating_col = resolved_name

    @property
    def timestamp_col(self):
        """
        The name of the timestamp column (None when no timestamp column is assigned).
        """
        return self._timestamp_col

    @timestamp_col.setter
    def timestamp_col(self, value: Union[str, int]):
        """
        Sets the timestamp column by delegating to `set_timestamp_col(value, rename=True)`.

        Args:
            value (Union[str, int]): The current name or index of the timestamp column.
        """
        self.set_timestamp_col(value, rename=True)

    def set_timestamp_col(self, value: Union[str, int] = DATAREC_TIMESTAMP_COL, rename=True):
        """
        Resolve the timestamp column through `set_column_name` and store the result.

        The column list returned by `set_column_name` replaces the DataFrame
        columns, and the returned name becomes the timestamp column name.

        Args:
            value (Union[str, int]): The current name or index of the timestamp column.
            rename (bool): If True, renames the column to the standard internal name.
        """
        current_columns = list(self.data.columns)
        updated_columns, resolved_name = set_column_name(columns=current_columns,
                                                         value=value,
                                                         default_name=DATAREC_TIMESTAMP_COL,
                                                         rename=rename)
        self.data.columns = updated_columns
        self._timestamp_col = resolved_name

    @property
    def users(self):
        """
        Unique user IDs found in the user column, as a Python list.
        """
        user_series = self.data[self.user_col]
        return user_series.unique().tolist()

    @property
    def items(self):
        """
        Unique item IDs found in the item column, as a Python list.
        """
        item_series = self.data[self.item_col]
        return item_series.unique().tolist()

    @property
    def n_users(self):
        """
        Number of distinct values in the user column, as a built-in int.
        """
        distinct_users = self.data[self.user_col].nunique()
        return int(distinct_users)

    @property
    def n_items(self):
        """
        Number of distinct values in the item column, as a built-in int.
        """
        distinct_items = self.data[self.item_col].nunique()
        return int(distinct_items)

    @property
    def columns(self):
        """
        Returns the `columns` attribute of the internal DataFrame
        (the column labels, not a copy converted to a list).
        """
        return self.data.columns

    @columns.setter
    def columns(self, columns):
        """
        Sets the column names of the internal DataFrame.

        Only the DataFrame labels are replaced; the stored user/item/rating/
        timestamp column names are not touched by this setter.
        """
        self.data.columns = columns

    @property
    def sorted_items(self):
        """
        Items mapped to their interaction count, ordered by ascending count.

        The count is taken on the user column after grouping by item. The
        result is computed once and cached in `_sorted_items`.
        """
        if self._sorted_items is not None:
            return self._sorted_items
        per_item = self.data.groupby(self.item_col).count()
        ordered = per_item.sort_values(by=[self.user_col])
        self._sorted_items = dict(zip(ordered.index, ordered[self.user_col]))
        return self._sorted_items

    @property
    def sorted_users(self):
        """
        Users mapped to their interaction count, ordered by ascending count.

        The count is taken on the item column after grouping by user. The
        result is computed once and cached in `_sorted_users`.
        """
        if self._sorted_users is not None:
            return self._sorted_users
        per_user = self.data.groupby(self.user_col).count()
        ordered = per_user.sort_values(by=[self.item_col])
        self._sorted_users = dict(zip(ordered.index, ordered[self.item_col]))
        return self._sorted_users

    @property
    def transactions(self):
        """
        Total number of interactions (rows) in the dataset.

        Computed as `len(self.data)` on first access and cached in `_transactions`.
        """
        if self._transactions is not None:
            return self._transactions
        self._transactions = len(self.data)
        return self._transactions

    @property
    def origin(self):
        """
        Returns the origin of the dataset (e.g., 'unknown', 'registry', 'file').

        Read-only view of `_origin`, which the constructor initializes to 'unknown'.
        """
        return self._origin

    # --- ID ENCODING/DECODING FUNCTIONS ---

    def is_encoded(self, on:str) -> bool:
        """
        Tell whether the user or item encoder currently holds an encoding.

        Args:
            on (str): 'users' to check user encoding, 'items' for item encoding.
        Returns:
            (bool): Result of `is_encoded()` on the selected encoder.
        Raises:
            ValueError: If `on` is neither 'users' nor 'items'.
        """
        if on not in ('users', 'items'):
            raise ValueError("Parameter 'on' must be either 'users' or 'items'.")
        encoder = self.user_id_encoder if on == 'users' else self.item_id_encoder
        return encoder.is_encoded()


    def encode(self, users=True, items=True) -> None:
        """
        Replace user and/or item IDs in the data with their encoded IDs.

        Users are processed before items, so a missing item encoding is
        reported only after the user column has already been encoded.

        Args:
            users (bool): If True, encodes user IDs.
            items (bool): If True, encodes item IDs.
        Raises:
            ValueError: If a requested encoder holds no encoding.
        """
        if users:
            user_encoder = self.user_id_encoder
            if not user_encoder.is_encoded():
                raise ValueError("User encoder is empty. Build or apply an encoding before calling encode().")
            public_users = self.data[self.user_col].tolist()
            self.data[self.user_col] = user_encoder.encode(public_users)
        if items:
            item_encoder = self.item_id_encoder
            if not item_encoder.is_encoded():
                raise ValueError("Item encoder is empty. Build or apply an encoding before calling encode().")
            public_items = self.data[self.item_col].tolist()
            self.data[self.item_col] = item_encoder.encode(public_items)

    def decode(self, users=True, items=True) -> None:
        """
        Replace encoded user and/or item IDs in the data with the original IDs.

        Args:
            users (bool): If True, decodes user IDs.
            items (bool): If True, decodes item IDs.
        """
        if users:
            encoded_users = self.data[self.user_col].tolist()
            self.data[self.user_col] = self.user_id_encoder.decode(encoded_users)
        if items:
            encoded_items = self.data[self.item_col].tolist()
            self.data[self.item_col] = self.item_id_encoder.decode(encoded_items)

    def reset_encoding(self, on='all') -> None:
        """
        Decodes the selected IDs and resets the corresponding encoder(s).

        Args:
            on (str): 'users' to reset user encoding, 'items' for item encoding,
                'all' (default) for both.
        Raises:
            ValueError: If `on` is not one of 'users', 'items' or 'all'.
        """
        if on not in ('users', 'items', 'all'):
            raise ValueError("Parameter 'on' must be one of 'users', 'items' or 'all'.")

        on_users = on in ('users', 'all')
        on_items = on in ('items', 'all')

        # Decode first: the encoders are still needed to map IDs back.
        self.decode(users=on_users, items=on_items)
        if on_users:
            self.user_id_encoder.reset_encoding()
        if on_items:
            self.item_id_encoder.reset_encoding()


    def build_encoding(self, on='users', offset=0) -> None:
        """
        Builds the encoding for users or items.

        Args:
            on (str): 'users' to build user encoding, 'items' for item encoding, 'all' for both.
            offset (int): The starting integer for the private IDs.
        Raises:
            ValueError: If `on` is not one of 'users', 'items' or 'all'.
        """
        if on not in ('users', 'items', 'all'):
            raise ValueError("Parameter 'on' must be one of 'users', 'items' or 'all'.")

        if on in ('users', 'all'):
            self.user_id_encoder.build_encoding(self.users, offset=offset)
        if on in ('items', 'all'):
            self.item_id_encoder.build_encoding(self.items, offset=offset)


    # -- CHARACTERISTICS --

    def characteristic(self, name: str, **kwargs: Any) -> Any:
        """
        Retrieves a calculated dataset characteristic by name.

        Looks up `name` as an attribute of `self.characteristics` and calls it
        with the given keyword arguments.

        Args:
            name (str): The name of the characteristic to retrieve.
            **kwargs: Additional arguments to pass to the characteristic function.

        Returns:
            The value of the requested characteristic.
        """
        return getattr(self.characteristics, name)(**kwargs)

    def space_size(self, **kwargs):
        """
        Shortcut for `characteristic("space_size", **kwargs)`.
        """
        return self.characteristic("space_size", **kwargs)

    def space_size_log(self, **kwargs):
        """
        Shortcut for `characteristic("space_size_log", **kwargs)`.
        """
        return self.characteristic("space_size_log", **kwargs)

    def shape(self, **kwargs):
        """
        Shortcut for `characteristic("shape", **kwargs)`.
        """
        return self.characteristic("shape", **kwargs)

    def shape_log(self, **kwargs):
        """
        Shortcut for `characteristic("shape_log", **kwargs)`.
        """
        return self.characteristic("shape_log", **kwargs)

    def density(self, **kwargs):
        """
        Shortcut for `characteristic("density", **kwargs)`.
        """
        return self.characteristic("density", **kwargs)

    def density_log(self, **kwargs):
        """
        Shortcut for `characteristic("density_log", **kwargs)`.
        """
        return self.characteristic("density_log", **kwargs)

    def gini_item(self, **kwargs):
        """
        Shortcut for `characteristic("gini_item", **kwargs)`.
        """
        return self.characteristic("gini_item", **kwargs)

    def gini_user(self, **kwargs):
        """
        Shortcut for `characteristic("gini_user", **kwargs)`.
        """
        return self.characteristic("gini_user", **kwargs)

    def ratings_per_user(self, **kwargs):
        """Shortcut for ``self.characteristic("ratings_per_user", **kwargs)``."""
        lookup = self.characteristic
        return lookup("ratings_per_user", **kwargs)

    def ratings_per_item(self, **kwargs):
        """Shortcut for ``self.characteristic("ratings_per_item", **kwargs)``."""
        lookup = self.characteristic
        return lookup("ratings_per_item", **kwargs)

    def users_frequency(self):
        """
        Counts how many interactions each user has.

        Returns:
            (dict): User ID -> number of rows in which the user appears, ordered
                from the most to the least frequent user. Users with equal counts
                keep their order of first appearance in the data.
        """
        # most_common() without an argument is a stable descending sort by count.
        counts = Counter(self.data[self.user_col])
        return dict(counts.most_common())

    def users_relative_frequency(self):
        """
        Expresses each user's interaction count as a share of all transactions.

        Returns:
            (dict): User ID -> count divided by `self.transactions`, in the same
                order as `users_frequency()`.
        """
        frequencies = self.users_frequency()
        return {user: count / self.transactions for user, count in frequencies.items()}

    def items_frequency(self):
        """
        Counts how many interactions each item has.

        Returns:
            (dict): Item ID -> number of rows in which the item appears, ordered
                from the most to the least frequent item. Items with equal counts
                keep their order of first appearance in the data.
        """
        # most_common() without an argument is a stable descending sort by count.
        counts = Counter(self.data[self.item_col])
        return dict(counts.most_common())

    def items_relative_frequency(self):
        """
        Expresses each item's interaction count as a share of all transactions.

        Returns:
            (dict): Item ID -> count divided by `self.transactions`, in the same
                order as `items_frequency()`.
        """
        frequencies = self.items_frequency()
        return {item: count / self.transactions for item, count in frequencies.items()}

    def users_quartiles(self):
        """
        Assigns a quartile index to every user from its interaction frequency.

        Returns:
            (dict): The result of `quartiles()` applied to `users_frequency()`:
                a mapping from user ID to quartile index (0-3), where
                0 = lowest and 3 = highest frequency.
        """
        frequencies = self.users_frequency()
        return quartiles(frequencies)

    def items_quartiles(self):
        """
        Assigns a quartile index to every item from its interaction frequency.

        Returns:
            (dict): The result of `quartiles()` applied to `items_frequency()`:
                a mapping from item ID to quartile index (0-3), where
                0 = lowest and 3 = highest frequency.
        """
        frequencies = self.items_frequency()
        return quartiles(frequencies)

    def users_popularity(self):
        """
        Groups users into named popularity categories derived from their quartiles.

        Returns:
            (dict): The result of `popularity()` applied to `users_quartiles()`:
                a mapping from popularity category ('long tail', 'common',
                'popular', 'most popular') to lists of user IDs.
        """
        quartile_by_user = self.users_quartiles()
        return popularity(quartile_by_user)

    def items_popularity(self):
        """
        Groups items into named popularity categories derived from their quartiles.

        Returns:
            (dict): The result of `popularity()` applied to `items_quartiles()`:
                a mapping from popularity category ('long tail', 'common',
                'popular', 'most popular') to lists of item IDs.
        """
        quartile_by_item = self.items_quartiles()
        return popularity(quartile_by_item)

    def get_user_interactions(self, user_id: Any) -> pd.DataFrame:
        """
        Returns the rows of the dataset that belong to one user.

        Args:
            user_id (Any): The ID of the user whose interactions are to be retrieved.

        Returns:
            (pd.DataFrame): The subset of `self.data` whose user column equals `user_id`.

        Raises:
            ValueError: If `user_id` is not among `self.users`.
        """
        if user_id not in self.users:
            raise ValueError(f"User ID {user_id} not found in dataset.")
        belongs_to_user = self.data[self.user_col] == user_id
        return self.data[belongs_to_user]

    def get_item_interactions(self, item_id: Any) -> pd.DataFrame:
        """
        Returns the rows of the dataset that involve one item.

        Args:
            item_id (Any): The ID of the item whose interactions are to be retrieved.

        Returns:
            (pd.DataFrame): The subset of `self.data` whose item column equals `item_id`.

        Raises:
            ValueError: If `item_id` is not among `self.items`.
        """
        if item_id not in self.items:
            raise ValueError(f"Item ID {item_id} not found in dataset.")
        involves_item = self.data[self.item_col] == item_id
        return self.data[involves_item]

    def list_characteristics(self) -> list[str]:
        """Return the alphabetically sorted names registered in CHARACTERISTICS."""
        names = sorted(CHARACTERISTICS)
        return names

    def describe_characteristics(self) -> dict[str, str]:
        """Return a mapping name -> stripped docstring for each entry of CHARACTERISTICS.

        Entries whose function has no docstring map to an empty string.
        """
        summaries = {}
        for name, func in CHARACTERISTICS.items():
            doc = func.__doc__
            summaries[name] = doc.strip() if doc else ""
        return summaries

    def copy(self):
        """
        Create an independent copy of the current DataRec object.

        The copy holds its own DataFrame (the constructor is called with
        `copy=True`), the same column assignments, dataset/version names and
        origin, a copy of the pipeline, and copies of the user and item
        encoding dictionaries.

        Returns:
            (DataRec): A new DataRec object duplicating the current instance.
        """
        pipeline = self.pipeline.copy()

        new_dr = DataRec(rawdata=self.to_rawdata(),
                         copy=True,
                         dataset_name=self.dataset_name,
                         version_name=self.version_name,
                         pipeline=pipeline)

        # The constructor builds a fresh pipeline and does not read the
        # `pipeline` keyword, so attach the copied pipeline explicitly.
        new_dr.pipeline = pipeline
        new_dr._origin = self._origin

        new_dr._user_col = self.user_col
        new_dr._item_col = self.item_col
        new_dr._rating_col = self.rating_col
        new_dr._timestamp_col = self.timestamp_col

        # Carry the encodings over so that an encoded copy can still be decoded.
        new_dr.user_id_encoder.encoding = dict(self.user_id_encoder.encoding)
        new_dr.item_id_encoder.encoding = dict(self.item_id_encoder.encoding)
        return new_dr

    def to_rawdata(self):
        """
        Wrap the current data and column names in a RawData object.

        The RawData is built directly from `self.data` and then receives the
        user, item, rating and timestamp column names of this DataRec.

        Returns:
            (RawData): A new RawData object containing the DataRec's data and column information.
        """
        raw = RawData(self.data)
        raw.user, raw.item = self.user_col, self.item_col
        raw.rating, raw.timestamp = self.rating_col, self.timestamp_col
        return raw

    ##### PIPELINE FUNCTIONS #####


    def set_origin_file(self, pipeline_step) -> None:
        """
        Mark the dataset as originating from a file and record its read step.

        The step becomes the first step of the pipeline. If the pipeline already
        contains steps, a warning is emitted before the step is prepended.

        Args:
            pipeline_step: The pipeline step describing the file read; its
                `name` must be "read".

        Raises:
            ValueError: If `pipeline_step.name` is not "read".
        """
        if pipeline_step.name != "read":
            raise ValueError(f"Pipeline step must be a read step to set file origin. Found step name: {pipeline_step.name}")

        steps = self.pipeline.steps
        if not steps:
            steps.append(pipeline_step)
        else:
            warnings.warn("Prepending file load step to the pipeline.")
            steps.insert(0, pipeline_step)
        self._origin = "file"


    def set_origin_registry(self) -> None:
        """
        Set the origin of the dataset to 'registry'.

        A "load" step carrying the dataset name and version becomes the first
        pipeline step: it replaces an existing leading "read" step, is appended
        to an empty pipeline, or is prepended (with a warning) otherwise.
        """
        from datarec.pipeline import PipelineStep

        registry_step = PipelineStep(
            "load",
            "registry_dataset",
            {"dataset_name": self.dataset_name, "version": self.version_name},
        )
        steps = self.pipeline.steps
        if not steps:
            steps.append(registry_step)
        elif steps[0].name == "read":
            steps[0] = registry_step
        else:
            warnings.warn("First pipeline step is not a read step. Prepending registry load step to the pipeline.")
            steps.insert(0, registry_step)
        self._origin = "registry"

    def save_pipeline(self, filepath: str) -> None:
        """
        Write the current processing pipeline to a YAML file.

        Progress messages are printed before and after the write.

        Args:
            filepath (str): The path (including filename) where the pipeline
                YAML file will be saved.
        """
        print(f'Saving pipeline to {filepath}')
        write_yaml = self.pipeline.to_yaml
        write_yaml(filepath)
        print(f'Pipeline correctly saved to {filepath}')

    def to_torch_dataset(self, task: str = "pointwise", autoprepare: bool = True, **kwargs: Any) -> Any:
        """
        Converts the current DataRec object into a PyTorch-compatible dataset.

        With `autoprepare` enabled, user and item encodings are built (offset 0)
        and applied to this DataRec in place before the torch dataset is created.

        Args:
            task (str): The recommendation task type. Must be one of:
                - "pointwise": returns PointwiseTorchDataset
                - "pairwise": returns PairwiseTorchDataset
                - "ranking": returns RankingTorchDataset
            autoprepare (bool): If True, builds the user/item encodings and
                encodes the data. If False, a warning is emitted and the dataset
                is assumed to be already prepared.
            **kwargs: Additional arguments passed to the specific torch dataset class.

        Returns:
            (torch.utils.data.Dataset): A PyTorch dataset instance corresponding to the selected task.

        Raises:
            ImportError: If PyTorch is not installed.
            ValueError: If an unknown task name is provided. The check runs
                before any encoding is applied, so the DataRec is left untouched.
        """

        try:
            import torch  # noqa: F401 -- imported only to verify availability
        except ModuleNotFoundError as err:
            raise ImportError(
                "Torch is required for this feature. Please install it with `pip install datarec[torch]` or"
                " `pip install -r requirements/requirements-torch.txt`."
            ) from err

        # Validate the task before mutating the dataset: encoding happens in
        # place, so a typo in `task` must not leave the data half-prepared.
        if not isinstance(task, str) or task not in ("pointwise", "pairwise", "ranking"):
            raise ValueError(f"Unknown task: {task}")

        if autoprepare:
            self.build_encoding(on='all', offset=0)
            self.encode()
        else:
            warnings.warn(
                "Autoprepare is set to False. "
                "Ensure that the dataset is prepared correctly before using it with PyTorch."
            )

        if task == "pointwise":
            from datarec.data.torch_dataset import PointwiseTorchDataset
            return PointwiseTorchDataset(self, **kwargs)
        if task == "pairwise":
            from datarec.data.torch_dataset import PairwiseTorchDataset
            return PairwiseTorchDataset(self, **kwargs)
        from datarec.data.torch_dataset import RankingTorchDataset
        return RankingTorchDataset(self, **kwargs)

    def to_graphrec(self):
        """
        Builds a GraphRec object from the current DataRec.

        GraphRec is imported lazily and constructed with this DataRec as its
        only argument.

        Returns:
            (GraphRec): A new GraphRec object representing the interaction graph.
        """
        from datarec.data.graph import GraphRec

        graph = GraphRec(self)
        return graph

    def to_pickle(self, filepath: str = '') -> None:
        """
        Serialize the current DataRec object to a pickle file.

        Progress messages are printed before and after the write.

        Args:
            filepath (str): The path (including filename) where the pickle file will be saved.
                When left as the empty string, the path is obtained from
                `paths.pickle_version_filepath` using the dataset and version names.
        """
        import pickle

        target = filepath
        if target == '':
            target = paths.pickle_version_filepath(self.dataset_name, self.version_name)

        print(f'Saving DataRec to {target}')

        with open(target, 'wb') as handle:
            pickle.dump(self, handle)

        print(f'DataRec correctly saved to {target}')

data property writable

The underlying pandas DataFrame holding the interaction data.

user_col property writable

The name of the user ID column.

item_col property writable

The name of the item ID column.

rating_col property writable

The name of the rating column.

timestamp_col property writable

The name of the timestamp column.

users property

Returns a list of unique user IDs in the dataset.

items property

Returns a list of unique item IDs in the dataset.

n_users property

Returns the number of unique users.

n_items property

Returns the number of unique items.

columns property writable

Returns the list of column names of the internal DataFrame.

sorted_items property

Returns a dictionary of items sorted by their interaction count.

sorted_users property

Returns a dictionary of users sorted by their interaction count.

transactions property

Returns the total number of interactions (rows) in the dataset.

origin property

Returns the origin of the dataset (e.g., 'unknown', 'registry', 'file').

__init__(rawdata=None, copy=False, dataset_name='datarec', version_name='no_version_provided', *args, **kwargs)

Initializes the DataRec object.

Parameters:

Name Type Description Default
rawdata RawData

The input dataset wrapped in a RawData object. If None, the DataRec is initialized empty.

None
copy bool

Whether to copy the input DataFrame to avoid modifying the original RawData.

False
dataset_name str

A name to identify the dataset.

'datarec'
version_name str

A version identifier for the dataset.

'no_version_provided'
pipeline Pipeline

A pipeline object to track preprocessing steps.

required
registry_dataset bool

Whether the DataRec derives from a registered dataset.

required
Source code in datarec/data/dataset.py
def __init__(
        self,
        rawdata: RawData = None,
        copy: bool = False,
        dataset_name: str = 'datarec',
        version_name: str = 'no_version_provided',
        *args,
        **kwargs
):
    """
    Initializes the DataRec object.

    Args:
        rawdata (RawData): The input dataset wrapped in a RawData object.
            If None, the DataRec is initialized empty.
        copy (bool): Whether to copy the input DataFrame to avoid
            modifying the original RawData.
        dataset_name (str): A name to identify the dataset.
        version_name (str): A version identifier
            for the dataset.
        *args: Accepted for backward compatibility; not used.
        **kwargs: Accepted for backward compatibility; not used. In particular,
            `pipeline` and `registry_dataset` keywords are currently ignored:
            the pipeline always starts empty and the origin is derived from
            the pipeline step carried by `rawdata`.
    """
    self.path = None
    self._data = None
    self.dataset_name = dataset_name
    self.version_name = version_name

    rawdata_step = None
    if rawdata is not None:
        self._data = rawdata.data.copy() if copy else rawdata.data

        if rawdata.pipeline_step is not None:
            rawdata_step = rawdata.pipeline_step

    # PIPELINE INITIALIZATION
    # The pipeline always starts empty; the origin of the data is then derived
    # from the pipeline step attached to the RawData, if any.
    self.pipeline = Pipeline()
    self._origin: str = "unknown"

    if rawdata_step:
        if rawdata_step.name == "load":
            self.set_origin_registry()
        elif rawdata_step.name == "read":
            self.set_origin_file(rawdata_step)
        else:
            warnings.warn(f"Unrecognized pipeline step '{rawdata_step.name}' in RawData. Setting origin to 'unknown'.")

    # ------------------------------------
    # --------- STANDARD COLUMNS ---------
    # if a column is None it means that the DataRec does not have that information
    self.__assigned_columns = []

    self._user_col = None
    self._item_col = None
    self._rating_col = None
    self._timestamp_col = None

    # Test against None rather than truthiness, so that the outcome does not
    # depend on how RawData evaluates as a boolean.
    if rawdata is not None:
        self.set_columns(rawdata)

    # map users and items with a 0-indexed mapping
    self.user_id_encoder = Encoder()
    self.item_id_encoder = Encoder()

    # Apply external encoders (e.g., from streaming readers) if provided
    if rawdata is not None:
        if getattr(rawdata, "user_encoder", None):
            self.user_id_encoder.apply_encoding(rawdata.user_encoder)
        if getattr(rawdata, "item_encoder", None):
            self.item_id_encoder.apply_encoding(rawdata.item_encoder)

    # ------------------------------
    # --------- PROPERTIES ---------
    self._sorted_users = None
    self._sorted_items = None

    self._n_users = None
    self._n_items = None
    self._transactions = None

    self.characteristics = CharacteristicAccessor(self)

    # NOTE: `_origin` is deliberately not reset here. It was initialized above
    # and may have been set to "registry" or "file" by the set_origin_* calls.

__str__()

Returns 'self.data' as a string variable.

Returns:

Type Description
str

'self.data' as a string variable.

Source code in datarec/data/dataset.py
def __str__(self):
    """
    Returns the string form of the underlying data.

    Returns:
        (str): 'self.data' converted to a string.
    """
    return str(self.data)

__repr__()

Returns the official string representation of the internal DataFrame.

Source code in datarec/data/dataset.py
def __repr__(self):
    """
    Returns the `repr` of the underlying data.
    """
    return repr(self.data)

__len__()

Returns the total number of samples in the dataset.

Returns:

Type Description
int

number of samples in the dataset.

Source code in datarec/data/dataset.py
def __len__(self):
    """
    Returns the length of the underlying data.

    Returns:
        (int): `len(self.data)`, i.e. the number of samples in the dataset.
    """
    data = self.data
    return len(data)

set_columns(rawdata)

Assign dataset column names from a RawData object and reorder the data accordingly.

Parameters:

Name Type Description Default
rawdata RawData

A RawData object containing the column names for user, item, rating, and timestamp.

required
Source code in datarec/data/dataset.py
def set_columns(self, rawdata):
    """
    Take the column names from a RawData object and restrict the data to them.

    Each of the user, item, rating and timestamp names that is not None is
    assigned to the matching `*_col` attribute and recorded as an assigned
    column. The data is then reduced to the assigned columns, in that order.

    Args:
        rawdata (RawData): A RawData object containing the column names for
            user, item, rating, and timestamp.
    """
    # (DataRec attribute, RawData attribute) pairs, in the final column order.
    column_pairs = (
        ("user_col", "user"),
        ("item_col", "item"),
        ("rating_col", "rating"),
        ("timestamp_col", "timestamp"),
    )
    for target, source in column_pairs:
        name = getattr(rawdata, source)
        if name is None:
            continue
        setattr(self, target, name)
        # Record the value read back from the attribute rather than `name`.
        self.__assigned_columns.append(getattr(self, target))

    # re-order columns
    self._data = self.data[self.__assigned_columns]

reset()

Reset cached statistics and assigned columns of the DataRec object.

This method clears all precomputed dataset statistics (e.g., sorted users, density, Gini indices, shape, ratings per user/item) and empties the list of assigned columns. It is automatically called when the underlying data is changed.

Source code in datarec/data/dataset.py
def reset(self):
    """
    Clear the list of assigned columns of the DataRec object.

    NOTE(review): earlier documentation stated that this also clears cached
    dataset statistics and is called automatically when the data changes;
    this method itself only empties the assigned-columns list -- confirm
    where the cached statistics are invalidated.
    """
    self.__assigned_columns = list()

set_user_col(value=DATAREC_USER_COL, rename=True)

Identifies and optionally renames the user column.

Parameters:

Name Type Description Default
value Union[str, int]

The current name or index of the user column.

DATAREC_USER_COL
rename bool

If True, renames the column to the standard internal name.

True
Source code in datarec/data/dataset.py
def set_user_col(self, value: Union[str, int] = DATAREC_USER_COL, rename=True):
    """
    Identifies the user column and optionally renames it.

    The work is delegated to `set_column_name`; the column list it returns
    replaces the DataFrame's columns and the name it returns is stored as the
    user column.

    Args:
        value (Union[str, int]): The current name or index of the user column.
        rename (bool): If True, renames the column to the standard internal name.
    """
    current_columns = list(self.data.columns)
    updated_columns, resolved_name = set_column_name(columns=current_columns,
                                                     value=value,
                                                     default_name=DATAREC_USER_COL,
                                                     rename=rename)
    self.data.columns = updated_columns
    self._user_col = resolved_name

set_item_col(value=DATAREC_ITEM_COL, rename=True)

Identifies and optionally renames the item column.

Parameters:

Name Type Description Default
value Union[str, int]

The current name or index of the item column.

DATAREC_ITEM_COL
rename bool

If True, renames the column to the standard internal name.

True
Source code in datarec/data/dataset.py
def set_item_col(self, value: Union[str, int] = DATAREC_ITEM_COL, rename=True):
    """
    Identifies the item column and optionally renames it.

    The work is delegated to `set_column_name`; the column list it returns
    replaces the DataFrame's columns and the name it returns is stored as the
    item column.

    Args:
        value (Union[str, int]): The current name or index of the item column.
        rename (bool): If True, renames the column to the standard internal name.
    """
    current_columns = list(self.data.columns)
    updated_columns, resolved_name = set_column_name(columns=current_columns,
                                                     value=value,
                                                     default_name=DATAREC_ITEM_COL,
                                                     rename=rename)
    self.data.columns = updated_columns
    self._item_col = resolved_name

set_rating_col(value=DATAREC_RATING_COL, rename=True)

Identifies and optionally renames the rating column.

Parameters:

Name Type Description Default
value Union[str, int]

The current name or index of the rating column.

DATAREC_RATING_COL
rename bool

If True, renames the column to the standard internal name.

True
Source code in datarec/data/dataset.py
def set_rating_col(self, value: Union[str, int] = DATAREC_RATING_COL, rename=True):
    """
    Identifies the rating column and optionally renames it.

    The work is delegated to `set_column_name`; the column list it returns
    replaces the DataFrame's columns and the name it returns is stored as the
    rating column.

    Args:
        value (Union[str, int]): The current name or index of the rating column.
        rename (bool): If True, renames the column to the standard internal name.
    """
    current_columns = list(self.data.columns)
    updated_columns, resolved_name = set_column_name(columns=current_columns,
                                                     value=value,
                                                     default_name=DATAREC_RATING_COL,
                                                     rename=rename)
    self.data.columns = updated_columns
    self._rating_col = resolved_name

set_timestamp_col(value=DATAREC_TIMESTAMP_COL, rename=True)

Identifies and optionally renames the timestamp column.

Parameters:

Name Type Description Default
value Union[str, int]

The current name or index of the timestamp column.

DATAREC_TIMESTAMP_COL
rename bool

If True, renames the column to the standard internal name.

True
Source code in datarec/data/dataset.py
def set_timestamp_col(self, value: Union[str, int] = DATAREC_TIMESTAMP_COL, rename=True):
    """
    Identifies the timestamp column and optionally renames it.

    The work is delegated to `set_column_name`; the column list it returns
    replaces the DataFrame's columns and the name it returns is stored as the
    timestamp column.

    Args:
        value (Union[str, int]): The current name or index of the timestamp column.
        rename (bool): If True, renames the column to the standard internal name.
    """
    current_columns = list(self.data.columns)
    updated_columns, resolved_name = set_column_name(columns=current_columns,
                                                     value=value,
                                                     default_name=DATAREC_TIMESTAMP_COL,
                                                     rename=rename)
    self.data.columns = updated_columns
    self._timestamp_col = resolved_name

is_encoded(on)

Checks if user or item IDs are encoded to private integer IDs.

Parameters:

Name Type Description Default
on str

'users' to check user encoding, 'items' for item encoding.

required

Returns: (bool): True if the specified IDs are encoded, False otherwise.

Source code in datarec/data/dataset.py
def is_encoded(self, on:str) -> bool:
    """
    Reports whether an encoding is available for users or for items.

    The answer comes from the corresponding encoder's own `is_encoded()`.

    Args:
        on (str): 'users' to check user encoding, 'items' for item encoding.
    Returns:
        (bool): True if the specified IDs are encoded, False otherwise.
    Raises:
        ValueError: If `on` is neither 'users' nor 'items'.
    """
    if on not in ('users', 'items'):
        raise ValueError("Parameter 'on' must be either 'users' or 'items'.")
    encoder = self.user_id_encoder if on == 'users' else self.item_id_encoder
    return encoder.is_encoded()

encode(users=True, items=True)

Converts user and item IDs to encoded integer IDs. Args: users (bool): If True, encodes user IDs. items (bool): If True, encodes item IDs.

Source code in datarec/data/dataset.py
def encode(self, users=True, items=True) -> None:
    """
    Converts user and item IDs to encoded integer IDs, in place.

    Args:
        users (bool): If True, encodes user IDs.
        items (bool): If True, encodes item IDs.

    Raises:
        ValueError: If a requested encoder is empty. All requested encoders
            are checked before any column is modified, so the data is left
            untouched when this error is raised.
    """
    # Validate first: encoding one column and then failing on the other would
    # leave the dataset half-encoded.
    if users and not self.user_id_encoder.is_encoded():
        raise ValueError("User encoder is empty. Build or apply an encoding before calling encode().")
    if items and not self.item_id_encoder.is_encoded():
        raise ValueError("Item encoder is empty. Build or apply an encoding before calling encode().")

    if users:
        self.data[self.user_col] = self.user_id_encoder.encode(self.data[self.user_col].tolist())
    if items:
        self.data[self.item_col] = self.item_id_encoder.encode(self.data[self.item_col].tolist())

decode(users=True, items=True)

Converts user and item IDs back to original IDs. Args: users (bool): If True, decodes user IDs. items (bool): If True, decodes item IDs.

Source code in datarec/data/dataset.py
def decode(self, users=True, items=True) -> None:
    """
    Converts user and item IDs back to original IDs, in place.

    Args:
        users (bool): If True, decodes user IDs.
        items (bool): If True, decodes item IDs.
    """
    if users:
        encoded_users = self.data[self.user_col].tolist()
        self.data[self.user_col] = self.user_id_encoder.decode(encoded_users)
    if items:
        encoded_items = self.data[self.item_col].tolist()
        self.data[self.item_col] = self.item_id_encoder.decode(encoded_items)

reset_encoding(on='all')

Resets the encoding for users or items.

Parameters:

Name Type Description Default
on str

'users' to reset user encoding, 'items' for item encoding, 'all' for both.

'all'
Source code in datarec/data/dataset.py
def reset_encoding(self, on='all') -> None:
    """
    Decodes the selected IDs and resets the corresponding encoders.

    Args:
        on (str): 'users' to reset user encoding, 'items' for item encoding,
            'all' (default) for both.

    Raises:
        ValueError: If `on` is not one of 'users', 'items' or 'all'.
    """
    # Reject unknown targets up front; the message lists every accepted value.
    if on not in ('users', 'items', 'all'):
        raise ValueError("Parameter 'on' must be one of 'users', 'items' or 'all'.")

    reset_users = on in ('users', 'all')
    reset_items = on in ('items', 'all')

    # Decode while the encoders still hold their mappings, then clear them.
    self.decode(users=reset_users, items=reset_items)
    if reset_users:
        self.user_id_encoder.reset_encoding()
    if reset_items:
        self.item_id_encoder.reset_encoding()

build_encoding(on='users', offset=0)

Builds the encoding for users or items.

Parameters:

Name Type Description Default
on str

'users' to build user encoding, 'items' for item encoding, 'all' for both.

'users'
offset int

The starting integer for the private IDs.

0
Source code in datarec/data/dataset.py
def build_encoding(self, on='users', offset=0) -> None:
    """
    Builds the encoding for users, items, or both.

    Args:
        on (str): 'users' to build user encoding, 'items' for item encoding, 'all' for both.
        offset (int): The starting integer for the private IDs. With 'all', the same
            offset is used for both the user and the item encoder.

    Raises:
        ValueError: If `on` is not one of 'users', 'items' or 'all'.
    """
    # Reject unknown targets up front; the message lists every accepted value.
    if on not in ('users', 'items', 'all'):
        raise ValueError("Parameter 'on' must be one of 'users', 'items' or 'all'.")

    if on in ('users', 'all'):
        self.user_id_encoder.build_encoding(self.users, offset=offset)
    if on in ('items', 'all'):
        self.item_id_encoder.build_encoding(self.items, offset=offset)

characteristic(name, **kwargs)

Retrieves a calculated dataset characteristic by name.

Parameters:

Name Type Description Default
name str

The name of the characteristic to retrieve.

required
**kwargs Any

Additional arguments to pass to the characteristic function.

{}

Returns:

Type Description
Any

The value of the requested characteristic.

Source code in datarec/data/dataset.py
def characteristic(self, name: str, **kwargs: Any) -> Any:
    """
    Computes a dataset characteristic selected by name.

    The name is resolved as an attribute of `self.characteristics` and the
    resulting callable is invoked with the given keyword arguments.

    Args:
        name (str): The name of the characteristic to retrieve.
        **kwargs: Keyword arguments forwarded to the characteristic function.

    Returns:
        The value returned by the requested characteristic function.
    """
    compute = getattr(self.characteristics, name)
    return compute(**kwargs)

users_frequency()

Computes the absolute frequency of each user in the dataset.

Returns:

Type Description
dict

A dictionary mapping user IDs to the number of interactions, sorted in descending order of frequency.

Source code in datarec/data/dataset.py
def users_frequency(self):
    """
    Counts how many interactions each user has.

    Returns:
        (dict): User ID -> number of rows in which the user appears, ordered
            from the most to the least frequent user. Users with equal counts
            keep their order of first appearance in the data.
    """
    # most_common() without an argument is a stable descending sort by count.
    counts = Counter(self.data[self.user_col])
    return dict(counts.most_common())

users_relative_frequency()

Computes the relative frequency of each user in the dataset.

Returns:

Type Description
dict

A dictionary mapping user IDs to their relative frequency (fraction of total transactions).

Source code in datarec/data/dataset.py
def users_relative_frequency(self):
    """
    Expresses each user's interaction count as a share of all transactions.

    Returns:
        (dict): User ID -> count divided by `self.transactions`, in the same
            order as `users_frequency()`.
    """
    frequencies = self.users_frequency()
    return {user: count / self.transactions for user, count in frequencies.items()}

items_frequency()

Computes the absolute frequency of each item in the dataset.

Returns:

Type Description
dict

A dictionary mapping item IDs to the number of interactions, sorted in descending order of frequency.

Source code in datarec/data/dataset.py
def items_frequency(self):
    """
    Counts how many interactions each item has.

    Returns:
        (dict): Item ID -> number of rows in which the item appears, ordered
            from the most to the least frequent item. Items with equal counts
            keep their order of first appearance in the data.
    """
    # most_common() without an argument is a stable descending sort by count.
    counts = Counter(self.data[self.item_col])
    return dict(counts.most_common())

items_relative_frequency()

Computes the relative frequency of each item in the dataset.

Returns:

Type Description
dict

A dictionary mapping item IDs to their relative frequency (fraction of total transactions).

Source code in datarec/data/dataset.py
def items_relative_frequency(self):
    """
    Expresses each item's interaction count as a share of all transactions.

    Returns:
        (dict): Item ID -> count divided by `self.transactions`, in the same
            order as `items_frequency()`.
    """
    frequencies = self.items_frequency()
    return {item: count / self.transactions for item, count in frequencies.items()}

users_quartiles()

Assigns quartile indices to users based on their frequency.

Returns:

Type Description
dict

A dictionary mapping each user ID to a quartile index (0-3), where 0 = lowest, 3 = highest frequency.

Source code in datarec/data/dataset.py
def users_quartiles(self):
    """
    Assigns a quartile index to every user from its interaction frequency.

    Returns:
        (dict): The result of `quartiles()` applied to `users_frequency()`:
            a mapping from user ID to quartile index (0-3), where
            0 = lowest and 3 = highest frequency.
    """
    frequencies = self.users_frequency()
    return quartiles(frequencies)

items_quartiles()

Assigns quartile indices to items based on their frequency.

Returns:

Type Description
dict

A dictionary mapping each item ID to a quartile index (0-3), where 0 = lowest, 3 = highest frequency.

Source code in datarec/data/dataset.py
def items_quartiles(self):
    """
    Bucket items into quartiles according to how often they appear.

    The item frequencies are handed to the `quartiles` helper, which does
    the actual bucketing.

    Returns:
        (dict): A dictionary mapping each item ID to a quartile index (0-3),
            where 0 = lowest, 3 = highest frequency.
    """
    frequencies = self.items_frequency()
    return quartiles(frequencies)

users_popularity()

Categorizes users into descriptive popularity groups based on quartiles.

Returns:

Type Description
dict

A dictionary mapping popularity categories ('long tail', 'common', 'popular', 'most popular') to lists of user IDs.

Source code in datarec/data/dataset.py
def users_popularity(self):
    """
    Group users into named popularity classes derived from their quartiles.

    The per-user quartile indices are passed to the `popularity` helper,
    which performs the grouping.

    Returns:
        (dict): A dictionary mapping popularity categories ('long tail',
            'common', 'popular', 'most popular') to lists of user IDs.
    """
    quartile_by_user = self.users_quartiles()
    return popularity(quartile_by_user)

items_popularity()

Categorizes items into descriptive popularity groups based on quartiles.

Returns:

Type Description
dict

A dictionary mapping popularity categories ('long tail', 'common', 'popular', 'most popular') to lists of item IDs.

Source code in datarec/data/dataset.py
def items_popularity(self):
    """
    Group items into named popularity classes derived from their quartiles.

    The per-item quartile indices are passed to the `popularity` helper,
    which performs the grouping.

    Returns:
        (dict): A dictionary mapping popularity categories ('long tail',
            'common', 'popular', 'most popular') to lists of item IDs.
    """
    quartile_by_item = self.items_quartiles()
    return popularity(quartile_by_item)

get_user_interactions(user_id)

Retrieves all interactions for a specific user.

Parameters:

Name Type Description Default
user_id Any

The ID of the user whose interactions are to be retrieved.

required
Source code in datarec/data/dataset.py
def get_user_interactions(self, user_id: Any) -> pd.DataFrame:
    """
    Select every interaction row that belongs to one user.

    Args:
        user_id (Any): The ID of the user whose interactions are to be retrieved.

    Returns:
        (pd.DataFrame): The rows of the dataset whose user column equals `user_id`.

    Raises:
        ValueError: If `user_id` is not among the dataset's users.
    """
    if user_id not in self.users:
        raise ValueError(f"User ID {user_id} not found in dataset.")

    belongs_to_user = self.data[self.user_col] == user_id
    return self.data[belongs_to_user]

get_item_interactions(item_id)

Retrieves all interactions for a specific item.

Parameters:

Name Type Description Default
item_id Any

The ID of the item whose interactions are to be retrieved.

required
Source code in datarec/data/dataset.py
def get_item_interactions(self, item_id: Any) -> pd.DataFrame:
    """
    Select every interaction row that involves one item.

    Args:
        item_id (Any): The ID of the item whose interactions are to be retrieved.

    Returns:
        (pd.DataFrame): The rows of the dataset whose item column equals `item_id`.

    Raises:
        ValueError: If `item_id` is not among the dataset's items.
    """
    if item_id not in self.items:
        raise ValueError(f"Item ID {item_id} not found in dataset.")

    involves_item = self.data[self.item_col] == item_id
    return self.data[involves_item]

list_characteristics()

Return the names of all characteristics that can be computed on this dataset.

Source code in datarec/data/dataset.py
def list_characteristics(self) -> list[str]:
    """Return the sorted names of every entry in the CHARACTERISTICS registry."""
    names = list(CHARACTERISTICS.keys())
    names.sort()
    return names

describe_characteristics()

Return a mapping name -> short docstring for each available characteristic.

Source code in datarec/data/dataset.py
def describe_characteristics(self) -> dict[str, str]:
    """
    Map each registered characteristic name to its stripped docstring.

    Characteristics without a docstring are mapped to an empty string.
    """
    descriptions = {}
    for name, func in CHARACTERISTICS.items():
        doc = func.__doc__ or ""
        descriptions[name] = doc.strip()
    return descriptions

copy()

Create a deep copy of the current DataRec object.

This method duplicates the DataRec instance, including its data, metadata (user, item, rating, timestamp columns), pipeline, and internal state such as privacy settings and implicit flags.

Returns:

Type Description
DataRec

A new DataRec object that is a deep copy of the current instance.

Source code in datarec/data/dataset.py
def copy(self):
    """
    Build a new DataRec carrying the same data and column configuration.

    The new object is constructed from this instance's RawData view and a
    copy of its pipeline, with `copy=True` passed to the DataRec
    constructor. The user, item, rating and timestamp column names are then
    assigned to the new instance.

    NOTE(review): how deeply the underlying data is copied depends on how
    the DataRec constructor handles `copy=True` -- confirm before relying on
    full independence of the two objects.

    Returns:
        (DataRec): The newly created DataRec object.
    """
    pipeline_copy = self.pipeline.copy()
    rawdata = self.to_rawdata()

    duplicate = DataRec(rawdata=rawdata, pipeline=pipeline_copy, copy=True)

    # Column names are written to the private attributes of the new object.
    duplicate._user_col = self.user_col
    duplicate._item_col = self.item_col
    duplicate._rating_col = self.rating_col
    duplicate._timestamp_col = self.timestamp_col
    return duplicate

to_rawdata()

Convert the current DataRec object into a RawData object.

This method creates a RawData instance containing the same data and metadata (user, item, rating, timestamp columns) as the DataRec object.

Returns:

Type Description
RawData

A new RawData object containing the DataRec's data and column information.

Source code in datarec/data/dataset.py
def to_rawdata(self):
    """
    Wrap this DataRec's data and column names in a RawData object.

    The RawData instance is built from `self.data`, and its `user`, `item`,
    `rating` and `timestamp` attributes are set to the matching column
    names of this DataRec.

    Returns:
        (RawData): A new RawData object containing the DataRec's data and column information.
    """
    rawdata = RawData(self.data)

    # Carry over the column roles one by one.
    rawdata.user = self.user_col
    rawdata.item = self.item_col
    rawdata.rating = self.rating_col
    rawdata.timestamp = self.timestamp_col

    return rawdata

set_origin_file(pipeline_step)

Set the origin of the dataset.

Parameters:

Name Type Description Default
pipeline_step PipelineStep

The pipeline step describing the file read. Its name must be "read"; it is placed first in the pipeline and the dataset origin is set to 'file'.

required
Source code in datarec/data/dataset.py
def set_origin_file(self, pipeline_step) -> None:
    """
    Mark the dataset as loaded from file and record the read step.

    The step is placed first in the pipeline: it is appended when the
    pipeline is empty, otherwise it is prepended and a warning is emitted.

    Args:
        pipeline_step: The pipeline step describing the file read. Its
            `name` attribute must be "read".

    Raises:
        ValueError: If `pipeline_step.name` is not "read".
    """
    if pipeline_step.name != "read":
        raise ValueError(f"Pipeline step must be a read step to set file origin. Found step name: {pipeline_step.name}")

    steps = self.pipeline.steps
    if not steps:
        steps.append(pipeline_step)
    else:
        warnings.warn("Prepending file load step to the pipeline.")
        steps.insert(0, pipeline_step)

    self._origin = "file"

set_origin_registry()

Set the origin of the dataset to 'registry'.

Source code in datarec/data/dataset.py
def set_origin_registry(self) -> None:
    """
    Mark the dataset as loaded from the registry and record the load step.

    A "load" / "registry_dataset" pipeline step carrying the dataset name
    and version is put at the head of the pipeline. If the current first
    step is a "read" step it is replaced; any other first step is kept and
    the registry step is prepended with a warning.
    """
    from datarec.pipeline import PipelineStep

    registry_step = PipelineStep(
        "load",
        "registry_dataset",
        {"dataset_name": self.dataset_name, "version": self.version_name},
    )

    steps = self.pipeline.steps
    if not steps:
        steps.append(registry_step)
    elif steps[0].name == "read":
        steps[0] = registry_step
    else:
        warnings.warn("First pipeline step is not a read step. Prepending registry load step to the pipeline.")
        steps.insert(0, registry_step)

    self._origin = "registry"

save_pipeline(filepath)

Save the current processing pipeline to a YAML file.

Parameters:

Name Type Description Default
filepath str

The path (including filename) where the pipeline YAML file will be saved.

required
Source code in datarec/data/dataset.py
def save_pipeline(self, filepath: str) -> None:
    """
    Write the processing pipeline to a YAML file.

    Serialization is delegated to `self.pipeline.to_yaml`; a progress
    message is printed before and after the write.

    Args:
        filepath (str): The path (including filename) where the pipeline
            YAML file will be saved.
    """
    print(f'Saving pipeline to {filepath}')
    self.pipeline.to_yaml(filepath)
    print(f'Pipeline correctly saved to {filepath}')

to_torch_dataset(task='pointwise', autoprepare=True, **kwargs)

Converts the current DataRec object into a PyTorch-compatible dataset.

This method prepares the dataset (e.g., remaps user/item IDs to a dense index space) and returns a torch.utils.data.Dataset object suitable for training with PyTorch.

Parameters:

Name Type Description Default
task str

The recommendation task type. Must be one of: - "pointwise": returns PointwiseTorchDataset - "pairwise": returns PairwiseTorchDataset - "ranking": returns RankingTorchDataset

'pointwise'
autoprepare bool

If True, automatically applies user/item remapping and switches the dataset to private IDs. If False, assumes the dataset is already properly prepared.

True
**kwargs Any

Additional arguments passed to the specific torch dataset class.

{}

Returns:

Type Description
Dataset

A PyTorch dataset instance corresponding to the selected task.

Raises:

Type Description
ImportError

If PyTorch is not installed.

ValueError

If an unknown task name is provided.

Source code in datarec/data/dataset.py
def to_torch_dataset(self, task: str = "pointwise", autoprepare: bool = True, **kwargs: Any) -> Any:
    """
    Converts the current DataRec object into a PyTorch-compatible dataset.

    This method prepares the dataset (e.g., remaps user/item IDs to a dense index space)
    and returns a `torch.utils.data.Dataset` object suitable for training with PyTorch.

    The task name is validated before anything else, so an unknown task
    fails fast and never leaves the dataset encoded as a side effect.

    Args:
        task (str): The recommendation task type. Must be one of:
            - "pointwise": returns PointwiseTorchDataset
            - "pairwise": returns PairwiseTorchDataset
            - "ranking": returns RankingTorchDataset
        autoprepare (bool): If True, automatically applies user/item remapping
            and switches the dataset to private IDs. If False, assumes the dataset
            is already properly prepared.
        **kwargs: Additional arguments passed to the specific torch dataset class.

    Returns:
        (torch.utils.data.Dataset): A PyTorch dataset instance corresponding to the selected task.

    Raises:
        ImportError: If PyTorch is not installed.
        ValueError: If an unknown task name is provided.
    """
    # Reject bad input before importing torch or mutating the dataset.
    if task not in ("pointwise", "pairwise", "ranking"):
        raise ValueError(f"Unknown task: {task}")

    try:
        import torch  # noqa: F401 -- imported only to check that it is installed
    except ModuleNotFoundError as err:
        raise ImportError(
            "Torch is required for this feature. Please install it with `pip install datarec[torch]` or"
            " `pip install -r requirements/requirements-torch.txt`."
        ) from err

    if autoprepare:
        self.build_encoding(on='all', offset=0)
        self.encode()
    else:
        warnings.warn(
            "Autoprepare is set to False. "
            "Ensure that the dataset is prepared correctly before using it with PyTorch."
        )

    # The dataset classes are imported lazily so that only the selected one is loaded.
    if task == "pointwise":
        from datarec.data.torch_dataset import PointwiseTorchDataset
        return PointwiseTorchDataset(self, **kwargs)
    if task == "pairwise":
        from datarec.data.torch_dataset import PairwiseTorchDataset
        return PairwiseTorchDataset(self, **kwargs)

    from datarec.data.torch_dataset import RankingTorchDataset
    return RankingTorchDataset(self, **kwargs)

to_graphrec()

Converts the current DataRec object into a GraphRec object.

This method creates a GraphRec instance representing the bipartite graph of user-item interactions contained in the DataRec object.

Returns:

Type Description
GraphRec

A new GraphRec object representing the interaction graph.

Source code in datarec/data/dataset.py
def to_graphrec(self):
    """
    Build a GraphRec view of this DataRec.

    The GraphRec class is imported at call time and instantiated with this
    DataRec as its only argument.

    Returns:
        (GraphRec): A new GraphRec object representing the interaction graph.
    """
    from datarec.data.graph import GraphRec

    graph = GraphRec(self)
    return graph

to_pickle(filepath='')

Save the current DataRec object to a pickle file.

Parameters:

Name Type Description Default
filepath str

The path (including filename) where the pickle file will be saved.

''
Source code in datarec/data/dataset.py
def to_pickle(self, filepath: str = '') -> None:
    """
    Serialize this DataRec object to disk with pickle.

    Args:
        filepath (str): The path (including filename) where the pickle file
            will be saved. When left empty, the path is obtained from
            `paths.pickle_version_filepath` using the dataset name and version.
    """
    import pickle

    target = filepath
    if target == '':
        target = paths.pickle_version_filepath(self.dataset_name, self.version_name)

    print(f'Saving DataRec to {target}')
    with open(target, 'wb') as handle:
        pickle.dump(self, handle)
    print(f'DataRec correctly saved to {target}')

CharacteristicAccessor

Accessor for dataset characteristics. Allows dynamic retrieval of dataset characteristics as attributes.

Parameters:

Name Type Description Default
dr DataRec

The DataRec object to access characteristics from.

required
Source code in datarec/data/dataset.py
class CharacteristicAccessor:
    """
    Attribute-style gateway to the functions registered in CHARACTERISTICS.

    Looking up `accessor.<name>` returns a callable that invokes the
    registered characteristic on the wrapped DataRec.

    Args:
        dr (DataRec): The DataRec object to access characteristics from.
    """
    def __init__(self, dr: DataRec):
        self._datarec = dr

    def __getattr__(self, name):
        # Only reached for attributes that normal lookup did not find.
        try:
            characteristic = CHARACTERISTICS[name]
        except KeyError:
            # Unknown names must look like ordinary missing attributes.
            raise AttributeError(name) from None
        else:
            def bound(**kwargs):
                return characteristic(self._datarec, **kwargs)

            # Expose the characteristic's documentation on the wrapper.
            bound.__doc__ = characteristic.__doc__
            return bound

from_pickle(dataset_name='', version_name='', filepath='')

Load a DataRec object from a pickle file.

Parameters:

Name Type Description Default
dataset_name str

The name of the dataset.

''
version_name str

The version identifier of the dataset.

''
filepath str

The path to the pickle file.

''

Returns: (DataRec): The loaded DataRec object.

Source code in datarec/data/dataset.py
def from_pickle(dataset_name:str = '', version_name:str = '', filepath: str = '') -> DataRec:
    """
    Load a DataRec object from a pickle file.

    When `filepath` is empty, the file location is derived from
    `dataset_name` and `version_name`, and both of them are required.

    Args:
        dataset_name (str): The name of the dataset.
        version_name (str): The version identifier of the dataset.
        filepath (str): The path to the pickle file.
    Returns:
        (DataRec): The loaded DataRec object.
    Raises:
        ValueError: If `filepath` is empty and `dataset_name` or
            `version_name` is missing.
    """
    import pickle

    if filepath == '':
        # Both identifiers are needed to derive the path; a single one would
        # otherwise produce a path built from an empty name.
        if not dataset_name or not version_name:
            raise ValueError("Either dataset_name and version_name or filepath must be provided.")
        filepath = paths.pickle_version_filepath(dataset_name, version_name)

    print(f'Loading DataRec from {filepath}')

    # NOTE: unpickling can execute arbitrary code; only load files from trusted sources.
    with open(filepath, 'rb') as f:
        dr = pickle.load(f)

    print(f'DataRec correctly loaded from {filepath}')

    return dr

Source dataclass

Source code in datarec/data/source.py
@dataclass
class Source:
    """
    Description of a dataset source file and of the resources it contains.

    A source is a single file (`filename`) stored under `output_folder`. It
    may be an archive (`archive`) holding several resources, whose relative
    locations inside `output_folder` are listed in `inner_paths`.
    """
    # Expected checksum of the source file, handed to `verify_checksum`.
    checksum: Optional[str] = None
    checksum_algorithm: str = "md5"
    # Set to True by `prepare()` after a successful checksum verification.
    prepared: bool = False
    source_name: Optional[str] = None
    filename: Optional[str] = None
    # Truthy when the source must be decompressed; forwarded to `decompress_file`.
    archive: Optional[str] = None
    # Resource name -> path relative to `output_folder`.
    inner_paths: Optional[Dict[str, str]] = None
    downloadable: bool = False
    output_folder: Optional[str] = None

    def path(self, output_folder=None) -> str:
        """
        Return the location of the source file.

        Args:
            output_folder (str): Folder to use instead of `self.output_folder`.
        Returns:
            (str): `output_folder` joined with `self.filename`.
        Raises:
            ValueError: If neither the argument nor `self.output_folder` is set.
        """
        if output_folder is None:
            if self.output_folder is None:
                raise ValueError("Must specify an output folder")
            output_folder = self.output_folder
        return os.path.join(output_folder, self.filename)

    def is_locally_available(self, output_folder=None) -> bool:
        """
        Check if the source file exists in the output folder.

        Args:
            output_folder (str): Folder to look in instead of `self.output_folder`.
        Returns:
            (bool): True if the source file exists, False otherwise.
        Raises:
            ValueError: If neither the argument nor `self.output_folder` is set.
        """
        # Forward the folder so an explicit argument is honoured; path()
        # raises the "Must specify an output folder" error when none is known.
        return os.path.exists(self.path(output_folder))

    def verify_checksum(self, output_folder=None) -> None:
        """
        Verify the source file against `self.checksum`.

        Args:
            output_folder (str): Folder to use instead of `self.output_folder`.
        """
        print(f'{self.filename}: verifying checksum')
        if output_folder is None:
            output_folder = self.output_folder
        # Resolves to the module-level verify_checksum helper, not this method.
        verify_checksum(self.path(output_folder), self.checksum)

    def download(self) -> str:
        """
        Download the source file. The base implementation does nothing.
        """
        pass

    def prepare(self) -> None:
        """
        Prepares the source by downloading it if not available locally
        and verifying its checksum.
        Returns:
            (None)
        """
        if self.prepared:
            return
        # check if source file exists, if not download it
        if not self.is_locally_available():
            self.download()
        # verify source checksum
        self.verify_checksum()
        self.prepared = True

    def resource_paths(self) -> Dict[str, str]:
        """
        Returns a dictionary containing the paths of the resources inside the source.
        Returns:
            (dict): A dictionary containing the paths of the resources inside the source.
        """
        return {
            res: os.path.join(self.output_folder, inner_path)
            for res, inner_path in self.inner_paths.items()
        }

    def resources_available(self) -> bool:
        """
        Check if all resources inside the source are available locally.
        Returns:
            (bool): True if all resources are available, False otherwise.
        """
        return all(os.path.exists(p) for p in self.resource_paths().values())

    def get_resources(self, force=False) -> Dict[str, str]:
        """
        Returns a dictionary containing the paths of the resources inside the source.

        If some resource is missing (or `force` is set) and the source is an
        archive, the archive is decompressed first. Only resources that
        exist on disk afterwards are returned.

        Args:
            force (bool): If True, forces re-extraction of resources even if they are already available.
        Returns:
            (dict): A dictionary containing the paths of the resources inside the source.
        """
        if self.resources_available() and not force:
            return self.resource_paths()

        if self.archive:
            decompress_file(self.path(), self.output_folder, self.archive)
        resources = {}
        for resource, inner_path in self.inner_paths.items():
            resource_path = os.path.join(self.output_folder, inner_path)
            if os.path.exists(resource_path):
                resources[resource] = resource_path
        return resources

is_locally_available(output_folder=None)

Check if the source file exists in the output folder.

Source code in datarec/data/source.py
def is_locally_available(self, output_folder=None) -> bool:
    """
    Check if the source file exists in the output folder.

    Args:
        output_folder (str): Folder to look in instead of `self.output_folder`.
    Returns:
        (bool): True if the source file exists, False otherwise.
    Raises:
        ValueError: If neither the argument nor `self.output_folder` is set.
    """
    if output_folder is None:
        if self.output_folder is None:
            raise ValueError("Must specify an output folder")
    # Forward the folder: calling path() without it ignored an explicit
    # argument and failed when `self.output_folder` was unset.
    return os.path.exists(self.path(output_folder))

prepare()

Prepares the source by downloading it if not available locally and verifying its checksum. Returns: (None)

Source code in datarec/data/source.py
def prepare(self) -> None:
    """
    Make the source ready for use.

    Unless the source is already marked as prepared, the file is downloaded
    when it is not available locally, its checksum is verified, and the
    source is flagged as prepared.
    Returns:
        (None)
    """
    if not self.prepared:
        # Fetch the file only when it is missing on disk.
        if not self.is_locally_available():
            self.download()
        # Verification runs whether or not a download happened.
        self.verify_checksum()
        self.prepared = True

resource_paths()

Returns a dictionary containing the paths of the resources inside the source. Returns: (dict): A dictionary containing the paths of the resources inside the source.

Source code in datarec/data/source.py
def resource_paths(self) -> Dict[str, str]:
    """
    Resolve the location of every resource declared in `inner_paths`.
    Returns:
        (dict): A dictionary mapping each resource name to its inner path
            joined onto `self.output_folder`.
    """
    locations = {}
    for name, relative_path in self.inner_paths.items():
        locations[name] = os.path.join(self.output_folder, relative_path)
    return locations

resources_available()

Check if all resources inside the source are available locally. Returns: (bool): True if all resources are available, False otherwise.

Source code in datarec/data/source.py
def resources_available(self) -> bool:
    """
    Tell whether every resource of the source already exists on disk.
    Returns:
        (bool): True if all resources are available, False otherwise.
    """
    # all() stops at the first missing file, like an early return would.
    return all(os.path.exists(location) for location in self.resource_paths().values())

get_resources(force=False)

Returns a dictionary containing the paths of the resources inside the source. Args: force (bool): If True, forces re-extraction of resources even if they are already available. Returns: (dict): A dictionary containing the paths of the resources inside the source.

Source code in datarec/data/source.py
def get_resources(self, force=False) -> Dict[str, str]:
    """
    Return the on-disk locations of the resources held by the source.

    When every resource is already present and `force` is not set, the
    declared paths are returned as they are. Otherwise an archive source is
    decompressed first, and only the resources found on disk are reported.
    Args:
        force (bool): If True, forces re-extraction of resources even if they are already available.
    Returns:
        (dict): A dictionary containing the paths of the resources inside the source.
    """
    if self.resources_available() and not force:
        return self.resource_paths()

    if self.archive:
        decompress_file(self.path(), self.output_folder, self.archive)

    candidates = (
        (name, os.path.join(self.output_folder, relative_path))
        for name, relative_path in self.inner_paths.items()
    )
    # Keep only what actually exists after the (optional) extraction.
    return {name: location for name, location in candidates if os.path.exists(location)}

NestedSource dataclass

Bases: Source

Source code in datarec/data/source.py
@dataclass
class NestedSource(Source):
    """
    A source file that is contained inside another (parent) source.

    The parent is referenced by name (`parent_source_name`) and resolved to
    an object by `link_parent_source`, which also derives this source's
    `output_folder` from the parent's inner paths.
    """
    parent_source_name: str = ''
    parent_source: Optional[Source] = None

    # path() is inherited from Source: link_parent_source() points
    # `output_folder` at the directory that holds the nested file inside the
    # parent's output folder, so no override is needed.

    def link_parent_source(self, sources: dict[str, Source]):
        """
        Links the nested source to its parent source.
        Args:
            sources (dict): A dictionary containing dataset sources objects.
        Returns:
            (None): None
        Raises:
            RuntimeError: If no parent name is set, the parent is not in
                `sources`, or the parent does not list this source in its
                inner paths.
            ValueError: If the parent has no output folder.
        """
        # The field defaults to '', so test for emptiness rather than None.
        if not self.parent_source_name:
            raise RuntimeError(f"No source provided for resource {self.filename}")
        if self.parent_source_name not in sources:
            raise RuntimeError(f"Source {self.parent_source_name} not found")
        self.parent_source = sources[self.parent_source_name]

        output_folder = self.parent_source.output_folder
        if output_folder is None:
            raise ValueError("Must specify an output folder")

        inner_paths = self.parent_source.inner_paths
        if inner_paths is None:
            raise RuntimeError(f'Inner paths not found in parent source \'{self.parent_source_name}\'.')

        inner_path = inner_paths.get(self.source_name, None)
        if inner_path is None:
            raise RuntimeError(f"NestedSource {self.source_name} not found in parent source \'{self.parent_source_name}\' inner paths")

        # The nested file lives in the directory part of its inner path.
        inner_parent_path = os.path.dirname(inner_path)
        self.output_folder = os.path.join(output_folder, inner_parent_path)

    def prepare(self) -> None:
        """
        Prepares the nested source by extracting it from its parent source
        if it is not available locally, and verifying its checksum.
        Returns:
            (None)
        Raises:
            RuntimeError: If the file is missing and the parent source is
                not defined, not linked, or does not provide this resource.
        """
        if self.prepared:
            return
        # check if source file exists, if not obtain it from the parent source
        if not self.is_locally_available():
            if self.parent_source_name is None:
                raise RuntimeError("Parent source name not defined for nested source")
            if self.parent_source is None:
                raise RuntimeError("Parent source not set for nested source")

            # prepare parent source
            if not self.parent_source.prepared:
                self.parent_source.prepare()

            # check that this resource is available in parent source
            resources = self.parent_source.get_resources()
            if self.source_name not in resources:
                raise RuntimeError(f"Resource {self.filename} not found in parent source")

        # Verify and flag as prepared on both paths, as Source.prepare does;
        # previously an already-present file was never verified or flagged.
        self.verify_checksum()
        self.prepared = True

Links the resource to its source. Args: sources (dict): A dictionary containing dataset sources objects. Returns: (None): None

Source code in datarec/data/source.py
def link_parent_source(self, sources: dict[str, Source]):
    """
    Links the nested source to its parent source.

    The parent object is looked up by `self.parent_source_name`, and this
    source's `output_folder` is set to the directory of its inner path
    inside the parent's output folder.
    Args:
        sources (dict): A dictionary containing dataset sources objects.
    Returns:
        (None): None
    Raises:
        RuntimeError: If no parent name is set, the parent is not in
            `sources`, or the parent does not list this source in its
            inner paths.
        ValueError: If the parent has no output folder.
    """
    # Test for emptiness, not just None, so an empty name is reported as
    # missing instead of as an unknown source.
    if not self.parent_source_name:
        raise RuntimeError(f"No source provided for resource {self.filename}")
    if self.parent_source_name not in sources:
        raise RuntimeError(f"Source {self.parent_source_name} not found")
    self.parent_source = sources[self.parent_source_name]

    output_folder = self.parent_source.output_folder
    if output_folder is None:
        raise ValueError("Must specify an output folder")

    inner_paths = self.parent_source.inner_paths
    if inner_paths is None:
        raise RuntimeError(f'Inner paths not found in parent source \'{self.parent_source_name}\'.')

    inner_path = inner_paths.get(self.source_name, None)
    if inner_path is None:
        raise RuntimeError(f"NestedSource {self.source_name} not found in parent source \'{self.parent_source_name}\' inner paths")

    # The nested file lives in the directory part of its inner path.
    inner_parent_path = os.path.dirname(inner_path)
    self.output_folder = os.path.join(output_folder, inner_parent_path)

prepare()

Prepares the source by downloading it if not available locally and verifying its checksum. Returns: (None)

Source code in datarec/data/source.py
def prepare(self) -> None:
    """
    Prepares the nested source by extracting it from its parent source
    if it is not available locally, and verifying its checksum.
    Returns:
        (None)
    Raises:
        RuntimeError: If the file is missing and the parent source is not
            defined, not linked, or does not provide this resource.
    """
    if self.prepared:
        return
    # check if source file exists, if not obtain it from the parent source
    if not self.is_locally_available():
        if self.parent_source_name is None:
            raise RuntimeError("Parent source name not defined for nested source")
        if self.parent_source is None:
            raise RuntimeError("Parent source not set for nested source")

        # prepare parent source
        if not self.parent_source.prepared:
            self.parent_source.prepare()

        # check that this resource is available in parent source
        resources = self.parent_source.get_resources()
        if self.source_name not in resources:
            raise RuntimeError(f"Resource {self.filename} not found in parent source")

    # Verify and flag as prepared on both paths; previously a file that was
    # already present locally was never verified nor marked as prepared.
    self.verify_checksum()
    self.prepared = True

set_source(source_name, source_conf)

Given a source configuration, return a new source object. Args: source_name (str): name of the source; source_conf (dict): source configuration. Returns: (Source): a dataset source object

Source code in datarec/data/source.py
def set_source(source_name:str, source_conf:dict) -> Source:
    """
    Instantiate the source object described by a source configuration.

    The class is selected from SOURCE_TYPES through the configuration's
    'source_type' entry and is called with the entries of 'args' as keyword
    arguments.
    Args:
        source_name (str): name of the source
        source_conf (dict): source configuration, with 'source_type' and 'args' keys
    Returns:
        (Source): a dataset source object
    """
    source_cls = SOURCE_TYPES[source_conf['source_type']]
    return source_cls(source_name=source_name, **source_conf['args'])

set_sources(config, folder=None)

Given a dataset configuration, build the source objects it declares. Args: config (dict): dataset configuration; folder (str): source output folder. Returns: (dict): a dictionary containing dataset source objects

Source code in datarec/data/source.py
def set_sources(config:dict, folder:Optional[str]=None) -> dict[str, Source]:
    """
    Build the source objects declared in a dataset configuration.

    Every entry of `config['sources']` is turned into a source object. Once
    all of them exist, nested sources are linked to their parent source.
    Args:
        config (dict): dataset configuration
        folder (str): source output folder; when given, it is assigned to
            every created source
    Returns:
        (dict): a dictionary containing dataset sources objects
    """
    built = {}
    for name, source_conf in config['sources'].items():
        new_source = set_source(name, source_conf)
        if folder is not None:
            new_source.output_folder = folder
        built[name] = new_source

    # Linking needs the complete mapping, so it happens in a second pass.
    for candidate in built.values():
        if isinstance(candidate, NestedSource):
            candidate.link_parent_source(built)

    return built

Resource dataclass

Source code in datarec/data/resource.py
@dataclass
class Resource:
    """
    A single dataset resource (a file) provided by a Source.

    The resource refers to its source by name (`source_name`); `link_source`
    resolves that name to a Source object, which is then used to locate and
    prepare the resource file.
    """
    resource_name: Optional[str] = None
    source_name: Optional[str] = None
    source: Optional[Source] = None
    filename: Optional[str] = None
    type: Optional[str] = None
    format: Optional[str] = None
    required: Optional[bool] = False
    about: Optional[str] = None
    dataset_name: Optional[str] = None
    version: Optional[str] = None
    output_folder: Optional[str] = None
    # No type annotation, so the dataclass machinery treats this as a plain
    # class attribute (not an __init__ parameter); prepare() shadows it per
    # instance.
    prepared = False

    def link_source(self, sources: dict[str, Source]):
        """
        Links the resource to its source.
        Args:
            sources (dict): A dictionary containing dataset sources objects.
        Returns:
            (None): None
        Raises:
            RuntimeError: If the resource has no source name or the name is
                not a key of `sources`.
        """
        if self.source_name is None:
            raise RuntimeError(f"No source provided for resource {self.filename}")
        if self.source_name not in sources:
            raise RuntimeError(f"Source {self.source_name} not found")
        self.source = sources[self.source_name]

    def is_locally_available(self) -> Union[str, bool]:
        """
        Checks if the resource file is available locally.
        Returns:
            (bool): True if the resource file exists locally, otherwise False.
        Raises:
            RuntimeError: If no source is linked to the resource.
        """
        # Check that a source is linked to resource
        if self.source is None:
            raise RuntimeError(f"No source provided for resource {self.filename}")
        return os.path.exists(self.path())

    def prepare(self, *args, **kwargs):
        """
        Ensures the resource file is downloaded and available locally.

        The linked source is prepared and asked for its resources; the
        resource is flagged as prepared when it is among them.
        Raises:
            RuntimeError: If no source is linked, or the source does not
                provide this resource.
        """
        if self.source is None:
            raise RuntimeError(f"No source provided for resource {self.filename}")

        self.source.prepare()
        resources = self.source.get_resources()

        if self.resource_name not in resources:
            raise RuntimeError(f"Resource {self.resource_name} not found in source")
        self.prepared = True

    def path(self):
        """
        Returns the local path of the resource file.
        Returns:
            (str): The local path of the resource file.
        Raises:
            RuntimeError: If no source is linked, or the source does not
                list this resource in its inner paths.
            ValueError: If the source has no output folder.
        """
        # Fail with the same error as the other methods instead of an
        # AttributeError on None when the resource has not been linked yet.
        if self.source is None:
            raise RuntimeError(f"No source provided for resource {self.filename}")
        output_folder = self.source.output_folder
        if output_folder is None:
            raise ValueError("Must specify an output folder")
        # A source without inner paths cannot contain the resource.
        inner_paths = self.source.inner_paths or {}
        inner_path = inner_paths.get(self.resource_name, None)
        if inner_path is None:
            raise RuntimeError(f"Resource {self.resource_name} not found in source inner paths")
        return os.path.join(output_folder, inner_path)

    def assign_dataset_info(self, dataset_name:str, version:str):
        """
        Assigns dataset name and version to the resource.
        Args:
            dataset_name (str): The name of the dataset.
            version (str): The version of the dataset.
        Returns:
            (None): None
        """
        self.dataset_name = dataset_name
        self.version = version

    def free_cache(self):
        """
        Frees the cached version of the resource if it exists.
        The base implementation does nothing.
        Returns:
            (None): None
        """
        pass

Links the resource to its source. Args: sources (dict): A dictionary containing dataset sources objects. Returns: (None): None

Source code in datarec/data/resource.py
def link_source(self, sources: dict[str, Source]):
    """
    Attach to this resource the source object registered under its source name.

    Args:
        sources (dict): A dictionary mapping source names to source objects.

    Returns:
        (None): None

    Raises:
        RuntimeError: If the resource has no source name, or if that name is
            not a key of ``sources``.
    """
    if self.source_name is None:
        raise RuntimeError(f"No source provided for resource {self.filename}")
    if self.source_name in sources:
        self.source = sources[self.source_name]
        return
    raise RuntimeError(f"Source {self.source_name} not found")

is_locally_available()

Checks if the resource file is available locally. Returns: (bool): True if the resource file exists at its local path, otherwise False.

Source code in datarec/data/resource.py
def is_locally_available(self) -> Union[str, bool]:
    """
    Report whether the resource file already exists on the local filesystem.

    Returns:
        (bool): True if a file exists at the resource path, otherwise False.

    Raises:
        RuntimeError: If no source is linked to the resource.
    """
    # A linked source is required before the resource path is resolved.
    if self.source is None:
        raise RuntimeError(f"No source provided for resource {self.filename}")
    return os.path.exists(self.path())

prepare(*args, **kwargs)

Ensures the resource file is downloaded and available locally.

Source code in datarec/data/resource.py
def prepare(self, *args, **kwargs):
    """
    Prepare the linked source and confirm that it exposes this resource.

    Extra positional and keyword arguments are accepted but not used here.

    Raises:
        RuntimeError: If no source is linked, or if the prepared source does
            not list this resource among its resources.
    """
    if self.source is None:
        raise RuntimeError(f"No source provided for resource {self.filename}")

    self.source.prepare()
    available = self.source.get_resources()
    if self.resource_name in available:
        # Flag the resource as prepared only once the source confirms it.
        self.prepared = True
        return
    raise RuntimeError(f"Resource {self.resource_name} not found in source")

path()

Returns the local path of the resource file. Raises an error if the output folder is not specified. Returns: (str): The local path of the resource file.

Source code in datarec/data/resource.py
def path(self):
    """
    Build the local filesystem path of the resource file.

    The path is the source's output folder joined with the inner path that
    the source registers under this resource's name.

    Returns:
        (str): The local path of the resource file.

    Raises:
        ValueError: If the source has no output folder.
        RuntimeError: If the source has no inner path for this resource.
    """
    base_dir = self.source.output_folder
    if base_dir is None:
        raise ValueError("Must specify an output folder")
    relative = self.source.inner_paths.get(self.resource_name)
    if relative is None:
        raise RuntimeError(f"Resource {self.resource_name} not found in source inner paths")
    return os.path.join(base_dir, relative)

assign_dataset_info(dataset_name, version)

Assigns dataset name and version to the resource. Args: dataset_name (str): The name of the dataset. version (str): The version of the dataset. Returns: (None): None

Source code in datarec/data/resource.py
def assign_dataset_info(self, dataset_name: str, version: str):
    """
    Record the dataset name and version this resource belongs to.

    Args:
        dataset_name (str): The name of the dataset.
        version (str): The version of the dataset.

    Returns:
        (None): None
    """
    self.dataset_name, self.version = dataset_name, version

free_cache()

Frees the cached version of the resource if it exists. Returns: (None): None

Source code in datarec/data/resource.py
def free_cache(self):
    """
    Free the cached version of the resource, if any.

    This implementation does nothing.

    Returns:
        (None): None
    """
    return None

Interactions dataclass

Bases: Resource

Source code in datarec/data/resource.py
@dataclass
class Interactions(Resource):
    """
    Resource whose file can be loaded into a DataRec dataset.

    In addition to the base resource behaviour, it keeps a pickled cache of
    the loaded dataset, located through the dataset name and version.
    """
    # Reader options for the resource file (column names, separators, ...).
    schema: Optional[dict] = None
    # Remembers the outcome of the cache check performed by `_has_cache`.
    _cache_ready: Optional[bool] = False

    def cache_path(self) -> str:
        """
        Returns the path of the cached version of the resource.

        Returns:
            (str): The path of the cached resource file.

        Raises:
            ValueError: If the dataset name or the version is not set.
        """
        if self.dataset_name is None or self.version is None:
            raise ValueError("Dataset name and version must be set to get cache path")
        return pickle_version_filepath(self.dataset_name, self.version)

    def _has_cache(self) -> bool:
        """
        Checks if a cached version of the resource exists.

        The result is stored in `_cache_ready`; once it is true, later calls
        return immediately without touching the filesystem.

        Returns:
            (bool): True if the cached file exists, otherwise False.

        Raises:
            ValueError: If the dataset name or the version is not set.
        """
        if self._cache_ready:
            return True

        if self.dataset_name is None or self.version is None:
            raise ValueError("Dataset name and version must be set to check for cache")
        self._cache_ready = os.path.exists(self.cache_path())
        return self._cache_ready

    def prepare(self, use_cache=True, *args, **kwargs):
        """
        Prepares the resource by ensuring it is downloaded and available locally.
        If a cached version exists, it skips downloading.

        Args:
            use_cache (bool): When True and a cache exists, mark the resource as
                prepared without preparing the source.
            *args: Forwarded to the parent `prepare`.
            **kwargs: Forwarded to the parent `prepare`.
        """
        if use_cache is True and self._has_cache():
            self.prepared = True
            print(f"Resource '{self.resource_name}' found in cache. Skipping download.")
            return
        super().prepare(*args, **kwargs)

    def load(self, use_cache=True, to_cache=True) -> DataRec:
        """
        Loads the resource into a DataRec dataset.

        When `use_cache` is true and a cached pickle exists, the dataset is read
        from the cache. Otherwise the resource file is parsed with the reader
        matching `self.format`, configured through `self.schema`.

        Args:
            use_cache (bool): Whether to return the cached dataset when available.
            to_cache (bool): Whether to pickle the dataset after reading it from file.

        Returns:
            DataRec: The loaded dataset.

        Raises:
            ValueError: If the schema (or `sequence_col` for the inline sequence
                format) is missing, or if the cache is checked while the dataset
                name or version is unset.
            NotImplementedError: If `self.format` is not one of the supported formats.
        """
        if use_cache and self._has_cache():
            print(f"Loading resource '{self.resource_name}' from cache at {self.cache_path()}.")
            return from_pickle(filepath=self.cache_path())

        if self.format == 'transactions_tabular':
            schema = self.schema
            if schema is None:
                raise ValueError("Schema must be provided for transactions_tabular format")

            datarec_ = read_transactions_tabular(
                self.path(),
                sep=schema['sep'],
                user_col=schema['user_col'],
                item_col=schema['item_col'],
                rating_col=schema.get('rating_col', None),
                timestamp_col=schema.get('timestamp_col', None),
                header=schema.get('header', None),
                skiprows=schema.get('skiprows', 0),
                cols=schema.get('cols', None),
                engine=schema.get('engine', 'c'),
                fallback_engine=schema.get('fallback_engine', 'python'),
                stream=schema.get('stream', False),
                encode_ids=schema.get('encode_ids', False),
                chunksize=schema.get('chunksize', 100_000),
                dataset_name=self.dataset_name,
                version_name=self.version,
            )

        elif self.format == 'transactions_json':
            schema = self.schema
            if schema is None:
                raise ValueError("Schema must be provided for transactions json format")

            datarec_ = read_transactions_json(
                self.path(),
                user_col=schema['user_col'],
                item_col=schema['item_col'],
                rating_col=schema.get('rating_col', None),
                timestamp_col=schema.get('timestamp_col', None),
                stream=schema.get('stream', False),
                encode_ids=schema.get('encode_ids', False),
                chunksize=schema.get('chunksize', 100_000),
                dataset_name=self.dataset_name,
                version_name=self.version,
            )

        elif self.format == 'transactions_jsonl':
            schema = self.schema
            if schema is None:
                raise ValueError("Schema must be provided for transactions jsonl format")

            datarec_ = read_transactions_jsonl(
                self.path(),
                user_col=schema['user_col'],
                item_col=schema['item_col'],
                rating_col=schema.get('rating_col', None),
                timestamp_col=schema.get('timestamp_col', None),
                stream=schema.get('stream', False),
                encode_ids=schema.get('encode_ids', False),
                chunksize=schema.get('chunksize', 100_000),
                # Forward the dataset name as every other reader call does, so
                # the loaded dataset is labelled consistently across formats.
                dataset_name=self.dataset_name,
                version_name=self.version,
            )

        elif self.format == 'sequence_tabular_inline':
            schema = self.schema
            if schema is None:
                raise ValueError("Schema must be provided for sequence tabular inline format")

            sequence_col = schema.get('sequence_col', None)
            if sequence_col is None:
                raise ValueError("sequence_col must be provided in schema for sequence tabular inline format")

            datarec_ = read_sequence_tabular_inline(
                self.path(),
                user_col=schema['user_col'],
                sequence_col=sequence_col,
                sequence_sep=schema.get('sequence_sep', ' '),
                timestamp_col=schema.get('timestamp_col', None),
                meta_cols=schema.get('meta_cols', None),
                col_sep=schema.get('col_sep', ','),
                header=schema.get('header', 0),
                cols=schema.get('cols', None),
                engine=schema.get('engine', 'c'),
                fallback_engine=schema.get('fallback_engine', 'python'),
                # 'stream_encode' is honoured as a fallback key for 'stream'.
                stream=schema.get('stream', schema.get('stream_encode', False)),
                encode_ids=schema.get('encode_ids', False),
                chunksize=schema.get('chunksize', 100_000),
                dataset_name=self.dataset_name,
                version_name=self.version,
            )

        elif self.format == 'sequence_tabular_wide':
            schema = self.schema
            if schema is None:
                raise ValueError("Schema must be provided for sequence tabular wide format")

            datarec_ = read_sequence_tabular_wide(
                self.path(),
                user_col=schema.get('user_col', 'user'),
                item_col=schema.get('item_col', 'item'),
                col_sep=schema.get('col_sep', ' '),
                header=schema.get('header', None),
                encode_ids=schema.get('encode_ids', False),
                dataset_name=self.dataset_name,
                version_name=self.version,
            )

        elif self.format == 'sequence_tabular_implicit':
            schema = self.schema
            if schema is None:
                raise ValueError("Schema must be provided for sequence tabular implicit format")

            datarec_ = read_sequence_tabular_implicit(
                self.path(),
                user_col=schema.get('user_col', 'sequence_id'),
                item_col=schema.get('item_col', 'item'),
                col_sep=schema.get('col_sep', ' '),
                header=schema.get('header', None),
                drop_length_col=schema.get('drop_length_col', True),
                encode_ids=schema.get('encode_ids', False),
                dataset_name=self.dataset_name,
                version_name=self.version,
            )

        elif self.format == 'sequence_json':
            schema = self.schema
            if schema is None:
                raise ValueError("Schema must be provided for sequence json format")

            datarec_ = read_sequences_json(
                self.path(),
                user_col=schema['user_col'],
                item_col=schema['item_col'],
                rating_col=schema.get('rating_col', None),
                timestamp_col=schema.get('timestamp_col', None),
                dataset_name=self.dataset_name,
                version_name=self.version,
            )

        elif self.format == 'sequence_json_array':
            schema = self.schema
            if schema is None:
                raise ValueError("Schema must be provided for sequence json array format")

            datarec_ = read_sequences_json_array(
                self.path(),
                user_col=schema['user_col'],
                item_col=schema['item_col'],
                rating_col=schema.get('rating_col', None),
                timestamp_col=schema.get('timestamp_col', None),
                sequence_key=schema.get('sequence_key', 'sequence'),
                dataset_name=self.dataset_name,
                version_name=self.version,
            )

        else:
            raise NotImplementedError(f"Format {self.format} not supported for resource loading.")

        # Placeholders are stored on the resource itself, after the reader has run.
        if self.dataset_name is None:
            print("Warning: dataset_name is not set for the resource. Using 'unknown_dataset'.")
            self.dataset_name = "unknown_dataset"
        if self.version is None:
            print("Warning: version is not set for the resource. Using 'unknown_version'.")
            self.version = "unknown_version"

        # set the origin of the dataset to registry, since it is being loaded from a resource file defined in the registry
        datarec_.set_origin_registry()

        # cache the dataset in pickle format
        if to_cache:
            datarec_.to_pickle()

        return datarec_

    def free_cache(self):
        """
        Frees the cached version of the resource if it exists.

        Removes the cached file and resets the `_cache_ready` flag.

        Returns:
            (None): None
        """
        if self._has_cache():
            os.remove(self.cache_path())
            self._cache_ready = False
            print(f"Cache for resource '{self.resource_name}' has been removed.")

cache_path()

Returns the path of the cached version of the resource. Returns: (str): The path of the cached resource file.

Source code in datarec/data/resource.py
def cache_path(self) -> str:
    """
    Return the location of the pickled cache for this resource.

    Returns:
        (str): The path of the cached resource file.

    Raises:
        ValueError: If the dataset name or the version is not set.
    """
    if self.dataset_name is not None and self.version is not None:
        return pickle_version_filepath(self.dataset_name, self.version)
    raise ValueError("Dataset name and version must be set to get cache path")

prepare(use_cache=True, *args, **kwargs)

Prepares the resource by ensuring it is downloaded and available locally. If a cached version exists, it skips downloading.

Source code in datarec/data/resource.py
def prepare(self, use_cache=True, *args, **kwargs):
    """
    Make the resource ready for loading.

    If ``use_cache`` is exactly True and a cached version exists, the resource
    is marked as prepared and nothing else is done. Otherwise the parent
    ``prepare`` is invoked with the remaining arguments.
    """
    if use_cache is not True or not self._has_cache():
        super().prepare(*args, **kwargs)
        return
    self.prepared = True
    print(f"Resource '{self.resource_name}' found in cache. Skipping download.")

load(use_cache=True, to_cache=True)

Loads the ratings resource into a DataRec dataset. Returns: DataRec: The loaded dataset.

Source code in datarec/data/resource.py
def load(self, use_cache=True, to_cache=True) -> DataRec:
    """
    Loads the resource into a DataRec dataset.

    When `use_cache` is true and a cached pickle exists, the dataset is read
    from the cache. Otherwise the resource file is parsed with the reader
    matching `self.format`, configured through `self.schema`.

    Args:
        use_cache (bool): Whether to return the cached dataset when available.
        to_cache (bool): Whether to pickle the dataset after reading it from file.

    Returns:
        DataRec: The loaded dataset.

    Raises:
        ValueError: If the schema (or `sequence_col` for the inline sequence
            format) is missing, or if the cache is checked while the dataset
            name or version is unset.
        NotImplementedError: If `self.format` is not one of the supported formats.
    """
    if use_cache and self._has_cache():
        print(f"Loading resource '{self.resource_name}' from cache at {self.cache_path()}.")
        return from_pickle(filepath=self.cache_path())

    if self.format == 'transactions_tabular':
        schema = self.schema
        if schema is None:
            raise ValueError("Schema must be provided for transactions_tabular format")

        datarec_ = read_transactions_tabular(
            self.path(),
            sep=schema['sep'],
            user_col=schema['user_col'],
            item_col=schema['item_col'],
            rating_col=schema.get('rating_col', None),
            timestamp_col=schema.get('timestamp_col', None),
            header=schema.get('header', None),
            skiprows=schema.get('skiprows', 0),
            cols=schema.get('cols', None),
            engine=schema.get('engine', 'c'),
            fallback_engine=schema.get('fallback_engine', 'python'),
            stream=schema.get('stream', False),
            encode_ids=schema.get('encode_ids', False),
            chunksize=schema.get('chunksize', 100_000),
            dataset_name=self.dataset_name,
            version_name=self.version,
        )

    elif self.format == 'transactions_json':
        schema = self.schema
        if schema is None:
            raise ValueError("Schema must be provided for transactions json format")

        datarec_ = read_transactions_json(
            self.path(),
            user_col=schema['user_col'],
            item_col=schema['item_col'],
            rating_col=schema.get('rating_col', None),
            timestamp_col=schema.get('timestamp_col', None),
            stream=schema.get('stream', False),
            encode_ids=schema.get('encode_ids', False),
            chunksize=schema.get('chunksize', 100_000),
            dataset_name=self.dataset_name,
            version_name=self.version,
        )

    elif self.format == 'transactions_jsonl':
        schema = self.schema
        if schema is None:
            raise ValueError("Schema must be provided for transactions jsonl format")

        datarec_ = read_transactions_jsonl(
            self.path(),
            user_col=schema['user_col'],
            item_col=schema['item_col'],
            rating_col=schema.get('rating_col', None),
            timestamp_col=schema.get('timestamp_col', None),
            stream=schema.get('stream', False),
            encode_ids=schema.get('encode_ids', False),
            chunksize=schema.get('chunksize', 100_000),
            # Forward the dataset name as every other reader call does, so
            # the loaded dataset is labelled consistently across formats.
            dataset_name=self.dataset_name,
            version_name=self.version,
        )

    elif self.format == 'sequence_tabular_inline':
        schema = self.schema
        if schema is None:
            raise ValueError("Schema must be provided for sequence tabular inline format")

        sequence_col = schema.get('sequence_col', None)
        if sequence_col is None:
            raise ValueError("sequence_col must be provided in schema for sequence tabular inline format")

        datarec_ = read_sequence_tabular_inline(
            self.path(),
            user_col=schema['user_col'],
            sequence_col=sequence_col,
            sequence_sep=schema.get('sequence_sep', ' '),
            timestamp_col=schema.get('timestamp_col', None),
            meta_cols=schema.get('meta_cols', None),
            col_sep=schema.get('col_sep', ','),
            header=schema.get('header', 0),
            cols=schema.get('cols', None),
            engine=schema.get('engine', 'c'),
            fallback_engine=schema.get('fallback_engine', 'python'),
            # 'stream_encode' is honoured as a fallback key for 'stream'.
            stream=schema.get('stream', schema.get('stream_encode', False)),
            encode_ids=schema.get('encode_ids', False),
            chunksize=schema.get('chunksize', 100_000),
            dataset_name=self.dataset_name,
            version_name=self.version,
        )

    elif self.format == 'sequence_tabular_wide':
        schema = self.schema
        if schema is None:
            raise ValueError("Schema must be provided for sequence tabular wide format")

        datarec_ = read_sequence_tabular_wide(
            self.path(),
            user_col=schema.get('user_col', 'user'),
            item_col=schema.get('item_col', 'item'),
            col_sep=schema.get('col_sep', ' '),
            header=schema.get('header', None),
            encode_ids=schema.get('encode_ids', False),
            dataset_name=self.dataset_name,
            version_name=self.version,
        )

    elif self.format == 'sequence_tabular_implicit':
        schema = self.schema
        if schema is None:
            raise ValueError("Schema must be provided for sequence tabular implicit format")

        datarec_ = read_sequence_tabular_implicit(
            self.path(),
            user_col=schema.get('user_col', 'sequence_id'),
            item_col=schema.get('item_col', 'item'),
            col_sep=schema.get('col_sep', ' '),
            header=schema.get('header', None),
            drop_length_col=schema.get('drop_length_col', True),
            encode_ids=schema.get('encode_ids', False),
            dataset_name=self.dataset_name,
            version_name=self.version,
        )

    elif self.format == 'sequence_json':
        schema = self.schema
        if schema is None:
            raise ValueError("Schema must be provided for sequence json format")

        datarec_ = read_sequences_json(
            self.path(),
            user_col=schema['user_col'],
            item_col=schema['item_col'],
            rating_col=schema.get('rating_col', None),
            timestamp_col=schema.get('timestamp_col', None),
            dataset_name=self.dataset_name,
            version_name=self.version,
        )

    elif self.format == 'sequence_json_array':
        schema = self.schema
        if schema is None:
            raise ValueError("Schema must be provided for sequence json array format")

        datarec_ = read_sequences_json_array(
            self.path(),
            user_col=schema['user_col'],
            item_col=schema['item_col'],
            rating_col=schema.get('rating_col', None),
            timestamp_col=schema.get('timestamp_col', None),
            sequence_key=schema.get('sequence_key', 'sequence'),
            dataset_name=self.dataset_name,
            version_name=self.version,
        )

    else:
        raise NotImplementedError(f"Format {self.format} not supported for resource loading.")

    # Placeholders are stored on the resource itself, after the reader has run.
    if self.dataset_name is None:
        print("Warning: dataset_name is not set for the resource. Using 'unknown_dataset'.")
        self.dataset_name = "unknown_dataset"
    if self.version is None:
        print("Warning: version is not set for the resource. Using 'unknown_version'.")
        self.version = "unknown_version"

    # set the origin of the dataset to registry, since it is being loaded from a resource file defined in the registry
    datarec_.set_origin_registry()

    # cache the dataset in pickle format
    if to_cache:
        datarec_.to_pickle()

    return datarec_

free_cache()

Frees the cached version of the resource if it exists. Returns: (None): None

Source code in datarec/data/resource.py
def free_cache(self):
    """
    Delete the cached version of the resource when one exists.

    After removing the file, the ``_cache_ready`` flag is reset and a
    confirmation message is printed. Nothing happens when there is no cache.

    Returns:
        (None): None
    """
    if not self._has_cache():
        return
    os.remove(self.cache_path())
    self._cache_ready = False
    print(f"Cache for resource '{self.resource_name}' has been removed.")

load_dataset_config(dataset_name, dataset_version='')

Load a dataset configuration from the local registry.

Parameters:

Name Type Description Default
dataset_name str

name of the dataset

required
dataset_version str

version of the dataset. When empty, load the dataset-level registry file.

''

Returns: (dict): dataset configuration

Source code in datarec/data/resource.py
def load_dataset_config(dataset_name: str, dataset_version: str = "") -> dict:
    """
    Read a dataset configuration file from the local registry.

    Args:
        dataset_name (str): name of the dataset
        dataset_version (str): version of the dataset. When empty, the
            dataset-level registry file is loaded instead of a version file.
    Returns:
        (dict): dataset configuration
    """
    config_path = (
        registry_version_filepath(dataset_name, dataset_version)
        if dataset_version
        else registry_dataset_filepath(dataset_name)
    )
    # NOTE(review): this check is an assert, so it is skipped under `python -O`.
    assert os.path.exists(config_path), f"Config file {config_path} does not exist"
    with open(config_path, "r") as stream:
        return yaml.safe_load(stream)

load_dataset_config_from_url(url)

Load a dataset configuration YAML from a remote URL.

Parameters:

Name Type Description Default
url str

URL pointing to a registry dataset or version YAML.

required

Returns:

Type Description
dict

Parsed dataset configuration.

Source code in datarec/data/resource.py
def load_dataset_config_from_url(url: str) -> dict:
    """
    Fetch a dataset configuration YAML from a remote URL and parse it.

    Args:
        url (str): URL pointing to a registry dataset or version YAML.

    Returns:
        (dict): Parsed dataset configuration.

    Raises:
        RuntimeError: If the URL cannot be opened, read or decoded as UTF-8.
        ValueError: If the parsed YAML document is not a mapping.
    """
    try:
        with urllib.request.urlopen(url) as remote:
            raw = remote.read()
            text = raw.decode("utf-8")
    except Exception as err:
        raise RuntimeError(f"Failed to load dataset config from URL: {url}") from err

    parsed = yaml.safe_load(text)
    if isinstance(parsed, dict):
        return parsed
    raise ValueError("Remote registry config must be a mapping.")

set_resource(resource_name, resource_conf)

Given a resource configuration, return a new resource object Args: resource_name (str): name of the resource resource_conf (dict): resource configuration Returns: (Resource): a dataset resource object

Source code in datarec/data/resource.py
def set_resource(resource_name: str, resource_conf: dict) -> Resource:
    """
    Build a resource object from its configuration.

    The resource class is looked up in ``RESOURCE_TYPES`` using the lower-cased
    ``type`` entry of the configuration ('Resource' when the entry is absent),
    and is instantiated with the whole configuration as keyword arguments.

    Args:
        resource_name (str): name of the resource
        resource_conf (dict): resource configuration
    Returns:
        (Resource): a dataset resource object
    Raises:
        ValueError: If the resource type is not registered in ``RESOURCE_TYPES``.
    """
    type_key = resource_conf.get('type', 'Resource').lower()
    resource_cls = RESOURCE_TYPES.get(type_key)
    if resource_cls is None:
        raise ValueError(f'Resource type {type_key} not allowed. Available resource types: {RESOURCE_TYPES.keys()}.')
    built = resource_cls(**resource_conf)
    built.resource_name = resource_name
    return built

set_resources(config)

Given a dataset configuration, build the resource objects it declares. Args: config (dict): dataset configuration Returns: (dict): a dictionary mapping resource names to dataset resource objects

Source code in datarec/data/resource.py
def set_resources(config: dict) -> dict[str, Resource]:
    """
    Build one resource object per entry of the configuration's 'resources' section.

    Args:
        config (dict): dataset configuration
    Returns:
        (dict): a dictionary mapping each resource name to its resource object
    """
    return {
        name: set_resource(name, raw_conf)
        for name, raw_conf in config['resources'].items()
    }

load_class(class_import)

Given a class import name, return a class object

Source code in datarec/data/resource.py
def load_class(class_import: str):
    """
    Resolve a dotted import path such as ``package.module.ClassName``.

    The text before the last dot is imported as a module and the text after it
    is looked up as an attribute of that module.
    """
    module_path, attribute = class_import.rsplit(".", 1)
    return getattr(importlib.import_module(module_path), attribute)

load_versions(dataset_name)

Given a dataset name, return a dictionary containing dataset versions and relative classes

Source code in datarec/data/resource.py
def load_versions(dataset_name: str) -> dict:
    """
    Map every version listed in a dataset's registry config to its class.

    The dataset-level configuration is loaded and each entry of its 'versions'
    section is resolved from an import path to the object it names.
    """
    version_classes = {}
    for version, import_path in load_dataset_config(dataset_name)['versions'].items():
        version_classes[version] = load_class(import_path)
    return version_classes

find_ratings_resource(resources)

Given a dictionary of resources, return the ratings resource Args: resources (dict): a dictionary containing dataset resources objects Returns: (Resource): the ratings resource object

Source code in datarec/data/resource.py
def find_ratings_resource(resources: dict) -> Interactions:
    """
    Return the first resource whose type is 'ratings'.

    Args:
        resources (dict): a dictionary containing dataset resources objects
    Returns:
        (Resource): the ratings resource object
    Raises:
        RuntimeError: If no resource has type 'ratings'.
    """
    ratings = next((res for res in resources.values() if res.type == 'ratings'), None)
    if ratings is None:
        raise RuntimeError("Resource type 'ratings' not found in resources")
    return ratings

find_resource_by_type(resources, rtype)

Given a dictionary of resources, return all resources of the requested type. Args: resources (dict): a dictionary containing dataset resources objects rtype (str): resource type to find Returns: (dict): a dictionary mapping resource names to the matching resource objects

Source code in datarec/data/resource.py
def find_resource_by_type(resources:dict, rtype:str)->Dict[str, Resource]:
    """
    Collect every resource whose ``type`` attribute equals `rtype`.

    Args:
        resources (dict): a dictionary containing dataset resources objects
        rtype (str): resource type to find
    Returns:
        (Dict[str, Resource]): the matching resources, keyed by their
            ``resource_name`` attribute
    Raises:
        RuntimeError: if no resource of the requested type is present
    """
    matches = {
        candidate.resource_name: candidate
        for candidate in resources.values()
        if candidate.type == rtype
    }
    if not matches:
        raise RuntimeError(f"Resource type '{rtype}' not found in resources")
    return matches

Torch Dataset Wrappers

PyTorch-compatible dataset wrappers.

BaseTorchDataset

Bases: Dataset

Base class for Torch datasets wrapping a DataRec dataset.

Source code in datarec/data/torch_dataset.py
class BaseTorchDataset(Dataset):
    """
    Common base for Torch datasets built on top of a DataRec dataset.

    It keeps the interaction table together with the names of its user and
    item columns.
    """
    def __init__(self, datarec, copy_data=False):
        """
        Initializes the BaseTorchDataset object.

        Args:
            datarec (DataRec): An instance of a DataRec dataset.
            copy_data (bool): Whether to copy the dataset or use it by reference.
        """
        frame = datarec.data
        if copy_data:
            # Work on a private copy so the wrapped dataset is left untouched.
            frame = frame.copy()
        self.df = frame
        self.user_col = datarec.user_col
        self.item_col = datarec.item_col

__init__(datarec, copy_data=False)

Initializes the BaseTorchDataset object.

Parameters:

Name Type Description Default
datarec DataRec

An instance of a DataRec dataset.

required
copy_data bool

Whether to copy the dataset or use it by reference.

False
Source code in datarec/data/torch_dataset.py
def __init__(self, datarec, copy_data=False):
    """
    Initializes the BaseTorchDataset object.

    Args:
        datarec (DataRec): An instance of a DataRec dataset.
        copy_data (bool): Whether to copy the dataset or use it by reference.
    """
    frame = datarec.data
    if copy_data:
        # Work on a private copy so the wrapped dataset is left untouched.
        frame = frame.copy()
    self.df = frame
    self.user_col = datarec.user_col
    self.item_col = datarec.item_col

PointwiseTorchDataset

Bases: BaseTorchDataset

Torch dataset for pointwise recommendation tasks.

Source code in datarec/data/torch_dataset.py
class PointwiseTorchDataset(BaseTorchDataset):
    """
    Torch dataset for pointwise recommendation tasks.

    Every sample is a single (user, item, rating) record.
    """
    def __init__(self, datarec, copy_data=False):
        """
        Initializes the PointwiseTorchDataset object.

        Args:
            datarec (DataRec): An instance of a DataRec dataset.
            copy_data (bool): Whether to copy the dataset or use it by reference.
        """
        super().__init__(datarec, copy_data)
        self.rating_col = datarec.rating_col

    def __len__(self):
        """
        Returns the total number of samples in the dataset.

        This is required by PyTorch's DataLoader to iterate over the dataset.

        Returns:
            (int): Number of samples in the dataset.
        """
        n_samples = len(self.df)
        return n_samples

    def __getitem__(self, idx):
        """
        Returns a sample with user, item, and rating.

        Args:
            idx (int): Sample index to be returned.

        Returns:
            (dict): Sample with user, item, and rating.
        """
        record = self.df.iloc[idx]
        # The rating defaults to 1.0 when the row has no entry for the rating column.
        rating = record.get(self.rating_col, 1.0)
        return dict(
            user=record[self.user_col],
            item=record[self.item_col],
            rating=rating,
        )

__init__(datarec, copy_data=False)

Initializes the PointwiseTorchDataset object.

Parameters:

Name Type Description Default
datarec DataRec

An instance of a DataRec dataset.

required
copy_data bool

Whether to copy the dataset or use it by reference.

False
Source code in datarec/data/torch_dataset.py
def __init__(self, datarec, copy_data=False):
    """
    Initializes the PointwiseTorchDataset object.

    The parent initializer is called first with `datarec` and `copy_data`;
    afterwards `datarec.rating_col` is stored on the instance.

    Args:
        datarec (DataRec): An instance of a DataRec dataset.
        copy_data (bool): Whether to copy the dataset or use it by reference.
    """
    super().__init__(datarec, copy_data)
    self.rating_col = datarec.rating_col

__len__()

Returns the total number of samples in the dataset.

This is required by PyTorch's DataLoader to iterate over the dataset.

Returns:

Type Description
int

Number of samples in the dataset.

Source code in datarec/data/torch_dataset.py
def __len__(self):
    """
    Returns the total number of samples in the dataset.

    This is required by PyTorch's DataLoader to iterate over the dataset.

    Returns:
        (int): Number of samples in the dataset, i.e. ``len(self.df)``.
    """
    n_samples = len(self.df)
    return n_samples

__getitem__(idx)

Returns a sample with user, item, and rating.

Parameters:

Name Type Description Default
idx int

Sample index to be returned.

required

Returns:

Type Description
dict

Sample with user, item, and rating.

Source code in datarec/data/torch_dataset.py
def __getitem__(self, idx):
    """
    Returns a sample with user, item, and rating.

    Args:
        idx (int): Sample index to be returned.

    Returns:
        (dict): Sample with user, item, and rating.
    """
    record = self.df.iloc[idx]
    # The rating defaults to 1.0 when the row has no entry for the rating column.
    rating = record.get(self.rating_col, 1.0)
    return dict(
        user=record[self.user_col],
        item=record[self.item_col],
        rating=rating,
    )

PairwiseTorchDataset

Bases: BaseTorchDataset

Torch dataset for pairwise recommendation tasks with negative sampling.

Source code in datarec/data/torch_dataset.py
class PairwiseTorchDataset(BaseTorchDataset):
    """
    Torch dataset for pairwise recommendation tasks with negative sampling.

    Each sample couples one observed (user, item) interaction with
    `num_negatives` items drawn from the item pool that the user has not
    interacted with.
    """
    def __init__(self, datarec, num_negatives=1, item_pool=None, copy_data=False):
        """
        Initializes the PairwiseTorchDataset object.

        Args:
            datarec (DataRec): An instance of a DataRec dataset.
            num_negatives (int): Number of negative samples to generate per interaction.
            item_pool (array-like): Pool of items to sample from. Defaults to all items in the dataset
                (also used when an empty pool is passed).
            copy_data (bool): Whether to copy the dataset or use it by reference.
        """
        super().__init__(datarec, copy_data)
        self.num_negatives = num_negatives
        # Test the pool explicitly instead of relying on its truth value:
        # `item_pool or ...` raises for multi-element NumPy arrays and would
        # discard a one-element array holding a falsy id such as 0.
        if item_pool is None or len(item_pool) == 0:
            item_pool = self.df[self.item_col].unique()
        self.item_pool = item_pool
        # user -> set of items the user interacted with, for O(1) membership tests.
        self.user_pos_items = self.df.groupby(self.user_col)[self.item_col].apply(set).to_dict()

    def sample_negatives(self, user: Any) -> List[Any]:
        """
        Samples negative items for a given user, avoiding known positive items.

        This method is designed to be overridden to implement custom negative
        sampling strategies (e.g., popularity-based, adversarial, or
        distribution-aware sampling). The default implementation draws
        uniformly (with replacement) from the item pool, rejecting items the
        user has already interacted with.

        Args:
            user: The user ID for which to sample negatives.

        Returns:
            (List): List of sampled negative item IDs.

        Raises:
            ValueError: If negatives are requested but every item in the pool
                is a known positive for `user`.
        """
        user_positives = self.user_pos_items.get(user, set())
        # Rejection sampling below would never terminate if no pool item is a
        # valid negative. The cheap length comparison keeps the full scan off
        # the common path (it does not catch pools padded with duplicates).
        if (
            self.num_negatives > 0
            and len(user_positives) >= len(self.item_pool)
            and all(item in user_positives for item in self.item_pool)
        ):
            raise ValueError(
                f"Cannot sample negatives for user {user!r}: "
                "every item in the item pool is a known positive."
            )
        neg_items = []
        while len(neg_items) < self.num_negatives:
            candidate = np.random.choice(self.item_pool)
            if candidate not in user_positives:
                neg_items.append(candidate)
        return neg_items

    def __len__(self):
        """
        Returns the total number of samples in the dataset.

        This is required by PyTorch's DataLoader to iterate over the dataset.

        Returns:
            (int): number of samples in the dataset.
        """
        return len(self.df)

    def __getitem__(self, idx):
        """
        Returns a sample with user, positive item, and negative items.

        Args:
            idx (int): Sample index to be returned.

        Returns:
            (dict): Sample with user, positive item, and negative items.
        """
        row = self.df.iloc[idx]
        user = row[self.user_col]
        return {
            "user": user,
            "pos_items": row[self.item_col],
            "neg_items": self.sample_negatives(user),
        }

__init__(datarec, num_negatives=1, item_pool=None, copy_data=False)

Initializes the PairwiseTorchDataset object.

Parameters:

Name Type Description Default
datarec DataRec

An instance of a DataRec dataset.

required
num_negatives int

Number of negative samples to generate per interaction.

1
item_pool array-like

Pool of items to sample from. Defaults to all items in the dataset.

None
copy_data bool

Whether to copy the dataset or use it by reference.

False
Source code in datarec/data/torch_dataset.py
def __init__(self, datarec, num_negatives=1, item_pool=None, copy_data=False):
    """
    Initializes the PairwiseTorchDataset object.

    Args:
        datarec (DataRec): An instance of a DataRec dataset.
        num_negatives (int): Number of negative samples to generate per interaction.
        item_pool (array-like): Pool of items to sample from. Defaults to all items in the dataset
            (also used when an empty pool is passed).
        copy_data (bool): Whether to copy the dataset or use it by reference.
    """
    super().__init__(datarec, copy_data)
    self.num_negatives = num_negatives
    # Test the pool explicitly instead of relying on its truth value:
    # `item_pool or ...` raises for multi-element NumPy arrays and would
    # discard a one-element array holding a falsy id such as 0.
    if item_pool is None or len(item_pool) == 0:
        item_pool = self.df[self.item_col].unique()
    self.item_pool = item_pool
    # user -> set of items the user interacted with, for O(1) membership tests.
    self.user_pos_items = self.df.groupby(self.user_col)[self.item_col].apply(set).to_dict()

sample_negatives(user)

Samples negative items for a given user, avoiding known positive items.

This method is designed to be overridden to implement custom negative sampling strategies (e.g., popularity-based, adversarial, or distribution-aware sampling). The default implementation draws uniformly from the item pool, excluding items the user has already interacted with.

Parameters:

Name Type Description Default
user Any

The user ID for which to sample negatives.

required

Returns:

Type Description
List

List of sampled negative item IDs.

Source code in datarec/data/torch_dataset.py
def sample_negatives(self, user: Any) -> List[Any]:
    """
    Samples negative items for a given user, avoiding known positive items.

    This method is designed to be overridden to implement custom negative
    sampling strategies (e.g., popularity-based, adversarial, or
    distribution-aware sampling). The default implementation draws
    uniformly (with replacement) from the item pool, rejecting items the
    user has already interacted with.

    Args:
        user: The user ID for which to sample negatives.

    Returns:
        (List): List of sampled negative item IDs.

    Raises:
        ValueError: If negatives are requested but every item in the pool
            is a known positive for `user`.
    """
    user_positives = self.user_pos_items.get(user, set())
    # Rejection sampling below would never terminate if no pool item is a
    # valid negative. The cheap length comparison keeps the full scan off
    # the common path (it does not catch pools padded with duplicates).
    if (
        self.num_negatives > 0
        and len(user_positives) >= len(self.item_pool)
        and all(item in user_positives for item in self.item_pool)
    ):
        raise ValueError(
            f"Cannot sample negatives for user {user!r}: "
            "every item in the item pool is a known positive."
        )
    neg_items = []
    while len(neg_items) < self.num_negatives:
        candidate = np.random.choice(self.item_pool)
        if candidate not in user_positives:
            neg_items.append(candidate)
    return neg_items

__len__()

Returns the total number of samples in the dataset.

This is required by PyTorch's DataLoader to iterate over the dataset.

Returns:

Type Description
int

number of samples in the dataset.

Source code in datarec/data/torch_dataset.py
def __len__(self):
    """
    Returns the total number of samples in the dataset.

    This is required by PyTorch's DataLoader to iterate over the dataset.

    Returns:
        (int): number of samples in the dataset, i.e. ``len(self.df)``.
    """
    n_samples = len(self.df)
    return n_samples

__getitem__(idx)

Returns a sample with user, positive item, and negative items.

Parameters:

Name Type Description Default
idx int

Sample index to be returned.

required

Returns:

Type Description
dict

Sample with user, positive item, and negative items.

Source code in datarec/data/torch_dataset.py
def __getitem__(self, idx):
    """
    Returns a sample with user, positive item, and negative items.

    Args:
        idx (int): Sample index to be returned.

    Returns:
        (dict): Sample with user, positive item, and negative items.
    """
    record = self.df.iloc[idx]
    sample = {"user": record[self.user_col]}
    sample["pos_items"] = record[self.item_col]
    # Negatives are drawn for the user of this row via `sample_negatives`.
    sample["neg_items"] = self.sample_negatives(sample["user"])
    return sample

RankingTorchDataset

Bases: BaseTorchDataset

Torch dataset for full softmax-style ranking tasks.

Source code in datarec/data/torch_dataset.py
class RankingTorchDataset(BaseTorchDataset):
    """
    Torch dataset for full softmax-style ranking tasks.

    Samples carry only the user and the item; no target value is attached.
    """
    def __init__(self, datarec, copy_data=False):
        """
        Initializes the RankingTorchDataset object.

        Args:
            datarec (DataRec): An instance of a DataRec dataset.
            copy_data (bool): Whether to copy the dataset or use it by reference.
        """
        # A user->items mapping for evaluation could be prepared here.
        super().__init__(datarec, copy_data)

    def __len__(self):
        """
        Returns the total number of samples in the dataset.

        This is required by PyTorch's DataLoader to iterate over the dataset.

        Returns:
            (int): Number of samples in the dataset.
        """
        n_samples = len(self.df)
        return n_samples

    def __getitem__(self, idx):
        """
        Returns a sample with user and item.

        Args:
            idx (int): Sample index to be returned.

        Returns:
            (dict): Sample with user and item data.
        """
        record = self.df.iloc[idx]
        # No target — implicit ranking
        return dict(
            user=record[self.user_col],
            item=record[self.item_col],
        )

__init__(datarec, copy_data=False)

Initializes the RankingTorchDataset object.

Parameters:

Name Type Description Default
datarec DataRec

An instance of a DataRec dataset.

required
copy_data bool

Whether to copy the dataset or use it by reference.

False
Source code in datarec/data/torch_dataset.py
def __init__(self, datarec, copy_data=False):
    """
    Initializes the RankingTorchDataset object.

    All setup is delegated to the parent initializer, which receives
    `datarec` and `copy_data` unchanged.

    Args:
        datarec (DataRec): An instance of a DataRec dataset.
        copy_data (bool): Whether to copy the dataset or use it by reference.
    """
    super().__init__(datarec, copy_data)

__len__()

Returns the total number of samples in the dataset.

This is required by PyTorch's DataLoader to iterate over the dataset.

Returns:

Type Description
int

Number of samples in the dataset.

Source code in datarec/data/torch_dataset.py
def __len__(self):
    """
    Returns the total number of samples in the dataset.

    This is required by PyTorch's DataLoader to iterate over the dataset.

    Returns:
        (int): Number of samples in the dataset, i.e. ``len(self.df)``.
    """
    n_samples = len(self.df)
    return n_samples

__getitem__(idx)

Returns a sample with user and item.

Parameters:

Name Type Description Default
idx int

Sample index to be returned.

required

Returns:

Type Description
dict

Sample with user and item data.

Source code in datarec/data/torch_dataset.py
def __getitem__(self, idx):
    """
    Returns a sample with user and item.

    Args:
        idx (int): Sample index to be returned.

    Returns:
        (dict): Sample with user and item data.
    """
    record = self.df.iloc[idx]
    # No target — implicit ranking
    return dict(
        user=record[self.user_col],
        item=record[self.item_col],
    )