本文由 发布,转载请注明出处,如有问题请联系我们! 发布时间: 2021-05-18从 demo 到生产 - 手把手写出实战需求的 Flink 广播程序

加载中

从 demo 到生产制造 - 从零写下实战演练要求的 Flink 广播节目程序流程

Flink 广播节目自变量在并行处理程序流程中饰演很重要的人物角色,适度的应用广播节目自变量会大大的提高程序执行高效率。

 

文中从简易的 demo 情景考虑,引进生产制造中具体的要求并明确提出构思与一部分实例编码,解决一般要求应当没什么难题,话不多说,赶快讨论一下这篇干货满满的广播节目程序流程应用实战演练吧。

 

1 啥是广播节目 

 

Flink 适用广播节目自变量,容许在每台设备上保存一个写保护的缓存文件自变量,数据信息存有运行内存中,在不一样的 task 所属的连接点上的都能获得到,能够降低很多的 shuffle 实际操作。

也就是说,广播节目自变量能够了解为一个公共性的共享资源自变量,能够把一个 dataset 的数据广播节目出来 ,随后不一样的 task 在连接点上面可以获得到,这一数据信息在每一个连接点上总是存有一份。

如果不应用 broadcast,则在每一个连接点中的每一个 task 上都必须复制一份 dataset 数据,较为消耗运行内存 (也就是一个连接点中很有可能会存有好几份 dataset 数据信息)

 

2 使用方法汇总

 

//1 复位数据信息

DataSet<Integer>  toBroadcast = env.fromElements(1,2,3)

//2 广播节目数据信息 api

withBroadcastSet(toBroadcast,"broadcastSetName")

//3 读取数据

Collection<integer> broadcastSet = getRuntimeContext().getBroadcastVariable("broadcastSetName"); 

 

留意

 

  • 广播节目自变量因为要长驻运行内存,程序流程完毕时才会无效,因此 信息量不适合过大

     

  • 广播节目自变量广播节目在复位后不兼容改动 (改动情景也是有方法)

 

3 基本实例演试

 

  • 基本实例广播节目自变量应用

 

这类情景下广播节目自变量便是载入性能参数,性能参数不容易转变,记牢第二一部分常见汇总公式计算就可以。

 

/**
 * @author 互联网大数据武林
 * @version 1.0
 * @date 2021/5/17.
 *
 */
public class BaseBroadCast {

    /**
     * broadcast广播节目自变量
     * 要求:
     *  flink会从数据库中获得到客户的名字
     *  最后必须把客户的名字和年纪信息内容打印出出去
     *  剖析:
     *  因此 就必须在中间的map解决的情况下获得客户的年纪信息内容
     *  提议吧客户的关联数据应用广播节目自变量开展解决
     *
     */
    public static void main(String[] args) throws Exception {

        //获得软件环境
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        //1:提前准备必须广播节目的数据信息
        ArrayList<Tuple2<String, Integer>> broadData = new ArrayList<>();
        broadData.add(new Tuple2<>("zs", 18));
        broadData.add(new Tuple2<>("ls", 20));
        broadData.add(new Tuple2<>("ww", 17));
        DataSet<Tuple2<String, Integer>> tupleData =
                env.fromCollection(broadData);

        //1.1:解决必须广播节目的数据信息,把数据转化成map种类,map中的key便是客户名字,value便是客户年纪

        DataSet<HashMap<String, Integer>> toBroadcast = tupleData.map(new MapFunction<Tuple2<String, Integer>, HashMap<String, Integer>>() {
            @Override
            public HashMap<String, Integer> map(Tuple2<String, Integer> value)
                    throws Exception {
                HashMap<String, Integer> res = new HashMap<>();
                res.put(value.f0, value.f1);
                return res;
            }
        });
        //源数据信息
        DataSource<String> data = env.fromElements("zs", "ls", "ww");
        //留意:在这儿必须应用到RichMapFunction获得广播节目自变量
        DataSet<String> result = data.map(new RichMapFunction<String, String>() {


            List<HashMap<String, Integer>> broadCastMap = new ArrayList<HashMap<String, Integer>>();


            HashMap<String, Integer> allMap = new HashMap<String, Integer>();

            /**
             * 这一方式总是实行一次
             * 能够在这儿完成一些复位的作用
             * 因此 ,就可以在open方式中获得广播节目自变量数据信息
             */
            @Override
            public void open(Configuration parameters) throws Exception {
                super.open(parameters);
                //3:获得广播节目数据信息
                this.broadCastMap = getRuntimeContext().getBroadcastVariable("broadCastMapName");
                for (HashMap map : broadCastMap) {
                    allMap.putAll(map);
                }
            }

            @Override
            public String map(String value) throws Exception {
                Integer age = allMap.get(value);
                return value   ","   age;
            }
        }).withBroadcastSet(toBroadcast, "broadCastMapName");//2:实行广播节目数据信息的实际操作
        result.print();
    }

}

 

 

生产制造实例演试

 

 

具体生产制造中有时是必须升级广播节目自变量的,但并不是自动更新的,一般会设定一个升级周期时间,数分钟,几个小时的都很普遍,依据业务流程而定。

 

因为广播节目自变量必须升级,解决方案一般是必须将广播节目自变量制成另一个 source,开展流与流中间的 connect 实际操作,按时更新广播节目的source,进而做到广播节目自变量改动的目地。

 

4.1.1 应用 redis 中的数据信息做为广播节目自变量的构思:

 

消費 kafka 中的数据信息,应用 redis 中的数据信息做为广播节目数据信息,开展数据预处理后 写到 kafka中。

 

实例编码分成三个一部分:kafka 经营者,redis 广播节目数据库,实行通道类

 

  • 搭建 kafka 转化成者,仿真模拟数据信息 (下列编码的消費信息源均是这里生产制造)

 

/**
 * 仿真模拟数据库
 */
public class kafkaProducer {

    public static void main(String[] args) throws Exception{
        Properties prop = new Properties();
        //特定kafka broker详细地址
        prop.put("bootstrap.servers", "10.20.7.20:9092,10.20.7.51:9092,10.20.7.50:9092");
        //特定key value的实例化方法
        prop.put("key.serializer", StringSerializer.class.getName());
        prop.put("value.serializer", StringSerializer.class.getName());
        //特定topic名字
        String topic = "data_flink_bigdata_test";

        //建立producer连接
        KafkaProducer<String, String> producer = new KafkaProducer<String,String>(prop);

        //{"dt":"2018-01-01 10:11:11","countryCode":"US","data":[{"type":"s1","score":0.3,"level":"A"},{"type":"s2","score":0.2,"level":"B"}]}


        while(true){
            String message = "{\"dt\":\"" getCurrentTime() "\",\"countryCode\":\"" getCountryCode() "\",\"data\":[{\"type\":\"" getRandomType() "\",\"score\":" getRandomScore() ",\"level\":\"" getRandomLevel() "\"},{\"type\":\"" getRandomType() "\",\"score\":" getRandomScore() ",\"level\":\"" getRandomLevel() "\"}]}";
            System.out.println(message);
            //同歩的方法,往Kafka里边生产制造数据信息

            producer.send(new ProducerRecord<String, String>(topic,message));


            Thread.sleep(2000);
        }
        //关掉连接
        //producer.close();
    }

    public static String getCurrentTime(){
        SimpleDateFormat sdf = new SimpleDateFormat("YYYY-MM-dd HH:mm:ss");
        return sdf.format(new Date());
    }

    public static String getCountryCode(){
        String[] types = {"US","TW","HK","PK","KW","SA","IN"};
        Random random = new Random();
        int i = random.nextInt(types.length);
        return types[i];
    }


    public static String getRandomType(){
        String[] types = {"s1","s2","s3","s4","s5"};
        Random random = new Random();
        int i = random.nextInt(types.length);
        return types[i];
    }

    public static double getRandomScore(){
        double[] types = {0.3,0.2,0.1,0.5,0.8};
        Random random = new Random();
        int i = random.nextInt(types.length);
        return types[i];
    }

    public static String getRandomLevel(){
        String[] types = {"A","A ","B","C","D"};
        Random random = new Random();
        int i = random.nextInt(types.length);
        return types[i];
    }


}

 

 

 

  • redis 数据信息做为广播节目数据信息

 

/**
 * redis中提前准备的数据库
 * source:
 *
 * hset areas AREA_US US
 * hset areas AREA_CT TW,HK
 * hset areas AREA_AR PK,KW,SA
 * hset areas AREA_IN IN
 *
 * result:
 *
 * HashMap
 *
 * US,AREA_US
 * TW,AREA_CT
 * HK,AREA_CT
 *
 */
public class BigDataRedisSource implements SourceFunction<HashMap<String,String>> {

    private Logger logger= LoggerFactory.getLogger(BigDataRedisSource.class);

    private Jedis jedis;
    private boolean isRunning=true;

    @Override
    public void run(SourceContext<HashMap<String, String>> cxt) throws Exception {
        this.jedis = new Jedis("localhost",6379);
        HashMap<String, String> map = new HashMap<>();
        while(isRunning){
          try{
              map.clear();
              Map<String, String> areas = jedis.hgetAll("areas");
              /**
               * AREA_CT TT,AA
               *
               * map:
               * TT,AREA_CT
               * AA,AREA_CT
               */
              for(Map.Entry<String,String> entry: areas.entrySet()){
                  String area = entry.getKey();
                  String value = entry.getValue();
                  String[] fields = value.split(",");
                  for(String country:fields){
                      map.put(country,area);
                  }

              }
              if(map.size() > 0 ){
                  cxt.collect(map);
              }
              Thread.sleep(60000);
          }catch (JedisConnectionException e){
              logger.error("redis联接出现异常",e.getCause());
              this.jedis = new Jedis("localhost",6379);
          }catch (Exception e){
              logger.error("数据库出现异常",e.getCause());
          }

        }

    }

    @Override
    public void cancel() {
        isRunning=false;
        if(jedis != null){
            jedis.close();
        }

    }
}

 

 
  • 程序流程通道类

 

/**
 * @author 互联网大数据武林
 * @version 1.0
 * @date 2021/4/25.
 *
 *
 * 应用 kafka 輸出流和 redis 輸出流 开展合拼清理
 *
 *
 */
public class 广播节目方法一分2个流开展connnect实际操作 {
    public static void main(String[] args) throws Exception {

        //1 获得实行自然环境

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.setParallelism(3);//并行度在于 kafka 中的系统分区数 维持与kafka 一致

        //2 设定 checkpoint

        //打开checkpoint 一分钟一次
        env.enableCheckpointing(60000);
        //设定checkpoint 仅一次词义
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        //2次checkpoint的间隔时间
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(5000);
        //最多个适用一个checkpoint另外实行
        env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
        //checkpoint请求超时的時间
        env.getCheckpointConfig().setCheckpointTimeout(60000);       // 每日任务不成功后也保存 checkPonit数据信息
        env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(
                3, // 试着重新启动的频次
                Time.of(10, TimeUnit.SECONDS) // 间距
        ));

        // 设定 checkpoint 途径
       // env.setStateBackend(new FsStateBackend("hdfs://192.168.123.103:9000/flink/checkpoint"));


        //3 设定 kafka Flink 消費

        //建立 Kafka 消費信息内容

        String topic="data_flink_bigdata_test";
        Properties consumerProperties = new Properties();
        consumerProperties.put("bootstrap.servers","10.20.7.20:9092,10.20.7.51:9092,10.20.7.50:9092");
        consumerProperties.put("group.id","data_test_new_1");
        consumerProperties.put("enable.auto.commit", "false");
        consumerProperties.put("auto.offset.reset","earliest");


        //4 获得 kafka 与 redis 数据库

        FlinkKafkaConsumer consumer = new FlinkKafkaConsumer<String>(topic, new SimpleStringSchema(), consumerProperties);

        DataStreamSource<String> kafkaSourceData = env.addSource(consumer);


        //立即应用广播节目的方法 事后做为2个数据流分析来实际操作

        DataStream<HashMap<String, String>> redisSourceData = env.addSource(new NxRedisSource()).broadcast();

        //5 2个数据库开展 ETL 解决 应用 connect 联接解决

        SingleOutputStreamOperator<String> etlData = kafkaSourceData.connect(redisSourceData).flatMap(new MyETLProcessFunction());


        //6 新创建一个 kafka 经营者 开展推送
        String outputTopic="allDataClean";


        // 輸出给中下游 kafka

        Properties producerProperties = new Properties();
        producerProperties.put("bootstrap.servers","10.20.7.20:9092,10.20.7.51:9092,10.20.7.50:9092");

        FlinkKafkaProducer<String> producer = new FlinkKafkaProducer<>(outputTopic,
                new KeyedSerializationSchemaWrapper<String>(new SimpleStringSchema()),
                producerProperties);

        etlData.addSink(producer);


        //7 递交每日任务实行

        env.execute("DataClean");


    }

    /**
     * in 1 kafka source   :
     *
     *  {"dt":"2018-01-01 10:11:11","countryCode":"US","data":[{"type":"s1","score":0.3,"level":"A"},{"type":"s2","score":0.2,"level":"B"}]}
     *
     *
     * in 2 redis source
     *
     *
     *  US,AREA_US
     *  TW,AREA_CT
     *  HK,AREA_CT
     *
     *
     *
     * out 合拼后的source
     */
    private static class MyETLProcessFunction implements CoFlatMapFunction<String,HashMap<String,String>,String> {

        //用于储存 redis 中的数据信息
        HashMap<String,String> allMap = new HashMap<String,String>();

        @Override
        public void flatMap1(String line, Collector<String> collector) throws Exception {

            //将 kafka 数据信息 按 redis 数据信息开展更换
            // s -> kafka 数据信息
            //allMap -> redis 数据信息

            JSONObject jsonObject = JSONObject.parseObject(line);
            String dt = jsonObject.getString("dt");
            String countryCode = jsonObject.getString("countryCode");
            //能够依据countryCode获得战区的名称
            String area = allMap.get(countryCode);
            JSONArray data = jsonObject.getJSONArray("data");
            for (int i = 0; i < data.size(); i  ) {
                JSONObject dataObject = data.getJSONObject(i);
                System.out.println("战区:" area);
                dataObject.put("dt", dt);
                dataObject.put("area", area);
                //中下游获得到数据信息的情况下,也就是一个json文件格式的数据信息
                collector.collect(dataObject.toJSONString());
            }




        }

        @Override
        public void flatMap2(HashMap<String, String> stringStringHashMap, Collector<String> collector) throws Exception {
            //将 redis 中 数据信息开展取值
            allMap = stringStringHashMap;

        }
    }
}

 


 

4.1.2 应用 MapState 开展广播节目程序流程提升:

 

提升的点取决于 (下边编码中 TODO 标志点):

 

  1. 开展数据信息广播节目时必须应用 MapStateDescriptor 开展申请注册

     

  2. 开展2个流合拼解决时 应用 process 涵数

     

  3. 处理函数中应用 MapState  来存储 redis 中的数据信息

 

/**
 * @author 互联网大数据武林
 * @version 1.0
 * @date 2021/4/25.
 * <p>
 * 应用 kafka 輸出流和 redis 輸出流 开展合拼清理
 * <p>
 * 网上应用的方法
 */
public class 广播节目方法2应用MapState对方法1更新改造 {
    public static void main(String[] args) throws Exception {

        //1 获得实行自然环境

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.setParallelism(3);//并行度在于 kafka 中的系统分区数 维持与kafka 一致

        //2 设定 checkpoint

        //打开checkpoint 一分钟一次
        env.enableCheckpointing(60000);
        //设定checkpoint 仅一次词义
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        //2次checkpoint的间隔时间
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(5000);
        //最多个适用一个checkpoint另外实行
        env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
        //checkpoint请求超时的時间
        env.getCheckpointConfig().setCheckpointTimeout(60000);
        // 每日任务不成功后也保存 checkPonit数据信息
        env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(
                3, // 试着重新启动的频次
                Time.of(10, TimeUnit.SECONDS) // 间距
        ));

        // 设定 checkpoint 途径
        //env.setStateBackend(new FsStateBackend("hdfs://192.168.123.103:9000/flink/checkpoint"));


        //3 设定 kafka Flink 消費


        //建立 Kafka 消費信息内容


        String topic = "data_flink_bigdata_test";
        Properties consumerProperties = new Properties();
        consumerProperties.put("bootstrap.servers", "10.20.7.20:9092,10.20.7.51:9092,10.20.7.50:9092");
        consumerProperties.put("group.id", "data_flink_fpy_test_consumer");
        consumerProperties.put("enable.auto.commit", "false");
        consumerProperties.put("auto.offset.reset", "earliest");


        //4 获得 kafka 与 redis 数据库

        FlinkKafkaConsumer consumer = new FlinkKafkaConsumer<String>(topic, new SimpleStringSchema(), consumerProperties);

        DataStreamSource<String> kafkaSourceData = env.addSource(consumer);

        // 获得 redis 数据库而且开展广播节目  网上的广播节目也是 source   广播节目方式

        MapStateDescriptor<String, String> descriptor = new MapStateDescriptor<String, String>(
                "RedisBdStream",
                String.class,
                String.class
        );

        //5 2个数据库开展 ETL 解决 应用 connect 联接解决 TODO process 更换 FlatMap
        //TODO 应用 MapState 来开展广播节目
        BroadcastStream<HashMap<String, String>> redisSourceData = env.addSource(new NxRedisSource()).broadcast(descriptor);

        SingleOutputStreamOperator<String> etlData = kafkaSourceData.connect(redisSourceData).process(new MyETLProcessFunction());


        //6 新创建一个 kafka 经营者 开展推送
        String outputTopic = "allDataClean";


        // 輸出给中下游 kafka

        Properties producerProperties = new Properties();
        producerProperties.put("bootstrap.servers","10.20.7.20:9092,10.20.7.51:9092,10.20.7.50:9092");

        FlinkKafkaProducer<String> producer = new FlinkKafkaProducer<>(outputTopic,
                new KeyedSerializationSchemaWrapper<String>(new SimpleStringSchema()),
                producerProperties);

        etlData.addSink(producer);


        etlData.print();


        //7 递交每日任务实行

        env.execute("DataClean");


    }

    /**
     * in 1 kafka source
     * in 2 redis source
     * <p>
     * out 合拼后的source
     */
    private static class MyETLProcessFunction extends BroadcastProcessFunction<String, HashMap<String, String>, String> {

        // TODO 留意这里 descriptor 的名字必须与 广播节目时 (99行编码) 名字一致
        MapStateDescriptor<String, String> descriptor =
                new MapStateDescriptor<String, String>(
                        "RedisBdStream",
                        String.class,
                        String.class
                );


        //逻辑性的解决方式 kafka 的数据信息
        @Override
        public void processElement(String line, ReadOnlyContext readOnlyContext, Collector<String> collector) throws Exception {
            //将 kafka 数据信息 按 redis 数据信息开展更换
            // s -> kafka 数据信息
            //allMap -> redis 数据信息
            System.out.println("into  processElement ");
            JSONObject jsonObject = JSONObject.parseObject(line);
            String dt = jsonObject.getString("dt");
            String countryCode = jsonObject.getString("countryCode");
            //能够依据countryCode获得战区的名称

            // String area = allDataMap.get(countryCode);
            //TODO 从MapState中获得相匹配的Code
            String area = readOnlyContext.getBroadcastState(descriptor).get(countryCode);

            JSONArray data = jsonObject.getJSONArray("data");

            for (int i = 0; i < data.size(); i  ) {
                JSONObject dataObject = data.getJSONObject(i);
                System.out.println("战区:"   area);
                dataObject.put("dt", dt);
                dataObject.put("area", area);
                //中下游获得到数据信息的情况下,也就是一个json文件格式的数据信息
                collector.collect(dataObject.toJSONString());
            }


        }


        //广播节目流的解决方式
        @Override
        public void processBroadcastElement(HashMap<String, String> stringStringHashMap, Context context, Collector<String> collector) throws Exception {


            // 将接受到的控制参数放进 broadcast state 中
            //key , flink
            // 将 RedisMap中的值放进 MapState 中
            for (Map.Entry<String, String> entry : stringStringHashMap.entrySet()) {
                //TODO 应用 MapState 储存 redis 数据信息
                context.getBroadcastState(descriptor).put(entry.getKey(), entry.getValue());
                System.out.println(entry);
            }


        }
    }
}
 

 

4.2 关联型数据库查询广播节目自变量实例构思:

 

要求:

 

在 flink 流式的解决中经常必须载入数据库查询中的数据信息做为标准开展数据处理方法,有一些表做为系统软件表,实时查询高效率很低,此刻就必须将这种数据信息做为广播节目数据信息,而另外这种数据信息很有可能也必须按时的升级。

 

构思:

 

数据库表的广播节目自变量构思同redis等缓存文件广播节目数据信息的构思相近,也是应用 2个source 开展 connect 解决 , 在数据库表的 source 中按时刷新数据就可以了。

 

不同之处取决于这儿把数据库的实际操作转为另一个java工具,在复位时应用了静态代码块,在广播节目时应用了流的 connect 实际操作。

 

实例编码分成三个一部分:数据库表广播节目源,数据库操作类,实行通道类

 

  • 数据库表广播节目源

 

/**
 * @author 互联网大数据武林
 * @Date:2021-5-17
 * DB source 根源 开展广播节目
 */
public class BigDataDBBroadSource extends RichSourceFunction<Map<String,Object>> {

    private  final Logger logger = LoggerFactory.getLogger(BigDataDBBroadSource.class);
    private volatile boolean isRunning = true;

    public BigDataDBBroadSource() {

    }

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);

    }

    @Override
    public void run(SourceContext<Map<String,Object>> sourceContext) throws Exception {


        while (isRunning) {
            //TODO  应用的是一个 DB 根源的 source 60 s 更新一次 开展往中下游推送
            TimeUnit.SECONDS.sleep(60);

            Map<String,Object> map = new HashMap<String,Object>();

            //标准配对关键字

            final DbBroadCastListInitUtil.Build ruleListInitUtil = new DbBroadCastListInitUtil.Build();

            ruleListInitUtil.reloadRule();

            map.put("dbsource", ruleListInitUtil);


            if(map.size() > 0) {
                sourceContext.collect(map);
            }
        }
    }

    @Override
    public void cancel() {
        this.isRunning = false;
    }

    @Override
    public void close() throws Exception {
        super.close();

    }
}

 

  • 实行数据库操作类

 

/**
 * 数据库查询标准表复位
 *
 * @author 互联网大数据武林
 * @Date:2021-5-17
 *
 * US,AREA_US
 * TW,AREA_CT
 * HK,AREA_CT
 *
 */
public class DbBroadCastListInitUtil implements Serializable {

    private static final Logger LOG = LoggerFactory.getLogger(DbBroadCastListInitUtil.class);


    // 数据库查询标准信息内容

    public static Map<String, String> areasMap = new HashMap<String, String>();


    static {
        LOG.info("复位 db 控制模块");
        Connection dbConn = null;

        try {

            if (dbConn == null || dbConn.isClosed()) {
                LOG.info("init dbConn start....");
                LOG.info("init dbConn end....");
            }

            HashMap<String, String> map = Maps.newHashMap();

            map.put("US","AREA_US");
            map.put("TW","AREA_CT");
            map.put("HK","AREA_CT");

            areasMap = map;


        } catch (Exception e) {
            LOG.error("init database [status:error]", e);
            throw new RuntimeException(" static article rule list db select error!  , " e.getMessage()) ;

        } finally {
            if(dbConn != null) {
                try {
                    dbConn.close();
                } catch (SQLException e) {
                    LOG.error("dbConn conn close error!",e);
                }
            }


        }
    }




    public static class Build {

        // 数据库查询标准信息内容

        public static Map<String, String> newAreasMap = new HashMap<String, String>();

        public void reloadRule() throws Exception {
            LOG.info("再次复位 DB reloadRule 控制模块");
            Connection dbConn = null;
            try {
                if (dbConn == null || dbConn.isClosed()) {
                    LOG.info("init dbConn start....");
                    LOG.info("init dbConn end....");
                }

                HashMap<String, String> map = Maps.newHashMap();

                map.put("US","AREA_US");
                map.put("TW","AREA_CT");
                map.put("HK","AREA_CT");
                map.put("AM","AREA_CT");

                newAreasMap = map;

            } catch (Exception e) {
                LOG.error("init database [status:error]", e);
                throw e;
            } finally {
                if(dbConn != null) {
                    try {
                        dbConn.close();
                    } catch (SQLException e) {
                        LOG.error("dbConn conn close error!",e);
                    }
                }


            }
        }

        public static Map<String, String> getNewAreasMap() {
            return newAreasMap;
        }
    }



    public static Build build() throws Exception {
        final DbBroadCastListInitUtil.Build build = new DbBroadCastListInitUtil.Build();
        build.reloadRule();
        return build;
    }


}

 

 
  • 程序流程通道类

 

/**
 * @author 互联网大数据武林
 * @version 1.0
 * @date 2021/4/25.
 * <p>
 * 应用 kafka 輸出流和 redis 輸出流 开展合拼清理
 * <p>
 * 网上应用的方法
 */
public class 广播节目方法3应用DB对方法广播节目 {
    public static void main(String[] args) throws Exception {

        //1 获得实行自然环境

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.setParallelism(3);//并行度在于 kafka 中的系统分区数 维持与kafka 一致

        //2 设定 checkpoint

        //打开checkpoint 一分钟一次
        env.enableCheckpointing(60000);
        //设定checkpoint 仅一次词义
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        //2次checkpoint的间隔时间
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(5000);
        //最多个适用一个checkpoint另外实行
        env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
        //checkpoint请求超时的時间
        env.getCheckpointConfig().setCheckpointTimeout(60000);
        // 每日任务不成功后也保存 checkPonit数据信息
        env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(
                3, // 试着重新启动的频次
                Time.of(10, TimeUnit.SECONDS) // 间距
        ));

        // 设定 checkpoint 途径
        //env.setStateBackend(new FsStateBackend("hdfs://192.168.123.103:9000/flink/checkpoint"));

        //3 设定 kafka Flink 消費
        //建立 Kafka 消費信息内容

        String topic = "data_flink_bigdata_test";
        Properties consumerProperties = new Properties();
        consumerProperties.put("bootstrap.servers", "10.20.7.20:9092,10.20.7.51:9092,10.20.7.50:9092");
        consumerProperties.put("group.id", "data_flink_bigdata_test_consumer");
        consumerProperties.put("enable.auto.commit", "false");
        consumerProperties.put("auto.offset.reset", "earliest");


        //4 获得 kafka 与 redis 数据库

        FlinkKafkaConsumer consumer = new FlinkKafkaConsumer<String>(topic, new SimpleStringSchema(), consumerProperties);

        DataStreamSource<String> kafkaSourceData = env.addSource(consumer);

        // 获得 redis 数据库而且开展广播节目  网上的广播节目也是 source   广播节目方式

        MapStateDescriptor<String, String> descriptor = new MapStateDescriptor<String, String>(
                "RedisBdStream",
                String.class,
                String.class
        );

        //应用 数据库查询源 来开展广播节目

        BroadcastStream<Map<String, Object>> broadcast = env.addSource(new BigDataDBBroadSource()).broadcast(descriptor);


        //5 2个数据库开展 ETL 解决 应用 connect 联接解决  数据库表信息内容开展广播节目

        SingleOutputStreamOperator<String> etlData = kafkaSourceData.connect(broadcast).process(new MyETLProcessFunction());


        //6 新创建一个 kafka 经营者 开展推送
        String outputTopic = "allDataClean";


        // 輸出给中下游 kafka

      /*  Properties producerProperties = new Properties();
        producerProperties.put("bootstrap.servers","10.20.7.20:9092,10.20.7.51:9092,10.20.7.50:9092");

        FlinkKafkaProducer<String> producer = new FlinkKafkaProducer<>(outputTopic,
                new KeyedSerializationSchemaWrapper<String>(new SimpleStringSchema()),
                producerProperties);

        etlData.addSink(producer);*/


        etlData.print();


        //7 递交每日任务实行

        env.execute("DataClean");


    }

    /**
     * in 1 kafka source
     * in 2 redis source
     * <p>
     * out 合拼后的source
     *
     *
     * TODO 程序流程运行后产生的事:
     *
     * 1 运作 open 方式 ,开启静态方法给 areasMap 取值
     * 2 运作 processElement 方式前, areasMap 肯定是值的,一切正常开展解决
     * 3 当到 BigDataDBBroadSource 轮流培训的時间后 ,更新数据库表数据信息到 areasMap ,这时 areasMap 会添加新值,进行广播节目自变量的升级
     * 4 广播节目自变量升级后 再次开展 processElement 数据处理方法
     *
     */
    private static class MyETLProcessFunction extends BroadcastProcessFunction<String, Map<String, Object>, String> {

        public  Map<String, String> areasMap = new HashMap<String, String>();

        @Override
        public void open(Configuration parameters) throws Exception {
            super.open(parameters);

            //开启静态方法去取值
            areasMap = DbBroadCastListInitUtil.areasMap;


        }

        //逻辑性的解决方式 kafka 的数据信息
        @Override
        public void processElement(String line, ReadOnlyContext readOnlyContext, Collector<String> collector) throws Exception {
            //将 kafka 数据信息 按 redis 数据信息开展更换
            // s -> kafka 数据信息
            //allMap -> redis 数据信息
            System.out.println("into  processElement ");
            JSONObject jsonObject = JSONObject.parseObject(line);
            String dt = jsonObject.getString("dt");
            String countryCode = jsonObject.getString("countryCode");
            //能够依据countryCode获得战区的名称
            // String area = allDataMap.get(countryCode);
            //从MapState中获得相匹配的Code
            String area =areasMap.get(countryCode);

            JSONArray data = jsonObject.getJSONArray("data");
            for (int i = 0; i < data.size(); i  ) {
                JSONObject dataObject = data.getJSONObject(i);
                System.out.println("战区:"   area);
                dataObject.put("dt", dt);
                dataObject.put("area", area);
                //中下游获得到数据信息的情况下,也就是一个json文件格式的数据信息
                collector.collect(dataObject.toJSONString());
            }


        }

        @Override
        public void processBroadcastElement(Map<String, Object> value, Context ctx, Collector<String> out) throws Exception {

            //广播节目算法按时更新后 将数据信息发送至中下游
            if (value != null && value.size() > 0) {
                Object obj = value.getOrDefault("dbsource", null);
                if (obj != null) {

                    DbBroadCastListInitUtil.Build biulder = (DbBroadCastListInitUtil.Build) obj;
                    //升级了 数据库查询数据信息
                    areasMap = biulder.getNewAreasMap();
                    System.out.println("数据库查询更新算法运作进行!");

                }
            }

        }
    }

}

 

 

 

留意看最终处理函数运行后产生的事:

     

  1.  运作 open 方式 ,开启数据库操作java工具静态方法给 areasMap 取值

     

  2.  运作实行类 processElement 方式前,这时 areasMap 肯定是值的,一切正常开展解决

     

  3.  当到数据库查询源轮流培训的時间后 ,更新数据库表数据信息到 areasMap ,这时 areasMap 会添加新值,进行广播节目自变量的升级

     

  4.  广播节目自变量升级后 再次开展实行类 processElement 数据处理方法

 

 

 


 

到此 广播节目程序流程的应用详细介绍完后, 针对广播节目数据信息不用更改的状况 参照基本示例;针对从缓存文件或数据库查询等获得广播节目自变量,另外又必须更改的状况,参照转化成示例就可以。

 


PS:  原文中编码详细地址  ----   https://gitee.com/fanpengyi0922/flink-window-broadcast

 

 

   THE END  

 

评论(0条)

刀客源码 游客评论