暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

高效读取大数据文本文件(上亿行数据)

我是攻城师 2021-09-27
1096

一.前言

本文是对大数据文本文件读取(按行读取)的优化。目前常规的方案(限于JDK)有三种:第一种是LineNumberReader,第二种是RandomAccessFile,第三种是内存映射文件,即在RandomAccessFile基础上调用getChannel().map(...)(详见http://sgq0085.iteye.com/blog/1318622)。

1.LineNumberReader

按行读取,只能从第一行向后遍历,到需要读取的行时开始读入,直到完成。在我的测试用例中,读取1000W行数据(每次5万行)用时93秒,实测效率比RandomAccessFile要高;但读取一亿条数据时效率太低(因为每次都要从头遍历),测试超过1个小时仍未完成,因此放弃测试;

2.RandomAccessFile

RandomAccessFile实际上并不适合这种大数据读取。它是为磁盘文件的随机访问设计的,其readLine()没有缓冲,每读一个字节都要调用一次底层的read(),所以效率很低:1000w行测试用时140秒,一亿行数据测试用时1438秒。但由于可以通过getFilePointer方法记录位置,并通过seek方法指定读取位置,所以从理论上比较适用这种大数据按行读取的场景;

RandomAccessFile的readLine()会把每个字节直接当作一个字符(相当于按8859_1即ISO-8859-1解码),因此读取中文等多字节字符时,需要先还原出原始字节,再按文件的实际编码重新解码,方法如下

Java代码

  // pin: a line returned by RandomAccessFile.readLine(); encoding: the file's real charset, e.g. "UTF-8".
  // The charset name must not be empty: new String(bytes, "") throws UnsupportedEncodingException.
  new String(pin.getBytes("8859_1"), encoding)

3.内存映射文件

由于每行数据大小不同,内存映射文件在这种情况下不适用,其他情况请参考我的博客(详见http://sgq0085.iteye.com/blog/1318622)

二.解决方案

如果在RandomAccessFile基础上整合内部缓冲区,效率会明显提高:测试中1000w行数据用时1秒,1亿行数据用时103秒(比1438秒快了13倍左右)。

BufferedRandomAccessFile

网上已经有实现,代码如下:

Java代码

  1. package com.gqshao.file.io;

  2. import java.io.File;

  3. import java.io.FileNotFoundException;

  4. import java.io.IOException;

  5. import java.io.RandomAccessFile;

  6. import java.util.Arrays;

  /**
   * A {@link RandomAccessFile} that reads and writes through an in-memory buffer.
   *
   * <p>{@code RandomAccessFile.readLine()} calls {@link #read()} once per byte, and the plain JDK
   * {@code read()} is an unbuffered read on the underlying file. This subclass serves
   * {@code read()} (and the other read/write methods) from {@code buff_}, so line-by-line reading
   * touches the disk only once per buffer refill.
   *
   * <p>Buffer state:
   * <ul>
   *   <li>{@code buff_[0 .. hi_ - lo_)} holds the current contents (including unflushed writes)
   *       of file bytes {@code [lo_, hi_)}; {@code lo_} is aligned to {@link #BuffSz_};</li>
   *   <li>{@code hi_ <= maxHi_ == lo_ + buff_.length}; {@code curr_ >= lo_}, and
   *       {@code curr_ > hi_} only right after a seek past the end of the file;</li>
   *   <li>when {@code dirty_} is set, bytes {@code [lo_, curr_)} may differ from the disk; they are
   *       written back by {@link #flush()}, by any seek that leaves the buffer or moves backwards,
   *       and by {@link #close()}.</li>
   * </ul>
   *
   * <p>No synchronization is performed; callers sharing an instance must coordinate access.
   *
   * <p>This implementation is based on the buffer implementation in Modula-3's "Rd", "Wr",
   * "RdClass", and "WrClass" interfaces.
   */
  public class BufferedRandomAccessFile extends RandomAccessFile {

    static final int LogBuffSz_ = 16; // 64K buffer

    public static final int BuffSz_ = (1 << LogBuffSz_);

    // Clears the low LogBuffSz_ bits, i.e. rounds a file position down to a BuffSz_ boundary.
    static final long BuffMask_ = ~(((long) BuffSz_) - 1L);

    private String path_;

    private boolean dirty_; // true iff unflushed bytes exist in [lo_, curr_)

    private boolean syncNeeded_; // dirty_ can be cleared by e.g. seek, so track sync separately

    private long curr_; // current position in file

    private long lo_, hi_; // file range [lo_, hi_) currently held in buff_

    private byte[] buff_; // local buffer

    private long maxHi_; // lo_ + buff_.length: the furthest hi_ may grow to

    private boolean hitEOF_; // true iff the last fillBuffer() reached end of file inside this buffer

    private long diskPos_; // position of the underlying file descriptor

    /** Opens {@code file} in {@code mode} ("r" or "rw") with the default 64K buffer. */
    public BufferedRandomAccessFile(File file, String mode) throws IOException {
      this(file, mode, 0);
    }

    /**
     * Opens {@code file} in {@code mode} ("r" or "rw"). {@code size} is the buffer size in bytes;
     * it is used only when it is larger than {@link #BuffSz_}.
     */
    public BufferedRandomAccessFile(File file, String mode, int size) throws IOException {
      super(file, mode);
      path_ = file.getAbsolutePath();
      this.init(size);
    }

    /**
     * Open a new <code>BufferedRandomAccessFile</code> on the file named <code>name</code> in mode
     * <code>mode</code>, which should be "r" for reading only, or "rw" for reading and writing.
     */
    public BufferedRandomAccessFile(String name, String mode) throws IOException {
      this(name, mode, 0);
    }

    /**
     * Opens the file named {@code name} in {@code mode}. {@code size} is the buffer size in bytes;
     * it is used only when it is larger than {@link #BuffSz_}.
     */
    public BufferedRandomAccessFile(String name, String mode, int size) throws FileNotFoundException {
      super(name, mode);
      path_ = name;
      this.init(size);
    }

    private void init(int size) {
      this.dirty_ = false;
      this.lo_ = this.curr_ = this.hi_ = 0;
      this.buff_ = (size > BuffSz_) ? new byte[size] : new byte[BuffSz_];
      // Must match the real buffer length, which exceeds BuffSz_ when a larger size was requested.
      this.maxHi_ = this.buff_.length;
      this.hitEOF_ = false;
      this.diskPos_ = 0L;
    }

    /** Returns the path this file was opened with (absolute when opened from a {@link File}). */
    public String getPath() {
      return path_;
    }

    /** Flushes buffered writes and forces them to the storage device if anything was written since the last sync. */
    public void sync() throws IOException {
      if (syncNeeded_) {
        flush();
        getChannel().force(true);
        syncNeeded_ = false;
      }
    }

    /** Flushes pending writes, then closes the file; the descriptor is released even if the flush fails. */
    @Override
    public void close() throws IOException {
      try {
        this.flush();
      } finally {
        this.dirty_ = false; // a failed flush is not retried by a second close()
        this.buff_ = null;
        super.close();
      }
    }

    /**
     * Flush any bytes in the file's buffer that have not yet been written to disk. If nothing was
     * written, this method is a no-op.
     */
    public void flush() throws IOException {
      this.flushBuffer();
    }

    /* Flush any dirty bytes in the buffer to disk. */
    private void flushBuffer() throws IOException {
      if (this.dirty_) {
        if (this.diskPos_ != this.lo_) {
          super.seek(this.lo_);
        }
        int len = (int) (this.curr_ - this.lo_);
        super.write(this.buff_, 0, len);
        this.diskPos_ = this.curr_;
        this.dirty_ = false;
      }
    }

    /*
     * Read at most "buff_.length" bytes into "buff_", returning the number of bytes read. A result
     * smaller than "buff_.length" means EOF was reached; hitEOF_ records that.
     */
    private int fillBuffer() throws IOException {
      int cnt = 0;
      int rem = this.buff_.length;
      while (rem > 0) {
        int n = super.read(this.buff_, cnt, rem);
        if (n < 0) {
          break;
        }
        cnt += n;
        rem -= n;
      }
      // A short read means the end of the file lies inside this buffer.
      this.hitEOF_ = cnt < this.buff_.length;
      if (this.hitEOF_) {
        // make sure the part of the buffer that wasn't read is initialized with -1
        Arrays.fill(this.buff_, cnt, this.buff_.length, (byte) 0xff);
      }
      this.diskPos_ += cnt;
      return cnt;
    }

    /**
     * Positions {@code curr_} at {@code pos}. If {@code pos} does not fall in the current buffer,
     * the buffer is flushed and the block containing {@code pos} is loaded.
     *
     * <p>On exit {@code curr_ >= hi_} iff {@code pos} is at or past the current end of the file.
     */
    @Override
    public void seek(long pos) throws IOException {
      if (pos >= this.hi_ || pos < this.lo_) {
        // seeking outside of current buffer -- flush and read
        this.flushBuffer();
        this.lo_ = pos & BuffMask_; // start at BuffSz boundary
        this.maxHi_ = this.lo_ + (long) this.buff_.length;
        if (this.diskPos_ != this.lo_) {
          super.seek(this.lo_);
          this.diskPos_ = this.lo_;
        }
        int n = this.fillBuffer();
        this.hi_ = this.lo_ + (long) n;
      } else if (pos < this.curr_) {
        // seeking backwards inside the buffer: flush so dirty bytes stay within [lo_, curr_)
        this.flushBuffer();
      }
      this.curr_ = pos;
    }

    @Override
    public long getFilePointer() {
      return this.curr_;
    }

    /** Returns the file length, including bytes written to the buffer but not yet flushed. */
    @Override
    public long length() throws IOException {
      // hi_ covers every buffered write, so a seek past EOF alone does not change the length
      return Math.max(this.hi_, super.length());
    }

    @Override
    public int read() throws IOException {
      if (this.curr_ >= this.hi_) {
        // test for EOF
        if (this.hitEOF_) {
          return -1;
        }
        // slow path -- read another buffer
        this.seek(this.curr_);
        if (this.curr_ >= this.hi_) {
          return -1;
        }
      }
      byte res = this.buff_[(int) (this.curr_ - this.lo_)];
      this.curr_++;
      return res & 0xFF; // convert byte -> unsigned int
    }

    @Override
    public int read(byte[] b) throws IOException {
      return this.read(b, 0, b.length);
    }

    /** Copies at most {@code len} bytes, never more than what remains in the current buffer. */
    @Override
    public int read(byte[] b, int off, int len) throws IOException {
      if (this.curr_ >= this.hi_) {
        // test for EOF
        if (this.hitEOF_) {
          return -1;
        }
        // slow path -- read another buffer
        this.seek(this.curr_);
        if (this.curr_ >= this.hi_) {
          return -1;
        }
      }
      len = Math.min(len, (int) (this.hi_ - this.curr_));
      int buffOff = (int) (this.curr_ - this.lo_);
      System.arraycopy(this.buff_, buffOff, b, off, len);
      this.curr_ += len;
      return len;
    }

    @Override
    public void write(int b) throws IOException {
      if (this.curr_ >= this.hi_ && !(this.hitEOF_ && this.curr_ < this.maxHi_)) {
        // slow path -- write current buffer; load the block containing curr_
        this.seek(this.curr_);
      }
      // Here either curr_ < hi_, or the file ends in this buffer and curr_ < maxHi_.
      this.buff_[(int) (this.curr_ - this.lo_)] = (byte) b;
      this.curr_++;
      if (this.curr_ > this.hi_) {
        this.hi_ = this.curr_; // appended past the old end of the buffered data
      }
      this.dirty_ = true;
      syncNeeded_ = true;
    }

    @Override
    public void write(byte[] b) throws IOException {
      this.write(b, 0, b.length);
    }

    @Override
    public void write(byte[] b, int off, int len) throws IOException {
      while (len > 0) {
        int n = this.writeAtMost(b, off, len);
        off += n;
        len -= n;
        this.dirty_ = true;
        syncNeeded_ = true;
      }
    }

    /*
     * Write at most "len" bytes from "b" starting at position "off" into the buffer, and return the
     * number of bytes written (always > 0 when len > 0).
     */
    private int writeAtMost(byte[] b, int off, int len) throws IOException {
      if (this.curr_ >= this.hi_ && !(this.hitEOF_ && this.curr_ < this.maxHi_)) {
        // slow path -- write current buffer; load the block containing curr_
        this.seek(this.curr_);
      }
      // Overwrite existing bytes up to hi_; at EOF the buffered data may grow up to maxHi_.
      long limit = (this.curr_ >= this.hi_) ? this.maxHi_ : this.hi_;
      len = (int) Math.min((long) len, limit - this.curr_);
      int buffOff = (int) (this.curr_ - this.lo_);
      System.arraycopy(b, off, this.buff_, buffOff, len);
      this.curr_ += len;
      if (this.curr_ > this.hi_) {
        this.hi_ = this.curr_; // appended past the old end of the buffered data
      }
      return len;
    }
  }

三.测试

1.FileUtil

用于封装三种方案(LineNumberReader、RandomAccessFile、BufferedRandomAccessFile)的文件读取

Java代码

  1. package com.gqshao.file.util;

  2. import com.google.common.collect.Lists;

  3. import com.google.common.collect.Maps;

  4. import com.gqshao.file.io.BufferedRandomAccessFile;

  5. import org.apache.commons.io.IOUtils;

  6. import org.apache.commons.lang3.StringUtils;

  7. import java.io.*;

  8. import java.util.List;

  9. import java.util.Map;

  10. public class FileUtil {

  11. /**

  12. * 通过BufferedRandomAccessFile读取文件,推荐

  13. *

  14. * @param file 源文件

  15. * @param encoding 文件编码

  16. * @param pos 偏移量

  17. * @param num 读取量

  18. * @return pins文件内容,pos当前偏移量

  19. */

  20. public static Map<String, Object> BufferedRandomAccessFileReadLine(File file, String encoding, long pos, int num) {

  21. Map<String, Object> res = Maps.newHashMap();

  22. List<String> pins = Lists.newArrayList();

  23. res.put("pins", pins);

  24. BufferedRandomAccessFile reader = null;

  25. try {

  26. reader = new BufferedRandomAccessFile(file, "r");

  27. reader.seek(pos);

  28. for (int i = 0; i < num; i++) {

  29. String pin = reader.readLine();

  30. if (StringUtils.isBlank(pin)) {

  31. break;

  32. }

  33. pins.add(new String(pin.getBytes("8859_1"), encoding));

  34. }

  35. res.put("pos", reader.getFilePointer());

  36. } catch (Exception e) {

  37. e.printStackTrace();

  38. } finally {

  39. IOUtils.closeQuietly(reader);

  40. }

  41. return res;

  42. }

  43. /**

  44. * 通过RandomAccessFile读取文件,能出来大数据文件,效率低

  45. *

  46. * @param file 源文件

  47. * @param encoding 文件编码

  48. * @param pos 偏移量

  49. * @param num 读取量

  50. * @return pins文件内容,pos当前偏移量

  51. */

  52. public static Map<String, Object> readLine(File file, String encoding, long pos, int num) {

  53. Map<String, Object> res = Maps.newHashMap();

  54. List<String> pins = Lists.newArrayList();

  55. res.put("pins", pins);

  56. RandomAccessFile reader = null;

  57. try {

  58. reader = new RandomAccessFile(file, "r");

  59. reader.seek(pos);

  60. for (int i = 0; i < num; i++) {

  61. String pin = reader.readLine();

  62. if (StringUtils.isBlank(pin)) {

  63. break;

  64. }

  65. pins.add(new String(pin.getBytes("8859_1"), encoding));

  66. }

  67. res.put("pos", reader.getFilePointer());

  68. } catch (Exception e) {

  69. e.printStackTrace();

  70. } finally {

  71. IOUtils.closeQuietly(reader);

  72. }

  73. return res;

  74. }

  75. /**

  76. * 使用LineNumberReader读取文件,1000w行比RandomAccessFile效率高,无法处理1亿条数据

  77. *

  78. * @param file 源文件

  79. * @param encoding 文件编码

  80. * @param index 开始位置

  81. * @param num 读取量

  82. * @return pins文件内容

  83. */

  84. public static List<String> readLine(File file, String encoding, int index, int num) {

  85. List<String> pins = Lists.newArrayList();

  86. LineNumberReader reader = null;

  87. try {

  88. reader = new LineNumberReader(new InputStreamReader(new FileInputStream(file), encoding));

  89. int lines = 0;

  90. while (true) {

  91. String pin = reader.readLine();

  92. if (StringUtils.isBlank(pin)) {

  93. break;

  94. }

  95. if (lines >= index) {

  96. if (StringUtils.isNotBlank(pin)) {

  97. pins.add(pin);

  98. }

  99. }

  100. if (num == pins.size()) {

  101. break;

  102. }

  103. lines++;

  104. }

  105. } catch (Exception e) {

  106. e.printStackTrace();

  107. } finally {

  108. IOUtils.closeQuietly(reader);

  109. }

  110. return pins;

  111. }

  112. }

2.RandomAccessFileTest

测试方法。其中test.txt由makePin()生成;涉及到的randomFile只是一个掺杂中文的文本文件,可以自己随便写一个。

Java代码

  1. package com.gqshao.file;

  2. import com.gqshao.file.util.FileUtil;

  3. import org.apache.commons.collections.CollectionUtils;

  4. import org.apache.commons.collections.MapUtils;

  5. import org.apache.commons.io.IOUtils;

  6. import org.junit.Test;

  7. import org.slf4j.Logger;

  8. import org.slf4j.LoggerFactory;

  9. import java.io.*;

  10. import java.util.List;

  11. import java.util.Map;

  12. public class RandomAccessFileTest {

  13. private static final Logger logger = LoggerFactory.getLogger(RandomAccessFileTest.class);

  14. private static final String ENCODING = "UTF-8";

  15. private static final int NUM = 50000;

  16. private static File file = new File(ClassLoader.getSystemResource("").getPath() + File.separator + "test.txt");

  17. private static File randomFile = new File(ClassLoader.getSystemResource("").getPath() + File.separator + "RandomFile.txt");

  18. /**

  19. * 生成1000w随机文本文件

  20. */

  21. @Test

  22. public void makePin() {

  23. String prefix = "_$#";

  24. OutputStreamWriter out = null;

  25. try {

  26. out = new OutputStreamWriter(new FileOutputStream(file, true), ENCODING);

  27. // 在1500w里随机1000w数据

  28. for (int j = 0; j < 100000000; j++) {

  29. out.write(prefix + (int) (130000000 * Math.random()) + "\n");

  30. }

  31. } catch (Exception e) {

  32. e.printStackTrace();

  33. } finally {

  34. IOUtils.closeQuietly(out);

  35. }

  36. logger.info(file.getAbsolutePath());

  37. }

  38. /**

  39. * 测试RandomAccessFile读取文件

  40. */

  41. @Test

  42. public void testRandomAccessRead() {

  43. long start = System.currentTimeMillis();

  44. //

  45. logger.info(String.valueOf(file.exists()));

  46. long pos = 0L;

  47. while (true) {

  48. Map<String, Object> res = FileUtil.readLine(file, ENCODING, pos, NUM);

  49. // 如果返回结果为空结束循环

  50. if (MapUtils.isEmpty(res)) {

  51. break;

  52. }

  53. Object po = res.get("pins");

  54. List<String> pins = (List<String>) res.get("pins");

  55. if (CollectionUtils.isNotEmpty(pins)) {

  56. // logger.info(Arrays.toString(pins.toArray()));

  57. if (pins.size() < NUM) {

  58. break;

  59. }

  60. } else {

  61. break;

  62. }

  63. pos = (Long) res.get("pos");

  64. }

  65. logger.info(((System.currentTimeMillis() - start) 1000) + "");

  66. }

  67. /**

  68. * 测试RandomAccessFile读取文件

  69. */

  70. @Test

  71. public void testBufferedRandomAccessRead() {

  72. long start = System.currentTimeMillis();

  73. //

  74. logger.info(String.valueOf(file.exists()));

  75. long pos = 0L;

  76. while (true) {

  77. Map<String, Object> res = FileUtil.BufferedRandomAccessFileReadLine(file, ENCODING, pos, NUM);

  78. // 如果返回结果为空结束循环

  79. if (MapUtils.isEmpty(res)) {

  80. break;

  81. }

  82. List<String> pins = (List<String>) res.get("pins");

  83. if (CollectionUtils.isNotEmpty(pins)) {

  84. // logger.info(Arrays.toString(pins.toArray()));

  85. if (pins.size() < NUM) {

  86. break;

  87. }

  88. } else {

  89. break;

  90. }

  91. pos = (Long) res.get("pos");

  92. }

  93. logger.info(((System.currentTimeMillis() - start) / 1000) + "");

  94. }

  95. /**

  96. * 测试普通读取文件

  97. */

  98. @Test

  99. public void testCommonRead() {

  100. long start = System.currentTimeMillis();

  101. logger.info(String.valueOf(randomFile.exists()));

  102. int index = 0;

  103. while (true) {

  104. List<String> pins = FileUtil.readLine(file, ENCODING, index, NUM);

  105. if (CollectionUtils.isNotEmpty(pins)) {

  106. // logger.info(Arrays.toString(pins.toArray()));

  107. if (pins.size() < NUM) {

  108. break;

  109. }

  110. } else {

  111. break;

  112. }

  113. index += NUM;

  114. }

  115. logger.info(((System.currentTimeMillis() - start) / 1000) + "");

  116. }

  117. }


文章转载自我是攻城师,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

评论