蹲厕所的熊 转载请注明原创出处,谢谢!
1、概述
位图(Bitmap),即位(Bit)的集合,是一种常用的数据结构,可用于记录大量的0-1状态,在很多地方都会用到,比如Linux内核(如inode,磁盘块)、Bloom Filter算法等,其优势是可以在一个非常高的空间利用率下保存大量0-1状态。在Java中,直接面向程序员的最小数据操作粒度是byte,并不提供能够直接操作某个bit的途径,但是程序员可以通过使用位运算符(& | ~ << >> 等等)自己封装位操作。如果不想自己动手,可以使用Java内置的BitSet类,其实现了位图数据结构并提供了一系列有用的方法。
2、使用
public class BitSetTest {
public static void main(String[] args) {
BitSet bitSet = new BitSet(128);
bitSet.set(77);
// true
System.out.println(bitSet.get(77));
}
}
一个简单的例子,可能大家刚开始不太好理解BitSet,其实你们就把BitSet想象成是一个拥有N个槽位的大数组,而set方法就是找到对应的槽位并填充,这样下次get对应槽位就会返回是否被填充过。上面这个例子就是在77这个位置进行了填充。
那么这个类有什么作用呢?我们先来分析它的源码,最后给出一个面试经常遇到的题目让大家深入理解。
3、源码分析
先来看看内部的一些属性所代表的含义
属性
/*
* BitSet由很多个words组成,每一个words是long类型,一个long是64bit,需要6个地址位(2的6次方)
*/
private final static int ADDRESS_BITS_PER_WORD = 6;
// 1算数左移6位,即1 × 2^6 = 64 即 01000000
private final static int BITS_PER_WORD = 1 << ADDRESS_BITS_PER_WORD;
// 1 x 2^6 - 1 = 63 即 00111111
private final static int BIT_INDEX_MASK = BITS_PER_WORD - 1;
/* 用于向左或向右移动部分word的掩码 */
private static final long WORD_MASK = 0xffffffffffffffffL;
至于为什么选择long这种数据类型,注释的解析是基于性能的原因,现在64位CPU已经非常普及,可以一次把一个64bit长度的long放进寄存器作计算。
/**
* 对应于serialField “bits” 的内部字段。
*/
private long[] words;
/**
* BitSet中words的实际使用长度
*/
private transient int wordsInUse = 0;
/**
* words的大小是不是由用户指定的,如果是我们假设知道用户在做什么,并尽力保存它
*/
private transient boolean sizeIsSticky = false;
属性words即为实际存储数据的地方。
构造函数
/**
* 初始化words长度为1,所有bit的初始化false
*/
public BitSet() {
initWords(BITS_PER_WORD);
sizeIsSticky = false;
}
/**
* 创建一个初始大小足够大的位集,以便明确地表示索引在范围在 0 到 nbits-1 范围内的位。
* 所有bit初始化都是false
*/
public BitSet(int nbits) {
// nbits不能为负数,可以为0
if (nbits < 0)
throw new NegativeArraySizeException("nbits < 0: " + nbits);
initWords(nbits);
sizeIsSticky = true;
}
/**
* 初始化words,数组长度为 (nbits-1)/64 + 1
*/
private void initWords(int nbits) {
words = new long[wordIndex(nbits-1) + 1];
}
/**
* 算出words的index
* 返回 bitIndex / 64
*/
private static int wordIndex(int bitIndex) {
return bitIndex >> ADDRESS_BITS_PER_WORD;
}
/**
* 内部访问
* 使用传入的words做为初始化words
* words的最后一个word不能为0
*/
private BitSet(long[] words) {
this.words = words;
this.wordsInUse = words.length;
checkInvariants();
}
用户如果知道长度的情况下,最好指定一个长度,一开始就初始化好,避免后面进行扩容操作。
set方法
/**
* 在指定的index设置bit
*/
public void set(int bitIndex) {
if (bitIndex < 0)
throw new IndexOutOfBoundsException("bitIndex < 0: " + bitIndex);
// 定位以及检查是否需要扩容
int wordIndex = wordIndex(bitIndex);
expandTo(wordIndex);
// 使用或操作设置bit值
words[wordIndex] |= (1L << bitIndex); // Restores invariants
checkInvariants();
}
/**
* 在指定的index根据value做操作
* value=true 设置值, value=false 清除值
*/
public void set(int bitIndex, boolean value) {
if (value)
set(bitIndex);
else
clear(bitIndex);
}
set方法首先根据wordIndex定位long数组的位置,然后通过expandTo检查是否需要扩容。最后使用或操作设置对应槽位的bit值。
/**
* 确保BitSet可以容纳给定的wordIndex,暂时违反不变量原则。
* 调用者必须在返回给用户之前恢复不变式,可能使用recalculateWordsInUse()。
* @param wordIndex 计算出放在words里面的数组下标index
*/
private void expandTo(int wordIndex) {
// 需要的words数组长度
int wordsRequired = wordIndex+1;
if (wordsInUse < wordsRequired) {
// 扩容
ensureCapacity(wordsRequired);
wordsInUse = wordsRequired;
}
}
/**
* 确保BitSet能够存放足够的words
* @param wordsRequired words需要的数组长度
*/
private void ensureCapacity(int wordsRequired) {
if (words.length < wordsRequired) {
// 扩容机制:2倍words的长度 和 wordsRequired比较,谁大选谁
int request = Math.max(2 * words.length, wordsRequired);
// 数组copy
words = Arrays.copyOf(words, request);
sizeIsSticky = false;
}
}
上面两个set方法都只是对单个槽位进行set操作,BitSet还提供了对连续的一串值进行set操作。
/**
* 设置 [fromIndex, toIndex) 范围的值为1
*/
public void set(int fromIndex, int toIndex) {
checkRange(fromIndex, toIndex);
if (fromIndex == toIndex)
return;
// Increase capacity if necessary
int startWordIndex = wordIndex(fromIndex);
int endWordIndex = wordIndex(toIndex - 1);
expandTo(endWordIndex);
// 左移fromIndex个位置,右边空位以0补齐
long firstWordMask = WORD_MASK << fromIndex;
// 右移(64-toIndex)个位置,也就是从右边起往左toIndex个位置是1,其他位置都是0
long lastWordMask = WORD_MASK >>> -toIndex;
if (startWordIndex == endWordIndex) {
// Case 1: 同一个word 取firstWordMask和lastWordMask相交的为1的部分
words[startWordIndex] |= (firstWordMask & lastWordMask);
} else {
// Case 2: 多个words
// 处理第一个word
words[startWordIndex] |= firstWordMask;
// 如果有中间words,处理它
for (int i = startWordIndex+1; i < endWordIndex; i++)
words[i] = WORD_MASK;
// 处理最后一个word
words[endWordIndex] |= lastWordMask;
}
checkInvariants();
}
/**
* 在指定的范围[fromIndex, toIndex)根据value做操作
* value=true 设置值, value=false 清除值
*/
public void set(int fromIndex, int toIndex, boolean value) {
if (value)
set(fromIndex, toIndex);
else
clear(fromIndex, toIndex);
}
set(int fromIndex, int toIndex)的思路就是先定位到long数组words的哪几个位置,如果fromIndex和toIndex在同一个word里,就直接把他们之间的数都设为1。如果fromIndex和toIndex不在同一个word里,就依次设置不同的word的值。
get方法
/**
* 返回指定的bitIndex位的bit是不是1,是1返回true,否则返回false
*/
public boolean get(int bitIndex) {
if (bitIndex < 0)
throw new IndexOutOfBoundsException("bitIndex < 0: " + bitIndex);
checkInvariants();
int wordIndex = wordIndex(bitIndex);
return (wordIndex < wordsInUse)
&& ((words[wordIndex] & (1L << bitIndex)) != 0);
}
/**
* 返回[fromIndex, toIndex)的值的一个BitSet
*/
public BitSet get(int fromIndex, int toIndex) {
checkRange(fromIndex, toIndex);
checkInvariants();
int len = length();
// If no set bits in range return empty bitset
if (len <= fromIndex || fromIndex == toIndex)
return new BitSet(0);
// An optimization
if (toIndex > len)
toIndex = len;
BitSet result = new BitSet(toIndex - fromIndex);
int targetWords = wordIndex(toIndex - fromIndex - 1) + 1;
int sourceIndex = wordIndex(fromIndex);
boolean wordAligned = ((fromIndex & BIT_INDEX_MASK) == 0);
// Process all words but the last word
for (int i = 0; i < targetWords - 1; i++, sourceIndex++)
result.words[i] = wordAligned ? words[sourceIndex] :
(words[sourceIndex] >>> fromIndex) |
(words[sourceIndex+1] << -fromIndex);
// Process the last word
long lastWordMask = WORD_MASK >>> -toIndex;
result.words[targetWords - 1] =
((toIndex-1) & BIT_INDEX_MASK) < (fromIndex & BIT_INDEX_MASK)
? /* straddles source words */
((words[sourceIndex] >>> fromIndex) |
(words[sourceIndex+1] & lastWordMask) << -fromIndex)
:
((words[sourceIndex] & lastWordMask) >>> fromIndex);
// Set wordsInUse correctly
result.wordsInUse = targetWords;
result.recalculateWordsInUse();
result.checkInvariants();
return result;
}
清空BitSet
/**
* 设置指定的index位为0
*/
public void clear(int bitIndex) {
if (bitIndex < 0)
throw new IndexOutOfBoundsException("bitIndex < 0: " + bitIndex);
int wordIndex = wordIndex(bitIndex);
if (wordIndex >= wordsInUse)
return;
words[wordIndex] &= ~(1L << bitIndex);
recalculateWordsInUse();
checkInvariants();
}
/**
* 清除范围[fromIndex, toIndex)的bit为0
*/
public void clear(int fromIndex, int toIndex) {
checkRange(fromIndex, toIndex);
if (fromIndex == toIndex)
return;
int startWordIndex = wordIndex(fromIndex);
if (startWordIndex >= wordsInUse)
return;
int endWordIndex = wordIndex(toIndex - 1);
if (endWordIndex >= wordsInUse) {
toIndex = length();
endWordIndex = wordsInUse - 1;
}
long firstWordMask = WORD_MASK << fromIndex;
long lastWordMask = WORD_MASK >>> -toIndex;
if (startWordIndex == endWordIndex) {
// Case 1: One word
words[startWordIndex] &= ~(firstWordMask & lastWordMask);
} else {
// Case 2: Multiple words
// Handle first word
words[startWordIndex] &= ~firstWordMask;
// Handle intermediate words, if any
for (int i = startWordIndex+1; i < endWordIndex; i++)
words[i] = 0;
// Handle last word
words[endWordIndex] &= ~lastWordMask;
}
recalculateWordsInUse();
checkInvariants();
}
/**
* 设置所有的BitSet为0
*/
public void clear() {
while (wordsInUse > 0)
words[--wordsInUse] = 0;
}
反转某一位
/**
* 反转指定的bitIndex位的数
*/
public void flip(int bitIndex) {
if (bitIndex < 0)
throw new IndexOutOfBoundsException("bitIndex < 0: " + bitIndex);
int wordIndex = wordIndex(bitIndex);
expandTo(wordIndex);
words[wordIndex] ^= (1L << bitIndex);
recalculateWordsInUse();
checkInvariants();
}
/**
* 反转指定范围[fromIndex, toIndex)位的数
*/
public void flip(int fromIndex, int toIndex) {
checkRange(fromIndex, toIndex);
if (fromIndex == toIndex)
return;
int startWordIndex = wordIndex(fromIndex);
int endWordIndex = wordIndex(toIndex - 1);
expandTo(endWordIndex);
long firstWordMask = WORD_MASK << fromIndex;
long lastWordMask = WORD_MASK >>> -toIndex;
if (startWordIndex == endWordIndex) {
// Case 1: One word
words[startWordIndex] ^= (firstWordMask & lastWordMask);
} else {
// Case 2: Multiple words
// Handle first word
words[startWordIndex] ^= firstWordMask;
// Handle intermediate words, if any
for (int i = startWordIndex+1; i < endWordIndex; i++)
words[i] ^= WORD_MASK;
// Handle last word
words[endWordIndex] ^= lastWordMask;
}
recalculateWordsInUse();
checkInvariants();
}
反转的原理就是把某一位变为它的相反数,使用的异或1的操作。如果bit是1,1^1=0,如果bit是0,0^1=1,这样就能够进行反转了。
几个静态函数
/**
* 返回一个包含longs数组的新bit set集合
*
* 更确切的说
* BitSet.valueOf(longs).get(n) == ((longs[n/64] & (1L<<(n%64))) != 0)
* 上面的 n < 64 * longs.length
*
* 这个方法相当于 BitSet.valueOf(LongBuffer.wrap(longs))
*/
public static BitSet valueOf(long[] longs) {
int n;
for (n = longs.length; n > 0 && longs[n - 1] == 0; n--)
;
return new BitSet(Arrays.copyOf(longs, n));
}
/**
* 返回一个新的BitSet,它包含position和limit之间的给定long缓冲区中的所有bit。
*/
public static BitSet valueOf(LongBuffer lb) {
lb = lb.slice();
int n;
for (n = lb.remaining(); n > 0 && lb.get(n - 1) == 0; n--)
;
long[] words = new long[n];
lb.get(words);
return new BitSet(words);
}
/**
* 返回一个包含bytes数组的新bit set集合
*
* 更确切的说
* BitSet.valueOf(bytes).get(n) == ((bytes[n/8] & (1<<(n%8))) != 0)
* 上面的 n < 8 * bytes.length
*
* 这个方法相当于 BitSet.valueOf(ByteBuffer.wrap(bytes))
*/
public static BitSet valueOf(byte[] bytes) {
return BitSet.valueOf(ByteBuffer.wrap(bytes));
}
/**
* 返回一个新的BitSet,它包含position和limit之间的给定byte缓冲区中的所有bit。
*/
public static BitSet valueOf(ByteBuffer bb) {
bb = bb.slice().order(ByteOrder.LITTLE_ENDIAN);
int n;
for (n = bb.remaining(); n > 0 && bb.get(n - 1) == 0; n--)
;
long[] words = new long[(n + 7) / 8];
bb.limit(n);
int i = 0;
while (bb.remaining() >= 8)
words[i++] = bb.getLong();
for (int remaining = bb.remaining(), j = 0; j < remaining; j++)
words[i] |= (bb.get() & 0xffL) << (8 * j);
return new BitSet(words);
}
其他方法
/**
* 返回指定起始索引处或之后出现的第一个设置为1的bit的索引。
* 如果不存在这样的bit,则返回{@code -1}。
*
* 要遍历BitSet中的{true}位,请使用以下循环:
*
* <pre> {@code
* for (int i = bs.nextSetBit(0); i >= 0; i = bs.nextSetBit(i+1)) {
* // operate on index i here
* if (i == Integer.MAX_VALUE) {
* break; // or (i+1) would overflow
* }
* }}</pre>
*/
public int nextSetBit(int fromIndex) {
if (fromIndex < 0)
throw new IndexOutOfBoundsException("fromIndex < 0: " + fromIndex);
checkInvariants();
int u = wordIndex(fromIndex);
if (u >= wordsInUse)
return -1;
// 首先定位到fromIndex所在的word,并且把低位到fromIndex的bit设置为0
long word = words[u] & (WORD_MASK << fromIndex);
/**
* 如果word != 0,返回低位到不为0的数的index
* 注:Long.numberOfTrailingZeros(word)表示在word的补码基础上返回
* 最低位的数到第一个为1的数的index长度。
* 比如:Long.numberOfTrailingZeros(0xFFFFFFFFFFFFFFF4l) == 2
*/
while (true) {
if (word != 0)
return (u * BITS_PER_WORD) + Long.numberOfTrailingZeros(word);
if (++u == wordsInUse)
return -1;
word = words[u];
}
}
/**
* 返回指定起始索引处或之后出现的第一个设置为0的bit的索引。
* 如果不存在这样的bit,则返回{@code -1}。
* 和nextSetBit(int fromIndex) 类似的原理
*/
public int nextClearBit(int fromIndex) {
// Neither spec nor implementation handle bitsets of maximal length.
// See 4816253.
if (fromIndex < 0)
throw new IndexOutOfBoundsException("fromIndex < 0: " + fromIndex);
checkInvariants();
int u = wordIndex(fromIndex);
if (u >= wordsInUse)
return fromIndex;
long word = ~words[u] & (WORD_MASK << fromIndex);
while (true) {
if (word != 0)
return (u * BITS_PER_WORD) + Long.numberOfTrailingZeros(word);
if (++u == wordsInUse)
return wordsInUse * BITS_PER_WORD;
word = ~words[u];
}
}
/**
* 如果传入的BitSet和当前的BitSet有交集为1的话,返回true
*/
public boolean intersects(BitSet set) {
for (int i = Math.min(wordsInUse, set.wordsInUse) - 1; i >= 0; i--)
if ((words[i] & set.words[i]) != 0)
return true;
return false;
}
/**
* 返回words中设置了1的总bit数
* 比如
* BitSet bitSet = new BitSet(128);
* bitSet.set(77);
* bitSet.set(80);
* bitSet.set(2);
* 返回3
*/
public int cardinality() {
int sum = 0;
for (int i = 0; i < wordsInUse; i++)
sum += Long.bitCount(words[i]);
return sum;
}
/**
* 对传入的BitSet和当前的BitSet的所有bit位进行与操作。
*/
public void and(BitSet set) {
if (this == set)
return;
while (wordsInUse > set.wordsInUse)
words[--wordsInUse] = 0;
// Perform logical AND on words in common
for (int i = 0; i < wordsInUse; i++)
words[i] &= set.words[i];
recalculateWordsInUse();
checkInvariants();
}
4、实战
前一段时间在网上看到这样一道面试题:
有个老的手机短信程序,由于当时的手机CPU,内存都很烂。所以这个短信程序只能记住256条短信,多了就删了。每个短信有个唯一的ID,在0到255之间。当然用户可能自己删短信.现在要求设计实现一个功能: 当收到了一个新短信啥,如果手机短信容量还没"用完"(用完即已经存储256条),请分配给它一个可用的ID。由于手机很破,我要求你的程序尽量快,并少用内存。
审题
通读一遍题目,可以大概知道题目并不需要我们实现手机短信内容的存储,也就是不用管短信内容以什么形式存、存在哪里等问题。需要关心的地方应该是如何快速找到还没被占用的ID(0 ~ 255),整理一下需求,如下:
手机最多存储256条短信,短信ID范围是[0,255];
用户可以手动删除短信,删除哪些短信是由用户决定的;
当收到一条新短信时,只需要分配一个还没被占用的ID即可,不需要是可用ID中最小的ID;
题目没说明在手机短信容量已满的情况下,也就是无法找到可用ID时需要怎么办,这里约定在这种情况下程序返回一个错误码即可;
理清需求之后,其实需要做的事情就很清楚了:
设计一个数据结构来存储已被占用的或没被占用的短信ID;
实现一个函数,返回一个可用的ID,当无法找到可用ID时,返回-1;
在实现以上两点的前提下,尽量在程序执行速度和内存占用量上做优化。
解题
线性查找
这应该是最简(无)单(脑)一个办法。如果想用一个数据结构保存已占用的ID,由于这是一个变长无序的集合,而数组(Array)这种结构是定长的,并且原生并未提供删除数组元素的功能,所以应该很容想到用Java类库提供的List作为容器。那么寻找一个可用ID的方法就很简单:只要多次遍历这个List,第一次遍历时查找0是否在这个List中,如果没找到,着返回0,否则进行下一趟遍历查找1,直到255,这个过程可以用一个2重循环来实现:
/**
* 线性查找
* 时间复杂度: O(n^2)
* @param busyIDs 被占用的ID
* @return
*/
public int search(List<Integer> busyIDs) {
for(int i = 0; i < 255; i++) {
if(busyIDs.indexOf(i) == -1) return i;
}
return -1;
}
但是这种实现方式的问题不少,其中最严重的就是时间复杂度问题。由于List.indexOf(Object)函数的实现方式是顺序遍历整个数据结构(无论是ArrayList还是LinkedList都是如此,ArrayList由于底层用数组实现,遍历操作在连续的内存空间上进行,比LinkedList要快一些),再套上外层的循环,导致时间复杂度为O(2^n)。
另外一个问题是空间复杂度。先不论List这个类内部包含的各种元数据(ArrayList或LinkedList类的一些私有属性),由于List中存储的元素必须为Java Object,所以上面的代码的List中实际上存放的事Integer类。我们知道这种封装类型要比对应的基本数据类型(Primitive Types)占用更多的内存空间,以Integer为例,在64bit JVM(关闭压缩指针)下,一个Integer对象占用的内存空间为24Byte = 8Byte mark_header + 8Byte Klass 指针 + 4Byte int(用于存储数值)+ 4Byte(Padding,Java对象必须以8Byte为界对齐)。 而一个int变量只需要4Byte!另外即使把Integer替换成Short,情况也是一样。也就是说,当手机保存了256条短信时,存储被占用ID总共需要的空间为:256 × 24Byte = 6KB! 而且还不包括List本身的元数据!
最后还有个问题就是List在删除元素时的效率问题。ArrayList由于底层用数组实现,所以当删除一个元素后,被删除元素后面的所有元素都要往前移动一个位置(用System.arraycopy()实现);而LinkedList由于用双向链表存储数据,所以删除元素比较简单,但正是由于其采用双向链表,所以每个元素要额外多占用2个指针的空间(指向前一个和后一个元素)。
Hash表
由于2.1中内层循环采用顺序查找的方式导致时间复杂度为O(2^n),一个很容易想到的改进就是把已经被占用的ID存放在一个Hash表中,由于Hash表对查找操作的时间复杂度为O(C)(实际上并不一定,对于用链表法解决冲突的Hash表,查找一个元素的时间跟链表的平均长度有关,也就是O(n)。但这里简单认为时间复杂度就是常数),所以查找一个可用ID的时间复杂度为O(n)。代码如下:
/**
* Hash表查找
* 时间复杂度: O(n)
* @param busyIDs 被占用的ID
* @return
*/
public int search(HashSet<Integer> busyIDs) {
for(int i = 0; i < 255; i++) {
if(!busyIDs.contains(i)) return i;
}
return -1;
}
这种实现方式相对2.1在时间上有了改进,但是空间占用问题却更严重了:Java类库中的HashSet其实是用HashMap来实现的,这里不考虑任何元数据,只考虑HashMap本身,用于HashMap本身有一个load factor(默认是0.75,即是HashMap中保存的元素个数不能超过HashMap容量的75%,否则要Re-hash);另外对于HashMap中的每一个元素Entry
boolean数组
这种实现方式与上面2种比较一个根本的不同是:不存储具体被占用的ID的值,而是存储所有ID的状态(就2种状态,可用与被占用)。由于对于一个ID来说,总共只有2种状态,所以可以用boolean代表一个ID的状态,然后用一个长度为256的boolean数组表示所有ID的状态(假定false=可用,true=被占用)。
当需要查找可用ID时,只需要遍历这个数组,找到第一个值为false的boolean,返回其索引即可。用于现代CPU每次读内存时都可以一次性读取1个Cache Line(一般是64Byte)的内容,而一个boolean只占1Byte,所以达到很高的遍历速度。
另外做删除操作时,只需要把数组中ID对应索引的那个boolean设为false即可。
不过这种方案只适用与定长数据(比如题中注明最多256条短信)。代码如下:
/**
* boolean数组
* 时间复杂度: O(n)
* @param busyIDs 被占用的ID
* @return
*/
public int search(boolean[] busyIDs) {
for(int i = 0, len = busyIDs.length; i < len; i++) {
if(busyIDs[i] == false) return i;
}
return -1;
}
这种方案对比前面2种,在空间复杂度上有非常大的优化:只占用256Byte内存。并且在查找上也可以达到不错的速度。
使用BitSet
这种方案是对2.3的一个优化。由于一个boolean值在JVM中占用1Byte,而1Byte=8bit,8个bit可以表示的状态为2^8 = 256种(0000 0000 ~ 1111 1111),而我们的短信ID状态只有2种!所以用一个boolean表示1个状态是非常大的浪费,实际上1个bit就足够,其余7个bit都浪费了。这就给我们提供了一个思路:能不能用一个bit表示一个短信ID?如果可以的话,空间复杂度相对2.3有可以下降7/8!我们可以利用刚刚讲过的BitSet来实现:
public class BitSetTest {
public static void main(String[] args) {
BitSet bitSet = new BitSet(256);
bitSet.set(0);
bitSet.set(1);
bitSet.set(2);
bitSet.set(65);
// 返回第一个不是1的bit index
System.out.println(bitSet.nextClearBit(0));
}
}
如果读完觉得有收获的话,欢迎点赞、关注、加公众号【蹲厕所的熊】,查阅更多精彩历史!!!




