法律服务大数据智能推荐

背景

随着互联网和信息技术的快速发展,电子商务、网上服务与交易等网络业务越来越普及,这些操作会产生大量数据(或海量数据),用户想要从海量数据中快速准确地寻找到自己感兴趣的信息已经变得越来越困难,这也就造就了搜索引擎的诞生,应用比较广泛的如Google搜索、Bing搜索、百度搜索等。搜索引擎虽然可以根据关键词检索相关信息,但是无法解决用户的其他诸多需求,如用户无法找到准确描述自己需求的关键词时,搜索引擎就无能为力了(当然,图片搜索是个特例,但是搜索出来的结果的相关性也比较小,还是有待发展)。本案例的研究对象为某法律网站,该网站致力于为用户提供丰富的法律信息与个性化的专业咨询服务。随着该网站访问量的增大,用户在面对大量相关信息时,无法及时从中获得自己确实需要的信息,从而导致对信息的使用效率越来越低。那么,有没有比搜索引擎更加“智能”的技术来改善这种状况呢? 可以使用推荐系统来对搜索引擎加以完善。与搜索引擎不同,推荐系统并不需要用户提供明确的需求,而是通过分析用户的历史行为,主动为用户推荐能够满足他们兴趣和需求的信息。为了能够更好地满足用户需求,需要依据其网站的海量数据,研究用户的兴趣偏好,分析用户的需求和行为,发现用户的兴趣点,从而引导用户发现自己的信息需求,将长尾网页(长尾网页是指网页的点击情况满足长尾理论中尾巴部分的网页)准确地推荐给所需用户,即使用推荐引擎来为用户提供个性化的专业服务。

标题2目标

为简化系统设计,当用户访问网站页面时,系统会记录用户访问网站的日志。本案例针对这些日志内容加以整理,包括用户IP(已做数据脱敏处理)、用户访问的时间、访问内容等多项属性的记录,各个属性及其说明如表 21所示。 表 21 网站日志数据属性及其说明 属性名称 属性说明 属性名称 属性说明 realIP 真实ip fullURLID 网址类型 realAreacode 地区编号 hostname 源地址名 userAgent 浏览器代理 pageTitle 网页标题 userOS 用户浏览器类型 pageTitleCategoryId 标题类型ID userID 用户ID pageTitleCategoryName 标题类型名称 clientID 客户端ID pageTitleKw 标题类型关键字 timestamp 时间戳 fullReferrrer 入口源 timestamp_format 标准化时间 FullReferrerURL 入口网址 pagePath 路径 organicKeyword 搜索关键字 ymd 年月日 source 搜索源 fullURL 网址 依据表 21的网站日志属性的说明,针对以下内容进行分析: 按地域研究用户访问时间、访问内容、访问次数等分析主题,深入了解用户访问网站的行为、目的及关心的内容(主要指统计信息)。 借助大量用户访问记录,使用多种推荐算法发现用户访问习惯,对不同用户推荐相关服务页面(单个算法参数择优、多种算法之间对比分析择优)。 本案例涉及整个系统架构,所以某些环节会进行一定的简化。涉及到的模块包括:网站日志传输、存储,建立推荐算法模型、多种推荐算法模型对比、最优推荐算法模型择优,推荐结果展示,传输、存储、建模、最优模型筛选流程化。 3系统架构及流程 系统由两部分构成:法律网系统和大数据推荐系统。法律网系统为传统网站系统,提供相关法律咨询等服务,用户可以登录查询相关页面。大数据推荐系统则主要根据用户的访问日志使用推荐引擎为用户推荐感兴趣的网页或内容。 两个系统共同工作,用户访问法律网系统,会产生访问日志。法律网系统会定时(比如每天凌晨1点)把日志生成日志文件,然后传输到大数据推荐系统。推荐系统根据用户访问日志使用推荐引擎来对日志数据进行建模。建模针对不同参数进行,根据评价算法找出最优模型。接着,使用最优模型来对各个用户进行推荐,然后把推荐数据再次传输给法律网系统,这样就可以在用户下次登录的时候,对其进行推荐。整体流程如图 31所示。

图 31 系统流程图

图 31中的流程中的各个技术简要概括如下: 1)用户访问法律网系统时,会在系统后台服务器数据库生成对应的日志(比如使用MySQL存储数据); 2)系统采用Cron定时把后台服务器数据库中的用户日志数据存储到日志文件; 3)Flume数据传输管道会在日志文件全部生成后,把其通过Flume Channel传输到大数据推荐系统的HDFS上; 4)在HDFS上的日志文件系统可以通过Hive进行用户主题分析,得到各种用户相关的统计结果,整体分析用户行为;同时,可以发现数据中的异常或缺失情况; 5)在步骤4的基础上使用Pig或直接编写MapReduce代码来进行数据预处理,处理各种缺失或异常数据,同时,还需构造模型需要的数据集; 6)在构造出模型需要的数据后,使用多种模型对日志数据集进行建模,在各种参数调优之后,得到最优的模型参数,固化该模型,并以此模型来对日志数据进行推荐分析,得到推荐结果存储到HDFS或HBase中(这里HBase不是必选的,如果用户数不多,推荐结果直接存放在HDFS上即可,但是,如果用户数比较多,需把推荐结果存储在HBase中,同时提供一个远程调用接口,方便在其他平台调用HBase中的结果); 7)得到推荐结果后,再次通过Flume管道把推荐结果传输到法律网服务器,通过定时任务把该推荐结果文件解析到服务器数据库中,方便前台调用查询(如因用户数比较多采用HBase存储,则这时可直接调用HBase远程访问接口来查询推荐结果)。 8)上述4、5、6步骤需要通过Oozie工作流组件编辑多个工作流,保证这些步骤可以自动完成,特别是模型择优及参数选择模块。 下面针对各个技术实现进行分析。 4分析过程及实现 4.1数据传输 为简化操作,这里直接假定法律网服务器用户访问日志已经存储在指定目录,Apache Flume管道可以根据该目录来进行数据传输。下面是传输逻辑: 1)用户访问日志每天定时会产生一个或多个日志文件; 2)Flume传输任务定时在每天01:05分,注意传输的是前一天的日志文件; 3)传输完毕并检查正确后,删除原日志文件(是否删除可以根据实际情况确定); 在安装Flume前,需要确保各个节点已经安装好JDK,本节假定读者已经配置好了Flume的Apache Flume的环境(由于具体配置不在本案例范围内,所以读者可以在官网查询相关资料进行配置,或参考recommend/configuration/Centos6.7配置Flume1.6.0.txt文档)。 本节主要使用的是Flume的文件从一个服务器传输到HDFS上面的功能,如图 41所示。

图 41 Flume传输文件数据到HDFS

Flume数据传输需要定义Source、Channel、Sink,在本例中Source定义为本地目录即可,Channel选择为文件传输,Sink选择为集群HDFS,其示例如代码清单 41所示。 代码清单 41 Flume 文件到HDFS properties文件示例配置 #配置agent的原始源(sources)、管道(Channel)、目标源(sinks) agent.sources=exampleDir agent.channels=memoryChannel agent.sinks=flumeHDFS #设置原始源相关配置 agent.sources.exampleDir.type=spooldir agent.sources.exampleDir.spoolDir=/opt/flume_data #设置管道相关配置 agent.channels.memoryChannel.type=memory agent.channels.memoryChannel.capacity=10000 agent.channels.memoryChannel.transactioncapacity=1000000 #设置目标源相关配置 agent.sinks.flumeHDFS.type=hdfs agent.sinks.flumeHDFS.hdfs.path=hdfs://master:8020/flume_data agent.sinks.flumeHDFS.hdfs.fileType=DataStream agent.sinks.flumeHDFS.hdfs.writeFormat=Text agent.sinks.flumeHDFS.hdfs.maxOpenFiles=1

设置关系

agent.sources.exampleDir.channels=memoryChannel agent.sinks.flumeHDFS.channel=memoryChannel 注意:此表使用的传输方式为内存方式。 4.2数据传输:动手实践 本实验把日志文件从法律网服务器传输到HDFS文件系统。参考该实验,读者可以直接部署、运行相关代码,加深理解Flume数据传输流程以及Flume性能调优等问题。 步骤如下: 1)新建/data目录,在启动flume agent后把日志文件lawdata_20140819_20141015.txt拷贝到此文件夹下面的2014目录,如图 42所示;

图 42 原始日志文件 2)参考代码清单 42配置flume agent,使用如代码清单 43所示的命令运行flume agent,即可看到如图 43所示的终端信息; 代码清单 42 local2hdfs.properties配置文件

配置agent的原始源(sources)、管道(Channel)、目标源(sinks)

agent.sources=dataDir agent.channels=memoryChannel agent.sinks=hdfs

设置原始源相关配置

agent.sources.dataDir.type=spooldir agent.sources.dataDir.spoolDir=/data/

递归处理

agent.sources.dataDir.recursiveDirectorySearch=TRUE

100个event

agent.sources.dataDir.batchSize=100 agent.sources.dataDir.deserializer=LINE agent.sources.dataDir.deserializer.maxLineLength=40960

设置管道相关配置

agent.channels.memoryChannel.type=memory agent.channels.memoryChannel.capacity=10240 agnet.channels.memoryChannel.transactionCapacity = 100

设置目标源相关配置

agent.sinks.hdfs.type=hdfs agent.sinks.hdfs.hdfs.path=hdfs://nameservice1/flume_data/%y/%m agent.sinks.hdfs.hdfs.filePrefix=lawdata_ agent.sinks.hdfs.hdfs.fileType=DataStream agent.sinks.hdfs.hdfs.writeFormat=Text

每30秒产生一个文件

agent.sinks.hdfs.hdfs.rollInterval=30

128m文件大小产生新文件

agent.sinks.hdfs.hdfs.rollSize= 134000000

不根据event个数产生新文件

agent.sinks.hdfs.hdfs.rollCount=0

channel中event个数

agent.sinks.hdfs.hdfs.batchSize=100 agent.sinks.hdfs.hdfs.round=true agent.sinks.hdfs.hdfs.roundValue=1 agent.sinks.hdfs.hdfs.roundUnit=month agent.sinks.hdfs.hdfs.useLocalTimeStamp=true

设置关系

agent.sources.dataDir.channels=memoryChannel agent.sinks.hdfs.channel=memoryChannel 代码清单 43 flume agent启动命令 #cd KaTeX parse error: Expected 'EOF', got '#' at position 12: FLUME_HOME #̲bin/flume-ng ag…FLUME_HOME/demo,并重命名为local2hdfs.properties。

图 43 flume agent运行日志信息 3)运行完成后,查看HDFS上文件是否上传成功。若成功上传,可在HDFS上看到如图 44、图 45所示文件(使用flume从本地上传数据到HDFS后,如果上传成功,在本地会有一个提示成功的文件,以.COMPLETED结尾);

图 44 HDFS数据展示数据(flume agent上传数据到HDFS)

图 45 本地上传完成后日志文件(flume agent上传数据到HDFS) 4.3数据探索分析 数据通过法律网日志服务器传输到大数据平台HDFS后,不能简单的对其进行处理就直接丢给Spark模型进行运行。对于任何一个数据挖掘问题,这样的做法都是有问题的。一般情况下需要事先对原始的数据文件进行简单的统计分析,以期能得到数据的一个初步概况。本节就是针对数据进行这样的统计分析,使用的工具是Hive。上传到HDFS上面的数据可以通过相关Hive命令导入到Hive表中,由于后面的查询操作都是在Hive里面完成的,所以这里采用管理表。建表并导入数据的代码如代码清单 44所示。 代码清单 44 Hive建表及导入数据代码 ----创建law表 CREATE TABLE law ( ip bigint, area int, ie_proxy string, ie_type string , userid string, clientid string, time_stamp bigint, time_format string, pagepath string, ymd int, visiturl string, page_type string, host string, page_title string, page_title_type int, page_title_name string, title_keyword string, in_port string, in_url string, search_keyword string, source string) ROW FORMAT DELIMITED FIELDS TERMINATED BY ‘,’ STORED AS TEXTFILE; ----导入数据 load data inpath ‘/user/root/law_all_20140819_20141031.csv’ overwrite into table law; 原始数据在Hive表中导入完成后,直接查询得到该数据的记录数为837502(原来为59159918,实际没有给这么多数据,后面程序遇到这个数字都需要改过来)。同时对原始数据中的网页类型、点击次数、网页排名等各个维度进行分布分析,获得其内在的规律。针对出现的统计结果,解释其对应的原因。 注意:在数据探索、数据预处理、模型构建阶段使用抽取的2014/8/19~2014/10/15日的数据进行分析与处理。 4.3.1网页类型分析 针对原始数据中用户点击的网页类型进行统计,统计内容为:网页类型、记录数及其所占总记录百分比。其HiveQL语句如代码清单 45所示。 代码清单 45 按网页类型统计 ----统计网页类型 select substring(page_type,1,3) as page_type, count() as count_num, round((count()/837502.0)100,4) as weights from law group by substring(page_type,1,3) order by count_num desc; 在Hive中运行代码清单 45中代码,即可得到如表 41所示的结果。从中发现点击与咨询相关的网页(网页类型为101,http://www..cn/ask/)的记录的占比为55.16%,其次是知识相关网页(网页类型为107,http://www..com/info/)占比约为23.77%,剩余其他的类型网页(网页类型为199)占16.11%左右。 表 41 网页类型统计表 网页类型 记录数 百分比 101 32632665 55.16 107 14062820 23.77 199 9530386 16.11 301 1392701 2.35 102 1066935 1.80 106 409507 0.69 103 64901 0.11 201 3 5.07E-06 通过观察类别为199的网页,发现其页面信息多数与法律法规相关,所以统计类别为199,并且包含法律法规的记录个数,其Hive代码如代码清单 46所示。 代码清单 46 误类别统计 ----统计网页类型 select substring(page_type,1,7) as page_type, visiturl, count() as count_num from law where visiturl like ‘%faguizt%’ and page_type like ‘%199%’ ; 执行上述代码后,可以得到记录个数为3238450。综合可得表 41中199的记录数应该为6291936,而301的记录数应该为4631151。因此可以得到用户点击的页面类型的排行榜为:咨询相关、知识相关、其他方面的网页、法规(类型为301)、律师相关(类型为102)。可以初步得出相对于长篇的知识,用户更加偏向于查看咨询或者进行咨询。 1.咨询类别内部统计 进一步针对咨询类别内部进行统计分析,统计内容为:101网页类型的子类型、记录数及其所占101网页类型总记录百分比,Hive命令行代码如代码清单 47所示,其统计结果如表 42所示。 代码清单 47 咨询类内部统计Hive命令 -----咨询类别内部统计 select substring(page_type,1,6) as page_type, count() as count_num, round((count()/411665.0)100,4) as weights from law_part where page_type_part=101 group by substring(page_type,1,6) order by count_num desc; 运行代码清单 47,其结果如表 42所示。其中浏览咨询内容页(101003)记录最多,其次是咨询列表页(101002)和咨询首页(101001)。结合上述初步结论,可以得出用户都喜欢通过浏览问题的方式找到自己需要的信息,而不是以提问的方式或者查看长篇的知识的方式。 表 42 咨询类内部统计结果 101开头类型 记录数 百分比 101003 31925682 97.83 101002 510653 1.56 101001 159210 0.49 101009 9991 0.03 101006 7316 0.02 101004 7291 0.02 101007 7065 0.02 101008 3262 9.99E-03 101005 2195 6.73E-03 2.网页中带有“?”记录统计 统计所有访问网页中带有“?”的总记录数。统计分析访问网页中带有?的所有记录中,各网页类型、记录数、占访问网页中带有?的记录总数的百分比。Hive命令行如代码清单 48所示。 代码清单 48 网页中带有“?”记录统计及各个类别占比 -----统计visiturl中带有?的所有记录。 select count() as num from law_part where visiturl like ‘%?%’; -----统计带有?的所有记录中,各网页类型所占比例 select substring(page_type,1,7) as page_type,count(),round((count()100)/ 2171532,4) as weights from law where visiturl like ‘%?%’ group by substring(page_type,1,7) order by weights desc; 运行代码清单 48后,其结果整理见表 43。包含“?”总记录数为2171532,特别在其他网页这一类型中占了98%左右,比重较大,因此需要进一步分析该类型网页的内部规律,但在知识相关与法规专题中的占比仅1%左右。 表 43 网页中带有“?”记录统计结果 记录数 网页ID 百分比 199900 2142885 98.68 301001 14403 0.66 107001 11020 0.51 101003 2999 0.14 102002 221 0.01 101002 3 1.38E-04 102001 1 4.61E-05 通过分析发现,大部分网址以如下形式存在: http://www..cn/guangzhou/p2lawfirm 地区律师事务所 http://www..cn/guangzhou 地区网址 http://www..cn/ask/ask.php 咨询首页 http://www..cn/ask/midques_10549897.html 中间类型网页 http://www..cn/ask/exp/4317.html 咨询经验 http://www..cn/ask/online/138.html 在线咨询页 带有标记的三类网址本应该有相应的分类,但是由于分类规则的匹配问题,没有相应的匹配。带有lawfirm 关键字的网址对应律师事物所,带有ask/exp、ask/online关键字的网址对应咨询经验和在线咨询页。在处理数据过程中将其进行清楚分类,便于后续数据分析。 在1999001类型中,有法律快车-律师助手、带有“?”的访问页面记录等类型数据,通过业务了解获知,快车-律师助手类型的页面,是律师的一个登录页面。带有“?”的页面记录,如http://www..com/ask/question_9152354.html?&from=androidqq,代表该网页曾被分享过的,因此可以通过截取?前面的网址对其进行处理,还原其原类型。 在查看数据的过程中,发现存在一部分这样的用户,他们没有点击具体的网页(以.html后缀结尾),他们点击的大部分是目录网页,这样的用户可定义为“瞎逛用户”。由此可见,不仅1999001页面类型中有如此复杂的网址类型,其余的页面类型可能也会出现,因此,后续清洗数据时,需要对所有数据使用类似规则处理。 通过上述网址类型分布分析(后续分析中,选取其中占比最多的两类:咨询内容页、知识内容页进行模型分析),可以发现与分析目标无关的数据清洗规则: 1.无点击.html行为及URL中的用户记录。 2.中间类型网页(带有midques_关键字)。 3.网址中带有“?”类型,无法还原其本身类型的快搜页面与发布咨询网页。 4.法律快车-律师助手记录,页面标题包含“法律快车-律师助手”关键字。 5.筛选模型所需记录(咨询、知识、法规专题页面数据)。 6.重复数据(同一时间同一用户,访问相同网页)。 记录这些规则,有利于在数据清洗阶段对数据的清洗操作。上述过程就是对网址类型进行统计得到的分析结果,针对网页的点击次数也进行类似分析。 4.3.2点击次数分析 统计分析原始数据用户浏览网页次数的情况,统计内容为:点击次数、用户数、用户百分比、记录百分比。Hive命令行如代码清单 49所示。 代码清单 49 点击次数分析命令 -----用户个数统计 select count(distinct(userid)) from law; -----创建点击次数分区表,分区字段click_part set hive.exec.dynamic.partition=true; set hive.exec.dynamic.partition.mode=nostrict; create table law_click ( user_num int, user_weights double, record_weights double ) partitioned by (click_part string) ROW FORMAT DELIMITED FIELDS TERMINATED BY ‘,’ STORED AS TEXTFILE; ------导入分区数据到分区表law_click INSERT OVERWRITE TABLE law_click PARTITION (click_part) select count(click_num) as count,round(count(click_num)100/31562704.0,2),round((count(click_num)click_num)100/59159918.0,2),click_num from ( select count(userid) as click_num from law group by userid ) tmp_table group by click_num order by count desc; -----点击次数分类统计 set hive.exec.reducers.max=1; select click_num, count(click_num) as count,round(count(click_num)100/31562704.0,2),round((count(click_num)click_num)100/59159918.0,2)from ( select count(userid) as click_num from law group by userid ) tmp_table group by click_num order by count desc; 运行代码清单 49,其结果整理如表 44所示。其中用户总数为31562704,总记录数为59159918。可以发现浏览一次的用户占所有用户75%左右,大部分用户浏览的次数在1~7次。大约87%的用户只提供了约54%的浏览量。即浏览网页1到2次的用户占了大部分。 表 44 用户点击次数统计表 点击次数 用户数 用户百分比 记录百分比 1 23587727 74.73 39.87 2 4164797 13.20 14.08 3 1283523 4.07 6.51 4 829290 2.63 5.61 5 415653 1.32 3.51 6 310138 0.98 3.15 7 188632 0.60 2.23 针对浏览次数为一次的用户进行统计分析,统计内容为:网页类型、记录个数、记录占浏览一次的用户数百分比。Hive命令如代码清单 410所示。 代码清单 410 浏览一次用户行为分析 select page_type,count(page_type) as count,round((count(page_type)100)/ 23587727.0,4) from (select substring(a.page_type,1,7) as page_type from law a,(select userid from law group by userid having(count(userid)=1)) b where a.userid = b.userid) c group by page_type order by count desc limit 5; 整理结果见表 45。其中问题咨询页占比为71%左右,知识页占比为19%左右,而且这些访问基本上通过搜索引擎进入。 表 45 浏览一次用户行为分析 网页类型ID 记录个数 记录百分比 101003 16841247 71.40 107001 4524104 19.18 1999001 1883166 7.98 301001 315871 1.34 101002 7523 0.03 由以上针对浏览次数为一次的用户分析结果,可以对该类用户情况做出两种猜测: 1.用户为流失用户,在问题咨询与知识页面上没有找到相关的需要。 2.用户找到其需要的信息,因此直接退出。综合这些情况,可将这些点击一次的用户行为定义为网页的跳出行为,用于计算网页跳出率。 为了降低网页的跳出率,就需要对这些网页进行针对用户的个性化推荐,帮助用户发现其感兴趣或者需要的网页。针对点击一次的用户浏览的网页进行统计分析,其分析Hive代码如代码清单 411所示。 代码清单 411 针对点击一次用户浏览网页统计分析 select a.visiturl,count() as count from law a, (select userid from law group by userid having(count(userid)=1)) b where a.userid = b.userid group by a.visiturl order by count desc limi 7; 直接运行代码清单 411,其结果见表 46。可以看出排名靠前的页面均为知识与咨询页面,因此可以猜测大量用户的关注点为法律知识或咨询。 表 46 点击一次用户访问URL排名 网页 点击数 http://www..cn/info/hunyin/lhlawlhxy/20110707137693.html 69858 http://www..cn/info/shuifa/slb/2012111978933_2.html 28507 http://www..cn/zhishi/ 13167 http://www. .cn/info/hunyin/jiehun/hunjia/201312182875578.html 12802 http://www. .cn/info/shuifa/slb/2012111978933.html 12022 http://www. .cn/ask/exp/13425.html 11766 http://www. .cn/ask/question_925675.html 10731 4.3.3网页排名 由分析目标可知,个性化推荐主要针对html后缀的网页(与物品的概念类似)。从原始数据中统计html后缀的网页的点击率,其Hive代码如所代码清单 412所示。 代码清单 412 原始数据中包含html后缀的网页点击率统计 select a.visiturl,count() as count from law a where a.visiturl like ‘%.html%’ group by a.visiturl; 运行代码清单 412,其点击率排名的结果见表 47。从表 47中可以看出,点击次数排名前10名中,法规专题占了大部分,其次是知识。但是从前面分析的结果中可知,原始数据中与咨询主题相关的记录占了大部分。但是在其html后缀的网页排名中,专题与知识的占了大部分。通过业务了解,专题是属于知识大类里的一个小类。在统计html后缀的网页点击排名,出现这种现象的原因是知识页面相对咨询的页面要少很多,当大量的用户在浏览咨询页面时,呈现一种比较分散的浏览次数,即其各个页面点击率不高,但是其总的浏览量高于知识。所以造成网页排名中其咨询方面的排名比较低。 表 47 原始数据点击率排名表 网址 点击数 http://www..cn/faguizt/23.html 534426 http://www..cn/info/hunyin/lhlawlhxy/20110707137693.html 498055 http://www..cn/info/hunyin/lhlawlhxy/20110707137693_2.html 321863 http://www..cn/faguizt/11.html 287282 http://www..cn/faguizt/43.html 238754 http://www..cn/faguizt/21.html 222843 http://www..cn/faguizt/79.html 190692 http://www..cn/faguizt/117.html 149721 http://www..cn/faguizt/9.html 140222 http://www..cn/faguizt/7.html 118920 从原始html的点击率排行榜中可以发现如下情况,排行榜中存在这样两种类似的网址http://www..cn/info/hunyin/lhlawlhxy/20110707137693_2.html、http://www..cn/info/hunyin/lhlawlhxy/20110707137693.html。通过简单访问网址,发现其本身属于同一网页,但由于系统在记录用户的访问网址的信息时会同时记录翻页信息,因此在用户访问网址的数据中存在翻页的情况。以下针对这些翻页的网页进行统计,其结果见下表 48。 表 48 翻页网页统计 网页 点击次数 http://www..cn/info/gongsi/slbgzcdj/201312312876742.html 19299 http://www..cn/info/gongsi/slbgzcdj/201312312876742_2.html 13596 http://www..cn/info/hetong/ldht/201311152872128.html 15204 http://www..cn/info/hetong/ldht/201311152872128_2.html 38053 http://www..cn/info/hetong/ldht/201311152872128_3.html 21251 http://www..cn/info/hetong/ldht/201311152872128_4.html 13106 通过业务了解,登录次数最多的页面基本为可从外部搜索引擎直接搜索到的网页。对其中的浏览翻页的情况进行分析,平均大概60%80%的人会选择看下一页,基本每一页都会丢失20%40%的点击率,点击率会出现衰减的情况。同时对知识这类型网页进行检查,可以发现页面上并无全页显示功能,但是知识页面中大部分都存在翻页的情况。这样就造成了大量的用户基本只会选择浏览2~5页,极少数会选择浏览全部内容。因此用户就会直接就放弃此次的搜索,从而增加了网站的跳出率,降低了客户的满意度,不利于企业的长期稳定发展。 4.3.4动手实践:数据探索分析 根据4.3节数据探索分析过程,使用Hive完成以下实验。实验包括:网页类型统计、点击次数统计、网页排名统计等。实验步骤如下所示: 1.创建law表,导入数据law_all_20140819_20141031.csv。 2.查询表law中总记录数,统计网页类型,结果字段为:网页类型、记录数、记录所占总记录百分比。 3.创建动态分区表law_part,分区字段为page_type,导入对应数据到分区。 4.咨询类型网页内部统计,结果字段为:101网页类型的子类型、记录数、记录所占101网页类型总记录百分比。 5.统计访问网页中带有“?”的记录总数,统计访问网页中带有?的所有记录中,各网页类型、记录数及其占访问网页中带有?的记录总数的百分比。 6.用户个数统计,创建点击次数分区表law_click,分区字段为click_part。 7.导入分区数据到law_click。 8.点击次数分类统计,结果字段为:点击次数、用户数、用户百分比、记录百分比。 9.浏览一次的用户分类统计,结果字段为:网页类型、记录个数、记录占浏览一次的用户数百分比。点击一次的用户浏览网页统计。 10.点击率排名统计,结果字段为:网址、点击次数。 11.翻页网页统计,结果字段为:网址、点击次数。 思考: 1.为什么要创建动态分区表,动态分区表的字段应该如何选择? 2.针对各个HiveQL语句是否可以进行优化? 3.如果使用MapReduce来开发,是否效率比Hive高?开发时间呢? 4.4数据预处理(可以用Spark形式处理替代) 本案例在原始数据的探索分析的基础上,发现与分析目标无关或模型需要处理的数据,针对此类数据进行处理。其中涉及到的数据处理方式有:数据清洗、数据变换和属性规约。通过上述数据预处理过程,原始数据将被处理成模型需要的输入数据。 4.4.1数据清洗 使用Pig和MapReduce程序进行数据清洗,从探索分析的过程中发现与分析目标无关的数据,归纳总结并整理成清洗规则,如下: 4.无点击.html行为的用户记录。 5.中间类型网页(带有midques_关键字)。 6.网址中带有“?”类型数据。 7.筛选模型所需数据(咨询、知识、法规专题页面数据)。 8.重复数据(同一时间同一用户,访问相同网页,这里的同一用户是指UserID相同)。 其中1-5步骤使用Pig清洗,并将结果存储到HDFS。其采用的Pig脚本如代码清单 413所示。 代码清单 413 Pig数据清洗代码 –步骤1:删除无点击.html行为的用户记录,统计剩余记录 law = load ‘/user/root/law_all_20140819_20141031.csv’ using PigStorage (‘,’); law_filter_html = filter law by ($10 matches '.\.html’); law_grp_html = group law_filter_html all; count_num = foreach law_grp_html generate COUNT (law_filter_html) as delete_num; set job.name ‘law_filter_html’; dump count_num; –步骤2:删除中间类型网页(带有midques_关键字),统计剩余记录 law_filter_mid = filter law_filter_html by ($10 matches ‘.midques_.’); law_grp_mid = group law_filter_mid all; count_num = foreach law_grp_mid generate COUNT (law_filter_mid)) as delete_num; set job.name ‘law_filter_mid’; dump count_num; –步骤3:删除网址中带有“?”类型数据,统计剩余记录 law_filter_mark = filter law_filter_mid by ($10 matches ‘.\?.’); law_grp_mark = group law_filter_mark all; count_num = foreach law_grp_mark generate COUNT (law_filte_mark) as delete_num; set job.name ‘law_filter_mark’; dump count_num; –步骤4:筛选模型所需数据(咨询、知识、法规专题页面数据) –注意:为了使用SUBSTRING()方法,需注册piggybank.jar包。 –register pig安装目录/lib/piggybank.jar law_filter_data = filter law_filter_mark by (SUBSTRING($11,0,3) == ‘101’ or SUBSTRING($11,0,3) == ‘107’ or SUBSTRING($11,0,3) == ’ 301’); law_grp_data = group law_filter_data all; count_num = foreach law_grp_data generate COUNT (law_filter_data) as delete_num; set job.name ‘law_filter_data’; dump count_num; 步骤5:重复记录统计 –注意:为了使用SUBSTRING()方法,需注册piggybank.jar包。 –register pig安装目录/lib/piggybank.jar law_distinct_fields = foreach law_filter_data generate $4 ,$7,$10; law_distinct_data = distinct law_distinct_fields; law_grp_distinct = group law_distinct_data all; count_num = foreach law_grp_distinct generate COUNT (law_distinct_data) as delete_num; set job.name ‘law_filter_distinct’; dump count_num; –输出结果到HDFS store law_distinct_data into ‘/user/root/law_cleaned’ using PigStorage(‘,’); 清洗结果见表 49。清洗过程中,上一步的结果作为下一步的数据,因此下一步需要清洗的数据已在上一步清洗过程中完成。 表 49 数据清洗结果 清洗顺序 清洗规则 删除数据记录 剩余记录数 1 无.html点击行为的用户记录 6785915 52374003 2 中间类型网页(带midques_关键字) 20360 52353643 3 带有?的记录 44686 52308957 5 筛选模型所需数据 7602925 44706032 6 重复数据 52325 44653707 根据分析目标以及探索结果可知咨询、知识、法规专题是其主要业务来源,故需筛选咨询、知识与法规专题相关的记录,将此部分数据作为模型分析需要的数据。 4.4.2数据变换 用户访问网页的过程中,存在翻页的情况,不同的网址属于同一类型的网页,类似记录见表 410。 表 410 存在翻页的记录 用户ID 时间 访问网页 1665329212.1408435380 2014-09-11 15:24:25 http://www..com/info/jiaotong/jtlawdljtaqf/201410103308246.html 1665329212.1408435380 2014-09-11 15:25:46 http://www..com/info/jiaotong/jtlawdljtaqf/201410103308246_2.html 1665329212.1408435380 2014-09-11 15:25:52 http://www..com/info/jiaotong/jtlawdljtaqf/201410103308246_4.html 1665329212.1408435380 2014-09-11 15:26:00 http://www..com/info/jiaotong/jtlawdljtaqf/201410103308246_5.html 1665329212.1408435380 2014-09-11 15:26:10 http://www..com/info/jiaotong/jtlawdljtaqf/201410103308246_6.html 数据处理过程中需要对这类网址进行处理,最简单的处理方法是将翻页的网址删除。但是用户在访问页面的过程中,是通过搜索引擎进入网站的,其入口网页不一定是其原始类别的首页,采用删除的方法会损失大量的有用数据,在进入推荐系统时,会影响推荐结果。因此针对这些网页需要还原其原始类别,所以首先需要识别翻页的网址,然后对翻页的网 页进行还原处理,如表 410所示的数据清洗后得到的结果为http://www..com/info/jiaotong/jtlawdljtaqf/201410103308246.html。 在数据清洗后的结果中,有类似http://www..cn/ask/question_4749.html和http://www..cn/ask/question_list4749.html的访问网址,这些网址并不属于翻页网址,因此不能按照翻页来处理,其中以上两个网址的区别是:ask/question_与.html之间的字符串是否全为数字,全为数字的网址是保留的网址,否则舍弃。例如,对比4749和list4749,需要保留网址http://www..cn/ask/question_4749.html。数据清洗结果中还发现类似http://www..cn/ask/browse.html和http://www.*/ask/browse_s25.html的访问网址,该类都网址属于用户的浏览网址,并不属于某一个具体的咨询或知识网页,因此推荐算法的模型构建应舍弃该类记录。因此根据对数据变换阶段的分析,整理成处理规则如下: 1.去掉访问网址中包含browse.html或browse_的记录。 2.如果访问网址中包含ask/question_关键字且ask/question_与.html之间的字符串全为数字,保留;否则ask/question_与.html之间的字符串不全为数字,舍弃。 3.翻页处理,针对类似上述同一用户(这里指相同IP)的用户翻页网址表中的记录,如果记录访问时间小于5分钟,就按翻页来处理。 4.4.3属性规约 根据推荐系统模型的输入数据需要,需对处理后的数据进行属性规约,提取模型需要的属性。本案例中模型需要的数据属性为用户ID、用户访问的网页、访问的时间戳。因此将其他的属性删除,只保留用户ID、用户访问的网页及时间戳数据,其输入数据集示例如表 411所示。 表 411 属性规约后数据集 用户 网页 时间戳 1665329212.1408435380 http://www.lawtime.cn/info/hetong/htfqwjd/20110120109204.html 1408435378361 533240726.1376897428 http://www.lawtime.cn/info/laodong/ldzy/lb/20140103141670.html 1408435378868 135675932.1408435387 http://www.lawtime.cn/info/shuifa/qysds/2011121674117.html 1408435378929 714942021.1408434842 http://www.lawtime.cn/ask/question_4481075.html 1408435378397 720175499.1408435379 http://www.lawtime.cn/ask/question_7707845.html 1408435378713 1134612159.1408435382 http://www.lawtime.cn/ask/question_3596455.html 1408435378484 1654830710.1408435380 http://www.lawtime.cn/ask/question_6432279.html 1408435378817 690909414.1408432296 http://www.lawtime.cn/ask/question_6701899.html 1408435378817 4.4.4实验数据预处理 根据1.中的数据清洗规则,使用Pig完成以下实验。实验内容包括:数据清洗,数据变换,属性规约。 实验步骤:

删除无点击.html行为的用户记录,统计删除记录及剩余记录数。基于步骤1的结果,删除中间类型网页(带有midques_关键字),统计删除记录及剩余记录数。基于步骤2的结果,删除网址中带有“?”类型数据,统计删除记录及剩余记录数。 4. 基于步骤3的结果,删除法律快车-律师助手记录,页面标题包含“法律快车-律师助手”关键字,统计删除记录及剩余记录数。 5. 基于步骤4的结果,筛选模型所需数据(咨询、知识、法规专题页面数据),统计删除记录及剩余记录数。基于步骤5的结果,删除重复数据(同一时间同一用户,访问相同网页),统计删除记录及剩余记录数。将步骤1-5的处理结果输出到HDFS,并使用MapReduce删除重复数据(同一时间同一用户,访问相同网页),输出处理后的结果。依据数据变换中的处理规则处理数据,同时筛选模型需要的属性数据。 4.4.5数据编码 为了节省数据存储空间以及加速模型建模效率,可先把数据预处理后的数据进行编码。当然这里的数据编码不是指通信中的数据编码,而是指将数据从一种表示形式变为另一种表现形式。由于用户以及网页URL均使用字符串表示,占用存储空间较大,并且在计算分析的时候效率较低,所以可以考虑把其转换为数值类型。Integer类型可以最大表示2147483647,符合原始数据的范围大小,所以考虑把其转换为Integer类型。 编码思路为:1)求得原始用户以及URL的去重值,并按照ASCII值进行排序;2)使用排序后的原始用户以及URL的下标值来代替该用户或URL;3)使用编码后的值替换原始数据中的值。使用Spark对原始数据进行上述编码处理,其代码如代码清单 414所示。 代码清单 414 原始数据编码及替换 / 加载数据, 原始数据: timestamp,user,url,urlType val rawDataPath = “hdfs://server1:8020/user/root/law_data_clean.txt” val dataAll = sc.textFile(rawDataPath).map{x => val fields=x.split(“,”); (fields(0),fields(1),fields(2),fields(3))}

// 排序去重后用户、URL数据 val userUrl = dataAll.map(x => (x._2,x._3)) val allUserList = userUrl.map(data=>data._1).distinct.sortBy(x => x) val allUrlList = userUrl.map(data=>data._2).distinct.sortBy(x => x)

// 构造用户、URL编码 val allUserIdList = allUserList.zipWithIndex.map(data=>(data._1,data._2.toInt)) val allUrlIdList = allUrlList.zipWithIndex.map(data=>(data._1,data._2.toInt))

// 保存编码数据 allUserIdList.map(x => x._1 +“,”+x._2).repartition(1).saveAsTextFile(“/user/root/law_userlist”) allUrlIdList.map(x => x._1 +“,”+x._2).repartition(1).saveAsTextFile(“/user/root/law_urllist”)

// 替换原始数据

val replacedDataAll = dataAll.map(x => (x._2,(x._1,x._3,x._4))).join(allUserIdList).map(x => (x._2._1._2,(x._2._1._1,x._2._2,x._2._1._3))).join(allUrlIdList).map(x => (x._2._1._1,x._2._1._2,x._2._2,x._2._1._3))

// 保存编码后数据 replacedDataAll.saveAsTextFile(“/user/root/law_data_replaced”) 更新后的数据如表 412所示: 表 412 数据编码后数据集 用户 网页 时间戳 10372520 2878057 1408435378361 22845225 3016874 1408435378868 5560869 3152389 1408435378929 25675969 1301602 1408435378397 25757670 2104448 1408435378713 2099108 1033789 1408435378484 10209628 1792428 1408435378817 25300943 1857446 1408435378817 4.4.6数据集的分割 。。。。 4.5模型构建 日志数据经过数据预处理后,得到用于建模的数据。这里针对不同的数据(即咨询和知识数据)分别建立Spark ALS模型、Spark ALS Implicat模型、基于用户的协同过滤模型、基于项目的协同过滤模型。针对这些模型,使用评价系统来对其进行评价,对比各种模型评价结果,得到最优模型进行评价。 4.5.1基于Spark ALS & Spark ALS Implicit建模 本节介绍ALS推荐算法, ALS是alternating least squares的缩写, 意为交替最小二乘法,该方法常用于基于矩阵分解的推荐系统中。例如:将用户(user)对项目(item)的评分矩阵分解为两个矩阵:一个是用户对项目隐含特征(指的是使用这些隐含的特征可以较好地表示这个用户的评价体系)的偏好矩阵,另一个是项目所包含的隐含特征(指的是使用这些隐含的特征可以较好地表示这个项目)的矩阵。在这个矩阵分解的过程中,评分缺失项会被填充,也就是说可以基于这个填充的评分来对用户没有评价过的商品进行排序,得到预测填充评分最高的多个项目,来对用户进行推荐。 对于一般的用户项目矩阵R(m×n),ALS推荐算法旨在寻找两个低维矩阵X(m×k)和矩阵Y(n×k),使得矩阵X和矩阵Y的乘积逼近R(m×n),即: 公式 41 其中,R(m×n)代表用户对项目的评分矩阵,X(m×k)代表用户对隐含特征的偏好矩阵,Y(n×k)表示项目所包含隐含特征的矩阵,T表示矩阵的转置。实际中,一般取k<

从HDFS读取数据,转换为 (User,URL,Rating),以splitVisitedNumArray进行数据过滤@param sc : SparkContext@param inputDir 原始数据路径@param splitVisitedNumArray 如(2,5,10,30),代表分割次数的点@param splitVisitedNumValues 如(0,2,3,4,5),代表分割次数段对应的值,需要比splitVisitedNumArray多1个@return 那么访问次数为2以下的次数被映射为0,并且被去除,次数在[2,5)的那么分数为2,次数在[5,10)的分数为3 次数为[10,30)的分数为4,次数在[30,+Max) 分数为5

*/ def initRatingWithNum(sc:SparkContext,inputDir: String, splitVisitedNumArray: Array[Int], splitVisitedNumValues : Array[Int]): RDD[(Int, Int, Int)] = { val dataRaw = sc.textFile(inputDir).map { x => val fields = x.slice(1, x.size - 1).split(“,”); (fields(0).toInt, fields(1).toInt) } val dataCount = dataRaw.map(x => (x, 1)).reduceByKey((x, y) => (x + y)). filter(x => x._2 >= splitVisitedNumArray(0)) // 过滤掉最小次数的值 //Array(((-1,2),0), ((2,5),2), ((5,10),3), ((10,30),4), ((30,2147483647),5)) val spliterPointWithValue = (-1 +: splitVisitedNumArray).zip(splitVisitedNumArray :+ Integer.MAX_VALUE). zip(splitVisitedNumValues) // dataCount : RDD[((Int,Int),Int)], splitPointWithValue filter得到的数据有且只有一个,所以使用下标0不会越界 dataCount.map(x => (x._1._1,x._1._2, spliterPointWithValue.filter(y => y._1._1 >= x._2 && y._1._2 > x._2)(0).2)) } } // 数据映射 val data = initRatingWithNum(sc,”/user/root/law_info_data_data.txt”,Array(2,5,10,20),Array(0,2,3,4,5),) 加载经过映射后的数据,并设置参数即可建立模型,如代码清单 416所示。 代码清单 416 建立模型 // 数据加载 val data = initRatingWithNum(sc,”/user/root/law_fagui_data_data.txt”,Array(2,5,10,20),Array(0,2,3,4,5),) // 建模 val rank = 10 val numIterations = 10 val lambda =0.01 val model = ALS.train(training,rank,numIterations,lambda) // 如果使用implicit进行建模,则添加下面的代码 val alpha = 0.01 val model1 = ALS.trainImplicit(ratings, rank, numIterations, lambda, alpha) 4.5.2基于用户的协同过滤建模 基于用户的协同过滤,即通过不同用户对项目的评分来评测用户之间的相似性,搜索目标用户的最近邻,然后根据最近邻的评分向目标用户产生推荐。具体描述如下: 1.计算相似度 用户之间的相似度通过每个用户对项目的评分向量(注意,在本文中如果用户对某个URL进行访问,那么该项目就为1,如果没有访问,那么就为0)计算得到。相似度的计算可以使用任何向量相似度计算公式,但在实际使用中,需要选择一种契合模型数据的算法。同时,如果现有相似度计算算法不符合实际情况,也可对其加以改进。 2.寻找与目标用户最近邻的K个用户 在计算出各个用户之间的相似度后,可以找到所有与目标用户的相似度大于某一阈值的近邻用户(第一步粗略过滤),然后对这些用户按照相似度进行排序,得到前K个近邻用户。 3.通过这K个用户进行推荐 得到K个近邻用户后,怎么推荐呢?当然,这里的方式有多种。比如使用相似度和所有K个用户的项目对应加权进行推荐。 根据上述算法原理,编写Spark的基于用户的协同过滤算法,其代码如代码清单 417所示。 代码清单 417 Spark 基于用户的协同过滤算法实现 package com.tipdm.userbased import scala.math. import com.tipdm.utils.SparkUtils

/**

创建基于用户的协同过滤模型,需要输入以下参数trainDataPath: 训练数据(userid,itemid)modelPath:模型存储目录minItemsPerUser: 单用户的最小访问物品数recommendItemNum: 单个用户的最大推荐物品数目splitter:输入原始数据分隔符 */ object ModelCreate { def main(args:Array[String]) = { if (args.length != 5) { System.err.println(“Usage: com.tipdm.userbased.ModelCreate " +” ") } // 处理参数 val trainDataPath = args(0) val modelPath = args(1) val minItemsPerUser = args(2).toInt val recommendItemNum = args(3).toInt val splitter = args(4) val appName = " UserBased CF Create Model " val sc = SparkUtils.getSparkContext(appName) // 加载训练集数据 val trainDataRaw= sc.textFile(trainDataPath).map{x=>val fields=x.slice(1,x.size-1).split(splitter); (fields(0).toInt,fields(1).toInt)} // 获取训练集数据,以单用户最小访问Item数过滤 val trainDataFiltered = trainDataRaw.groupBy(_._1).filter(data=>data.2.toList.size>=minItemsPerUser).flatMap(._2) // (user,item)pair 的重复次数统计 val trainUserItemNumPre = trainDataFiltered.countByValue().toArray.map(x=>(x._1._1,(x._1._2,x._2.toInt))) // user的访问次数统计 val trainUserNumPre = trainDataFiltered.keys.countByValue().toArray // 转化为RDD val trainUserItemNum = sc.parallelize(trainUserItemNumPre) val trainUserNum =sc.parallelize(trainUserNumPre) // 建立用户相似度矩阵 // (user,item,userItemNum,userSum) val userItemBase = trainUserItemNum.join(trainUserNum).map(x=>(x._1,x._2._1._1,x._2._1._2.toInt,x._2._2.toInt)) // (item,(user,userItemNum,userSum)) val itemUserBase = userItemBase.map(x=>(x._2,(x._1,x._3,x._4))) // [(item, ((userA,userAItemNum,userASum), (userB,userBItemNum,userBSum)))] val itemMatrix = itemUserBase.join(itemUserBase).filter((f => f._2._1._1 < f._2._2._1)) // (userA,userB),(userAItemNum,userASum,userBItemNum,userBSum) val userSimilarityBase = itemMatrix.map(f=>((f._2._1._1,f._2._2._1),(f._2._1._2,f._2._1._3,f._2._2._2,f._2._2._3))) // 应用Jaccard 公式求相似度 val userSimilarityPre = userSimilarityBase.map(data => { val user1=data._1._1 val user2= data._1._2 val similarity = (min(data._2._1, data._2._3))*1.0/(data._2._2 + data._2._4) ((user1, user2), similarity) }).combineByKey( x=>x, (x:Double,y:Double)=>(x+y), (x:Double,y:Double)=>(x+y)) // 用户相似度 (user,(user,similarity)) val userSimilarity = userSimilarityPre.map(x=>((x._1._2,x._1._1),x._2)).union(userSimilarityPre). map(x=>(x._1._1,(x._1._2,x._2))) // 初始化推荐集合(user,List(item,similarity)) val statistics = trainDataFiltered.join(userSimilarity).map(x=>(x._2._2._1,(x._2._1,x._2._2._2))).combineByKey( (x:(Int,Double)) => List(x), (c:List[(Int,Double)], x:(Int,Double)) => c :+ x , (c1:List[(Int,Double)], c2:List[(Int,Double)]) => c1 ::: c2) //生成推荐集合(user,List(item)) //为每个user,截取前recommendItemNum个item记录 val dataModel = statistics. map(data=>{val key = data._1; val value = data.2.sortWith(.2>._2); if(value.size>recommendItemNum){ (key,value.slice(0,recommendItemNum)) }else{(key,value)}}). map(x=>(x._1,x._2.map(x=>x._1))) // 存储模型 dataModel.repartition(12).saveAsObjectFile(modelPath) println(“Model saved”) sc.stop() } } 4.5.3基于项目的协同过滤建模 基于项目的协同过滤推荐算法,其基本思想是用户的项目的预测评分可以由该用户对与该项目相似度最高的个邻居项目的评分通过加权平均计算得到,如图 46所示,对项目1感兴趣的用户也都对项目2项目n感兴趣,因此项目1和项目2项目n的相似度较高,它们是相似项目,而用户t目前对项目2~项目n感兴趣但还没发现项目1,因此可将项目1推荐给用户t。

图 46 基于项目的协同过滤原理 根据上述原理,编写Spark的基于项目的协同过滤算法,其代码如代码清单 418所示。 代码清单 418 Spark 基于项目的协同过滤算法实现 package com.tipdm.itembased import com.tipdm.utils.SparkUtils import scala.math._ /**

创建基于项目的协同过滤模型,需要输入以下参数trainDataPath: 训练数据(userid,itemid)minVisitedNumPerUser: 单用户最小访问item的数量recommendItemNum: 单个物品的最大推荐数目modelPath:模型存储目录trainFilteredPath: 过滤后的训练数据存储目录splitter:输入原始数据分隔符 */ object ModelCreate { def main(args:Array[String]) = { if (args.length != 6) { System.err.println("Usage: com.tipdm.itembased.ModelCreate " + “ ”) } // 处理参数 val trainDataPath = args(0) val minVisitedNumPerUser = args(1).toInt val recommendItemNum = args(2).toInt val modelPath = args(3) val trainFilteredPath = args(4) val splitter = args(5) val appName = " ItemBased CF Create Model " val sc = SparkUtils.getSparkContext(appName) sc.setLogLevel(“WARN”) // 加载训练集数据 val trainDataRaw= sc.textFile(trainDataPath).map{x=>val fields=x.slice(1,x.size-1).split(splitter); (fields(0).toInt,fields(1).toInt)} println("trainDataRawrecords count : " + trainDataRaw.count) // val trainData = trainDataRaw.groupBy(_._1).filter(data=>data.2.toList.size>=minVisitedNumPerUser).flatMap(._2).cache() // (user,item,userItemNum) val trainUserItemNumPre = trainData.countByValue().toArray // (item,itemSum) val trainItemNumPre = trainData.values.countByValue().toArray // (item,(user,userItemNum)) // userItemNum次数大于200,设定为200 val trainUserItemNum = sc.parallelize(trainUserItemNumPre).map(data=>{ val item = data._1._2; val user = data._1._1; var userItemNum = data._2.toInt; if (data._2>200){userItemNum = 200} (item,(user,userItemNum)) }) // (item,itemSum) // itemSum次数大于300,设定为300 val trainItemNum =sc.parallelize(trainItemNumPre).map(data=>{ val item = data._1; var itemSum =data._2; if (data._2>300) {itemSum = 300} (item,itemSum) }) // (user,item,userItemNum,itemSum) val itemUserBase = trainUserItemNum.join(trainItemNum). map(x=>(x._2._1._1,(x._1,x._2._1._2,x._2._2.toInt))).cache() // [(user, ((itemA,userItemANum,itemASum), (itemB,userItemBNum,itemBSum)))] val itemMatrix = itemUserBase.join(itemUserBase).filter((f => f._2._1._1 < f._2._2._1)) // (itemA,itemB),(userItemANum,itemASum,userItemBNum,itemBSum) val itemSimilarityBase = itemMatrix.map(f=>((f._2._1._1,f._2._2._1),(f._2._1._2,f._2._1._3,f._2._2._2,f._2._2._3))) // calculate similarity by using Jaccard val itemSimilarityPre = itemSimilarityBase.map(data => { val item1=data._1._1 val item2= data._1._2 val similarity = (min(data._2._1, data._2._3))*1.0/(data._2._2 + data._2._4) ((item1, item2), similarity) }).combineByKey( x=>x, (x:Double,y:Double)=>(x+y), (x:Double,y:Double)=>(x+y)) // item similarity (item,(item,similarity)) val itemSimilarity = itemSimilarityPre.map(x=>((x._1._2,x._1._1),x._2)).union(itemSimilarityPre). map(x=>(x._1._1,(x._1._2,x._2))) // 生成item推荐模型 (item,List(item)) val dataModelPre = itemSimilarity.combineByKey( (x:(Int,Double)) => List(x), (c:List[(Int,Double)], x:(Int,Double)) => c :+ x , (c1:List[(Int,Double)], c2:List[(Int,Double)]) => c1 ::: c2) // 用模型匹配trainData val dataModel = trainData.map(x=>(x._2,x._1)).join(dataModelPre) // 按相似度排序,生成推荐结果集 ==> (user,List(item)) val finalModel = dataModel.flatMap(joined => { joined._2._2.map(f => (joined._2._1,f._1,f._2))}).sortBy(x => (x._1,x._3),false). map(x=>(x._1,x._2)). combineByKey( (x:Int) => List(x), (c:List[Int], x:Int) => c :+ x , (c1:List[Int], c2:List[Int]) => c1 ::: c2).map(x => (x._1,x._2.take(recommendItemNum))) // 存储模型 finalModel.repartition(12).saveAsObjectFile(modelPath) // 存储训练数据 trainData.saveAsTextFile(trainFilteredPath) sc.stop() } }

4.5.4模型评价&最优模型 好的推荐系统能够满足用户的需求,推荐其感兴趣但不全是热门的物品,同时也需要用户反馈意见帮助完善其推荐系统。因此,好的推荐系统不仅能预测用户的行为,而且能帮助用户发现可能会感兴趣,但却不易被发现的物品。同时,推荐系统还应该帮助商家将长尾中的好商品发掘出来,推荐给可能会对它们感兴趣的用户。在实际应用中,评测推荐系统是必不可少的。评测指标主要来源于如下3种评测推荐效果的实验方法,即离线测试、用户调查和在线实验。 离线测试是通过从实际系统中提取数据集,然后采用各种推荐算法对其进行测试,获各个算法的评测指标。这种实验方法的好处是不需要真实用户参与。 注意:离线测试的指标和实际商业指标存在差距,比如预测准确率和用户满意度之间就存在很大差别,高预测准确率不等于高用户满意度。所以当推荐系统投入实际应用之前,需要利用测试的推荐系统进行用户调查。 用户调查利用测试的推荐系统调查真实用户,观察并记录他们的行为,并让他们回答一些相关的问题。通过分析用户的行为及反馈来判断测试推荐系统的好坏。 在线测试顾名思义就是直接将系统投入实际应用中,通过不同的评测指标比较不同的推荐算法的结果,比如点击率,跳出率等。 由于本例中的模型是采用离线的数据集构建的,因此在模型评价阶段采用离线测试的方法获取评价指标。因为不同表现方式的数据集,其评测指标也不同,针对不同的数据方式,其评测指标的公式见表 413。 表 413 评测指标表 数据表现方式 指标1 指标2 指标3 预测准确度 分类准确度 在某些电子商务的网站中,存在对物品进行打分的功能。在存在此种数据的情况下,如果要预测用户对某个物品的评分,需要采用的数据表现方式为预测准确度,其中评测的指标有均方根误差(RMSE),平均绝对误差(MAE)。其中代表用户u对物品i的实际评分,代表推荐算法预测的评分,N代表实际参与评分的物品总数。 同时在电子商务网站中,用户只有二元选择,比如:喜欢与不喜欢,浏览与否等。针对这类型的数据预测,就要用分类准确度,其中的评测指标有准确率(P、precesion),它表示用户对一个被推荐产品感兴趣的可能性。召回率(R、recall)表示一个用户喜欢的产品被推荐的概率。F1指标综合考虑了准确率与召回率因素,能更好的评价算法的优劣(F1越大,说明算法越优)。其中相关的指标说明见表 414。 表 414 分类准确度指标说明表 预测 合计 推荐物品数(正) 未被推荐物品数(负) 实际 用户喜欢物品数(正) TP FN TP+FN 用户不喜欢物品数(负) FP TN FP+TN 合计 TP+FP TN+FN 根据上述指标,计算评价指标召回率、进度公式如下: 召回率recall= TP /(TP + FN)意思为:正样本预测结果数 / 正样本实际数; 精度precision= TP /(TP+FP)意思为:正样本预测结果数 / 推荐物品数; 经过预处理后的数据,再次分为知识类、咨询类和法规类数据,针对每类数据都采用统一的处理方式,以下代码以法规类数据为示例进行演示(针对其他类参考法规类数据处理方式即可)。 法规类数据首先按照时间戳分为3份,分别是:训练集、验证集和测试集,对应占比为80%、10%、10%。其分割代码如代码清单 419所示。 代码清单 419 知识类数据分割为训练集、验证集、测试集 // 分割点:编码数据: timestamp,user,url,urlType val data = sc.textFile(“/user/root/law_data_replaced”).map{x => val fields=x.split(“,”); (fields(0).toLong,fields(1).toInt,fields(2).toInt,fields(3).toDouble)} val timeStamp = data.map(_._1) val num = timeStamp.count val firstSplitPoint = num * 0.8 val secondSplitPoint = num * 0.9

// 分割数据为训练集、测试集合及验证集 val train = data.filter(x => x._1 < firstSplitPoint).map(x => (x._2,x._3,x._4)) val validation = data.filter(x => x._1 >= firstSplitPoint && x._1 < secondSplitPoint).map(x => (x._2,x._3,x._4)) val test = data.filter(x => x._1 > secondSplitPoint).map(x => (x._2,x._3,x._4)) 训练集用于训练模型,验证集用于评估模型以找到最优模型,测试集对最优模型进行验证。在最优模型选择过程中,不同算法需要采用不同的评估方式:针对Spark ALS和Spark ALS Implicit模型采用均方根误差(RMSE)来进行评估、针对基于用户/项目的协同过滤模型采用F1值来进行评估,同时,由于k值的选取会造成F1值的变化,所以针对各个算法的最优模型的评估需要综合多个K值并采用F1值进行评估,具体评估方法见下文。 Spark ALS & Spark ALS Implicit 针对Spark ALS算法以及Spark ALS Implicate算法,采用均方根误差来进行模型寻优,其思路为:1)定义计算RMSE函数,该函数接收一个模型以及测试数据作为参数,根据模型预测测试数据,并和原始测试数据进行对比,得到RMSE;2)针对训练集、验证集、测试集使用1.节中的映射方法处理各个数据集;3)设置建模参数,采用循环的方式遍历每组参数,针对每组参数建立一个模型,计算求得RMSE,如果当前模型的RMSE小于当前定义的最小的RMSE,则赋值对应参数;4)循环结束得到最优模型以及该模型的建模参数。 如代码清单 420所示代码,即是Spark ALS算法寻找最优模型的代码。 代码清单 420 Spark ALS 模型寻优 /** * 根据模型及测试数据集计算均方根误差 * @param model 模型 * @param data 测试数据 * @return 均方根误差 / def computeRMSE(model: MatrixFactorizationModel, data: RDD[Rating]): Double = { val usersProducts = data.map(x => (x.user, x.product)) val ratingAndPredictions1 = data.map { case Rating(user, product, rating) => ((user, product), rating) } val ratingsAndPredictions = ratingAndPredictions1.join(model.predict(usersProducts).map { case Rating(user, product, rating) => ((user, product), rating) }).values math.sqrt(ratingsAndPredictions.map(x => (x._1 - x._2) * (x._1 - x._2)).mean()) } /* * 建立ALS模型,寻求最佳参数 * @param rankList rank值列表 * @param iteration 循环次数 * @param lamdbaList lambda值列表 * @param trainRatingSet 训练数据集 * @param testRatingSet 测试数据集 * @param outPutDir 参数输出目录 */ def getModelParameter(rankList: Array[(Int)], iteration: Int, lamdbaList: Array[(Double)], trainRatingSet: RDD[Rating], testRatingSet: RDD[Rating], outPutDir: String) = { var bestRMSE = 999.00 var bestRank = 0 var bestLambda = 0.0 for (rank <- rankList) { for (lambda <- lamdbaList) { val testModel = ALS.train(trainRatingSet, rank, iteration, lambda) // Train Model val testRmse = computeRMSE(testModel, testRatingSet) // Calculate RMSE //println(rank+“: “+lambda+”: “+alpha+”=>”+testRmse) if (testRmse < bestRMSE) { bestRMSE = testRmse bestRank = rank bestLambda = lambda } } } println(“BestRank:Iteration:BestLambda:BestAlpha => BestRMSE”) println(bestRank + ": " + iteration + ": " + bestLambda + " => " + bestRMSE)

val result = Array(bestRank + "," + iteration + "," + bestLambda)

sc.parallelize(result).repartition(1).saveAsTextFile(outPutDir);

} val listRank = Array(10,20,30,40,50) val iteration = 10 val listLambda = Array(0.001,0.005,0.01,0.03,0.09,0.3,0.6,1.0,2.0) val parameterPath = “/user/root/als/fagui/parameter” val minVisitTrain = Array(3,5,10,20) // train splitPoint val minVisitValidate = Array(2,5,10,20) // validate splitPoint val pointValues = Array(0,2,3,4,5) // 参考代码清单 415,处理数据 // … // 建立ALS模型,以RMSE值进行评测寻优,把最佳参数组存储到HDFS目录 getModelParameter(listRank, iteration, listLambda, train, validation, parameterPath) 由于minVisitTrain(在训练集中每个用户最少访问的URL个数)以及minVistiValidate(在验证集中每个用户最少访问URL个数)参数设置不同(设置minVisitTrain和minVisitValidate参数可以对数据进行一步过滤,过滤掉不合理数据),会导致最终最优模型的参数会有多组,这里设置多组参数值,分别得到各组参数值,最优模型参数如表 415所示。 表 415 Spark ALS算法知识类数据模型寻优参数结果 训练|验证集参数|值参数 最优Rank值 最优Lambda值 (3,5,10,20) | (1,5,10,20) |( 0,2,3,4,5) 30 0.6 (4,5,10,20) | (3,5,10,20) |( 0,2,3,4,5) 30 0.6 (4,5,10,20) | (4,5,10,20) |( 0,2,3,4,5) 40 0.6 如代码清单 421所示代码为Spark ALS Implicate算法模型寻优代码。 代码清单 421 Spak ALS Implicate模型寻优 // def computeRMSE(model: MatrixFactorizationModel, data: RDD[Rating]): Double = ??? // 参考代码清单 420代码 /** * 建立ALS Implicit模型,寻求最佳参数 * @param rankList rank值列表 * @param iteration 循环次数 * @param lamdbaList lambda值列表 * @param alphaList alpha值列表 * @param trainRatingSet 训练数据集 * @param testRatingSet 测试数据集 * @param outPutDir 参数输出目录 / def getModelParameter(rankList: Array[(Int)], iteration: Int, lamdbaList: Array[(Double)], alphaList:Array[(Double)], trainRatingSet: RDD[Rating], testRatingSet: RDD[Rating], outPutDir: String) = { var bestRMSE = 999.00 var bestRank = 0 var bestLambda = 0.0 var bestAlpha = 0.0 for(rank<-rankList) { for(lambda<-lamdbaList){ for(alpha<-alphaList){ val testModel = ALS.trainImplicit(trainRatingSet,rank,iteration,lambda,alpha) // Train Model val testRmse = computeRMSE(testModel,testRatingSet) // Calculate RMSE if (testRmse < bestRMSE) { bestRMSE = testRmse bestRank = rank bestLambda = lambda bestAlpha = alpha }}}} println(“BestRank:Iteration:BestLambda:BestAlpha => BestRMSE”) println(bestRank + ": " + iteration + ": " + bestLambda + " => " + bestRMSE) val result = Array(bestRank + “,” + iteration + “,” + bestLambda) sc.parallelize(result).repartition(1).saveAsTextFile(outPutDir); } val listRank = Array(5,15,20,25,30,35,45) val iteration = 10 val listLambda = Array(0.001,0.005,0.01,0.03,0.09,0.3,0.6,1.0,1.8,3.0) val alphaList = Array(0.001,0.01,0.6,1.5,6,12,25,40,60) val parameterPath = “/user/root/alsimplicit/fagui/parameter” val minVisitTrain = Array(3,5,10,20) // train splitPoint val minVisitValidate = Array(2,5,10,20) // validate splitPoint val pointValues = Array(0,2,3,4,5) // 参考代码清单 415,处理数据 // … // 建立ALS模型,以RMSE值进行评测寻优,把最佳参数组存储到HDFS目录 getModelParameter(listRank, iteration, listLambda, train, validation, parameterPath) 参考Spark ALS算法模型,这里最优模型的参数也会有多组,这里设置多组参数值,分别得到各组参数值最优模型参数如表 416所示。 表 416 Spark ALS Implicit算法知识类数据模型寻优参数结果 训练|验证集参数|值参数 最优Rank值 最优Lambda值 最优Alpha值 (3,5,10,20) | (1,5,10,20) |( 0,2,3,4,5) 35 0.3 0.001 (3,5,10,20) | (3,5,10,20) |( 0,2,3,4,5) 35 0.3 0.001 (4,5,10,20) | (4,5,10,20) |( 0,2,3,4,5) 35 0.3 0.001 Spark User-based & Spark Item-based 针对基于用户和基于项目的协同过滤算法只需要设置最小用户评价URL的个数或最小URL被用户评价个数即可,得到最优模型。但是,在对实际数据处理的过程中发现,当过滤数据比较少时,这两个算法的计算量太大(比如50000的用户,那么如果使用基于用户的协同过滤算法,那么需要计算的数据量就是5000050000/2数量级),不适合实际应用,所以这里不采用这两种算法,而单单对比Spark ALS和Spark ALS Implicit这两种算法。 4.5.5结果分析 使用4.节相关内容,得到最优模型,并计算各个最优模型的分类评价指标,如表 417所示。 表 417 Spark各组算法最优模型评价 数据 算法 K R (%) P (%) F1 法规专题 Spark ALS(Rank=30,Iteration=10,lambda=0.6,训练集过滤>3,验证集过滤>1) 10 1.83 0.2 0.36 20 2.05 0.11 0.21 30 2.18 0.08 0.15 40 2.27 0.06 0.12 50 2.39 0.05 0.10 Spark ALS(Rank=30,Iteration=10,lambda=0.6,训练集过滤>4,验证集过滤>3) 10 0.89 0.1 0.18 20 1.02 0.06 0.11 30 1.12 0.04 0.08 40 1.18 0.03 0.06 50 1.24 0.03 0.06 Spark ALS(Rank=40,Iteration=10,lambda=0.6,训练集过滤>4,验证集过滤>4) 10 0.61 0.07 0.13 20 0.67 0.04 0.08 30 0.75 0.03 0.06 40 0.81 0.02 0.04 50 0.89 0.02 0.04 Spark ALS Implicit(Rank=35,Iteration=10,lambda=0.3,Alpha=0.001,训练集过滤>3,验证集过滤>1) 10 2.82 0.31 0.56 20 3.35 0.18 0.34 30 3.6 0.13 0.25 40 3.73 0.1 0.19 50 3.82 0.08 0.16 Spark ALS Implicit(Rank=35,Iteration=10,lambda=0.3,Alpha=0.001,训练集过滤>3,验证集过滤>3) 10 1.39 0.15 0.27 20 1.66 0.09 0.17 30 1.79 0.06 0.12 40 1.87 0.05 0.10 50 1.91 0.04 0.08 Spark ALS Implicit(Rank=30,Iteration=10,lambda=0.3,Alpha=0.001,训练集过滤>4,验证集过滤>4) 10 0.81 0.09 0.16 20 0.97 0.05 0.10 30 1.05 0.04 0.08 40 1.09 0.03 0.06 50 1.11 0.02 0.04 根据表 417的结果,做Spark ALS和Spark ALS Implicit算法模型的对比结果, 其F1评价指标画图如图 47、图 48所示。

图 47 Spark ALS算法最优模型F1评价

图 48 Spark ALS Implicit最优模型F1评价 从图 47、图 48中可以看到对于单个算法来说,使用不同的过滤方式将得到不同的最优模型。对于Spark ALS和Spark ALS Implicit算法,设置的过滤参数如果较小(比如训练集3,验证集1),那么模型效果也较好。接着,使用Spark ALS以及Spark ALS Implicit模型的最优模型来做对比,如图 49、图 410所示。

图 49 Spark ALS & Spark ALS Implicit F1值对比

图 410 Spark ALS & Spark ALS Implicit Recall/Precision值对比 从图 49、图 410中可以很明显的看出,综合来说,Spark ALS Implicit模型最优。同时,使用Spark ALSImplicit算法模型对实际的数据进行推荐,其得到的结果也是可以解释的。例如针对咨询类数据进行推荐,其结果如表 418所示。 表 418 咨询类推荐结果 用户 访问网址 推荐网址 3951071 “http://www..com/ask/question_10244513.html" "http://www..com/ask/question_10244238.html” [1]“http://www..com/ask/question_10243783.html" [2]"http://www..com/ask/question_10244541.html” [3]“http://www..com/ask/question_10223080.html" [4]"http://www..com/ask/question_10223488.html” [5]"http://www..com/ask/question_10246475.html" 21777264 "http://www..com/ask/question_10383635.html" "http://www..com/ask/question_10383635.html" [1]"http://www..com/ask/question_10162051.html" 参考表 418的结果,在浏览器访问上述网址,发现网址“http://www..com/ask/question_10244513.html,http://www..com/ask/question_10244238.html”和网址“http://www..com/ask/question_10243783.html”等的相关度很高,这说明推荐的结果可以应用于实际。 5构建法律服务大数据智能推荐系统 5.1动手实践:构建推荐系统JavaEE 完善基础JavaEE Web程序,接着添加EasyUI及Jquery相关的JavaScript支持,工程结构如图 51所示。

图 51 法律服务大数据智能推荐工程结构 系统首页是系统介绍,具体介绍内容参考本案例背景及架构部分,其首页效果如图 52所示。需要注意此工程同时结合了法律网服务器工程和大数据推荐平台,由四个部分组成:算法建模、模型评估、算法寻优、模型推荐。

图 52 法律服务大数据智能推荐系统首页 在算法建模模块,一共提供了四个算法,分别是基于用户协同过滤算法、基于项目协同过滤算法、ALS协同过滤算法、ALS Implicit协同过滤算法。以ALS算法建模为例,其页面如图 53所示:

图 53 ALS算法建模界面 ALS算法建模需要用户提供原始数据所在路径(注意这个路径所包含的数据需要包含用户、项目、评分数据)、建模成功后模型存储的路径、算法参数矩阵分解秩、算法参数正则系数、算法参数循环次数、原始数据处理所需数据分隔符,以上参数设置成功后,点击“建模”,即可开始模型构建。 建模过程中,后台调用的是封装的Spark算法,采用Spark On YARN的运行方式,其核心代码如代码清单 51所示。 代码清单 51 提价Spark任务到YARN集群核心代码 /** * 调用Spark 加入监控模块 * @param args * @return Application ID字符串 / public static String runSpark(String[] args) { StringBuffer buff = new StringBuffer(); for (String arg : args) { buff.append(arg).append(“,”); } log.info(“runSpark args:” + buff.toString()); try { System.setProperty(“SPARK_YARN_MODE”, “true”); SparkConf sparkConf = new SparkConf(); sparkConf.set(“spark.yarn.jar”, getProperty(“spark.yarn.jar”)); sparkConf.set(“spark.yarn.scheduler.heartbeat.interval-ms”, getProperty(“spark.yarn.scheduler.heartbeat.interval-ms”)); ClientArguments cArgs = new ClientArguments(args, sparkConf); Client client = new Client(cArgs, cdhConfiguration.getConfiguratoin(), sparkConf); // client.run(); // 去掉此种调用方式,改为有监控的调用方式 /* * 调用Spark ,含有监控 */ ApplicationId appId = null; try { appId = client.submitApplication(); } catch (Throwable e) { e.printStackTrace(); // 返回null return null; } // 开启监控线程 updateAppStatus(appId.toString(), getProperty(“als.submitted.progress”));// 提交任务完成,返回2%作为提交任务成功的百分比 log.info(allAppStatus.toString()); new Thread(new MonitorThread(appId, client)).start(); return appId.toString(); } catch (Exception e) { e.printStackTrace(); return null; } } 算法评估模块也对应有四个模型评估页面,以ALS算法模型评估为例,其界面如图 54所示。

图 54 ALS算法评估界面 这里仍使用封装Spark算法,使用Spark On YARN的方式调用。算法评估需要提供训练数据集路径(用于过滤)、测试数据集路径、模型路径、K值列表(模型推荐的项目个数)、单用户最小访问URL个数(用于过滤数据)、结果路径等,其返回结果如图 55所示。

图 55 ALS算评估结果界面 同理,在算法寻优模块也有四个对应的页面,以ALS算法为例,其界面如图 56所示:

图 56 Spark ALS算法模型寻优界面 模型寻优其实就是设置多组建模参数,每组参数对应一个模型,然后通过模型评估选择最优模型的过程。算法参数值以列表形式输入,如矩阵分解秩rank列表、正则化系数lambda列表等。 最后一个模块是模型推荐,模型推荐使用提供的模型来对用户进行推荐,其界面如图 57所示。

图 57 模型推荐界面 在模型推荐界面中,用户需要提供使用的推荐算法以及使用的推荐模型路径,提供用户的ID以及对该用户推荐的URL个数,点击“生成推荐”即可对用户进行推荐,其推荐结果如图 58所示。

图 58 模型推荐结果界面 思考: 1.参考上述描述以及提供的参考工程,完成工程中提示的任务(TODO提示); 2.完成工程相关功能后,使用各个模型进行推荐,验证各个算法模型推荐效果; 5.2动手实践:Oozie工作流任务 通过前面章节的分析,读者应该对法律服务的整体流程及其相关实现有了一个清晰的认识,在现实情况中一般任务都会串联起来运行,这里使用Oozie来串联所有的任务。本节中,首先会使用Spark来封装数据预处理的相关算法规则,然后使用该封装的算法来启动Oozie任务。 参考4.4节数据预处理规则来定义相关Spark处理算法,其代码封装如代码清单 52所示。 代码清单 52 Spark封装数据预处理代码 package spark import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.{Path, FileSystem} import org.apache.spark.rdd.RDD import org.apache.spark.{SparkConf, SparkContext} /**

数据预处理输入:

HDFS数据路径 (flume路径 ,“/flume_data//\/”)

读入后的Partition个数 ,12;

训练集分割点: 0.8

验证集分割点:0.9 处理得到:

用户和URL编码;

训练数据集、测试数据集、验证数据集;

/ object Prepare { def main(args: Array[String]) { if(args.length != 5){ println(“Usage: spark.Prepare ”) System.exit(-1) } // 参数处理 val input = args(0) val partitions = args(1).toInt val trainPercent = args(2).toDouble val validatePercent = args(3).toDouble val output = args(4) // 删除 output FileSystem.get(new Configuration()).delete(new Path(output),true) // 得到SparkContext val sc = new SparkContext(new SparkConf()) val data = sc.textFile(input,partitions) val parsedData = data.map(parse()).filter(.size != 1 ).map(x => (x(4),x(8),x(6))) /* * 编码 / val userSize = parsedData.map(.1).distinct.count val urlSize = parsedData.map(.2).distinct.count val userZipCode:RDD[(String,Long)] = parsedData.map(.1).distinct.sortBy(x => x).zipWithIndex() val urlZipCode:RDD[(String,Long)] = parsedData.map(.2).distinct.sortBy(x => x).zipWithIndex() val userZipCode = userZipCode.collect.toMap val urlZipCode = urlZipCode.collect.toMap val codeParedData = parsedData.map(x => (userZipCode_(x.1),urlZipCode(x._2), try{x._3.toLong}catch{case _ => new java.math.BigDecimal(x._3).toPlainString.toLong})) // 按照时间戳排序 val sortCodeParsedData = codeParedData.sortBy(x => x._3) // 分割训练集、验证集、测试集 val dataCount = sortCodeParsedData.count val firstSplitPoint = dataCount * trainPercent toInt val secondSplitPoint = dataCount * validatePercent toInt val splitUrlPoints = sortCodeParsedData.zipWithIndex. filter(x => x._2 == firstSplitPoint || x._2 == secondSplitPoint).map(x => x._1.3).collect val train = sortCodeParsedData.filter(._3 < splitUrlPoints(0)).map(x => (x._1,x._2) ) val validate = sortCodeParsedData.filter(x => x._3 >= splitUrlPoints(0) && x._3 < splitUrlPoints(1)).map(x => (x._1,x.2) ) val test = sortCodeParsedData.filter(._3 >= splitUrlPoints(1)).map(x => (x._1,x._2) ) //规约访问次数到评分 val realTrain : RDD[(Long,Long,Double)]= train.map(x => (x,1)).reduceByKey((x,y) => x+y).map(x => (x._1._1,x._1._2,mapping(x._2))) val validateTrain: RDD[(Long,Long,Double)] = validate.map(x => (x,1)).reduceByKey((x,y) => x+y).map(x => (x._1._1,x._1._2,mapping(x._2))) val testTrain : RDD[(Long,Long,Double)] = test.map(x => (x,1)).reduceByKey((x,y) => x+y).map(x => (x._1._1,x._1._2,mapping(x._2))) // 保存数据 userZipCode.map(x => x._1 +“,”+x._2).saveAsTextFile(output+“/userZipCode”) urlZipCode.map(x => x._1 +“,”+x._2).saveAsTextFile(output+“/urlZipCode”) realTrain.map(x => x._1 +“,”+x._2+“,”+x._3).saveAsTextFile(output+“/realTrain”) validateTrain.map(x => x._1 +“,”+x._2+“,”+x._3).saveAsTextFile(output+“/realValidate”) testTrain.map(x => x._1 +“,”+x._2+“,”+x._3).saveAsTextFile(output+“/realTest”) // 关闭 SparkContext sc.stop() } /*

数据转换,替换双引号中的逗号为空格@param str@return / def parse(str:String) :Array[String] ={ var flag = false var strr = str for(i <- 0 until str.size) { if(‘"’.equals(str(i))){ if(flag) flag = false else flag = true } if(flag && ‘,’.equals(str(i))) strr = strr.updated(i,’ ') } strr.split(“,”,-1) } /*根据规则规约访问次数到评分@param times@return / def mapping(times :Int) = if(1 <= times && times <5) times.toDouble else if( times > 100) 10.0 else (times - 5) * 5.0 / 95 +5 } 封装好上述代码后,使用相关打包工具把编译后的代码输出成Jar包待用。接着,定义Oozie相关配置文件,如代码清单 53、代码清单 54所示。 代码清单 53 Spark Prepare Oozie工作流 job.properties oozie.wf.application.path=

n

a

m

e

N

o

d

e

/

u

s

e

r

/

{nameNode}/user/

nameNode/user/{user.name}/workflow/spark_prepare nameNode=hdfs://nameservice1 resourceManager=node41.tipdm.com:8032 master=yarn-cluster queueName=default oozie.use.system.libpath=true input=/flume_data//*/ jarPath=KaTeX parse error: Unexpected character: '' at position 225: …_prepare 代码清单 5̲4 Spark Prepare…{resourceManager}

n

a

m

e

N

o

d

e

<

/

n

a

m

e

n

o

d

e

>

<

p

r

e

p

a

r

e

>

<

d

e

l

e

t

e

p

a

t

h

=

"

{nameNode}

m

a

s

t

e

r

<

/

m

a

s

t

e

r

>

<

n

a

m

e

>

S

p

a

r

k

p

r

e

p

a

r

e

J

o

b

<

/

n

a

m

e

>

<

c

l

a

s

s

>

s

p

a

r

k

.

P

r

e

p

a

r

e

<

/

c

l

a

s

s

>

<

j

a

r

>

{master} Spark prepare Job spark.Prepare

masterSparkprepareJobspark.Prepare{jarPath}

s

p

a

r

k

O

p

t

s

<

/

s

p

a

r

k

o

p

t

s

>

<

a

r

g

>

{sparkOpts}

sparkOpts{input}

p

a

r

t

i

t

i

o

n

s

<

/

a

r

g

>

<

a

r

g

>

{partitions}

partitions{trainPercent}

v

a

l

i

d

a

t

e

P

e

r

c

e

n

t

<

/

a

r

g

>

<

a

r

g

>

{validatePercent}

validatePercent{output} Workflow failed, error message[${wf:errorMessage(wf:lastErrorNode())}] 最后,启动该Oozie定时任务,其启动命令如图 59所示,启动后,会返回一个任务ID。

图 59 Oozie提交Spark Prepare任务 同时,该任务也可以在YARN任务监控看到,如图 510所示。

图 510 Oozie Spark Prepare任务在YARN中监控 从图 510中看到,Oozie任务流程的任务首先会启动一个Hadoop MapReduce任务,然后由此任务再启动一个Spark任务。在Ooize监控中查看其任务状态如图 511所示。

图 511 Spark Prepare任务在Oozie中监控

参考文章

评论可见,请评论后查看内容,谢谢!!!
 您阅读本篇文章共花了: