#!/usr/bin/env python
# -*- coding: utf-8 -*-
#
# Copyright (C) 2011 Radim Rehurek <radimrehurek@seznam.cz>
# Licensed under the GNU LGPL v2.1 - http://www.gnu.org/licenses/lgpl.html
#
# Parts of the LDA inference code come from Dr. Hoffman's `onlineldavb.py` script,
# (C) 2010  Matthew D. Hoffman, GNU GPL 3.0


"""
**For a faster implementation of LDA (parallelized for multicore machines), see** :mod:`gensim.models.ldamulticore`.

Latent Dirichlet Allocation (LDA) in Python.

This module allows both LDA model estimation from a training corpus and inference of topic
distribution on new, unseen documents. The model can also be updated with new documents
for online training.

The core estimation code is based on the `onlineldavb.py` script by M. Hoffman [1]_, see
**Hoffman, Blei, Bach: Online Learning for Latent Dirichlet Allocation, NIPS 2010.**

The algorithm:

* is **streamed**: training documents may come in sequentially, no random access required,
* runs in **constant memory** w.r.t. the number of documents: size of the
  training corpus does not affect memory footprint, can process corpora larger than RAM, and
* is **distributed**: makes use of a cluster of machines, if available, to
  speed up model estimation.

.. [1] http://www.cs.princeton.edu/~mdhoffma

"""


import logging
import numpy  # for arrays, array broadcasting etc.
import numbers

from gensim import interfaces, utils, matutils
from itertools import chain
from scipy.special import gammaln, psi  # gamma function utils
from scipy.special import polygamma
from six.moves import xrange
import six

# log(sum(exp(x))) that tries to avoid overflow
try:
    # try importing from here if older scipy is installed
    from scipy.maxentropy import logsumexp
except ImportError:
    # maxentropy has been removed in recent releases, logsumexp now in misc
    from scipy.misc import logsumexp


logger = logging.getLogger('gensim.models.ldamodel')


def dirichlet_expectation(alpha):
    """
    For a vector `theta~Dir(alpha)`, compute `E[log(theta)]`.

    """
    if (len(alpha.shape) == 1):
        result = psi(alpha) - psi(numpy.sum(alpha))
    else:
        result = psi(alpha) - psi(numpy.sum(alpha, 1))[:, numpy.newaxis]
    return result.astype(alpha.dtype)  # keep the same precision as input
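
# A tiny illustration of `dirichlet_expectation` (values shown are qualitative): for a
# symmetric 1-d `alpha`, all entries of E[log(theta)] are equal; 2-d input is handled row-wise:
#
#   >>> dirichlet_expectation(numpy.array([0.1, 0.1, 0.1]))  # three equal negative values
#   >>> dirichlet_expectation(numpy.ones((2, 4)))            # shape (2, 4), identical rows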


def update_dir_prior(prior, N, logphat, rho):
    """
    Updates a given prior using Newton's method, described in
    **Huang: Maximum Likelihood Estimation of Dirichlet Distribution Parameters.**
    http://jonathan-huang.org/research/dirichlet/dirichlet.pdf
    """
    dprior = numpy.copy(prior)
    gradf = N * (psi(numpy.sum(prior)) - psi(prior) + logphat)

    c = N * polygamma(1, numpy.sum(prior))
    q = -N * polygamma(1, prior)

    b = numpy.sum(gradf / q) / (1 / c + numpy.sum(1 / q))

    dprior = -(gradf - b) / q

    if all(rho * dprior + prior > 0):
        prior += rho * dprior
    else:
        logger.warning("updated prior not positive")

    return prior
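
# A shape-only sketch of calling `update_dir_prior` (illustrative values, not a meaningful
# fit): `logphat` is the mean Dirichlet expectation over `N` observed gamma rows and `rho`
# is the step size, mirroring how `LdaModel.update_alpha` uses it:
#
#   >>> prior = numpy.asarray([0.1, 0.1, 0.1])
#   >>> gammas = numpy.random.gamma(100., 1. / 100., (5, 3))  # pretend 5 documents, 3 topics
#   >>> logphat = sum(dirichlet_expectation(g) for g in gammas) / 5.0
#   >>> prior = update_dir_prior(prior, 5.0, logphat, 0.7)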

def get_random_state(seed):
    """
    Turn seed into a np.random.RandomState instance.

    Method originally from maciejkula/glove-python, and written by @joshloyal.

    """
    if seed is None or seed is numpy.random:
        return numpy.random.mtrand._rand
    if isinstance(seed, (numbers.Integral, numpy.integer)):
        return numpy.random.RandomState(seed)
    if isinstance(seed, numpy.random.RandomState):
        return seed
    raise ValueError('%r cannot be used to seed a numpy.random.RandomState'
                     ' instance' % seed)
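
# Illustrative calls (behaviour follows the branches above): an int seeds a fresh
# RandomState, an existing RandomState passes through unchanged, and None falls back to
# numpy's global state:
#
#   >>> get_random_state(42)                            # new RandomState seeded with 42
#   >>> get_random_state(numpy.random.RandomState(0))   # returned as-is
#   >>> get_random_state(None)                          # numpy's global RandomState singleton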

class LdaState(utils.SaveLoad):
    """
    Encapsulate information for distributed computation of LdaModel objects.

    Objects of this class are sent over the network, so try to keep them lean to
    reduce traffic.

    """
    def __init__(self, eta, shape):
        self.eta = eta
        self.sstats = numpy.zeros(shape)
        self.numdocs = 0

    def reset(self):
        """
        Prepare the state for a new EM iteration (reset sufficient stats).

        """
        self.sstats[:] = 0.0
        self.numdocs = 0

    def merge(self, other):
        """
        Merge the result of an E step from one node with that of another node
        (summing up sufficient statistics).

        The merging is trivial and after merging all cluster nodes, we have the
        exact same result as if the computation was run on a single node (no
        approximation).

        """
        assert other is not None
        self.sstats += other.sstats
        self.numdocs += other.numdocs

    def blend(self, rhot, other, targetsize=None):
        """
        Given LdaState `other`, merge it with the current state. Stretch both to
        `targetsize` documents before merging, so that they are of comparable
        magnitude.

        Merging is done by average weighting: in the extremes, `rhot=0.0` means
        `other` is completely ignored; `rhot=1.0` means `self` is completely ignored.

        This procedure corresponds to the stochastic gradient update from Hoffman
        et al., algorithm 2 (eq. 14).

        """
        assert other is not None
        if targetsize is None:
            targetsize = self.numdocs

        # stretch the current model's expected n*phi counts to target size
        if self.numdocs == 0 or targetsize == self.numdocs:
            scale = 1.0
        else:
            scale = 1.0 * targetsize / self.numdocs
        self.sstats *= (1.0 - rhot) * scale

        # stretch the incoming n*phi counts to target size
        if other.numdocs == 0 or targetsize == other.numdocs:
            scale = 1.0
        else:
            logger.info("merging changes from %i documents into a model of %i documents",
                        other.numdocs, targetsize)
            scale = 1.0 * targetsize / other.numdocs
        self.sstats += rhot * scale * other.sstats

        self.numdocs = targetsize

    def blend2(self, rhot, other, targetsize=None):
        """
        Alternative, more simple blend.
        """
        assert other is not None
        if targetsize is None:
            targetsize = self.numdocs

        # merge the two matrices by summing
        self.sstats += other.sstats
        self.numdocs = targetsize

    def get_lambda(self):
        return self.eta + self.sstats

    def get_Elogbeta(self):
        return dirichlet_expectation(self.get_lambda())
# endclass LdaState
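
# A small shape-only illustration of how `LdaState` accumulates sufficient statistics
# (toy numbers; in practice `LdaModel` fills `sstats` during the E step):
#
#   >>> state = LdaState(eta=0.1, shape=(2, 5))   # 2 topics, 5 terms
#   >>> state.sstats += numpy.ones((2, 5))        # pretend contribution from one E step
#   >>> lam = state.get_lambda()                  # eta + sstats, here all 1.1
#   >>> Elogbeta = state.get_Elogbeta()           # dirichlet_expectation of lambda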


class LdaModel(interfaces.TransformationABC):
    """
    The constructor estimates Latent Dirichlet Allocation model parameters based
    on a training corpus:

    >>> lda = LdaModel(corpus, num_topics=10)

    You can then infer topic distributions on new, unseen documents, with

    >>> doc_lda = lda[doc_bow]

    The model can be updated (trained) with new documents via

    >>> lda.update(other_corpus)

    Model persistency is achieved through its `load`/`save` methods.

    """

    def __init__(self, corpus=None, num_topics=100, id2word=None,
                 distributed=False, chunksize=2000, passes=1, update_every=1,
                 alpha='symmetric', eta=None, decay=0.5, offset=1.0,
                 eval_every=10, iterations=50, gamma_threshold=0.001,
                 minimum_probability=0.01, random_state=None, ns_conf={}):
        """
        If given, start training from the iterable `corpus` straight away. If not given,
        the model is left untrained (presumably because you want to call `update()` manually).

        `num_topics` is the number of requested latent topics to be extracted from
        the training corpus.

        `id2word` is a mapping from word ids (integers) to words (strings). It is
        used to determine the vocabulary size, as well as for debugging and topic
        printing.

        `alpha` and `eta` are hyperparameters that affect sparsity of the document-topic
        (theta) and topic-word (lambda) distributions. Both default to a symmetric
        1.0/num_topics prior.

        `alpha` can be set to an explicit array = prior of your choice. It also
        supports special values of 'asymmetric' and 'auto': the former uses a fixed
        normalized asymmetric 1.0/topicno prior, the latter learns an asymmetric
        prior directly from your data.

        `eta` can be a scalar for a symmetric prior over topic/word distributions,
        or a matrix of shape num_topics x num_words, which can be used to impose
        asymmetric priors over the word distribution on a per-topic basis. This may
        be useful if you want to seed certain topics with particular words by boosting
        the priors for those words. It also supports the special value 'auto', which
        learns an asymmetric prior directly from your data.

        Turn on `distributed` to force distributed computing (see the `web tutorial
        <http://radimrehurek.com/gensim/distributed.html>`_ on how to set up a
        cluster of machines for gensim).

        Calculate and log perplexity estimate from the latest mini-batch every
        `eval_every` model updates (setting this to 1 slows down training ~2x;
        default is 10 for better performance). Set to None to disable perplexity
        estimation.

        `decay` and `offset` parameters are the same as Kappa and Tau_0 in
        Hoffman et al, respectively.

        `minimum_probability` controls filtering the topics returned for a document (bow).

        `random_state` can be a numpy.random.RandomState object or the seed for one.

        Example:

        >>> lda = LdaModel(corpus, num_topics=100)  # train model
        >>> print(lda[doc_bow])  # get topic probability distribution for a document
        >>> lda.update(corpus2)  # update the LDA model with additional documents
        >>> print(lda[doc_bow])

        >>> lda = LdaModel(corpus, num_topics=50, alpha='auto', eval_every=5)  # train asymmetric alpha from data

        """
        # store user-supplied parameters
        self.id2word = id2word
        if corpus is None and self.id2word is None:
            raise ValueError('at least one of corpus/id2word must be specified, to establish input space dimensionality')

        if self.id2word is None:
            logger.warning("no word id mapping provided; initializing from corpus, assuming identity")
            self.id2word = utils.dict_from_corpus(corpus)
            self.num_terms = len(self.id2word)
        elif len(self.id2word) > 0:
            self.num_terms = 1 + max(self.id2word.keys())
        else:
            self.num_terms = 0

        if self.num_terms == 0:
            raise ValueError("cannot compute LDA over an empty collection (no terms)")

        self.distributed = bool(distributed)
        self.num_topics = int(num_topics)
        self.chunksize = chunksize
        self.decay = decay
        self.offset = offset
        self.minimum_probability = minimum_probability
        self.num_updates = 0

        self.passes = passes
        self.update_every = update_every
        self.eval_every = eval_every

        self.alpha, self.optimize_alpha = self.init_dir_prior(alpha, 'alpha')
        assert self.alpha.shape == (self.num_topics,), "Invalid alpha shape. Got shape %s, but expected (%d, )" % (str(self.alpha.shape), self.num_topics)

        self.eta, self.optimize_eta = self.init_dir_prior(eta, 'eta')

        self.random_state = get_random_state(random_state)

        assert (self.eta.shape == (self.num_topics, 1) or self.eta.shape == (self.num_topics, self.num_terms)), (
            "Invalid eta shape. Got shape %s, but expected (%d, 1) or (%d, %d)" %
            (str(self.eta.shape), self.num_topics, self.num_topics, self.num_terms))

        # VB constants
        self.iterations = iterations
        self.gamma_threshold = gamma_threshold

        # set up distributed environment if necessary
        if not distributed:
            logger.info("using serial LDA version on this node")
            self.dispatcher = None
            self.numworkers = 1
        else:
            if self.optimize_alpha:
                raise NotImplementedError("auto-optimizing alpha not implemented in distributed LDA")
            # set up distributed version
            try:
                import Pyro4
                with utils.getNS(**ns_conf) as ns:
                    from gensim.models.lda_dispatcher import LDA_DISPATCHER_PREFIX
                    self.dispatcher = Pyro4.Proxy(ns.list(prefix=LDA_DISPATCHER_PREFIX)[LDA_DISPATCHER_PREFIX])
                    logger.debug("looking for dispatcher at %s" % str(self.dispatcher._pyroUri))
                    self.dispatcher.initialize(id2word=self.id2word, num_topics=self.num_topics,
                                               chunksize=chunksize, alpha=alpha, eta=eta, distributed=False)
                    self.numworkers = len(self.dispatcher.getworkers())
                    logger.info("using distributed version with %i workers" % self.numworkers)
            except Exception as err:
                logger.error("failed to initialize distributed LDA (%s)", err)
                raise RuntimeError("failed to initialize distributed LDA (%s)" % err)

        # Initialize the variational distribution q(beta|lambda)
        self.state = LdaState(self.eta, (self.num_topics, self.num_terms))
        self.state.sstats = self.random_state.gamma(100., 1. / 100., (self.num_topics, self.num_terms))
        self.expElogbeta = numpy.exp(dirichlet_expectation(self.state.sstats))

        # if a training corpus was provided, start estimating the model right away
        if corpus is not None:
            use_numpy = self.dispatcher is not None
            self.update(corpus, chunks_as_numpy=use_numpy)

    def init_dir_prior(self, prior, name):
        if prior is None:
            prior = 'symmetric'

        is_auto = False

        if isinstance(prior, six.string_types):
            if prior == 'symmetric':
                logger.info("using symmetric %s at %s", name, 1.0 / self.num_topics)
                init_prior = numpy.asarray([1.0 / self.num_topics for i in xrange(self.num_topics)])
            elif prior == 'asymmetric':
                init_prior = numpy.asarray([1.0 / (i + numpy.sqrt(self.num_topics)) for i in xrange(self.num_topics)])
                init_prior /= init_prior.sum()
                logger.info("using asymmetric %s %s", name, list(init_prior))
            elif prior == 'auto':
                is_auto = True
                init_prior = numpy.asarray([1.0 / self.num_topics for i in xrange(self.num_topics)])
                logger.info("using autotuned %s, starting with %s", name, list(init_prior))
            else:
                raise ValueError("Unable to determine proper %s value given '%s'" % (name, prior))
        elif isinstance(prior, list):
            init_prior = numpy.asarray(prior)
        elif isinstance(prior, numpy.ndarray):
            init_prior = prior
        elif isinstance(prior, numpy.number) or isinstance(prior, numbers.Real):
            init_prior = numpy.asarray([prior] * self.num_topics)
        else:
            raise ValueError("%s must be either a numpy array of scalars, list of scalars, or scalar" % name)

        if name == 'eta':
            # please note the difference in shapes between alpha and eta:
            # alpha is a row: [0.1, 0.1]
            # eta is a column: [[0.1],
            #                   [0.1]]
            if init_prior.shape == (self.num_topics,) or init_prior.shape == (1, self.num_topics):
                init_prior = init_prior.reshape((self.num_topics, 1))  # this statement throws ValueError if eta did not match self.num_topics

        return init_prior, is_auto

    def __str__(self):
        return "LdaModel(num_terms=%s, num_topics=%s, decay=%s, chunksize=%s)" % \
            (self.num_terms, self.num_topics, self.decay, self.chunksize)

    def sync_state(self):
        self.expElogbeta = numpy.exp(self.state.get_Elogbeta())

    def clear(self):
        """Clear model state (free up some memory). Used in the distributed algo."""
        self.state = None
        self.Elogbeta = None

    def inference(self, chunk, collect_sstats=False):
        """
        Given a chunk of sparse document vectors, estimate gamma (parameters
        controlling the topic weights) for each document in the chunk.

        This function does not modify the model (=is read-only aka const). The
        whole input chunk of documents is assumed to fit in RAM; chunking of a
        large corpus must be done earlier in the pipeline.

        If `collect_sstats` is True, also collect sufficient statistics needed
        to update the model's topic-word distributions, and return a 2-tuple
        `(gamma, sstats)`. Otherwise, return `(gamma, None)`. `gamma` is of shape
        `len(chunk) x self.num_topics`.

        Avoids computing the `phi` variational parameter directly using the
        optimization presented in
        **Lee, Seung: Algorithms for non-negative matrix factorization, NIPS 2001**.

        """
        try:
            _ = len(chunk)
        except:
            # convert iterators/generators to plain list, so we have len() etc.
            chunk = list(chunk)
        if len(chunk) > 1:
            logger.debug("performing inference on a chunk of %i documents", len(chunk))

        # Initialize the variational distribution q(theta|gamma) for the chunk
        gamma = self.random_state.gamma(100., 1. / 100., (len(chunk), self.num_topics))
        Elogtheta = dirichlet_expectation(gamma)
        expElogtheta = numpy.exp(Elogtheta)
        if collect_sstats:
            sstats = numpy.zeros_like(self.expElogbeta)
        else:
            sstats = None
        converged = 0

        # Now, for each document d update that document's gamma and phi
        # Inference code copied from Hoffman's `onlineldavb.py` (esp. the
        # Lee&Seung trick which speeds things up by an order of magnitude, compared
        # to Blei's original LDA-C code, cool!).
        for d, doc in enumerate(chunk):
            if doc and not isinstance(doc[0][0], six.integer_types):
                # make sure the term IDs are ints, otherwise numpy will get upset
                ids = [int(id) for id, _ in doc]
            else:
                ids = [id for id, _ in doc]
            cts = numpy.array([cnt for _, cnt in doc])
            gammad = gamma[d, :]
            Elogthetad = Elogtheta[d, :]
            expElogthetad = expElogtheta[d, :]
            expElogbetad = self.expElogbeta[:, ids]

            # The optimal phi_{dwk} is proportional to expElogthetad_k * expElogbetad_w.
            # phinorm is the normalizer.
            # TODO treat zeros explicitly, instead of adding 1e-100?
            phinorm = numpy.dot(expElogthetad, expElogbetad) + 1e-100

            # Iterate between gamma and phi until convergence
            for _ in xrange(self.iterations):
                lastgamma = gammad
                # We represent phi implicitly to save memory and time.
                # Substituting the value of the optimal phi back into
                # the update for gamma gives this update. Cf. Lee&Seung 2001.
                gammad = self.alpha + expElogthetad * numpy.dot(cts / phinorm, expElogbetad.T)
                Elogthetad = dirichlet_expectation(gammad)
                expElogthetad = numpy.exp(Elogthetad)
                phinorm = numpy.dot(expElogthetad, expElogbetad) + 1e-100
                # If gamma hasn't changed much, we're done.
                meanchange = numpy.mean(abs(gammad - lastgamma))
                if (meanchange < self.gamma_threshold):
                    converged += 1
                    break
            gamma[d, :] = gammad
            if collect_sstats:
                # Contribution of document d to the expected sufficient
                # statistics for the M step.
                sstats[:, ids] += numpy.outer(expElogthetad.T, cts / phinorm)

        if len(chunk) > 1:
            logger.debug("%i/%i documents converged within %i iterations",
                         converged, len(chunk), self.iterations)

        if collect_sstats:
            # This step finishes computing the sufficient statistics for the
            # M step, so that
            # sstats[k, w] = \sum_d n_{dw} * phi_{dwk}
            # = \sum_d n_{dw} * exp{Elogtheta_{dk} + Elogbeta_{kw}} / phinorm_{dw}.
            sstats *= self.expElogbeta
        return gamma, sstats

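    # A minimal sketch of calling `inference` directly (normally `update`, `bound` and
    # `__getitem__` call it for you); `doc_bow` is assumed to be a bag-of-words vector
    # such as [(0, 1), (3, 2)]:
    #
    #   >>> gamma, _ = lda.inference([doc_bow])
    #   >>> topic_dist = gamma[0] / gamma[0].sum()   # normalized per-document topic weights
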
    def do_estep(self, chunk, state=None):
        """
        Perform inference on a chunk of documents, and accumulate the collected
        sufficient statistics in `state` (or `self.state` if None).

        """
        if state is None:
            state = self.state
        gamma, sstats = self.inference(chunk, collect_sstats=True)
        state.sstats += sstats
        state.numdocs += gamma.shape[0]  # avoids calling len(chunk) on a generator
        return gamma

    def update_alpha(self, gammat, rho):
        """
        Update parameters for the Dirichlet prior on the per-document
        topic weights `alpha` given the last `gammat`.
        """
        N = float(len(gammat))
        logphat = sum(dirichlet_expectation(gamma) for gamma in gammat) / N

        self.alpha = update_dir_prior(self.alpha, N, logphat, rho)
        logger.info("optimized alpha %s", list(self.alpha))

        return self.alpha

    def update_eta(self, lambdat, rho):
        """
        Update parameters for the Dirichlet prior on the per-topic
        word weights `eta` given the last `lambdat`.
        """
        if self.eta.shape[1] != 1:
            raise ValueError("Can't use update_eta with eta matrices, only column vectors.")
        N = float(lambdat.shape[1])
        logphat = (sum(dirichlet_expectation(lambda_) for lambda_ in lambdat.transpose()) / N).reshape((self.num_topics, 1))

        self.eta = update_dir_prior(self.eta, N, logphat, rho)
        logger.info("optimized eta %s", list(self.eta.reshape((self.num_topics))))

        return self.eta

    def log_perplexity(self, chunk, total_docs=None):
        """
        Calculate and return per-word likelihood bound, using the `chunk` of
        documents as evaluation corpus. Also output the calculated statistics,
        incl. perplexity=2^(-bound), to log at INFO level.
        """
        if total_docs is None:
            total_docs = len(chunk)
        corpus_words = sum(cnt for document in chunk for _, cnt in document)
        subsample_ratio = 1.0 * total_docs / len(chunk)
        perwordbound = self.bound(chunk, subsample_ratio=subsample_ratio) / (subsample_ratio * corpus_words)
        logger.info("%.3f per-word bound, %.1f perplexity estimate based on a held-out corpus of %i documents with %i words" %
                    (perwordbound, numpy.exp2(-perwordbound), len(chunk), corpus_words))
        return perwordbound

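    # A small usage sketch: `log_perplexity` returns the per-word bound, so the conventional
    # perplexity (as logged above at INFO level) is two to the power of its negation;
    # `heldout_chunk` is an assumed list of bag-of-words documents:
    #
    #   >>> perwordbound = lda.log_perplexity(heldout_chunk)
    #   >>> perplexity = 2 ** (-perwordbound)
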
    def update(self, corpus, chunksize=None, decay=None, offset=None,
               passes=None, update_every=None, eval_every=None, iterations=None,
               gamma_threshold=None, chunks_as_numpy=False):
        """
        Train the model with new documents, by EM-iterating over `corpus` until
        the topics converge (or until the maximum number of allowed iterations
        is reached). `corpus` must be an iterable (repeatable stream of documents).

        In distributed mode, the E step is distributed over a cluster of machines.

        This update also supports updating an already trained model (`self`)
        with new documents from `corpus`; the two models are then merged in
        proportion to the number of old vs. new documents. This feature is still
        experimental for non-stationary input streams.

        For stationary input (no topic drift in new documents), on the other hand,
        this equals the online update of Hoffman et al. and is guaranteed to
        converge for any `decay` in (0.5, 1.0>. Additionally, for smaller
        `corpus` sizes, an increasing `offset` may be beneficial (see
        Table 1 in Hoffman et al.)

        Args:
            corpus (gensim corpus): The corpus with which the LDA model should be updated.

            chunks_as_numpy (bool): Whether each chunk passed to `.inference` should be a numpy
                array or not. Numpy can in some settings turn the term IDs into floats, these
                will be converted back into integers in inference, which incurs a performance
                hit. For distributed computing it may be desirable to keep the chunks as numpy
                arrays.

        For other parameter settings, see :class:`LdaModel` constructor.

        """
        # use parameters given in constructor, unless user explicitly overrode them
        if decay is None:
            decay = self.decay
        if offset is None:
            offset = self.offset
        if passes is None:
            passes = self.passes
        if update_every is None:
            update_every = self.update_every
        if eval_every is None:
            eval_every = self.eval_every
        if iterations is None:
            iterations = self.iterations
        if gamma_threshold is None:
            gamma_threshold = self.gamma_threshold

        try:
            lencorpus = len(corpus)
        except:
            logger.warning("input corpus stream has no len(); counting documents")
            lencorpus = sum(1 for _ in corpus)
        if lencorpus == 0:
            logger.warning("LdaModel.update() called with an empty corpus")
            return

        if chunksize is None:
            chunksize = min(lencorpus, self.chunksize)

        self.state.numdocs += lencorpus

        if update_every:
            updatetype = "online"
            updateafter = min(lencorpus, update_every * self.numworkers * chunksize)
        else:
            updatetype = "batch"
            updateafter = lencorpus
        evalafter = min(lencorpus, (eval_every or 0) * self.numworkers * chunksize)

        updates_per_pass = max(1, lencorpus / updateafter)
        logger.info("running %s LDA training, %s topics, %i passes over "
                    "the supplied corpus of %i documents, updating model once "
                    "every %i documents, evaluating perplexity every %i documents, "
                    "iterating %ix with a convergence threshold of %f",
                    updatetype, self.num_topics, passes, lencorpus,
                    updateafter, evalafter, iterations, gamma_threshold)

        if updates_per_pass * passes < 10:
            logger.warning("too few updates, training might not converge; consider "
                           "increasing the number of passes or iterations to improve accuracy")

        # rho is the "speed" of updating; TODO try other fncs
        # pass_ + num_updates handles increasing the starting t for each pass,
        # while allowing it to "reset" on the first pass of each update
        def rho():
            return pow(offset + pass_ + (self.num_updates / chunksize), -decay)

        for pass_ in xrange(passes):
            if self.dispatcher:
                logger.info('initializing %s workers' % self.numworkers)
                self.dispatcher.reset(self.state)
            else:
                other = LdaState(self.eta, self.state.sstats.shape)
            dirty = False

            reallen = 0
            for chunk_no, chunk in enumerate(utils.grouper(corpus, chunksize, as_numpy=chunks_as_numpy)):
                reallen += len(chunk)  # keep track of how many documents we've processed so far

                if eval_every and ((reallen == lencorpus) or ((chunk_no + 1) % (eval_every * self.numworkers) == 0)):
                    self.log_perplexity(chunk, total_docs=lencorpus)

                if self.dispatcher:
                    # add the chunk to dispatcher's job queue, so workers can munch on it
                    logger.info('PROGRESS: pass %i, dispatching documents up to #%i/%i',
                                pass_, chunk_no * chunksize + len(chunk), lencorpus)
                    # this will eventually block until some jobs finish, because the queue has a small finite length
                    self.dispatcher.putjob(chunk)
                else:
                    logger.info('PROGRESS: pass %i, at document #%i/%i',
                                pass_, chunk_no * chunksize + len(chunk), lencorpus)
                    gammat = self.do_estep(chunk, other)

                    if self.optimize_alpha:
                        self.update_alpha(gammat, rho())

                dirty = True
                del chunk

                # perform an M step. determine when based on update_every, don't do this after every chunk
                if update_every and (chunk_no + 1) % (update_every * self.numworkers) == 0:
                    if self.dispatcher:
                        # distributed mode: wait for all workers to finish
                        logger.info("reached the end of input; now waiting for all remaining jobs to finish")
                        other = self.dispatcher.getstate()
                    self.do_mstep(rho(), other, pass_ > 0)
                    del other  # frees up memory

                    if self.dispatcher:
                        logger.info('initializing workers')
                        self.dispatcher.reset(self.state)
                    else:
                        other = LdaState(self.eta, self.state.sstats.shape)
                    dirty = False
            # endfor single corpus iteration
            if reallen != lencorpus:
                raise RuntimeError("input corpus size changed during training (don't use generators as input)")

            if dirty:
                # finish any remaining updates
                if self.dispatcher:
                    # distributed mode: wait for all workers to finish
                    logger.info("reached the end of input; now waiting for all remaining jobs to finish")
                    other = self.dispatcher.getstate()
                self.do_mstep(rho(), other, pass_ > 0)
                del other
                dirty = False
        # endfor entire corpus update

    def do_mstep(self, rho, other, extra_pass=False):
        """
        M step: use linear interpolation between the existing topics and
        collected sufficient statistics in `other` to update the topics.

        """
        logger.debug("updating topics")
        # update self with the new blend; also keep track of how much did
        # the topics change through this update, to assess convergence
        diff = numpy.log(self.expElogbeta)
        self.state.blend(rho, other)
        diff -= self.state.get_Elogbeta()
        self.sync_state()

        # print out some debug info at the end of each EM iteration
        self.print_topics(5)
        logger.info("topic diff=%f, rho=%f", numpy.mean(numpy.abs(diff)), rho)

        if self.optimize_eta:
            self.update_eta(self.state.get_lambda(), rho)

        if not extra_pass:
            # only update if this isn't an additional pass
            self.num_updates += other.numdocs

    def bound(self, corpus, gamma=None, subsample_ratio=1.0):
        """
        Estimate the variational bound of documents from `corpus`:
        E_q[log p(corpus)] - E_q[log q(corpus)]

        `gamma` are the variational parameters on topic weights for each `corpus`
        document (=2d matrix=what comes out of `inference()`).
        If not supplied, will be inferred from the model.

        """
        score = 0.0
        _lambda = self.state.get_lambda()
        Elogbeta = dirichlet_expectation(_lambda)

        for d, doc in enumerate(corpus):  # stream the input doc-by-doc, in case it's too large to fit in RAM
            if d % self.chunksize == 0:
                logger.debug("bound: at document #%i", d)
            if gamma is None:
                gammad, _ = self.inference([doc])
            else:
                gammad = gamma[d]
            Elogthetad = dirichlet_expectation(gammad)

            # E[log p(doc | theta, beta)]
            score += numpy.sum(cnt * logsumexp(Elogthetad + Elogbeta[:, id]) for id, cnt in doc)

            # E[log p(theta | alpha) - log q(theta | gamma)]; assumes alpha is a vector
            score += numpy.sum((self.alpha - gammad) * Elogthetad)
            score += numpy.sum(gammaln(gammad) - gammaln(self.alpha))
            score += gammaln(numpy.sum(self.alpha)) - gammaln(numpy.sum(gammad))

        # compensate likelihood for when `corpus` above is only a sample of the whole corpus
        score *= subsample_ratio

        # E[log p(beta | eta) - log q (beta | lambda)]; assumes eta is a scalar
        score += numpy.sum((self.eta - _lambda) * Elogbeta)
        score += numpy.sum(gammaln(_lambda) - gammaln(self.eta))

        if numpy.ndim(self.eta) == 0:
            sum_eta = self.eta * self.num_terms
        else:
            sum_eta = numpy.sum(self.eta, 1)

        score += numpy.sum(gammaln(sum_eta) - gammaln(numpy.sum(_lambda, 1)))

        return score

    def print_topics(self, num_topics=10, num_words=10):
        return self.show_topics(num_topics, num_words, log=True)

    def show_topics(self, num_topics=10, num_words=10, log=False, formatted=True):
        """
        For `num_topics` number of topics, return `num_words` most significant words
        (10 words per topic, by default).

        The topics are returned as a list -- a list of strings if `formatted` is
        True, or a list of `(word, probability)` 2-tuples if False.

        If `log` is True, also output this result to log.

        Unlike LSA, there is no natural ordering between the topics in LDA.
        The returned `num_topics <= self.num_topics` subset of all topics is therefore
        arbitrary and may change between two LDA training runs.

        """
        if num_topics < 0 or num_topics >= self.num_topics:
            num_topics = self.num_topics
            chosen_topics = range(num_topics)
        else:
            num_topics = min(num_topics, self.num_topics)

            # add a little random jitter, to randomize results around the same alpha
            sort_alpha = self.alpha + 0.0001 * self.random_state.rand(len(self.alpha))

            sorted_topics = list(matutils.argsort(sort_alpha))
            chosen_topics = sorted_topics[:num_topics // 2] + sorted_topics[-num_topics // 2:]

        shown = []
        for i in chosen_topics:
            if formatted:
                topic = self.print_topic(i, topn=num_words)
            else:
                topic = self.show_topic(i, topn=num_words)
            shown.append((i, topic))
            if log:
                logger.info("topic #%i (%.3f): %s", i, self.alpha[i], topic)

        return shown

    def show_topic(self, topicid, topn=10):
        """
        Return a list of `(word, probability)` 2-tuples for the most probable
        words in topic `topicid`.

        Only return 2-tuples for the topn most probable words (ignore the rest).

        """
        return [(self.id2word[id], value) for id, value in self.get_topic_terms(topicid, topn)]

    def get_topic_terms(self, topicid, topn=10):
        """
        Return a list of `(word_id, probability)` 2-tuples for the most
        probable words in topic `topicid`.

        Only return 2-tuples for the topn most probable words (ignore the rest).

        """
        topic = self.state.get_lambda()[topicid]
        topic = topic / topic.sum()  # normalize to probability distribution
        bestn = matutils.argsort(topic, topn, reverse=True)
        return [(id, topic[id]) for id in bestn]

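    # Illustrative calls (assuming a trained model `lda`; the values shown are made up):
    # `get_topic_terms` returns integer word ids, while `show_topic` maps them back to
    # word strings via `id2word`:
    #
    #   >>> lda.get_topic_terms(0, topn=3)   # e.g. [(12, 0.042), (5, 0.037), (31, 0.029)]
    #   >>> lda.show_topic(0, topn=3)        # e.g. [('market', 0.042), ('price', 0.037), ...]
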
    def print_topic(self, topicid, topn=10):
        """Return the result of `show_topic`, but formatted as a single string."""
        return ' + '.join(['%.3f*%s' % (v, k) for k, v in self.show_topic(topicid, topn)])

    def top_topics(self, corpus, num_words=20):
        """
        Calculate the Umass topic coherence for each topic. Algorithm from
        **Mimno, Wallach, Talley, Leenders, McCallum: Optimizing Semantic Coherence in Topic Models, EMNLP 2011.**
        """
        is_corpus, corpus = utils.is_corpus(corpus)
        if not is_corpus:
            logger.warning("LdaModel.top_topics() called with an empty corpus")
            return

        topics = []
        str_topics = []
        for topic in self.state.get_lambda():
            topic = topic / topic.sum()  # normalize to probability distribution
            bestn = matutils.argsort(topic, topn=num_words, reverse=True)
            topics.append(bestn)
            beststr = [(topic[id], self.id2word[id]) for id in bestn]
            str_topics.append(beststr)

        # top_ids are limited to every topic's top words. should not exceed the
        # vocabulary size.
        top_ids = set(chain.from_iterable(topics))

        # create a document occurrence sparse matrix for each word
        doc_word_list = {}
        for id in top_ids:
            id_list = set()
            for n, document in enumerate(corpus):
                if id in frozenset(x[0] for x in document):
                    id_list.add(n)

            doc_word_list[id] = id_list

        coherence_scores = []
        for t, top_words in enumerate(topics):
            # Calculate each coherence score C(t, top_words)
            coherence = 0.0
            # Sum of top words m=2..M
            for m in top_words[1:]:
                # m_docs is v_m^(t)
                m_docs = doc_word_list[m]
                m_index = numpy.where(top_words == m)[0]

                # Sum of top words l=1..m-1
                # i.e., all words ranked higher than the current word m
                for l in top_words[:m_index - 1]:
                    # l_docs is v_l^(t)
                    l_docs = doc_word_list[l]

                    # make sure this word appears in some documents.
                    if len(l_docs) > 0:
                        # co_doc_frequency is D(v_m^(t), v_l^(t))
                        co_doc_frequency = len(m_docs.intersection(l_docs))

                        # add to the coherence sum for these two words m, l
                        coherence += numpy.log((co_doc_frequency + 1.0) / len(l_docs))

            coherence_scores.append((str_topics[t], coherence))

        top_topics = sorted(coherence_scores, key=lambda t: t[1], reverse=True)
        return top_topics

    def get_document_topics(self, bow, minimum_probability=None, minimum_phi_value=None, per_word_topics=False):
        """
        Return topic distribution for the given document `bow`, as a list of
        (topic_id, topic_probability) 2-tuples.

        Ignore topics with very low probability (below `minimum_probability`).

        If `per_word_topics` is True, it also returns a list of topics, sorted in
        descending order of most likely topics for each word. It also returns a list
        of word_ids and each word's corresponding topics' phi_values, multiplied by
        feature length (i.e. word count).

        """
        if minimum_probability is None:
            minimum_probability = self.minimum_probability
        minimum_probability = max(minimum_probability, 1e-8)  # never allow zero values in sparse output

        if minimum_phi_value is None:
            minimum_phi_value = self.minimum_probability
        minimum_phi_value = max(minimum_phi_value, 1e-8)  # never allow zero values in sparse output

        # if the input vector is a corpus, return a transformed corpus
        is_corpus, corpus = utils.is_corpus(bow)
        if is_corpus:
            return self._apply(corpus)

        gamma, phis = self.inference([bow], collect_sstats=True)
        topic_dist = gamma[0] / sum(gamma[0])  # normalize distribution

        document_topics = [(topicid, topicvalue) for topicid, topicvalue in enumerate(topic_dist)
                           if topicvalue >= minimum_probability]

        if not per_word_topics:
            return document_topics
        else:
            word_topic = []  # contains word and corresponding topic
            word_phi = []  # contains word and phi values
            for word_type, weight in bow:
                phi_values = []  # contains (phi_value, topic) pairing to later be sorted
                phi_topic = []  # contains topic and corresponding phi value to be returned 'raw' to user
                for topic_id in range(0, self.num_topics):
                    if phis[topic_id][word_type] >= minimum_phi_value:
                        # appends phi values for each topic for that word
                        # these phi values are scaled by feature length
                        phi_values.append((phis[topic_id][word_type], topic_id))
                        phi_topic.append((topic_id, phis[topic_id][word_type]))

                # list with ({word_id => [(topic_0, phi_value), (topic_1, phi_value) ...]).
                word_phi.append((word_type, phi_topic))
                # sorts the topics based on most likely topic
                # returns a list like ({word_id => [topic_id_most_probable, topic_id_second_most_probable, ...]).
                sorted_phi_values = sorted(phi_values, reverse=True)
                topics_sorted = [x[1] for x in sorted_phi_values]
                word_topic.append((word_type, topics_sorted))

            return (document_topics, word_topic, word_phi)  # returns a 3-tuple: document topics, per-word topics, per-word phi values

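    # A minimal sketch of the `per_word_topics` output (assuming a trained model `lda` and a
    # bag-of-words vector `doc_bow`):
    #
    #   >>> doc_topics, word_topics, word_phis = lda.get_document_topics(doc_bow, per_word_topics=True)
    #   >>> doc_topics    # [(topic_id, probability), ...] above `minimum_probability`
    #   >>> word_topics   # [(word_id, [most_likely_topic_id, next_most_likely, ...]), ...]
    #   >>> word_phis     # [(word_id, [(topic_id, phi_value), ...]), ...]
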
    def get_term_topics(self, word_id, minimum_probability=None):
        """
        Returns most likely topics for a particular word in vocab.

        """
        if minimum_probability is None:
            minimum_probability = self.minimum_probability
        minimum_probability = max(minimum_probability, 1e-8)  # never allow zero values in sparse output

        # if user enters word instead of id in vocab, change to get id
        if isinstance(word_id, str):
            word_id = self.id2word.doc2bow([word_id])[0][0]

        values = []
        for topic_id in range(0, self.num_topics):
            if self.expElogbeta[topic_id][word_id] >= minimum_probability:
                values.append((topic_id, self.expElogbeta[topic_id][word_id]))

        return values

    def __getitem__(self, bow, eps=None):
        """
        Return topic distribution for the given document `bow`, as a list of
        (topic_id, topic_probability) 2-tuples.

        Ignore topics with very low probability (below `eps`).

        """
        return self.get_document_topics(bow, eps)

    def save(self, fname, ignore=['state', 'dispatcher'], *args, **kwargs):
        """
        Save the model to file.

        Large internal arrays may be stored into separate files, with `fname` as prefix.

        `separately` can be used to define which arrays should be stored in separate files.

        `ignore` parameter can be used to define which variables should be ignored, i.e. left
        out from the pickled lda model. By default the internal `state` is ignored as it uses
        its own serialisation, not the one provided by `LdaModel`. The `state` and `dispatcher`
        will be added to any `ignore` parameter defined.

        Note: do not save as a compressed file if you intend to load the file back with `mmap`.

        Note: If you intend to use models across Python 2/3 versions there are a few things to
        keep in mind:

          1. The pickled Python dictionaries will not work across Python versions.
          2. The `save` method does not automatically save all NumPy arrays using NumPy, only
             those ones that exceed `sep_limit` set in `gensim.utils.SaveLoad.save`. The main
             concern here is the `alpha` array if for instance using `alpha='auto'`.

        Please refer to the wiki recipes section
        (https://github.com/piskvorky/gensim/wiki/Recipes-&-FAQ#q9-how-do-i-load-a-model-in-python-3-that-was-trained-and-saved-using-python-2)
        for an example on how to work around these issues.
        """
        if self.state is not None:
            self.state.save(utils.smart_extension(fname, '.state'), *args, **kwargs)

        # make sure 'state' and 'dispatcher' are ignored from the pickled object, even if
        # someone sets the ignore list themselves
        if ignore is not None and ignore:
            if isinstance(ignore, six.string_types):
                ignore = [ignore]
            ignore = [e for e in ignore if e]  # make sure None and '' are not in the list
            ignore = list(set(['state', 'dispatcher']) | set(ignore))
        else:
            ignore = ['state', 'dispatcher']
        super(LdaModel, self).save(fname, *args, ignore=ignore, **kwargs)

    @classmethod
    def load(cls, fname, *args, **kwargs):
        """
        Load a previously saved object from file (also see `save`).

        Large arrays can be memmap'ed back as read-only (shared memory) by setting `mmap='r'`:

            >>> LdaModel.load(fname, mmap='r')

        """
        kwargs['mmap'] = kwargs.get('mmap', None)
        result = super(LdaModel, cls).load(fname, *args, **kwargs)
        state_fname = utils.smart_extension(fname, '.state')
        try:
            result.state = super(LdaModel, cls).load(state_fname, *args, **kwargs)
        except Exception as e:
            logging.warning("failed to load state from %s: %s", state_fname, e)
        return result
# endclass LdaModel