Source code for efficient_apriori.itemsets

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Implementations of algorithms related to itemsets.
"""

import itertools
import numbers
import typing
import collections
from dataclasses import field, dataclass
import collections.abc


@dataclass
class ItemsetCount:
    itemset_count: int = 0
    members: set = field(default_factory=set)


class TransactionManager:
    # The brilliant transaction manager idea is due to:
    # https://github.com/ymoch/apyori/blob/master/apyori.py

    def __init__(self, transactions: typing.Iterable[typing.Iterable[typing.Hashable]]):

        # A lookup that returns indices of transactions for each item
        self._indices_by_item = collections.defaultdict(set)

        # Populate
        i = -1
        for i, transaction in enumerate(transactions):
            for item in transaction:
                self._indices_by_item[item].add(i)

        # Total number of transactions
        self._transactions = i + 1

    @property
    def items(self):
        return set(self._indices_by_item.keys())

    def __len__(self):
        return self._transactions

    def transaction_indices(self, transaction: typing.Iterable[typing.Hashable]):
        """Return the indices of the transaction."""

        transaction = set(transaction)  # Copy
        item = transaction.pop()
        indices = self._indices_by_item[item]
        while transaction:
            item = transaction.pop()
            indices = indices.intersection(self._indices_by_item[item])
        return indices

    def transaction_indices_sc(self, transaction: typing.Iterable[typing.Hashable], min_support: float = 0):
        """Return the indices of the transaction, with short-circuiting.

        Returns (over_or_equal_to_min_support, set_of_indices)
        """

        # Sort items by number of transaction rows the item appears in,
        # starting with the item beloning to the most transactions
        transaction = sorted(transaction, key=lambda item: len(self._indices_by_item[item]), reverse=True)

        # Pop item appearing in the fewest
        item = transaction.pop()
        indices = self._indices_by_item[item]
        support = len(indices) / len(self)
        if support < min_support:
            return False, None

        # The support is a non-increasing function
        # Sorting by number of transactions the items appear in is a heuristic
        # to make the support drop as quickly as possible
        while transaction:
            item = transaction.pop()
            indices = indices.intersection(self._indices_by_item[item])
            support = len(indices) / len(self)
            if support < min_support:
                return False, None

        # No short circuit happened
        return True, indices


def join_step(itemsets: typing.List[tuple]):
    """
    Join k length itemsets into k + 1 length itemsets.

    This algorithm assumes that the list of itemsets are sorted, and that the
    itemsets themselves are sorted tuples. Instead of always enumerating all
    n^2 combinations, the algorithm only has n^2 runtime for each block of
    itemsets with the first k - 1 items equal.

    Parameters
    ----------
    itemsets : list of itemsets
        A list of itemsets of length k, to be joined to k + 1 length
        itemsets.

    Examples
    --------
    >>> # This is an example from the 1994 paper by Agrawal et al.
    >>> itemsets = [(1, 2, 3), (1, 2, 4), (1, 3, 4), (1, 3, 5), (2, 3, 4)]
    >>> list(join_step(itemsets))
    [(1, 2, 3, 4), (1, 3, 4, 5)]
    """
    i = 0
    # Iterate over every itemset in the itemsets
    while i < len(itemsets):

        # The number of rows to skip in the while-loop, initially set to 1
        skip = 1

        # Get all but the last item in the itemset, and the last item
        *itemset_first, itemset_last = itemsets[i]

        # We now iterate over every itemset following this one, stopping
        # if the first k - 1 items are not equal. If we're at (1, 2, 3),
        # we'll consider (1, 2, 4) and (1, 2, 7), but not (1, 3, 1)

        # Keep a list of all last elements, i.e. tail elements, to perform
        # 2-combinations on later on
        tail_items = [itemset_last]
        tail_items_append = tail_items.append  # Micro-optimization

        # Iterate over ever itemset following this itemset
        for j in range(i + 1, len(itemsets)):

            # Get all but the last item in the itemset, and the last item
            *itemset_n_first, itemset_n_last = itemsets[j]

            # If it's the same, append and skip this itemset in while-loop
            if itemset_first == itemset_n_first:

                # Micro-optimization
                tail_items_append(itemset_n_last)
                skip += 1

            # If it's not the same, break out of the for-loop
            else:
                break

        # For every 2-combination in the tail items, yield a new candidate
        # itemset, which is sorted.
        itemset_first_tuple = tuple(itemset_first)
        for a, b in sorted(itertools.combinations(tail_items, 2)):
            yield itemset_first_tuple + (a,) + (b,)

        # Increment the while-loop counter
        i += skip


def prune_step(itemsets: typing.Iterable[tuple], possible_itemsets: typing.List[tuple]):
    """
    Prune possible itemsets whose subsets are not in the list of itemsets.

    Parameters
    ----------
    itemsets : list of itemsets
        A list of itemsets of length k.
    possible_itemsets : list of itemsets
        A list of possible itemsets of length k + 1 to be pruned.

    Examples
    -------
    >>> itemsets = [('a', 'b', 'c'), ('a', 'b', 'd'),
    ...             ('b', 'c', 'd'), ('a', 'c', 'd')]
    >>> possible_itemsets = list(join_step(itemsets))
    >>> list(prune_step(itemsets, possible_itemsets))
    [('a', 'b', 'c', 'd')]
    """

    # For faster lookups
    itemsets = set(itemsets)

    # Go through every possible itemset
    for possible_itemset in possible_itemsets:

        # Remove 1 from the combination, same as k-1 combinations
        # The itemsets created by removing the last two items in the possible
        # itemsets must be part of the itemsets by definition,
        # due to the way the `join_step` function merges the sorted itemsets

        for i in range(len(possible_itemset) - 2):
            removed = possible_itemset[:i] + possible_itemset[i + 1 :]

            # If every k combination exists in the set of itemsets,
            # yield the possible itemset. If it does not exist, then it's
            # support cannot be large enough, since supp(A) >= supp(AB) for
            # all B, and if supp(S) is large enough, then supp(s) must be large
            # enough for every s which is a subset of S.
            # This is the downward-closure property of the support function.
            if removed not in itemsets:
                break

        # If we have not breaked yet
        else:
            yield possible_itemset


def apriori_gen(itemsets: typing.List[tuple]):
    """
    Compute all possible k + 1 length supersets from k length itemsets.

    This is done efficiently by using the downward-closure property of the
    support function, which states that if support(S) > k, then support(s) > k
    for every subset s of S.

    Parameters
    ----------
    itemsets : list of itemsets
        A list of itemsets of length k.

    Examples
    -------
    >>> # This is an example from the 1994 paper by Agrawal et al.
    >>> itemsets = [(1, 2, 3), (1, 2, 4), (1, 3, 4), (1, 3, 5), (2, 3, 4)]
    >>> possible_itemsets = list(join_step(itemsets))
    >>> list(prune_step(itemsets, possible_itemsets))
    [(1, 2, 3, 4)]
    """
    possible_extensions = join_step(itemsets)
    yield from prune_step(itemsets, possible_extensions)


[docs]def itemsets_from_transactions( transactions: typing.Iterable[typing.Union[set, tuple, list]], min_support: float, max_length: int = 8, verbosity: int = 0, output_transaction_ids: bool = False, ): """ Compute itemsets from transactions by building the itemsets bottom up and iterating over the transactions to compute the support repedately. This is the heart of the Apriori algorithm by Agrawal et al. in the 1994 paper. Parameters ---------- transactions : a list of itemsets (tuples/sets/lists with hashable entries) min_support : float The minimum support of the itemsets, i.e. the minimum frequency as a percentage. max_length : int The maximum length of the itemsets. verbosity : int The level of detail printing when the algorithm runs. Either 0, 1 or 2. output_transaction_ids : bool If set to true, the output contains the ids of transactions that contain a frequent itemset. The ids are the enumeration of the transactions in the sequence they appear. Examples -------- >>> # This is an example from the 1994 paper by Agrawal et al. >>> transactions = [(1, 3, 4), (2, 3, 5), (1, 2, 3, 5), (2, 5)] >>> itemsets, _ = itemsets_from_transactions(transactions, min_support=2/5) >>> itemsets[1] == {(1,): 2, (2,): 3, (3,): 3, (5,): 3} True >>> itemsets[2] == {(1, 3): 2, (2, 3): 2, (2, 5): 3, (3, 5): 2} True >>> itemsets[3] == {(2, 3, 5): 2} True """ # STEP 0 - Sanitize user inputs # ----------------------------- if not (isinstance(min_support, numbers.Number) and (0 <= min_support <= 1)): raise ValueError("`min_support` must be a number between 0 and 1.") # Store in transaction manager manager = TransactionManager(transactions) # If no transactions are present transaction_count = len(manager) if transaction_count == 0: return dict(), 0 # large_itemsets, num_transactions # STEP 1 - Generate all large itemsets of size 1 # ---------------------------------------------- if verbosity > 0: print("Generating itemsets.") print(" Counting itemsets of length 1.") candidates: typing.Dict[tuple, int] = {(item,): len(indices) for item, indices in manager._indices_by_item.items()} large_itemsets: typing.Dict[int, typing.Dict[tuple, int]] = { 1: {item: count for (item, count) in candidates.items() if (count / len(manager)) >= min_support} } if verbosity > 0: print(" Found {} candidate itemsets of length 1.".format(len(manager.items))) print(" Found {} large itemsets of length 1.".format(len(large_itemsets.get(1, dict())))) if verbosity > 1: print(" {}".format(list(item for item in large_itemsets.get(1, dict()).keys()))) # If large itemsets were found, convert to dictionary if not large_itemsets.get(1, dict()): return dict(), 0 # large_itemsets, num_transactions # STEP 2 - Build up the size of the itemsets # ------------------------------------------ # While there are itemsets of the previous size k = 2 while large_itemsets[k - 1] and (max_length != 1): if verbosity > 0: print(" Counting itemsets of length {}.".format(k)) # STEP 2a) - Build up candidate of larger itemsets # Retrieve the itemsets of the previous size, i.e. of size k - 1 # They must be sorted to maintain the invariant when joining/pruning itemsets_list = sorted(item for item in large_itemsets[k - 1].keys()) # Gen candidates of length k + 1 by joining, prune, and copy as set # This algorithm assumes that the list of itemsets are sorted, # and that the itemsets themselves are sorted tuples C_k: typing.List[tuple] = list(apriori_gen(itemsets_list)) if verbosity > 0: print(" Found {} candidate itemsets of length {}.".format(len(C_k), k)) if verbosity > 1: print(" {}".format(C_k)) # If no candidate itemsets were found, break out of the loop if not C_k: break # Prepare counts of candidate itemsets (from the prune step) if verbosity > 1: print(" Iterating over transactions.") # Keep only large transactions found_itemsets: typing.Dict[tuple, int] = dict() for candidate in C_k: over_min_support, indices = manager.transaction_indices_sc(candidate, min_support=min_support) if over_min_support: found_itemsets[candidate] = len(indices) # If no itemsets were found, break out of the loop if not found_itemsets: break # Candidate itemsets were found, add them large_itemsets[k] = {i: counts for (i, counts) in found_itemsets.items()} if verbosity > 0: num_found = len(large_itemsets[k]) print(" Found {} large itemsets of length {}.".format(num_found, k)) if verbosity > 1: print(" {}".format(list(large_itemsets[k].keys()))) k += 1 # Break out if we are about to consider larger itemsets than the max if k > max_length: break if verbosity > 0: print("Itemset generation terminated.\n") if output_transaction_ids: itemsets_out = { length: { item: ItemsetCount(itemset_count=count, members=manager.transaction_indices(set(item))) for (item, count) in itemsets.items() } for (length, itemsets) in large_itemsets.items() } return itemsets_out, len(manager) return large_itemsets, len(manager)
if __name__ == "__main__": import pytest pytest.main(args=[".", "--doctest-modules", "-v"])