在Spark上用LDA计算文本主题模型

在新闻推荐中,由于新闻主要为文本的特性,基于内容的推荐(Content-based Recommendation)一直是主要的推荐策略。基于内容的策略主要思路是从文本提取出特征,然后利用特征向量化后的向量距离来计算文本间的相关度。这其中应用最广的当属分类(Category)相关和关键词(Keywords/Tag)相关,然而这两种策略却有很多无法覆盖的场景。首先,关键词无法解决同义词和一词多义的问题。比如下面两篇文章的关键词:

代码语言:javascript
复制
1. 苹果/营养/维生素  
2. 苹果/WWDC/手机  

显然,此苹果非彼苹果,然而单纯根据关键词计算相关性会很容易把这两篇文章联系在一起。我们再看下面这个情况:

代码语言:javascript
复制
1. 腾讯/微信/小程序  
2. 马化腾/张小龙/Pony  

这两篇文章在话题上是高度相关的,但在关键词这个维度上,他们的相似度为0.

代码语言:javascript
复制
1. 柯洁/李世乭/围棋  
2. AlphaGo/人机大战/人工智能  

同理,这两篇文章甚至分类都不同(前者在体育类别,后者在科技),要关联起来就更困难了。

解决这个问题的关键是发现文本中隐含的语义,NLP中称为隐语义分析(Latent Semantic Analysis),这个课题下又有很多种实现的方法,如SVD/LSI/LDA等,在这里我们主要讨论LDA。

神奇的LDA

LDA全称隐含狄利克雷分布(Latent Dirichlet Allocation),他的核心思想认为一篇文档的生成流程是:

代码语言:javascript
复制
1. 以一定概率选出一个主题  
2. 以一定概率选出一个词  
3. 重复上述流程直至选出所有词  

其中文档-主题和主题-词各服从一个多项式分布,流程如图:

具体的算法原理比较复杂,这里就不详解了,可以看看这篇博文的解读。总之,它的神奇之处就在于LDA作为一个无监督的学习,往往能达到很好的效果,且学习的结果具备很强的解释性。不像LSI一类算法计算出的隐分类矩阵,往往只能作为特征向量,LDA计算出的每个主题都包含主题词及其权重,可以很好地表征主题的含义。类似如下:

代码语言:javascript
复制
topic 1  
=======
apple 0.1312  
iOS 0.0937  
iPhone 0.0436  
iPad 0.0103  
...

根据主题词我们可以简单地采用人工的方式以某个名词表征这个主题(如上述主题表述为:苹果公司)。

基于主题模型的推荐策略

LDA训练出主题模型后,我们便可以利用模型预测某个词袋(Bag of Words)文档的主题分布,作为特征计算文本相似度。

图1 基于主题模型的推荐策略

如上图,LDA预测出的结果是文档在N个topic上的权重分布,我们利用该分布计算文档间的余弦相似度/欧氏距离/皮尔逊相似度等,得出topN的相似文档,可作为相关推荐的结果。整理topic并给出解释意义之后,也可作为用户画像的一个维度。

代码实现

LDA因为需要不断迭代,因此计算很耗时间。之前实现了一个Python单机版本,10+W的训练集跑了6小时……因此这次,我选择用先前搭建的Spark集群来训练LDA模型。

现在Spark对Java/Python都支持得很好,然而论库函数的支持和性能优化,我只信原生语言,因此选择了Scala(好吧,最近又是Go又是Java又是Python又是Scala,我承认写的时候语法经常会弄混…)。

话不多说,直接上代码:

代码语言:javascript
复制
    // 配置HDFS
    val hadoopConf = new Configuration()
    hadoopConf.set("fs.defaultFS", "hdfs://ip:port")
// 训练集路径
val logFile: String = args(0)
// 输出模型路径
val modelPath: String = args(1)
// 主题数
val numTopics: Int = args(2).toInt
// 最大迭代数
val numMaxIterations: Int = args(3).toInt
// 停用词数
val numStopwords: Int = if(args.length == 5) args(4).toInt else 0
// 超参数α
val docConcentration: Double = if(args.length == 6) args(5).toDouble else 1.1
// 超参数β
val topicConcentration: Double = if(args.length == 7) args(6).toDouble else 1.1</code></pre></div></div><p>首先是从命令行获取各种参数:</p><ol class="ol-level-0"><li>输入/输出路径:Spark可以直接连接HDFS作为输入/输出文件路径,因此我们对训练集完成预处理(分词/去停用词)后可直接上传到HDFS做准备  </li><li>numTopics:训练结果主题数,和训练集规模有关,可直接影响聚类效果;  </li><li>maxIteration:迭代次数。迭代过少可能造成主题区分不明显;  </li><li>超参数α/β:文档-主题分布和主题-词分布参数,大于1,一般设定为50/numTopics。</li></ol><p>之后从输入文件中分割词,提取出词汇表,转换成id-词的映射表。</p><div class="rno-markdown-code"><div class="rno-markdown-code-toolbar"><div class="rno-markdown-code-toolbar-info"><div class="rno-markdown-code-toolbar-item is-type"><span class="is-m-hidden">代码语言:</span>javascript</div></div><div class="rno-markdown-code-toolbar-opt"><div class="rno-markdown-code-toolbar-copy"><i class="icon-copy"></i><span class="is-m-hidden">复制</span></div></div></div><div class="developer-code-block"><pre class="prism-token token line-numbers language-javascript"><code class="language-javascript" style="margin-left:0">    // 提取词汇表
val termCounts: Array[(String, Long)] = 
  words.flatMap(_.map(_ -&gt; 1L)).reduceByKey(_ + _).collect().sortBy(-_._2)

// 去除词频最高的numStopwords个词作为停用词
// 如预处理时已做过去停词,则该步骤可省略
val vocabArray: Array[String] = 
  termCounts.takeRight(termCounts.size - numStopwords).map(_._1)
val vocab: Map[String, Int] = vocabArray.zipWithIndex.toMap</code></pre></div></div><p>接着将文档转换成词频矩阵:</p><div class="rno-markdown-code"><div class="rno-markdown-code-toolbar"><div class="rno-markdown-code-toolbar-info"><div class="rno-markdown-code-toolbar-item is-type"><span class="is-m-hidden">代码语言:</span>javascript</div></div><div class="rno-markdown-code-toolbar-opt"><div class="rno-markdown-code-toolbar-copy"><i class="icon-copy"></i><span class="is-m-hidden">复制</span></div></div></div><div class="developer-code-block"><pre class="prism-token token line-numbers language-javascript"><code class="language-javascript" style="margin-left:0">    // 建立文档-词频矩阵
val corpus = vectorize(words, vocab, ids).cache()

def vectorize(words: RDD[Seq[String]], vocab: Map[String, Int], ids: RDD[String]): RDD[(Long, Vector)] = {
val corpus: RDD[(Long, Vector)] =
words.zip(ids).map { case (tokens, id) =>
val counts = new mutable.HashMapInt, Double
tokens.foreach { term =>
if (vocab.contains(term)) {
val idx = vocab(term)
counts(idx) = counts.getOrElse(idx, 0.0) + 1.0
}
}
(id.toLong, Vectors.sparse(vocab.size, counts.toSeq))
}
corpus
}

训练LDA模型并输出结果:

代码语言:javascript
复制
    val ldaModel = new LDA().
setK(numTopics).
setDocConcentration(docConcentration).
setTopicConcentration(topicConcentration).
setMaxIterations(numMaxIterations).
setSeed(0L).
setCheckpointInterval(10).
setOptimizer("em").
run(corpus)

// 输出logLikelihood,评价模型结果
val distLdaModel = ldaModel.asInstanceOf[DistributedLDAModel]
val ll = distLdaModel.logLikelihood
println(&#34;Likelihood: &#34; + ll)

// 每个topic相关最高的20个文档id
val docs = distLdaModel.topDocumentsPerTopic(20)
// 输出topic主题词及其权重
val topics = ldaModel.describeTopics(maxTermsPerTopic = 10)
var i = 0
topics.foreach { case (terms, termWeights) =&gt;
  println(s&#34;TOPIC: $i&#34;)
  terms.zip(termWeights).foreach { case (term, weight) =&gt;
    println(s&#34;${vocabArray(term.toInt)}\t$weight&#34;)
  }
  println(&#34;DOCS: &#34;)
  docs(i)._1.zip(docs(i)._2).foreach { case (term, weight) =&gt;
    println(s&#34;$term\t$weight&#34;)
  }
  i += 1
  println()
}
// 保存模型
ldaModel.save(sc, modelPath)
sc.stop()</code></pre></div></div><h4 id="1hu5k" name="%E8%AE%AD%E7%BB%83%E7%BB%93%E6%9E%9C">训练结果</h4><p>输入门户网站50W新闻文本(分词后的标题+正文),输出200个topic,50次迭代,耗时30min左右。其中部分topic描述及其主题相关文章如下:</p><div class="rno-markdown-code"><div class="rno-markdown-code-toolbar"><div class="rno-markdown-code-toolbar-info"><div class="rno-markdown-code-toolbar-item is-type"><span class="is-m-hidden">代码语言:</span>javascript</div></div><div class="rno-markdown-code-toolbar-opt"><div class="rno-markdown-code-toolbar-copy"><i class="icon-copy"></i><span class="is-m-hidden">复制</span></div></div></div><div class="developer-code-block"><pre class="prism-token token line-numbers language-javascript"><code class="language-javascript" style="margin-left:0">TOPIC 3:  

人类 0.040956
人工智能 0.0384
机器人 0.036
围棋 0.016823
AI 0.01638
柯洁 0.015567
AlphaGo 0.0
棋手 0.011772
学习 0.011311

DOCS:
古力:AI改变围棋规则 柯洁赢一盘概率不足10% 0.93749
媒体披露“打狗棒法” 柯洁:绝招对付阿尓法狗 0.91730
聂卫平:AlphaGo给世界上了一课 人类别跟它下了 0.9
神剧情!AlphaGo开局走三-三 与柯洁互相学习? 0.9046
人机大战第二局前瞻:柯洁或用秘密武器争胜 0.9045219669
最后的人机围棋大战:柯洁曾逼至AIpahGo极限 0.8882
聂卫平人机战语录:阿法狗20段 想赢它得拔电源 0.885853
人机大战第二局前瞻:柯洁或用秘密武器争胜 0.8811950345

============

TOPIC: 5
选手 0.031177
冠军 0.020835
对手 0.020005
种子 0.018348
决赛 0.017980
纳达尔 0.013
法网 0.013581
晋级 0.013162
费德勒 0.012

DOCS:
马德里赛哈勒普胜张帅终结者 与法国新星争冠 0.99304432
蒙特卡洛赛小德再陷鏖战 险胜布斯塔晋级八强 0.99240365
马德里赛纳达尔仅丢4局横扫进八强 将战戈芬 0.99211261930
蒙特卡洛赛小德1-2生涯首负戈芬 无缘半决赛 0.9912739
罗马赛哈勒普完胜对手 携斯维托丽娜晋级四强 0.99069710
罗马赛纳达尔17连胜终结 不敌蒂姆无缘四强 0.98979494870

可以看到,结果大致符合预期。