package flink.stream.asyncIOSide;
import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
import io.vertx.core.Vertx;
import io.vertx.core.VertxOptions;
import io.vertx.redis.RedisClient;
import io.vertx.redis.RedisOptions;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.api.java.tuple.Tuple5;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.AsyncDataStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;
import java.util.Collections;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
import static stranger.PropertyLoader.getPropertiesConfig;
/**
* @author [tu.tengfei]
* @description 通过异步的方式,将流表与维表进行连接
* @date 2019/5/28
*/
public class AsyncIOSideJoinRedis {

    /** Properties file read both by the job driver and by each async function instance. */
    private static final String CONFIG_PATH = "config.properties";

    /**
     * Optional property choosing ordered (default, "true") or unordered emission of async results.
     * Absent property keeps the previous hard-wired ordered behaviour.
     */
    private static final String ORDERED_PROPERTY = "async.ordered";

    /** Async timeout used with ordered emission (previous hard-coded value). */
    private static final long ORDERED_TIMEOUT_MS = 1_000_000L;

    /** Async timeout used with unordered emission (previous hard-coded value). */
    private static final long UNORDERED_TIMEOUT_MS = 10_000L;

    /** Maximum number of in-flight async requests per parallel operator instance. */
    private static final int ASYNC_CAPACITY = 20;

    /**
     * Builds and runs a job that joins a Kafka stream with a Redis dimension table asynchronously.
     *
     * @param args unused
     * @throws Exception if the configuration cannot be read or the job fails
     */
    public static void main(String[] args) throws Exception {
        final Properties pro = getPropertiesConfig(CONFIG_PATH);
        final String topic = "stranger";
        final String groupId = "mainStranger";

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Kafka source
        Properties kafkaProps = new Properties();
        // Kafka broker IPs or host names; multiple entries are comma-separated
        kafkaProps.setProperty("bootstrap.servers", pro.getProperty("bootstrap.servers"));
        // group.id of this Flink consumer
        kafkaProps.setProperty("group.id", groupId);
        FlinkKafkaConsumer011<String> kafkaConsumer =
                new FlinkKafkaConsumer011<>(topic, new SimpleStringSchema(), kafkaProps);
        kafkaConsumer.setStartFromLatest();
        DataStreamSource<String> mainStream = env.addSource(kafkaConsumer);

        // Parse tab-separated records: time, person ID, behavior status, checkpoint ID.
        // An anonymous class (rather than a lambda) lets Flink extract the Tuple4 type
        // information despite generic type erasure.
        DataStream<Tuple4<String, String, String, String>> mainMap = mainStream.map(
                new MapFunction<String, Tuple4<String, String, String, String>>() {
                    @Override
                    public Tuple4<String, String, String, String> map(String s) throws Exception {
                        String[] split = s.split("\t");
                        return new Tuple4<>(split[0], split[1], split[2], split[3]);
                    }
                });

        // Asynchronous lookup of the behavior score in Redis
        boolean ordered = Boolean.parseBoolean(pro.getProperty(ORDERED_PROPERTY, "true"));
        DataStream<Tuple5<String, String, String, String, String>> asyncMainStream;
        if (ordered) {
            asyncMainStream = AsyncDataStream.orderedWait(
                    mainMap,
                    new AsyncBehavior(),
                    ORDERED_TIMEOUT_MS,
                    TimeUnit.MILLISECONDS,
                    ASYNC_CAPACITY);
        } else {
            asyncMainStream = AsyncDataStream.unorderedWait(
                    mainMap,
                    new AsyncBehavior(),
                    UNORDERED_TIMEOUT_MS,
                    TimeUnit.MILLISECONDS,
                    ASYNC_CAPACITY);
        }
        asyncMainStream.print();
        env.execute("Async select test");
    }

    /**
     * Async function that appends a behavior score, looked up in Redis by the behavior status
     * field ({@code f2}), to each input record. Successful non-null lookups are cached locally.
     */
    private static class AsyncBehavior
            extends RichAsyncFunction<Tuple4<String, String, String, String>,
                                      Tuple5<String, String, String, String, String>> {

        /** Vert.x runtime backing the Redis client; owned by this instance and closed in close(). */
        private transient Vertx vertx;
        private transient RedisClient redisClient;
        /** Local cache of behavior -> score. */
        private transient Cache<String, String> cache;

        /**
         * Creates the local cache, the Vert.x runtime and the Redis client.
         *
         * @param parameters Flink configuration for this operator
         * @throws Exception if the configuration cannot be read or redis.port is not an integer
         */
        @Override
        public void open(Configuration parameters) throws Exception {
            super.open(parameters);
            cache = Caffeine
                    .newBuilder()
                    .maximumSize(1025)
                    .expireAfterAccess(10, TimeUnit.MINUTES)
                    .build();

            Properties pro = getPropertiesConfig(CONFIG_PATH);
            String servers = pro.getProperty("redis.servers");
            int port = Integer.parseInt(pro.getProperty("redis.port"));
            RedisOptions redisOptions = new RedisOptions();
            redisOptions.setHost(servers);
            redisOptions.setPort(port);

            VertxOptions vo = new VertxOptions();
            vo.setEventLoopPoolSize(10);
            vo.setWorkerPoolSize(20);
            // Kept as a field so its thread pools can be released in close().
            vertx = Vertx.vertx(vo);
            redisClient = RedisClient.create(vertx, redisOptions);
        }

        /**
         * Releases the cache, the Redis client and the Vert.x runtime.
         *
         * @throws Exception if the parent close fails
         */
        @Override
        public void close() throws Exception {
            super.close();
            if (cache != null) {
                cache.invalidateAll();
                cache.cleanUp();
            }
            final Vertx vx = vertx;
            if (redisClient != null) {
                // The close callback runs on a Vert.x thread, so an exception thrown there
                // would never reach Flink. Closing is best-effort: whatever the outcome, the
                // Vert.x instance that owns the client's event loops is shut down afterwards.
                redisClient.close(ignored -> {
                    if (vx != null) {
                        vx.close();
                    }
                });
            } else if (vx != null) {
                vx.close();
            }
            redisClient = null;
            vertx = null;
        }

        /**
         * Looks up the score for {@code input.f2}, first in the local cache, then in Redis.
         *
         * @param input time, person ID, behavior status, checkpoint ID
         * @param out   receives time, person ID, behavior status, checkpoint ID, behavior score
         *              (score is null when Redis has no entry); completed exceptionally when the
         *              Redis call fails
         * @throws Exception never thrown directly; declared by the interface
         */
        @Override
        public void asyncInvoke(Tuple4<String, String, String, String> input,
                                ResultFuture<Tuple5<String, String, String, String, String>> out)
                throws Exception {
            final String behavior = input.f2;

            String cached = cache.getIfPresent(behavior);
            if (cached != null) {
                out.complete(Collections.singleton(withScore(input, cached)));
                return;
            }

            redisClient.get(behavior, res -> {
                if (res.failed()) {
                    // Report the failure to Flink instead of completing with a null collection.
                    out.completeExceptionally(res.cause());
                    return;
                }
                String score = res.result();
                if (score != null) {
                    // Only real hits are cached; misses are retried against Redis next time.
                    cache.put(behavior, score);
                }
                out.complete(Collections.singleton(withScore(input, score)));
            });
        }

        /** Appends {@code score} to the four input fields. */
        private static Tuple5<String, String, String, String, String> withScore(
                Tuple4<String, String, String, String> input, String score) {
            return new Tuple5<>(input.f0, input.f1, input.f2, input.f3, score);
        }
    }
}
package flink.stream.asyncIOSide;
impor