Flink应用二次汇聚完成TopN测算-乱序数据信息

一、情况表明:

在上一篇文章完成了TopN测算,可是遇到晚到数据信息则会没法在当今对话框测算,必须对在其中的键控情况提升

Flink应用二次汇聚完成TopN测算

此次要求是对数据信息开展统计分析,规定每过5秒,輸出近期十分钟内浏览量数最多的前N个URL,数据流分析浏览以下(每一次一条从端口号传到):

208.115.111.72 - - 17/05/2015:10:25:49  0000 GET /?N=A&page=21   //15:50-25:50对话框数据信息
208.115.111.72 - - 17/05/2015:10:25:50  0000 GET /?N=A&page=21
208.115.111.72 - - 17/05/2015:10:25:51  0000 GET /?N=A&page=21
208.115.111.72 - - 17/05/2015:10:25:52  0000 GET /?N=A&page=21   //第一次开启测算,15:50-25:50对话框
208.115.111.72 - - 17/05/2015:10:25:47  0000 GET /?N=A&          //晚到数据信息,不一样url
208.115.111.72 - - 17/05/2015:10:25:53  0000 GET /?N=A&page=21   //第二次开启测算,15:50-25:50对话框
208.115.111.72 - - 17/05/2015:10:25:46  0000 GET /?N=A&page=21   //晚到数据信息
208.115.111.72 - - 17/05/2015:10:25:54  0000 GET /?N=A&page=21   //第三次开启测算

最终统计分析輸出結果以下(晚到数据信息均在25:50对话框):

==============2015-05-17 10:25:50.0==============               //第一次开启数值
Top1 Url:/?N=A&page=21 Counts:1
==============2015-05-17 10:25:50.0==============

==============2015-05-17 10:25:50.0==============               //第二次开启数值
Top1 Url:/?N=A&page=21 Counts:1
Top2 Url:/?N=A& Counts:1
==============2015-05-17 10:25:50.0==============

==============2015-05-17 10:25:50.0==============               //第三次开启数值
Top1 Url:/?N=A&page=21 Counts:2
Top2 Url:/?N=A& Counts:1
==============2015-05-17 10:25:50.0==============

二、完成全过程

  1. 完成构思:
    ①创建自然环境,设定并行度及CK。
    ②界定watermark对策及事情時间,读取数据并相匹配到JavaBean。
    ③第一次汇聚,按url排序开窗通风汇聚,应用aggregate算法开展增加量测算。
    ④第二次汇聚,按对话框汇聚,应用MapState存取数据,界定第一个计时器,在watermark做到后一秒开启,对对话框数据信息排列輸出,界定第二个计时器,对话框关掉后才清晰情况。
    ⑤打印出結果及实行。

ps:乱序数据信息不可以应用载入当地文本文档的方法检测,文档载入载入较为快,没法观查到晚到数据处理方法实际效果,乱序数据信息的软件开发测试这儿从服务器端口读取数据的方法检测

  1. 编码关键点表明:

只对于提升一部分编码表明,别的编码能够 在次序数据信息一篇文章查询,这儿获取调用KeyedProcessFunction里边方式的一部分编码

@Override
public void processElement(UrlCount value, Context ctx, Collector<String> out) throws Exception {
	//情况装进数据信息
	mapState.put(value.getUrl(), value);
	//计时器,对话框一秒后开启
	ctx.timerService().registerEventTimeTimer(value.getWindowEnd() 1L);
	//再加一个计时器来消除情况用,在对话框关掉后再消除情况,那样延迟时间数据到达后对话框还能做排列
	ctx.timerService().registerEventTimeTimer(value.getWindowEnd() 61001L);
}
//计时器內容
@Override
public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
	if (timestamp == ctx.getCurrentKey() 61001L){
		mapState.clear();
		return;}
...
  • 这儿改成MapState,倘若应用ListState,进去晚到数据信息后,则会发生同一个url在同一个对话框的统计分析发生好几个记数的状况,目录情况不具有去重复作用,故在这儿应用map情况来完成去重复。
  • 这儿应用计时器来消除情况,原书写是在onTimer最终排列完立即消除情况,则会造成晚到数据到达后,原对话框别的数据信息被消除掉没法完成排行的輸出,这儿计时器的时间在61001ms后消除情况数据信息。
  • 计时器61001ms = 容许晚到数据信息一秒(forBoundedOutOfOrderness) 对话框晚到数据信息1分钟(allowedLateness) 第一个计时器1ms。

三、详细编码

package com.test.topN;

import bean.ApacheLog;
import bean.UrlCount;
import org.apache.commons.compress.utils.Lists;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import java.sql.Timestamp;
import java.text.SimpleDateFormat;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.Map;
/**
 * @author: Rango
 * @create: 2021-05-26 10:16
 * @description: 每过5秒,輸出近期十分钟内浏览量数最多的前N个URL
 **/
public class URLTopN3 {
    public static void main(String[] args) throws Exception {

        //1.创建自然环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(1);

        //2.载入端口号数据信息并投射到JavaBean,并界定watermark時间词义
        WatermarkStrategy<ApacheLog> wms = WatermarkStrategy
                .<ApacheLog>forBoundedOutOfOrderness(Duration.ofSeconds(1))
                .withTimestampAssigner(new SerializableTimestampAssigner<ApacheLog>() {
                    @Override
                    public long extractTimestamp(ApacheLog element, long recordTimestamp) {
                        return element.getTs();
                    }});

        SingleOutputStreamOperator<ApacheLog> apacheLogDS = env.socketTextStream("hadoop102", 9999)
                .map(new MapFunction<String, ApacheLog>() {
                    @Override
                    public ApacheLog map(String value) throws Exception {
                        SimpleDateFormat sdf = new SimpleDateFormat("dd/MM/yy:HH:mm:ss");
                        String[] split = value.split(" ");
                        return new ApacheLog(split[0],
                                split[2],
                                sdf.parse(split[3]).getTime(),
                                split[5],
                                split[6]);
                    }})
                .assignTimestampsAndWatermarks(wms);

        //3.第一次汇聚,按url变为tuple2分组,开窗通风,增加量汇聚
        SingleOutputStreamOperator<UrlCount> aggregateDS = apacheLogDS
                .map(new MapFunction<ApacheLog, Tuple2<String, Integer>>() {
            @Override
            public Tuple2<String, Integer> map(ApacheLog value) throws Exception {
                return new Tuple2<>(value.getUrl(), 1);
            }}).keyBy(data -> data.f0)
                .window(SlidingEventTimeWindows.of(Time.minutes(10),Time.seconds(5)))
                .allowedLateness(Time.minutes(1))
                .aggregate(new HotUrlAggFunc(), new HotUrlWindowFunc());

        //4.第二次汇聚,对第一次汇聚輸出按对话框排序,再全对话框汇聚,创建计时器你,每5秒左右开启一次
        SingleOutputStreamOperator<String> processDS = aggregateDS
                .keyBy(data -> data.getWindowEnd())
                .process(new HotUrlProcessFunc(5));

        processDS.print();
        env.execute();
    }
    //完成AggregateFunction类中的方式
    public static class HotUrlAggFunc implements AggregateFunction<Tuple2<String, Integer>,Integer,Integer>{
        @Override
        public Integer createAccumulator() {return 0;}
        @Override
        public Integer add(Tuple2<String, Integer> value, Integer accumulator) { return accumulator 1;}
        @Override
        public Integer getResult(Integer accumulator) {return accumulator;}
        @Override
        public Integer merge(Integer a, Integer b) {return a b; }
    }
    //完成窗口函数的apply方式,把累积涵数輸出的整数金额結果,变换为javabean类urlcount来做輸出,便捷事后按对话框汇聚
    public static class HotUrlWindowFunc implements WindowFunction<Integer, UrlCount,String, TimeWindow> {
        @Override
        public void apply(String urls, TimeWindow window, Iterable<Integer> input, Collector<UrlCount> out) throws Exception {
            //获得按key求和后的频次并新创建javabean(urlcount)做为回到
            Integer count = input.iterator().next();
            out.collect(new UrlCount(urls,window.getEnd(),count));
        }  }
    //承继KeyedProcessFunction方式,调用processElemnt方式
    public static class HotUrlProcessFunc extends KeyedProcessFunction<Long,UrlCount,String>{
        //界定TopN为入参
        private Integer TopN;
        public HotUrlProcessFunc(Integer topN) {
            TopN = topN;
        }
        //界定情况
        private MapState <String,UrlCount>mapState;
        //open方式中复位情况
        @Override
        public void open(Configuration parameters) throws Exception {
            mapState = getRuntimeContext()
                    .getMapState(new MapStateDescriptor<String, UrlCount>("map-state",String.class,UrlCount.class));
        }
        @Override
        public void processElement(UrlCount value, Context ctx, Collector<String> out) throws Exception {
            //情况装进数据信息
            mapState.put(value.getUrl(), value);
            //计时器,对话框一秒后开启
            ctx.timerService().registerEventTimeTimer(value.getWindowEnd() 1L);
            //再加一个计时器来消除情况用,在对话框关掉后再消除情况,那样延迟时间数据到达后对话框还能做排列
            ctx.timerService().registerEventTimeTimer(value.getWindowEnd() 61001L);
        }
        //计时器內容
        @Override
        public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
            if (timestamp == ctx.getCurrentKey() 61001L){
                mapState.clear();
                return;}

            //取下情况数据信息
            Iterator<Map.Entry<String, UrlCount>> iterator = mapState.iterator();
            ArrayList<Map.Entry<String, UrlCount>> entries = Lists.newArrayList(iterator);

            //排列
            entries.sort(((o1, o2) -> o2.getValue().getCount()-o1.getValue().getCount()));

            //排列后装进StringBulider做为輸出TopN
            StringBuilder sb = new StringBuilder();
            sb.append("==============")
                    .append(new Timestamp(timestamp - 1L))
                    .append("==============")
                    .append("\n");
            for (int i = 0; i < Math.min(TopN,entries.size()); i  ) {
                UrlCount urlCount = entries.get(i).getValue();
                sb.append("Top").append(i 1);
                sb.append(" Url:").append(urlCount.getUrl());
                sb.append(" Counts:").append(urlCount.getCount());
                sb.append("\n");
            }
            sb.append("==============")
                    .append(new Timestamp(timestamp - 1L))
                    .append("==============")
                    .append("\n")
                    .append("\n");

            out.collect(sb.toString());
            Thread.sleep(200);
            }}}

投射数据库的JavaBean

package bean;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

@Data
@NoArgsConstructor
@AllArgsConstructor
public class ApacheLog {
    private String ip;
    private String userId;
    private Long ts;
    private String method;
    private String url;
}

第一次汇聚輸出的JavaBean

package bean;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

@Data
@NoArgsConstructor
@AllArgsConstructor
public class UrlCount {
    private String url;
    private Long windowEnd;
    private Integer count;
}

交流学习,有一切难题还请随时随地评价强调沟通交流。

评论(0条)

刀客源码 游客评论