
07 - Common Functions


Creating the sample data

import pandas as pd
import numpy as np
df = pd.DataFrame({'id':[1001,1002,1003,1004,1005,1006], 
                   'date':pd.date_range('20130102', periods=6),
                   'city':['Beijing','SH','Guangzhou ','Shenzhen','Shanghai', 'Beijing'],
                   'age':[23,44,54,32,34,32],
                   'category':['100-A','100-B','110-A','110-C','210-A','130-F'],
                   'price':[1200,np.nan,2133,5433,np.nan,4432]},
                   columns =['id','date','city','category','age','price'])


Using Functions

The for loop

# A plain loop: accumulate results in a list
x = df['age']
out = []
for i in x:
    out.append(i/2)

# Derive a variable directly with a list comprehension
out = [i/2 for i in df['age']]

# Combine for with if to derive a flag
df['y'] = [1 if i > 30 else 0 for i in df['age']]
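The same flag can be derived without an explicit Python loop; a minimal vectorized sketch with np.where (np is already imported above):

# evaluate the condition on the whole Series at once
df['y'] = np.where(df['age'] > 30, 1, 0)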



The apply function

# Apply a function along the rows or columns of a DataFrame
# (use a separate frame so the sample df above is not overwritten)
df_ab = pd.DataFrame([[4, 9]] * 3, columns=['A', 'B'])
df_ab.apply(np.sqrt)
df_ab.apply(np.sum, axis=0)   # column-wise
df_ab.apply(np.sum, axis=1)   # row-wise

# Derive a new column from a condition
df['age_bin'] = df['age'].apply(lambda x: 1 if x >= 30 else 0)
df['age_bin'] = df['age'].apply(lambda x: 1 if x >= 40 else 2 if x >= 30 else 0)

# Derive a new column from a dict lookup (renamed to avoid shadowing the built-in dict)
city_dict = {"Beijing":"BJ", "Shanghai":"SH", "SH":"SH", "Guangzhou ":"GZ", "Shenzhen":"SZ"}
df['city_code'] = df['city'].apply(lambda x: city_dict[x])

# Row-wise mean: axis=1
df['mean'] = df.apply(lambda row: row[['age','price']].mean(), axis=1)
# equivalent vectorized form
df['mean'] = df[['age','price']].mean(axis=1)



The applymap function

# Apply a function to every single element of a DataFrame
df_rand = pd.DataFrame(np.random.randn(4, 3), columns=list('abc'))
fmt = lambda x: '%.2f' % x
df_rand.applymap(fmt)
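Note that in pandas 2.1 and later, applymap is deprecated in favor of DataFrame.map, which performs the same element-wise application:

df_rand.map(fmt)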



The map function

# Apply a function to every element of a Series (it does not work on a whole DataFrame)
df['price'].map(lambda x: x*2)

# Python's built-in map also works on a Series
result = list(map(lambda x: x*2, df['price']))

# Likewise the built-in filter
result = list(filter(lambda x: x>5000, df['price']))



Applying Functions in Practice

Period-over-period growth

def chain(current, last):
    # load the current and previous period, join on app, and compute the UV delta
    df1 = pd.read_csv(current, names=['app', 'tag', 'uv'], sep='\t')
    df2 = pd.read_csv(last, names=['app', 'tag', 'uv'], sep='\t')
    df3 = pd.merge(df1, df2, how='left', on='app')
    df3['uv_y'] = df3['uv_y'].fillna(0.0)   # apps missing from the last period count as 0
    df3['growth'] = df3['uv_x'] - df3['uv_y']
    return df3[['app', 'growth', 'uv_x', 'uv_y']].sort_values(by='growth', ascending=False)
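A hypothetical call (the file paths are placeholders), plus a relative growth rate, since period-over-period growth is often reported as a ratio rather than a difference; the zero guard is an assumption:

df3 = chain('uv_current.tsv', 'uv_last.tsv')
df3['growth_rate'] = df3['growth'] / df3['uv_y'].replace(0, np.nan)   # NaN where last period was 0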



Set difference

# Remove the rows of df2 from df_all (assumes df2 is a subset of df_all);
# DataFrame.append was removed in pandas 2.0, so use pd.concat
df = pd.concat([df_all, df2]).drop_duplicates(subset=['case_number'], keep=False)

# For a given key, filter one DataFrame's rows out of another; equivalent to a set difference
def difference(left, right, on):
    df = pd.merge(left, right, how='left', on=on)
    left_columns = left.columns
    col_y = df.columns[left_columns.size]   # first column that came from right
    df = df[df[col_y].isnull()]             # keep rows with no match in right
    df = df.iloc[:, 0:left_columns.size]    # .ix was removed from pandas; use .iloc
    df.columns = left_columns
    return df
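A quick sanity check with toy data (the column names here are made up for illustration):

df_a = pd.DataFrame({'case_number': [1, 2, 3], 'v': ['a', 'b', 'c']})
df_b = pd.DataFrame({'case_number': [2, 3], 'v': ['b', 'c']})
difference(df_a, df_b, on='case_number')   # only the row with case_number == 1 remains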



Hashing

# md5
import hashlib
def md5Encode(s):   # parameter renamed from str to avoid shadowing the built-in
    m = hashlib.md5()
    m.update(s.encode('utf-8'))
    return m.hexdigest()

# sha256
def sha256Encode(s):
    m = hashlib.sha256()
    m.update(s.encode('utf-8'))
    return m.hexdigest()
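Hypothetical usage on the sample data: hash a column after casting it to str:

df['id_md5'] = df['id'].astype(str).apply(md5Encode)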



Fuzzy matching

def get_job(compname):
    # map each job label to keywords that may appear in the company name
    job_dict = {'公司雇员': ['公司', '企业'],
                '医生': ['医院'],
                '学生': ['学院', '大学'],
                '自由职业': ['自由', '无业'],
                '其他': ['']}   # '' is a substring of everything, so this acts as the fallback
    for k, v in job_dict.items():
        for i in v:
            if i in compname:
                # logging.info(compname + ' contains ' + i + ' so return ' + k + ' as job')
                return k
    return '其他'

df['job'] = df['公司名称'].map(get_job)



Text similarity

import difflib
list1 = list(df['city'])

# For each name, find the closest match (at most 1) in list1, with similarity cutoff 0.95
df['close_matches'] = df['name'].apply(lambda x: difflib.get_close_matches(x, list1, 1, cutoff=0.95))

# Unwrap the single-element list; NaN when nothing met the cutoff
df['close_matches'] = df['close_matches'].apply(lambda x: x[0] if len(x)>0 else np.nan)

# Compute a similarity score between two columns
df['diff_score'] = df.apply(lambda x: difflib.SequenceMatcher(None, x.col1, x.col2).quick_ratio(), axis=1)
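One caveat: quick_ratio() only computes an upper bound on the similarity and can overestimate it; for the exact (slower) score, ratio() can be substituted:

df['diff_score'] = df.apply(lambda x: difflib.SequenceMatcher(None, x.col1, x.col2).ratio(), axis=1)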



JSON parsing

import pandas as pd
from pandas import json_normalize
import json

# json.dumps options: serialize to standard JSON
dict_data = {'name':'小明', 'age':'18', 'address':'Beijing'}
json_data = json.dumps(dict_data,
                       ensure_ascii=False, # keep non-ASCII (Chinese) characters
                       indent=2,           # pretty-print indentation
                       sort_keys=True      # sort keys alphabetically
                       )

df = pd.DataFrame(dict_data, index=[0])


# Parse by nesting level
data = [{'id':'100001',
         'name': '小明',
         'info':{
                 'addr':{'country':'CN',
                         'province':'Beijing'},
                 'phone':'133***6069'    
                }    
         },
        {'id':'100002',
         'name': '小兰',
         'info':{
                 'addr':{'country':'CN',
                         'province':'Shanghai'},
                 'phone':'133***5050'    
                }    
         }]

# Serialize to standard JSON
data_json = json.dumps(data, ensure_ascii=False, indent=2)

# Parse down to a given nesting level
df1 = pd.json_normalize(data, max_level=0)
df2 = pd.json_normalize(data, max_level=1)
df3 = pd.json_normalize(data, max_level=2)

# Use the last level of each dotted path as the column name
df3.columns = [i.split('.')[-1] if len(i.split('.'))>1 else i for i in df3.columns]


# Batch parsing (only when every record has the same format)
df1 = []
for i in df0["data"]:
    i_data = eval(i)   # eval trusts the input; json.loads is safer for standard JSON strings
    i_res = pd.json_normalize(i_data, max_level=9)
    df1.append(i_res)
df2 = pd.concat(df1)

# Parse one JSON document into multiple rows, carrying the primary key along
df1 = []
for i in df.index:
    i_data = pd.DataFrame(json.loads(df.loc[i, 'data_json']))
    i_data['case_number'] = df.loc[i, 'case_number']
    i_res = i_data[['case_number', 'imei', 'name']]
    df1.append(i_res)
df2 = pd.concat(df1).reset_index(drop=True)


# Flatten single-level JSON into a DataFrame
def _SORTDICTKEY(dictin):
    # use the first scalar value found as the sort key
    for key in dictin:
        if not isinstance(dictin[key], (set, list, dict)):
            return dictin[key]
    return dictin[list(dictin.keys())[0]]   # .keys is a method; the original dictin.keys[0] raises TypeError

def flatten_json(y):
    out = {}
    def flatten(x, name=''):
        if isinstance(x,dict):
            for a in x:
                flatten(x[a], name + a + '_')
        elif isinstance(x, list):
            # sort list elements by their first scalar value for a stable column order
            x = sorted(x, key=_SORTDICTKEY)
            for a in x:
                flatten(a, name + _SORTDICTKEY(a) + '_')
        else:
            out[name[:-1]] = x
    flatten(y)
    return out

df = pd.DataFrame({
    'id': ['001','002','003'],
    'report': [{'语文':80, '数学':85, '外语':90},
               {'语文':75, '数学':80, '外语':85},
               {'语文':90, '数学':85, '外语':80}]
    })

data = df['report'].apply(lambda x: json.dumps(x, ensure_ascii=False)).tolist()
a = [flatten_json(json.loads(d)) for d in data]
res = json_normalize(a)
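For this single-level case there is also a shorter route (a sketch using the same df): json_normalize can expand the column of dicts directly, and the id column is joined back by position:

res = pd.concat([df[['id']], pd.json_normalize(df['report'].tolist())], axis=1)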


# Nested JSON parsing
data = [{'id':'100001',
         'name': '小明',
         'describe':[{'subject':'语', 'score':80},
                     {'subject':'数', 'score':85},
                     {'subject':'外', 'score':90}]
         },
        {'id':'100002',
         'name': '小兰',
         'describe':[{'subject':'语', 'score':82},
                     {'subject':'数', 'score':88},
                     {'subject':'外', 'score':92}]
         }]

# Build the record_path and meta arguments for json_normalize
arg1 = []
arg2 = []
for key, value in data[0].items():
    if isinstance(value, list):
        arg1.append(key)
    elif isinstance(value, dict):
        for j in data[0][key].keys():
            arg2.append([key, j])
    else:
        arg2.append(key)
print(arg1, arg2)

# Read only the nested records
df = json_normalize(data, 'describe')

# Read everything: the record_path plus the meta fields collected above
df = json_normalize(data, arg1, arg2)
df.columns = [i.split('.')[1] if len(i.split('.'))>1 else i for i in df.columns]



Cleaning date formats

def time_trans(t):
    from datetime import datetime
    import re
    if isinstance(t, float) and t > 30000:
        # large floats are treated as Unix timestamps (seconds)
        return datetime.fromtimestamp(t).strftime('%Y-%m-%d')
    elif isinstance(t, float) and t < 30000:
        # small floats such as 2019.1 are treated as year.month
        t = str(t).replace('.', '-')
        return datetime.strptime(t, '%Y-%m').strftime('%Y-%m')
    elif isinstance(t, str):
        t = t.split('.')[0].split(' ')[0]
        if len(re.findall(r'(\d{4}/\d{1,2}/\d{1,2})', t)) == 1:
            fmt = '%Y/%m/%d'
        elif len(re.findall(r'(\d{4}-\d{1,2}-\d{1,2})', t)) == 1:
            fmt = '%Y-%m-%d'
        elif len(re.findall(r'(\d{8})', t)) == 1:
            fmt = '%Y%m%d'
        elif len(re.findall(r'(\d{6})', t)) == 1:
            fmt = '%Y%m'
        elif len(re.findall(r'(\d{2}-[a-zA-Z]{3}-\d{4})', t)) == 1:
            fmt = '%d-%b-%Y'
        elif len(re.findall(r'(\d{2}-[a-zA-Z]{3}-\d\d)', t)) == 1:
            fmt = '%d-%b-%y'
    try:
        # fmt is undefined when no pattern matched; the except covers that as well
        t = datetime.strptime(t, fmt).strftime('%Y-%m-%d')
    except Exception:
        return 'not support the format of %s yet' % t
    return t
        

t = 1562467704.0623
time_trans(t)

t = 2019.1
time_trans(t)

t = '2019/1/1'
time_trans(t)
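For most of the string formats above, pd.to_datetime can infer the format on its own (a sketch; it does not cover the numeric year.month case, and errors='coerce' turns unparseable values into NaT):

pd.to_datetime('2019/1/1', errors='coerce').strftime('%Y-%m-%d')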



Timestamp formats

# 10-digit and 13-digit timestamps
import datetime
import time

def get_time_stamp10():
  # 10-digit timestamp: whole seconds since the Unix epoch, e.g. 1540281250
  return int(time.time())

def get_time_stamp13():
  # 13-digit timestamp: the second-level stamp plus the first 3 digits of the
  # microsecond part, e.g. 1540281250399 (note: time.mktime expects local time,
  # so datetime.now() is used here rather than utcnow())
  now = datetime.datetime.now()
  date_stamp = str(int(time.mktime(now.timetuple())))
  data_microsecond = ('%06d' % now.microsecond)[0:3]
  return int(date_stamp + data_microsecond)

ts10 = get_time_stamp10()
ts13 = get_time_stamp13()
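Converting back (a sketch): fromtimestamp expects seconds, so a 13-digit stamp is divided by 1000 first.

datetime.datetime.fromtimestamp(ts13 / 1000).strftime('%Y-%m-%d %H:%M:%S')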



Share of the mode

def mode_pct(L):
    # share of the most frequent value within the group
    L = L.tolist()
    return max(L.count(x) for x in set(L)) / len(L)

a = df.groupby('id')['amount'].apply(mode_pct)
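An equivalent without the helper, as a sketch via value_counts (note it ignores NaN, which the helper counts in the denominator):

a = df.groupby('id')['amount'].agg(lambda s: s.value_counts(normalize=True).iloc[0])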



Counting non-null unique values

def uni_cnt(list_x):
    # x != np.nan is always True (NaN compares unequal to everything), so use pd.notnull
    list_x = [x for x in list_x if pd.notnull(x) and x != '']
    return len(set(list_x))

a = pd.pivot_table(df, index=['Month'], values=['id'], aggfunc=uni_cnt)



Stratified sampling

def get_sample(df, k, stratified_col):
    import math
    import random
    random.seed(10)
    grouped = df.groupby(by=stratified_col)[stratified_col[0]].count()
    group_k = grouped.map(lambda x: math.ceil(x * k))   # sample size per stratum

    parts = []
    for df_idx in group_k.index:
        df1 = df
        if len(stratified_col) == 1:
            df1 = df1[df1[stratified_col[0]] == df_idx]
        else:
            for i in range(len(df_idx)):
                df1 = df1[df1[stratified_col[i]] == df_idx[i]]
        idx = random.sample(range(len(df1)), group_k[df_idx])
        parts.append(df1.iloc[idx, :].copy())
    # DataFrame.append was removed in pandas 2.0; collect the parts and concat
    return pd.concat(parts)

df_stratified = get_sample(df, k=0.1, stratified_col=['month'])
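Since pandas 1.1, GroupBy.sample does the same job directly; a sketch (frac plays the role of k, though the per-group size may be rounded rather than ceil'ed as above):

df_stratified = df.groupby('month').sample(frac=0.1, random_state=10)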



Merging multiple rows into one

# Values are joined with '\t'
def merge_col(df):
    return '\t'.join(df.values)

df_res = df.groupby('group')['merge_name'].apply(merge_col)
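The same result in one line with agg, since '\t'.join is itself a function of the grouped values:

df_res = df.groupby('group')['merge_name'].agg('\t'.join)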



Splitting a dataset into n equal parts

import math
from tqdm import tqdm

def cut_df(df, n):
    # split df into n roughly equal parts and write each to its own CSV
    df_num = len(df)
    every_epoch_num = math.floor(df_num / n)
    for index in tqdm(range(n)):
        file_name = f'./XXX_{index}.csv'
        if index < n - 1:
            df_tem = df.iloc[every_epoch_num * index: every_epoch_num * (index + 1)]
        else:
            df_tem = df.iloc[every_epoch_num * index:]   # the last chunk takes the remainder
        df_tem.to_csv(file_name, index=False)

cut_df(df, 5)
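np.array_split offers a shorter route (a sketch reusing the XXX placeholder; np is imported at the top); it spreads the remainder over the first chunks instead of leaving it all in the last one:

for i, part in enumerate(np.array_split(df, 5)):
    part.to_csv(f'./XXX_{i}.csv', index=False)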