Flink笔记-异步IO

Flink笔记-异步IO,第1张

1、异步IO

AsyncDataStream

        Async I/O提供了两种模式:ORDERED和UNORDERED.

                UNORDERED:异步请求一结束就马上输出结果,因为异步请求完成时间的不确定性,结果输出的顺序可能和输入不同.

                此模式调用:AsyncDataStream.unorderedWait(...)

                ORDERED:在这个模式下结果输出的顺序和输入的顺序是一样的,为了实现这一点,后输入的数据异步请求先完成了只能缓存在一个指定的结果中,直到在此之前的记录全部完成异步请求并输出后,才能输出,因此带来了一些额外的延迟和checkpoint开销.

                此模式吊用:AsyncDataStream.orderedWait(...)

public class AsyncIOStream {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(3);

        DataStreamSource source = env.addSource(new FlinkKafkaConsumer010("dwd", new SimpleStringSchema(), KafkaUtils.comsumerProps()));

        SingleOutputStreamOperator> vodStream = source.map(new MapFunction>() {
            @Override
            public Tuple2 map(String line) throws Exception {
                JSONObject jn = JSON.parseObject(line);
                return new Tuple2(jn.getString("vodid"), Integer.parseInt(jn.getString("seconds")));
            }
        });

        SingleOutputStreamOperator> result = vodStream
                .keyBy(0)
                .timeWindow(Time.seconds(5))
                .sum(1);

        String url = "jdbc:mysql://192.168.3.160:3306/datax_a?characterEncoding=utf8&useSSL=false&serverTimezone=UTC&rewriteBatchedStatements=true";
        String user = "root";
        String passwd = "root1234";
        String sql = "insert into flink_dws_table (vodid,seconds) values (?,?)";

        // 通过AsyncDataStream,将流异步应用到RichAsyncFunction上
        AsyncDataStream.unorderedWait(result, new MysqlAsyncSink(url, user, passwd, sql), 100, TimeUnit.SECONDS, 100);

        env.execute();
    }
}

class MysqlAsyncSink extends RichAsyncFunction, Void> {

    private final String url;
    private final String user;
    private final String passwd;
    private final String sql;
    private Connection conn;
    private PreparedStatement statement;

    public MysqlAsyncSink(String url, String user, String passwd, String sql) {
        this.url = url;
        this.user = user;
        this.passwd = passwd;
        this.sql = sql;
    }

    @Override
    public void open(Configuration parameters) throws Exception {
        conn = DriverManager.getConnection(url, user, passwd);
        statement = conn.prepareStatement(sql);
        super.open(parameters);
    }

    @Override
    public void asyncInvoke(Tuple2 value, ResultFuture resultFuture) throws Exception {
        System.out.println(value);
        statement.setString(1, value.f0);
        statement.setInt(2, value.f1);
        statement.execute();
    }

    @Override
    protected Object clone() throws CloneNotSupportedException {
        try {
            statement.close();
            conn.close();
        } catch (SQLException throwables) {
            throwables.printStackTrace();
        }

        return super.clone();
    }
}

欢迎分享,转载请注明来源:内存溢出

原文地址:https://54852.com/langs/905331.html

(0)
打赏 微信扫一扫微信扫一扫 支付宝扫一扫支付宝扫一扫
上一篇 2022-05-15
下一篇2022-05-15

发表评论

登录后才能评论

评论列表(0条)

    保存