Skip to the content.

01-数据读取与导出


导入常用库

# -*- coding: utf-8 -*-
import pandas as pd
import numpy as np

import matplotlib.pyplot as plt
plt.rcParams['font.sans-serif'] = 'SimHei' #解决图表的中文乱码
plt.rcParams['axes.unicode_minus'] = 'False' #负号无法显示

# 忽略警告错误
import warnings
warnings.simplefilter('ignore') 

# 设置当前目录
import os
os.chdir(r'D:\Python')

# 导入自定义包
import sys
sys.path.append(r'D:\python')

import geohash

# 快速安装包(以scrapy为例)
pip --defualt-timeout = 600 install scrapy -i https://pypi.tuna.tsinghua.edu.cn/simple 



数据读取

读取MySQL

from sqlalchemy import create_engine

# 定义数据库
dct = create_engine('mysql+pymysql://username:password@host/***db', connect_args={'charset': 'utf8'})

# 1.读取sql语句
sql = '''select id, name, time from debit limit 100'''

# 2.读取sql数据库
df = pd.read_sql('debit', dct)

# 3.读取sql文件
sql = open(r'D:\python\a.sql', encoding='utf-8').read()

df = pd.read_sql(sql, dct)


读取Hive

  1. 安装impyla:pip install impyla
  2. 连接数据库报错时,参阅 ThriftParserError报错处理
# 定义数据库
from impala.dbapi import connect
from impala.util import as_pandas
conn = connect(host='***', port=***, auth_mechanism='NOSASL')
cursor = conn.cursor()

# 1.读取hive_sql文件
debit_hive = open(r'D:\python\a.sql', encoding='utf-8').read()
cursor.execute(debit_hive)

# 2.读取hive数据库
cursor.execute('select * from dw.debit limit 10')

debit_hive = as_pandas(cursor)


读取数据文件

# 改变当前工作目录到指定路径
import os
os.chdir(r'D:\Data\项目1')
data=pd.read_csv('data1.csv')

# 读取Excel
df = pd.read_excel(r'D:\data.xlsx', encoding='utf-8', sheet_name='Sheet1') #指定表名
df = pd.read_excel(r'D:\data.xlsx', encoding='utf-8', header=1, usecols=[0,2]) #指定读取范围:第2行开始,第1、3列
df = pd.read_excel(r'D:\python\data.xlsx', index_col = 2) #指定行索引:将第3列作为索引

# 读取csv
df = pd.read_csv(r'D:\data.csv', encoding='utf-8', dtype={'col':str}) #指定某一列格式
df = pd.read_csv(r'C:\Users\文件\data.csv', encoding='utf-8', engine='python') #路径包含中文
df = pd.read_csv(r'D:\data.csv', sep=',') #指明分隔符号
df = pd.read_csv(r'D:\data.csv', header=None, error_bad_lines=False) #无列名,并跳过错误的行
df = pd.read_csv(r'D:\data.csv', names=['caseNumber','createdMonth','score']) #指定列名
df = pd.read_csv(r'D:\big_data.csv', \
                 skiprows = lambda x: x>0 and np.random.rand()>0.01) #先随机读取1%数据

# 读取feather文件
# 安装库:pip install feather-format
df = feather.read_dataframe('data.feather') #原生
df = pd.read_feather('data.feather') #pandas

# 读取txt
df = pd.read_table(r'D:\data.txt', encoding='gbk') #读取txt格式文件
df = pd.read_table(r'D:\data.txt', sep=' ') #指明分隔符号

# 创建数据表
df = pd.DataFrame({'id':[1001,1002,1003,1004,1005,1006], 
                   'date':pd.date_range('20130102', periods=6),
                   'city':['Beijing','SH','Guangzhou ','Shenzhen','Shanghai', 'Beijing'],
                   'age':[23,44,54,32,34,32],
                   'category':['100-A','100-B','110-A','110-C','210-A','130-F'],
                   'price':[1200,np.nan,2133,5433,np.nan,4432]},
                   columns =['id','date','city','category','age','price'])

# 读取json文件
#方法一:占用内存大,适用于小数据量
data1 = pd.read_json('data.json',lines=True)
# 方法二:按行读取
import json
with open('data.json','r') as f:
    lines = f.readlines()
df1 = []
for i in lines:
    i_dict = json.loads(i)
    df1.append(i_dict)
df2 = pd.DataFrame(df1)

# 按行读取压缩包数据
import json
import zipfile
data = []
with zipfile.ZipFile(r'D:\data\file.zip', 'r') as z:
    with z.open(z.namelist()[0]) as f:
        for line in f:
            j = json.loads(line)
            data.append(j)
df = pd.DataFrame(data)
# 同样适用于压缩包里的csv文件
with zipfile.ZipFile(r'D:\data\file.zip', 'r') as z:
    with z.open(z.namelist()[0]) as f:
        df_case = pd.read_csv(f)

# invalid continuation byte报错问题解决
'''
UnicodeDecodeError: 'utf-8' codec can't decode byte 0xce in position 52: invalid continuation byte
'''
#-*- coding : utf-8-*-
# coding:unicode_escape
import pandas as pd
df = pd.read_csv(r'D:\data.csv', encoding='gbk')



数据导出

导出至Excel格式

df.to_excel(r'D:\py.xlsx', index=False, sheet_name='name') #指定sheet名称和index
df.to_excel(r'D:\py.xlsx', index=False, encoding='utf-8') #指定编码格式
df.to_excel(r'D:\py.xlsx', index=False, na_rep=0, inf_rep=0) #缺失值、无穷值填充为0

# 将多个数据表写入同一个excel文件
writer = pd.ExcelWriter(r'D:\py.xlsx', engine='xlsxwriter')
df[:3].to_excel(writer, index=False, sheet_name='book1')
df[:4].to_excel(writer, index=False, sheet_name='book2')
df[:5].to_excel(writer, index=False, sheet_name='book3')
writer.save()

# 根据日期生成文件名
import re
import datetime
path = 'C:/report/'
filename = path + '日报_' + re.sub(r'[^0-9]','',datetime.datetime.now().strftime("%Y%m%d")) + '.xlsx'
df.to_excel(filename, encoding='utf-8')


导出至csv格式

df.to_csv(r'D:\py.csv', encoding='utf-8', index=False)  #指定编码格式与index
df.to_csv(r'D:\py.csv', columns=['col1', 'col2'], sep=',') #指定列名与分隔符


导出至其他格式

# 输出到feather文件
feather.write_dataframe(df, 'data.feather') #原生
df.to_feather('data.feather') #pandas

# 输出到数据库格式
from db_connects.db_connects import db
db = db()
df.to_sql('table', db.ENGINE_XXX, if_exists='replace', index=False) #replace:替换;attend:累增

# 输出到Html
table = df.to_html(border=1, header=True, index=False)
content = "<html><body>" + table + "<br>" + "</body></html>"


< <目录 02-数据表检查 > 返回顶部 ↑