欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

O2O优惠券使用预测 pyspark

程序员文章站 2022-07-02 10:02:49
...

一、项目背景

从项目的背景、要求、数据集介绍。

提供用户在2016年1月1日至2016年6月30日之间真实线上线下消费行为,预测用户在2016年7月领取优惠券后15天以内的使用情况。

Field

Description

User_id

用户ID

Merchant_id

商户ID

Coupon_id

优惠券ID:null表示无优惠券消费,此时Discount_rate和Date_received字段无意义

Discount_rate

优惠率:x \in [0,1]代表折扣率;x:y表示满x减y。单位是元

Distance

user经常活动的地点离该merchant的最近门店距离是x*500米(如果是连锁店,则取最近的一家门店),x\in[0,10];null表示无此信息,0表示低于500米,10表示大于5公里;

Date_received

领取优惠券日期

Date

消费日期:如果Date=null & Coupon_id != null,该记录表示领取优惠券但没有使用,即负样本;如果Date!=null & Coupon_id = null,则表示普通消费日期;如果Date!=null & Coupon_id != null,则表示用优惠券消费日期,即正样本;

 

 

二、分析过程(可以加页)

从数据的预处理、模型建立、参数调优、预测效果、数据可视化介绍。可以给出代码和结果截图。

     

第一步:导入需要的包

from datetime import date

from pyspark.sql import SQLContext

import time

from pyspark.ml.classification

import DecisionTreeClassifier

from pyspark.ml.evaluation

import BinaryClassificationEvaluator

from pyspark.ml import Pipeline

from pyspark.ml.feature import VectorAssembler

import pyspark.sql.functions as fun

import numpy as np

import pandas as pd

from pyspark.sql import SparkSession

from pyspark.ml.tuning import ParamGridBuilder, TrainValidationSplit, CrossValidator,  

ParamGridBuilder

from pyspark.sql.functions import udf

from pyspark.sql.types import DateType, StringType

第二步:创建需要的spark:

spark= SparkSession.builder.appName('train')

.getOrCreate()

 

sc = spark.sparkContext

第三步:读取数据

df_off = pd

.read_csv('file:/home/joker/PycharmProjects/MLFW/data/cc f_offline_stage1_train.csv')

df_test = pd

.read_csv('file:/home/joker/PycharmProjects

/MLFW/data/ccf_offline_stage1_test_revised.csv

 ')

print('数据读取完成')

第四步:处理数据

经过数据分析后,发现数据存在折扣数据类型过多,有满减还有打折类型,而且消费可能跟领到券的周几有关,所以先对数据进行处理

  1. 编写数据处理函数:

# 一些用到的函数

def trans_discount(row):

if pd.isnull(row):

return 1.0

elif ':' in str(row):

line = str(row).split(':')

return 1.0 - float(line[1]) / float(line[0])

else:

return float(row)





def trans_df(df):

df['discount_type'] = df['Discount_rate']

.apply(

lambda row: np.nan if pd.isnull(row) else (1 if ':'

 in str(row) else 0))

df['discount_rate']=df['Discount_rate']

.apply(trans_discount)

df['discount_man']=df['Discount_rate']

.apply(lambda row:

int(row.split(':')[0]) if ':' in str(row) else 0)

df['discount_reduce']=df['Discount_rate']

.apply(lambda row:

int(row.split(':')[1]) if ':' in str(row) else 0)

df['distance']=df['Distance']

.fillna(-1).astype(int)

return df





def label(row):

if pd.isnull(row['Date_received']):

return -1

if pd.notnull(row['Date']):

td=pd.to_datetime(row['Date'],

format='%Y%m%d')-pd

.to_datetime(

row['Date_received']

,format='%Y%m%d')

if td <= pd.Timedelta(15, 'D'):

return 1

return 0





print('函数编译完成')
  1. 进行数据处理:


 

time_start = time.time()

# 数据处理

df_off = trans_df(df_off)

df_test = trans_df(df_test)

# received_date = df_off['Date_received'].unique()

#received_date=sorted(received_date

[pd.notnull(received_date)])

# buying_date = df_off['Date'].unique()

#buying_date=sorted(buying_date[pd.notnull(buying_date)

 ])

#buying_date=sorted(df_off[df_off['Date']

.notnull()]['Date'])



#coupon_by_date=df_off[

df_off['Date_received'].notnull()]['Date']

# coupon_by_date.columns = ['Date_received', 'count']



#buy_by_date=df_off[(df_off['Date'].notnull())

&(df_off['Date_received'].notnull())]

[['Date_received', 'Date']] \

#  .groupby(['Date_received'], as_index=False).count()

# buy_by_date.columns = ['Date_received', 'count']



df_off['weekday'] = df_off['Date_received']

.astype(str).apply(lambda row

: np.nan if row == 'nan' else

 date(int(row[0:4]),

int(row[4:6]),

int(row[6:8])).weekday() + 1)

df_test['weekday'] = df_test['Date_received']

.astype(str) \

.apply(lambda row:

np.nan if row == 'nan' else

 date(int(row[0:4]),

int(row[4:6]),

int(row[6:8])).weekday() + 1)



df_off['weekday_type'] = df_off['weekday']

.apply(lambda x:

 1 if x in [6, 7] else 0)

df_test['weekday_type'] = df_test['weekday']

.apply(lambda x:

 1 if x in [6, 7]

 else 0)

weekday_columns = ['weekday' + str(i) for i in range(1, 8)]

df_off['label'] = df_off.apply(label, axis=1)

df_test['label'] = df_off.apply(label, axis=1)

time_end = time.time()

print('time cost', time_end - time_start, 's')

print('数据处理结束')





5.数据分割:

df_off.drop(['Distance', 'Discount_rate'], axis=1, inplace=True)

df_test.drop(['Distance', 'Discount_rate'], axis=1, inplace=True)



df = df_off[df_off['label'] != -1].copy()

# df_t = df_test[df_test['label'] != -1].copy()

train = df[(df['Date_received'] < 20160530)].copy()

valid = df[(df['Date_received'] >= 20160516) & (df['Date_received'] <= 20160615)].copy()

print('数据分割结束')

train['weekday'] = df['weekday'].astype(int)

train['Coupon_id'] = df['Coupon_id'].astype(int)

original_feature = ['discount_rate', 'discount_type', 'discount_man', 'discount_reduce',

                    'distance', 'weekday', 'weekday_type']



print('数据分割结束')

第五步:建立模型、

 

sql_context = SQLContext(sc)

df_train = sql_context.createDataFrame(train)

sp_test = sql_context.createDataFrame(valid)



# %%



assemblerInputs = original_feature

assembler = VectorAssembler(inputCols=assemblerInputs

, outputCol='features')

paramGrid = ParamGridBuilder()

.addGrid(dt.impurity, ['gini', 'entropy'])\

            .addGrid(dt.maxDepth, [16, 25])\

            .addGrid(dt.maxBins, [5, 10]).build()

tvs=TrainValidationSplit(estimator=dt

, evaluator=evaluator

,estimatorParamMaps=paramGrid

,trainRatio=0.8, seed=19)

    tvs_pipeline = Pipeline(stages=[assembler, tvs])

    tvs_model = tvs_pipeline.fit(df_train)

    proficiency=evaluator

.evaluate(tvs_model.transform(sp_test))

     



 

 

 

 

 

最初准确率为50%,经过参数调优后,准确率达到了65%,

O2O优惠券使用预测 pyspark

 

 

但是由于机器性能有限,数据量过于庞大、跑一次程序就O2O优惠券使用预测 pyspark
需要十五分钟,所以没有进行深度的优化

下面这个图便是参数调优时候对树深度的可视化

O2O优惠券使用预测 pyspark

 

代码

# %%
from datetime import date
from pyspark.sql import SQLContext
import time
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler
import pyspark.sql.functions as fun
import numpy as np
import pandas as pd
from pyspark.sql import SparkSession
from pyspark.ml.tuning import ParamGridBuilder, TrainValidationSplit, CrossValidator, ParamGridBuilder
from pyspark.sql.functions import udf
from pyspark.sql.types import DateType, StringType

spark = SparkSession.builder.appName('train').getOrCreate()

sc = spark.sparkContext

# 读取数据
df_off = pd.read_csv('file:/home/joker/PycharmProjects/MLFW/data/ccf_offline_stage1_train.csv')
# df_on = pd.read_csv('file:/home/joker/PycharmProjects/MLFW/data/ccf_online_stage1_train.csv')
df_test = pd.read_csv('file:/home/joker/PycharmProjects/MLFW/data/ccf_offline_stage1_test_revised.csv')
print('数据读取完成')


# %%
# 一些用到的函数

def trans_discount(row):
    if pd.isnull(row):
        return 1.0
    elif ':' in str(row):
        line = str(row).split(':')
        return 1.0 - float(line[1]) / float(line[0])
    else:
        return float(row)


def trans_df(df):
    df['discount_type'] = df['Discount_rate'].apply(
        lambda row: np.nan if pd.isnull(row) else (1 if ':' in str(row) else 0))
    df['discount_rate'] = df['Discount_rate'].apply(trans_discount)
    df['discount_man'] = df['Discount_rate'].apply(lambda row: int(row.split(':')[0]) if ':' in str(row) else 0)
    df['discount_reduce'] = df['Discount_rate'].apply(lambda row: int(row.split(':')[1]) if ':' in str(row) else 0)

    df['distance'] = df['Distance'].fillna(-1).astype(int)
    return df


def label(row):
    if pd.isnull(row['Date_received']):
        return -1
    if pd.notnull(row['Date']):
        td = pd.to_datetime(row['Date'], format='%Y%m%d') - pd.to_datetime(row['Date_received'], format='%Y%m%d')
        if td <= pd.Timedelta(15, 'D'):
            return 1
    return 0


print('函数编译完成')

time_start = time.time()
# 数据处理
df_off = trans_df(df_off)
df_test = trans_df(df_test)

# received_date = df_off['Date_received'].unique()
# received_date = sorted(received_date[pd.notnull(received_date)])

# buying_date = df_off['Date'].unique()
# buying_date = sorted(buying_date[pd.notnull(buying_date)])
# buying_date = sorted(df_off[df_off['Date'].notnull()]['Date'])

# coupon_by_date = df_off[df_off['Date_received'].notnull()]['Date']
# coupon_by_date.columns = ['Date_received', 'count']

# buy_by_date = df_off[(df_off['Date'].notnull()) & (df_off['Date_received'].notnull())][['Date_received', 'Date']] \
#     .groupby(['Date_received'], as_index=False).count()
# buy_by_date.columns = ['Date_received', 'count']

df_off['weekday'] = df_off['Date_received'].astype(str) \
    .apply(lambda row: np.nan if row == 'nan' else date(int(row[0:4]), int(row[4:6]), int(row[6:8])).weekday() + 1)
df_test['weekday'] = df_test['Date_received'].astype(str) \
    .apply(lambda row: np.nan if row == 'nan' else date(int(row[0:4]), int(row[4:6]), int(row[6:8])).weekday() + 1)

df_off['weekday_type'] = df_off['weekday'].apply(lambda x: 1 if x in [6, 7] else 0)
df_test['weekday_type'] = df_test['weekday'].apply(lambda x: 1 if x in [6, 7] else 0)

weekday_columns = ['weekday' + str(i) for i in range(1, 8)]
# df_temp = pd.get_dummies(df_off['weekday'].replace('nan', np.nan))
# df_temp.columns = weekday_columns

df_off['label'] = df_off.apply(label, axis=1)
df_test['label'] = df_off.apply(label, axis=1)

time_end = time.time()
print('time cost', time_end - time_start, 's')
print('数据处理结束')

# %%
# 数据分割
df_off.drop(['Distance', 'Discount_rate'], axis=1, inplace=True)
df_test.drop(['Distance', 'Discount_rate'], axis=1, inplace=True)

df = df_off[df_off['label'] != -1].copy()
# df_t = df_test[df_test['label'] != -1].copy()
train = df[(df['Date_received'] < 20160530)].copy()
valid = df[(df['Date_received'] >= 20160516) & (df['Date_received'] <= 20160615)].copy()
print('数据分割结束')
train['weekday'] = df['weekday'].astype(int)
train['Coupon_id'] = df['Coupon_id'].astype(int)
original_feature = ['discount_rate', 'discount_type', 'discount_man', 'discount_reduce',
                    'distance', 'weekday', 'weekday_type']

print('数据分割结束')
# %%

time_start = time.time()
sql_context = SQLContext(sc)
df_train = sql_context.createDataFrame(train)
sp_test = sql_context.createDataFrame(valid)
time_end = time.time()
print('time cost', time_end - time_start, 's')
# %%
df_train.select('Date', 'label').show()

# %%

assemblerInputs = original_feature

time_start = time.time()
assembler = VectorAssembler(inputCols=assemblerInputs, outputCol='features')
time_end = time.time()
print('time cost', time_end - time_start, 's')

df_assembler = assembler.transform(df_train)

# %%
dt = DecisionTreeClassifier(labelCol='label', featuresCol='features', impurity='gini', maxDepth=10, maxBins=14)
# dt_model = dt.fit(df_assembler)
#
# dt_DT = dt_model.transform(df_assembler)


#%%
pipeline = Pipeline(stages=[assembler, dt])

pipelineModel = pipeline.fit(df_train)
predicted = pipelineModel.transform(sp_test)

#%%
evaluator = BinaryClassificationEvaluator(rawPredictionCol='rawPrediction', labelCol='label', metricName='areaUnderROC')
auc = evaluator.evaluate(predicted)
print(auc)

#%%
paramGrid = ParamGridBuilder().addGrid(dt.impurity, ['gini', 'entropy'])\
                                .addGrid(dt.maxDepth, [16, 25])\
                                .addGrid(dt.maxBins, [5, 10]).build()
max_pro = 0.651
max_seed = 19    # 19
                #0.651
for i in range(100, 200):
    tvs = TrainValidationSplit(estimator=dt, evaluator=evaluator, estimatorParamMaps=paramGrid, trainRatio=0.8, seed=i)
    tvs_pipeline = Pipeline(stages=[assembler, tvs])
    tvs_model = tvs_pipeline.fit(df_train)
    proficiency = evaluator.evaluate(tvs_model.transform(sp_test))
    print(tvs_model.stages[1].bestModel.toDebugString[:500])
    if max_pro < proficiency:
        print(proficiency)
        max_pro = proficiency
        max_seed = i

    print(max_seed, '   ', max_pro)

#%%
# from pyspark.ml.classification import RandomForestClassifier
#
# time_start = time.time()
# rf = RandomForestClassifier(labelCol='label', featuresCol='features', numTrees=100)
# rf_pipeline = Pipeline(stages=[assembler, rf])
# rf_model = rf_pipeline.fit(df_train)
# print(evaluator.evaluate(rf_model.transform(sp_test)))
# time_end = time.time()
# print('time cost', time_end - time_start, 's')
# %%
# from pyspark.ml.tuning import ParamGridBuilder
# paramGrid = ParamGridBuilder().addGrid(rf.impurity, ['gini', 'entropy'])\
#                                 .addGrid(rf.maxDepth, [5, 10, 15])\
#                                 .addGrid(rf.maxBins, [3, 6, 9])\
#                                 .addGrid(rf.numTrees, [3, 5, 10]).build()
#
# rfcv = CrossValidator(estimator=rf, evaluator=evaluator, estimatorParamMaps=paramGrid, numFolds=5)
# rfcv_pipeline = Pipeline(stages=[assembler, rfcv])
# rfcv_model = rfcv_pipeline.fit(df_train)
# predicted = rfcv_model.transform(sp_test)
# print(evaluator.evaluate(predicted))
#
#
# %%
# def evaluationTrainModel(train_data, validation_data, impurity, maxDepth, maxBins):
#     记录开始时间
    # start_time = time.time()
    #
    #
    # param = ParamGridBuilder().addGrid(rf.impurity, impurity) \
    #     .addGrid(rf.maxDepth, maxDepth) \
    #     .addGrid(rf.maxBins, maxBins).build()
    # tvs = TrainValidationSplit(estimator=dt, evaluator=evaluator, estimatorParamMaps=param, trainRatio=0.8)
    # tvs_pipeline = Pipeline(stages=[assembler, tvs])
    # tvs_model = tvs_pipeline.fit(train_data)
    # predicted = tvs_model.transform(validation_data)
    # auc = evaluator.evaluate(predicted)
    # duration = time.time() - start_time
    # auc_t = evaluator.evaluate(predicted)
    # return (tvs_model, duration, auc_t, impurity, maxDepth, maxBins)
    #
    # 根据传入参数训练模型
#