连接kafka
import com.tc.flink.conf.KafkaConfig;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.table.descriptors.Json;
import org.apache.flink.table.descriptors.Kafka;
import org.apache.flink.table.descriptors.Schema;
import org.apache.flink.types.Row;
.....
final StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
StreamTableEnvironment tableEnvironment = TableEnvironment.getTableEnvironment(env);
Kafka kafkaConnect=new Kafka().version("0.11").topic("topic-test").startFromLatest().property("bootstrap.servers", KafkaConfig.KAFKA_BROKER_LIST).property("group.id", "trafficwisdom-streaming");
Schema tableSchema=new Schema().field("proctime", Types.SQL_TIMESTAMP).proctime()
.field("interruptCode", Types.STRING).field("interruptMsg", Types.STRING).field("requestId", Types.STRING).field("transferType", Types.STRING);
tableEnvironment.connect(kafkaConnect).withFormat(new Json().failOnMissingField(true).deriveSchema()).withSchema(tableSchema).inAppendMode().registerTableSource("search_log_error");
import com.tc.flink.conf.Kafka