lakala anti-fraud modeling in practice: GBDT supervised-learning code
Published: 2019-06-22


/**
 * Created by lkl on 2018/1/16.
 */
import org.apache.spark.mllib.evaluation.BinaryClassificationMetrics
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.mllib.tree.GradientBoostedTrees
import org.apache.spark.mllib.tree.configuration.BoostingStrategy
import org.apache.spark.mllib.tree.model.GradientBoostedTreesModel
import org.apache.spark.sql.{Row, SaveMode}
import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.sql.types.{DoubleType, StringType, StructField, StructType}
import org.apache.spark.{SparkConf, SparkContext}
import scala.collection.mutable.ArrayBuffer

object abregression3Model20180116 {

  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setAppName("abregression3Model20180116")
    val sc = new SparkContext(sparkConf)
    val hc = new HiveContext(sc)

    // Load the training table. The first three columns (order_id, apply_time, label)
    // are excluded from the feature vector; null and string columns become 0.0.
    val data = hc.sql(s"select * from lkl_card_score.fqz_score_dataset_03train").map { row =>
      val arr = new ArrayBuffer[Double]()
      for (i <- 3 until row.size) {
        if (row.isNullAt(i)) {
          arr += 0.0
        } else if (row.get(i).isInstanceOf[Int]) {
          arr += row.getInt(i).toDouble
        } else if (row.get(i).isInstanceOf[Double]) {
          arr += row.getDouble(i)
        } else if (row.get(i).isInstanceOf[Long]) {
          arr += row.getLong(i).toDouble
        } else if (row.get(i).isInstanceOf[String]) {
          arr += 0.0
        }
      }
      LabeledPoint(row.getInt(2).toDouble, Vectors.dense(arr.toArray))
    }

    // Split the data into training (70%) and test (30%) sets.
    val Array(trainingData, testData) = data.randomSplit(Array(0.7, 0.3), seed = 11L)
    // GBDT training is iterative, so cache the training RDD.
    trainingData.cache()

    // Configure the gradient-boosted trees. "Regression" means predict() returns a
    // continuous score, which is what BinaryClassificationMetrics expects below.
    val boostingStrategy = BoostingStrategy.defaultParams("Regression")
    boostingStrategy.setNumIterations(40) // Note: use more iterations in practice.
    boostingStrategy.treeStrategy.setMaxDepth(6)
    boostingStrategy.treeStrategy.setMinInstancesPerNode(50)

    // Train on the full `data` RDD (not just trainingData) and persist the model
    // and its human-readable dump to HDFS.
    val model = GradientBoostedTrees.train(data, boostingStrategy)
    model.save(sc, s"hdfs://ns1/user/songchunlin/model/abregression3Model20180116")
    sc.makeRDD(Seq(model.toDebugString)).repartition(1)
      .saveAsTextFile(s"hdfs://ns1/user/songchunlin/model/toDebugString/abregression3Model20180116")

    // Score the full data set (originally testData was scored here).
    val omodel = GradientBoostedTreesModel.load(sc, s"hdfs://ns1/user/songchunlin/model/abregression3Model20180116")
    val predictionAndLabels = data.map { case LabeledPoint(label, features) =>
      val prediction = omodel.predict(features)
      (prediction, label)
    }
    println("testData count = " + testData.count())
    println("predictionAndLabels count = " + predictionAndLabels.count())
    predictionAndLabels.map(x => "predicts: " + x._1 + "--> labels:" + x._2)
      .saveAsTextFile(s"hdfs://ns1/user/szdsjkf/model_training/jrsc/20171218/predictionAndLabels")

    val metrics = new BinaryClassificationMetrics(predictionAndLabels)

    // Precision and recall at each score threshold.
    val precision = metrics.precisionByThreshold
    precision.map({ case (t, p) => "Threshold: " + t + " Precision: " + p })
      .saveAsTextFile(s"hdfs://ns1/user/szdsjkf/model_training/jrsc/20171218/precision")
    val recall = metrics.recallByThreshold
    recall.map({ case (t, r) => "Threshold: " + t + " Recall: " + r })
      .saveAsTextFile(s"hdfs://ns1/user/szdsjkf/model_training/jrsc/20171218/recall")

    // F-measure with beta = 2, i.e. recall weighted higher than precision.
    // (The output directory is named f1Score, but this is the F2 score.)
    val beta = 2
    val f2Score = metrics.fMeasureByThreshold(beta)
    f2Score.map(x => "Threshold: " + x._1 + "--> F-score:" + x._2 + "--> Beta = 2")
      .saveAsTextFile(s"hdfs://ns1/user/szdsjkf/model_training/jrsc/20171218/f1Score")

    // Precision-recall curve and AUPRC (area under the precision-recall curve).
    val prc = metrics.pr
    prc.map(x => "Recall: " + x._1 + "--> Precision: " + x._2)
      .saveAsTextFile(s"hdfs://ns1/user/szdsjkf/model_training/jrsc/20171218/prc")
    val auPRC = metrics.areaUnderPR
    println("Area under precision-recall curve = " + auPRC)
    sc.makeRDD(Seq("Area under precision-recall curve = " + auPRC))
      .saveAsTextFile(s"hdfs://ns1/user/szdsjkf/model_training/jrsc/20171218/auPRC")

    // ROC curve and AUC.
    val roc = metrics.roc
    roc.map(x => "FalsePositiveRate:" + x._1 + "--> Recall: " + x._2)
      .saveAsTextFile(s"hdfs://ns1/user/szdsjkf/model_training/jrsc/20171218/roc")
    val auROC = metrics.areaUnderROC
    sc.makeRDD(Seq("Area under ROC = " + auROC))
      .saveAsTextFile(s"hdfs://ns1/user/szdsjkf/model_training/jrsc/20171218/auROC")
    println("Area under ROC = " + auROC)
    //val accuracy = 1.0 * predictionAndLabels.filter(x => x._1 == x._2).count() / testData.count()

    // Re-read the table, this time keeping order_id, apply_time and label
    // alongside the feature vector so the scores can be written back to Hive.
    val dataInstance = hc.sql(s"select * from lkl_card_score.fqz_score_dataset_03train").map { row =>
      val arr = new ArrayBuffer[Double]()
      for (i <- 3 until row.size) {
        if (row.isNullAt(i)) {
          arr += 0.0
        } else if (row.get(i).isInstanceOf[Int]) {
          arr += row.getInt(i).toDouble
        } else if (row.get(i).isInstanceOf[Double]) {
          arr += row.getDouble(i)
        } else if (row.get(i).isInstanceOf[Long]) {
          arr += row.getLong(i).toDouble
        } else if (row.get(i).isInstanceOf[String]) {
          arr += 0.0
        }
      }
      (row(0), row(1), row(2), Vectors.dense(arr.toArray))
    }
    val preditDataGBDT = dataInstance.map { point =>
      // (order_id, apply_time, label, score)
      val prediction = model.predict(point._4)
      (point._1, point._2, point._3, prediction)
    }

    // Convert the scored RDD to a Row RDD, apply the schema, and save as a Hive table.
    val rowRDD = preditDataGBDT.map(row => Row(row._1.toString, row._2.toString, row._3.toString, row._4))
    val schema = StructType(List(
      StructField("order_id", StringType, true),
      StructField("apply_time", StringType, true),
      StructField("label", StringType, true),
      StructField("score", DoubleType, true)
    ))
    val scoreDataFrame = hc.createDataFrame(rowRDD, schema)
    scoreDataFrame.count()
    scoreDataFrame.write.mode(SaveMode.Overwrite).saveAsTable("lkl_card_score.fqz_score_dataset_03train_predict")

    // Experiment: retrain on a class-balanced sample (at most 85152 label='1'
    // rows unioned with all label='0' rows) and score the same data set.
    val balance = hc.sql(s"select * from lkl_card_score.overdue_result_all_new_woe_instant_v3_02train where label='1' limit 85152 union all select * from lkl_card_score.overdue_result_all_new_woe_instant_v3_02train where label='0'").map { row =>
      val arr = new ArrayBuffer[Double]()
      for (i <- 3 until row.size) {
        if (row.isNullAt(i)) {
          arr += 0.0
        } else if (row.get(i).isInstanceOf[Int]) {
          arr += row.getInt(i).toDouble
        } else if (row.get(i).isInstanceOf[Double]) {
          arr += row.getDouble(i)
        } else if (row.get(i).isInstanceOf[Long]) {
          arr += row.getLong(i).toDouble
        } else if (row.get(i).isInstanceOf[String]) {
          arr += 0.0
        }
      }
      LabeledPoint(row.getInt(2).toDouble, Vectors.dense(arr.toArray))
    }
    // Cache the balanced training RDD as well.
    balance.cache()

    val boostingStrategy1 = BoostingStrategy.defaultParams("Regression")
    boostingStrategy1.setNumIterations(40) // Note: use more iterations in practice.
    boostingStrategy1.treeStrategy.setMaxDepth(6)
    boostingStrategy1.treeStrategy.setMinInstancesPerNode(50)
    val model2 = GradientBoostedTrees.train(balance, boostingStrategy1)

    val predictionAndLabels2 = data.map { case LabeledPoint(label, features) =>
      val prediction = model2.predict(features)
      (prediction, label)
    }
    val metrics2 = new BinaryClassificationMetrics(predictionAndLabels2)
    // AUPRC of the balanced model.
    val auPRC1 = metrics2.areaUnderPR

    val preditDataGBDT1 = dataInstance.map { point =>
      // (order_id, apply_time, label, score)
      val prediction2 = model2.predict(point._4)
      (point._1, point._2, point._3, prediction2)
    }
    val rowRDD2 = preditDataGBDT1.map(row => Row(row._1.toString, row._2.toString, row._3.toString, row._4))
    val schema2 = StructType(List(
      StructField("order_id", StringType, true),
      StructField("apply_time", StringType, true),
      StructField("label", StringType, true),
      StructField("score", DoubleType, true)
    ))
    val scoreDataFrame2 = hc.createDataFrame(rowRDD2, schema2)
    scoreDataFrame2.count()
    scoreDataFrame2.write.mode(SaveMode.Overwrite).saveAsTable("lkl_card_score.fqz_score_dataset_02val_170506_predict")
  }
}
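A few notes on the code. First, the row-to-feature-vector loop is copy-pasted three times (for data, dataInstance, and balance). Factoring it into one helper keeps the three pipelines consistent; below is a minimal sketch that can be pasted into spark-shell. The helper name rowToFeatures is ours, and the final catch-all case is an assumption: the original if/else chain silently appends nothing for unrecognized column types, which would shift feature positions, so here every unknown type becomes 0.0 to keep the vector length fixed.

import org.apache.spark.mllib.linalg.{Vector, Vectors}
import org.apache.spark.sql.Row
import scala.collection.mutable.ArrayBuffer

// Hypothetical helper: turn the columns of a Hive Row (from index `start` on)
// into a dense feature vector, mapping nulls and strings to 0.0 just like the
// inline loops above.
def rowToFeatures(row: Row, start: Int = 3): Vector = {
  val arr = new ArrayBuffer[Double]()
  for (i <- start until row.size) {
    if (row.isNullAt(i)) arr += 0.0
    else row.get(i) match {
      case v: Int    => arr += v.toDouble
      case v: Long   => arr += v.toDouble
      case v: Double => arr += v
      case _: String => arr += 0.0
      case _         => arr += 0.0 // assumption: keep vector length fixed for any other type
    }
  }
  Vectors.dense(arr.toArray)
}

With this, the first mapper collapses to hc.sql(...).map(row => LabeledPoint(row.getInt(2).toDouble, rowToFeatures(row))).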
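Second, the commented-out accuracy line compares the raw GBDT score to the label with ==, which almost never matches because the "Regression" objective yields continuous scores rather than 0/1 classes; that is presumably why it was disabled. A minimal sketch of accuracy at a cut-off, assuming an arbitrary 0.5 threshold (the threshold choice is ours, not from the original; in practice pick it from the precisionByThreshold/recallByThreshold outputs saved above):

// Binarize the continuous score before comparing with the label.
val threshold = 0.5 // assumed cut-off
val accuracy = predictionAndLabels
  .filter { case (score, label) => (if (score > threshold) 1.0 else 0.0) == label }
  .count().toDouble / predictionAndLabels.count()
println(s"Accuracy at threshold $threshold = $accuracy")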
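Finally, because the model is persisted to HDFS with model.save, a later job can score without retraining, which is the point of saving it. A minimal standalone sketch under that assumption (the app name and the placeholder feature dimension are ours; the vector dimension must match the training features):

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.tree.model.GradientBoostedTreesModel

val sc = new SparkContext(new SparkConf().setAppName("fqzScoreOnly")) // hypothetical app name
val model = GradientBoostedTreesModel.load(sc,
  "hdfs://ns1/user/songchunlin/model/abregression3Model20180116")
// Score a single feature vector; Array.fill(10)(0.0) is a placeholder.
val score = model.predict(Vectors.dense(Array.fill(10)(0.0)))
println(s"score = $score")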

 

Reprinted from: http://hkvta.baihongyu.com/
