
TF2.0 Usage, Part 7: Reading tfrecords Files and Using Them with tf.keras

量化分析之路 2020-04-30

Tech stocks surged today; it feels like the pigs are being lured in for slaughter. I picked up a little on the dips yesterday and the day before, and sold most of it into today's rally. Overseas there will probably be a second wave of the epidemic and a second leg down in US stocks; after that, expect a long grind of small declines rather than a crash. It will be a tough stretch. If buying the left side, keep positions under 50% until the A-share index gets down to 2600; if trading the right side, wait for a break above 3000 to hold on the retest before going to a full position.


This section: using the basic tfrecord APIs.


1. 导入模块

import matplotlib as mpl
import matplotlib.pyplot as plt
%matplotlib inline
import numpy as np
import sklearn
import pandas as pd
import os
import sys
import time
import tensorflow as tf
from tensorflow import keras


2. The tfrecord file format

# tfrecord file format
# -> tf.train.Example
#    -> tf.train.Features: {"key": tf.train.Feature} (key -> the concrete value)
#       -> tf.train.Feature: tf.train.BytesList / FloatList / Int64List (one list type per value format)


favorite_books = [name.encode('utf-8') for name in ["machine learning", "cc150"]]

favorite_books_bytelist = tf.train.BytesList(value = favorite_books)
print(favorite_books_bytelist)


hours_floatlist = tf.train.FloatList(value = [15.5, 9.5, 7.0, 8.0])
print(hours_floatlist)


age_int64list = tf.train.Int64List(value = [42])
print(age_int64list)


features = tf.train.Features(
    feature = {
        "favorite_books": tf.train.Feature(
            bytes_list = favorite_books_bytelist),
        "hours": tf.train.Feature(
            float_list = hours_floatlist),
        "age": tf.train.Feature(int64_list = age_int64list),
    }
)
print(features)

example = tf.train.Example(features=features)
print(example)
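
As a quick round-trip check (a sketch; FromString is the protobuf-generated parser on the Example message), the serialized bytes can be parsed straight back into an equal proto:

# Serialize the Example, then parse the bytes back into a proto.
restored = tf.train.Example.FromString(example.SerializeToString())
print(restored == example)  # True: the round trip is lossless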



3. Writing to a file

output_dir = 'tfrecord_basic'
if not os.path.exists(output_dir):
    os.mkdir(output_dir)
filename = 'test.tfrecords'
filename_fullpath = os.path.join(output_dir, filename)
# serialize the Example built above; this byte string is what gets written
serialized_example = example.SerializeToString()
with tf.io.TFRecordWriter(filename_fullpath) as writer:
    for i in range(3):
        writer.write(serialized_example)
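
To confirm the write actually happened, a quick look at the file on disk (a minimal check):

# the file should now exist and contain three serialized records
print(os.path.getsize(filename_fullpath), "bytes")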

4. Reading the file

# read the records back as a dataset of serialized byte strings
dataset = tf.data.TFRecordDataset([filename_fullpath])
for serialized_example_tensor in dataset:
    print(serialized_example_tensor)

5. Parsing

# tf.io.VarLenFeature: variable-length feature (parses to a SparseTensor)
# tf.io.FixedLenFeature: fixed-length feature (parses to a dense Tensor)
expected_features = {
    "favorite_books": tf.io.VarLenFeature(dtype = tf.string),
    "hours": tf.io.VarLenFeature(dtype = tf.float32),
    "age": tf.io.FixedLenFeature([], dtype = tf.int64),
}
dataset = tf.data.TFRecordDataset([filename_fullpath])
for serialized_example_tensor in dataset:
    example = tf.io.parse_single_example(
        serialized_example_tensor,
        expected_features)
    books = tf.sparse.to_dense(example["favorite_books"],
                               default_value=b"")
    for book in books:
        print(book.numpy().decode("UTF-8"))
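
The same densify step applies to any VarLenFeature. For instance (a sketch reusing the last parsed example), "hours" also comes back as a SparseTensor:

# "hours" was declared VarLenFeature, so densify it the same way
hours = tf.sparse.to_dense(example["hours"])
print(hours.numpy())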

6. Saving a compressed file

# save as a GZIP-compressed tfrecord file
filename_fullpath_zip = filename_fullpath + '.zip'
options = tf.io.TFRecordOptions(compression_type = 'GZIP')
with tf.io.TFRecordWriter(filename_fullpath_zip, options) as writer:
    for i in range(3):
        writer.write(serialized_example)
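
Comparing on-disk sizes shows what the compression buys (a minimal check; with only three tiny records the difference is small, but it grows with real data):

# GZIP-compressed vs. uncompressed size on disk
print(os.path.getsize(filename_fullpath), os.path.getsize(filename_fullpath_zip))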

7. Reading the compressed file

# read the compressed file; compression_type must match how it was written
dataset_zip = tf.data.TFRecordDataset([filename_fullpath_zip], compression_type='GZIP')
for serialized_example_tensor in dataset_zip:
    example = tf.io.parse_single_example(
        serialized_example_tensor,
        expected_features,
    )
    # print(example)
    # books is a SparseTensor; densify before decoding
    books = tf.sparse.to_dense(example["favorite_books"])
    for book in books:
        print(book.numpy().decode("UTF-8"))

Generating tfrecords files from CSV

This uses the dataset generated in the lesson before last.

2. Reading the filenames: building path lists by filename prefix

source_dir = './generate_csv/'
print(os.listdir(source_dir))


# collect the files whose names start with a given prefix
def get_filenames_by_prefix(source_dir, prefix_name):
    all_files = os.listdir(source_dir)
    results = []
    for filename in all_files:
        if filename.startswith(prefix_name):
            results.append(os.path.join(source_dir, filename))
    return results


train_filenames = get_filenames_by_prefix(source_dir,'train')
valid_filenames = get_filenames_by_prefix(source_dir,'valid')
test_filenames = get_filenames_by_prefix(source_dir,'test')


import pprint
pprint.pprint(train_filenames)
pprint.pprint(valid_filenames)
pprint.pprint(test_filenames)
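
Equivalently (a sketch using the standard library; glob is not used elsewhere in this post), the same prefix matching can be done with a glob pattern:

import glob

# same result as get_filenames_by_prefix(source_dir, 'train')
train_filenames_glob = sorted(glob.glob(os.path.join(source_dir, 'train*')))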

3. Reading the data from CSV

# parse one line of the dataset: 8 feature columns followed by 1 label column


def parse_csv_line(line, n_fields=9):
    defs = [tf.constant(np.nan)] * n_fields
    parsed_fields = tf.io.decode_csv(line, record_defaults=defs)
    x = tf.stack(parsed_fields[0:-1])
    y = tf.stack(parsed_fields[-1:])
    return x, y
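
A quick check with a hand-written line (a sketch; the values are made up, the shapes are what matter):

sample_line = b'1.0,2.0,3.0,4.0,5.0,6.0,7.0,8.0,1.5'
x_sample, y_sample = parse_csv_line(sample_line, n_fields=9)
print(x_sample.shape, y_sample.shape)  # (8,) (1,)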


def csv_reader_dataset(filenames, n_readers=5, batch_size=32, n_parse_threads=5,
                       shuffle_buffer_size=10000):
    # 1. filenames -> dataset of filenames
    dataset = tf.data.Dataset.list_files(filenames)
    dataset = dataset.repeat()  # repeat indefinitely; model.fit bounds it via steps_per_epoch
    # 2. filenames -> lines of text, skipping each file's header row
    dataset = dataset.interleave(
        lambda filename: tf.data.TextLineDataset(filename).skip(1),
        cycle_length = n_readers
    )
    # shuffle returns a new dataset, so the result must be reassigned
    dataset = dataset.shuffle(shuffle_buffer_size)
    # 3. parse each line into (features, label)
    dataset = dataset.map(parse_csv_line,
                          num_parallel_calls=n_parse_threads)
    dataset = dataset.batch(batch_size)
    return dataset


batch_size = 32
train_set = csv_reader_dataset(train_filenames, batch_size=batch_size)
valid_set = csv_reader_dataset(valid_filenames, batch_size=batch_size)
test_set = csv_reader_dataset(test_filenames, batch_size=batch_size)
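
Peeking at one batch confirms the pipeline's output shapes (a minimal check):

# x is (batch_size, 8) features, y is (batch_size, 1) labels
for x_batch, y_batch in train_set.take(1):
    print(x_batch.shape, y_batch.shape)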

4. Writing the data to tfrecord files

# write the dataset out as tfrecord files


def serialize_example(x, y):
    """Converts x, y to a tf.train.Example and serializes it."""
    input_features = tf.train.FloatList(value = x)
    label = tf.train.FloatList(value = y)
    # wrap the float lists as Features
    features = tf.train.Features(
        feature = {
            "input_features": tf.train.Feature(float_list = input_features),
            "label": tf.train.Feature(float_list = label)
        }
    )
    # wrap the Features in an Example and serialize it
    example = tf.train.Example(features = features)
    return example.SerializeToString()
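
A quick spot check (a sketch): serialize the first record of one training batch and look at a prefix of the raw bytes.

for x_batch, y_batch in train_set.take(1):
    # .numpy() hands plain float arrays to the protobuf builder
    print(serialize_example(x_batch[0].numpy(), y_batch[0].numpy())[:60])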


def csv_dataset_to_tfrecords(base_filename, dataset, n_shards, steps_per_shard,
                             compression_type=None):
    options = tf.io.TFRecordOptions(compression_type = compression_type)
    all_filenames = []
    for shard_id in range(n_shards):
        filename_fullpath = '{}_{:05d}-of-{:05d}'.format(
            base_filename, shard_id, n_shards)
        with tf.io.TFRecordWriter(filename_fullpath, options) as writer:
            for x_batch, y_batch in dataset.take(steps_per_shard):
                for x_example, y_example in zip(x_batch, y_batch):
                    writer.write(serialize_example(x_example, y_example))
        all_filenames.append(filename_fullpath)
    return all_filenames


n_shards = 20
train_steps_per_shard = 11610 // batch_size // n_shards
valid_steps_per_shard = 3880 // batch_size // n_shards
test_steps_per_shard = 5170 // batch_size // n_shards


output_dir = "generate_tfrecords"
if not os.path.exists(output_dir):
os.mkdir(output_dir)

train_basename = os.path.join(output_dir,'train')
valid_basename = os.path.join(output_dir,'valid')
test_basename = os.path.join(output_dir,'test')


train_tfrecord_filenmaes = csv_dataset_to_tfrecords(train_basename,train_set,n_shards,train_steps_per_shard,None)
valid_tfrecord_filenames = csv_dataset_to_tfrecords(valid_basename,valid_set,n_shards,valid_steps_per_shard,None)
test_tfrecord_filenames = csv_dataset_to_tfrecords(test_basename,test_set,n_shards,valid_steps_per_shard,None)


4.2 Generating compressed files

# generate GZIP-compressed tfrecord files
n_shards = 20
train_steps_per_shard = 11610 // batch_size // n_shards
valid_steps_per_shard = 3880 // batch_size // n_shards
test_steps_per_shard = 5170 // batch_size // n_shards


output_dir = "generate_tfrecords_zip"
if not os.path.exists(output_dir):
    os.mkdir(output_dir)

train_basename = os.path.join(output_dir, 'train')
valid_basename = os.path.join(output_dir, 'valid')
test_basename = os.path.join(output_dir, 'test')


train_tfrecord_filenames = csv_dataset_to_tfrecords(
    train_basename, train_set, n_shards, train_steps_per_shard, compression_type="GZIP")
valid_tfrecord_filenames = csv_dataset_to_tfrecords(
    valid_basename, valid_set, n_shards, valid_steps_per_shard, compression_type="GZIP")
test_tfrecord_filenames = csv_dataset_to_tfrecords(
    test_basename, test_set, n_shards, test_steps_per_shard, compression_type="GZIP")



Reading the tfrecords files

6.1 Printing the filenames

pprint.pprint(train_tfrecord_filenames)
pprint.pprint(valid_tfrecord_filenames)
pprint.pprint(test_tfrecord_filenames)

6.2 Reading the files

expected_features = {
    "input_features": tf.io.FixedLenFeature([8], dtype=tf.float32),
    "label": tf.io.FixedLenFeature([1], dtype=tf.float32)
}


def parse_example(serialized_example):
    example = tf.io.parse_single_example(serialized_example, expected_features)
    return example["input_features"], example["label"]




def tfrecords_reader_dataset(filenames, n_readers=5, batch_size=32,
                             n_parse_threads=5, shuffle_buffer_size=10000):
    # 1. filenames -> dataset of filenames
    dataset = tf.data.Dataset.list_files(filenames)
    dataset = dataset.repeat()  # repeat indefinitely
    # 2. filenames -> serialized records; GZIP must match how the files were written
    dataset = dataset.interleave(
        lambda filename: tf.data.TFRecordDataset(filename, compression_type="GZIP"),
        cycle_length = n_readers
    )
    # shuffle returns a new dataset, so reassign it
    dataset = dataset.shuffle(shuffle_buffer_size)
    # 3. parse each record into (features, label)
    dataset = dataset.map(parse_example,
                          num_parallel_calls=n_parse_threads)
    dataset = dataset.batch(batch_size)
    return dataset


6.3 Testing

batch_size = 32
tfrecords_train_set = tfrecords_reader_dataset(train_tfrecord_filenames, batch_size=batch_size)
tfrecords_valid_set = tfrecords_reader_dataset(valid_tfrecord_filenames,batch_size=batch_size)
tfrecords_test_set = tfrecords_reader_dataset(test_tfrecord_filenames,batch_size=batch_size)
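
One batch from the tfrecord pipeline should have the same shapes as the CSV pipeline produced (a minimal check):

for x_batch, y_batch in tfrecords_train_set.take(1):
    print(x_batch.shape, y_batch.shape)  # (32, 8) (32, 1)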


# build the model
model = keras.models.Sequential([
    keras.layers.Dense(30, activation='relu', input_shape=[8]),
    keras.layers.Dense(1),
])


# print the model summary
model.summary()
# compile: this is a regression, so use mean squared error
model.compile(loss='mean_squared_error', optimizer="sgd")
# callback: stop early once validation loss stops improving
callbacks = [keras.callbacks.EarlyStopping(patience=5, min_delta=1e-3)]


# train; the dataset repeats forever, so steps_per_epoch/validation_steps bound each epoch
history = model.fit(tfrecords_train_set,
                    validation_data=tfrecords_valid_set,
                    steps_per_epoch=11610 // batch_size,
                    validation_steps=3870 // batch_size,
                    epochs=100, callbacks=callbacks)
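
The usual learning-curve plot (a sketch; it relies only on the matplotlib/pandas imports at the top):

pd.DataFrame(history.history).plot(figsize=(8, 5))
plt.grid(True)
plt.show()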


6.5 On the test set, evaluate reports a value in the 0.3s; note this is the MSE loss (the model is a regressor), not an accuracy.

model.evaluate(tfrecords_test_set,steps = 5160 // batch_size)

