07-Common Functions
Create the sample data
import pandas as pd
import numpy as np

df = pd.DataFrame({'id': [1001, 1002, 1003, 1004, 1005, 1006],
                   'date': pd.date_range('20130102', periods=6),
                   'city': ['Beijing', 'SH', 'Guangzhou ', 'Shenzhen', 'Shanghai', 'Beijing'],  # note the trailing space in 'Guangzhou '
                   'age': [23, 44, 54, 32, 34, 32],
                   'category': ['100-A', '100-B', '110-A', '110-C', '210-A', '130-F'],
                   'price': [1200, np.nan, 2133, 5433, np.nan, 4432]},
                  columns=['id', 'date', 'city', 'category', 'age', 'price'])
Function usage
The for loop
# build a derived list with an explicit loop
x = df['age']
out = []
for i in x:
    out.append(i / 2)
# derive the variable directly with a list comprehension
out = [i / 2 for i in df['age']]
# combine a comprehension with if/else to derive a flag
# (assumes an 'overdue_days' column, which the sample data does not include)
df['y'] = [1 if i > 30 else 0 for i in df['overdue_days']]
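As a quick check on the sample data, the same comprehension over age gives:
print([i / 2 for i in df['age']])  # [11.5, 22.0, 27.0, 16.0, 17.0, 16.0]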
The apply function
# apply a function along the columns or rows of a DataFrame
# (a throwaway frame, so the sample df above stays intact for the later examples)
df_demo = pd.DataFrame([[4, 9]] * 3, columns=['A', 'B'])
df_demo.apply(np.sqrt)          # element-wise square root
df_demo.apply(np.sum, axis=0)   # column sums
df_demo.apply(np.sum, axis=1)   # row sums
# create a new column from a condition
df['age_bin'] = df['age'].apply(lambda x: 1 if x >= 30 else 0)
# or with three buckets
df['age_bin'] = df['age'].apply(lambda x: 1 if x >= 40 else 2 if x >= 30 else 0)
# create a new column from a dict mapping
city_map = {"Beijing": "BJ", "Shanghai": "SH", "SH": "SH", "Guangzhou ": "GZ", "Shenzhen": "SZ"}
df['city_code'] = df['city'].apply(lambda x: city_map[x])
# row-wise mean: axis=1
df['mean'] = df.apply(lambda row: row[['age', 'price']].mean(), axis=1)
# equivalent, without apply
df['mean'] = df[['age', 'price']].mean(axis=1)
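apply also works with a named function over rows; a minimal sketch on the sample data (the price-vs-age rule here is made up purely for illustration):
def flag_row(row):
    # flag rows where price is known and exceeds 100x the age
    if pd.notna(row['price']) and row['price'] > row['age'] * 100:
        return 1
    return 0

df['flag'] = df.apply(flag_row, axis=1)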
The applymap function
# apply a function to every element of a DataFrame
# (in pandas >= 2.1, DataFrame.map is the preferred name for applymap)
df_demo = pd.DataFrame(np.random.randn(4, 3), columns=list('abc'))
fmt = lambda x: '%.2f' % x
df_demo.applymap(fmt)
The map function
# apply a function to every element of a Series; there is no direct DataFrame counterpart
df['price'].map(lambda x: x * 2)
# Python's built-in map over a Series
result = list(map(lambda x: x * 2, df['price']))
# Python's built-in filter over a Series
result = list(filter(lambda x: x > 5000, df['price']))
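As a side note, Series.map also accepts a dict directly, which is a tidy alternative to the city_map lookup written with apply above; unmapped values become NaN instead of raising KeyError:
df['city_code'] = df['city'].map(city_map)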
Practical applications
Period-over-period growth
def chain(current, last):
    # current/last: paths to tab-separated files with app, tag, uv columns
    df1 = pd.read_csv(current, names=['app', 'tag', 'uv'], sep='\t')
    df2 = pd.read_csv(last, names=['app', 'tag', 'uv'], sep='\t')
    df3 = pd.merge(df1, df2, how='left', on='app')
    # apps absent from the last period count as 0 uv
    df3['uv_y'] = df3['uv_y'].map(lambda x: 0.0 if pd.isnull(x) else x)
    df3['growth'] = df3['uv_x'] - df3['uv_y']
    return df3[['app', 'growth', 'uv_x', 'uv_y']].sort_values(by='growth', ascending=False)
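A hedged usage sketch; the file names are hypothetical, and each file is expected to hold tab-separated app/tag/uv rows:
growth = chain('uv_this_week.tsv', 'uv_last_week.tsv')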
Set difference
# filter the rows of df2 out of df_all
# (rows unique to df2 also survive this trick; use the helper below for a strict difference)
df = pd.concat([df_all, df2]).drop_duplicates(subset=['case_number'], keep=False)
# for the given key columns, drop from one DataFrame the rows whose key values
# appear in another DataFrame; equivalent to a set difference
def difference(left, right, on):
    df = pd.merge(left, right, how='left', on=on)
    left_columns = left.columns
    # first column contributed by `right`; null there means the key had no match
    col_y = df.columns[left_columns.size]
    df = df[df[col_y].isnull()]
    df = df.iloc[:, 0:left_columns.size]
    df.columns = left_columns
    return df
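A usage sketch built on the sample data from the top of this section:
left = df[['id', 'city']]
right = df[df['id'] > 1003][['id', 'city']]
diff = difference(left, right, on=['id'])  # rows of left whose id is absent from right: 1001-1003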
Hashing
# md5
import hashlib

def md5Encode(s):
    m = hashlib.md5()
    m.update(s.encode('utf-8'))
    return m.hexdigest()

# sha256
def sha256Encode(s):
    m = hashlib.sha256()
    m.update(s.encode('utf-8'))
    return m.hexdigest()
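A hedged usage sketch hashing a column of the sample DataFrame (any string column works the same way):
df['city_md5'] = df['city'].apply(md5Encode)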
Fuzzy matching
def get_job(compname):
    # map a company name to a job category by keyword lookup
    job_dict = {'公司雇员': ['公司', '企业'],
                '医生': ['医院'],
                '学生': ['学院', '大学'],
                '自由职业': ['自由', '无业'],
                '其他': ['']}  # '' matches any string, so this acts as the fallback
    for k, v in job_dict.items():
        for i in v:
            if i in compname:
                # logging.info(compname + ' contains ' + i + ' so return ' + k + ' as job')
                return k
    return '其他'

df['job'] = df['公司名称'].map(lambda x: get_job(x))  # assumes a '公司名称' (company name) column
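Worked examples of the keyword matcher (the company names are made up):
get_job('人民医院')    # '医生'    -- contains '医院'
get_job('XX科技公司')  # '公司雇员' -- contains '公司'
get_job('北京大学')    # '学生'    -- contains '大学'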
Text similarity
import difflib

list1 = list(df['city'])
# for each name, find the closest match (at most 1) in list1, with a similarity cutoff of 0.95
# (assumes a 'name' column)
df['close_matches'] = df['name'].apply(lambda x: difflib.get_close_matches(x, list1, 1, cutoff=0.95))
# unwrap the match list; if nothing cleared the cutoff, use NaN
df['close_matches'] = df['close_matches'].apply(lambda x: x[0] if len(x) > 0 else np.nan)
# pairwise similarity score between two columns (col1/col2 assumed)
df['diff_score'] = df.apply(lambda x: difflib.SequenceMatcher(None, x.col1, x.col2).quick_ratio(), axis=1)
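Quick worked values for the pieces above, using the sample city list (the ratio is what difflib returns for these exact strings):
difflib.get_close_matches('Shanghai', list1, 1, cutoff=0.95)        # ['Shanghai']
difflib.SequenceMatcher(None, 'Beijing', 'Beijing ').quick_ratio()  # 14/15 ≈ 0.93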
JSON parsing
import pandas as pd
from pandas import json_normalize
import json

# json.dumps options: serialize a dict to standard JSON
dict_data = {'name': '小明', 'age': '18', 'address': 'Beijing'}
json_data = json.dumps(dict_data,
                       ensure_ascii=False,  # keep non-ASCII characters as-is
                       indent=2,            # pretty-print indentation
                       sort_keys=True)      # sort keys alphabetically
df = pd.DataFrame(dict_data, index=[0])
# parse by nesting level
data = [{'id': '100001',
         'name': '小明',
         'info': {'addr': {'country': 'CN',
                           'province': 'Beijing'},
                  'phone': '133***6069'}},
        {'id': '100002',
         'name': '小兰',
         'info': {'addr': {'country': 'CN',
                           'province': 'Shanghai'},
                  'phone': '133***5050'}}]
# serialize to standard JSON
data_json = json.dumps(data, ensure_ascii=False, indent=2)
# flatten down to a chosen nesting level
df1 = pd.json_normalize(data, max_level=0)
df2 = pd.json_normalize(data, max_level=1)
df3 = pd.json_normalize(data, max_level=2)
# keep only the last level as the column name
df3.columns = [i.split('.')[-1] if len(i.split('.')) > 1 else i for i in df3.columns]
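For the two-record payload above, the three calls flatten to these columns (exact ordering may vary slightly):
# df1: id, name, info                                  -- info is still a dict
# df2: id, name, info.addr, info.phone                 -- addr is still a dict
# df3: id, name, info.addr.country, info.addr.province, info.phone
# after the rename above, df3's columns are: id, name, country, province, phone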
# batch parsing (only when every record shares the same schema)
# df0 and its 'data' column of JSON strings are assumed inputs
df1 = []
for i in df0["data"]:
    i_data = json.loads(i)  # parse the JSON string (avoid eval for untrusted input)
    i_res = pd.json_normalize(i_data, max_level=9)
    df1.append(i_res)
df2 = pd.concat(df1)
# expand one JSON document into multiple rows, keeping the primary key
# ('data_json', 'case_number', 'imei', 'name' columns are assumed)
df1 = []
for i in df.index:
    i_data = pd.DataFrame(json.loads(df.loc[i, 'data_json']))
    i_data['case_number'] = df.loc[i, 'case_number']
    i_res = i_data[['case_number', 'imei', 'name']]
    df1.append(i_res)
df2 = pd.concat(df1).reset_index(drop=True)
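For reference, input that makes the loop above work end to end (all values illustrative):
df = pd.DataFrame({
    'case_number': ['C001', 'C002'],
    'data_json': ['[{"imei": "86001", "name": "A"}, {"imei": "86002", "name": "B"}]',
                  '[{"imei": "86003", "name": "C"}]']
})
# re-running the loop on this input yields three rows keyed by case_number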
# flatten arbitrarily nested JSON into a single-level dict, then load into a DataFrame
def _SORTDICTKEY(dictin):
    # sort key for list elements: the first scalar value found in the dict
    for key in dictin:
        if not isinstance(dictin[key], (set, list, dict)):
            return dictin[key]
    return dictin[list(dictin.keys())[0]]

def flatten_json(y):
    out = {}
    def flatten(x, name=''):
        if isinstance(x, dict):
            for a in x:
                flatten(x[a], name + a + '_')
        elif isinstance(x, list):
            # sort list elements so the flattened keys are deterministic
            x = sorted(x, key=_SORTDICTKEY)
            for a in x:
                flatten(a, name + str(_SORTDICTKEY(a)) + '_')
        else:
            out[name[:-1]] = x
    flatten(y)
    return out

df = pd.DataFrame({
    'id': ['001', '002', '003'],
    'report': [{'语文': 80, '数学': 85, '外语': 90},
               {'语文': 75, '数学': 80, '外语': 85},
               {'语文': 90, '数学': 85, '外语': 80}]
})
data = df['report'].apply(lambda x: json.dumps(x, ensure_ascii=False)).tolist()
a = [flatten_json(json.loads(d)) for d in data]
res = json_normalize(a)
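The flattened scores can be joined back to their ids; a short follow-up on the frame just built:
res = pd.concat([df[['id']], json_normalize(a)], axis=1)
#    id  语文  数学  外语
# 0 001   80   85   90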
# nested JSON parsing
data = [{'id': '100001',
         'name': '小明',
         'describe': [{'subject': '语', 'score': 80},
                      {'subject': '数', 'score': 85},
                      {'subject': '外', 'score': 90}]},
        {'id': '100002',
         'name': '小兰',
         'describe': [{'subject': '语', 'score': 82},
                      {'subject': '数', 'score': 88},
                      {'subject': '外', 'score': 92}]}]
# build the record_path/meta arguments that json_normalize needs
arg1 = []  # record_path: keys whose values are lists of records
arg2 = []  # meta: scalar keys; nested dict keys become [outer, inner] paths
for key, value in data[0].items():
    if isinstance(value, list):
        arg1.append(key)
    elif isinstance(value, dict):
        for j in data[0][key].keys():
            arg2.append([key, j])
    else:
        arg2.append(key)
print(arg1, arg2)  # ['describe'] ['id', 'name']
# read only part of the nested content
df = json_normalize(data, 'describe')
# read everything: records plus metadata
df = json_normalize(data, arg1, arg2)
df.columns = [i.split('.')[1] if len(i.split('.')) > 1 else i for i in df.columns]
Date format cleaning
def time_trans(t):
    from datetime import datetime
    import re
    if isinstance(t, float) and t > 30000:
        # large floats are treated as Unix timestamps
        return datetime.fromtimestamp(t).strftime('%Y-%m-%d')
    elif isinstance(t, float) and t < 30000:
        # small floats such as 2019.1 are treated as year.month
        t = str(t).replace('.', '-')
        t = datetime.strptime(t, '%Y-%m').strftime('%Y-%m')
        return t
    elif isinstance(t, str):
        t = t.split('.')[0].split(' ')[0]
        if len(re.findall(r'(\d{4}/\d{1,2}/\d{1,2})', t)) == 1:
            fmt = '%Y/%m/%d'
        elif len(re.findall(r'(\d{4}-\d{1,2}-\d{1,2})', t)) == 1:
            fmt = '%Y-%m-%d'
        elif len(re.findall(r'(\d{8})', t)) == 1:
            fmt = '%Y%m%d'
        elif len(re.findall(r'(\d{6})', t)) == 1:
            fmt = '%Y%m'
        elif len(re.findall(r'(\d{2}-[a-zA-Z]{3}-\d{4})', t)) == 1:
            fmt = '%d-%b-%Y'
        elif len(re.findall(r'(\d{2}-[a-zA-Z]{3}-\d\d)', t)) == 1:
            fmt = '%d-%b-%y'
        try:
            t = datetime.strptime(t, fmt).strftime('%Y-%m-%d')
        except Exception:
            return 'format of %s not supported yet' % t
        return t

t = 1562467704.0623
time_trans(t)   # '2019-07-07' (local-timezone dependent)
t = 2019.1
time_trans(t)   # '2019-01'
t = '2019/1/1'
time_trans(t)   # '2019-01-01'
Timestamp formats
# 10-digit and 13-digit Unix timestamps
import datetime
import time

def get_time_stamp10():
    # 10-digit timestamp: whole seconds since the Unix epoch, e.g. 1540281250
    datetime_now = datetime.datetime.now()
    date_stamp = str(int(time.mktime(datetime_now.timetuple())))
    return int(date_stamp)

def get_time_stamp13():
    # 13-digit timestamp: milliseconds since the Unix epoch, e.g. 1540281250399
    datetime_now = datetime.datetime.now()
    # 10 digits: seconds since the epoch (time.mktime expects local time, hence now())
    date_stamp = str(int(time.mktime(datetime_now.timetuple())))
    # 3 digits: milliseconds, taken from the microsecond field
    data_microsecond = str("%06d" % datetime_now.microsecond)[0:3]
    return int(date_stamp + data_microsecond)

stamp10 = get_time_stamp10()
stamp13 = get_time_stamp13()
Mode share
def mode_pct(L):
    # share of the group's rows taken by its most frequent value
    L = L.tolist()
    pct = max(L.count(x) for x in set(L)) / len(L)
    return pct

a = df.groupby('id')['amount'].apply(mode_pct)  # assumes an 'amount' column
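A worked check with made-up values: for amounts [100, 100, 200], the mode 100 covers two of three rows:
mode_pct(pd.Series([100, 100, 200]))  # 0.666...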
Count non-null unique values
def uni_cnt(list_x):
    # drop None, NaN and empty strings, then count distinct values
    list_x = [x for x in list_x if pd.notna(x) and x != '']
    return len(set(list_x))

a = pd.pivot_table(df, index=['Month'], values=['id'], aggfunc=uni_cnt)  # assumes a 'Month' column
Stratified sampling
def get_sample(df, k, stratified_col):
    # draw a k-fraction sample within each stratum defined by stratified_col
    import math
    import random
    random.seed(10)
    grouped = df.groupby(by=stratified_col)[stratified_col[0]].count()
    group_k = grouped.map(lambda x: math.ceil(x * k))
    res = []
    for df_idx in group_k.index:
        df1 = df
        if len(stratified_col) == 1:
            df1 = df1[df1[stratified_col[0]] == df_idx]
        else:
            for i in range(len(df_idx)):
                df1 = df1[df1[stratified_col[i]] == df_idx[i]]
        idx = random.sample(range(len(df1)), group_k[df_idx])
        res.append(df1.iloc[idx, :].copy())
    return pd.concat(res)

df_stratified = get_sample(df, k=0.1, stratified_col=['month'])  # assumes a 'month' column
Merge multiple rows into one
# values joined with '\t'
def merge_col(s):
    return '\t'.join(s.values)

df_res = df.groupby('group')['merge_name'].apply(merge_col)  # 'group'/'merge_name' columns assumed
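Assuming df is still the sample frame from the top of the document, the same pattern collapses each city's categories into one tab-joined string:
df.groupby('city')['category'].apply(merge_col)
# e.g. Beijing -> '100-A\t130-F'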
Split a dataset into n equal parts
import math
from tqdm import tqdm

def cut_df(df, n):
    # split df into n roughly equal chunks and write each chunk to CSV
    df_num = len(df)
    every_epoch_num = math.floor(df_num / n)
    for index in tqdm(range(n)):
        file_name = f'./XXX_{index}.csv'  # XXX: placeholder output prefix
        if index < n - 1:
            df_tem = df[every_epoch_num * index: every_epoch_num * (index + 1)]
        else:
            df_tem = df[every_epoch_num * index:]
        df_tem.to_csv(file_name, index=False)

cut_df(df, 5)