
Getting Started with Flink (6): Table SQL (baifanwudi's column)


Connecting to Kafka

import com.tc.flink.conf.KafkaConfig;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.table.descriptors.Json;
import org.apache.flink.table.descriptors.Kafka;
import org.apache.flink.table.descriptors.Schema;
import org.apache.flink.types.Row;

.....

        // Local execution environment plus its streaming table environment
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
        StreamTableEnvironment tableEnvironment = TableEnvironment.getTableEnvironment(env);
        // Kafka 0.11 source: read topic-test starting from the latest offsets
        Kafka kafkaConnect = new Kafka()
                .version("0.11")
                .topic("topic-test")
                .startFromLatest()
                .property("bootstrap.servers", KafkaConfig.KAFKA_BROKER_LIST)
                .property("group.id", "trafficwisdom-streaming");
        // Table schema: a processing-time attribute followed by four string fields
        Schema tableSchema = new Schema()
                .field("proctime", Types.SQL_TIMESTAMP).proctime()
                .field("interruptCode", Types.STRING)
                .field("interruptMsg", Types.STRING)
                .field("requestId", Types.STRING)
                .field("transferType", Types.STRING);
        // Register the JSON-encoded topic as the append-only table source search_log_error
        tableEnvironment.connect(kafkaConnect)
                .withFormat(new Json().failOnMissingField(true).deriveSchema())
                .withSchema(tableSchema)
                .inAppendMode()
                .registerTableSource("search_log_error");
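
Because the format is configured with failOnMissingField(true), it can help to check which fields a message on topic-test needs to carry. The sketch below is plain Java, not Flink code. It assumes that deriveSchema() takes the JSON fields from the table schema and leaves out proctime, since proctime is a processing-time attribute and is not read from the message. The class name SearchLogErrorRecordCheck and the sample values are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Plain-Java illustration only; this is not part of Flink.
public class SearchLogErrorRecordCheck {

    // Fields assumed to be read from each JSON message.
    // "proctime" is left out on purpose: it is a processing-time
    // attribute of the table, not a field of the Kafka message.
    static final List<String> REQUIRED_FIELDS = Arrays.asList(
            "interruptCode", "interruptMsg", "requestId", "transferType");

    // Returns the required fields that the message lacks, in schema order.
    static List<String> missingFields(Map<String, String> message) {
        List<String> missing = new ArrayList<>();
        for (String field : REQUIRED_FIELDS) {
            if (!message.containsKey(field)) {
                missing.add(field);
            }
        }
        return missing;
    }

    public static void main(String[] args) {
        // Sample values are made up for the illustration.
        Map<String, String> message = new LinkedHashMap<>();
        message.put("interruptCode", "E100");
        message.put("requestId", "r-1");

        // Lists the fields this message would still need.
        System.out.println("missing: " + missingFields(message));
    }
}
```

A message for which missingFields returns an empty list carries every field named in the table schema above.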


