1. 引言 – 近似近邻搜索被提出所在的时代背景和挑战

0x1:从NN(Neighbor Search)说起

ANN的前身技术是NN(Neighbor Search),简单地说,最近邻检索就是根据数据的相似性,从数据集中寻找与目标数据最相似的项目,而这种相似性通常会被量化到空间上数据之间的距离,例如欧几里得距离(Euclidean distance),NN认为数据在空间中的距离越近,则数据之间的相似性越高。

当需要查找离目标数据最近的前k个数据项时,就是k最近邻检索(K-NN)。

0x2:NN的技术挑战与发展

近些年的研究中涌现出大量以最近邻检索为基本思想的方法,主要可分为两类:

  • 数据结构改进:基于提升检索结构性能的新数据结构,大多基于树形结构,例如KD-Tree。关于KD-tree的相关讨论可以参阅另一篇博客
  • 高效索引方法改进:基于对数据本身的索引和处理的方法,包括哈希算法、矢量量化方法等。哈希算法就是使用HASH算法构建数据索引,HASH方法的确效率很高,但是因为其全局敏感性,输入文本只要有任何微小的变化,得到的Hash Index就会发生改变,因此无法提高近邻搜索的性能。

尽管出现了很多针对NN算法的改进措施,但是在实际工业场景中,NN算法遇到最大阻碍是:

数据经过向量化(即特征工程)之后,因为特征空间特别高维(上百/上千/甚至上万),导致在空间距离上特别稀疏,维度越高这个现象越明显,这直接导致了NN的近邻搜索效果不好。笔者自己也同样在项目开发中尝试使用过NN算法,当发现NN搜索效果不佳时,反过来调整特征工程,然后再继续NN搜索,如此反复迭代,最终效果难以保证,因为你无法保证每一次的特征工程都能精确地表征出业务场景的相似性。

举个例子来说,我们有一批恶意文件现在要对其进行聚类分析,首先我们对其进行文本方面的特征工程,得到一个向量集合。因为基于专家经验得到的特征维度之间是彼此“正交”的,因此每个特征向量之间的余弦相似性都不强,基于“空间距离度量”的聚类算法效果自然也不会很好。

而且另一方面,因为特征空间的维度太高了(几百维、几千维),一些本来很有用的”强贡献特征“可能会被淹没在大量的“弱贡献特征”中,这很好理解,看一下欧几里得空间的距离度量公式:

从公式中可以看到,所有维度都被“公平看待”,平方和开根起到了一个均值的作用,弱特征越多,强特征被“稀释”的影响就越大。特征不是越多越好,有时候太多无用的特征可能还会引起反效果。

换句话说,即使本来可能很“相似的文件”(例如同一个病毒家族的变种),但是在我们设计的特征上却不能很好地体现。

面对这些问题,如何解决呢?

一个很自然的想法是,如果能有一种算法,能将相似的字符串,从高维空间降维到一个相对低维的空间中。同时,在这个低维空间中,语法/语义相近的字符串的夹角余弦相对较小,也即语法/语义相近的字符串在降维后彼此较为接近。

如果能实现上述两个目标,我们不仅可以有效实现对高维向量的降维,同时因为低维空间的向量间具备相似聚集性,我们可以在接近线性的时间内,进行向量间距离评估,以及找到相似的文本。

0x3:用几个例如引出LSH(局部敏感哈希)算法被提出的历史背景

这个章节我们按照历史时间线来讨论学术界在面对语言模型中文本相似性这个课题分支时,一路走来遇到了哪些问题,整个时间线学术成果非常丰富,我们这里只能摘取其主要节点进行推导式的讨论。

1. 基于原始文本计算最小编辑距离

假如我们有两段输入文本:

1. how are u?
2. how are you?

现在计算这两段文本的相似度,也即需要计算这两段文本的区别度,一个最简单直观的想法是直接基于原始的ascii序列逐位计算最小编辑距离:

1. u -> u
2. ? -> o
3. N/A -> u
3. N/A -> ?

即第一段文本通过4次修改即可得到第二段文本,所以这两个文本的相似度为:

(1 – 4 / (len(第一段文本) + len(第二段文本))) * 100% = (1 – 4 / (10 + 12)) * 100% = 81.81%

相似度为81%,这个结论怎么样?准吗?勉强好像可用,但是效果显然不太好,怎么办呢?

2. 对变化如此敏感的问题出在哪?

我们开始思考,原始ascii字符空间对变换的感知非常敏感,有两个主要原因:

  • 对输入文本的单个ascii byte修改很敏感,每一个byte的变换都会引起最后结果的变化
  • 对输入文本中ascii byte的position位置很敏感,即使是同一个ascii序列,换了一个位置(不管语法/语义是否发生变化),最终的结果都会发生变化

从向量空间的角度来看,原始的ascii字符空间可以抽象为一个 N * 2的列向量组(ascii bytes vec,position vec),这里N代表着输入文本的length长度。

3. 寻找一个线性空间映射

沿着线性空间的这个思考路线,我们应该去找一个新的向量空间,该新向量空间与原始ascii字符空间相比,对变化的敏感度更低(包括对ascii修改、ascii位置变化)。

那对ascii字符变化的敏感度更低,怎么用数学思维来理解这个概念呢?

这里需要引入线性映射的概念:

设 S 和 S’ 是两个集合,如果存在一个法则f,使得集合S中每一个元素a,都有集合 S’ 中唯一确定的元素b与它对应,则称 f 是S到 S’ 的一个映射,记作:

我们需要找到一种线性映射,将原始ascii序列中的 N * 2(ascii byte,position)向量组,降维映射成一个 M * 1(ascii sequence windows)向量组,这里 M 是新向量空间中的维度。

从投影降维理论视角我们知道,降维后,原始空间中的position维度被完全忽略了,而ascii byte这个维度被转换为ascii sequence windows这个新维度,这显然不是一个正交投影,即不是单射,因为原始输入文本中的一个ascii修改,可能会引起新空间里多个ascii sequence window的变化。

好,接下来的问题是,如何找到这个线性空间映射呢?这就是接下来要讨论的ngram分词算法。

4. 基于ngram固定长度滑动窗口对输入文本进行切词

假设我们现在有3段输入文本:

[
    'This is the first document.',
    'This is the second document.',
    'Is this the first document?',
]

以单个word为一个slice window进行切词,即1-gram(unigram),得到:

[
    { u'This', u'is', u'the', u'first', u'document' },
    { u'This', u'is', u'the', u'second', u'document' },
    { u'This', u'is', u'the', u'first', u'document' }
]

从1-gram slice结果中,我们可以看到几点信息:

  • 第二段文本相比第一段文本,因为word 1-gram windows的关系,只变化了一个token。
  • 第三段文本相比第一段文本,虽然单词的位置发生了变化,但是因为1-grame丢弃了position这个维度的信息,所以在1-gram这个新的特征空间中,position的变化是无感知的。

显然,1-gram的分词方案造成了信息的过度失真,导致了原始输入文本的语法结构被丢失了,这个问题怎么解决呢?显然,我们需要引入相对位置(relative position)这个特征维度。

使用2-gram算法进行切词,得到:

[
    { u'This', u'This is', u'is the', u'the first', u'first document', u'document' },
    { u'This', u'This is', u'is the', u'the second', u'second document', u'document' },
    { u'is', u'is This', , u'This the', u'the first', u'first document', u'document' }
]

从1-gram slice结果中,我们可以看到几点信息:

  • 第二段文本相比第一段文本,因为word 2-gram windows的关系,有两2个token发生了变化,2-gram对原始输入文本变化的敏感程度提高了。
  • 第三段文本相比第一段文本,因为单词的位置发生了变化,有3个token发生了变化,2-gram对原始输入文本中的语法结构变换的敏感程度提高了。

从线性映射的角度看,在2-gram算法下,原始ascii序列中的 N * 2(ascii byte,position)向量组,降维映射成一个 M * N(ascii sequence windows,relative position)向量组,这里 M 代表了2-gram后的 gram token数量,N 代表了每个gram分组内的word组合,2-gram token内的组合维度数为2,如果是3-gram,则组合维度数位6。

5. 还需要继续降维

问题到这里就结束了吗?显然不是的,ngram算法虽然比纯粹的ascii逐字符比对各方面效果要好,但是还存在几个问题:

  • 对局部修改还是太敏感,ngram只是缩小了对输入文本的变化感知域,但是不具备完全屏蔽输入修改的影响。也就是说,输入文本的任何修改,或多或少都会影响到ngram token lists的变换。
  • 对于海量语料库来说,ngram的特征空间还是太高了,有时会到达上万维,一般的输入文本进行词表编码后,很容易遇到高维稀疏问题,这种彼此正交的高维稀疏向量,不管是进行有监督学习还是无监督聚类来说,效果都很受影响。

那解决问题的思路是什么呢?答案还是降维,我们需要继续寻找一个新的映射函数,将原始ascii字符空间映射到一个低维向量空间中。但是要注意,这个新的低维向量空间有几个技术指标需要满足:

  • 语法/语义一致性:向量空间的变换不能丢失原向量空间的语法和语义信息,或者近似相似。
  • 相对低维:用尽量少的bit空间,尽可能多的存储有用的信息。
  • 局部修改容忍性:对不影响语法/语义的局部修改,在新向量空间引起的变换尽可能小,最好是完全无变化。

6. 两个不同的改进方向  

基于上个章节讨论的3个技术指标,学术界开始了学术的研究和创新,演化出了两个不同的方向:

7. Word Vector(data-dependent)和LSH(data-independent)这两个方向的演化背景

LSH的核心是哈希散列、其次是降维、其次是语法/语义一致性、再其次是算法过程简单高效适合在大规模高并发场景中使用。

可以这么说,LSH通过牺牲了一部分的信息熵,即牺牲了一部分的语法/语义一致性,换取了超级高效的时间/计算复杂度,是一种非常优秀的算法思想,值得我们不断深入思考和学习。 

但是换一个角度,如果对时间/空间复杂度没有那么高的需求,而是对语法/语义一致性有很高的要求,LSH算法家族可能就不一定非常适合了。

这个时候,另一条思考脉轮就呈现在我们的面前,即词向量/句子向量/文档向量,具体来说,就是从2007年开始逐渐被提出的各种词向量降维表征方法,包括:

  • 基于SVD奇异值矩阵分解词向量降维方法
  • 基于神经网络的NNLM方法
  • 基于神经网络与词频统计综合的Glove算法
  • ……

词向量广泛地被运用于NLP相关的任务中,关于这部分的详细讨论,笔者在另一篇文章有所涉及。

从笔者自己经验来看,在大数据时代,LSH的使用场景相比词向量要相对少一些,笔者个人觉得问题核心在于现代NLP任务中,对语义的精确表征能力要求越来越高,工程师和数据科学家通过不断地引入更庞大的数据集,引入更复杂的词向量算法,也是希望尽可能提高信息的利用率,尽量少的丢失信息。

而相对的,对时间/空间复杂度有极高要求的场景可能只存在于一些极端的场景中,例如搜索引擎等。

Relevant Link:

https://www2007.cpsc.ucalgary.ca/papers/paper215.pdf google Detecting Near-Duplicates for Web Crawling. WWW2007
http://www2007.org/
https://www.iw3c2.org/blog/category/www2007/

 

2. ANN(Approximate Nearest  Neighbor)算法简介 – 解决问题的思考方向

0x1:什么场景下需要ANN算法?

在很多应用领域中,我们面对和需要处理的数据往往是海量并且具有很高的维度(high dimensional spaces),同时数据中又普遍存在着近似相同的情况(例如相似的对话、相似的网页、相似的URl等),怎样快速地从海量的高维数据集合中找到与某个数据近似相似(approximate or exact Near Neighbor)的一个数据或多个数据,成为了一个难点和问题。

如果是低维的小数据集,我们通过线性查找(Linear Search)就可以容易解决,但如果是对一个海量的高维数据集采用线性查找匹配的话,会非常耗时,这成为ANN被研究和发展的原动力。

在笔者所在的网络安全学科中,也常常会遇到很多局部不同(locality change)的近似文本的识别与检测问题,例如:

  • WAF攻击识别与检测中,由于MVC这种架构的存在,会出现大量的相似URL,它们可能只是某一个key-value改变一个字符,或者是同一个module下不同的请求参数不同导致整个URL不同,这个时候我们常常会需要研发模拟化URL聚类技术,将大量的业务URL通过聚类降维到接口URL层面,这个时候可以大幅度降低数据量,在接口URL这个层面进行history profile建模以及非法检测。
  • 在WEBSHELL/BIN MALWARE这类问题上,常常会出现同一个恶意源代码,被大量的非法攻击者进行了一些微小的定制化修改(例如修改了pass、署名等等),造成文件的HASH变化。这个时候我们会需要对文件进行模拟化HASH降维,不管是进行相似度恶意检测还是相似恶意文件搜索,都非常有用。
  • 攻击者或者攻击者社区常常会使用一组相似的PAYLOAD/SHELLCODE进行有针对性的批量入侵,也就是所谓的”man with some power模型“。我们需要对攻击者power进行有效的相似性匹配。
  • 网页爬虫的场景中,网络中可能存在很多转载/复制/剽窃导致的”near-duplicate“网页,这个时候就需要有一套高效的相似度匹配算法,对新爬取的网页进行相似度匹配,避免重复的网页进入我们的spider候选队列中,提高扫描器的效率。

0x2:近似近邻算法(Approximate Nearest Neighbor Algorithm)设计准则

面对海量高维数据背景下,还要进行高效的数据相似性搜索的需求,该从哪些方面进行思考解决方案呢?

要实现上述目标,我们需要能找到一整套综合技术,能综合实现以下几个技术指标:

  • 不管上层采用何种向量化编码方式,ANN需要能够实现有效降维。也就是说在尽量不丢失语法/语义的前提下,找到一个新的vector space,将高维的数据投影到相对低维的vector space中。这个技术指标要求是为了处理性能方面的考虑,因为高维数据的处理容易遇到”维数灾难(dimension curse)问题“;
  • 降维投影后的新vector space中,在原有语料中词义/语义相近的词,在空间上要相对聚集在一起,反之在空间上要相对远离稀疏,即降维投影需要具备语义不变性
  • 需要引入一种”向量空间距离量化指标“,例如欧式距离、L2范数、Hamming distance、编辑距离等。这个技术指标要求是为了量化地定义”什么叫最近邻“,以及”如何定义近邻之间的距离“这两个问题;
  • 在低维空间获得了数据表征后,也有了度量不同向量之间的距离的指标,接下来最后一项是需要设计一种近邻搜索算法(Nearest  Neighbor Searching Algorithm),例如KNN算法、Hamming翻转距离算法等;以及近邻搜索数据结构(Nearest  Neighbor Searching Structure,例如K-d tree with BBF、Randomized Kd-trees、Hierarchical K-means Tree。

符合以上四点技术指标的算法被统称为ANN(Approximate Nearest  Neighbor)算法。

Relevant Link:

https://www2007.cpsc.ucalgary.ca/papers/paper215.pdf 

 

3. LSH(Locality Sensitive Hash 局部敏感哈希)算法

值得注意的是,对于第二章提到的ANN技术指标中的前两个,词向量和LSH都可以实现同样的效果,但是我们本文的讨论对象LSH局部敏感哈希。

0x1:LSH一般性定义

局部敏感哈希(LSH)核心思想是:在高维空间相邻的数据经过局部敏感哈希函数的映射投影转化到低维空间后,他们落入同一个吊桶(空间区间)的概率很大而不相邻的数据映射到同一个吊桶的概率则很小

这种方法的主要难点在于如何寻找适合的局部敏感哈希函数,在原论文中,作者提出了局部敏感Hash函数的一般性定义:

我们设定x和y的距离测定函数为d(x,y),这个d()函数可以是Jaccard函数/Hamming度量函数,也可以其他具备同样性能的函数。

在这个距离测定标准下,设定两个距离阈值d1,d2,且 d1 < d2。

如果一个函数族F的每一个函数 f 满足:

  • 如果 sim(x,y) <= d1,则 f(x) = f(y) 的概率至少为p1,即 P(f(x) = f(y)) >= p1;
  • 如果 sim(x,y) >= d2,则 f(x) = f(y) 的概率至多为p2,即 P(f(x) = f(y)) <= p2;

那么称F为(d1,d2,p1,p2)-敏感的函数族,实际上,simhash就是一种(d1,d2,p1,p2)-敏感的函数。

左图是传统Hash算法,右图是LSH。红色点和绿色点距离相近,橙色点和蓝色点距离相近。

0x2:LSH的发展历程

按照LSH的发展顺序,LSH家族的演变史如下:

  • 基于Stable Distribution投影方法
  • 基于随机超平面投影的方法
  • 球哈希Spherical Hashing算法
  • SimHash
  • Kernel LSH
  • SSDEEP模糊化哈希

我们接下来逐个讨论其算法流程及其背后的思维方式。 

Relevant Link:

https://www.cnblogs.com/wt869054461/p/9234184.html
http://people.csail.mit.edu/gregory/annbook/introduction.pdf
https://www.cnblogs.com/wt869054461/p/9234184.html
http://infolab.stanford.edu/~ullman/mmds/ch3.pdf
https://www.cnblogs.com/fengfenggirl/p/lsh.html
http://sawyersun.top/2016/Locality-Sensitive-Hashing.html

 

4. 基于Stable Distribution投影方法

2008年IEEE Signal Process上有一篇文章Locality-Sensitive Hashing for Finding Nearest Neighbors是一篇较为容易理解的基于Stable Dsitrubution的投影方法的Tutorial。

其思想在于高维空间中相近的物体,投影(降维)后也相近。

三维空间中的四个点,红色圆形在三围空间中相近,绿色方块在三围空间中相距较远,那么投影后还是红色圆形相距较近,绿色方块相距较远。

基于Stable Distribution的投影LSH,就是产生满足Stable Distribution的分布进行投影,最后将量化后的投影值作为value输出。

具体数学表示形式如下:给定特征向量v,Hash的每一bit的生成公式为:

其中:

  • x 是一个随机数,从满足Stable Distribution的分布中抽样而来(通常从高斯或柯西分布中抽样而来)
  • x ⋅ v就是投影(和单位向量的内积就是投影)
  • w 值可以控制量化误差
  • b 是随机扰动,避免极端情况产生

需要注意的是,如果 x 抽样于高斯分布,那么ϕ(u,v)衡量的是L2 norm;如果 x 抽样于柯西分布,那么ϕ(u,v)衡量的是L1 norm。

更详细的介绍在Alexandr Andoni维护的LSH主页中,这就是LSH方法的鼻祖。

关于随机抽样涉及到随机过程方面的知识,可以参阅这篇帖子。关于高斯随机投影和柯西分布随机投影的讨论,可以参阅另一篇blog

Relevant Link:

http://www.slaney.org/malcolm/yahoo/Slaney2008-LSHTutorial.pdf
https://www.cnblogs.com/LittleHann/p/6558575.html#_label2
https://www.zhihu.com/question/26694486/answer/242650962

  

5. 基于随机超平面投影(hyperplane projection)的方法

0x1:Stable Distribution Projection的缺点

Stable Distribution Projection从原理上没有什么大问题,其实后来改进的随机超平面和球面哈希算法,其底层思想上和Stable Distribution Projection没有太大的区别。

但是在实际操作中,Stable Distribution Projection存在几个比较明显的问题:

  • 需要同时人工指定两个参数,w和b,在具体的项目中会遇到调参难的问题
  • 量化后的哈希值是一个整数而不是bit形式的0和1,还需要再变换一次

面对上述问题,Charikar改进了这种情况,提出了一种随机超平面投影LSH。可以参考论文《Multi-probe LSH: efficient indexing for high-dimensional similarity search》

0x2:算法公式 

假设有一个M维高维数据向量x,我们在M维空间中随机选择一个超平面,通过这个超平面来对数据进行切分。

这个动作总共进行N次,即通过N个随机超平面单位向量来对原始数据集进行切分,这里N就是降维后的向量维度。

超平面的选择是随机过程,不需要提前参数设定

如下图所示,随机在空间里划几个超平面,就可以把数据分到不同空间里,比如中间这个小三角的区域就可以赋值为110.

Hash的每一bit的数学定义式为:

x 是随机超平面单位向量,sgn是符号函数:

接下来我们来讨论在随机超平面投影算法下,LSH哈希的产生原理是什么。

1. 基于cosine距离来度量两个向量相似度 

这时ϕ(u,v),也就是上述公式中的内积点乘计算,衡量的就是uv的cosine距离,θ(u,v)表示向量uv的夹角。

hyperplane projection的核心假设就是,两个向量越相似,则他们的cosine距离越小:

下图说明了该公式原理

可以看到,给定两个向量(图中的黑色箭头),只有在其法线的交叠区域(深蓝色区域)投影后的方向(sgn函数的值)才不相等,所以有:

,即蓝色区域面积占比整个圆,的比率等于u与v的夹角。

2. 基于sng符号函数将cosine距离归一化为“方向是否相同”的0/1二值信息

通过sgn符号函数的归一化,只要两个向量是同方向,不管距离远近,都统一归一化为1。

这样计算后的hash value值是比特形式的1和0,虽然带来了一定的信息丢失,但是免去了使用时需要再次归一化。

Relevant Link:

https://www.jiqizhixin.com/articles/2018-06-26-15
http://delivery.acm.org/10.1145/1330000/1325958/p950-lv.pdf?ip=42.120.75.135&id=1325958&acc=ACTIVE%20SERVICE&key=C8BAF422464E9FCC%2EC8BAF422464E9FCC%2E4D4702B0C3E38B35%2E4D4702B0C3E38B35&__acm__=1560846249_128df98f27ef856192df883b1ce48987
http://yangyi-bupt.github.io/ml/2015/08/28/lsh.html 

 

6. 球哈希Spherical Hashing算法

spherical hash是在前人hyperplane hash的基础之上改进而来的,所以这里我们首先来一起思考下hyperplane-base哈希算法都存在哪些问题。

0x1:hyperplane-based算法存在的问题

  • 空间封闭性:hyperplane不容易形成一个closed region(封闭区间),如果我们要在一个d维空间中“切割”出一个closed region,我们至少需要d + 1个hyperplanes。而且这还是在这d+1个hyperplane都线性无关的情况下。最简答的情况是是对于一个1维的线段,需要2条不同阈值的直线才能切割出一个封闭区间,高维空间可以类推。

    • ,左图展示了通过3个hyperplane切割出了一个3维的封闭向量空间。
  • 算法下界收敛性(Bounding Powrer):hyperplane切割得到的closed region不算非常紧凑(bounding),每个closed region内部,样本间距离的bounding往往过大。这种情况下,降维哈希函数的收敛性就会受影响,进而也影响后续近邻搜索的效果。

    • ,左图展示了在hyperplane没有完全封闭的区间内,样本间距离可能会过大。
  • 超平面线性相关性(hyperplane Independence):hyperplane-based hash算法中,hyperplane是基于随机过程随机采样的,存在一定的可能性两个hyperplane之间线性相关。通过线性代数的知识我们知道,线性相关性是空间中的冗余结构,是可以被忽略的。
    • ,左图展示了彼此正交的hyperplane和彼此线性相关的hyperplane的区别,显然,彼此正交的hyperplane可以提供信息增益。
  • 分界面信息增益(Balanced partitioning):分界面信息增益,指的是通过加入一个分界面,将分界面两边的数据分成0/1两类,对最终的目标函数的提升度量。这个概念我们并不陌生,在决策树的每个节点特征选择中,都是基于信息增益最大的原则进行的。

    •   ,从左图可以看到,我们希望每个hyperplane都能基本将样本均分为两半,在零先验无监督的前提下,这可以提供最大的信息增益,这也符合最大熵的原理。

sphericalplane hash超球体哈希算法就在这个背景下,在2012 CVPR上提出的。

面对上述几个问题,sphericalplane hash进行了算法理论和公式层面上的创新,我们接下来详细讨论具体细节。

0x2:sphericalplane hash的主要技术改进点

1. 用hypersphere超球面代替hyperplane超平面

我们知道,利用kernel space核空间技术,我们可以将线性超平面映射为一个非线性超平面,这是建立在核函数的理论基础上的。但是研究发现,使用sphericalplane,因为球平面天生的封闭性,可以直接对高维空间进行partition分类,并获得比non-linear hyperplane更好的效果。

理论上说,如果需要切割出一个d维封闭空间,至少需要d+1个超平面,但是如果使用超球体,则最少值需要1个超球体即可,例如下图

值得注意的是,c个超球体划分出的有界封闭区域数是可以计算的,即:

同时,球哈希划分的区域是封闭且更紧凑的,每个区域内样本的最大距离的平均值(bounding power)会更小,说明各个区域的样本是更紧凑的,如下图所示:

Average of maximum distances within a partition: ‐ Hyper‐spheres gives tighter bound!

2. 通过Iterative Optimization过程实现hyperplane Independent和Balanced partitioning

通过渐进逼近的方法,迭代优化算法超参数,得到符合算法约束条件的近似最优解。

这里的约束条件指的是:

1. 我们希望每个超球体把样本都是均分的,就是球内球外各占一半
2. 希望每个超球体的交叉部分不要太多,最多1/4,也就是每个哈希函数相对独立

0x3:算法模型详述

1. Optimization约束条件

优化过程最重要的一个前提就是设定约束条件(constraint condition)。

这里首先先定义一些数学标记:

为数据向量在单个超球体(单个hash function)内部(+1)还是外部(-1)的概率。

为单个超球体的半径。

Spherical Hashing是由c个不同位置,不同的大小的超球体组成的,对于c个超球体的总约束条件如下:

  • ,也即Balanced partitioning,数据向量在球内和求外的概率相等。
  • ,也即hyperplane Independent,任意两个球的交叉概率等于1/4。

注意,约束2个1/4是一个理论极限值了,通过空间几何的相关知识可以证明,当两个球都近似将空间一分为二时,这两个球的交集的最小值就是1/4。

2. 优化过程

优化过程的伪码如下,我们接下来逐步讨论:

1)输入样本准备

采用随机采样的方式从样本集中采样m个样本,用于进行后续的优化过程。当然,如果你的算力足够,也可以将所有样本都作为训练集进行优化训练。

2)初始化 – 首次迭代

从样本集S中随机选择c个数据点作为初始的超球体中心。

值得注意的是,作者在使用kmeans得到c个聚类中心作为初始的超球体中心后,并没有很明显提升实验结果,这反映了求哈希算法对初始值不是非常敏感。

3)第二次及之后的迭代

接下来的迭代会不断会动态调整半径,以及动态移动球心的位置。为了方便计算,我们定义下面两个辅助变量:

,1 ≤ i, j ≤ c。

对于来说,我们只要使其满足即可。

对于来说,我们的目标是使其靠近m/4,通过计算当前值和目标值之间的残差累积和,得到一个回归值,原论文中使用了力的概念形象地说明了这个过程。

对于交叉样本太多的两个球心,赋予一个repulsive的力,对离得太远的两个球赋予一个attractive 的力。然后计算这些力的累加作用,更新球心,再根据目标一更新半径。对照上面算法伪码很容易理解该思想。

重复这个过程,直到满足收敛条件。

4)收敛条件

理论上说,优化的最终结果应该是的均值为m/4,方差为0,即完全收敛,但是这很容易导致过拟合。

和很多渐进逼近的优化算法一样(例如gradient descent),球哈希算法设置了一个收敛近似精度,来提前停止优化,避免过拟合的发生。

算法对均值和方差设置了一个容忍精度阈值 ,只要优化在一段步骤区间中,达到了这个容忍精度,即表明优化结果,停止优化。

在原论文中,作者对的值实验最佳值分别是10%和15%。 

Relevant Link:

https://engineering.stanford.edu/people/moses-charikar
http://xueshu.baidu.com/s?wd=charikar+%E2%80%9CRandom+Hyperplane%E2%80%9D&tn=SE_baiduxueshu_c1gjeupa&cl=3&ie=utf-8&bs=charikar+Random+Hyperplane&f=8&rsv_bp=1&rsv_sug2=0&sc_f_para=sc_tasktype%3D%7BfirstSimpleSearch%7D
http://sglab.kaist.ac.kr/Spherical_Hashing/
https://blog.csdn.net/u014624632/article/details/79972100
http://sglab.kaist.ac.kr/Spherical_Hashing/Spherical_Hashing.pdf
https://blog.csdn.net/u014624632/article/details/79972100
https://blog.csdn.net/zwwkity/article/details/8565485
https://www.bbsmax.com/A/LPdojpBG53/

 

7. Simhash算法

Simhash是一种降维投影方法,它将一段文本映射为一段固定位数的二进制指纹(fixed length fingerprint),同时,这种fingerprint具有较好的语法/语义一致性。

它由google的Moses Charikar提出。整个算法非常简单精巧,我们这章来阐述一下其算法过程。

0x1:Ngram分词及权重统计

使用ngram对文档进行token化分词,值得注意的是,n值的选取具有一定的技巧:

  • n越大越能找到真正相似的文档,但是n值越大越容易让相似的文档在特征维度上不相似,即n越大,容错度越低;
  • 而n越小就能召回更多的文档,当时n值越小越可能将本来不相似的文档在 特征维度表现相似,即n越小,容错度越高。一个极端的情况,比如n=1,就变成了基本词的比较了。

在分词后,计算每个token的权重,可以通过ngram token词频统计得到w,也可以通过TF-IDF计算。不管用什么方式,核心是将ngram token的权重表征出来。

例如对”How are you?“这段话进行4-gram的切词可以得到:

ngram tokens frequency list:  
{u'owar': 1, u'reyo': 1, u'howa': 1, u'eyou': 1, u'ware': 1, u'arey': 1}

注意,这里权重w为1,只是我们举例比较简单,刚好的巧合。

0x2:Ngram Token哈希化

将每个ngram token都被转换为了一个散列hash,这个散列hash是随机均匀分布的,例如MD5、SHA-1算法。

(u'owar', 1):  333172464361321106773216808497407930520
(u'reyo', 1):  310879434437019318776469684649603935114
(u'howa', 1):  98593099505511350710740956016689849066
(u'eyou', 1):  32675000308058660898513414756955031020
(u'ware', 1):  325869966946114134008620588371145019154
(u'arey', 1):  110781832133915061990833609831166700777

这个哈希化过程主要是完成字符串的数字化。因为对token进行哈希处理的散列函数是像MD5、SHA1这种随机散列函数,散列后的空间是随机均匀的。因此不同的token得到的散列值本身不包含任何信息熵。

那什么东西包含信息熵呢?笔者认为这里的传递下一步的信息熵有两项:

  • token数量:token数量越多,包含的信息自然也就越多
  • token权重:token权重越大,这个token在最终的fingerprint V向量中的”影响力“就越大

0x3:逐位翻译为权重

对每个token hash进行逐位扫描,对某一个token hash来说,如果某一位为1,则赋值一个该token的正权重;如果某位是0,则赋值为该token的负权重。

得到一个N x M矩阵,N为token数量,M为fingerprint向量V的长度,原论文中默认为64bit,我们在实际开发中大多数也使用64bit,这是一个效率与效果比较折中的配置。

(u'owar', 1):  [-1, -1, -1, 1, 1, -1, -1, 1, -1, -1, -1, -1, 1, 1, -1, -1, 1, -1, 1, 1, 1, 1, 1, -1, -1, -1, -1, 1, -1, 1, -1, 1, -1, 1, -1, 1, -1, -1, -1, 1, 1, -1, -1, 1, 1, 1, 1, 1, 1, -1, 1, -1, 1, -1, -1, 1, -1, 1, -1, 1, 1, 1, 1, -1]
(u'reyo', 1):  [-1, 1, -1, 1, -1, -1, -1, 1, 1, 1, -1, 1, -1, 1, -1, 1, -1, -1, -1, 1, 1, 1, 1, 1, -1, -1, 1, -1, 1, 1, -1, 1, -1, 1, -1, -1, -1, -1, -1, 1, 1, -1, 1, 1, 1, 1, 1, 1, 1, -1, 1, 1, -1, 1, -1, -1, 1, 1, 1, -1, 1, 1, -1, -1]
(u'howa', 1):  [-1, 1, -1, 1, -1, 1, 1, 1, -1, 1, -1, -1, -1, 1, -1, 1, -1, -1, 1, -1, 1, -1, -1, -1, -1, 1, 1, 1, -1, 1, -1, 1, 1, -1, 1, -1, 1, 1, -1, 1, -1, -1, -1, -1, -1, -1, -1, 1, 1, -1, 1, -1, -1, 1, 1, -1, 1, 1, 1, 1, -1, 1, -1, -1]
(u'eyou', 1):  [-1, -1, 1, 1, -1, 1, 1, 1, 1, -1, 1, -1, -1, -1, -1, 1, 1, 1, 1, -1, -1, -1, -1, -1, -1, 1, 1, 1, -1, -1, -1, -1, 1, 1, 1, 1, -1, -1, 1, 1, -1, -1, 1, 1, -1, -1, -1, 1, -1, -1, -1, -1, 1, -1, 1, -1, -1, -1, 1, -1, -1, -1, -1, -1]
(u'ware', 1):  [-1, 1, -1, -1, 1, -1, -1, -1, 1, 1, 1, 1, -1, -1, 1, -1, -1, 1, 1, -1, 1, 1, -1, 1, -1, 1, -1, 1, -1, 1, -1, -1, -1, -1, -1, 1, 1, -1, -1, -1, -1, 1, 1, 1, -1, -1, 1, -1, 1, -1, -1, 1, 1, -1, 1, -1, -1, 1, 1, -1, 1, 1, -1, 1]
(u'arey', 1):  [1, -1, -1, 1, -1, 1, 1, 1, -1, -1, 1, 1, -1, 1, 1, 1, -1, 1, 1, 1, -1, -1, 1, 1, 1, 1, 1, -1, -1, 1, 1, 1, 1, -1, 1, 1, -1, -1, 1, -1, 1, 1, -1, 1, -1, 1, 1, 1, 1, -1, -1, -1, -1, -1, -1, 1, 1, 1, 1, 1, 1, -1, -1, -1]

这步有一个细节需要注意,即不管上一步token hash的位数多长,这一步都只进行fingerprint V长度的逐位扫描与翻译,这实际上使用裁剪cutoff的方式实现了降维,这种压缩映射会损失一部分准确性,引入一定的信息损失和误报,不过这和我们选择的fingerprint V长度有关,我们选的V越长,例如128bit,这种信息损失就越小。

从信息熵的角度来说,这一步实际就是在将上一步传入的token权重这一信息进行翻译。

0x4:逐位进行列维度sum压缩合并

上一步得到的V是一个由token w组成的N x M矩阵,我们逐位进行纵向的列维度sum压缩:

v:  [-4, 0, -4, 4, -2, 0, 0, 4, 0, 0, 0, 0, -4, 2, -2, 2, -2, 0, 4, 0, 2, 0, 0, 0, -4, 2, 2, 2, -4, 4, -4, 2, 0, 0, 0, 2, -2, -4, -2, 2, 0, -2, 0, 4, -2, 0, 2, 4, 4, -6, 0, -2, 0, -2, 0, -2, 0, 4, 4, 0, 2, 2, -4, -4]

这一步通过将每一bit上的所有信息都压缩综合起来,得到最终的信息表达。

0x5:逐位归一化

上一步得到的V是一个1 x M,这里M已经就是fingerprint长度的向量,默认为64bit,最后一步进行归一化。

逐位bit扫描当前fingerprint向量 V,如果其值>0,则归一化为1;如果其小于零,则归一化为0

v_:  [0, 0, 0, 1, 0, 0, 0, 1, 0, 0, 0, 0, 0, 1, 0, 1, 0, 0, 1, 0, 1, 0, 0, 0, 0, 1, 1, 1, 0, 1, 0, 1, 0, 0, 0, 1, 0, 0, 0, 1, 0, 0, 0, 1, 0, 0, 1, 1, 1, 0, 0, 0, 0, 0, 0, 0, 0, 1, 1, 0, 1, 1, 0, 0]

用一张图来总结梳理一下上述的几个步骤:

 

这里笔者抛出一个问题来一起思考下,看起来在第四步已经得到了已经降维后的定长向量了,而且向量的每个元素也都是由所有token综合起来得到的,应该能够代表原始的输入文本了,那为啥第五步还要多此一举进行一次0/1归一化呢?背后的原理是啥呢?

0x6:Simhash python实现代码 

# -*- coding: utf-8 -*-

from __future__ import division, unicode_literals

import re
import sys
import hashlib
import logging
import numbers
import collections
from itertools import groupby

if sys.version_info[0] >= 3:
    basestring = str
    unicode = str
    long = int
else:
    range = xrange


def _hashfunc(x):
    return int(hashlib.md5(x).hexdigest(), 16)


class Simhash(object):

    def __init__(
        self, value, f=64, reg=r'[\w\u4e00-\u9fcc]+', hashfunc=None, log=None
    ):
        """
        `f` is the dimensions of fingerprints

        `reg` is meaningful only when `value` is basestring and describes
        what is considered to be a letter inside parsed string. Regexp
        object can also be specified (some attempt to handle any letters
        is to specify reg=re.compile(r'\w', re.UNICODE))

        `hashfunc` accepts a utf-8 encoded string and returns a unsigned
        integer in at least `f` bits.
        """

        self.f = f
        self.reg = reg
        self.value = None

        if hashfunc is None:
            self.hashfunc = _hashfunc
        else:
            self.hashfunc = hashfunc

        if log is None:
            self.log = logging.getLogger("simhash")
        else:
            self.log = log
 
 
        if isinstance(value, Simhash):
            self.value = value.value
        elif isinstance(value, basestring):
            self.build_by_text(unicode(value))
        elif isinstance(value, collections.Iterable):
            self.build_by_features(value)
        elif isinstance(value, numbers.Integral):
            self.value = value
        else:
            raise Exception('Bad parameter with type {}'.format(type(value)))

    def __eq__(self, other):
        """
        Compare two simhashes by their value.

        :param Simhash other: The Simhash object to compare to
        """
        return self.value == other.value

    def _slide(self, content, width=4):
        return [content[i:i + width] for i in range(max(len(content) - width + 1, 1))]

    def _tokenize(self, content):
        content = content.lower()
        content = ''.join(re.findall(self.reg, content))
        ans = self._slide(content)  # ngram slide into tokens list
        return ans

    def build_by_text(self, content):
        # 1. ngram分词
        features = self._tokenize(content)
        # 2. ngram token词频统计,统计得到的词频将作为权重
        features = {k:sum(1 for _ in g) for k, g in groupby(sorted(features))}
        print "ngram tokens frequency list: ", features
        return self.build_by_features(features)

    def build_by_features(self, features):
        """
        `features` might be a list of unweighted tokens (a weight of 1
                   will be assumed), a list of (token, weight) tuples or
                   a token -> weight dict.
        """
        v = [0] * self.f  # 初始化simhash fingerprint V,默认为64bit,每个元素初始化为0
        

        # 逐位为1的掩码,即[1], [10], [100]....[100000(64个)],这个掩码数组的作用是后面进行逐位提取  
        masks = [1 << i for i in range(self.f)]
        print "masks: ", masks
        if isinstance(features, dict):
            features = features.items()
        for f in features:
            v_ = [0] * self.f
            # 如果传入的是一个token string list,则默认每个token string的权重都为1
            if isinstance(f, basestring):
                # 通过散列哈希算法将每个ngram token转换为一个hash序列
                h = self.hashfunc(f.encode('utf-8'))
                w = 1
            # 如果传入的是一个(token, wight)的list,则按照预定的weight进行计算,我们本文默认采用ngram词频统计方式得到weight
            else:
                assert isinstance(f, collections.Iterable)
                h = self.hashfunc(f[0].encode('utf-8'))
                w = f[1]

            # 每个ngram token都被转换为了一个散列hash,这个散列hash是随机均匀分布的
            #print "{0}: ".format(f), h
            # 循环f次(本文是64bit),逐位进行扫描,如果某一位是1,则赋值为该token的正权重;如果某位是0,则赋值为该token的负权重
            for i in range(self.f):
                #print "h & masks[i]: ", h & masks[i]
                v[i] += w if h & masks[i] else -w
                v_[i] += w if h & masks[i] else -w
            print "{0}: ".format(f), v_
        # 在完成对所有ngram token的扫描后,fingerprint向量 V 的每一位bit都是所有token hash在该bit上的权重加和结果。
        print "v: ", v
        ans = 0
        # 逐位bit扫描当前fingerprint向量 V,如果其值>0,则归一化为1;如果其小于零,则归一化为0
        v_ = [0] * self.f
        for i in range(self.f):
            if v[i] > 0:
                ans |= masks[i]
                v_[i] = 1
            else:
                v_[i] = 0
        print "v_: ", v_
        self.value = ans

    def distance(self, another):
        assert self.f == another.f
        x = (self.value ^ another.value) & ((1 << self.f) - 1)
        ans = 0
        while x:
            ans += 1
            x &= x - 1
        return ans


class SimhashIndex(object):

    def __init__(self, objs, f=64, k=2, log=None):
        """
        `objs` is a list of (obj_id, simhash)
        obj_id is a string, simhash is an instance of Simhash
        `f` is the same with the one for Simhash
        `k` is the tolerance
        """
        self.k = k
        self.f = f
        count = len(objs)

        if log is None:
            self.log = logging.getLogger("simhash")
        else:
            self.log = log

        self.log.info('Initializing %s data.', count)

        self.bucket = collections.defaultdict(set)

        for i, q in enumerate(objs):
            if i % 10000 == 0 or i == count - 1:
                self.log.info('%s/%s', i + 1, count)

            self.add(*q)

    def get_near_dups(self, simhash):
        """
        `simhash` is an instance of Simhash
        return a list of obj_id, which is in type of str
        """
        assert simhash.f == self.f

        ans = set()

        for key in self.get_keys(simhash):
            dups = self.bucket[key]
            self.log.debug('key:%s', key)
            if len(dups) > 200:
                self.log.warning('Big bucket found. key:%s, len:%s', key, len(dups))

            for dup in dups:
                sim2, obj_id = dup.split(',', 1)
                sim2 = Simhash(long(sim2, 16), self.f)

                d = simhash.distance(sim2)
                if d <= self.k:
                    ans.add(obj_id)
        return list(ans)

    def add(self, obj_id, simhash):
        """
        `obj_id` is a string
        `simhash` is an instance of Simhash
        """
        assert simhash.f == self.f

        for key in self.get_keys(simhash):
            v = '%x,%s' % (simhash.value, obj_id)
            self.bucket[key].add(v)

    def delete(self, obj_id, simhash):
        """
        `obj_id` is a string
        `simhash` is an instance of Simhash
        """
        assert simhash.f == self.f

        for key in self.get_keys(simhash):
            v = '%x,%s' % (simhash.value, obj_id)
            if v in self.bucket[key]:
                self.bucket[key].remove(v)

    @property
    def offsets(self):
        """
        You may optimize this method according to <http://www.wwwconference.org/www2007/papers/paper215.pdf>
        """
        return [self.f // (self.k + 1) * i for i in range(self.k + 1)]

    def get_keys(self, simhash):
        for i, offset in enumerate(self.offsets):
            if i == (len(self.offsets) - 1):
                m = 2 ** (self.f - offset) - 1
            else:
                m = 2 ** (self.offsets[i + 1] - offset) - 1
            c = simhash.value >> offset & m
            yield '%x:%x' % (c, i)

    def bucket_size(self):
        return len(self.bucket)

使用时,import引入即可:

# -*- coding: utf-8 -*-

from simhash import Simhash, SimhashIndex

if __name__ == '__main__':
    sh = Simhash('How are you? I Am fine. ablar ablar xyz blar blar blar blar blar blar blar Thanks.')
    sh2 = Simhash('How are you i am fine.ablar ablar xyz blar blar blar blar blar blar blar than')
    dis = sh.distance(sh2)

    print "sh: ", sh.value
    print "sh2: ", sh2.value
    print "dis: ", dis

Relevant Link:

https://www.mit.edu/~andoni/LSH/
http://www.cs.princeton.edu/courses/archive/spr04/cos598B/bib/CharikarEstim.pdf
http://people.csail.mit.edu/indyk/ 
https://blog.csdn.net/laobai1015/article/details/78011870 
https://github.com/LittleHann/simhash
https://www.cnblogs.com/hxsyl/p/4518506.html
https://zhuanlan.zhihu.com/p/32078737
https://www.kancloud.cn/kancloud/the-art-of-programming/41614
https://wizardforcel.gitbooks.io/the-art-of-programming-by-july/content/06.03.html
http://yanyiwu.com/work/2014/01/30/simhash-shi-xian-xiang-jie.html
https://www.cnblogs.com/maybe2030/p/5203186.html

0x7:Simhash为什么能实现局部敏感的效果 – Simhash底层的思想原理

笔者认为Simhash之所以可以实现局部敏感,主要原有有两个:

  • rooling piece wise分片思想
  • Sice Token权重思想

1. rooling piece wise分片思想

simhash的hash不是直接通过原始输入文本计算得到的,而是通过ngram分片,将原始输入文本通过滑动窗口分片得到slice token列表,对每一个slice token分别通过某种合理的方式计算一段hash,然后通过某种合理的方式将所有hash综合起来,得到最终的hash。

我们通过一个例子来说明,假设有两段文本:

1. how are u?
2. how are you?

分别使用4-gram进行切片,得到:

1. [u'howa', u'owar', u'ware', u'areu']
2. [u'howa', u'owar', u'ware', u'arey', u'reyo', u'eyou']

可以看到,因为ngram切片的原因,输入文本中的修改只影响到最终ngram list中的最后3个slice token,从而输入文本对最终Hash的影响也从整个散列空间缩小到了最后3个slice token中,这就是所谓的局部敏感算法。

其实基于ngram的切片式特征工程本身就是一个有损信息抽取的特征提取方式,这种信息损失,一方面损失了精度,但是另一方面也带来了对输入局部修改的容忍度。

但是simhash仅仅是感知局部slice token的变化吗?不是,光一个rooling slice checksum是无法提供足够的局部修改容忍度的。

2. Sice Token权重思想

除了rooling piece wise分片思想之外,Simhash还引入了”Sice Token权重思想“,即每个slice Token具体对最终的Hash能产生多大的影响,取决于这些slice Token的权重。

我们还是用一个例子来说明这句话的意思,假设有三段文本:

1. how are u?
2. how are you?
3. how are u? and u? and u? and u? and u?

可以看到,这3段文本都不一样,但是如果我们以第一段文本为基准,可以发现另外2段文本的修改程度是不一样的。

  • 第二段文本只是修改了一个单词(修改了语法),但是主体语义没有变。
  • 第二段文本修改的比较多,整体语法、语义上已经发生了变化。

还是使用4-gram进行切片,得到slice token list:

1. [u'howa', u'owar', u'ware', u'areu']
2. [u'howa', u'owar', u'ware', u'arey', u'reyo', u'eyou']
3. [u'howa', u'owar', u'ware', u'areu', u'reua', u'euan', u'uand', u'andu', u'ndua', u'duan', u'uand', u'andu', u'ndua', u'duan', u'uand', u'andu', u'ndua', u'duan', u'uand', u'andu']

可以看到,后两个输入文本都造成了很多slice token的改变。那最终的simhash受了多少影响呢?影响slice token数量多就是影响多吗?

simhash在slice token之上,还引入了slice token weight一维度信息,simhash不仅统计受影响的slice token,还会统计每个slice token的权重(例如是词频统计,也可以是TF-iDF)。

例如对上面的slice token list进行词频统计得:

1. {u'ware': 1, u'owar': 1, u'howa': 1, u'areu': 1}
2. {u'ware': 1, u'owar': 1, u'howa': 1, u'arey': 1, u'reyo': 1, , u'eyou': 1}
3. {u'ware': 1, u'owar': 1, u'howa': 1, u'areu': 1, u'reua': 1, , u'euan': 1, u'ndua': 3, , u'duan': 3, u'andu': 4, u'uand': 4}

可以看到,第二个文本虽然变动了2个slice token,但是权重不高,对最终的hash的影响有限。但是第三个文本中,不仅出现了较多token变动,而且每个token的权重比较高,它们对最终hash的影响就相对很大了。

为了说明上述的观点,我们来运行一段示例代码:

# -*- coding: utf-8 -*-

from simhash import Simhash, SimhashIndex

if __name__ == '__main__':
    sh1 = Simhash('how are u?')
    sh2 = Simhash('how are you?')
    sh3 = Simhash('how are u? and u? and u? and u? and u?')
    dis_1_2 = sh1.distance(sh2)
    dis_1_3 = sh1.distance(sh3)

    print "sh1: ", sh1.value
    print "sh2: ", sh2.value
    print "sh3: ", sh3.value
 
    print "dis_1_2: ", dis_1_2
    print "dis_1_3: ", dis_1_3

可以看到,文本2和文本1的距离,小于文本3和文本1的距离。 

笔者认为,Simhash比Ssdeep效果好的主要原因之一就在于这第二点,即Slice Token权重思想,借助权重均值化这种hash化方法,使得Simhash对多处少量的局部可以具备更大的容忍度。

0x8:Simhas的降维过程

这里提醒读者朋友注意一个细节,simhash的降维过程分成了2个环节。第一个环节中,原始ascii特征空间被降维到了ngram token特征空间;第二个环节中,ngram token特征空间被降维到了一个定长的fingerprint hashbit空间中,第二步降维的本质上也是一个线性变换的过程,从矩阵列向量的角度可以看的非常明显。

0x9:Simhash与随机超平面hash算法的联系与区别 

Simhash算法与上文提到随机超平面哈希之间是什么关系呢?一言以蔽之:Simhash是随机超平面投影的一种特殊实现,本质上属于随机超平面投影的一种

怎么理解这句话呢?笔者带领大家从列向量的视角来重新审视一下simhash的计算过程。simhash的具体原理这里不再赘述,文章前面已经详细讨论过了,这里直接进入正题。

假设输入文本经过ngram之后得到5个词token,并通过词频统计得到这5个词token的权重向量d,d = (w1=1,w2=2,w3=0,w4=3,w5=0) 

simhash中是通过散列哈希的方法得到每个词token的一个向量化表示,这里我们抓住其本质,即散列哈希每一个词token的本质目的就是为了定义一个低维的向量空间。

假设这5个词token对应的3维向量分别为:

h(w1) = (1, -1, 1)
h(w2) = (-1, 1, 1)
h(w3) = (1, -1, -1)
h(w4) = (-1, -1, 1)
h(w5) = (1, 1, -1)

按照simhash的算法,是将每个词token向量乘上对应的权重w,然后再按照列相加起来,即

m = w1 * h(w1) + w2 * h(w2) + w3 * h(w3) + w4 * h(w4) + w5 * h(w5)
= 1 * h(w1) + 2 * h(w2) + 0 * h(w3) + 3 * h(w4) + 0 * h(w5) = (-4, -2, 6)

实际上,上述过程可以使用列向量矩阵的方式来一步完成:

接下来simhash的0/1归一化,其实就是sgn符号函数。

可以看到,simhash算法产生的结果与随机超平面投影的结果是一致的。

更进一步地说,在simhash中,随机超平面,被词token的权重向量代替了,词token权重向量作为超平面和原始向量进行内积计算,计算其夹角。

simhash算法得到的两个签名的汉明距离,可以用来衡量原始向量的夹角。

Relevant Link:

http://www.cs.princeton.edu/courses/archive/spr04/cos598B/bib/CharikarEstim.pdf

 

8. Kernel LSH

前面讨论的几种LSH算法,基本可以解决一般情况下的问题,不过对于某些特定情况还是不行,比如:

  • 输入的key值不是均匀分布在整个空间中,可能只是集中在某个小区域内,需要在这个区域内放大距离尺度
  • 如果我们采用直方图作为特征,往往会dense一些,向量只分布在大于0的区域中,不适合采用cosine距离,而stable Distribution投影方法参数太过敏感,实际设计起来较为困难和易错

其实如果我们从计算公式的角度来看前面讨论的几种LSH,发现其形式都可以表示成内积的形式,提到内积自然会想到kernel方法,LSH也同样可以使用kernel核方法,关于Kernel LSH的工作可参看下面这三篇文章。

Relevant Link:

http://www.robots.ox.ac.uk/~vgg/rg/papers/klsh.pdf 2009年ICCV上的 Kernelized Locality-Sensitive Hashing for Scalable Image Search
http://machinelearning.wustl.edu/mlpapers/paper_files/NIPS2009_0146.pdf 2009年NIPS上的Locality-Sensitive Binary Codes From Shift-Invariant Kernels
http://pages.cs.wisc.edu/~brecht/papers/07.rah.rec.nips.pdf 2007年NIPS上的Random Features for Large-Scale Kernel Machines

 

9. SSDEEP模糊化哈希算法

模糊哈希算法,又叫基于内容分割的分片分片哈希算法(context triggered piecewise hashing, CTPH)。

0x1:ssdeep主要算法思想

笔者认为,ssdeep算法的主要思想有以下几点:

  • Dynamic piecewise hashing动态分片哈希思想:基于输入文本的长度,进行动态分片,将局部的改变限制在一个有限长度的窗口内。

  • Slice piece hash cutoff压缩映射:本质是信息裁剪,通过主动丢失信息的方式换取一定的改变容忍度。

0x2:算法流程

1. 动态决定分片阈值

ssdeep的分片不是ngram那种固定size的滑动窗口机制,而是根据输入文本的长度动态算出的一个n值。

我们知道,即便对弱哈希,都具备随机均匀散列的性质,即产生的结果在其映射空间上是接近于均匀分布的。

在ssdeep中,n的值始终取2的整数次方,这样Alder-32哈希值(每个byte的滚动hash)除以n的余数也接近于均匀分布。仅当余数等于n-1时分片,就相当于只有差不多1/n的情况下会分片。也就是说,对一个文件,没往前读取一个byte,就有1/n的可能要分片。

在ssdeep中,每次都是将n除以或者乘以2,来调整,使最终的片数尽可能在32到64之间。

 bs = 3
 while bs * MAX_LENGTH < length:
   bs *= 2

同时在ssdeep中,n的值会作为一个最终结果的一部分出现,在比较的时候,n会作为一个考量因素被计入考量,具体细节后面会讨论。 

上述策略下,一个新问题出现了。这是一种比较极端的情况。假设一个文件使用的分片值n。在该文件中改动一个字节(修改、插入、删除等),且这个改动影响了分片的数量,使得分片数增加或减少,例如把n乘以或者除以2。因此,即便对文件的一个字节改动,也可能导致分片条件n的变化,从而导致分片数相差近一倍,而得到的结果可能会发生巨大的变化,如何解决这个问题?

ssdeep解决这种问题的思考是加入冗余因子,将边界情况也纳入进来。

对每一个文件,它同时使用n和n/2作为分片值,算得两个不同的模糊哈希值,而这两个值都使用。因此,最后得到的一个文件的模糊哈希值是:

n : h(n) : h(n/2)

而在比较时,如果两个文件的分片值分别为n和m,则判断是否有n==m, n==2m, 2n==m三种情况,如果有之一,则将两者相应的模糊哈希值进行比较。例如,如果n==2m,则比较h(n/2)与h(m)是否相似。这样,在一定程序上解决了分片值变化的问题。

2. 逐字节读取,计算Rooling Hash,并进行动态分片

ssdeep逐字节读取输入文本内容,并采用滚动哈希算法(rolling hashing)不断叠加式计算最新的hash,在ssdeep中,使用Alder-32 [4] 算法作为弱哈希。它实际是一种用于校验和的弱哈希,类似于CRC32,不能用于密码学算法,但计算快速,生成4字节哈希值,并且是滚动哈希。

得到了当前byte对应的滚动hash值后,ssdeep基于动态分片阈值(上一节讨论过)以及滚动Hash的当前State状态值动态决定每一步(byte)是否分片。

  • 哈希值除以n的余数恰好等于n-1时,就在当前位置分片

  • 否则,不分片,窗口往后滚动一个字节,然后再次计算Alder-32哈希值并判断,如此继续

3. token哈希化

和simhash一样,对每个token进行随机散列哈希化,可以使用传统的哈希算法,例如MD5。在ssdeep中,使用一个名为Fowler-Noll-Vo hash的哈希算法。

这一步没有什么特别意义,纯粹是一个信息传递过程。

4. token压缩映射

对每一个文件分片,计算得到一个哈希值以后,可以选择将结果压缩短。例如,在ssdeep中,只取FNV(Fowler-Noll-Vo hash的哈希算法)哈希结果的最低6位,并用一个ASCII字符表示出来,作为这个分片的最终哈希结果。

这一步的压缩映射损失了一部分的信息,但是带来了一定的冗余度的提升。 

5. 连接哈希值

将每片压缩后的哈希值连接到一起,就得到这个文件的模糊哈希值了(hash)。如果分片条件参数n对不同文件可能不同,还应该将n纳入模糊哈希值中。

':'.join([str(bs), hash1, hash2])

注意,上文提到的h(n)和h(n/2)都要拼接进来

6. 比较哈希值

在ssdeep中,采用的如下思路。由于ssdeep对每一片得到的哈希值是一个ASCII字符,最终得到的文件模糊哈希值就是一个字符串了。假设是s1、s2,将s1到s2的“加权编辑距离”(weighted edit distance)作为评价其相似性的依据。

接下来,ssdeep将这个距离除以s1和s2的长度和,以将绝对结果变为相对结果,再映射到0-100的一个整数值上,其中,100表示两个字符串完全一致,而0表示完全不相似。

0x3:ssdeep对输入文本改动的容忍情况分析

我们来模拟分析一下模糊哈希是如何面对不同程度的文本修改,以及又是如何在各种修改情况下进行相似性分析的,通过这个例子我们可以更清晰地理解ssdeep的工作原理。

我们以归纳推理的方法来展开分析,不管对原始输入文本进行如何程度的修改,都可以从单个字符的修改这里推演得到,复杂的增删改查是简单原子的修改的组合与叠加,这是部分与整体的关系。

如果在一个输入文本中修改一个字节,对ssdeep hash来说,有几种情况:

 

0x4:python代码实现

# -*- coding: utf-8 -*-

import numpy as np
import collections
import doctest
import pprint


def INSERTION(A, cost=1):
    return cost


def DELETION(A, cost=1):
    return cost


def SUBSTITUTION(A, B, cost=1):
    return cost


Trace = collections.namedtuple("Trace", ["cost", "ops"])


class WagnerFischer(object):
    # Initializes pretty printer (shared across all class instances).
    pprinter = pprint.PrettyPrinter(width=75)

    def __init__(self,
                 A,
                 B,
                 insertion=INSERTION,
                 deletion=DELETION,
                 substitution=SUBSTITUTION):
        # Stores cost functions in a dictionary for programmatic access.
        self.costs = {"I": insertion, "D": deletion, "S": substitution}
        # Initializes table.
        self.asz = len(A)
        self.bsz = len(B)
        self._table = [[None for _ in range(self.bsz + 1)]
                       for _ in range(self.asz + 1)]
        # From now on, all indexing done using self.__getitem__.
        ## Fills in edges.
        self[0][0] = Trace(0, {"O"})  # Start cell.
        for i in range(1, self.asz + 1):
            self[i][0] = Trace(self[i - 1][0].cost + self.costs["D"](A[i - 1]),
                               {"D"})
        for j in range(1, self.bsz + 1):
            self[0][j] = Trace(self[0][j - 1].cost + self.costs["I"](B[j - 1]),
                               {"I"})
        ## Fills in rest.
        for i in range(len(A)):
            for j in range(len(B)):
                # Cleans it up in case there are more than one check for match
                # first, as it is always the cheapest option.
                if A[i] == B[j]:
                    self[i + 1][j + 1] = Trace(self[i][j].cost, {"M"})
                # Checks for other types.
                else:
                    costD = self[i][j + 1].cost + self.costs["D"](A[i])
                    costI = self[i + 1][j].cost + self.costs["I"](B[j])
                    costS = self[i][j].cost + self.costs["S"](A[i], B[j])
                    min_val = min(costI, costD, costS)
                    trace = Trace(min_val, set())
                    # Adds _all_ operations matching minimum value.
                    if costD == min_val:
                        trace.ops.add("D")
                    if costI == min_val:
                        trace.ops.add("I")
                    if costS == min_val:
                        trace.ops.add("S")
                    self[i + 1][j + 1] = trace
        # Stores optimum cost as a property.
        self.cost = self[-1][-1].cost

    def __repr__(self):
        return self.pprinter.pformat(self._table)

    def __iter__(self):
        for row in self._table:
            yield row

    def __getitem__(self, i):
        """
        Returns the i-th row of the table, which is a list and so
        can be indexed. Therefore, e.g.,  self[2][3] == self._table[2][3]
        """
        return self._table[i]

    # Stuff for generating alignments.

    def _stepback(self, i, j, trace, path_back):
        """
        Given a cell location (i, j) and a Trace object trace, generate
        all traces they point back to in the table
        """
        for op in trace.ops:
            if op == "M":
                yield i - 1, j - 1, self[i - 1][j - 1], path_back + ["M"]
            elif op == "I":
                yield i, j - 1, self[i][j - 1], path_back + ["I"]
            elif op == "D":
                yield i - 1, j, self[i - 1][j], path_back + ["D"]
            elif op == "S":
                yield i - 1, j - 1, self[i - 1][j - 1], path_back + ["S"]
            elif op == "O":
                return  # Origin cell, so we"re done.
            else:
                raise ValueError("Unknown op {!r}".format(op))

    def alignments(self):
        """
        Generate all alignments with optimal-cost via breadth-first
        traversal of the graph of all optimal-cost (reverse) paths
        implicit in the dynamic programming table
        """
        # Each cell of the queue is a tuple of (i, j, trace, path_back)
        # where i, j is the current index, trace is the trace object at
        # this cell, and path_back is a reversed list of edit operations
        # which is initialized as an empty list.
        queue = collections.deque(
            self._stepback(self.asz, self.bsz, self[-1][-1], []))
        while queue:
            (i, j, trace, path_back) = queue.popleft()
            if trace.ops == {"O"}:
                # We have reached the origin, the end of a reverse path, so
                # yield the list of edit operations in reverse.
                yield path_back[::-1]
                continue
            queue.extend(self._stepback(i, j, trace, path_back))

    def IDS(self):
        """
        Estimates insertions, deletions, and substitution _count_ (not
        costs). Non-integer values arise when there are multiple possible
        alignments with the same cost.
        """
        npaths = 0
        opcounts = collections.Counter()
        for alignment in self.alignments():
            # Counts edit types for this path, ignoring "M" (which is free).
            opcounts += collections.Counter(op for op in alignment
                                            if op != "M")
            npaths += 1
        # Averages over all paths.
        return collections.Counter({o: c / npaths
                                    for (o, c) in opcounts.items()})


FNV_PRIME = 0x01000193
FNV_INIT = 0x28021967
MAX_LENGTH = 64
B64 = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/"


class Last7chars(object):
    def __init__(self):
        self._reset_rollhash()

    def _reset_rollhash(self):
        self.roll_h1 = 0
        self.roll_h2 = 0
        self.roll_h3 = 0
        self.ringbuffer = [0] * 7
        self.writeindex = 0

    def _roll_hash(self, char):
        char7bf = self.readwrite(char)
        self.roll_h2 += 7 * char - self.roll_h1
        self.roll_h1 += char - char7bf
        self.roll_h3 <<= 5
        self.roll_h3 &= 0xffffffff
        self.roll_h3 ^= char
        return self.roll_h1 + self.roll_h2 + self.roll_h3

    def readwrite(self, num):
        retval = self.ringbuffer[self.writeindex]
        self.ringbuffer[self.writeindex] = num
        self.writeindex = (self.writeindex + 1) % 7
        return retval

    def __repr__(self):
        arr = self.ringbuffer[
            self.writeindex:] + self.ringbuffer[:self.writeindex]
        return " ".join(map(str, arr))


def _update_fnv(fnvhasharray, newchar):
    fnvhasharray *= FNV_PRIME
    fnvhasharray &= 0xffffffff
    fnvhasharray ^= newchar
    return fnvhasharray


def _calc_initbs(length):
    bs = 3
    while bs * MAX_LENGTH < length:
        bs *= 2

    if bs > 3:  #proably checking for integer overflow here?
        return bs
    return 3


def ssdeep_hash(content):
    bs = _calc_initbs(len(content))
    #print "bs: ", bs

    hash1 = ''
    hash2 = ''

    last7chars = Last7chars()

    while True:
        last7chars._reset_rollhash()
        fnv1 = FNV_INIT
        fnv2 = FNV_INIT
        hash1 = ''
        hash2 = ''
        fnvarray = np.array([fnv1, fnv2])

        for i in range(len(content)):   # 逐bytes扫描
            c = ord(content[i])
            # 使用Alder-32 [4] 算法作为弱哈希。它实际是一种用于校验和的弱哈希,类似于CRC32,不能用于密码学算法,但计算快速,生成4字节哈希值,并且是滚动哈希。
            h = last7chars._roll_hash(c)
            #print "h_roll_hash: ", h
            fnvarray = _update_fnv(fnvarray, c)

            # 当Alder-32哈希值除以n的余数恰好等于n-1时,就在当前位置分片;否则,不分片,窗口往后滚动一个字节,然后再次计算Alder-32哈希值并判断,如此继续
            # 1. 使用bs作为分片值
            if h % bs == (bs - 1) and len(hash1) < (MAX_LENGTH - 1):
                # 对每片分别计算哈希了。可以使用传统的哈希算法,例如MD5。在ssdeep中,使用一个名为Fowler-Noll-Vo hash的哈希算法
                b64char = B64[fnvarray[0] & 63]
                hash1 += b64char
                fnvarray[0] = FNV_INIT
            # 2. 使用2*bs作为分片值
            if h % (2 * bs) == (2 * bs - 1) and len(hash2) < (
                    MAX_LENGTH / 2 - 1):
                b64char = B64[fnvarray[1] & 63]
                hash2 += b64char
                fnvarray[1] = FNV_INIT

        # 将每片压缩后的哈希值连接到一起,就得到这个文件的模糊哈希值了
        hash1 += B64[fnvarray[0] & 63]  # 对每一个文件分片,计算得到一个哈希值以后,可以选择将结果压缩短。例如,在ssdeep中,只取FNV哈希结果的最低6位,并用一个ASCII字符表示出来,作为这个分片的最终哈希结果
        hash2 += B64[fnvarray[1] & 63]  # 这里 &63,等价于取最低6bit

        if bs <= 3 or len(hash1) > (MAX_LENGTH / 2):
            break
        bs = int(bs / 2)
        if bs < 3:
            bs = 3
    # 对每一个文件,它同时使用n和n/2作为分片值,算得两个不同的模糊哈希值,而这两个值都使用。因此,最后得到的一个文件的模糊哈希值是: n:h(n):h(n/2)
    return ':'.join([str(bs), hash1, hash2])


#from https://en.wikibooks.org/wiki/Algorithm_Implementation/Strings/Longest_common_substring#Python_2
def longest_common_substring(s1, s2):
    m = [[0] * (1 + len(s2)) for i in xrange(1 + len(s1))]
    longest, x_longest = 0, 0
    for x in xrange(1, 1 + len(s1)):
        for y in xrange(1, 1 + len(s2)):
            if s1[x - 1] == s2[y - 1]:
                m[x][y] = m[x - 1][y - 1] + 1
                if m[x][y] > longest:
                    longest = m[x][y]
                    x_longest = x
            else:
                m[x][y] = 0
    return s1[x_longest - longest:x_longest]


def _likeliness(min_lcs, a, b):
    # 如果最长公共子串长度不满足要求,则直接退出
    if longest_common_substring(a, b) < min_lcs:
        return 0

    # Wagner Fischer算法(字符串编辑距离,Edit Distance)
    dist = WagnerFischer(a, b).cost
    # ssdeep将这个距离除以s1和s2的长度和,以将绝对结果变为相对结果,再映射到0-100的一个整数值上,其中,100表示两个字符串完全一致,而0表示完全不相似
    dist = int(dist * MAX_LENGTH / (len(a) + len(b)))
    dist = int(100 * dist / 64)
    if dist > 100:
        dist = 100
    return 100 - dist


def ssdeep_compare(hashA, hashB, min_lcs=7):
    bsA, hs1A, hs2A = hashA.split(':')  #blocksize, hash1, hash2
    bsB, hs1B, hs2B = hashB.split(':')

    bsA = int(bsA)
    bsB = int(bsB)

    like = 0

    # 在比较时,如果两个文件的分片值分别为n和m,则判断是否有n==m, n==2m, 2n==m三种情况,如果有之一,则将两者相应的模糊哈希值进行比较。例如,如果n==2m,则比较h(n/2)与h(m)是否相似
    #block size comparison
    if bsA == bsB:
        #compare both hashes
        like1 = _likeliness(min_lcs, hs1A, hs1B)
        like2 = _likeliness(min_lcs, hs2A, hs2B)
        like = max(like1, like2)
    elif bsA == 2 * bsB:
        # Compare hash_bsA with hash_2*bsB
        like = _likeliness(min_lcs, hs1A, hs2B)
    elif 2 * bsA == bsB:
        # Compare hash_2*bsA with hash_bsB
        like = _likeliness(min_lcs, hs2A, hs1B)
    else:  #nothing suitable to compare
        like = 0
    return like


if __name__ == '__main__':
    import sys
    content1 = "this is a test!"
    content2 = "this is a test."
    hash1 = ssdeep_hash(content1)
    print hash1
    hash2 = ssdeep_hash(content2)
    print hash2
    similarity = ssdeep_compare(hash1, hash2)
    print similarity

Relevant Link: 

https://github.com/LittleHann/ssdeeppy
https://ssdeep-project.github.io/ssdeep/
https://www.claudxiao.net/2012/02/fuzzy_hashing/#comment-457473

 

版权声明:本文为LittleHann原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。
本文链接:https://www.cnblogs.com/LittleHann/p/11002030.html