/** * Created by lkl on 2018/1/16. */import org.apache.spark.mllib.evaluation.BinaryClassificationMetricsimport org.apache.spark.mllib.linalg.Vectorsimport org.apache.spark.mllib.regression.LabeledPointimport org.apache.spark.mllib.tree.GradientBoostedTreesimport org.apache.spark.mllib.tree.configuration.BoostingStrategyimport org.apache.spark.mllib.tree.model.GradientBoostedTreesModelimport org.apache.spark.sql.{Row, SaveMode}import org.apache.spark.sql.hive.HiveContextimport org.apache.spark.sql.types.{DoubleType, StringType, StructField, StructType}import org.apache.spark.{SparkConf, SparkContext}import scala.collection.mutable.ArrayBufferobject abregression3Model20180116 { def main(args: Array[String]): Unit = { val sparkConf = new SparkConf().setAppName("abregression3Model20180116") val sc = new SparkContext(sparkConf) val hc = new HiveContext(sc) val data = hc.sql(s"select * from lkl_card_score.fqz_score_dataset_03train").map { row => val arr = new ArrayBuffer[Double]() //剔除label、phone字段 for (i <- 3 until row.size) { if (row.isNullAt(i)) { arr += 0.0 } else if (row.get(i).isInstanceOf[Int]) arr += row.getInt(i).toDouble else if (row.get(i).isInstanceOf[Double]) arr += row.getDouble(i) else if (row.get(i).isInstanceOf[Long]) arr += row.getLong(i).toDouble else if (row.get(i).isInstanceOf[String]) arr += 0.0 } LabeledPoint(row.getInt(2).toDouble, Vectors.dense(arr.toArray)) } // Split data into training (60%) and test (40%) val Array(trainingData, testData) = data.randomSplit(Array(0.7,0.3), seed = 11L) // 逻辑回归是迭代算法,所以缓存训练数据的RDD trainingData.cache() //使用SGD算法运行逻辑回归 val boostingStrategy = BoostingStrategy.defaultParams("Regression") boostingStrategy.setNumIterations(40) // Note: Use more iterations in practice. boostingStrategy.treeStrategy.setMaxDepth(6) boostingStrategy.treeStrategy.setMinInstancesPerNode(50) val model = GradientBoostedTrees.train(data, boostingStrategy) model.save(sc, s"hdfs://ns1/user/songchunlin/model/abregression3Model20180116") sc.makeRDD(Seq(model.toDebugString)).repartition(1).saveAsTextFile(s"hdfs://ns1/user/songchunlin/model/toDebugString/abregression3Model20180116") // 全量data数据打分,原本用testData打分 val omodel=GradientBoostedTreesModel.load(sc,s"hdfs://ns1/user/songchunlin/model/abregression3Model20180116") val predictionAndLabels = data.map { case LabeledPoint(label, features) => val prediction = omodel.predict(features) (prediction, label) } println("testData count = " + testData.count()) println("predictionAndLabels count = " + predictionAndLabels.count()) predictionAndLabels.map(x => {"predicts: "+x._1+"--> labels:"+x._2}).saveAsTextFile(s"hdfs://ns1/user/szdsjkf/model_training/jrsc/20171218/predictionAndLabels") val metrics = new BinaryClassificationMetrics(predictionAndLabels) val precision = metrics.precisionByThreshold precision.map({case (t, p) => "Threshold: "+t+"Precision:"+p }).saveAsTextFile(s"hdfs://ns1/user/szdsjkf/model_training/jrsc/20171218/precision") val recall = metrics.recallByThreshold recall.map({case (t, r) => "Threshold: "+t+"Recall:"+r }).saveAsTextFile(s"hdfs://ns1/user/szdsjkf/model_training/jrsc/20171218/recall") val beta = 2 val f2Score = metrics.fMeasureByThreshold(beta) f2Score.map(x => {"Threshold: "+x._1+"--> F-score:"+x._2+"--> Beta = 2"}) .saveAsTextFile(s"hdfs://ns1/user/szdsjkf/model_training/jrsc/20171218/f1Score") val prc = metrics.pr prc.map(x => {"Recall: " + x._1 + "--> Precision: "+x._2 }).saveAsTextFile(s"hdfs://ns1/user/szdsjkf/model_training/jrsc/20171218/prc") // AUPRC,精度,召回曲线下的面积 val auPRC = metrics.areaUnderPR println("Area under precision-recall curve = " +auPRC) sc.makeRDD(Seq("Area under precision-recall curve = " +auPRC)).saveAsTextFile(s"hdfs://ns1/user/szdsjkf/model_training/jrsc/20171218/auPRC") //roc val roc = metrics.roc roc.map(x => {"FalsePositiveRate:" + x._1 + "--> Recall: " +x._2}).saveAsTextFile(s"hdfs://ns1/user/szdsjkf/model_training/jrsc/20171218/roc") // AUC val auROC = metrics.areaUnderROC sc.makeRDD(Seq("Area under ROC = " + +auROC)).saveAsTextFile(s"hdfs://ns1/user/szdsjkf/model_training/jrsc/20171218/auROC") println("Area under ROC = " + auROC) //val accuracy = 1.0 * predictionAndLabels.filter(x => x._1 == x._2).count() / testData.count() val dataInstance = hc.sql(s"select * from lkl_card_score.fqz_score_dataset_03train").map { row => val arr = new ArrayBuffer[Double]() //剔除label、phone字段 for (i <- 3 until row.size) { if (row.isNullAt(i)) { arr += 0.0 } else if (row.get(i).isInstanceOf[Int]) arr += row.getInt(i).toDouble else if (row.get(i).isInstanceOf[Double]) arr += row.getDouble(i) else if (row.get(i).isInstanceOf[Long]) arr += row.getLong(i).toDouble else if (row.get(i).isInstanceOf[String]) arr += 0.0 } (row(0),row(1),row(2),Vectors.dense(arr.toArray)) } val preditDataGBDT = dataInstance.map { point => val prediction = model.predict(point._4) //order_id,apply_time,score (point._1,point._2,point._3,prediction) } //rdd转dataFrame val rowRDD = preditDataGBDT.map(row => Row(row._1.toString,row._2.toString,row._3.toString,row._4)) val schema = StructType( List( StructField("order_id", StringType, true), StructField("apply_time", StringType, true), StructField("label", StringType, true), StructField("score", DoubleType, true) ) ) //将RDD映射到rowRDD,schema信息应用到rowRDD上 val scoreDataFrame = hc.createDataFrame(rowRDD,schema) scoreDataFrame.count() scoreDataFrame.write.mode(SaveMode.Overwrite).saveAsTable("lkl_card_score.fqz_score_dataset_03train_predict") // 自己测试建模 val balance = hc.sql(s"select * from lkl_card_score.overdue_result_all_new_woe_instant_v3_02train where label='1' limit 85152 union all select * from lkl_card_score.overdue_result_all_new_woe_instant_v3_02train where label='0'").map { row => val arr = new ArrayBuffer[Double]() //剔除label、phone字段 for (i <- 3 until row.size) { if (row.isNullAt(i)) { arr += 0.0 } else if (row.get(i).isInstanceOf[Int]) arr += row.getInt(i).toDouble else if (row.get(i).isInstanceOf[Double]) arr += row.getDouble(i) else if (row.get(i).isInstanceOf[Long]) arr += row.getLong(i).toDouble else if (row.get(i).isInstanceOf[String]) arr += 0.0 } LabeledPoint(row.getInt(2).toDouble, Vectors.dense(arr.toArray)) } // 逻辑回归是迭代算法,所以缓存训练数据的RDD balance.cache() val boostingStrategy1 = BoostingStrategy.defaultParams("Regression") boostingStrategy1.setNumIterations(40) // Note: Use more iterations in practice. boostingStrategy1.treeStrategy.setMaxDepth(6) boostingStrategy1.treeStrategy.setMinInstancesPerNode(50) val model2 = GradientBoostedTrees.train(balance, boostingStrategy1) val predictionAndLabels2 = data.map { case LabeledPoint(label, features) => val prediction = model2.predict(features) (prediction, label) } val metrics2 = new BinaryClassificationMetrics(predictionAndLabels2) // AUPRC,精度,召回曲线下的面积 val auPRC1 = metrics2.areaUnderPR val preditDataGBDT1 = dataInstance.map { point => val prediction2 = model2.predict(point._4) //order_id,apply_time,score (point._1,point._2,point._3,prediction2) } //rdd转dataFrame val rowRDD2 = preditDataGBDT1.map(row => Row(row._1.toString,row._2.toString,row._3.toString,row._4)) val schema2 = StructType( List( StructField("order_id", StringType, true), StructField("apply_time", StringType, true), StructField("label", StringType, true), StructField("score", DoubleType, true) ) ) val scoreDataFrame2 = hc.createDataFrame(rowRDD2,schema2) scoreDataFrame2.count() scoreDataFrame2.write.mode(SaveMode.Overwrite).saveAsTable("lkl_card_score.fqz_score_dataset_02val_170506_predict") }}