阅读背景:

Flink stream join Redis — weixin_43315211 的博客(关键词:flink join redis)

来源:互联网 
package flink.stream.asyncIOSide;

import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
import io.vertx.core.Vertx;
import io.vertx.core.VertxOptions;
import io.vertx.redis.RedisClient;
import io.vertx.redis.RedisOptions;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.api.java.tuple.Tuple5;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.AsyncDataStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;

import java.util.Collections;
import java.util.Properties;
import java.util.concurrent.TimeUnit;

import static stranger.PropertyLoader.getPropertiesConfig;

/**
 * @author [tu.tengfei]
 * @description 通过异步的方式,将流表与维表进行连接
 * @date 2019/5/28
 */
/**
 * Flink job that joins a Kafka event stream (the "stream table") with a behavior-score
 * dimension table stored in Redis, using Flink async I/O.
 *
 * @author [tu.tengfei]
 * @date 2019/5/28
 */
public class AsyncIOSideJoinRedis {

    /** Async-operator timeout (ms) used when results must keep input order. */
    private static final long ORDERED_TIMEOUT_MS = 1000000L;
    /** Async-operator timeout (ms) used when results may be emitted out of order. */
    private static final long UNORDERED_TIMEOUT_MS = 10000L;
    /** Maximum number of in-flight async requests per parallel subtask. */
    private static final int ASYNC_CAPACITY = 20;

    public static void main(String[] args) throws Exception {

        // Read the job configuration file.
        final String configPath = "config.properties";
        final Properties pro = getPropertiesConfig(configPath);
        final String topic = "stranger";
        final String groupId = "mainStranger";

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Kafka source.
        Properties properties = new Properties();
        // Kafka broker IPs or host names; multiple entries are comma-separated.
        properties.setProperty("bootstrap.servers", requireProperty(pro, "bootstrap.servers"));
        // Consumer group id used by the Flink Kafka consumer.
        properties.setProperty("group.id", groupId);
        FlinkKafkaConsumer011<String> kafkaStream = new FlinkKafkaConsumer011<>(topic, new SimpleStringSchema(), properties);
        kafkaStream.setStartFromLatest();

        DataStreamSource<String> mainStream = env.addSource(kafkaStream);

        // Parse tab-separated Kafka records: time, person ID, behavior state, gate ID.
        DataStream<Tuple4<String, String, String, String>> mainMap = mainStream.map(new MapFunction<String, Tuple4<String, String, String, String>>() {
            @Override
            public Tuple4<String, String, String, String> map(String s) throws Exception {
                // Limit -1 keeps trailing empty fields, so an empty last column is not
                // mistaken for a missing one.
                String[] split = s.split("\t", -1);
                if (split.length < 4) {
                    throw new IllegalArgumentException(
                            "Expected 4 tab-separated fields but got " + split.length + ": " + s);
                }
                return new Tuple4<>(split[0], split[1], split[2], split[3]);
            }
        });

        // Enrich each record with its behavior score via async Redis lookups.
        // Optional property "async.ordered" (default "true") picks ordered vs unordered emission.
        boolean ordered = Boolean.parseBoolean(pro.getProperty("async.ordered", "true"));
        DataStream<Tuple5<String, String, String, String, String>> asyncMainStream;
        if (ordered) {
            asyncMainStream = AsyncDataStream.orderedWait(
                    mainMap,
                    new AsyncBehavior(),
                    ORDERED_TIMEOUT_MS,
                    TimeUnit.MILLISECONDS,
                    ASYNC_CAPACITY);
        } else {
            asyncMainStream = AsyncDataStream.unorderedWait(
                    mainMap,
                    new AsyncBehavior(),
                    UNORDERED_TIMEOUT_MS,
                    TimeUnit.MILLISECONDS,
                    ASYNC_CAPACITY);
        }

        asyncMainStream.print();
        env.execute("Async select test");
    }

    /**
     * Returns the value of a required configuration property.
     *
     * @param props loaded configuration
     * @param key   property name
     * @return the non-null property value
     * @throws IllegalStateException if the property is absent
     */
    private static String requireProperty(Properties props, String key) {
        String value = props.getProperty(key);
        if (value == null) {
            throw new IllegalStateException("Missing required property '" + key + "'");
        }
        return value;
    }

    /**
     * Async lookup of a record's behavior score in Redis, with a local Caffeine cache
     * in front of Redis.
     */
    private static class AsyncBehavior extends RichAsyncFunction<Tuple4<String, String, String, String>, Tuple5<String, String, String, String, String>> {
        private static final String CONFIG_PATH = "config.properties";
        private transient Vertx vertx;
        private transient RedisClient redisClient;
        private transient Cache<String, String> cache;

        /**
         * Creates the local cache, the Vert.x instance and the Redis client.
         *
         * @param parameters Flink operator configuration
         * @throws Exception if the configuration cannot be loaded or is incomplete
         */
        @Override
        public void open(Configuration parameters) throws Exception {
            super.open(parameters);
            // Local cache: at most 1025 entries, each expired 10 minutes after last access.
            cache = Caffeine
                    .newBuilder()
                    .maximumSize(1025)
                    .expireAfterAccess(10, TimeUnit.MINUTES)
                    .build();

            Properties pro = getPropertiesConfig(CONFIG_PATH);
            RedisOptions redisOptions = new RedisOptions();
            redisOptions.setHost(requireProperty(pro, "redis.servers"));
            redisOptions.setPort(Integer.parseInt(requireProperty(pro, "redis.port")));

            VertxOptions vo = new VertxOptions();
            vo.setEventLoopPoolSize(10);
            vo.setWorkerPoolSize(20);

            // Kept in a field so close() can shut down its event-loop and worker threads.
            vertx = Vertx.vertx(vo);
            redisClient = RedisClient.create(vertx, redisOptions);
        }

        /**
         * Releases the Redis client, the Vert.x instance it runs on, and the local cache.
         *
         * @throws Exception propagated from the superclass
         */
        @Override
        public void close() throws Exception {
            super.close();
            final Vertx owner = vertx;
            if (redisClient != null) {
                // Both closes are asynchronous. This method has already returned by the time the
                // callback runs, so a close failure cannot be reported to Flink. We only make
                // sure the Vert.x threads are released once the client is done.
                redisClient.close(ignored -> {
                    if (owner != null) {
                        owner.close();
                    }
                });
            } else if (owner != null) {
                owner.close();
            }
            redisClient = null;
            vertx = null;
            if (cache != null) {
                cache.invalidateAll();
            }
        }

        /**
         * Looks up the score for the record's behavior state (field f2), first in the
         * local cache and then in Redis.
         *
         * @param input time, person ID, behavior state, gate ID
         * @param out   receives time, person ID, behavior state, gate ID, behavior score
         *              (the score is null when Redis has no entry); receives nothing
         *              when the Redis call fails
         * @throws Exception never thrown directly; declared by the interface
         */
        @Override
        public void asyncInvoke(Tuple4<String, String, String, String> input, ResultFuture<Tuple5<String, String, String, String, String>> out) throws Exception {
            final String behavior = input.f2;
            String cached = cache.getIfPresent(behavior);
            if (cached != null) {
                out.complete(Collections.singleton(withScore(input, cached)));
                return;
            }
            redisClient.get(behavior, res -> {
                if (res.failed()) {
                    // Drop the record on lookup failure. An empty collection (not null) is the
                    // way to emit nothing; recent Flink versions reject a null result.
                    out.complete(Collections.emptyList());
                    return;
                }
                String score = res.result();
                if (score != null) {
                    // Only hits are cached, so a key added to Redis later is still picked up.
                    cache.put(behavior, score);
                }
                out.complete(Collections.singleton(withScore(input, score)));
            });
        }

        /** Appends {@code score} to the four input fields. */
        private static Tuple5<String, String, String, String, String> withScore(Tuple4<String, String, String, String> input, String score) {
            return new Tuple5<>(input.f0, input.f1, input.f2, input.f3, score);
        }
    }
}
package flink.stream.asyncIOSide;

impor



你的当前访问异常,请进行认证后继续阅读剩余内容。

分享到: