
跳出就是用户成功访问了网站的一个页面后就退出,不在继续访问网站的其它页面。而
跳出率就是用跳出次数除以访问次数。
关注跳出率,可以看出引流过来的访客是否能很快的被吸引,渠道引流过来的用户之间
的质量对比,对于应用优化前后跳出率的对比也能看出优化改进的成果
首先要识别哪些是跳出行为,要把这些跳出的访客最后一个访问的页面识别出来。那么
要抓住几个特征:
- 该页面是用户近期访问的第一个页面
这个可以通过该页面是否有上一个页面(last_page_id)来判断,如果这个表示为空,就说明这是这个访客这次访问的第一个页面。首次访问之后很长一段时间(自己设定),用户没继续再有其他页面的访问。
第一个特征的识别很简单,保留 last_page_id 为空的就可以了。但是第二个访问的
判断,其实有点麻烦,首先这不是用一条数据就能得出结论的,需要组合判断,要用一条存
在的数据和不存在的数据进行组合判断。而且要通过一个不存在的数据求得一条存在的数据。
更麻烦的他并不是永远不存在,而是在一定时间范围内不存在。那么如何识别有一定失效的
组合行为呢?
最简单的办法就是 Flink 自带的 CEP 技术。这个 CEP 非常适合通过多条数据组合来识
别某个事件。
用户跳出事件,本质上就是一个组合。
代码实现:
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONAware;
import com.alibaba.fastjson.JSONObject;
import com.atguigu.utils.MyKafkaUtil;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.cep.CEP;
import org.apache.flink.cep.PatternSelectFunction;
import org.apache.flink.cep.PatternStream;
import org.apache.flink.cep.PatternTimeoutFunction;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.SimpleCondition;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.OutputTag;
import java.time.Duration;
import java.util.List;
import java.util.Map;
//数据流:web/app -> Nginx -> SpringBoot -> Kafka(ods) -> FlinkApp -> Kafka(dwd) -> FlinkApp -> Kafka(dwm)
//程 序:mockLog -> Nginx -> Logger.sh -> Kafka(ZK) -> baseLogApp -> kafka -> UserJumpDetailApp -> Kafka
public class UserJumpDetailApp {
public static void main(String[] args) throws Exception {
//TODO 1.获取执行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1); //生产环境,与Kafka分区数保持一致
//1.1 设置CK&状态后端
//env.setStateBackend(new FsStateBackend("hdfs://hadoop102:8020/gmall-flink-210325/ck"));
//env.enableCheckpointing(5000L);
//env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
//env.getCheckpointConfig().setCheckpointTimeout(10000L);
//env.getCheckpointConfig().setMaxConcurrentCheckpoints(2);
//env.getCheckpointConfig().setMinPauseBetweenCheckpoints(3000);
//env.setRestartStrategy(RestartStrategies.fixedDelayRestart());
//TODO 2.读取Kafka主题的数据创建流
String sourceTopic = "dwd_page_log";
String groupId = "userJumpDetailApp";
String sinkTopic = "dwm_user_jump_detail";
DataStreamSource kafkaDS = env.addSource(MyKafkaUtil.getKafkaConsumer(sourceTopic, groupId));
//TODO 3.将每行数据转换为JSON对象并提取时间戳生成Watermark
SingleOutputStreamOperator jsonObjDS = kafkaDS.map(JSON::parseObject)
.assignTimestampsAndWatermarks(WatermarkStrategy
.forBoundedOutOfOrderness(Duration.ofSeconds(1))
.withTimestampAssigner(new SerializableTimestampAssigner() {
@Override
public long extractTimestamp(JSONObject element, long recordTimestamp) {
return element.getLong("ts");
}
}));
//TODO 4.定义模式序列
Pattern pattern = Pattern.begin("start").where(new SimpleCondition() {
@Override
public boolean filter(JSONObject value) throws Exception {
String lastPageId = value.getJSONObject("page").getString("last_page_id");
return lastPageId == null || lastPageId.length() <= 0;
}
}).next("next").where(new SimpleCondition() {
@Override
public boolean filter(JSONObject value) throws Exception {
String lastPageId = value.getJSONObject("page").getString("last_page_id");
return lastPageId == null || lastPageId.length() <= 0;
}
}).within(Time.seconds(10));
//使用循环模式 定义模式序列
Pattern.begin("start").where(new SimpleCondition() {
@Override
public boolean filter(JSONObject value) throws Exception {
String lastPageId = value.getJSONObject("page").getString("last_page_id");
return lastPageId == null || lastPageId.length() <= 0;
}
})
.times(2)
.consecutive() //指定严格近邻(next)
.within(Time.seconds(10));
//TODO 5.将模式序列作用到流上
PatternStream patternStream = CEP
.pattern(jsonObjDS.keyBy(json -> json.getJSONObject("common").getString("mid"))
, pattern);
//TODO 6.提取匹配上的和超时事件
OutputTag timeOutTag = new OutputTag("timeOut") {
};
SingleOutputStreamOperator selectDS = patternStream.select(timeOutTag,
new PatternTimeoutFunction() {
@Override
public JSONObject timeout(Map> map, long ts) throws Exception {
return map.get("start").get(0);
}
}, new PatternSelectFunction() {
@Override
public JSONObject select(Map> map) throws Exception {
return map.get("start").get(0);
}
});
DataStream timeOutDS = selectDS.getSideOutput(timeOutTag);
//TODO 7.UNIOn两种事件
DataStream unionDS = selectDS.union(timeOutDS);
//TODO 8.将数据写入Kafka
unionDS.print();
unionDS.map(JSONAware::toJSONString)
.addSink(MyKafkaUtil.getKafkaProducer(sinkTopic));
//TODO 9.启动任务
env.execute("UserJumpDetailApp");
}
}
流程图:
欢迎分享,转载请注明来源:内存溢出
微信扫一扫
支付宝扫一扫
评论列表(0条)