
Input/Output Reference

This section provides a detailed API reference for the modules related to data input/output and framework interoperability in the datarec library. Readers return datasets as DataRec objects that wrap RawData, writers accept RawData or DataRec, and framework exporters convert datasets to external formats.


Minimal usage:

from datarec.io.readers.transactions.tabular import read_transactions_tabular
from datarec.io.writers.transactions.tabular import write_transactions_tabular

raw = read_transactions_tabular(
    "data/interactions.csv",
    sep=",",
    header=0,          # first row holds the column names
    user_col="user",
    item_col="item",
    rating_col="rating",
)
write_transactions_tabular(raw, "out/interactions.tsv", sep="\t")

Core I/O Modules

These modules handle the fundamental tasks of reading, writing, and representing raw data.

RawData

Container for raw datasets in DataRec.

Wraps a pandas.DataFrame and stores metadata about user, item, rating, and timestamp columns. Provides lightweight methods for slicing, copying, and merging data.
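A minimal usage sketch (the DataFrame contents and column names below are illustrative, not part of the library):

import pandas as pd
from datarec.io.rawdata import RawData

df = pd.DataFrame({
    "user": ["u1", "u1", "u2"],
    "item": ["i1", "i2", "i1"],
    "rating": [5.0, 3.0, 4.0],
})

raw = RawData(df, user="user", item="item", rating="rating")
print(len(raw))  # 3 interactions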

Source code in datarec/io/rawdata.py
class RawData:
    """
    Container for raw datasets in DataRec.

    Wraps a `pandas.DataFrame` and stores metadata about user, item, rating, and timestamp columns.
    Provides lightweight methods for slicing, copying, and merging data.
    """
    def __init__(
            self,
            data=None,
            header=False,
            user=None,
            item=None,
            rating=None,
            timestamp=None,
            user_encoder=None,
            item_encoder=None):
        """
        Initialize a RawData object.

        Args:
            data (pd.DataFrame): DataFrame of the dataset. Defaults to None.
            header (bool): Whether the file has a header. Defaults to False.
            user (str): Column name for user IDs.
            item (str): Column name for item IDs.
            rating (str): Column name for ratings.
            timestamp (str): Column name for timestamps.
            user_encoder (dict | None): Optional user encoding mapping.
            item_encoder (dict | None): Optional item encoding mapping.
        """
        self.data = data
        self.header = header
        if data is None:
            # No data was provided: fall back to an empty DataFrame
            self.data = pd.DataFrame()
        self.path = None

        self.user = user
        self.item = item
        self.rating = rating
        self.timestamp = timestamp
        # Aliases for consistency with DataRec naming
        self.user_col = user
        self.item_col = item
        self.rating_col = rating
        self.timestamp_col = timestamp
        # Optional encoders to support streaming/incremental loading
        self.user_encoder = user_encoder
        self.item_encoder = item_encoder

        self.pipeline_step: Optional["PipelineStep"] = None  # To track the pipeline step that produced this RawData

    def append(self, new_data) -> None:
        """
        Append new rows to the dataset.

        Args:
            new_data (pd.DataFrame): DataFrame to append.

        Returns:
            None
        """
        # pandas removed DataFrame.append; concatenate and rebind instead
        self.data = pd.concat([self.data, new_data])

    def copy(self, deep=True) -> "RawData":
        """
        Make a copy of the dataset.

        Args:
            deep (bool): If True, return a deep copy of the dataset.

        Returns:
            (RawData): A copy of the dataset.

        """
        return RawData(
            self.data.copy(deep=deep),
            header=self.header,
            user=self.user,
            item=self.item,
            rating=self.rating,
            timestamp=self.timestamp,
            user_encoder=self.user_encoder,
            item_encoder=self.item_encoder,
        )

    def __repr__(self):
        """
        Return a string representation of the dataset.
        """
        return repr(self.data)

    def __len__(self):
        """
        Return the length of the dataset.
        """
        return len(self.data)

    def __getitem__(self, idx):
        """
        Return the item at the given index.
        Args:
            idx: index of the item to return.

        Returns:
            (RawData): the sample at the given index.

        """
        return self.data[idx]

    def __add__(self, other):
        """
        Concatenate two RawData objects.
        Args:
            other (RawData): the other RawData to concatenate.

        Returns:
            (RawData): the concatenated RawData object.

        """
        self.__check_rawdata_compatibility__(other)
        new_data = pd.concat([self.data, other.data])
        new_rawdata = RawData(new_data, user=self.user, item=self.item, rating=self.rating,
                              timestamp=self.timestamp, header=self.header)
        return new_rawdata

    def __iter__(self):
        """
        Iterate over dataset rows.

        Returns:
            (pd.Series): Each row in the dataset.

        """
        return iter(self.data)

    def __check_rawdata_compatibility__(self, rawdata):
        """
        Check compatibility between RawData objects.
        Args:
            rawdata (RawData): RawData object to check.

        Returns:
            (bool): True if compatibility is verified.

        """
        return __check_rawdata_compatibility__(self, rawdata)

__init__(data=None, header=False, user=None, item=None, rating=None, timestamp=None, user_encoder=None, item_encoder=None)

Initialize a RawData object.

Parameters:

    data (DataFrame, default None): DataFrame of the dataset.
    header (bool, default False): Whether the file has a header.
    user (str, default None): Column name for user IDs.
    item (str, default None): Column name for item IDs.
    rating (str, default None): Column name for ratings.
    timestamp (str, default None): Column name for timestamps.
    user_encoder (dict | None, default None): Optional user encoding mapping.
    item_encoder (dict | None, default None): Optional item encoding mapping.

Source code in datarec/io/rawdata.py
def __init__(
        self,
        data=None,
        header=False,
        user=None,
        item=None,
        rating=None,
        timestamp=None,
        user_encoder=None,
        item_encoder=None):
    """
    Initialize a RawData object.

    Args:
        data (pd.DataFrame): DataFrame of the dataset. Defaults to None.
        header (bool): Whether the file has a header. Defaults to False.
        user (str): Column name for user IDs.
        item (str): Column name for item IDs.
        rating (str): Column name for ratings.
        timestamp (str): Column name for timestamps.
        user_encoder (dict | None): Optional user encoding mapping.
        item_encoder (dict | None): Optional item encoding mapping.
    """
    self.data = data
    self.header = header
    if data is None:
        # No data was provided: fall back to an empty DataFrame
        self.data = pd.DataFrame()
    self.path = None

    self.user = user
    self.item = item
    self.rating = rating
    self.timestamp = timestamp
    # Aliases for consistency with DataRec naming
    self.user_col = user
    self.item_col = item
    self.rating_col = rating
    self.timestamp_col = timestamp
    # Optional encoders to support streaming/incremental loading
    self.user_encoder = user_encoder
    self.item_encoder = item_encoder

    self.pipeline_step: Optional["PipelineStep"] = None  # To track the pipeline step that produced this RawData

append(new_data)

Append new rows to the dataset.

Parameters:

    new_data (DataFrame, required): DataFrame to append.

Returns:

    None

Source code in datarec/io/rawdata.py
def append(self, new_data) -> None:
    """
    Append new rows to the dataset.

    Args:
        new_data (pd.DataFrame): DataFrame to append.

    Returns:
        None
    """
    # pandas removed DataFrame.append; concatenate and rebind instead
    self.data = pd.concat([self.data, new_data])

copy(deep=True)

Make a copy of the dataset.

Parameters:

    deep (bool, default True): If True, return a deep copy of the dataset.

Returns:

    RawData: A copy of the dataset.

Source code in datarec/io/rawdata.py
def copy(self, deep=True) -> "RawData":
    """
    Make a copy of the dataset.

    Args:
        deep (bool): If True, return a deep copy of the dataset.

    Returns:
        (RawData): A copy of the dataset.

    """
    return RawData(
        self.data.copy(deep=deep),
        header=self.header,
        user=self.user,
        item=self.item,
        rating=self.rating,
        timestamp=self.timestamp,
        user_encoder=self.user_encoder,
        item_encoder=self.item_encoder,
    )

__repr__()

Return a string representation of the dataset.

Source code in datarec/io/rawdata.py
def __repr__(self):
    """
    Return a string representation of the dataset.
    """
    return repr(self.data)

__len__()

Return the length of the dataset.

Source code in datarec/io/rawdata.py
def __len__(self):
    """
    Return the length of the dataset.
    """
    return len(self.data)

__getitem__(idx)

Return the item at the given index.

Parameters:

    idx (required): Index of the item to return.

Returns:

    RawData: The sample at the given index.

Source code in datarec/io/rawdata.py
def __getitem__(self, idx):
    """
    Return the item at the given index.
    Args:
        idx: index of the item to return.

    Returns:
        (RawData): the sample at the given index.

    """
    return self.data[idx]

__add__(other)

Concatenate two RawData objects.

Parameters:

    other (RawData, required): The other RawData to concatenate.

Returns:

    RawData: The concatenated RawData object.
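For instance, assuming raw_a and raw_b were built from two DataFrames sharing the same user/item/rating column names and header setting, they can be merged with the + operator (an illustrative sketch):

combined = raw_a + raw_b            # rows of raw_a followed by rows of raw_b
assert len(combined) == len(raw_a) + len(raw_b)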

Source code in datarec/io/rawdata.py
def __add__(self, other):
    """
    Concatenate two RawData objects.
    Args:
        other (RawData): the other RawData to concatenate.

    Returns:
        (RawData): the concatenated RawData object.

    """
    self.__check_rawdata_compatibility__(other)
    new_data = pd.concat([self.data, other.data])
    new_rawdata = RawData(new_data, user=self.user, item=self.item, rating=self.rating,
                          timestamp=self.timestamp, header=self.header)
    return new_rawdata

__iter__()

Iterate over dataset rows.

Returns:

    Series: Each row in the dataset.

Source code in datarec/io/rawdata.py
def __iter__(self):
    """
    Iterate over dataset rows.

    Returns:
        (pd.Series): Each row in the dataset.

    """
    return iter(self.data)

__check_rawdata_compatibility__(rawdata)

Check compatibility between RawData objects.

Parameters:

    rawdata (RawData, required): RawData object to check.

Returns:

    bool: True if compatibility is verified.

Source code in datarec/io/rawdata.py
def __check_rawdata_compatibility__(self, rawdata):
    """
    Check compatibility between RawData objects.
    Args:
        rawdata (RawData): RawData object to check.

    Returns:
        (bool): True if compatibility is verified.

    """
    return __check_rawdata_compatibility__(self, rawdata)

__check_rawdata_compatibility__(rawdata1, rawdata2)

Check compatibility between two RawData objects.

Parameters:

    rawdata1 (RawData, required): First RawData object to check.
    rawdata2 (RawData, required): Second RawData object to check.

Returns:

    bool: True if compatibility is verified; a ValueError is raised when the column metadata or header settings differ.

Source code in datarec/io/rawdata.py
def __check_rawdata_compatibility__(rawdata1: RawData, rawdata2: RawData):
    """
    Check compatibility between two RawData objects.
    Args:
        rawdata1 (RawData): First RawData object to check.
        rawdata2 (RawData): Second RawData object to check.

    Returns:
        (bool): True if compatibility is verified.

    """
    if rawdata1.user != rawdata2.user:
        raise ValueError('User columns are not compatible')
    if rawdata1.item != rawdata2.item:
        raise ValueError('Item columns are not compatible')
    if rawdata1.rating != rawdata2.rating:
        raise ValueError('Rating columns are not compatible')
    if rawdata1.timestamp != rawdata2.timestamp:
        raise ValueError('Timestamp columns are not compatible')
    if rawdata1.header != rawdata2.header:
        raise ValueError('Header is not compatible')
    return True

read_sequences_json(filepath, *, user_col='user', item_col='item', rating_col=None, timestamp_col=None, dataset_name='Unknown Dataset', version_name='Unknown Version')

Reads a JSON file representing sequential interaction data in the form:

{ "user_id": [ { "item": ..., "rating": ..., "timestamp": ... }, ... ], ... }

Converts it into a transactional RawData format with one row per interaction.

Parameters:

    filepath (str, required): Path to the JSON file.
    user_col (str, default 'user'): Name assigned to the user column in the output.
    item_col (str, default 'item'): Key containing the item field inside each event.
    rating_col (Optional[str], default None): Key containing the rating field inside each event.
    timestamp_col (Optional[str], default None): Key containing the timestamp field inside each event.
    dataset_name (str, default 'Unknown Dataset'): Name to assign to the resulting DataRec dataset.
    version_name (str, default 'Unknown Version'): Version identifier to assign to the resulting DataRec dataset.

Returns:

    DataRec: A DataRec object containing all interactions exploded row-by-row. (Returned via @annotate_datarec_output, which wraps the RawData.)
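A usage sketch, assuming a hypothetical file data/sequences.json shaped as shown above, with "item", "rating", and "timestamp" keys inside each event:

from datarec.io.readers.sequences.json import read_sequences_json

dr = read_sequences_json(
    "data/sequences.json",       # hypothetical path
    user_col="user",
    item_col="item",
    rating_col="rating",
    timestamp_col="timestamp",
    dataset_name="my_dataset",
)
# dr holds one row per (user, item, rating, timestamp) interaction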

Source code in datarec/io/readers/sequences/json.py
@annotate_datarec_output
def read_sequences_json(
    filepath: str,
    *,
    user_col: str = "user",
    item_col: str = "item",
    rating_col: Optional[str] = None,
    timestamp_col: Optional[str] = None,
    dataset_name: str = 'Unknown Dataset',
    version_name: str = 'Unknown Version',
) -> DataRec:
    """
    Reads a JSON file representing sequential interaction data in the form:

    {
      "user_id": [
        { "item": ..., "rating": ..., "timestamp": ... },
        ...
      ],
      ...
    }

    Converts it into a transactional RawData format with one row per interaction.

    Args:
        filepath: Path to the JSON file.
        user_col: Name assigned to the user column in the output.
        item_col: Key containing the item field inside each event.
        rating_col: Key containing the rating field inside each event.
        timestamp_col: Key containing the timestamp field inside each event.
        dataset_name: Name to assign to the resulting DataRec dataset.
        version_name: Version identifier to assign to the resulting DataRec dataset.

    Returns:
        DataRec: A DataRec object containing all interactions exploded row-by-row.
            (Returned via @annotate_datarec_output, which wraps the RawData.)
    """
    if not os.path.exists(filepath):
        raise FileNotFoundError(f"File not found: {filepath}")

    # Load the entire JSON structure
    with open(filepath, "r", encoding="utf-8") as f:
        payload: Dict[str, Any] = json.load(f)

    rows = []

    # Iterate over each user and their list of events
    for user_id, events in payload.items():
        if not isinstance(events, list):
            raise ValueError(f"Expected a list of events for user '{user_id}', got {type(events)}.")

        for event in events:
            # Extract required fields
            row = {user_col: user_id}

            # Mandatory
            if item_col not in event:
                raise ValueError(f"Missing item field '{item_col}' in event for user {user_id}.")
            row[item_col] = event[item_col]

            # Optional
            if rating_col is not None:
                row[rating_col] = event.get(rating_col)
            if timestamp_col is not None:
                row[timestamp_col] = event.get(timestamp_col)

            rows.append(row)

    # Build DataFrame
    data = pd.DataFrame(rows)

    # Reset index for consistency
    data = data.reset_index(drop=True)

    # Final RawData object
    rawdata = RawData(
        data,
        user=user_col,
        item=item_col,
        rating=rating_col,
        timestamp=timestamp_col,)

    # Wrapped by @annotate_datarec_output to return DataRec at call sites.
    return cast(DataRec, rawdata)

read_sequences_json_array(filepath, *, user_col='user', item_col='item', rating_col=None, timestamp_col=None, sequence_key='sequence', dataset_name='Unknown Dataset', version_name='Unknown Version')

Reads a JSON file representing sequential interaction data in the form of an ARRAY of user-sequence objects, e.g.:

[
  {
    "user": 0,
    "sequence": [
      { "item": 1, "rating": 1, "timestamp": "001" },
      { "item": 2, "rating": 1, "timestamp": "022" }
    ]
  },
  {
    "user": 1,
    "sequence": [
      { "item": 1, "rating": 4, "timestamp": "011" }
    ]
  }
]

and converts it into a transactional RawData format with one row per interaction.

Parameters:

    filepath (str, required): Path to the JSON file.
    user_col (str, default 'user'): Name assigned to the user column in the output. Also used as the key to read the user identifier in each top-level object.
    item_col (str, default 'item'): Key containing the item field inside each event.
    rating_col (Optional[str], default None): Key containing the rating field inside each event.
    timestamp_col (Optional[str], default None): Key containing the timestamp field inside each event.
    sequence_key (str, default 'sequence'): Key containing the list of events for each user.
    dataset_name (str, default 'Unknown Dataset'): Name to assign to the resulting DataRec dataset.
    version_name (str, default 'Unknown Version'): Version identifier to assign to the resulting DataRec dataset.

Returns:

    DataRec: A DataRec object containing all interactions exploded row-by-row. (Returned via @annotate_datarec_output, which wraps the RawData.)
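A usage sketch, assuming a hypothetical file data/sequences_array.json laid out as the array example above:

from datarec.io.readers.sequences.json import read_sequences_json_array

dr = read_sequences_json_array(
    "data/sequences_array.json",  # hypothetical path
    user_col="user",
    item_col="item",
    rating_col="rating",
    timestamp_col="timestamp",
    sequence_key="sequence",
)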

Source code in datarec/io/readers/sequences/json.py
@annotate_datarec_output
def read_sequences_json_array(
    filepath: str,
    *,
    user_col: str = "user",
    item_col: str = "item",
    rating_col: Optional[str] = None,
    timestamp_col: Optional[str] = None,
    sequence_key: str = "sequence",
    dataset_name: str = 'Unknown Dataset',
    version_name: str = 'Unknown Version',
) -> DataRec:
    """
    Reads a JSON file representing sequential interaction data in the form
    of an ARRAY of user-sequence objects, e.g.:

        [
          {
            "user": 0,
            "sequence": [
              { "item": 1, "rating": 1, "timestamp": "001" },
              { "item": 2, "rating": 1, "timestamp": "022" }
            ]
          },
          {
            "user": 1,
            "sequence": [
              { "item": 1, "rating": 4, "timestamp": "011" }
            ]
          }
        ]

    and converts it into a transactional RawData format with one row per
    interaction.

    Args:
        filepath: Path to the JSON file.
        user_col: Name assigned to the user column in the output.
                  Also used as the key to read the user identifier in
                  each top-level object.
        item_col: Key containing the item field inside each event.
        rating_col: Key containing the rating field inside each event.
        timestamp_col: Key containing the timestamp field inside each event.
        sequence_key: Key containing the list of events for each user.
        dataset_name: Name to assign to the resulting DataRec dataset.
        version_name: Version identifier to assign to the resulting DataRec dataset.

    Returns:
        DataRec: A DataRec object containing all interactions exploded
                 row-by-row.
            (Returned via @annotate_datarec_output, which wraps the RawData.)
    """
    if not os.path.exists(filepath):
        raise FileNotFoundError(f"File not found: {filepath}")

    # Load the entire JSON structure
    with open(filepath, "r", encoding="utf-8") as f:
        payload: Any = json.load(f)

    if not isinstance(payload, list):
        raise ValueError(
            f"Expected a JSON array at top level, got {type(payload)} instead."
        )

    rows: List[Dict[str, Any]] = []

    # Iterate over each user-level object
    for obj in payload:
        if not isinstance(obj, dict):
            raise ValueError(
                f"Expected each element of the array to be an object, got {type(obj)}."
            )

        if user_col not in obj:
            raise ValueError(
                f"Missing user field '{user_col}' in top-level object: {obj}."
            )
        user_id = obj[user_col]

        if sequence_key not in obj:
            raise ValueError(
                f"Missing sequence field '{sequence_key}' in top-level object for user {user_id}."
            )

        events = obj[sequence_key]
        if not isinstance(events, list):
            raise ValueError(
                f"Expected '{sequence_key}' to be a list for user {user_id}, "
                f"got {type(events)}."
            )

        for event in events:
            if not isinstance(event, dict):
                raise ValueError(
                    f"Expected each event in '{sequence_key}' to be an object, got {type(event)}."
                )

            row: Dict[str, Any] = {user_col: user_id}

            # Mandatory item field
            if item_col not in event:
                raise ValueError(
                    f"Missing item field '{item_col}' in event for user {user_id}: {event}."
                )
            row[item_col] = event[item_col]

            # Optional fields
            if rating_col is not None:
                row[rating_col] = event.get(rating_col)
            if timestamp_col is not None:
                row[timestamp_col] = event.get(timestamp_col)

            rows.append(row)

    # Build DataFrame
    data = pd.DataFrame(rows).reset_index(drop=True)

    # Final RawData object
    raw = RawData(
        data,
        user=user_col,
        item=item_col,
        rating=rating_col,
        timestamp=timestamp_col,
    )

    # Wrapped by @annotate_datarec_output to return DataRec at call sites.
    return cast(DataRec, raw)

read_sequences_json_items(filepath, *, user_col='user', item_col='item', rating_col=None, timestamp_col=None, dataset_name='Unknown Dataset', version_name='Unknown Version')

Reads a JSON file representing sequential interaction data in the form:

{ "user_id": [item_id, item_id, ...], ... }

Each list contains item identifiers only (no event objects). The data is converted into a transactional RawData format with one row per interaction.

Parameters:

    filepath (str, required): Path to the JSON file.
    user_col (str, default 'user'): Name assigned to the user column in the output.
    item_col (str, default 'item'): Name assigned to the item column in the output.
    rating_col (Optional[str], default None): Not supported for item-only JSON.
    timestamp_col (Optional[str], default None): Not supported for item-only JSON.
    dataset_name (str, default 'Unknown Dataset'): Name to assign to the resulting DataRec dataset.
    version_name (str, default 'Unknown Version'): Version identifier to assign to the resulting DataRec dataset.

Returns:

    DataRec: A DataRec object containing all interactions exploded row-by-row. (Returned via @annotate_datarec_output, which wraps the RawData.)
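A usage sketch, assuming a hypothetical file data/user_items.json shaped like {"u1": ["i1", "i2"], ...}; rating_col and timestamp_col must stay None for this reader:

from datarec.io.readers.sequences.json import read_sequences_json_items

dr = read_sequences_json_items(
    "data/user_items.json",       # hypothetical path
    user_col="user",
    item_col="item",
)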

Source code in datarec/io/readers/sequences/json.py
@annotate_datarec_output
def read_sequences_json_items(
    filepath: str,
    *,
    user_col: str = "user",
    item_col: str = "item",
    rating_col: Optional[str] = None,
    timestamp_col: Optional[str] = None,
    dataset_name: str = 'Unknown Dataset',
    version_name: str = 'Unknown Version',
) -> DataRec:
    """
    Reads a JSON file representing sequential interaction data in the form:

    {
      "user_id": [item_id, item_id, ...],
      ...
    }

    Each list contains item identifiers only (no event objects). The data is
    converted into a transactional RawData format with one row per interaction.

    Args:
        filepath: Path to the JSON file.
        user_col: Name assigned to the user column in the output.
        item_col: Name assigned to the item column in the output.
        rating_col: Not supported for item-only JSON.
        timestamp_col: Not supported for item-only JSON.
        dataset_name: Name to assign to the resulting DataRec dataset.
        version_name: Version identifier to assign to the resulting DataRec dataset.
    Returns:
        DataRec: A DataRec object containing all interactions exploded row-by-row.
            (Returned via @annotate_datarec_output, which wraps the RawData.)
    """
    if rating_col is not None or timestamp_col is not None:
        raise ValueError(
            "Item-list JSON cannot include rating/timestamp fields. "
            "Use read_sequences_json for event-object JSON."
        )

    if not os.path.exists(filepath):
        raise FileNotFoundError(f"File not found: {filepath}")

    # Load the entire JSON structure
    with open(filepath, "r", encoding="utf-8") as f:
        payload: Any = json.load(f)

    if not isinstance(payload, dict):
        raise ValueError(
            f"Expected a JSON object at top level, got {type(payload)} instead."
        )

    rows: List[Dict[str, Any]] = []

    # Iterate over each user and their list of item ids
    for user_id, items in payload.items():
        if not isinstance(items, list):
            raise ValueError(
                f"Expected a list of item ids for user '{user_id}', got {type(items)}."
            )

        for item_id in items:
            if isinstance(item_id, (dict, list)):
                raise ValueError(
                    "Expected each item in the list to be a scalar item id (int or str), "
                    f"got {type(item_id)} for user '{user_id}'."
                )
            if isinstance(item_id, bool) or not isinstance(item_id, (int, str)):
                raise ValueError(
                    "Expected each item in the list to be a scalar item id (int or str), "
                    f"got {type(item_id)} for user '{user_id}'."
                )
            rows.append({user_col: user_id, item_col: item_id})

    # Build DataFrame
    data = pd.DataFrame(rows).reset_index(drop=True)

    # Final RawData object
    rawdata = RawData(
        data,
        user=user_col,
        item=item_col,
        rating=rating_col,
        timestamp=timestamp_col,
    )

    # Wrapped by @annotate_datarec_output to return DataRec at call sites.
    return cast(DataRec, rawdata)

read_sequence_tabular_inline(filepath, *, user_col='user', sequence_col='sequence', sequence_sep=' ', timestamp_col='timestamp', meta_cols=None, col_sep=',', header=None, cols=None, engine='c', fallback_engine='python', stream=False, encode_ids=False, chunksize=100000, dataset_name='Unknown Dataset', version_name='Unknown Version')

Reads a file where interaction sequences are stored in a single string column.

Example: user_id,sequence -> 70,"495 1631 2317"

Parameters:

    filepath (str, required): Path to the CSV file.
    user_col (str, default 'user'): Column name containing the user ID.
    sequence_col (str, default 'sequence'): Column containing the serialized interaction sequence.
    sequence_sep (str, default ' '): Separator used inside the sequence string.
    timestamp_col (Optional[str], default 'timestamp'): Column name for timestamp (if present).
    meta_cols (Optional[List[str]], default None): Additional metadata columns to keep.
    col_sep (str, default ','): Column separator used in the CSV file.
    header (Union[int, List[int], str, None], default None): Row number for the header.
    cols (Optional[List[str]], default None): Explicit column names if the file has no header.
    engine (str, default 'c'): Pandas CSV engine to use ("c" or "python"), with automatic fallback to fallback_engine on failure.
    fallback_engine (str, default 'python'): Engine to try if the primary one fails.
    stream (bool, default False): If True, process the file in chunks to reduce peak memory.
    encode_ids (bool, default False): If True, encode user/item to int ids using IncrementalEncoder (streaming or full).
    chunksize (int, default 100000): Number of rows per chunk when streaming.
    dataset_name (str, default 'Unknown Dataset'): Name to assign to the resulting DataRec dataset.
    version_name (str, default 'Unknown Version'): Version identifier to assign to the resulting DataRec dataset.

Returns:

    DataRec: Transactional data. (Returned via @annotate_datarec_output, which wraps the RawData.)
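A usage sketch, assuming a hypothetical CSV whose first row is a header like user_id,sequence and whose sequence column holds space-separated item ids:

from datarec.io.readers.sequences.tabular import read_sequence_tabular_inline

dr = read_sequence_tabular_inline(
    "data/sequences_inline.csv",  # hypothetical path
    user_col="user_id",
    sequence_col="sequence",
    sequence_sep=" ",
    timestamp_col=None,           # this file has no timestamp column
    col_sep=",",
    header=0,                     # column names are on the first row
)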

Source code in datarec/io/readers/sequences/tabular.py
@annotate_datarec_output
def read_sequence_tabular_inline(
    filepath: str,
    *,
    user_col: str = "user",
    sequence_col: str = "sequence",
    sequence_sep: str = " ",
    timestamp_col: Optional[str] = "timestamp",
    meta_cols: Optional[List[str]] = None,
    col_sep: str = ",",
    header: Union[int, List[int], str, None] = None,
    cols: Optional[List[str]] = None,
    engine: str = "c",
    fallback_engine: str = "python",
    stream: bool = False,
    encode_ids: bool = False,
    chunksize: int = 100_000,
    dataset_name: str = "Unknown Dataset",
    version_name: str = "Unknown Version",
) -> DataRec:
    """
    Reads a file where interaction sequences are stored in a single string column.

    Example: user_id,sequence -> 70,"495 1631 2317"

    Args:
        filepath: Path to the CSV file.
        user_col: Column name containing the user ID.
        sequence_col: Column containing the serialized interaction sequence.
        sequence_sep: Separator used inside the sequence string.
        timestamp_col: Column name for timestamp (if present).
        meta_cols: Additional metadata columns to keep.
        col_sep: Column separator used in the CSV file.
        header: Row number for the header.
        cols: Explicit column names if the file has no header.
        engine: Pandas CSV engine to use ("c" or "python"). Defaults to "c" with automatic
                fallback to `fallback_engine` on failure.
        fallback_engine: Engine to try if the primary one fails. Defaults to "python".
        stream: If True, process the file in chunks to reduce peak memory.
        encode_ids: If True, encode user/item to int ids using IncrementalEncoder (streaming or full).
        chunksize: Number of rows per chunk when streaming.
        dataset_name: Name to assign to the resulting DataRec dataset.
        version_name: Version identifier to assign to the resulting DataRec dataset.

    Returns:
        DataRec: Transactional data.
            (Returned via @annotate_datarec_output, which wraps the RawData.)
    """
    if not os.path.exists(filepath):
        raise FileNotFoundError(f"File not found: {filepath}")

    if meta_cols is None:
        meta_cols = []

    read_kwargs = dict(
        sep=col_sep,
        header=header,
        engine=engine,
        dtype=str,  # avoid type inference surprises
        encoding="utf-8",
        encoding_errors="ignore",
        quoting=csv.QUOTE_NONE,
    )
    if cols is not None:
        read_kwargs["names"] = cols

    if stream:
        rawdata = _read_sequence_tabular_inline_stream(
            filepath=filepath,
            user_col=user_col,
            sequence_col=sequence_col,
            sequence_sep=sequence_sep,
            timestamp_col=timestamp_col,
            meta_cols=meta_cols,
            read_kwargs=read_kwargs,
            fallback_engine=fallback_engine,
            encode_ids=encode_ids,
            chunksize=chunksize,
        )
        # Wrapped by @annotate_datarec_output to return DataRec at call sites.
        return cast(DataRec, rawdata)

    rawdata = _read_sequence_tabular_inline_full(
        filepath=filepath,
        user_col=user_col,
        sequence_col=sequence_col,
        sequence_sep=sequence_sep,
        timestamp_col=timestamp_col,
        meta_cols=meta_cols,
        read_kwargs=read_kwargs,
        fallback_engine=fallback_engine,
        encode_ids=encode_ids,
    )
    # Wrapped by @annotate_datarec_output to return DataRec at call sites.
    return cast(DataRec, rawdata)

read_sequence_tabular_wide(filepath, *, user_col='user', item_col='item', col_sep='\t', header=None, encode_ids=False, dataset_name='Unknown Dataset', version_name='Unknown Version')

Reads a file containing variable-length interaction sequences (ragged).

Example (tab-separated): u0 i0 i1 i3

Parameters:

    filepath (str, required): Path to the text/CSV file.
    user_col (str, default 'user'): Name to assign to the user column.
    item_col (str, default 'item'): Name to assign to the item column.
    col_sep (str, default '\t'): Delimiter used in the file.
    header (Optional[int], default None): Row number (0-indexed) to use as the header (to be skipped).
    encode_ids (bool, default False): If True, encode user/item to int ids using IncrementalEncoder.
    dataset_name (str, default 'Unknown Dataset'): Name to assign to the resulting DataRec dataset.
    version_name (str, default 'Unknown Version'): Version identifier to assign to the resulting DataRec dataset.

Returns:

    DataRec: Transactional DataFrame. (Returned via @annotate_datarec_output, which wraps the RawData.)
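A usage sketch, assuming a hypothetical tab-separated file in which every line starts with a user id followed by that user's items:

from datarec.io.readers.sequences.tabular import read_sequence_tabular_wide

dr = read_sequence_tabular_wide(
    "data/sequences_wide.tsv",    # hypothetical path
    user_col="user",
    item_col="item",
    col_sep="\t",
    encode_ids=True,              # map raw ids to contiguous integer ids
)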

Source code in datarec/io/readers/sequences/tabular.py
@annotate_datarec_output
def read_sequence_tabular_wide(
    filepath: str,
    *,
    user_col: str = "user",
    item_col: str = "item",
    col_sep: str = "\t",
    header: Optional[int] = None,
    encode_ids: bool = False,
    dataset_name: str = "Unknown Dataset",
    version_name: str = "Unknown Version",
) -> DataRec:
    """
    Reads a file containing variable-length interaction sequences (ragged).

    Example: u0\ti0\ti1\ti3

    Args:
        filepath: Path to the text/CSV file.
        user_col: Name to assign to the user column.
        item_col: Name to assign to the item column.
        col_sep: Delimiter used in the file.
        header: Row number (0-indexed) to use as the header (to be skipped).
        dataset_name: Name to assign to the resulting DataRec dataset.
        version_name: Version identifier to assign to the resulting DataRec dataset.

    Returns:
        DataRec: Transactional DataFrame.
            (Returned via @annotate_datarec_output, which wraps the RawData.)
    """
    if not os.path.exists(filepath):
        raise FileNotFoundError(f"File not found: {filepath}")

    if header is not None and header < 0:
        raise ValueError(f"header must be >= 0 or None, got {header}.")

    if header is not None:
        _skiprows = header + 1 
    else:
        _skiprows = 0

    # Fast Loading (Raw Text)
    df_raw = pd.read_csv(
        filepath,
        sep="\0",
        header=None,
        names=["_raw_string"],
        skiprows=_skiprows,
        engine="c",
        quoting=csv.QUOTE_NONE,
    )

    if df_raw.empty:
        raise ValueError("The provided file is empty or all rows were skipped.")

    # Vectorized processing
    df_raw["_split"] = df_raw["_raw_string"].str.split(col_sep)
    df_raw[user_col] = df_raw["_split"].str[0].str.strip()
    df_raw["_items_list"] = df_raw["_split"].str[1:]

    df_long = df_raw.explode("_items_list")
    df_long = df_long.rename(columns={"_items_list": item_col})
    df_long = df_long[[user_col, item_col]]

    df_long = df_long.dropna(subset=[item_col])

    df_long[item_col] = df_long[item_col].astype(str).str.strip()
    df_long = df_long[df_long[item_col] != ""]

    df_long = df_long.reset_index(drop=True)

    user_encoder = None
    item_encoder = None
    if encode_ids:
        u_enc = IncrementalEncoder(offset=0)
        i_enc = IncrementalEncoder(offset=0)
        df_long[user_col] = u_enc.encode_many(df_long[user_col].tolist())
        df_long[item_col] = i_enc.encode_many(df_long[item_col].tolist())
        user_encoder = u_enc.forward
        item_encoder = i_enc.forward

    rawdata = RawData(df_long, user=user_col, item=item_col, user_encoder=user_encoder, item_encoder=item_encoder)
    # Wrapped by @annotate_datarec_output to return DataRec at call sites.
    return cast(DataRec, rawdata)

read_sequence_tabular_implicit(filepath, *, user_col='sequence_id', item_col='item', col_sep=' ', header=None, drop_length_col=True, encode_ids=False, dataset_name='Unknown Dataset', version_name='Unknown Version')

Reads a tabular file where each row represents a sequence with an implicit identifier (row-based), optionally starting with a declared sequence length.

Example:

3 10 20 30
2 11 42

Each row is interpreted as a distinct sequence (pseudo-user). The first column may represent the declared sequence length and is ignored by default.

Parameters:

    filepath (str, required): Path to the text/CSV file.
    user_col (str, default 'sequence_id'): Name assigned to the implicit sequence identifier column.
    item_col (str, default 'item'): Name assigned to the item column.
    col_sep (str, default ' '): Delimiter used in the file.
    header (Optional[int], default None): Optional row index to skip as header.
    drop_length_col (bool, default True): If True, the first column is treated as sequence length and discarded.
    encode_ids (bool, default False): If True, encode user/item to int ids using IncrementalEncoder.
    dataset_name (str, default 'Unknown Dataset'): Name to assign to the resulting DataRec dataset.
    version_name (str, default 'Unknown Version'): Version identifier to assign to the resulting DataRec dataset.

Returns:

    DataRec: Transactional DataFrame where each sequence is treated as a pseudo-user. (Returned via @annotate_datarec_output, which wraps the RawData.)
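A usage sketch, assuming a hypothetical space-separated file in which each line is "<length> <item> <item> ..." and the row index serves as the sequence id:

from datarec.io.readers.sequences.tabular import read_sequence_tabular_implicit

dr = read_sequence_tabular_implicit(
    "data/sessions.txt",          # hypothetical path
    col_sep=" ",
    drop_length_col=True,         # the first token is the declared length
)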

Source code in datarec/io/readers/sequences/tabular.py
@annotate_datarec_output
def read_sequence_tabular_implicit(
    filepath: str,
    *,
    user_col: str = "sequence_id",
    item_col: str = "item",
    col_sep: str = " ",
    header: Optional[int] = None,
    drop_length_col: bool = True,
    encode_ids: bool = False,
    dataset_name: str = "Unknown Dataset",
    version_name: str = "Unknown Version",
) -> DataRec:
    """
    Reads a tabular file where each row represents a sequence with an
    implicit identifier (row-based), optionally starting with a declared
    sequence length.

    Example:
        3  10  20  30
        2  11  42

    Each row is interpreted as a distinct sequence (pseudo-user).
    The first column may represent the declared sequence length and is
    ignored by default.

    Args:
        filepath: Path to the text/CSV file.
        user_col: Name assigned to the implicit sequence identifier column.
        item_col: Name assigned to the item column.
        col_sep: Delimiter used in the file.
        header: Optional row index to skip as header.
        drop_length_col: If True, the first column is treated as sequence
                         length and discarded.
        dataset_name: Name to assign to the resulting DataRec dataset.
        version_name: Version identifier to assign to the resulting DataRec dataset.

    Returns:
        DataRec: Transactional DataFrame where each sequence is treated
                 as a pseudo-user.
            (Returned via @annotate_datarec_output, which wraps the RawData.)
    """
    if not os.path.exists(filepath):
        raise FileNotFoundError(f"File not found: {filepath}")

    if header is not None and header < 0:
        raise ValueError(f"header must be >= 0 or None, got {header}.")

    skiprows = header + 1 if header is not None else 0

    # Load each line as a raw string (ragged-safe, fast)
    df_raw = pd.read_csv(
        filepath,
        sep="\0",
        header=None,
        names=["_raw_string"],
        skiprows=skiprows,
        engine="c",
        quoting=csv.QUOTE_NONE,
    )

    if df_raw.empty:
        raise ValueError("The provided file is empty or all rows were skipped.")

    # Split line into tokens
    df_raw["_split"] = df_raw["_raw_string"].str.strip().str.split(col_sep)

    # Generate implicit sequence id (row-based)
    df_raw[user_col] = df_raw.index.astype(str)

    # Drop declared length if requested
    if drop_length_col:
        df_raw["_items_list"] = df_raw["_split"].str[1:]
    else:
        df_raw["_items_list"] = df_raw["_split"]

    # Explode sequences
    df_long = df_raw.explode("_items_list")

    df_long = df_long.rename(columns={"_items_list": item_col})
    df_long = df_long[[user_col, item_col]]

    # Cleanup
    df_long = df_long.dropna(subset=[item_col])
    df_long[item_col] = df_long[item_col].astype(str).str.strip()
    df_long = df_long[df_long[item_col] != ""]

    df_long = df_long.reset_index(drop=True)

    user_encoder = None
    item_encoder = None
    if encode_ids:
        u_enc = IncrementalEncoder(offset=0)
        i_enc = IncrementalEncoder(offset=0)
        df_long[user_col] = u_enc.encode_many(df_long[user_col].tolist())
        df_long[item_col] = i_enc.encode_many(df_long[item_col].tolist())
        user_encoder = u_enc.forward
        item_encoder = i_enc.forward

    rawdata = RawData(df_long, user=user_col, item=item_col, user_encoder=user_encoder, item_encoder=item_encoder)
    # Wrapped by @annotate_datarec_output to return DataRec at call sites.
    return cast(DataRec, rawdata)

read_transactions_tabular(filepath, *, sep='\t', user_col, item_col, rating_col=None, timestamp_col=None, header=None, skiprows=0, cols=None, engine='c', fallback_engine='python', stream=False, encode_ids=False, chunksize=100000, dataset_name='Unknown Dataset', version_name='Unknown Version')

Reads a tabular data file (CSV, TSV, etc.) into a RawData object.

Parameters:

    filepath (str, required): Path to the tabular data file.
    sep (str, default '\t'): Delimiter to use.
    user_col (Union[str, int], required): Column name or index for the user field.
    item_col (Union[str, int], required): Column name or index for the item field.
    rating_col (Optional[Union[str, int]], default None): Column name or index for the rating field.
    timestamp_col (Optional[Union[str, int]], default None): Column name or index for the timestamp field.
    header (Union[int, List[int], str, None], default None): Row number(s) to use as the column names.
    skiprows (Union[int, List[int]], default 0): Line numbers to skip at the start of the file.
    cols (Optional[List[str]], default None): Explicit column names if the file has no header. Passed as names to pandas.read_csv.
    engine (Optional[str], default 'c'): Pandas CSV engine.
    fallback_engine (Optional[str], default 'python'): Engine to try if the primary fails.
    stream (bool, default False): If True, read in chunks to reduce memory.
    encode_ids (bool, default False): If True, encode user/item to int ids using IncrementalEncoder.
    chunksize (int, default 100000): Rows per chunk when streaming.
    dataset_name (str, default 'Unknown Dataset'): Name to assign to the resulting DataRec dataset.
    version_name (str, default 'Unknown Version'): Version identifier to assign to the resulting DataRec dataset.

Returns:

    DataRec: The loaded data. (Returned via @annotate_datarec_output, which wraps the RawData.)
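Beyond the minimal usage at the top of this page, a sketch of a memory-conscious call on a large headerless TSV, with streaming and integer id encoding enabled (the path and column indices are illustrative):

from datarec.io.readers.transactions.tabular import read_transactions_tabular

dr = read_transactions_tabular(
    "data/big_interactions.tsv",  # hypothetical path
    sep="\t",
    user_col=0,                   # positional indices because the file has no header
    item_col=1,
    rating_col=2,
    timestamp_col=3,
    stream=True,                  # read in chunks of `chunksize` rows
    encode_ids=True,              # map raw ids to contiguous integer ids
    chunksize=250_000,
)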

Source code in datarec/io/readers/transactions/tabular.py
@annotate_datarec_output
def read_transactions_tabular(
    filepath: str,
    *,
    sep: str = "\t",
    user_col: Union[str, int],
    item_col: Union[str, int],
    rating_col: Optional[Union[str, int]] = None,
    timestamp_col: Optional[Union[str, int]] = None,
    header: Union[int, List[int], str, None] = None,
    skiprows: Union[int, List[int]] = 0,
    cols: Optional[List[str]] = None,
    engine: Optional[str] = 'c',
    fallback_engine: Optional[str] = 'python',
    stream: bool = False,
    encode_ids: bool = False,
    chunksize: int = 100_000,
    dataset_name: str = "Unknown Dataset",
    version_name: str = "Unknown Version",
) -> DataRec:
    """
    Reads a tabular data file (CSV, TSV, etc.) into a RawData object.

    Args:
        filepath: Path to the tabular data file.
        sep: Delimiter to use (default: tab).
        user_col: Column name or index for the user field (Required).
        item_col: Column name or index for the item field (Required).
        rating_col: Column name or index for the rating field.
        timestamp_col: Column name or index for the timestamp field.
        header: Row number(s) to use as the column names. Defaults to 'infer'.
        skiprows: Line numbers to skip at the start of the file.
        cols: Explicit column names if the file has no header. Passed as `names`
              to `pandas.read_csv`.
        engine: Pandas CSV engine.
        fallback_engine: Engine to try if the primary fails.
        stream: If True, read in chunks to reduce memory.
        encode_ids: If True, encode user/item to int ids using IncrementalEncoder.
        chunksize: Rows per chunk when streaming.
        dataset_name: Name to assign to the resulting DataRec dataset.
        version_name: Version identifier to assign to the resulting DataRec dataset.

    Returns:
        DataRec: The loaded data.
            (Returned via @annotate_datarec_output, which wraps the RawData.)
    """
    if not os.path.exists(filepath):
        raise FileNotFoundError(f"File not found: {filepath}")

    read_kwargs = dict(sep=sep, header=header, skiprows=skiprows, engine=engine)
    if cols is not None:
        read_kwargs["names"] = cols

    if stream:
        rawdata = _read_transactions_tabular_stream(
            filepath=filepath,
            user_col=user_col,
            item_col=item_col,
            rating_col=rating_col,
            timestamp_col=timestamp_col,
            read_kwargs=read_kwargs,
            fallback_engine=fallback_engine,
            encode_ids=encode_ids,
            chunksize=chunksize,
        )
        # Wrapped by @annotate_datarec_output to return DataRec at call sites.
        return cast(DataRec, rawdata)

    try:
        data = pd.read_csv(filepath, **read_kwargs)
    except Exception as exc:
        if fallback_engine and fallback_engine != engine:
            read_kwargs["engine"] = fallback_engine
            data = pd.read_csv(filepath, **read_kwargs)
        else:
            raise exc

    # Helper to resolve column name from string or index
    def _resolve_col(col: Optional[Union[str, int]]) -> Optional[object]:
        """
        Resolve a column specifier to an existing column label in `data.columns`.

        Rules:
        - str: treated as a column name (label)
        - int: if it matches an existing column label, use it as label;
                otherwise treat it as positional index.
        """
        if col is None:
            return None

        cols_index = data.columns

        if isinstance(col, int):
            # Prefer label semantics if the label exists (works also with numpy integer labels)
            if col in cols_index:
                # Return the exact label object stored in the Index (could be np.int64(0), etc.)
                return int(cols_index[cols_index.get_loc(col)])

            # Otherwise, interpret as positional index
            if col < 0 or col >= len(cols_index):
                raise ValueError(
                    f"Column index {col} is out of bounds for {len(cols_index)} columns."
                )
            return cols_index[col]

        # str case: strict label match
        if col not in cols_index:
            raise ValueError(
                f"Column '{col}' not found in dataset columns: {list(cols_index)}"
            )
        return col

    user_col_name = _resolve_col(user_col)
    item_col_name = _resolve_col(item_col)
    rating_col_name = _resolve_col(rating_col)
    timestamp_col_name = _resolve_col(timestamp_col)

    std_columns = [user_col_name, item_col_name, rating_col_name, timestamp_col_name]
    assigned_columns = [c for c in std_columns if c is not None]

    if not assigned_columns or user_col_name is None or item_col_name is None:
        raise ValueError("User and Item columns are required.")

    # Subset data to the relevant columns
    data = data[assigned_columns]

    user_encoder = None
    item_encoder = None
    if encode_ids:
        u_enc = IncrementalEncoder(offset=0)
        i_enc = IncrementalEncoder(offset=0)
        data[user_col_name] = u_enc.encode_many(data[user_col_name].tolist())
        data[item_col_name] = i_enc.encode_many(data[item_col_name].tolist())
        user_encoder = u_enc.forward
        item_encoder = i_enc.forward

    rawdata = RawData(
        data,
        user=user_col_name,
        item=item_col_name,
        rating=rating_col_name,
        timestamp=timestamp_col_name,
        user_encoder=user_encoder,
        item_encoder=item_encoder,
    )

    # Wrapped by @annotate_datarec_output to return DataRec at call sites.
    return cast(DataRec, rawdata)

read_transactions_json(filepath, *, user_col, item_col, rating_col=None, timestamp_col=None, lines=True, stream=False, encode_ids=False, chunksize=100000, dataset_name='Unknown Dataset', version_name='Unknown Version')

Reads a JSON (or JSON Lines) file and returns it as a RawData object.

Arg names standardized to match read_tabular (user_col instead of user_field).

Parameters:

    filepath (str, required): Path to the JSON file.
    user_col (str, required): JSON key corresponding to the user field.
    item_col (str, required): JSON key corresponding to the item field.
    rating_col (Optional[str], default None): JSON key corresponding to the rating field.
    timestamp_col (Optional[str], default None): JSON key corresponding to the timestamp field.
    lines (bool, default True): If True, reads the file as a JSON object per line (JSONL).
    stream (bool, default False): If True, read in chunks to reduce memory.
    encode_ids (bool, default False): If True, encode user/item to int ids using IncrementalEncoder.
    chunksize (int, default 100000): Rows per chunk when streaming.
    dataset_name (str, default 'Unknown Dataset'): Name to assign to the resulting DataRec dataset.
    version_name (str, default 'Unknown Version'): Version identifier to assign to the resulting DataRec dataset.

Returns:

    DataRec: The loaded data. (Returned via @annotate_datarec_output, which wraps the RawData.)
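A usage sketch, assuming a hypothetical file data/reviews.json containing a plain JSON array of records with "reviewerID", "asin", and "overall" keys:

from datarec.io.readers.transactions.json import read_transactions_json

dr = read_transactions_json(
    "data/reviews.json",          # hypothetical path
    user_col="reviewerID",
    item_col="asin",
    rating_col="overall",
    lines=False,                  # a single JSON document, not JSON Lines
)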

Source code in datarec/io/readers/transactions/json.py
@annotate_datarec_output
def read_transactions_json(
    filepath: str, 
    *,
    user_col: str, 
    item_col: str, 
    rating_col: Optional[str] = None, 
    timestamp_col: Optional[str] = None, 
    lines: bool = True,
    stream: bool = False,
    encode_ids: bool = False,
    chunksize: int = 100_000,
    dataset_name: str = "Unknown Dataset",
    version_name: str = "Unknown Version",
) -> DataRec:
    """
    Reads a JSON (or JSON Lines) file and returns it as a RawData object.

    Arg names standardized to match read_tabular (user_col instead of user_field).

    Args:
        filepath: Path to the JSON file.
        user_col: JSON key corresponding to the user field (Required).
        item_col: JSON key corresponding to the item field (Required).
        rating_col: JSON key corresponding to the rating field.
        timestamp_col: JSON key corresponding to the timestamp field.
        lines: If True, reads the file as a JSON object per line (JSONL).
        dataset_name: Name to assign to the resulting DataRec dataset.
        version_name: Version identifier to assign to the resulting DataRec dataset.

    Returns:
        DataRec: The loaded data.
            (Returned via @annotate_datarec_output, which wraps the RawData.)
    """
    rawdata = read_transactions_json_base(
        filepath,
        user_col=user_col,
        item_col=item_col,
        rating_col=rating_col,
        timestamp_col=timestamp_col,
        lines=lines,  # forward the caller's choice between plain JSON and JSON Lines
        stream=stream,
        encode_ids=encode_ids,
        chunksize=chunksize,
    )
    # Wrapped by @annotate_datarec_output to return DataRec at call sites.
    return cast(DataRec, rawdata)

read_transactions_jsonl(filepath, *, user_col, item_col, rating_col=None, timestamp_col=None, stream=False, encode_ids=False, chunksize=100000, dataset_name='Unknown Dataset', version_name='Unknown Version')

Reads a JSON Lines file and returns it as a RawData object.

Arg names standardized to match read_tabular (user_col instead of user_field).

Parameters:

    filepath (str, required): Path to the JSON Lines file.
    user_col (str, required): JSON key corresponding to the user field.
    item_col (str, required): JSON key corresponding to the item field.
    rating_col (Optional[str], default None): JSON key corresponding to the rating field.
    timestamp_col (Optional[str], default None): JSON key corresponding to the timestamp field.
    stream (bool, default False): If True, read in chunks to reduce memory.
    encode_ids (bool, default False): If True, encode user/item to int ids using IncrementalEncoder.
    chunksize (int, default 100000): Rows per chunk when streaming.
    dataset_name (str, default 'Unknown Dataset'): Name to assign to the resulting DataRec dataset.
    version_name (str, default 'Unknown Version'): Version identifier to assign to the resulting DataRec dataset.

Returns:

    DataRec: The loaded data. (Returned via @annotate_datarec_output, which wraps the RawData.)
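A usage sketch, assuming a hypothetical file data/events.jsonl with one JSON object per line holding "user", "item", and "ts" keys:

from datarec.io.readers.transactions.jsonl import read_transactions_jsonl

dr = read_transactions_jsonl(
    "data/events.jsonl",          # hypothetical path
    user_col="user",
    item_col="item",
    timestamp_col="ts",
)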

Source code in datarec/io/readers/transactions/jsonl.py
@annotate_datarec_output
def read_transactions_jsonl(
    filepath: str, 
    *,
    user_col: str, 
    item_col: str, 
    rating_col: Optional[str] = None, 
    timestamp_col: Optional[str] = None,
    stream: bool = False,
    encode_ids: bool = False,
    chunksize: int = 100_000,
    dataset_name: str = "Unknown Dataset",
    version_name: str = "Unknown Version",
) -> DataRec:
    """
    Reads a JSON Lines file and returns it as a RawData object.

    Arg names standardized to match read_tabular (user_col instead of user_field).

    Args:
        filepath: Path to the JSON file.
        user_col: JSON key corresponding to the user field (Required).
        item_col: JSON key corresponding to the item field (Required).
        rating_col: JSON key corresponding to the rating field.
        timestamp_col: JSON key corresponding to the timestamp field.
        dataset_name: Name to assign to the resulting DataRec dataset.
        version_name: Version identifier to assign to the resulting DataRec dataset.

    Returns:
        DataRec: The loaded data.
            (Returned via @annotate_datarec_output, which wraps the RawData.)
    """
    rawdata = read_transactions_json_base(
        filepath,
        user_col=user_col,
        item_col=item_col,
        rating_col=rating_col,
        timestamp_col=timestamp_col,
        lines=True,
        stream=stream,
        encode_ids=encode_ids,
        chunksize=chunksize,
    )
    # Wrapped by @annotate_datarec_output to return DataRec at call sites.
    return cast(DataRec, rawdata)

read_transactions_blocks(filepath, *, block_by, event_layout, user_col='user', item_col='item', rating_col=None, timestamp_col=None, sep='\t', chunksize=None, dataset_name='Unknown Dataset', version_name='Unknown Version')

Reads a block text format into transactional RawData.

Block structure
  • Header line per block: "<BLOCK_ID>:"
  • Event lines:
    • "id"
    • "id,rating"
    • "id,rating,timestamp"

The block header identifies either the item or the user depending on block_by. The event line id is the opposite entity.

Parameters:

    filepath (str, required): Path to the block text file.
    block_by (Literal['item', 'user'], required): Whether blocks are grouped by "item" or by "user".
    event_layout (Literal['id', 'id,rating', 'id,rating,timestamp'], required): Layout of event lines.
    user_col (str, default 'user'): Output user column name.
    item_col (str, default 'item'): Output item column name.
    rating_col (Optional[str], default None): Output rating column name (required if layout includes rating).
    timestamp_col (Optional[str], default None): Output timestamp column name (required if layout includes timestamp).
    sep (str, default '\t'): Field separator used in event lines.
    chunksize (Optional[int], default None): Optional number of rows per in-memory chunk before concatenation.
    dataset_name (str, default 'Unknown Dataset'): Name to assign to the resulting DataRec dataset.
    version_name (str, default 'Unknown Version'): Version identifier to assign to the resulting DataRec dataset.

Returns:

    DataRec: A DataRec object containing all interactions row-by-row. (Returned via @annotate_datarec_output, which wraps the RawData.)
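A usage sketch, assuming a hypothetical file data/ratings_by_item.txt grouped by item, where each event line is "user,rating,timestamp":

from datarec.io.readers.transactions.blocks import read_transactions_blocks

dr = read_transactions_blocks(
    "data/ratings_by_item.txt",   # hypothetical path
    block_by="item",
    event_layout="id,rating,timestamp",
    rating_col="rating",          # required because the layout includes rating
    timestamp_col="timestamp",    # required because the layout includes timestamp
    sep=",",
)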

Source code in datarec/io/readers/transactions/blocks.py
@annotate_datarec_output
def read_transactions_blocks(
    filepath: str,
    *,
    block_by: Literal["item", "user"],
    event_layout: Literal["id", "id,rating", "id,rating,timestamp"],
    user_col: str = "user",
    item_col: str = "item",
    rating_col: Optional[str] = None,
    timestamp_col: Optional[str] = None,
    sep: str = "\t",
    chunksize: Optional[int] = None,
    dataset_name: str = "Unknown Dataset",
    version_name: str = "Unknown Version",
) -> DataRec:
    """
    Reads a block text format into transactional RawData.

    Block structure:
      - Header line per block: "<BLOCK_ID>:"
      - Event lines:
          - "id"
          - "id,rating"
          - "id,rating,timestamp"

    The block header identifies either the item or the user depending on
    block_by. The event line id is the opposite entity.

    Args:
        filepath: Path to the block text file.
        block_by: Whether blocks are grouped by "item" or by "user".
        event_layout: Layout of event lines.
        user_col: Output user column name.
        item_col: Output item column name.
        rating_col: Output rating column name (required if layout includes rating).
        timestamp_col: Output timestamp column name (required if layout includes timestamp).
        sep: Field separator used in event lines.
        chunksize: Optional number of rows per in-memory chunk before concatenation.
        dataset_name: Name to assign to the resulting DataRec dataset.
        version_name: Version identifier to assign to the resulting DataRec dataset.

    Returns:
        DataRec: A DataRec object containing all interactions row-by-row.
            (Returned via @annotate_datarec_output, which wraps the RawData.)
    """
    if not os.path.exists(filepath):
        raise FileNotFoundError(f"File not found: {filepath}")

    expects_rating = event_layout in ("id,rating", "id,rating,timestamp")
    expects_timestamp = event_layout == "id,rating,timestamp"

    if expects_rating and rating_col is None:
        raise ValueError("rating_col must be provided when event_layout includes rating.")
    if not expects_rating and rating_col is not None:
        raise ValueError("rating_col must be None when event_layout does not include rating.")
    if expects_timestamp and timestamp_col is None:
        raise ValueError("timestamp_col must be provided when event_layout includes timestamp.")
    if not expects_timestamp and timestamp_col is not None:
        raise ValueError("timestamp_col must be None when event_layout does not include timestamp.")

    frames: List[pd.DataFrame] = []
    users: List[Any] = []
    items: List[Any] = []
    ratings: List[Any] = []
    timestamps: List[Any] = []

    current_block_id: Optional[str] = None

    def _flush() -> None:
        if not users:
            return
        data: Dict[str, List[Any]] = {
            user_col: users.copy(),
            item_col: items.copy(),
        }
        if rating_col is not None:
            data[rating_col] = ratings.copy()
        if timestamp_col is not None:
            data[timestamp_col] = timestamps.copy()
        frames.append(pd.DataFrame(data))
        users.clear()
        items.clear()
        timestamps.clear()
        if rating_col is not None:
            ratings.clear()

    with open(filepath, "r", encoding="utf-8") as f:
        for line_num, raw_line in enumerate(f, start=1):
            line = raw_line.rstrip("\n")
            if line == "":
                _raise_parse_error(
                    line_num,
                    "Empty lines are not allowed",
                    raw_line,
                    current_block_id,
                    block_by,
                )

            stripped = line.strip()
            if stripped.endswith(":"):
                block_id = stripped[:-1].strip()
                if not block_id:
                    _raise_parse_error(
                        line_num,
                        "Missing block id in header",
                        raw_line,
                        current_block_id,
                        block_by,
                    )
                current_block_id = block_id
                continue

            if current_block_id is None:
                _raise_parse_error(
                    line_num,
                    "Event found before any block header",
                    raw_line,
                    current_block_id,
                    block_by,
                )

            parts = [p.strip() for p in line.split(sep)]
            if event_layout == "id":
                expected_fields = 1
            elif event_layout == "id,rating":
                expected_fields = 2
            else:
                expected_fields = 3
            if len(parts) != expected_fields:
                _raise_parse_error(
                    line_num,
                    f"Expected {expected_fields} fields separated by {sep!r}, got {len(parts)}",
                    raw_line,
                    current_block_id,
                    block_by,
                )

            other_id = parts[0]
            if event_layout == "id":
                rating_val = None
                date_val = None
            elif event_layout == "id,rating":
                rating_val = parts[1]
                date_val = None
            else:
                rating_val = parts[1]
                date_val = parts[2]

            if block_by == "item":
                users.append(other_id)
                items.append(current_block_id)
            else:
                users.append(current_block_id)
                items.append(other_id)

            if timestamp_col is not None:
                timestamps.append(date_val)
            if rating_col is not None:
                ratings.append(rating_val)

            if chunksize is not None and len(users) >= chunksize:
                _flush()

    _flush()

    if frames:
        data = pd.concat(frames, ignore_index=True)
    else:
        data_dict: Dict[str, List[Any]] = {
            user_col: users,
            item_col: items,
        }
        if rating_col is not None:
            data_dict[rating_col] = ratings
        if timestamp_col is not None:
            data_dict[timestamp_col] = timestamps
        data = pd.DataFrame(data_dict)

    raw = RawData(
        data,
        user=user_col,
        item=item_col,
        rating=rating_col,
        timestamp=timestamp_col,
    )

    # Wrapped by @annotate_datarec_output to return DataRec at call sites.
    return cast(DataRec, raw)

write_sequences_json(data, filepath, *, item_col='item', rating_col='rating', timestamp_col='timestamp', include_rating=False, include_timestamp=False, ensure_ascii=False, indent=2, verbose=True)

Writes sequential interaction data to a JSON mapping in the form:

{
  "<user_id>": [
    { "<item_col>": ..., "<rating_col>": ..., "<timestamp_col>": ... },
    ...
  ],
  ...
}

Notes:
  - The input is expected to be transactional RawData (one row per interaction).
  - User identifiers become JSON object keys (strings). Therefore, the output does NOT contain a user field name (no user_col parameter).

Parameters:

  data (Union[RawData, DataRec], required): RawData or DataRec instance.
  filepath (str, required): Output path.
  item_col (str, default 'item'): Output key for the item field inside each event.
  rating_col (str, default 'rating'): Output key for the rating field inside each event (only if include_rating=True).
  timestamp_col (str, default 'timestamp'): Output key for the timestamp field inside each event (only if include_timestamp=True).
  include_rating (bool, default False): Whether to include rating in each event.
  include_timestamp (bool, default False): Whether to include timestamp in each event.
  ensure_ascii (bool, default False): Whether to escape non-ascii characters.
  indent (Optional[int], default 2): Pretty-print indentation level.
  verbose (bool, default True): Whether to print a confirmation message.

Returns:

  None

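A minimal usage sketch with a small in-memory dataset; the DataFrame contents and output path are illustrative assumptions:

import pandas as pd
from datarec.io.rawdata import RawData
from datarec.io.writers.sequences.json import write_sequences_json

# One row per interaction; the timestamp column is declared so it can be included per event
df = pd.DataFrame({"user": ["u1", "u1", "u2"], "item": ["a", "b", "c"], "timestamp": [1, 2, 3]})
raw = RawData(df, user="user", item="item", timestamp="timestamp")
write_sequences_json(raw, "out/sequences.json", include_timestamp=True)
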
Source code in datarec/io/writers/sequences/json.py
def write_sequences_json(
    data: Union["RawData", "DataRec"],
    filepath: str,
    *,
    item_col: str = "item",
    rating_col: str = "rating",
    timestamp_col: str = "timestamp",
    include_rating: bool = False,
    include_timestamp: bool = False,
    ensure_ascii: bool = False,
    indent: Optional[int] = 2,
    verbose: bool = True,
) -> None:
    """
    Writes sequential interaction data to a JSON mapping in the form:

        {
          "<user_id>": [
            { "<item_col>": ..., "<rating_col>": ..., "<timestamp_col>": ... },
            ...
          ],
          ...
        }

    Notes:
    - The input is expected to be transactional RawData (one row per interaction).
    - User identifiers become JSON object keys (strings). Therefore, the output does
      NOT contain a user field name (no `user_col` parameter).

    Args:
        data: RawData or DataRec instance.
        filepath: Output path.
        item_col: Output key for the item field inside each event.
        rating_col: Output key for the rating field inside each event (only if include_rating=True).
        timestamp_col: Output key for the timestamp field inside each event (only if include_timestamp=True).
        include_rating: Whether to include rating in each event.
        include_timestamp: Whether to include timestamp in each event.
        ensure_ascii: Whether to escape non-ascii characters.
        indent: Pretty-print indentation level.
        verbose: Whether to print a confirmation message.

    Returns:
        None
    """
    raw = as_rawdata(data)

    if raw.user is None:
        raise ValueError("RawData.user is not defined.")
    if raw.item is None:
        raise ValueError("RawData.item is not defined.")
    if include_rating and raw.rating is None:
        raise ValueError("include_rating=True but RawData.rating is not defined.")
    if include_timestamp and raw.timestamp is None:
        raise ValueError("include_timestamp=True but RawData.timestamp is not defined.")

    cols: List[object] = [raw.user, raw.item]
    if include_rating:
        cols.append(raw.rating)  # type: ignore[arg-type]
    if include_timestamp:
        cols.append(raw.timestamp)  # type: ignore[arg-type]

    df = raw.data[cols].dropna(subset=[raw.user, raw.item])

    payload: Dict[str, List[Dict[str, Any]]] = {}

    # Preserve order of appearance
    for uid, g in df.groupby(raw.user, sort=False):
        # Build event dicts in a vectorized-ish way
        g2 = g.rename(columns={raw.item: item_col})

        keep_cols = [item_col]
        if include_rating:
            g2 = g2.rename(columns={raw.rating: rating_col})  # type: ignore[arg-type]
            keep_cols.append(rating_col)
        if include_timestamp:
            g2 = g2.rename(columns={raw.timestamp: timestamp_col})  # type: ignore[arg-type]
            keep_cols.append(timestamp_col)

        records = g2[keep_cols].to_dict(orient="records")

        # Make JSON-safe
        safe_events: List[Dict[str, Any]] = []
        for rec in records:
            safe_events.append({k: _json_safe(v) for k, v in rec.items()})

        payload[str(_json_safe(uid))] = safe_events

    out_dir = os.path.dirname(os.path.abspath(filepath))
    if out_dir and not os.path.exists(out_dir):
        os.makedirs(out_dir, exist_ok=True)

    with open(filepath, "w", encoding="utf-8") as f:
        json.dump(payload, f, ensure_ascii=ensure_ascii, indent=indent)

    if verbose:
        print(f"Sequences JSON mapping written to '{filepath}'")

write_sequences_json_array(data, filepath, *, user_col='user', item_col='item', rating_col='rating', timestamp_col='timestamp', sequence_key='sequence', include_rating=False, include_timestamp=False, ensure_ascii=False, indent=2, verbose=True)

Writes sequential interaction data to a JSON array format:

[
  {
    "<user_col>": <user_id>,
    "<sequence_key>": [
      { "<item_col>": ..., "<rating_col>": ..., "<timestamp_col>": ... },
      ...
    ]
  },
  ...
]

Notes:
  - The input is expected to be transactional RawData (one row per interaction). This writer groups interactions by user and produces a per-user sequence list.
  - Unlike the mapping format, user ids remain values (not JSON keys).

Parameters:

  data (Union[RawData, DataRec], required): RawData or DataRec instance.
  filepath (str, required): Output path.
  user_col (str, default 'user'): Output key for the user field in each top-level object.
  item_col (str, default 'item'): Output key for the item field inside each event.
  rating_col (str, default 'rating'): Output key for the rating field inside each event (only if include_rating=True).
  timestamp_col (str, default 'timestamp'): Output key for the timestamp field inside each event (only if include_timestamp=True).
  sequence_key (str, default 'sequence'): Output key containing the list of events per user.
  include_rating (bool, default False): Whether to include rating in each event.
  include_timestamp (bool, default False): Whether to include timestamp in each event.
  ensure_ascii (bool, default False): Whether to escape non-ascii characters.
  indent (Optional[int], default 2): Pretty-print indentation level.
  verbose (bool, default True): Whether to print a confirmation message.

Returns:

  None

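A minimal usage sketch; the data and output path are illustrative, and a rating column is declared so it can be included in each event:

import pandas as pd
from datarec.io.rawdata import RawData
from datarec.io.writers.sequences.json import write_sequences_json_array

df = pd.DataFrame({"user": ["u1", "u1", "u2"], "item": ["a", "b", "c"], "rating": [5.0, 4.0, 3.0]})
raw = RawData(df, user="user", item="item", rating="rating")
write_sequences_json_array(raw, "out/sequences_array.json", include_rating=True)
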
Source code in datarec/io/writers/sequences/json.py
def write_sequences_json_array(
    data: Union["RawData", "DataRec"],
    filepath: str,
    *,
    user_col: str = "user",
    item_col: str = "item",
    rating_col: str = "rating",
    timestamp_col: str = "timestamp",
    sequence_key: str = "sequence",
    include_rating: bool = False,
    include_timestamp: bool = False,
    ensure_ascii: bool = False,
    indent: Optional[int] = 2,
    verbose: bool = True,
) -> None:
    """
    Writes sequential interaction data to a JSON array format:

        [
          {
            "<user_col>": <user_id>,
            "<sequence_key>": [
              { "<item_col>": ..., "<rating_col>": ..., "<timestamp_col>": ... },
              ...
            ]
          },
          ...
        ]

    Notes:
    - The input is expected to be transactional RawData (one row per interaction).
      This writer groups interactions by user and produces a per-user sequence list.
    - Unlike the mapping format, user ids remain values (not JSON keys).

    Args:
        data: RawData or DataRec instance.
        filepath: Output path.
        user_col: Output key for the user field in each top-level object.
        item_col: Output key for the item field inside each event.
        rating_col: Output key for the rating field inside each event (only if include_rating=True).
        timestamp_col: Output key for the timestamp field inside each event (only if include_timestamp=True).
        sequence_key: Output key containing the list of events per user.
        include_rating: Whether to include rating in each event.
        include_timestamp: Whether to include timestamp in each event.
        ensure_ascii: Whether to escape non-ascii characters.
        indent: Pretty-print indentation level.
        verbose: Whether to print a confirmation message.

    Returns:
        None
    """
    raw = as_rawdata(data)

    if raw.user is None:
        raise ValueError("RawData.user is not defined.")
    if raw.item is None:
        raise ValueError("RawData.item is not defined.")
    if include_rating and raw.rating is None:
        raise ValueError("include_rating=True but RawData.rating is not defined.")
    if include_timestamp and raw.timestamp is None:
        raise ValueError("include_timestamp=True but RawData.timestamp is not defined.")

    cols: List[object] = [raw.user, raw.item]
    if include_rating:
        cols.append(raw.rating)  # type: ignore[arg-type]
    if include_timestamp:
        cols.append(raw.timestamp)  # type: ignore[arg-type]

    df = raw.data[cols].dropna(subset=[raw.user, raw.item])

    payload: List[Dict[str, Any]] = []

    for uid, g in df.groupby(raw.user, sort=False):
        g2 = g.rename(columns={raw.item: item_col})

        keep_cols = [item_col]
        if include_rating:
            g2 = g2.rename(columns={raw.rating: rating_col})  # type: ignore[arg-type]
            keep_cols.append(rating_col)
        if include_timestamp:
            g2 = g2.rename(columns={raw.timestamp: timestamp_col})  # type: ignore[arg-type]
            keep_cols.append(timestamp_col)

        records = g2[keep_cols].to_dict(orient="records")
        safe_events = [{k: _json_safe(v) for k, v in rec.items()} for rec in records]

        payload.append(
            {
                user_col: _json_safe(uid),
                sequence_key: safe_events,
            }
        )

    out_dir = os.path.dirname(os.path.abspath(filepath))
    if out_dir and not os.path.exists(out_dir):
        os.makedirs(out_dir, exist_ok=True)

    with open(filepath, "w", encoding="utf-8") as f:
        json.dump(payload, f, ensure_ascii=ensure_ascii, indent=indent)

    if verbose:
        print(f"Sequences JSON array written to '{filepath}'")

write_sequences_json_items(data, filepath, *, item_col='item', compact_items=True, ensure_ascii=False, indent=2, verbose=True)

Writes sequential interaction data to a JSON mapping in the form:

{
  "<user_id>": [item_id, item_id, ...],
  ...
}

Notes:
  - The input is expected to be transactional RawData (one row per interaction).
  - User identifiers become JSON object keys (strings). Therefore, the output does NOT contain a user field name (no user_col parameter).

Parameters:

  data (Union[RawData, DataRec], required): RawData or DataRec instance.
  filepath (str, required): Output path.
  item_col (str, default 'item'): Output key name for the item field in RawData.
  compact_items (bool, default True): Whether to keep item lists on a single line when indenting.
  ensure_ascii (bool, default False): Whether to escape non-ascii characters.
  indent (Optional[int], default 2): Pretty-print indentation level.
  verbose (bool, default True): Whether to print a confirmation message.

Returns:

  None

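A minimal usage sketch producing a plain user-to-item-list mapping; the data and output path are illustrative:

import pandas as pd
from datarec.io.rawdata import RawData
from datarec.io.writers.sequences.json import write_sequences_json_items

df = pd.DataFrame({"user": ["u1", "u1", "u2"], "item": ["a", "b", "c"]})
raw = RawData(df, user="user", item="item")
write_sequences_json_items(raw, "out/user_items.json", compact_items=True)
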
Source code in datarec/io/writers/sequences/json.py
def write_sequences_json_items(
    data: Union["RawData", "DataRec"],
    filepath: str,
    *,
    item_col: str = "item",
    compact_items: bool = True,
    ensure_ascii: bool = False,
    indent: Optional[int] = 2,
    verbose: bool = True,
) -> None:
    """
    Writes sequential interaction data to a JSON mapping in the form:

        {
          "<user_id>": [item_id, item_id, ...],
          ...
        }

    Notes:
    - The input is expected to be transactional RawData (one row per interaction).
    - User identifiers become JSON object keys (strings). Therefore, the output does
      NOT contain a user field name (no `user_col` parameter).

    Args:
        data: RawData or DataRec instance.
        filepath: Output path.
        item_col: Output key name for the item field in RawData.
        compact_items: Whether to keep item lists on a single line when indenting.
        ensure_ascii: Whether to escape non-ascii characters.
        indent: Pretty-print indentation level.
        verbose: Whether to print a confirmation message.

    Returns:
        None
    """
    raw = as_rawdata(data)

    if raw.user is None:
        raise ValueError("RawData.user is not defined.")
    if raw.item is None:
        raise ValueError("RawData.item is not defined.")

    df = raw.data[[raw.user, raw.item]].dropna(subset=[raw.user, raw.item])

    payload: Dict[str, List[Any]] = {}

    # Preserve order of appearance
    for uid, g in df.groupby(raw.user, sort=False):
        items = [_json_safe(v) for v in g[raw.item].tolist()]
        payload[str(_json_safe(uid))] = items

    out_dir = os.path.dirname(os.path.abspath(filepath))
    if out_dir and not os.path.exists(out_dir):
        os.makedirs(out_dir, exist_ok=True)

    with open(filepath, "w", encoding="utf-8") as f:
        if compact_items and indent is not None:
            indent_str = " " * indent
            f.write("{\n")
            items = list(payload.items())
            for idx, (uid, item_list) in enumerate(items):
                key_json = json.dumps(uid, ensure_ascii=ensure_ascii)
                list_json = json.dumps(item_list, ensure_ascii=ensure_ascii)
                sep = "," if idx < len(items) - 1 else ""
                f.write(f"{indent_str}{key_json}: {list_json}{sep}\n")
            f.write("}\n")
        else:
            json.dump(payload, f, ensure_ascii=ensure_ascii, indent=indent)

    if verbose:
        print(f"Sequences JSON item list written to '{filepath}'")

write_sequence_tabular_inline(data, filepath, *, user_col='user', sequence_col='sequence', sequence_sep=' ', include_timestamp=False, timestamp_col='timestamp', meta_cols=None, col_sep=',', header=True, index=False, decimal='.', engine=None, verbose=True)

Writes sequential interaction data to a tabular file where each row contains a single user and a serialized sequence in one column (inline format).

Output format (one sequence per row): user_col, sequence_col, [timestamp_col], [meta_cols...]

Notes:
  - The input is expected to be transactional RawData (one row per (user, item)), as produced by DataRec readers. This writer groups interactions by user and serializes the per-user item list using sequence_sep.
  - If include_timestamp=True, a per-user timestamp is derived by aggregating RawData timestamps using a deterministic strategy (min timestamp).
  - Metadata columns (meta_cols) are aggregated per user using a deterministic strategy (first value).

Parameters:

  data (Union[RawData, DataRec], required): RawData or DataRec instance.
  filepath (str, required): Output path.
  user_col (str, default 'user'): Output column name for user IDs.
  sequence_col (str, default 'sequence'): Output column name for the serialized sequence.
  sequence_sep (str, default ' '): Separator used to serialize items in the sequence string.
  include_timestamp (bool, default False): Whether to include a per-user timestamp column.
  timestamp_col (str, default 'timestamp'): Output column name for timestamp (used only if include_timestamp=True).
  meta_cols (Optional[List[str]], default None): Additional metadata columns to include (if present in RawData).
  col_sep (str, default ','): Column delimiter for the output file.
  header (bool, default True): Whether to write column names.
  index (bool, default False): Whether to write the DataFrame index.
  decimal (str, default '.'): Decimal separator passed to pandas.
  engine (Optional[str], default None): Optional pandas CSV engine hint.
  verbose (bool, default True): Whether to print a confirmation message.

Returns:

  None

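A minimal usage sketch; the data and output path are illustrative. Including the timestamp requires a declared timestamp column, aggregated per user as described above:

import pandas as pd
from datarec.io.rawdata import RawData
from datarec.io.writers.sequences.tabular import write_sequence_tabular_inline

df = pd.DataFrame({"user": ["u1", "u1", "u2"], "item": ["a", "b", "c"], "timestamp": [2, 1, 3]})
raw = RawData(df, user="user", item="item", timestamp="timestamp")
# One row per user, e.g. "u1,a b,1" (items in input order, min timestamp per user)
write_sequence_tabular_inline(raw, "out/sequences_inline.csv", sequence_sep=" ", include_timestamp=True)
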
Source code in datarec/io/writers/sequences/tabular.py
def write_sequence_tabular_inline(
    data: Union["RawData", "DataRec"],
    filepath: str,
    *,
    user_col: str = "user",
    sequence_col: str = "sequence",
    sequence_sep: str = " ",
    include_timestamp: bool = False,
    timestamp_col: str = "timestamp",
    meta_cols: Optional[List[str]] = None,
    col_sep: str = ",",
    header: bool = True,
    index: bool = False,
    decimal: str = ".",
    engine: Optional[str] = None,
    verbose: bool = True,
) -> None:
    """
    Writes sequential interaction data to a tabular file where each row contains
    a single user and a serialized sequence in one column (inline format).

    Output format (one sequence per row):
        user_col, sequence_col, [timestamp_col], [meta_cols...]

    Notes:
    - The input is expected to be transactional RawData (one row per (user, item)),
      as produced by DataRec readers. This writer groups interactions by user and
      serializes the per-user item list using `sequence_sep`.
    - If include_timestamp=True, a per-user timestamp is derived by aggregating
      RawData timestamps using a deterministic strategy (min timestamp).
    - Metadata columns (meta_cols) are aggregated per user using a deterministic
      strategy (first value).

    Args:
        data: RawData or DataRec instance.
        filepath: Output path.
        user_col: Output column name for user IDs.
        sequence_col: Output column name for the serialized sequence.
        sequence_sep: Separator used to serialize items in the sequence string.
        include_timestamp: Whether to include a per-user timestamp column.
        timestamp_col: Output column name for timestamp (used only if include_timestamp=True).
        meta_cols: Additional metadata columns to include (if present in RawData).
        col_sep: Column delimiter for the output file.
        header: Whether to write column names.
        index: Whether to write the DataFrame index.
        decimal: Decimal separator passed to pandas.
        engine: Optional pandas CSV engine hint.
        verbose: Whether to print a confirmation message.

    Returns:
        None
    """
    raw = as_rawdata(data)

    if raw.user is None:
        raise ValueError("RawData.user is not defined.")
    if raw.item is None:
        raise ValueError("RawData.item is not defined.")

    if meta_cols is None:
        meta_cols = []

    df = raw.data

    # Validate timestamp intent vs availability
    if include_timestamp:
        if raw.timestamp is None:
            raise ValueError("include_timestamp=True but RawData.timestamp is not defined.")
        if raw.timestamp not in df.columns:
            raise ValueError(f"Timestamp column '{raw.timestamp}' not found in RawData.data.")

    # Determine which meta columns are available
    available_meta = [mc for mc in meta_cols if mc in df.columns]

    # Select needed columns (no copy)
    cols_needed: List[object] = [raw.user, raw.item]
    if include_timestamp:
        cols_needed.append(raw.timestamp)  # type: ignore[arg-type]
    cols_needed.extend(available_meta)

    df = df[cols_needed].dropna(subset=[raw.user, raw.item])

    # Aggregation strategy:
    # - items: list in input order
    # - timestamp: min (deterministic)
    # - meta: first (deterministic)
    agg: Dict[object, Any] = {raw.item: list}

    if include_timestamp:
        agg[raw.timestamp] = "min"  # type: ignore[index]

    for mc in available_meta:
        agg[mc] = "first"

    grouped = df.groupby(raw.user, sort=False, dropna=False).agg(agg).reset_index()

    # Serialize sequence
    grouped[sequence_col] = grouped[raw.item].apply(
        lambda items: sequence_sep.join(str(x).strip() for x in items if str(x).strip() != "")
    )

    # Drop users with empty sequences
    grouped = grouped[grouped[sequence_col] != ""]

    # Rename user (and timestamp if requested) to output names
    rename_map: Dict[object, str] = {raw.user: user_col}
    if include_timestamp:
        rename_map[raw.timestamp] = timestamp_col  # type: ignore[index]

    grouped = grouped.rename(columns=rename_map)

    # Drop the aggregated list column (raw.item), keep only the serialized sequence
    grouped = grouped.drop(columns=[raw.item])

    # Final column order
    out_cols: List[str] = [user_col, sequence_col]
    if include_timestamp:
        out_cols.append(timestamp_col)

    # Preserve user-requested ordering for meta_cols (only those available)
    for mc in meta_cols:
        if mc in grouped.columns and mc not in out_cols:
            out_cols.append(mc)

    grouped = grouped[out_cols]

    # Ensure output directory exists
    out_dir = os.path.dirname(os.path.abspath(filepath))
    if out_dir and not os.path.exists(out_dir):
        os.makedirs(out_dir, exist_ok=True)

    to_csv_kwargs = dict(sep=col_sep, header=header, index=index, decimal=decimal)
    if engine is not None:
        to_csv_kwargs["engine"] = engine

    grouped.to_csv(filepath, **to_csv_kwargs)

    if verbose:
        print(f"Inline sequence dataset written to '{filepath}'")

write_sequence_tabular_wide(data, filepath, *, user_col='user', item_col_prefix='item', col_sep='\t', header=False, index=False, decimal='.', verbose=True)

Writes sequential interaction data to a tabular-wide format, where each row corresponds to a user and items are spread across multiple columns.

Output example:
  user <sep> item_1 <sep> item_2 <sep> item_3

Parameters:

  data (Union[RawData, DataRec], required): RawData or DataRec instance.
  filepath (str, required): Output path.
  user_col (str, default 'user'): Output column name for user IDs.
  item_col_prefix (str, default 'item'): Prefix for item columns (e.g., item_1, item_2, ...).
  col_sep (str, default '\t'): Column delimiter.
  header (bool, default False): Whether to write column names.
  index (bool, default False): Whether to write the DataFrame index.
  decimal (str, default '.'): Decimal separator.
  verbose (bool, default True): Whether to print a confirmation message.

Returns:

  None

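A minimal usage sketch; the data and output path are illustrative:

import pandas as pd
from datarec.io.rawdata import RawData
from datarec.io.writers.sequences.tabular import write_sequence_tabular_wide

df = pd.DataFrame({"user": ["u1", "u1", "u2"], "item": ["a", "b", "c"]})
raw = RawData(df, user="user", item="item")
# One row per user; items spread across item_1, item_2, ... (shorter rows padded with empty cells)
write_sequence_tabular_wide(raw, "out/sequences_wide.tsv", col_sep="\t")
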
Source code in datarec/io/writers/sequences/tabular.py
def write_sequence_tabular_wide(
    data: Union["RawData", "DataRec"],
    filepath: str,
    *,
    user_col: str = "user",
    item_col_prefix: str = "item",
    col_sep: str = "\t",
    header: bool = False,
    index: bool = False,
    decimal: str = ".",
    verbose: bool = True,
) -> None:
    """
    Writes sequential interaction data to a tabular-wide format, where each row
    corresponds to a user and items are spread across multiple columns.

    Output example:
        user <sep> item_1 <sep> item_2 <sep> item_3

    Args:
        data: RawData or DataRec instance.
        filepath: Output path.
        user_col: Output column name for user IDs.
        item_col_prefix: Prefix for item columns (e.g., item_1, item_2, ...).
        col_sep: Column delimiter.
        header: Whether to write column names.
        index: Whether to write the DataFrame index.
        decimal: Decimal separator.
        verbose: Whether to print a confirmation message.

    Returns:
        None
    """
    raw = as_rawdata(data)

    if raw.user is None or raw.item is None:
        raise ValueError("RawData must define both user and item columns.")

    df = raw.data[[raw.user, raw.item]].dropna()

    # Preserve order of appearance
    grouped = (
        df.groupby(raw.user, sort=False)[raw.item]
        .apply(list)
        .reset_index()
    )

    # Build wide DataFrame
    max_len = grouped[raw.item].map(len).max()

    wide_items = pd.DataFrame(
        grouped[raw.item].tolist(),
        columns=[f"{item_col_prefix}_{i+1}" for i in range(max_len)],
    )

    out_df = pd.concat(
        [
            grouped[[raw.user]].rename(columns={raw.user: user_col}),
            wide_items,
        ],
        axis=1,
    )

    out_dir = os.path.dirname(os.path.abspath(filepath))
    if out_dir and not os.path.exists(out_dir):
        os.makedirs(out_dir, exist_ok=True)

    out_df.to_csv(
        filepath,
        sep=col_sep,
        header=header,
        index=index,
        decimal=decimal,
    )

    if verbose:
        print(f"Wide sequence dataset written to '{filepath}'")

write_sequence_tabular_implicit(data, filepath, *, include_length_col=True, col_sep=' ', header=False, verbose=True)

Writes sequential interaction data to a tabular-implicit format, where each row represents a sequence and no explicit user identifier is written.

Output example (include_length_col=True, col_sep=" "):
  3 10 20 30
  2 11 42

Notes:
  - Each unique user in RawData is treated as a sequence instance.
  - The user identifier is NOT written to file.
  - If include_length_col=True, the first token is the sequence length.

Parameters:

  data (Union[RawData, DataRec], required): RawData or DataRec instance.
  filepath (str, required): Output path.
  include_length_col (bool, default True): Whether to prepend the sequence length token.
  col_sep (str, default ' '): Token separator used within each row (must match the reader's col_sep).
  header (bool, default False): Whether to write a header row (generally False for this format).
  verbose (bool, default True): Whether to print a confirmation message.

Returns:

  None

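A minimal usage sketch; the data and output path are illustrative:

import pandas as pd
from datarec.io.rawdata import RawData
from datarec.io.writers.sequences.tabular import write_sequence_tabular_implicit

df = pd.DataFrame({"user": ["u1", "u1", "u1", "u2"], "item": [10, 20, 30, 11]})
raw = RawData(df, user="user", item="item")
# Writes "3 10 20 30" and "1 11": one anonymous sequence per row, prefixed with its length
write_sequence_tabular_implicit(raw, "out/sequences_implicit.txt")
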
Source code in datarec/io/writers/sequences/tabular.py
def write_sequence_tabular_implicit(
    data: Union["RawData", "DataRec"],
    filepath: str,
    *,
    include_length_col: bool = True,
    col_sep: str = " ",
    header: bool = False,
    verbose: bool = True,
) -> None:
    """
    Writes sequential interaction data to a tabular-implicit format, where each row
    represents a sequence and no explicit user identifier is written.

    Output example (include_length_col=True, col_sep=" "):
        3 10 20 30
        2 11 42

    Notes:
    - Each unique user in RawData is treated as a sequence instance.
    - The user identifier is NOT written to file.
    - If include_length_col=True, the first token is the sequence length.

    Args:
        data: RawData or DataRec instance.
        filepath: Output path.
        include_length_col: Whether to prepend the sequence length token.
        col_sep: Token separator used within each row (must match the reader's `col_sep`).
        header: Whether to write a header row (generally False for this format).
        verbose: Whether to print a confirmation message.

    Returns:
        None
    """
    raw = as_rawdata(data)

    if raw.user is None or raw.item is None:
        raise ValueError("RawData must define both user and item columns.")

    df = raw.data[[raw.user, raw.item]].dropna(subset=[raw.user, raw.item])

    grouped = (
        df.groupby(raw.user, sort=False)[raw.item]
        .apply(list)
        .reset_index(drop=True)
    )

    rows = []
    for items in grouped:
        items_str = [str(x).strip() for x in items if str(x).strip() != ""]
        if not items_str:
            continue
        if include_length_col:
            row_tokens = [str(len(items_str))] + items_str
        else:
            row_tokens = items_str
        rows.append(col_sep.join(row_tokens))

    if not rows:
        raise ValueError("No valid sequences to write.")

    out_dir = os.path.dirname(os.path.abspath(filepath))
    if out_dir and not os.path.exists(out_dir):
        os.makedirs(out_dir, exist_ok=True)

    with open(filepath, "w", encoding="utf-8") as f:
        if header:
            # This format typically has no header; if requested, write a minimal one.
            f.write("sequence\n")
        for row in rows:
            f.write(row + "\n")

    if verbose:
        print(f"Implicit sequence dataset written to '{filepath}'")

write_transactions_tabular(data, filepath, *, sep='\t', header=True, decimal='.', include_user=True, include_item=True, include_rating=False, include_timestamp=False, user_col=None, item_col=None, rating_col=None, timestamp_col=None, index=False, engine=None, verbose=True)

Writes transactional interaction data to a tabular file (CSV/TSV/etc.).

Output format (one interaction per row):
  user, item, [rating], [timestamp]

This writer accepts either:
  - RawData
  - DataRec (converted via .to_rawdata())

Parameters:

  data (Union[RawData, DataRec], required): RawData or DataRec instance.
  filepath (str, required): Output path.
  sep (str, default '\t'): Column delimiter (e.g., '\t', ',', ';').
  header (bool, default True): Whether to write column names.
  decimal (str, default '.'): Decimal separator passed to pandas.
  include_user (bool, default True): Whether to include the user column.
  include_item (bool, default True): Whether to include the item column.
  include_rating (bool, default False): Whether to include the rating column (if available).
  include_timestamp (bool, default False): Whether to include the timestamp column (if available).
  user_col (Optional[str], default None): Output column name for user (optional rename).
  item_col (Optional[str], default None): Output column name for item (optional rename).
  rating_col (Optional[str], default None): Output column name for rating (optional rename).
  timestamp_col (Optional[str], default None): Output column name for timestamp (optional rename).
  index (bool, default False): Whether to write the DataFrame index.
  engine (Optional[str], default None): Optional pandas CSV engine hint.
  verbose (bool, default True): Whether to print a confirmation message.

Returns:

  None

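A usage sketch including rating and timestamp and renaming the output columns; the data and output path are illustrative:

import pandas as pd
from datarec.io.rawdata import RawData
from datarec.io.writers.transactions.tabular import write_transactions_tabular

df = pd.DataFrame({"u": ["u1", "u2"], "i": ["a", "b"], "r": [5.0, 3.0], "t": [1, 2]})
raw = RawData(df, user="u", item="i", rating="r", timestamp="t")
write_transactions_tabular(
    raw,
    "out/interactions_full.csv",
    sep=",",
    include_rating=True,
    include_timestamp=True,
    user_col="user_id",
    item_col="item_id",
)
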
Source code in datarec/io/writers/transactions/tabular.py
def write_transactions_tabular(
    data: Union["RawData", "DataRec"],
    filepath: str,
    *,
    sep: str = "\t",
    header: bool = True,
    decimal: str = ".",
    include_user: bool = True,
    include_item: bool = True,
    include_rating: bool = False,
    include_timestamp: bool = False,
    user_col: Optional[str] = None,
    item_col: Optional[str] = None,
    rating_col: Optional[str] = None,
    timestamp_col: Optional[str] = None,
    index: bool = False,
    engine: Optional[str] = None,
    verbose: bool = True,
) -> None:
    """
    Writes transactional interaction data to a tabular file (CSV/TSV/etc.).

    Output format: one interaction per row
        user, item, [rating], [timestamp]

    This writer accepts either:
    - RawData
    - DataRec (converted via `.to_rawdata()`)

    Args:
        data: RawData or DataRec instance.
        filepath: Output path.
        sep: Column delimiter (e.g., '\\t', ',', ';').
        header: Whether to write column names.
        decimal: Decimal separator passed to pandas.
        include_user: Whether to include the user column.
        include_item: Whether to include the item column.
        include_rating: Whether to include the rating column (if available).
        include_timestamp: Whether to include the timestamp column (if available).
        user_col: Output column name for user (optional rename).
        item_col: Output column name for item (optional rename).
        rating_col: Output column name for rating (optional rename).
        timestamp_col: Output column name for timestamp (optional rename).
        index: Whether to write the DataFrame index.
        engine: Optional pandas CSV engine hint.
        verbose: Whether to print a confirmation message.

    Returns:
        None
    """
    raw = as_rawdata(data)

    # Transactional exports should always contain user + item (coherent across formats)
    if not include_user:
        raise ValueError("Transactional export requires include_user=True.")
    if not include_item:
        raise ValueError("Transactional export requires include_item=True.")

    if raw.user is None:
        raise ValueError("User column is not defined in RawData.")
    if raw.item is None:
        raise ValueError("Item column is not defined in RawData.")

    cols: List[object] = [raw.user, raw.item]
    rename_map: Dict[object, str] = {}

    if user_col is not None:
        rename_map[raw.user] = user_col
    if item_col is not None:
        rename_map[raw.item] = item_col

    if include_rating:
        if raw.rating is None:
            raise ValueError("Rating column requested but not defined in RawData.")
        cols.append(raw.rating)
        if rating_col is not None:
            rename_map[raw.rating] = rating_col

    if include_timestamp:
        if raw.timestamp is None:
            raise ValueError("Timestamp column requested but not defined in RawData.")
        cols.append(raw.timestamp)
        if timestamp_col is not None:
            rename_map[raw.timestamp] = timestamp_col

    df = raw.data[cols]
    if rename_map:
        df = df.rename(columns=rename_map)

    out_dir = os.path.dirname(os.path.abspath(filepath))
    if out_dir and not os.path.exists(out_dir):
        os.makedirs(out_dir, exist_ok=True)

    to_csv_kwargs = dict(sep=sep, header=header, index=index, decimal=decimal)
    if engine is not None:
        to_csv_kwargs["engine"] = engine

    df.to_csv(filepath, **to_csv_kwargs)

    if verbose:
        print(f"Tabular dataset written to '{filepath}'")

write_transactions_json(data, filepath, *, user_col='user', item_col='item', rating_col='rating', timestamp_col='timestamp', include_user=True, include_item=True, include_rating=False, include_timestamp=False, indent=2, ensure_ascii=False, verbose=True)

Writes transactional interaction data as a single JSON array.

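A minimal usage sketch; the data and output path are illustrative:

import pandas as pd
from datarec.io.rawdata import RawData
from datarec.io.writers.transactions.json import write_transactions_json

df = pd.DataFrame({"user": ["u1", "u2"], "item": ["a", "b"], "rating": [5.0, 3.0]})
raw = RawData(df, user="user", item="item", rating="rating")
# One JSON array with one object per interaction
write_transactions_json(raw, "out/interactions.json", include_rating=True)
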
Source code in datarec/io/writers/transactions/json.py
def write_transactions_json(
    data: Union["RawData", "DataRec"],
    filepath: str,
    *,
    user_col: str = "user",
    item_col: str = "item",
    rating_col: str = "rating",
    timestamp_col: str = "timestamp",
    include_user: bool = True,
    include_item: bool = True,
    include_rating: bool = False,
    include_timestamp: bool = False,
    indent: Optional[int] = 2,
    ensure_ascii: bool = False,
    verbose: bool = True,
) -> None:
    """
    Writes transactional interaction data as a single JSON array.
    """
    return write_transactions_json_base(
        data,
        filepath,
        user_col=user_col,
        item_col=item_col,
        rating_col=rating_col,
        timestamp_col=timestamp_col,
        include_user=include_user,
        include_item=include_item,
        include_rating=include_rating,
        include_timestamp=include_timestamp,
        lines=False,
        indent=indent,
        ensure_ascii=ensure_ascii,
        verbose=verbose,
    )

write_transactions_jsonl(data, filepath, *, user_col='user', item_col='item', rating_col='rating', timestamp_col='timestamp', include_user=True, include_item=True, include_rating=False, include_timestamp=False, ensure_ascii=False, verbose=True)

Writes transactional interaction data as JSON Lines (one JSON object per line).

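The call mirrors write_transactions_json; a minimal, illustrative sketch, assuming raw is the RawData built in the previous example:

from datarec.io.writers.transactions.jsonl import write_transactions_jsonl

# One JSON object per line instead of a single array
write_transactions_jsonl(raw, "out/interactions.jsonl", include_rating=True)
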
Source code in datarec/io/writers/transactions/jsonl.py
def write_transactions_jsonl(
    data: Union["RawData", "DataRec"],
    filepath: str,
    *,
    user_col: str = "user",
    item_col: str = "item",
    rating_col: str = "rating",
    timestamp_col: str = "timestamp",
    include_user: bool = True,
    include_item: bool = True,
    include_rating: bool = False,
    include_timestamp: bool = False,
    ensure_ascii: bool = False,
    verbose: bool = True,
) -> None:
    """
    Writes transactional interaction data as JSON Lines (one JSON object per line).
    """
    return write_transactions_json_base(
        data,
        filepath,
        user_col=user_col,
        item_col=item_col,
        rating_col=rating_col,
        timestamp_col=timestamp_col,
        include_user=include_user,
        include_item=include_item,
        include_rating=include_rating,
        include_timestamp=include_timestamp,
        lines=True,
        indent=None,
        ensure_ascii=ensure_ascii,
        verbose=verbose,
    )

write_transactions_blocks(data, filepath, *, block_by, event_layout, include_user=True, include_item=True, include_rating=False, include_timestamp=False, sep='\t', verbose=True)

Writes transactional interaction data to a block text format:

<BLOCK_ID>:
id
id,rating
id,rating,timestamp

The block header identifies either the item or the user depending on block_by. The event line id is the opposite entity.

Parameters:

  data (Union[RawData, DataRec], required): RawData or DataRec instance.
  filepath (str, required): Output path.
  block_by (Literal['item', 'user'], required): Whether blocks are grouped by "item" or by "user".
  event_layout (Literal['id', 'id,rating', 'id,rating,timestamp'], required): Layout of event lines.
  include_user (bool, default True): Must be True.
  include_item (bool, default True): Must be True.
  include_rating (bool, default False): Must be True if event_layout includes rating.
  include_timestamp (bool, default False): Must be True if event_layout includes timestamp.
  sep (str, default '\t'): Field separator used in event lines.
  verbose (bool, default True): Whether to print a confirmation message.
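
A minimal usage sketch grouping interactions into per-user blocks; the data and output path are illustrative:

import pandas as pd
from datarec.io.rawdata import RawData
from datarec.io.writers.transactions.blocks import write_transactions_blocks

df = pd.DataFrame({"user": ["u1", "u1", "u2"], "item": ["a", "b", "c"], "rating": [5.0, 4.0, 3.0]})
raw = RawData(df, user="user", item="item", rating="rating")
# Each block starts with "<user>:"; event lines are "item<TAB>rating"
write_transactions_blocks(
    raw,
    "out/interactions_blocks.txt",
    block_by="user",
    event_layout="id,rating",
    include_rating=True,
)
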
Source code in datarec/io/writers/transactions/blocks.py
def write_transactions_blocks(
    data: Union["RawData", "DataRec"],
    filepath: str,
    *,
    block_by: Literal["item", "user"],
    event_layout: Literal["id", "id,rating", "id,rating,timestamp"],
    include_user: bool = True,
    include_item: bool = True,
    include_rating: bool = False,
    include_timestamp: bool = False,
    sep: str = "\t",
    verbose: bool = True,
) -> None:
    """
    Writes transactional interaction data to a block text format:

        <BLOCK_ID>:
        id
        id,rating
        id,rating,timestamp

    The block header identifies either the item or the user depending on
    block_by. The event line id is the opposite entity.

    Args:
        data: RawData or DataRec instance.
        filepath: Output path.
        block_by: Whether blocks are grouped by "item" or by "user".
        event_layout: Layout of event lines.
        include_user: Must be True.
        include_item: Must be True.
        include_rating: Must be True if event_layout includes rating.
        include_timestamp: Must be True if event_layout includes timestamp.
        sep: Field separator used in event lines.
        verbose: Whether to print a confirmation message.
    """
    raw = as_rawdata(data)

    if not include_user:
        raise ValueError("Block format requires include_user=True.")
    if not include_item:
        raise ValueError("Block format requires include_item=True.")
    expects_rating = event_layout in ("id,rating", "id,rating,timestamp")
    expects_timestamp = event_layout == "id,rating,timestamp"
    if expects_rating and not include_rating:
        raise ValueError("event_layout includes rating; include_rating must be True.")
    if not expects_rating and include_rating:
        raise ValueError("event_layout does not include rating; include_rating must be False.")
    if expects_timestamp and not include_timestamp:
        raise ValueError("event_layout includes timestamp; include_timestamp must be True.")
    if not expects_timestamp and include_timestamp:
        raise ValueError("event_layout does not include timestamp; include_timestamp must be False.")

    if raw.user is None:
        raise ValueError("RawData.user is not defined.")
    if raw.item is None:
        raise ValueError("RawData.item is not defined.")
    if expects_timestamp and raw.timestamp is None:
        raise ValueError("RawData.timestamp is not defined but event_layout requires timestamp.")
    if expects_rating and raw.rating is None:
        raise ValueError("RawData.rating is not defined but event_layout requires rating.")

    cols = [raw.user, raw.item]
    if expects_rating:
        cols.append(raw.rating)  # type: ignore[arg-type]
    if expects_timestamp:
        cols.append(raw.timestamp)

    df = raw.data[cols]

    if df[raw.user].isna().any():
        raise ValueError("User column contains missing values; cannot write block format.")
    if df[raw.item].isna().any():
        raise ValueError("Item column contains missing values; cannot write block format.")
    if expects_timestamp and df[raw.timestamp].isna().any():  # type: ignore[index]
        raise ValueError("Timestamp column contains missing values; cannot write block format.")
    if expects_rating and df[raw.rating].isna().any():  # type: ignore[index]
        raise ValueError("Rating column contains missing values; cannot write block format.")

    if block_by == "item":
        block_col = raw.item
        other_col = raw.user
    else:
        block_col = raw.user
        other_col = raw.item

    out_dir = os.path.dirname(os.path.abspath(filepath))
    if out_dir and not os.path.exists(out_dir):
        os.makedirs(out_dir, exist_ok=True)

    with open(filepath, "w", encoding="utf-8") as f:
        for block_id, g in df.groupby(block_col, sort=False):
            f.write(f"{_stringify(block_id)}:\n")
            if event_layout == "id":
                g2 = g[[other_col]]
                for (other_id,) in g2.itertuples(index=False, name=None):
                    f.write(f"{_stringify(other_id)}\n")
            elif event_layout == "id,rating":
                g2 = g[[other_col, raw.rating]]  # type: ignore[list-item]
                for other_id, rating_val in g2.itertuples(index=False, name=None):
                    f.write(f"{_stringify(other_id)}{sep}{_stringify(rating_val)}\n")
            else:
                g2 = g[[other_col, raw.rating, raw.timestamp]]  # type: ignore[list-item]
                for other_id, rating_val, ts_val in g2.itertuples(index=False, name=None):
                    f.write(
                        f"{_stringify(other_id)}{sep}{_stringify(rating_val)}{sep}{_stringify(ts_val)}\n"
                    )

    if verbose:
        print(f"Block transactions written to '{filepath}'")

dataset_directory(dataset_name, must_exist=False)

Given the dataset name, returns the dataset directory.

Args:
  dataset_name (str): name of the dataset
  must_exist (bool): if True, check that the directory exists (raises FileNotFoundError otherwise)

Returns:

  str: the path of the directory containing the dataset data

Source code in datarec/io/paths.py
def dataset_directory(dataset_name: str, must_exist=False) -> str:
    """
    Given the dataset name returns the dataset directory
    Args:
        dataset_name (str): name of the dataset
        must_exist (bool): flag for forcing to check if the folder exists

    Returns:
        (str): the path of the directory containing the dataset data
    """
    dataset_dir = os.path.join(cache_dir(), dataset_name)
    if must_exist and not os.path.exists(dataset_dir):
        raise FileNotFoundError(f'Directory at {dataset_dir} not found. Please, check that dataset directory exists')
    return os.path.abspath(dataset_dir)

dataset_version_directory(dataset_name, dataset_version, must_exist=False)

Given the dataset name and its version, returns the directory of that dataset version.

Args:
  dataset_name (str): name of the dataset
  dataset_version (str): version of the dataset
  must_exist (bool): if True, check that the directory exists (raises FileNotFoundError otherwise)

Returns:

  str: the path of the directory containing the dataset data

Source code in datarec/io/paths.py
def dataset_version_directory(dataset_name: str, dataset_version: str, must_exist=False) -> str:
    """
    Given the dataset name and its version returns the dataset directory
    Args:
        dataset_name (str): name of the dataset
        dataset_version (str): version of the dataset
        must_exist (bool): flag for forcing to check if the folder exists

    Returns:
        (str): the path of the directory containing the dataset data
    """
    dataset_dir = os.path.join(dataset_directory(dataset_name), dataset_version)
    if must_exist and not os.path.exists(dataset_dir):
        raise FileNotFoundError(f'Directory at {dataset_dir} not found. Please, check that dataset directory exists')
    return os.path.abspath(dataset_dir)

dataset_raw_directory(dataset_name, dataset_version=None)

Given the dataset name (and optionally a version), returns the directory containing the raw data of the dataset.

Args:
  dataset_name (str): name of the dataset
  dataset_version (str): version of the dataset (optional)

Returns:

  str: the path of the directory containing the raw data of the dataset

Source code in datarec/io/paths.py
def dataset_raw_directory(dataset_name: str, dataset_version: str=None) -> str:
    """
    Given the dataset name returns the directory containing the raw data of the dataset
    Args:
        dataset_name (str): name of the dataset
        dataset_version (str): version of the dataset
    Returns:
        (str): the path of the directory containing the raw data of the dataset
    """
    if dataset_version:
        return os.path.join(dataset_version_directory(dataset_name, dataset_version), RAW_DATA_FOLDER)
    return os.path.join(dataset_directory(dataset_name), RAW_DATA_FOLDER)

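A brief sketch of how these cache-path helpers compose; the dataset name and version below are illustrative:

from datarec.io.paths import dataset_directory, dataset_raw_directory, dataset_version_directory

root = dataset_directory("movielens")                     # <cache dir>/movielens
versioned = dataset_version_directory("movielens", "1m")  # <cache dir>/movielens/1m
raw_dir = dataset_raw_directory("movielens", "1m")        # <cache dir>/movielens/1m/<raw data folder>
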
dataset_processed_directory(dataset_name)

Given the dataset name, returns the directory containing the processed data of the dataset.

Args:
  dataset_name (str): name of the dataset

Returns:

  str: the path of the directory containing the processed data of the dataset

Source code in datarec/io/paths.py
def dataset_processed_directory(dataset_name: str) -> str:
    """
    Given the dataset name returns the directory containing the processed data of the dataset
    Args:
        dataset_name (str): name of the dataset

    Returns:
        (str): the path of the directory containing the processed data of the dataset
    """
    return os.path.join(dataset_directory(dataset_name), PROCESSED_DATA_FOLDER)

dataset_filepath(dataset_name)

Given the dataset name, returns the path of the dataset data.

Args:
  dataset_name (str): name of the dataset

Returns:

  str: the path of the dataset data

Source code in datarec/io/paths.py
def dataset_filepath(dataset_name: str) -> str:
    """
    Given the dataset name returns the path of the dataset data
    Args:
        dataset_name (str): name of the dataset

    Returns:
        (str): the path of the dataset data
    """
    return os.path.join(dataset_directory(dataset_name), DATASET_NAME)

registry_dataset_filepath(dataset_name)

Given the dataset name, returns the path of the dataset configuration file in the dataset registry.

Args:
  dataset_name (str): name of the dataset

Returns:

  str: the path of the dataset configuration file

Source code in datarec/io/paths.py
def registry_dataset_filepath(dataset_name: str) -> str:
    """
    Given the dataset name returns the path of the dataset configuration file in the dataset registry
    Args:
        dataset_name (str): name of the dataset
    Returns:
        (str): the path of the dataset configuration file
    """
    return os.path.join(REGISTRY_DATASETS_FOLDER, dataset_name) + '.yml'

registry_version_filepath(dataset_name, dataset_version)

Given the dataset name and version, returns the path of the dataset version configuration file in the dataset registry.

Args:
  dataset_name (str): name of the dataset
  dataset_version (str): version of the dataset

Returns:

  str: the path of the dataset version configuration file

Source code in datarec/io/paths.py
def registry_version_filepath(dataset_name: str, dataset_version: str) -> str:
    """
    Given the dataset name returns the path of the dataset configuration file in the dataset registry
    Args:
        dataset_name (str): name of the dataset
        dataset_version (str): version of the dataset
    Returns:
        (str): the path of the dataset configuration file
    """
    return os.path.join(REGISTRY_VERSIONS_FOLDER, dataset_name+'_'+dataset_version) + '.yml'

registry_metrics_filepath(dataset_name, dataset_version)

Given dataset name and version, return the path of the precomputed metrics file in the registry metrics folder.

Source code in datarec/io/paths.py
def registry_metrics_filepath(dataset_name: str, dataset_version: str) -> str:
    """
    Given dataset name and version, return the path of the precomputed metrics file
    in the registry metrics folder.
    """
    return os.path.join(REGISTRY_METRICS_FOLDER, f"{dataset_name}_{dataset_version}.yml")

pickle_version_filepath(dataset_name, dataset_version)

Given the dataset name and version, returns the path of the pickled version of the dataset.

Args:
  dataset_name (str): name of the dataset
  dataset_version (str): version of the dataset

Returns:

  str: the path of the pickled version of the dataset

Source code in datarec/io/paths.py
def pickle_version_filepath(dataset_name: str, dataset_version: str) -> str:
    """
    Given the dataset name and version returns the path of the pickled version of the dataset
    Args:
        dataset_name (str): name of the dataset
        dataset_version (str): version of the dataset
    Returns:
        (str): the path of the pickled version of the dataset
    """
    return os.path.join(dataset_version_directory(dataset_name=dataset_name, dataset_version=dataset_version), dataset_name+'_'+dataset_version) + '.pkl'

Framework Interoperability

This section covers the tools used to export DataRec datasets into formats compatible with other popular recommender systems libraries.

FrameworkExporter

Exporter for converting RawData datasets to external recommender system frameworks.

Provides methods to format a RawData object according to the expected schema of supported libraries (e.g., Cornac, RecBole).

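A minimal usage sketch with illustrative data and output path:

import pandas as pd
from datarec.io.rawdata import RawData
from datarec.io.frameworks.exporter import FrameworkExporter

df = pd.DataFrame({"user": ["u1", "u2"], "item": ["a", "b"], "rating": [5.0, 3.0]})
raw = RawData(df, user="user", item="item", rating="rating")

exporter = FrameworkExporter("out/cornac_data.csv", rating=True, timestamp=False)
exporter.to_cornac(raw)  # writes a headerless CSV, then calls Cornac(...).info()
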
Source code in datarec/io/frameworks/exporter.py
class FrameworkExporter:
    """
    Exporter for converting RawData datasets to external recommender system frameworks.

    Provides methods to format a `RawData` object according to
    the expected schema of supported libraries (e.g., Cornac, RecBole).

    """

    def __init__(self, output_path, user=True, item=True, rating=True, timestamp=False):
        """
        Initialize a FrameworkExporter object.
        Args:
            output_path (str): Path where to save the output file.
            user (bool): Whether to write the user information. If True, the user information will be written in the file.
            item (bool): Whether to write the item information. If True, the item information will be written in the file.
            rating (bool): Whether to write the rating information. If True, the rating information will be written in the file.
            timestamp (bool): Whether to write the timestamp information. If True, the timestamp information will be written in the file.
        """
        self.params = {k: v for k, v in locals().items() if k != 'self'}

        self.path = output_path
        self.user: bool = user
        self.item: bool = item
        self.rating: bool = rating
        self.timestamp: bool = timestamp

    def to_clayrs(self, data: RawData):
        """
        Export to ClayRS format.
        Args:
            data (RawData): RawData object to convert to ClayRS format.
        """
        write_transactions_tabular(data=data, filepath=self.path, sep=',', header=False, 
                                   include_user=self.user, include_item=self.item, include_rating=self.rating, include_timestamp=self.timestamp)

        ClayRS(timestamp=self.timestamp, path=self.path).info()

    def to_cornac(self, data: RawData):
        """
        Export to Cornac format.
        Args:
            data (RawData): RawData object to convert to Cornac format.
        """
        write_transactions_tabular(data=data, filepath=self.path, sep=',', header=False, 
                                   include_user=self.user, include_item=self.item, include_rating=self.rating, include_timestamp=self.timestamp)
        Cornac(timestamp=self.timestamp, path=self.path).info()

    def to_daisyrec(self, data: RawData):
        """
        Export to DaisyRec format.
        Args:
            data (RawData): RawData object to convert to DaisyRec format.
        """
        write_transactions_tabular(data=data, filepath=self.path, sep=',', header=False, 
                                   include_user=self.user, include_item=self.item, include_rating=self.rating, include_timestamp=self.timestamp)

        DaisyRec(timestamp=self.timestamp, path=self.path).info()

    def to_lenskit(self, data: RawData):
        """
        Export to LensKit format.
        Args:
            data (RawData): RawData object to convert to LensKit format.
        """
        data.data.rename(columns={data.user: "user", data.item: "item", data.rating: "rating"}, inplace=True)
        data.user = "user"
        data.item = "item"
        data.rating = "rating"

        if self.timestamp:
            data.data.rename(columns={data.timestamp: "timestamp"}, inplace=True)
            data.timestamp = "timestamp"
            data.rating = "rating"

        write_transactions_tabular(data=data, filepath=self.path, sep=',', header=False, 
                                   include_user=self.user, include_item=self.item, include_rating=self.rating, include_timestamp=self.timestamp)

        LensKit(timestamp=self.timestamp, path=self.path).info()

    def to_recbole(self, data: RawData):
        """
        Export to RecBole format.
        Args:
            data (RawData): RawData object to convert to RecBole format.
        """

        data.data.rename(columns={data.user: "user: token", data.item: "item: token",
                                  data.rating: "rating: float"}, inplace=True)
        data.user = "user: token"
        data.item = "item: token"
        data.rating = "rating: float"

        if self.timestamp:
            data.data.rename(columns={data.timestamp: "timestamp:float"}, inplace=True)
            data.timestamp = "timestamp:float"

        frmk = RecBole(timestamp=self.timestamp, path=self.path)
        frmk.info()
        write_transactions_tabular(data=data, filepath=frmk.path, sep=',', header=False, 
                                   include_user=self.user, include_item=self.item, include_rating=self.rating, include_timestamp=self.timestamp)

    def to_rechorus(self, train_data: RawData, test_data: RawData, val_data: RawData):
        """
        Export to ReChorus format.
        Args:
            train_data (RawData): Training data as RawData object to convert to ReChorus format.
            test_data (RawData): Test data as RawData object to convert to ReChorus format.
            val_data (RawData): Validation data as RawData object to convert to ReChorus format.
        """
        # user_id	item_id	time
        if self.rating:
            print('Ratings will be interpreted as implicit interactions.')
            self.rating = False

        frmk = ReChorus(timestamp=self.timestamp, path=self.path)

        for data, name in zip([train_data, test_data, val_data], ['train.csv', 'dev.csv', 'test.csv']):
            data.data.rename(columns={data.user: "user_id", data.item: "item_id"}, inplace=True)
            data.user = "user_id"
            data.item = "item_id"

            if self.timestamp:
                data.data.rename(columns={data.timestamp: "time"}, inplace=True)
                data.timestamp = "time"

            path = os.path.join(frmk.directory, name)
            write_transactions_tabular(data=data, filepath=path, sep=',', header=False,
                                       include_user=self.user, include_item=self.item, include_rating=self.rating, include_timestamp=self.timestamp)

        frmk.info()

    def to_recpack(self, data: RawData):
        """
        Export to RecPack format.
        Args:
            data (RawData): RawData object to convert to RecPack format.
        """

        if self.rating:
            print('Ratings will be interpreted as implicit interactions.')
            self.rating = False

        frmk = RecPack(timestamp=self.timestamp, path=self.path)

        data.data.rename(columns={data.user: "userId", data.item: "itemId"}, inplace=True)
        data.user = "userId"
        data.item = "itemId"
        if self.timestamp:
            data.data.rename(columns={data.timestamp: "timestamp"}, inplace=True)
            data.timestamp = "timestamp"

        write_transactions_tabular(data=data, filepath=self.path, sep=',', header=False, 
                                   include_user=self.user, include_item=self.item, include_rating=self.rating, include_timestamp=self.timestamp)


        frmk.info()

    def to_recommenders(self, data: RawData):
        """
        Export to Recommenders format.
        Args:
            data (RawData): RawData object to convert to Recommenders format.
        """

        frmk = Recommenders(timestamp=self.timestamp, path=self.path)

        data.data.rename(columns={data.user: "user", data.item: "item", data.rating: "rating"}, inplace=True)
        data.user = "item"
        data.item = "rating"
        data.rating = 'rating'
        if self.timestamp:
            data.data.rename(columns={data.timestamp: "timestamp"}, inplace=True)
            data.timestamp = "timestamp"

        write_transactions_tabular(data=data, filepath=self.path, sep=',', header=False, 
                                   include_user=self.user, include_item=self.item, include_rating=self.rating, include_timestamp=self.timestamp)


        frmk.info()

    def to_elliot(self, train_data: DataRec, test_data: DataRec, val_data: DataRec):
        """
        Export to Elliot format.
        Args:
            train_data (DataRec): Training data as DataRec object to convert to Elliot format.
            test_data (DataRec): Test data as DataRec object to convert to Elliot format.
            val_data (DataRec): Validation data as DataRec object to convert to Elliot format.
        """

        frmk = Elliot(timestamp=self.timestamp, path=self.path)

        for data, name in zip([train_data.to_rawdata(), test_data.to_rawdata(), val_data.to_rawdata()],
                              [frmk.train_path, frmk.test_path, frmk.val_path]):
            columns_order = [data.user, data.item, data.rating]
            if self.timestamp:
                columns_order.append(data.timestamp)

            write_transactions_tabular(data=data, filepath=name, sep='\t', header=False,
                          include_user=self.user, include_item=self.item, include_rating=self.rating, include_timestamp=self.timestamp)

        frmk.info()
        train_data.pipeline.add_step("export", "Elliot", self.params)
        test_data.pipeline.add_step("export", "Elliot", self.params)
        val_data.pipeline.add_step("export", "Elliot", self.params)
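
A minimal usage sketch, assuming FrameworkExporter is importable from datarec.io.frameworks.exporter (as the source path above suggests) and that raw is a RawData object returned by one of the readers; the output path is hypothetical. The same call pattern applies to to_clayrs, to_cornac, and to_daisyrec:

from datarec.io.frameworks.exporter import FrameworkExporter

exporter = FrameworkExporter("out/cornac_data.csv", rating=True, timestamp=False)
exporter.to_cornac(raw)   # writes a comma-separated user/item/rating file and prints Cornac citation and usage info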

__init__(output_path, user=True, item=True, rating=True, timestamp=False)

Initialize a FrameworkExporter object.

Args:
    output_path (str): Path where to save the output file.
    user (bool): Whether to write the user information. If True, the user information will be written in the file.
    item (bool): Whether to write the item information. If True, the item information will be written in the file.
    rating (bool): Whether to write the rating information. If True, the rating information will be written in the file.
    timestamp (bool): Whether to write the timestamp information. If True, the timestamp information will be written in the file.

Source code in datarec/io/frameworks/exporter.py
def __init__(self, output_path, user=True, item=True, rating=True, timestamp=False):
    """
    Initialize a FrameworkExporter object.
    Args:
        output_path (str): Path where to save the output file.
        user (bool): Whether to write the user information. If True, the user information will be written in the file.
        item (bool): Whether to write the item information. If True, the item information will be written in the file.
        rating (bool): Whether to write the rating information. If True, the rating information will be written in the file.
        timestamp (bool): Whether to write the timestamp information. If True, the timestamp information will be written in the file.
    """
    self.params = {k: v for k, v in locals().items() if k != 'self'}

    self.path = output_path
    self.user: bool = user
    self.item: bool = item
    self.rating: bool = rating
    self.timestamp: bool = timestamp

to_clayrs(data)

Export to ClayRS format. Args: data (RawData): RawData object to convert to ClayRS format.

Source code in datarec/io/frameworks/exporter.py
def to_clayrs(self, data: RawData):
    """
    Export to ClayRS format.
    Args:
        data (RawData): RawData object to convert to ClayRS format.
    """
    write_transactions_tabular(data=data, filepath=self.path, sep=',', header=False, 
                               include_user=self.user, include_item=self.item, include_rating=self.rating, include_timestamp=self.timestamp)

    ClayRS(timestamp=self.timestamp, path=self.path).info()

to_cornac(data)

Export to Cornac format. Args: data (RawData): RawData object to convert to Cornac format.

Source code in datarec/io/frameworks/exporter.py
def to_cornac(self, data: RawData):
    """
    Export to Cornac format.
    Args:
        data (RawData): RawData object to convert to Cornac format.
    """
    write_transactions_tabular(data=data, filepath=self.path, sep=',', header=False, 
                               include_user=self.user, include_item=self.item, include_rating=self.rating, include_timestamp=self.timestamp)
    Cornac(timestamp=self.timestamp, path=self.path).info()

to_daisyrec(data)

Export to DaisyRec format. Args: data (RawData): RawData object to convert to DaisyRec format.

Source code in datarec/io/frameworks/exporter.py
def to_daisyrec(self, data: RawData):
    """
    Export to DaisyRec format.
    Args:
        data (RawData): RawData object to convert to DaisyRec format.
    """
    write_transactions_tabular(data=data, filepath=self.path, sep=',', header=False, 
                               include_user=self.user, include_item=self.item, include_rating=self.rating, include_timestamp=self.timestamp)

    DaisyRec(timestamp=self.timestamp, path=self.path).info()

to_lenskit(data)

Export to LensKit format. Args: data (RawData): RawData object to convert to LensKit format.

Source code in datarec/io/frameworks/exporter.py
def to_lenskit(self, data: RawData):
    """
    Export to LensKit format.
    Args:
        data (RawData): RawData object to convert to LensKit format.
    """
    data.data.rename(columns={data.user: "user", data.item: "item", data.rating: "rating"}, inplace=True)
    data.user = "user"
    data.item = "item"
    data.rating = "rating"

    if self.timestamp:
        data.data.rename(columns={data.timestamp: "timestamp"}, inplace=True)
        data.timestamp = "timestamp"
        data.rating = "rating"

    write_transactions_tabular(data=data, filepath=self.path, sep=',', header=False, 
                               include_user=self.user, include_item=self.item, include_rating=self.rating, include_timestamp=self.timestamp)

    LensKit(timestamp=self.timestamp, path=self.path).info()
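
A hedged sketch of the LensKit export; note that the method renames the columns of raw in place to user/item/rating (and timestamp, if requested), so the RawData metadata is updated as a side effect. The path and flags are illustrative:

exporter = FrameworkExporter("out/lenskit_ratings.csv", timestamp=True)
exporter.to_lenskit(raw)
print(raw.user, raw.item, raw.rating)   # -> user item rating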

to_recbole(data)

Export to RecBole format. Args: data (RawData): RawData object to convert to RecBole format.

Source code in datarec/io/frameworks/exporter.py
def to_recbole(self, data: RawData):
    """
    Export to RecBole format.
    Args:
        data (RawData): RawData object to convert to RecBole format.
    """

    data.data.rename(columns={data.user: "user: token", data.item: "item: token",
                              data.rating: "rating: float"}, inplace=True)
    data.user = "user: token"
    data.item = "item: token"
    data.rating = "rating: float"

    if self.timestamp:
        data.data.rename(columns={data.timestamp: "timestamp:float"}, inplace=True)
        data.timestamp = "timestamp:float"

    frmk = RecBole(timestamp=self.timestamp, path=self.path)
    frmk.info()
    write_transactions_tabular(data=data, filepath=frmk.path, sep=',', header=False, 
                               include_user=self.user, include_item=self.item, include_rating=self.rating, include_timestamp=self.timestamp)
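
A call sketch for the RecBole export, with a hypothetical output path. The columns are renamed to RecBole's field:type style before writing, and the file is written to the location chosen by the RecBole adapter (frmk.path), whose exact layout is defined in the adapter itself:

exporter = FrameworkExporter("out/recbole", timestamp=True)
exporter.to_recbole(raw)   # prints RecBole citation/usage info, then writes the renamed interactions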

to_rechorus(train_data, test_data, val_data)

Export to ReChorus format.

Args:
    train_data (RawData): Training data as RawData object to convert to ReChorus format.
    test_data (RawData): Test data as RawData object to convert to ReChorus format.
    val_data (RawData): Validation data as RawData object to convert to ReChorus format.

Source code in datarec/io/frameworks/exporter.py
def to_rechorus(self, train_data: RawData, test_data: RawData, val_data: RawData):
    """
    Export to ReChorus format.
    Args:
        train_data (RawData): Training data as RawData object to convert to ReChorus format.
        test_data (RawData): Test data as RawData object to convert to ReChorus format.
        val_data (RawData): Validation data as RawData object to convert to ReChorus format.
    """
    # user_id	item_id	time
    if self.rating:
        print('Ratings will be interpreted as implicit interactions.')
        self.rating = False

    frmk = ReChorus(timestamp=self.timestamp, path=self.path)

    for data, name in zip([train_data, test_data, val_data], ['train.csv', 'dev.csv', 'test.csv']):
        data.data.rename(columns={data.user: "user_id", data.item: "item_id"}, inplace=True)
        data.user = "user_id"
        data.item = "item_id"

        if self.timestamp:
            data.data.rename(columns={data.timestamp: "time"}, inplace=True)
            data.timestamp = "time"

        path = os.path.join(frmk.directory, name)
        write_transactions_tabular(data=data, filepath=path, sep=',', header=False,
                                   include_user=self.user, include_item=self.item, include_rating=self.rating, include_timestamp=self.timestamp)

    frmk.info()
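
A sketch of the ReChorus export, assuming three RawData splits (hypothetical names). Ratings are always treated as implicit interactions, and three CSV files are written into the adapter directory:

exporter = FrameworkExporter("out/rechorus", timestamp=True)
exporter.to_rechorus(train_raw, test_raw, val_raw)   # writes train.csv, dev.csv and test.csv under the adapter directory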

to_recpack(data)

Export to RecPack format. Args: data (RawData): RawData object to convert to RecPack format.

Source code in datarec/io/frameworks/exporter.py
def to_recpack(self, data: RawData):
    """
    Export to RecPack format.
    Args:
        data (RawData): RawData object to convert to RecPack format.
    """

    if self.rating:
        print('Ratings will be interpreted as implicit interactions.')
        self.rating = False

    frmk = RecPack(timestamp=self.timestamp, path=self.path)

    data.data.rename(columns={data.user: "userId", data.item: "itemId"}, inplace=True)
    data.user = "userId"
    data.item = "itemId"
    if self.timestamp:
        data.data.rename(columns={data.timestamp: "timestamp"}, inplace=True)
        data.timestamp = "timestamp"

    write_transactions_tabular(data=data, filepath=self.path, sep=',', header=False, 
                               include_user=self.user, include_item=self.item, include_rating=self.rating, include_timestamp=self.timestamp)


    frmk.info()
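
A call sketch for the RecPack export (hypothetical path). The rating flag is switched off because RecPack expects implicit interactions, so the output contains userId, itemId, and optionally the timestamp:

exporter = FrameworkExporter("out/recpack_interactions.csv", rating=True, timestamp=True)
exporter.to_recpack(raw)   # writes userId/itemId(/timestamp) rows and prints RecPack citation info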

to_recommenders(data)

Export to Recommenders format. Args: data (RawData): RawData object to convert to Recommenders format.

Source code in datarec/io/frameworks/exporter.py
def to_recommenders(self, data: RawData):
    """
    Export to Recommenders format.
    Args:
        data (RawData): RawData object to convert to Recommenders format.
    """

    frmk = Recommenders(timestamp=self.timestamp, path=self.path)

    data.data.rename(columns={data.user: "user", data.item: "item", data.rating: "rating"}, inplace=True)
    data.user = "item"
    data.item = "rating"
    data.rating = 'rating'
    if self.timestamp:
        data.data.rename(columns={data.timestamp: "timestamp"}, inplace=True)
        data.timestamp = "timestamp"

    write_transactions_tabular(data=data, filepath=self.path, sep=',', header=False, 
                               include_user=self.user, include_item=self.item, include_rating=self.rating, include_timestamp=self.timestamp)


    frmk.info()
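
A call sketch for the Recommenders export (hypothetical path); the columns are renamed to user/item/rating before writing:

exporter = FrameworkExporter("out/recommenders_ratings.csv", timestamp=False)
exporter.to_recommenders(raw)   # writes the renamed interactions and prints Recommenders citation info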

to_elliot(train_data, test_data, val_data)

Export to Elliot format.

Args:
    train_data (DataRec): Training data as DataRec object to convert to Elliot format.
    test_data (DataRec): Test data as DataRec object to convert to Elliot format.
    val_data (DataRec): Validation data as DataRec object to convert to Elliot format.

Source code in datarec/io/frameworks/exporter.py
def to_elliot(self, train_data: DataRec, test_data: DataRec, val_data: DataRec):
    """
    Export to Elliot format.
    Args:
        train_data (DataRec): Training data as DataRec object to convert to Elliot format.
        test_data (DataRec): Test data as DataRec object to convert to Elliot format.
        val_data (DataRec): Validation data as DataRec object to convert to Elliot format.
    """

    frmk = Elliot(timestamp=self.timestamp, path=self.path)

    for data, name in zip([train_data.to_rawdata(), test_data.to_rawdata(), val_data.to_rawdata()],
                          [frmk.train_path, frmk.test_path, frmk.val_path]):
        columns_order = [data.user, data.item, data.rating]
        if self.timestamp:
            columns_order.append(data.timestamp)

        write_transactions_tabular(data=data, filepath=name, sep='\t', header=False,
                      include_user=self.user, include_item=self.item, include_rating=self.rating, include_timestamp=self.timestamp)

    frmk.info()
    train_data.pipeline.add_step("export", "Elliot", self.params)
    test_data.pipeline.add_step("export", "Elliot", self.params)
    val_data.pipeline.add_step("export", "Elliot", self.params)
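
A sketch of the Elliot export, assuming three DataRec splits (hypothetical names). The Elliot adapter creates the target directory, writes tab-separated train/test/validation files, and generates a ready-to-use configuration file:

exporter = FrameworkExporter("out/elliot_experiment")
exporter.to_elliot(train_dr, test_dr, val_dr)
# -> train.tsv, test.tsv, validation.tsv and datarec_config.yml inside out/elliot_experiment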

Framework

Base class for all framework exporters.

Source code in datarec/io/frameworks/manager.py
class Framework:
    """
    Base class for all framework exporters.
    """
    FRAMEWORK_NAME = None

    PAPER = None

    DOI = None

    CITATION = None

    CODE = None

    REPOSITORY = None

    DOC = None

    def info_code(self):
        """
        Print example code for integrating this framework with DataRec.
        """
        print(f"How to use {self.FRAMEWORK_NAME} with DataRec:\n" + self.CODE)

    def info(self):
        """
        Print citation information for the framework including: paper name, DOI and bibtex citation.
        Print additional information such as: example code for integrating this framework with DataRec,
        repository URL and framework documentation URL.
        Returns:

        """
        if self.FRAMEWORK_NAME is None:
            raise AttributeError("FRAMEWORK_NAME must be defined by the framework adapter.")

        print(f"If you are going to use {self.FRAMEWORK_NAME} don't forget to cite the paper!")

        if self.PAPER:
            print(f'Paper: \'{self.PAPER}\'')
        if self.DOI:
            print(f'DOI: {self.DOI}')
        if self.CITATION:
            print(f'Bib text from dblp.org:\n {self.CITATION}')

        if self.CODE:
            print(
                '\n================================================ CODE EXAMPLE ================================================\n')
            self.info_code()
            print(
                '==============================================================================================================\n')

        if self.REPOSITORY:
            print(f'For more information check {self.FRAMEWORK_NAME} repository: \'{self.REPOSITORY}\'')

        if self.DOC:
            print(f'More documentation on how to use {self.FRAMEWORK_NAME} at \'{self.DOC}\'')

info_code()

Print example code for integrating this framework with DataRec.

Source code in datarec/io/frameworks/manager.py
def info_code(self):
    """
    Print example code for integrating this framework with DataRec.
    """
    print(f"How to use {self.FRAMEWORK_NAME} with DataRec:\n" + self.CODE)

info()

Print citation information for the framework, including paper name, DOI, and BibTeX citation. Also print additional information such as example code for integrating this framework with DataRec, the repository URL, and the framework documentation URL.

Source code in datarec/io/frameworks/manager.py
def info(self):
    """
    Print citation information for the framework including: paper name, DOI and bibtex citation.
    Print additional information such as: example code for integrating this framework with DataRec,
    repository URL and framework documentation URL.
    Returns:

    """
    if self.FRAMEWORK_NAME is None:
        raise AttributeError("FRAMEWORK_NAME must be defined by the framework adapter.")

    print(f"If you are going to use {self.FRAMEWORK_NAME} don't forget to cite the paper!")

    if self.PAPER:
        print(f'Paper: \'{self.PAPER}\'')
    if self.DOI:
        print(f'DOI: {self.DOI}')
    if self.CITATION:
        print(f'Bib text from dblp.org:\n {self.CITATION}')

    if self.CODE:
        print(
            '\n================================================ CODE EXAMPLE ================================================\n')
        self.info_code()
        print(
            '==============================================================================================================\n')

    if self.REPOSITORY:
        print(f'For more information check {self.FRAMEWORK_NAME} repository: \'{self.REPOSITORY}\'')

    if self.DOC:
        print(f'More documentation on how to use {self.FRAMEWORK_NAME} at \'{self.DOC}\'')
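
A minimal sketch of how a new adapter can plug into this contract, assuming Framework is importable from datarec.io.frameworks.manager (as the source path above suggests); all metadata values here are placeholders:

from datarec.io.frameworks.manager import Framework

class MyFramework(Framework):
    """Hypothetical adapter used only to illustrate the Framework contract."""
    FRAMEWORK_NAME = 'MyFramework'
    PAPER = 'A Hypothetical Recommender Framework'
    DOI = 'https://doi.org/10.0000/placeholder'
    CITATION = '@misc{placeholder, title={A Hypothetical Recommender Framework}}'
    CODE = "    # load the exported file in MyFramework here"
    REPOSITORY = 'https://example.org/myframework'
    DOC = 'https://example.org/myframework/docs'

MyFramework().info()   # prints citation details followed by the CODE example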

ClayRS

ClayRS

Bases: Framework

ClayRS framework adapter.

Provide metadata, citation, and usage examples for ClayRS framework.

Source code in datarec/io/frameworks/clayrs/clayrs.py
class ClayRS(Framework):
    """
    ClayRS framework adapter.

    Provide metadata, citation, and usage examples for ClayRS framework.
    """

    def __init__(self, timestamp, path):
        """
        Initialize ClayRS adapter.
        Args:
            timestamp (bool): Whether timestamps are included.
            path (str): Path where the ClayRS-compatible dataset is stored.
        """
        self.timestamp = timestamp
        self.path = path

    FRAMEWORK_NAME = 'ClayRS'

    REPOSITORY = 'https://github.com/swapUniba/ClayRS/tree/master'

    PAPER = """ClayRS: An end-to-end framework for reproducible knowledge-aware recommender systems"""

    DOI = "https://doi.org/10.1016/j.is.2023.102273"

    CITATION = """
            @article{DBLP:journals/is/LopsPMSS23,
              author       = {Pasquale Lops and
                              Marco Polignano and
                              Cataldo Musto and
                              Antonio Silletti and
                              Giovanni Semeraro},
              title        = {ClayRS: An end-to-end framework for reproducible knowledge-aware recommender
                              systems},
              journal      = {Inf. Syst.},
              volume       = {119},
              pages        = {102273},
              year         = {2023},
              url          = {https://doi.org/10.1016/j.is.2023.102273},
              doi          = {10.1016/J.IS.2023.102273},
              timestamp    = {Mon, 05 Feb 2024 20:19:36 +0100},
              biburl       = {https://dblp.org/rec/journals/is/LopsPMSS23.bib},
              bibsource    = {dblp computer science bibliography, https://dblp.org}
            }"""

    CODE = """
    from clayrs import content_analyzer 

    ratings = content_analyzer.Ratings(content_analyzer.CSVFile(YOUR_PATH_HERE), timestamp_column=3)
    """

    DOC = 'https://swapuniba.github.io/ClayRS/'

    def info_code(self):
        """
        Provide the code to use in ClayRS to run experiments.
        """
        if self.timestamp:
            self.CODE = """
    from clayrs import content_analyzer 

    ratings = content_analyzer.Ratings(content_analyzer.CSVFile('{path}'), timestamp_column=3)
    """.format(path=self.path)
        else:
            self.CODE = """
    from clayrs import content_analyzer 

    ratings = content_analyzer.Ratings(content_analyzer.CSVFile('{path}'))
    """.format(path=self.path)
        super().info_code()

__init__(timestamp, path)

Initialize ClayRS adapter.

Args:
    timestamp (bool): Whether timestamps are included.
    path (str): Path where the ClayRS-compatible dataset is stored.

Source code in datarec/io/frameworks/clayrs/clayrs.py
def __init__(self, timestamp, path):
    """
    Initialize ClayRS adapter.
    Args:
        timestamp (bool): Whether timestamps are included.
        path (str): Path where the ClayRS-compatible dataset is stored.
    """
    self.timestamp = timestamp
    self.path = path

info_code()

Provide the code to use in ClayRS to run experiments.

Source code in datarec/io/frameworks/clayrs/clayrs.py
def info_code(self):
    """
    Provide the code to use in ClayRS to run experiments.
    """
    if self.timestamp:
        self.CODE = """
from clayrs import content_analyzer 

ratings = content_analyzer.Ratings(content_analyzer.CSVFile('{path}'), timestamp_column=3)
""".format(path=self.path)
    else:
        self.CODE = """
from clayrs import content_analyzer 

ratings = content_analyzer.Ratings(content_analyzer.CSVFile('{path}'))
""".format(path=self.path)
    super().info_code()

Cornac

Cornac

Bases: Framework

Cornac framework adapter.

Provide metadata, citation, and usage examples for Cornac framework.

Source code in datarec/io/frameworks/cornac/cornac.py
class Cornac(Framework):
    """
    Cornac framework adapter.

    Provide metadata, citation, and usage examples for Cornac framework.
    """

    def __init__(self, timestamp, path):
        """
        Initialize Cornac adapter.
        Args:
            timestamp (bool): Whether timestamps are included.
            path (str): Path where the Cornac-compatible dataset is stored.
        """
        self.timestamp = timestamp
        self.path = path

    FRAMEWORK_NAME = 'Cornac'

    REPOSITORY = 'https://github.com/PreferredAI/cornac/tree/master'

    PAPER = """Cornac: A Comparative Framework for Multimodal Recommender Systems"""

    DOI = None

    CITATION = """
            @article{DBLP:journals/jmlr/SalahTL20,
              author       = {Aghiles Salah and
                              Quoc{-}Tuan Truong and
                              Hady W. Lauw},
              title        = {Cornac: {A} Comparative Framework for Multimodal Recommender Systems},
              journal      = {J. Mach. Learn. Res.},
              volume       = {21},
              pages        = {95:1--95:5},
              year         = {2020},
              url          = {http://jmlr.org/papers/v21/19-805.html},
              timestamp    = {Wed, 18 Nov 2020 15:58:12 +0100},
              biburl       = {https://dblp.org/rec/journals/jmlr/SalahTL20.bib},
              bibsource    = {dblp computer science bibliography, https://dblp.org}
            }"""

    CODE = """
        from cornac.data import Reader

        reader = Reader()
        train_data = reader.read(fpath='{path}', fmt="{frmt}")
    """

    DOC = 'https://cornac.preferred.ai/'

    def info_code(self):
        """
        Provide the code to use in Cornac to run experiments.
        """
        if self.timestamp:
            self.CODE = """
        from cornac.data import Reader

        reader = Reader()
        train_data = reader.read(fpath='{path}', fmt="{frmt}")
    """.format(path=self.path, frmt='UIRT')
        else:
            self.CODE = """
                from cornac.data import Reader

                reader = Reader()
                train_data = reader.read(fpath='{path}', fmt="{frmt}")
            """.format(path=self.path, frmt='UIR')
        super().info_code()

__init__(timestamp, path)

Initialize Cornac adapter.

Args:
    timestamp (bool): Whether timestamps are included.
    path (str): Path where the Cornac-compatible dataset is stored.

Source code in datarec/io/frameworks/cornac/cornac.py
def __init__(self, timestamp, path):
    """
    Initialize Cornac adapter.
    Args:
        timestamp (bool): Whether timestamps are included.
        path (str): Path where the Cornac-compatible dataset is stored.
    """
    self.timestamp = timestamp
    self.path = path

info_code()

Provide the code to use in Cornac to run experiments.

Source code in datarec/io/frameworks/cornac/cornac.py
def info_code(self):
    """
    Provide the code to use in Cornac to run experiments.
    """
    if self.timestamp:
        self.CODE = """
    from cornac.data import Reader

    reader = Reader()
    train_data = reader.read(fpath='{path}', fmt="{frmt}")
""".format(path=self.path, frmt='UIRT')
    else:
        self.CODE = """
            from cornac.data import Reader

            reader = Reader()
            train_data = reader.read(fpath='{path}', fmt="{frmt}")
        """.format(path=self.path, frmt='UIR')
    super().info_code()

DaisyRec

DaisyRec

Bases: Framework

DaisyRec framework adapter.

Provide metadata, citation, and usage examples for DaisyRec framework.

Source code in datarec/io/frameworks/daisyrec/daisyrec.py
class DaisyRec(Framework):
    """
    DaisyRec framework adapter.

    Provide metadata, citation, and usage examples for DaisyRec framework.
    """

    def __init__(self, timestamp, path):
        """
        Initialize DaisyRec adapter.
        Args:
            timestamp (bool): Whether timestamps are included.
            path (str): Path where the DaisyRec-compatible dataset is stored.
        """
        self.timestamp = timestamp
        self.path = path

    FRAMEWORK_NAME = 'DaisyRec'

    REPOSITORY = 'https://github.com/recsys-benchmark/DaisyRec-v2.0'

    PAPER = """DaisyRec 2.0: Benchmarking Recommendation for Rigorous Evaluation"""

    DOI = "https://doi.org/10.1109/TPAMI.2022.3231891"

    CITATION = """
            @inproceedings{DBLP:conf/recsys/SunY00Q0G20,
              author       = {Zhu Sun and
                              Di Yu and
                              Hui Fang and
                              Jie Yang and
                              Xinghua Qu and
                              Jie Zhang and
                              Cong Geng},
              editor       = {Rodrygo L. T. Santos and
                              Leandro Balby Marinho and
                              Elizabeth M. Daly and
                              Li Chen and
                              Kim Falk and
                              Noam Koenigstein and
                              Edleno Silva de Moura},
              title        = {Are We Evaluating Rigorously? Benchmarking Recommendation for Reproducible
                              Evaluation and Fair Comparison},
              booktitle    = {RecSys 2020: Fourteenth {ACM} Conference on Recommender Systems, Virtual
                              Event, Brazil, September 22-26, 2020},
              pages        = {23--32},
              publisher    = {{ACM}},
              year         = {2020},
              url          = {https://doi.org/10.1145/3383313.3412489},
              doi          = {10.1145/3383313.3412489},
              timestamp    = {Tue, 21 Mar 2023 20:57:01 +0100},
              biburl       = {https://dblp.org/rec/conf/recsys/SunY00Q0G20.bib},
              bibsource    = {dblp computer science bibliography, https://dblp.org}
            }

            @article{DBLP:journals/pami/SunFYQLYOZ23,
              author       = {Zhu Sun and
                              Hui Fang and
                              Jie Yang and
                              Xinghua Qu and
                              Hongyang Liu and
                              Di Yu and
                              Yew{-}Soon Ong and
                              Jie Zhang},
              title        = {DaisyRec 2.0: Benchmarking Recommendation for Rigorous Evaluation},
              journal      = {{IEEE} Trans. Pattern Anal. Mach. Intell.},
              volume       = {45},
              number       = {7},
              pages        = {8206--8226},
              year         = {2023},
              url          = {https://doi.org/10.1109/TPAMI.2022.3231891},
              doi          = {10.1109/TPAMI.2022.3231891},
              timestamp    = {Fri, 07 Jul 2023 23:32:20 +0200},
              biburl       = {https://dblp.org/rec/journals/pami/SunFYQLYOZ23.bib},
              bibsource    = {dblp computer science bibliography, https://dblp.org}
            }"""

    CODE = """

    """

    DOC = 'https://daisyrec.readthedocs.io/en/latest/'

    def info_code(self):
        """
        Provide the code to use in DaisyRec to run experiments.
        """
        if self.timestamp:
            self.CODE = f"""
            In DaisyRec you need to replace the file at 
            \'daisy/utils/loader.py\'
            with the file at
            \'datarec/io/frameworks/daisyrec/loader.py\'
            Then you need to open the file, go to line 36 and change \'YOUR_PATH_HERE\' with
            \'{self.path}\'
            """
        else:
            self.CODE = f"""
            In DaisyRec you need to replace the file at 
            \'daisy/utils/loader.py\'
            with the file at
            \'datarec/io/frameworks/daisyrec/loader.py\'
            Then you need to open the file, go to line 36 and change \'YOUR_PATH_HERE\' with
            \'{self.path}\'
            Moreover, you have to remove the timestamp from the \'names\' attribute.
            """
        super().info_code()

__init__(timestamp, path)

Initialize DaisyRec adapter.

Args:
    timestamp (bool): Whether timestamps are included.
    path (str): Path where the DaisyRec-compatible dataset is stored.

Source code in datarec/io/frameworks/daisyrec/daisyrec.py
def __init__(self, timestamp, path):
    """
    Initialize DaisyRec adapter.
    Args:
        timestamp (bool): Whether timestamps are included.
        path (str): Path where the DaisyRec-compatible dataset is stored.
    """
    self.timestamp = timestamp
    self.path = path

info_code()

Provide the code to use in DaisyRec to run experiments.

Source code in datarec/io/frameworks/daisyrec/daisyrec.py
def info_code(self):
    """
    Provide the code to use in DaisyRec to run experiments.
    """
    if self.timestamp:
        self.CODE = f"""
        In DaisyRec you need to replace the file at 
        \'daisy/utils/loader.py\'
        with the file at
        \'datarec/io/frameworks/daisyrec/loader.py\'
        Then you need to open the file, go to line 36 and change \'YOUR_PATH_HERE\' with
        \'{self.path}\'
        """
    else:
        self.CODE = f"""
        In DaisyRec you need to replace the file at 
        \'daisy/utils/loader.py\'
        with the file at
        \'datarec/io/frameworks/daisyrec/loader.py\'
        Then you need to open the file, go to line 36 and change \'YOUR_PATH_HERE\' with
        \'{self.path}\'
        Moreover, you have to remove the timestamp from the \'names\' attribute.
        """
    super().info_code()

load_rate(src='ml-100k', prepro='origin', binary=True, pos_threshold=None, level='ui')

Load certain raw data.

Args:
    src (str): Name of dataset.
    prepro (str): Way to pre-process raw data input; expects 'origin', f'{N}core', or f'{N}filter', where N is an integer value.
    binary (bool): Whether to transform ratings into binary labels (CTR) or keep them as regression targets.
    pos_threshold (float): If not None, treat ratings larger than this threshold as positive samples.
    level (str): Which level to apply the f'{N}core' or f'{N}filter' operation to (only used when prepro contains 'core' or 'filter').

Returns:
    (pd.DataFrame): Rating information with columns: user, item, rating, (optional: timestamp).
    (int): The number of users in the dataset.
    (int): The number of items in the dataset.

Source code in datarec/io/frameworks/daisyrec/loader.py
def load_rate(src='ml-100k', prepro='origin', binary=True, pos_threshold=None, level='ui'):
    """
    Load certain raw data.
    Args:
        src (str): Name of dataset.
        prepro (str): Way to pre-process raw data input, expect 'origin', f'{N}core', f'{N}filter', N is integer value.
        binary (boolean): Whether to transform rating to binary label as CTR or not as Regression.
        pos_threshold (float): If not None, treat rating larger than this threshold as positive sample.
        level (str): which level to do with f'{N}core' or f'{N}filter' operation (it only works when prepro contains 'core' or 'filter').

    Returns:
        (pd.Dataframe): Rating information with columns: user, item, rating, (options: timestamp).
        (int): The number of users in the dataset.
        (int): The number of items in the dataset.

    """
    df = pd.DataFrame()
    # which dataset will use
    if src == 'ml-100k':
        df = pd.read_csv(f'./data/{src}/u.data', sep='\t', header=None,
                         names=['user', 'item', 'rating', 'timestamp'], engine='python')
    elif src == 'datarec':
        df = pd.read_csv('YOUR_PATH_HERE', sep='\t', header=None,
                         names=['user', 'item', 'rating', 'timestamp'], engine='python')
    elif src == 'ml-1m':
        df = pd.read_csv(f'./data/{src}/ratings.dat', sep='::', header=None,
                         names=['user', 'item', 'rating', 'timestamp'], engine='python')
        # only consider rating >=4 for data density
        df = df.query('rating >= 4').reset_index(drop=True).copy()

    elif src == 'ml-10m':
        df = pd.read_csv(f'./data/{src}/ratings.dat', sep='::', header=None,
                         names=['user', 'item', 'rating', 'timestamp'], engine='python')
        df = df.query('rating >= 4').reset_index(drop=True).copy()

    elif src == 'ml-20m':
        df = pd.read_csv(f'./data/{src}/ratings.csv')
        df.rename(columns={'userId': 'user', 'movieId': 'item'}, inplace=True)
        df = df.query('rating >= 4').reset_index(drop=True)

    elif src == 'netflix':
        cnt = 0
        tmp_file = open(f'./data/{src}/training_data.csv', 'w')
        tmp_file.write('user,item,rating,timestamp' + '\n')
        for f in os.listdir(f'./data/{src}/training_set/'):
            cnt += 1
            if cnt % 5000 == 0:
                print(f'Finish Process {cnt} file......')
            txt_file = open(f'./data/{src}/training_set/{f}', 'r')
            contents = txt_file.readlines()
            item = contents[0].strip().split(':')[0]
            for val in contents[1:]:
                user, rating, timestamp = val.strip().split(',')
                tmp_file.write(','.join([user, item, rating, timestamp]) + '\n')
            txt_file.close()

        tmp_file.close()

        df = pd.read_csv(f'./data/{src}/training_data.csv')
        df['rating'] = df.rating.astype(float)
        df['timestamp'] = pd.to_datetime(df['timestamp'])

    elif src == 'lastfm':
        # user_artists.dat
        df = pd.read_csv(f'./data/{src}/user_artists.dat', sep='\t')
        df.rename(columns={'userID': 'user', 'artistID': 'item', 'weight': 'rating'}, inplace=True)
        # treat weight as interaction, as 1
        df['rating'] = 1.0
        # fake timestamp column
        df['timestamp'] = 1

    elif src == 'book-x':
        df = pd.read_csv(f'./data/{src}/BX-Book-Ratings.csv', delimiter=";", encoding="latin1")
        df.rename(columns={'User-ID': 'user', 'ISBN': 'item', 'Book-Rating': 'rating'}, inplace=True)
        # fake timestamp column
        df['timestamp'] = 1

    elif src == 'pinterest':
        # TODO this dataset has wrong source URL, we will figure out in future
        pass

    elif src == 'amazon-cloth':
        df = pd.read_csv(f'./data/{src}/ratings_Clothing_Shoes_and_Jewelry.csv',
                         names=['user', 'item', 'rating', 'timestamp'])

    elif src == 'amazon-electronic':
        df = pd.read_csv(f'./data/{src}/ratings_Electronics.csv',
                         names=['user', 'item', 'rating', 'timestamp'])

    elif src == 'amazon-book':
        df = pd.read_csv(f'./data/{src}/ratings_Books.csv',
                         names=['user', 'item', 'rating', 'timestamp'], low_memory=False)
        df = df[df['timestamp'].str.isnumeric()].copy()
        df['timestamp'] = df['timestamp'].astype(int)

    elif src == 'amazon-music':
        df = pd.read_csv(f'./data/{src}/ratings_Digital_Music.csv',
                         names=['user', 'item', 'rating', 'timestamp'])

    elif src == 'epinions':
        d = sio.loadmat(f'./data/{src}/rating_with_timestamp.mat')
        prime = []
        for val in d['rating_with_timestamp']:
            user, item, rating, timestamp = val[0], val[1], val[3], val[5]
            prime.append([user, item, rating, timestamp])
        df = pd.DataFrame(prime, columns=['user', 'item', 'rating', 'timestamp'])
        del prime
        gc.collect()

    elif src == 'yelp':
        json_file_path = f'./data/{src}/yelp_academic_dataset_review.json'
        prime = []
        for line in open(json_file_path, 'r', encoding='UTF-8'):
            val = json.loads(line)
            prime.append([val['user_id'], val['business_id'], val['stars'], val['date']])
        df = pd.DataFrame(prime, columns=['user', 'item', 'rating', 'timestamp'])
        df['timestamp'] = pd.to_datetime(df.timestamp)
        del prime
        gc.collect()

    elif src == 'citeulike':
        user = 0
        dt = []
        for line in open(f'./data/{src}/users.dat', 'r'):
            val = line.split()
            for item in val:
                dt.append([user, item])
            user += 1
        df = pd.DataFrame(dt, columns=['user', 'item'])
        # fake timestamp column
        df['timestamp'] = 1

    else:
        raise ValueError('Invalid Dataset Error')

    # set rating >= threshold as positive samples
    if pos_threshold is not None:
        df = df.query(f'rating >= {pos_threshold}').reset_index(drop=True)

    # reset rating to interaction, here just treat all rating as 1
    if binary:
        df['rating'] = 1.0

    # which type of pre-dataset will use
    if prepro == 'origin':
        pass

    elif prepro.endswith('filter'):
        pattern = re.compile(r'\d+')
        filter_num = int(pattern.findall(prepro)[0])

        tmp1 = df.groupby(['user'], as_index=False)['item'].count()
        tmp1.rename(columns={'item': 'cnt_item'}, inplace=True)
        tmp2 = df.groupby(['item'], as_index=False)['user'].count()
        tmp2.rename(columns={'user': 'cnt_user'}, inplace=True)
        df = df.merge(tmp1, on=['user']).merge(tmp2, on=['item'])
        if level == 'ui':
            df = df.query(f'cnt_item >= {filter_num} and cnt_user >= {filter_num}').reset_index(drop=True).copy()
        elif level == 'u':
            df = df.query(f'cnt_item >= {filter_num}').reset_index(drop=True).copy()
        elif level == 'i':
            df = df.query(f'cnt_user >= {filter_num}').reset_index(drop=True).copy()
        else:
            raise ValueError(f'Invalid level value: {level}')

        df.drop(['cnt_item', 'cnt_user'], axis=1, inplace=True)
        del tmp1, tmp2
        gc.collect()

    elif prepro.endswith('core'):
        pattern = re.compile(r'\d+')
        core_num = int(pattern.findall(prepro)[0])

        def filter_user(df):
            tmp = df.groupby(['user'], as_index=False)['item'].count()
            tmp.rename(columns={'item': 'cnt_item'}, inplace=True)
            df = df.merge(tmp, on=['user'])
            df = df.query(f'cnt_item >= {core_num}').reset_index(drop=True).copy()
            df.drop(['cnt_item'], axis=1, inplace=True)

            return df

        def filter_item(df):
            tmp = df.groupby(['item'], as_index=False)['user'].count()
            tmp.rename(columns={'user': 'cnt_user'}, inplace=True)
            df = df.merge(tmp, on=['item'])
            df = df.query(f'cnt_user >= {core_num}').reset_index(drop=True).copy()
            df.drop(['cnt_user'], axis=1, inplace=True)

            return df

        if level == 'ui':
            while 1:
                df = filter_user(df)
                df = filter_item(df)
                chk_u = df.groupby('user')['item'].count()
                chk_i = df.groupby('item')['user'].count()
                if len(chk_i[chk_i < core_num]) <= 0 and len(chk_u[chk_u < core_num]) <= 0:
                    break
        elif level == 'u':
            df = filter_user(df)
        elif level == 'i':
            df = filter_item(df)
        else:
            raise ValueError(f'Invalid level value: {level}')

        gc.collect()

    else:
        raise ValueError('Invalid dataset preprocess type, origin/Ncore/Nfilter (N is int number) expected')

    # encoding user_id and item_id
    df['user'] = pd.Categorical(df['user']).codes
    df['item'] = pd.Categorical(df['item']).codes

    user_num = df['user'].nunique()
    item_num = df['item'].nunique()

    print(f'Finish loading [{src}]-[{prepro}] dataset')

    return df, user_num, item_num
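
A hedged example of calling the patched loader from inside DaisyRec, after replacing daisy/utils/loader.py and pointing the 'datarec' branch at the exported file (as described in the DaisyRec adapter above); the preprocessing options are illustrative:

df, user_num, item_num = load_rate(src='datarec', prepro='5core', binary=True, level='ui')
print(df.columns.tolist())   # ['user', 'item', 'rating', 'timestamp']
print(user_num, item_num)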

get_ur(df)

Get user-rating pairs.

Args:
    df (pd.DataFrame): Rating dataframe.

Returns:
    (dict): Dictionary which stores user-items interactions.

Source code in datarec/io/frameworks/daisyrec/loader.py
def get_ur(df):
    """
    Get user-rating pairs.
    Args:
        df (pd.DataFrame): Rating dataframe.

    Returns:
        (dict): Dictionary which stores user-items interactions.

    """
    ur = defaultdict(set)
    for _, row in df.iterrows():
        ur[int(row['user'])].add(int(row['item']))

    return ur

get_ir(df)

Get item-rating pairs.

Args:
    df (pd.DataFrame): Rating dataframe.

Returns:
    (dict): Dictionary which stores item-users interactions.

Source code in datarec/io/frameworks/daisyrec/loader.py
def get_ir(df):
    """
    Get item-rating pairs.
    Args:
        df (pd.DataFrame): Rating dataframe.

    Returns:
        (dict): Dictionary which stores item-users interactions.

    """
    ir = defaultdict(set)
    for _, row in df.iterrows():
        ir[int(row['item'])].add(int(row['user']))

    return ir
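
Both helpers build interaction dictionaries from the dataframe returned by load_rate, for example:

ur = get_ur(df)   # {user_id: {item_id, ...}}
ir = get_ir(df)   # {item_id: {user_id, ...}}
print(len(ur), len(ir))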

build_feat_idx_dict(df, cat_cols=['user', 'item'], num_cols=[])

Encode feature mapping for FM.

Args:
    df (pd.DataFrame): Feature dataframe.
    cat_cols (list): List of categorical column names.
    num_cols (list): List of numerical column names.

Returns:
    (dict): Dictionary with index-feature column mapping information.
    (int): The number of features.

Source code in datarec/io/frameworks/daisyrec/loader.py
def build_feat_idx_dict(df: pd.DataFrame,
                        cat_cols: list = ['user', 'item'],
                        num_cols: list = []):
    """
    Encode feature mapping for FM.
    Args:
        df (pd.DataFrame): Feature dataframe.
        cat_cols (list): List of categorical column names.
        num_cols (list): List of numerical column names.

    Returns:
        (dict): Dictionary with index-feature column mapping information.
        (int): The number of features.

    """
    feat_idx_dict = {}
    idx = 0
    for col in cat_cols:
        feat_idx_dict[col] = idx
        idx = idx + df[col].max() + 1
    for col in num_cols:
        feat_idx_dict[col] = idx
        idx += 1
    print('Finish build feature index dictionary......')

    cnt = 0
    for col in cat_cols:
        for _ in df[col].unique():
            cnt += 1
    for _ in num_cols:
        cnt += 1
    print(f'Number of features: {cnt}')

    return feat_idx_dict, cnt
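
A small example with the default categorical columns; the offsets and counts follow directly from the code above:

feat_idx_dict, n_features = build_feat_idx_dict(df, cat_cols=['user', 'item'], num_cols=[])
print(feat_idx_dict)   # {'user': 0, 'item': df['user'].max() + 1}
print(n_features)      # number of distinct users + number of distinct items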

convert_npy_mat(user_num, item_num, df)

Convert a pd.DataFrame to a numpy matrix.

Args:
    user_num (int): Number of users.
    item_num (int): Number of items.
    df (pd.DataFrame): Rating dataframe.

Returns:
    (np.array): Rating matrix.

Source code in datarec/io/frameworks/daisyrec/loader.py
def convert_npy_mat(user_num, item_num, df):
    """
    Convert pd.Dataframe to numpy matrix.
    Args:
        user_num(int): Number of users.
        item_num (int): Number of items.
        df (pd.DataFrame): Rating dataframe.

    Returns:
        (np.array): Rating matrix.
    """
    mat = np.zeros((user_num, item_num))
    for _, row in df.iterrows():
        u, i, r = row['user'], row['item'], row['rating']
        mat[int(u), int(i)] = float(r)
    return mat
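
An example call; note that the result is a dense (user_num x item_num) matrix, so this is only practical for small datasets:

mat = convert_npy_mat(user_num, item_num, df)
print(mat.shape)         # (user_num, item_num)
print((mat > 0).sum())   # number of stored interactions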

build_candidates_set(test_ur, train_ur, item_pool, candidates_num=1000)

Build candidate items for ranking.

Args:
    test_ur (dict): Ground truth that represents the relationship of user and item in the test set.
    train_ur (dict): The relationship of user and item in the train set.
    item_pool (list or set): Set of all items.
    candidates_num (int): Number of candidates.

Returns:
    test_ucands (dict): Dictionary storing candidates for each user in the test set.

Source code in datarec/io/frameworks/daisyrec/loader.py
def build_candidates_set(test_ur, train_ur, item_pool, candidates_num=1000):
    """
    Build candidate items for ranking.
    Args:
        test_ur (dict): Ground truth that represents the relationship of user and item in the test set.
        train_ur (dict): The relationship of user and item in the train set.
        item_pool (list or set): Set of all items.
        candidates_num (int): Number of candidates.

    Returns:
        test_ucands (dict): Dictionary storing candidates for each user in test set.

    """
    test_ucands = defaultdict(list)
    for k, v in test_ur.items():
        sample_num = candidates_num - len(v) if len(v) < candidates_num else 0
        sub_item_pool = item_pool - v - train_ur[k]  # remove GT & interacted
        sample_num = min(len(sub_item_pool), sample_num)
        if sample_num == 0:
            samples = random.sample(v, candidates_num)
            test_ucands[k] = list(set(samples))
        else:
            samples = random.sample(sub_item_pool, sample_num)
            test_ucands[k] = list(v | set(samples))

    return test_ucands
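
An example call, assuming test_ur and train_ur were built with get_ur on the respective splits and that item identifiers are the encoded codes produced by load_rate:

item_pool = set(df['item'].unique())
test_ucands = build_candidates_set(test_ur, train_ur, item_pool, candidates_num=1000)
some_user = next(iter(test_ucands))
print(len(test_ucands[some_user]))   # roughly candidates_num candidate items for that user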

get_adj_mat(n_users, n_items)

Get adjacency matrix.

Args:
    n_users (int): Number of users.
    n_items (int): Number of items.

Returns:
    adj_mat (sp.csr_matrix): Adjacency matrix.
    norm_adj_mat (sp.csr_matrix): Normalized adjacency matrix.
    mean_adj_mat (sp.csr_matrix): Mean adjacency matrix.

Source code in datarec/io/frameworks/daisyrec/loader.py
def get_adj_mat(n_users, n_items):
    """
    Get adjacency matrix.
    Args:
        n_users (int): Number of users.
        n_items (int): Number of items.

    Returns:
        adj_mat (sp.csr_matrix): Adjacency matrix.
        norm_adj_mat (sp.csr_matrix): Normalized adjacency matrix.
        mean_adj_mat(sp.csr_matrix): Mean adjacency matrix.

    """
    R = sp.dok_matrix((n_users, n_items), dtype=np.float32)
    adj_mat = sp.dok_matrix((n_users + n_items, n_users + n_items), dtype=np.float32)
    adj_mat = adj_mat.tolil()
    R = R.tolil()

    adj_mat[:n_users, n_users:] = R
    adj_mat[n_users:, :n_users] = R.T
    adj_mat = adj_mat.todok()
    print('already create adjacency matrix', adj_mat.shape)

    def mean_adj_single(adj):
        """
        Compute row-normalized adjacency matrix (D⁻¹A).
        Args:
            adj (sp.spmatrix): Sparse adjacency matrix.

        Returns:
            (sp.coo_matrix): Row-normalized adjacency matrix in COO format.
        """
        # D^-1 * A
        rowsum = np.array(adj.sum(1))

        d_inv = np.power(rowsum, -1).flatten()
        d_inv[np.isinf(d_inv)] = 0.
        d_mat_inv = sp.diags(d_inv)

        norm_adj = d_mat_inv.dot(adj)
        # norm_adj = adj.dot(d_mat_inv)
        print('generate single-normalized adjacency matrix.')
        return norm_adj.tocoo()

    def normalized_adj_single(adj):
        """
        Compute symmetric normalized adjacency matrix (D⁻¹/² A D⁻¹/²).
        Args:
            adj (sp.spmatrix): Sparse adjacency matrix.

        Returns:
            (sp.coo_matrix): Symmetric normalized adjacency matrix in COO format.
        """
        # D^-1/2 * A * D^-1/2
        rowsum = np.array(adj.sum(1))

        d_inv_sqrt = np.power(rowsum, -0.5).flatten()
        d_inv_sqrt[np.isinf(d_inv_sqrt)] = 0.
        d_mat_inv_sqrt = sp.diags(d_inv_sqrt)

        # bi_lap = adj.dot(d_mat_inv_sqrt).transpose().dot(d_mat_inv_sqrt)
        bi_lap = d_mat_inv_sqrt.dot(adj).dot(d_mat_inv_sqrt)
        return bi_lap.tocoo()

    def check_adj_if_equal(adj):
        """
        Check if normalized adjacency is equivalent to Laplacian-based transformation.
        Args:
            adj (sp.spmatrix): Sparse adjacency matrix.

        Returns:
            (np.ndarray): Dense matrix representing the normalized adjacency for verification

        """
        dense_A = np.array(adj.todense())
        degree = np.sum(dense_A, axis=1, keepdims=False)

        temp = np.dot(np.diag(np.power(degree, -1)), dense_A)
        print('check normalized adjacency matrix whether equal to this laplacian matrix.')
        return temp

    norm_adj_mat = mean_adj_single(adj_mat + sp.eye(adj_mat.shape[0]))
    # norm_adj_mat = normalized_adj_single(adj_mat + sp.eye(adj_mat.shape[0]))
    mean_adj_mat = mean_adj_single(adj_mat)

    print('already normalize adjacency matrix')
    return adj_mat.tocsr(), norm_adj_mat.tocsr(), mean_adj_mat.tocsr()
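
A call sketch for completeness. Note that, as reproduced here, the interaction block R is created but never populated, so only the shapes and the normalization steps of the returned matrices are meaningful:

adj_mat, norm_adj_mat, mean_adj_mat = get_adj_mat(user_num, item_num)
print(adj_mat.shape)   # (user_num + item_num, user_num + item_num)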

Elliot

Elliot

Bases: Framework

Elliot framework adapter.

Provide metadata, citation, and usage examples for Elliot framework.

Source code in datarec/io/frameworks/elliot/elliot.py
class Elliot(Framework):
    """
    Elliot framework adapter.

    Provide metadata, citation, and usage examples for Elliot framework.
    """

    def __init__(self, timestamp, path):
        """
        Initialize Elliot adapter.
        Args:
            timestamp (bool): Whether timestamps are included.
            path (str): Path where the Elliot-compatible dataset is stored.
        """
        self.timestamp = timestamp

        self.directory = os.path.abspath(path)
        if os.path.exists(self.directory) is False:
            os.makedirs(self.directory)

        self.train_path, self.test_path, self.val_path = \
            os.path.join(self.directory, 'train.tsv'), \
                os.path.join(self.directory, 'test.tsv'), \
                os.path.join(self.directory, 'validation.tsv')

        self.file = os.path.basename(path)
        self.file_path = os.path.join(self.directory, self.file)

        # create configuration file
        config_file = \
            CONF.format(path=self.file_path,
                        dataset='datarec2elliot',
                        train=self.train_path,
                        test=self.test_path,
                        val=self.val_path)

        self.config_path = os.path.join(self.directory, 'datarec_config.yml')
        with open(self.config_path, 'w') as file:
            file.write(config_file)

    FRAMEWORK_NAME = 'Elliot'

    REPOSITORY = 'https://github.com/sisinflab/elliot'

    PAPER = """Elliot: a Comprehensive and Rigorous Framework for Reproducible Recommender Systems Evaluation"""

    DOI = "https://doi.org/10.1145/3404835.3463245"

    CITATION = r"""
            @inproceedings{DBLP:conf/sigir/AnelliBFMMPDN21,
              author       = {Vito Walter Anelli and
                              Alejandro Bellog{\'{\i}}n and
                              Antonio Ferrara and
                              Daniele Malitesta and
                              Felice Antonio Merra and
                              Claudio Pomo and
                              Francesco Maria Donini and
                              Tommaso Di Noia},
              editor       = {Fernando Diaz and
                              Chirag Shah and
                              Torsten Suel and
                              Pablo Castells and
                              Rosie Jones and
                              Tetsuya Sakai},
              title        = {Elliot: {A} Comprehensive and Rigorous Framework for Reproducible
                              Recommender Systems Evaluation},
              booktitle    = {{SIGIR} '21: The 44th International {ACM} {SIGIR} Conference on Research
                              and Development in Information Retrieval, Virtual Event, Canada, July
                              11-15, 2021},
              pages        = {2405--2414},
              publisher    = {{ACM}},
              year         = {2021},
              url          = {https://doi.org/10.1145/3404835.3463245},
              doi          = {10.1145/3404835.3463245},
              timestamp    = {Sun, 12 Nov 2023 02:10:04 +0100},
              biburl       = {https://dblp.org/rec/conf/sigir/AnelliBFMMPDN21.bib},
              bibsource    = {dblp computer science bibliography, https://dblp.org}
            }"""

    CODE = "  "

    DOC = 'https://elliot.readthedocs.io/en/latest/'

    def info_code(self):
        """
        Provide the code to use in Elliot to run experiments.
        """
        self.CODE = """
            A configuration file for Elliot has been created here:
            \'{config_path}\'
            You can now run the script.
            If you move the configuration file, remember to change the path in the script below.

            Elliot script:
            python start_experiments.py --config {config_path}

            This script contains a basic recommendation example. Change it as needed.
            """.format(config_path=self.config_path)

        super().info_code()

__init__(timestamp, path)

Initialize Elliot adapter.

Args:
    timestamp (bool): Whether timestamps are included.
    path (str): Path where the Elliot-compatible dataset is stored.

Source code in datarec/io/frameworks/elliot/elliot.py
def __init__(self, timestamp, path):
    """
    Initialize Elliot adapter.
    Args:
        timestamp (bool): Whether timestamps are included.
        path (str): Path where the Elliot-compatible dataset is stored.
    """
    self.timestamp = timestamp

    self.directory = os.path.abspath(path)
    if os.path.exists(self.directory) is False:
        os.makedirs(self.directory)

    self.train_path, self.test_path, self.val_path = \
        os.path.join(self.directory, 'train.tsv'), \
            os.path.join(self.directory, 'test.tsv'), \
            os.path.join(self.directory, 'validation.tsv')

    self.file = os.path.basename(path)
    self.file_path = os.path.join(self.directory, self.file)

    # create configuration file
    config_file = \
        CONF.format(path=self.file_path,
                    dataset='datarec2elliot',
                    train=self.train_path,
                    test=self.test_path,
                    val=self.val_path)

    self.config_path = os.path.join(self.directory, 'datarec_config.yml')
    with open(self.config_path, 'w') as file:
        file.write(config_file)

info_code()

Provide the code to use in Elliot to run experiments.

Source code in datarec/io/frameworks/elliot/elliot.py
def info_code(self):
    """
    Provide the code to use in Elliot to run experiments.
    """
    self.CODE = """
        A configuration file for Elliot has been created here:
        \'{config_path}\'
        You can now run the script.
        If you move the configuration file, remember to change the path in the script below.

        Elliot script:
        python start_experiments.py --config {config_path}

        This script contains a basic recommendation example. Change it as needed.
        """.format(config_path=self.config_path)

    super().info_code()

LensKit

LensKit

Bases: Framework

LensKit framework adapter.

Provide metadata, citation, and usage examples for the LensKit framework.
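
A minimal sketch of using the adapter directly; the import path is inferred from the source location shown below and the file path is illustrative:

from datarec.io.frameworks.lenskit.lenskit import LensKit

# LensKit loads plain tabular data, so the adapter only records the exported
# file path and whether a timestamp column is present.
adapter = LensKit(timestamp=True, path="out/lenskit/interactions.tsv")
adapter.info_code()  # reports the pandas snippet for loading the exported file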

Source code in datarec/io/frameworks/lenskit/lenskit.py
class LensKit(Framework):
    """
    LensKit framework adapter.

    Provide metadata, citation, and usage examples for the LensKit framework.
    """

    def __init__(self, timestamp, path):
        """
        Initialize LensKit adapter.
        Args:
            timestamp (bool): Whether timestamps are included.
            path (str): Path where the LensKit-compatible dataset is stored.
        """
        self.timestamp = timestamp
        self.path = path

    FRAMEWORK_NAME = 'LensKit'

    REPOSITORY = 'https://github.com/lenskit/lkpy'

    PAPER = """LensKit for Python: Next-Generation Software for Recommender Systems Experiments"""

    DOI = "https://doi.org/10.1145/3340531.3412778"

    CITATION = """
            @inproceedings{DBLP:conf/cikm/Ekstrand20,
              author       = {Michael D. Ekstrand},
              editor       = {Mathieu d'Aquin and
                              Stefan Dietze and
                              Claudia Hauff and
                              Edward Curry and
                              Philippe Cudr{\'{e}}{-}Mauroux},
              title        = {LensKit for Python: Next-Generation Software for Recommender Systems
                              Experiments},
              booktitle    = {{CIKM} '20: The 29th {ACM} International Conference on Information
                              and Knowledge Management, Virtual Event, Ireland, October 19-23, 2020},
              pages        = {2999--3006},
              publisher    = {{ACM}},
              year         = {2020},
              url          = {https://doi.org/10.1145/3340531.3412778},
              doi          = {10.1145/3340531.3412778},
              timestamp    = {Tue, 29 Dec 2020 18:42:41 +0100},
              biburl       = {https://dblp.org/rec/conf/cikm/Ekstrand20.bib},
              bibsource    = {dblp computer science bibliography, https://dblp.org}
            }"""

    CODE = """

    """

    DOC = 'https://lkpy.lenskit.org/en/stable/'

    def info_code(self):
        """
        Provide the code to use in LensKit to run experiments.
        """
        self.CODE = """
        LensKit accepts pandas DataFrames with specific column naming. DataRec will do that for you!

        import pandas as pd

        ratings = pd.read_csv('{path}', sep='\\t', header=None)
        """.format(path=self.path)

        super().info_code()

__init__(timestamp, path)

Initialize LensKit adapter.

Args:
    timestamp (bool): Whether timestamps are included.
    path (str): Path where the LensKit-compatible dataset is stored.

Source code in datarec/io/frameworks/lenskit/lenskit.py
def __init__(self, timestamp, path):
    """
    Initialize LensKit adapter.
    Args:
        timestamp (bool): Whether timestamps are included.
        path (str): Path where the LensKit-compatible dataset is stored.
    """
    self.timestamp = timestamp
    self.path = path

info_code()

Provide the code to use in LensKit to run experiments.

Source code in datarec/io/frameworks/lenskit/lenskit.py
def info_code(self):
    """
    Provide the code to use in LensKit to run experiments.
    """
    self.CODE = """
    LensKit accepts pandas DataFrames with specific column naming. DataRec will do that for you!

    import pandas as pd

    ratings = pd.read_csv('{path}', sep='\\t', header=None)
    """.format(path=self.path)

    super().info_code()

RecBole

RecBole

Bases: Framework

RecBole framework adapter.

Provide metadata, citation, and usage examples for the RecBole framework.
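
A minimal sketch, assuming the import path below; the file path is illustrative, and the constructor derives a DataRec2RecBole directory from it:

from datarec.io.frameworks.recbole.recbole import RecBole

# Creates a 'DataRec2RecBole' directory next to the given file.
adapter = RecBole(timestamp=True, path="out/interactions.tsv")
adapter.info_code()  # reports a RecBole Config/create_dataset snippet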

Source code in datarec/io/frameworks/recbole/recbole.py
class RecBole(Framework):
    """
    RecBole framework adapter.

    Provide metadata, citation, and usage examples for the RecBole framework.
    """

    def __init__(self, timestamp, path):
        """
        Initialize RecBole adapter.
        Args:
            timestamp (bool): Whether timestamps are included.
            path (str): Path where the RecBole-compatible dataset is stored.
        """
        self.timestamp = timestamp
        directory = os.path.dirname(path)
        self.directory = os.path.join(directory, 'DataRec2RecBole')
        print('RecBole requires a directory named after the dataset.\n'
              f'Based on your path, the directory that will be used is \'{self.directory}\'')
        if os.path.exists(self.directory) is False:
            os.makedirs(self.directory)
        self.path = os.path.join(self.directory, path)

    FRAMEWORK_NAME = 'RecBole'

    REPOSITORY = 'https://github.com/RUCAIBox/RecBole2.0'

    PAPER = """RecBole 2.0: Towards a More Up-to-Date Recommendation Library"""

    DOI = "https://doi.org/10.1145/3511808.3557680"

    CITATION = r"""
            @inproceedings{DBLP:conf/cikm/ZhaoMHLCPLLWTMF21,
              author       = {Wayne Xin Zhao and
                              Shanlei Mu and
                              Yupeng Hou and
                              Zihan Lin and
                              Yushuo Chen and
                              Xingyu Pan and
                              Kaiyuan Li and
                              Yujie Lu and
                              Hui Wang and
                              Changxin Tian and
                              Yingqian Min and
                              Zhichao Feng and
                              Xinyan Fan and
                              Xu Chen and
                              Pengfei Wang and
                              Wendi Ji and
                              Yaliang Li and
                              Xiaoling Wang and
                              Ji{-}Rong Wen},
              editor       = {Gianluca Demartini and
                              Guido Zuccon and
                              J. Shane Culpepper and
                              Zi Huang and
                              Hanghang Tong},
              title        = {RecBole: Towards a Unified, Comprehensive and Efficient Framework
                              for Recommendation Algorithms},
              booktitle    = {{CIKM} '21: The 30th {ACM} International Conference on Information
                              and Knowledge Management, Virtual Event, Queensland, Australia, November
                              1 - 5, 2021},
              pages        = {4653--4664},
              publisher    = {{ACM}},
              year         = {2021},
              url          = {https://doi.org/10.1145/3459637.3482016},
              doi          = {10.1145/3459637.3482016},
              timestamp    = {Tue, 07 May 2024 20:05:19 +0200},
              biburl       = {https://dblp.org/rec/conf/cikm/ZhaoMHLCPLLWTMF21.bib},
              bibsource    = {dblp computer science bibliography, https://dblp.org}
            }
            @inproceedings{DBLP:conf/cikm/ZhaoHPYZLZBTSCX22,
              author       = {Wayne Xin Zhao and
                              Yupeng Hou and
                              Xingyu Pan and
                              Chen Yang and
                              Zeyu Zhang and
                              Zihan Lin and
                              Jingsen Zhang and
                              Shuqing Bian and
                              Jiakai Tang and
                              Wenqi Sun and
                              Yushuo Chen and
                              Lanling Xu and
                              Gaowei Zhang and
                              Zhen Tian and
                              Changxin Tian and
                              Shanlei Mu and
                              Xinyan Fan and
                              Xu Chen and
                              Ji{-}Rong Wen},
              editor       = {Mohammad Al Hasan and
                              Li Xiong},
              title        = {RecBole 2.0: Towards a More Up-to-Date Recommendation Library},
              booktitle    = {Proceedings of the 31st {ACM} International Conference on Information
                              {\&} Knowledge Management, Atlanta, GA, USA, October 17-21, 2022},
              pages        = {4722--4726},
              publisher    = {{ACM}},
              year         = {2022},
              url          = {https://doi.org/10.1145/3511808.3557680},
              doi          = {10.1145/3511808.3557680},
              timestamp    = {Sun, 20 Aug 2023 12:23:03 +0200},
              biburl       = {https://dblp.org/rec/conf/cikm/ZhaoHPYZLZBTSCX22.bib},
              bibsource    = {dblp computer science bibliography, https://dblp.org}
            }"""

    CODE = """

    """

    DOC = 'https://recbole.io/'

    def info_code(self):
        """
        Provide the code to use in RecBole to run experiments.
        """
        self.CODE = """
            from recbole.data import create_dataset
            from recbole.config import Config

            config_dict = {{
                "dataset": "datarec",
                "data_path": {path},
            }}
            config = Config(config_dict=config_dict)
            dataset = create_dataset(config)
        """.format(path=self.path)

        super().info_code()

__init__(timestamp, path)

Initialize RecBole adapter.

Args:
    timestamp (bool): Whether timestamps are included.
    path (str): Path where the RecBole-compatible dataset is stored.

Source code in datarec/io/frameworks/recbole/recbole.py
def __init__(self, timestamp, path):
    """
    Initialize RecBole adapter.
    Args:
        timestamp (bool): Whether timestamps are included.
        path (str): Path where the RecBole-compatible dataset is stored.
    """
    self.timestamp = timestamp
    directory = os.path.dirname(path)
    self.directory = os.path.join(directory, 'DataRec2RecBole')
    print('RecBole requires a directory named after the dataset.\n'
          f'Based on your path, the directory that will be used is \'{self.directory}\'')
    if os.path.exists(self.directory) is False:
        os.makedirs(self.directory)
    self.path = os.path.join(self.directory, path)

info_code()

Provide the code to use in RecBole to run experiments.

Source code in datarec/io/frameworks/recbole/recbole.py
def info_code(self):
    """
    Provide the code to use in RecBole to run experiments.
    """
    self.CODE = """
        from recbole.data import create_dataset
        from recbole.config import Config

        config_dict = {{
            "dataset": "datarec",
            "data_path": {path},
        }}
        config = Config(config_dict=config_dict)
        dataset = create_dataset(config)
    """.format(path=self.path)

    super().info_code()

ReChorus

ReChorus

Bases: Framework

ReChorus framework adapter.

Provide metadata, citation, and usage examples for the ReChorus framework.
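
A minimal sketch, assuming the import path below; the file path is illustrative:

from datarec.io.frameworks.rechorus.rechorus import ReChorus

# Creates a 'DataRec2ReChorus' directory next to the given path; the split
# files placed there are meant to live inside ReChorus's 'data' folder.
adapter = ReChorus(timestamp=True, path="out/interactions.tsv")
adapter.info_code()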

Source code in datarec/io/frameworks/rechorus/rechorus.py
class ReChorus(Framework):
    """
    ReChorus framework adapter.

    Provide metadata, citation, and usage examples for the ReChorus framework.
    """

    def __init__(self, timestamp, path):
        """
        Initialize ReChorus adapter.
        Args:
            timestamp (bool): Whether timestamps are included.
            path (str): Path where the ReChorus-compatible dataset is stored.
        """
        self.timestamp = timestamp
        directory = os.path.dirname(path)
        self.directory = os.path.abspath(os.path.join(directory, 'DataRec2ReChorus'))
        print('ReChorus requires a directory named after the dataset.\n'
              f'Based on your path, the directory that will be used is \'{self.directory}\'')
        if os.path.exists(self.directory) is False:
            os.makedirs(self.directory)

    FRAMEWORK_NAME = 'ReChorus'

    REPOSITORY = 'https://github.com/THUwangcy/ReChorus'

    PAPER = """Make It a Chorus: Knowledge- and Time-aware Item Modeling for Sequential Recommendation"""

    DOI = "https://doi.org/10.1145/3397271.3401131"

    CITATION = """
            @inproceedings{DBLP:conf/sigir/WangZMLM20,
              author       = {Chenyang Wang and
                              Min Zhang and
                              Weizhi Ma and
                              Yiqun Liu and
                              Shaoping Ma},
              editor       = {Jimmy X. Huang and
                              Yi Chang and
                              Xueqi Cheng and
                              Jaap Kamps and
                              Vanessa Murdock and
                              Ji{-}Rong Wen and
                              Yiqun Liu},
              title        = {Make It a Chorus: Knowledge- and Time-aware Item Modeling for Sequential
                              Recommendation},
              booktitle    = {Proceedings of the 43rd International {ACM} {SIGIR} conference on
                              research and development in Information Retrieval, {SIGIR} 2020, Virtual
                              Event, China, July 25-30, 2020},
              pages        = {109--118},
              publisher    = {{ACM}},
              year         = {2020},
              url          = {https://doi.org/10.1145/3397271.3401131},
              doi          = {10.1145/3397271.3401131},
              timestamp    = {Mon, 31 Oct 2022 08:39:18 +0100},
              biburl       = {https://dblp.org/rec/conf/sigir/WangZMLM20.bib},
              bibsource    = {dblp computer science bibliography, https://dblp.org}
            }"""

    CODE = """

    """

    DOC = None

    def info_code(self):
        """
        Provide the code to use in ReChorus to run experiments.
        """
        self.CODE = """
            The dataset must be split and provided in a single folder within the \'data\' folder of the project.\n
            These files are supported by ReChorus models that use a \'BaseModel.Dataset\'.\n
            DataRec created this directory at \'{directory}\'.
        """.format(directory=self.directory)

        super().info_code()

__init__(timestamp, path)

Initialize ReChorus adapter.

Args:
    timestamp (bool): Whether timestamps are included.
    path (str): Path where the ReChorus-compatible dataset is stored.

Source code in datarec/io/frameworks/rechorus/rechorus.py
def __init__(self, timestamp, path):
    """
    Initialize ReChorus adapter.
    Args:
        timestamp (bool): Whether timestamps are included.
        path (str): Path where the ReChorus-compatible dataset is stored.
    """
    self.timestamp = timestamp
    directory = os.path.dirname(path)
    self.directory = os.path.abspath(os.path.join(directory, 'DataRec2ReChorus'))
    print('ReChorus requires a directory named after the dataset.\n'
          f'Based on your path, the directory that will be used is \'{self.directory}\'')
    if os.path.exists(self.directory) is False:
        os.makedirs(self.directory)

info_code()

Provide the code to use in ReChorus to run experiments.

Source code in datarec/io/frameworks/rechorus/rechorus.py
def info_code(self):
    """
    Provide the code to use in ReChorus to run experiments.
    """
    self.CODE = """
        The dataset must be split and provided in a single folder within the \'data\' folder of the project.\n
        These files are supported by ReChorus models that use a \'BaseModel.Dataset\'.\n
        DataRec created this directory at \'{directory}\'.
    """.format(directory=self.directory)

    super().info_code()

Recommenders

Recommenders

Bases: Framework

Recommenders framework adapter.

Provide metadata, citation, and usage examples for the Recommenders framework.
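
A minimal sketch, assuming the import path below; the file path is illustrative:

from datarec.io.frameworks.recommenders.recommenders import Recommenders

# The adapter keeps the target file path; the timestamp flag controls whether
# the reported pandas snippet expects a timestamp column.
adapter = Recommenders(timestamp=True, path="out/recommenders/interactions.tsv")
adapter.info_code()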

Source code in datarec/io/frameworks/recommenders/recommenders.py
class Recommenders(Framework):
    """
    Recommenders framework adapter.

    Provide metadata, citation, and usage examples for the Recommenders framework.
    """

    def __init__(self, timestamp, path):
        """
        Initialize Recommenders adapter.
        Args:
            timestamp (bool): Whether timestamps are included.
            path (str): Path where the Recommenders-compatible dataset is stored.
        """
        self.timestamp = timestamp
        self.directory = os.path.abspath(os.path.dirname(path))
        if os.path.exists(self.directory) is False:
            os.makedirs(self.directory)
        self.file = os.path.basename(path)
        self.file_path = os.path.join(self.directory, self.file)

    FRAMEWORK_NAME = 'Recommenders'

    REPOSITORY = 'https://github.com/recommenders-team/recommenders?tab=readme-ov-file'

    PAPER = """Microsoft recommenders: tools to accelerate developing recommender systems"""

    DOI = "https://doi.org/10.1145/3298689.3346967"

    CITATION = """
            @inproceedings{DBLP:conf/recsys/GrahamMW19,
              author       = {Scott Graham and
                              Jun{-}Ki Min and
                              Tao Wu},
              editor       = {Toine Bogers and
                              Alan Said and
                              Peter Brusilovsky and
                              Domonkos Tikk},
              title        = {Microsoft recommenders: tools to accelerate developing recommender
                              systems},
              booktitle    = {Proceedings of the 13th {ACM} Conference on Recommender Systems, RecSys
                              2019, Copenhagen, Denmark, September 16-20, 2019},
              pages        = {542--543},
              publisher    = {{ACM}},
              year         = {2019},
              url          = {https://doi.org/10.1145/3298689.3346967},
              doi          = {10.1145/3298689.3346967},
              timestamp    = {Wed, 09 Oct 2019 14:20:04 +0200},
              biburl       = {https://dblp.org/rec/conf/recsys/GrahamMW19.bib},
              bibsource    = {dblp computer science bibliography, https://dblp.org}
            }"""

    CODE = """

    """

    DOC = 'https://recommenders-team.github.io/recommenders'

    def info_code(self):
        """
        Provide the code to use in Recommenders to run experiments.
        """
        if self.timestamp:
            self.CODE = """
                import pandas as pd

                data = pd.read_csv("{file}", sep="\\t", names=['user', 'item', 'rating', 'timestamp'])
                """.format(file=self.file_path)
        else:
            self.CODE = """
                import pandas as pd

                data = pd.read_csv("{file}", sep="\\t", names=['user', 'item', 'rating'])
                """.format(file=self.file_path)

        super().info_code()

__init__(timestamp, path)

Initialize Recommenders adapter.

Args:
    timestamp (bool): Whether timestamps are included.
    path (str): Path where the Recommenders-compatible dataset is stored.

Source code in datarec/io/frameworks/recommenders/recommenders.py
def __init__(self, timestamp, path):
    """
    Initialize Recommenders adapter.
    Args:
        timestamp (bool): Whether timestamps are included.
        path (str): Path where the Recommenders-compatible dataset is stored.
    """
    self.timestamp = timestamp
    self.directory = os.path.abspath(os.path.dirname(path))
    if os.path.exists(self.directory) is False:
        os.makedirs(self.directory)
    self.file = os.path.basename(path)
    self.file_path = os.path.join(self.directory, self.file)

info_code()

Provide the code to use in Recommenders to run experiments.

Source code in datarec/io/frameworks/recommenders/recommenders.py
def info_code(self):
    """
    Provide the code to use in Recommenders to run experiments.
    """
    if self.timestamp:
        self.CODE = """
            import pandas as pd

            data = pd.read_csv("{file}", sep="\\t", names=['user', 'item', 'rating', 'timestamp'])
            """.format(file=self.file_path)
    else:
        self.CODE = """
            import pandas as pd

            data = pd.read_csv("{file}", sep="\\t", names=['user', 'item', 'rating'])
            """.format(file=self.file_path)

    super().info_code()

RecPack

RecPack

Bases: Framework

RecPack framework adapter.

Provide metadata, citation, and usage examples for the RecPack framework.
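
A minimal sketch, assuming the import path below; the file path is illustrative:

from datarec.io.frameworks.recpack.recpack import RecPack

# Points at the exported TSV; info_code() explains how to register the
# DataRec dataset class inside a RecPack installation.
adapter = RecPack(timestamp=True, path="out/recpack/datarec.tsv")
adapter.info_code()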

Source code in datarec/io/frameworks/recpack/recpack.py
class RecPack(Framework):
    """
    RecPack framework adapter.

    Provide metadata, citation, and usage examples for the RecPack framework.
    """

    def __init__(self, timestamp, path):
        """
        Initialize RecPack adapter.
        Args:
            timestamp (bool): Whether timestamps are included.
            path (str): Path where the RecPack-compatible dataset is stored.
        """
        self.timestamp = timestamp
        self.directory = os.path.abspath(os.path.dirname(path))
        if os.path.exists(self.directory) is False:
            os.makedirs(self.directory)
        self.file = os.path.basename(path)
        self.file_path = os.path.join(self.directory, self.file)

    FRAMEWORK_NAME = 'RecPack'

    REPOSITORY = 'https://github.com/LienM/recpack'

    PAPER = """RecPack: An(other) Experimentation Toolkit for Top-N Recommendation using Implicit Feedback Data"""

    DOI = "https://doi.org/10.1145/3523227.3551472"

    CITATION = """
            @inproceedings{DBLP:conf/recsys/MichielsVG22,
              author       = {Lien Michiels and
                              Robin Verachtert and
                              Bart Goethals},
              editor       = {Jennifer Golbeck and
                              F. Maxwell Harper and
                              Vanessa Murdock and
                              Michael D. Ekstrand and
                              Bracha Shapira and
                              Justin Basilico and
                              Keld T. Lundgaard and
                              Even Oldridge},
              title        = {RecPack: An(other) Experimentation Toolkit for Top-N Recommendation
                              using Implicit Feedback Data},
              booktitle    = {RecSys '22: Sixteenth {ACM} Conference on Recommender Systems, Seattle,
                              WA, USA, September 18 - 23, 2022},
              pages        = {648--651},
              publisher    = {{ACM}},
              year         = {2022},
              url          = {https://doi.org/10.1145/3523227.3551472},
              doi          = {10.1145/3523227.3551472},
              timestamp    = {Mon, 01 May 2023 13:01:24 +0200},
              biburl       = {https://dblp.org/rec/conf/recsys/MichielsVG22.bib},
              bibsource    = {dblp computer science bibliography, https://dblp.org}
            }"""

    CODE = """

    """

    DOC = 'https://recpack.froomle.ai/'

    def info_code(self):
        """
        Provide the code to use in RecPack to run experiments.
        """
        self.CODE = """
            To use a dataset from DataRec you need to:
            1) copy/move the file 
            \'datarec/io/frameworks/recpack/datarec.py\'
            at \'recpack/datasets/datarec.py\'
            2) replace the content of the init file in RecPack
            \'datarec/io/frameworks/recpack/__init__.py\'
            with the content of
            \'datarec/io/frameworks/recpack/copy_me_in__init__.py\'
            Then you can use this code

            from recpack.datasets import DataRec
            dataset = DataRec(path='{directory}', filename='{file}', use_default_filters=False)
        """.format(file=self.file, directory=self.directory)

        super().info_code()

__init__(timestamp, path)

Initialize RecPack adapter.

Args:
    timestamp (bool): Whether timestamps are included.
    path (str): Path where the RecPack-compatible dataset is stored.

Source code in datarec/io/frameworks/recpack/recpack.py
def __init__(self, timestamp, path):
    """
    Initialize RecPack adapter.
    Args:
        timestamp (bool): Whether timestamps are included.
        path (str): Path where the RecPack-compatible dataset is stored.
    """
    self.timestamp = timestamp
    self.directory = os.path.abspath(os.path.dirname(path))
    if os.path.exists(self.directory) is False:
        os.makedirs(self.directory)
    self.file = os.path.basename(path)
    self.file_path = os.path.join(self.directory, self.file)

info_code()

Provide the code to use in RecPack to run experiments.

Source code in datarec/io/frameworks/recpack/recpack.py
def info_code(self):
    """
    Provide the code to use in RecPack to run experiments.
    """
    self.CODE = """
        To use a dataset from DataRec you need to:
        1) copy/move the file 
        \'datarec/io/frameworks/recpack/datarec.py\'
        at \'recpack/datasets/datarec.py\'
        2) replace the content of the init file in RecPack
        \'datarec/io/frameworks/recpack/__init__.py\'
        with the content of
        \'datarec/io/frameworks/recpack/copy_me_in__init__.py\'
        Then you can use this code

        from recpack.datasets import DataRec
        dataset = DataRec(path='{directory}', filename='{file}', use_default_filters=False)
    """.format(file=self.file, directory=self.directory)

    super().info_code()

DataRec

Bases: Dataset

Base class for DataRec Datasets
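
Once datarec.py has been copied into RecPack as described above, the class behaves like any other RecPack dataset. A minimal sketch (the paths are illustrative, and the load() call follows RecPack's usual Dataset API, which is an assumption here):

from recpack.datasets import DataRec

dataset = DataRec(path="out/recpack", filename="datarec.tsv", use_default_filters=False)
interactions = dataset.load()  # standard RecPack Dataset entry point (assumed)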

Source code in datarec/io/frameworks/recpack/datarec.py
class DataRec(Dataset):
    """
    Base class for DataRec Datasets
    """
    USER_IX = "userId"
    """Name of the column in the DataFrame that contains user identifiers."""
    ITEM_IX = "itemId"
    """Name of the column in the DataFrame that contains item identifiers."""
    TIMESTAMP_IX = "timestamp"
    """Name of the column in the DataFrame that contains time of interaction in seconds since epoch."""

    @property
    def DEFAULT_FILENAME(self) -> str:
        """
        Default filename that will be used if it is not specified by the user.
        """
        return f"datarec.tsv"

    def _load_dataframe(self) -> pd.DataFrame:
        """
        Dataset from DataRec will be loaded as a pandas DataFrame

        Warning: This does not apply any preprocessing and returns the raw dataset.

        Returns:
            (pd.DataFrame): The interaction data as a DataFrame with a row per interaction.

        """
        df = pd.read_csv(os.path.join(self.path, self.filename), sep='\t', header=0, dtype={
                self.USER_IX: str,
                self.TIMESTAMP_IX: np.int64,
                self.ITEM_IX: str,
            })
        return df

USER_IX = 'userId' class-attribute instance-attribute

Name of the column in the DataFrame that contains user identifiers.

ITEM_IX = 'itemId' class-attribute instance-attribute

Name of the column in the DataFrame that contains item identifiers.

TIMESTAMP_IX = 'timestamp' class-attribute instance-attribute

Name of the column in the DataFrame that contains time of interaction in seconds since epoch.

DEFAULT_FILENAME property

Default filename that will be used if it is not specified by the user.