This article takes a look at joins in the Flink Table API.
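As a rough mental model for the join variants covered in this article, the sketch below reproduces inner-join and left-outer-join semantics in plain Java, without any Flink dependency. The class `JoinSketch`, its methods, and the sample rows are hypothetical and exist only for illustration; they are not part of the Flink API, and Flink's actual execution strategy is different.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical illustration class, not part of Flink.
class JoinSketch {

    // Inner join on the first column: left rows without a match are dropped.
    static List<String> innerJoin(List<String[]> left, List<String[]> right) {
        List<String> out = new ArrayList<>();
        for (String[] l : left) {
            for (String[] r : right) {
                if (l[0].equals(r[0])) {
                    out.add(l[0] + "," + l[1] + "," + r[1]);
                }
            }
        }
        return out;
    }

    // Left outer join on the first column: left rows without a match are kept,
    // and the right-hand column is filled with null.
    static List<String> leftOuterJoin(List<String[]> left, List<String[]> right) {
        List<String> out = new ArrayList<>();
        for (String[] l : left) {
            boolean matched = false;
            for (String[] r : right) {
                if (l[0].equals(r[0])) {
                    out.add(l[0] + "," + l[1] + "," + r[1]);
                    matched = true;
                }
            }
            if (!matched) {
                out.add(l[0] + "," + l[1] + ",null");
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<String[]> left = Arrays.asList(
                new String[]{"1", "x"}, new String[]{"2", "y"});
        List<String[]> right = Arrays.asList(
                new String[]{"1", "p"}, new String[]{"3", "q"});

        System.out.println(innerJoin(left, right));     // prints [1,x,p]
        System.out.println(leftOuterJoin(left, right)); // prints [1,x,p, 2,y,null]
    }
}
```

The same "drop versus keep with null" distinction applies when a Table is joined with a table function that returns no rows for a given input.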
Examples

Inner Join

    Table left = tableEnv.fromDataSet(ds1, "a, b, c");
    Table right = tableEnv.fromDataSet(ds2, "d, e, f");
    Table result = left.join(right).where("a = d").select("a, b, e");

The join method performs an inner join.

Outer Join

    Table left = tableEnv.fromDataSet(ds1, "a, b, c");
    Table right = tableEnv.fromDataSet(ds2, "d, e, f");

    Table leftOuterResult = left.leftOuterJoin(right, "a = d").select("a, b, e");
    Table rightOuterResult = left.rightOuterJoin(right, "a = d").select("a, b, e");
    Table fullOuterResult = left.fullOuterJoin(right, "a = d").select("a, b, e");

Outer joins come in three forms: leftOuterJoin, rightOuterJoin, and fullOuterJoin.

Time-windowed Join

    Table left = tableEnv.fromDataSet(ds1, "a, b, c, ltime.rowtime");
    Table right = tableEnv.fromDataSet(ds2, "d, e, f, rtime.rowtime");

    Table result = left.join(right)
        .where("a = d && ltime >= rtime - 5.minutes && ltime < rtime + 10.minutes")
        .select("a, b, e, ltime");

A time-windowed join requires at least one equality condition, plus a condition that bounds the time attributes of both sides (using <, <=, >=, or >).

Inner Join with Table Function

    // register User-Defined Table Function
    TableFunction<String> split = new MySplitUDTF();
    tableEnv.registerFunction("split", split);

    // join
    Table orders = tableEnv.scan("Orders");
    Table result = orders
        .join(new Table(tableEnv, "split(c)").as("s", "t", "v"))
        .select("a, b, s, t, v");

A Table can also be inner-joined with a table function. If the table function returns an empty result for a row, that row is dropped.

Left Outer Join with Table Function

    // register User-Defined Table Function
    TableFunction<String> split = new MySplitUDTF();
    tableEnv.registerFunction("split", split);

    // join
    Table orders = tableEnv.scan("Orders");
    Table result = orders
        .leftOuterJoin(new Table(tableEnv, "split(c)").as("s", "t", "v"))
        .select("a, b, s, t, v");

A Table can also be left-outer-joined with a table function. If the table function returns an empty result for a row, that row is kept and the table function's columns are filled with null.

Join with Temporal Table

    Table ratesHistory = tableEnv.scan("RatesHistory");

    // register temporal table function with a time attribute and primary key
    TemporalTableFunction rates = ratesHistory.createTemporalTableFunction(
        "r_proctime",
        "r_currency");
    tableEnv.registerFunction("rates", rates);

    // join with "Orders" based on the time attribute and key
    Table orders = tableEnv.scan("Orders");
    Table result = orders
        .join(new Table(tableEnv, "rates(o_proctime)"), "o_currency = r_currency");

A Table can also be joined with a temporal table. The temporal table function is obtained by calling createTemporalTableFunction on a Table. Currently only inner joins are supported.

Table Examples Inner Join Table le