Flink DataStream转Table 使用SQL:join

不带窗口的 inner join(regular join)相当于一个全局窗口:两侧之前到达的消息都会一直保存在状态中。每来一条能关联上的新消息,就会把它与另一侧所有已保存的、join 键相同的消息逐一组合输出,即新消息与历史匹配消息的笛卡尔积。注意:这些状态会随时间不断增长。

  1. package SQL;
  2. import org.apache.flink.streaming.api.datastream.DataStreamSource;
  3. import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
  4. import org.apache.flink.streaming.api.functions.source.SourceFunction;
  5. import org.apache.flink.table.api.EnvironmentSettings;
  6. import org.apache.flink.table.api.Table;
  7. import org.apache.flink.table.api.java.StreamTableEnvironment;
  8. import org.apache.flink.types.Row;
  9. import org.slf4j.Logger;
  10. import org.slf4j.LoggerFactory;
  11. import java.util.Date;
  12. import java.util.Random;
  13. import java.util.concurrent.TimeUnit;
  14. /**
  15. * @Author you guess
  16. * @Date 2021/1/6 15:22
  17. * @Version 1.0
  18. * @Desc
  19. */
  20. public class DataStreamSql1_Join {
  21. private static final Logger LOG = LoggerFactory.getLogger(MinMinByMaxMaxBy.MinMinByMaxMaxByTest.class);
  22. private static final String[] TYPE = {"a苹果", "b梨", "c西瓜", "d葡萄", "e火龙果"};
  23. public static void main(String[] args) throws Exception {
  24. StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  25. EnvironmentSettings envSettings = EnvironmentSettings.newInstance().useOldPlanner().inStreamingMode().build();
  26. StreamTableEnvironment stEnv = StreamTableEnvironment.create(env, envSettings);
  27. //添加自定义数据源,每秒发出一笔订单信息{商品名称,商品数量}
  28. DataStreamSource<Order> orderSourceA = env.addSource(new SourceFunction<Order>() {
  29. private volatile boolean isRunning = true;
  30. private final Random random = new Random();
  31. @Override
  32. public void run(SourceContext<Order> ctx) throws Exception {
  33. while (isRunning) {
  34. TimeUnit.SECONDS.sleep(1);
  35. Order order = new Order(TYPE[random.nextInt(TYPE.length)], Long.valueOf(random.nextInt(10)));
  36. System.out.println(new Date() + ",orderSourceA提交元素:" + order);
  37. ctx.collect(order);
  38. }
  39. }
  40. @Override
  41. public void cancel() {
  42. isRunning = false;
  43. }
  44. }, "order-infoA");
  45. DataStreamSource<Order> orderSourceB = env.addSource(new SourceFunction<Order>() {
  46. private volatile boolean isRunning = true;
  47. private final Random random = new Random();
  48. @Override
  49. public void run(SourceContext<Order> ctx) throws Exception {
  50. while (isRunning) {
  51. TimeUnit.SECONDS.sleep(1);
  52. Order order = new Order(TYPE[random.nextInt(TYPE.length)], Long.valueOf(random.nextInt(10)));
  53. System.out.println(new Date() + ",orderSourceB提交元素:" + order);
  54. ctx.collect(order);
  55. }
  56. }
  57. @Override
  58. public void cancel() {
  59. isRunning = false;
  60. }
  61. }, "order-infoB");
  62. stEnv.registerDataStream("tableA", orderSourceA);
  63. stEnv.registerDataStream("tableB", orderSourceB);
  64. Table result = stEnv.sqlQuery("SELECT A.name,A.qtty,B.qtty from tableA A inner join tableB B on A.name = B.name");
  65. stEnv.toRetractStream(result, Row.class).print();//这里要用Row类型
  66. env.execute("Flink Streaming Java API Skeleton");
  67. }
  68. /**
  69. * Simple POJO.
  70. */
  71. public static class Order {
  72. public String name;
  73. public Long qtty;
  74. public Order() {
  75. }
  76. public Order(String name, Long qtty) {
  77. this.name = name;
  78. this.qtty = qtty;
  79. }
  80. @Override
  81. public String toString() {
  82. return "Order{" +
  83. "name=" + name + \ +
  84. ", qtty=" + qtty +
  85. };
  86. }
  87. }
  88. }

Wed Jan 06 15:28:04 CST 2021,orderSourceA提交元素:Order{name=’d葡萄’, qtty=9}

Wed Jan 06 15:28:04 CST 2021,orderSourceB提交元素:Order{name=’b梨’, qtty=8}

Wed Jan 06 15:28:05 CST 2021,orderSourceA提交元素:Order{name=’b梨’, qtty=4}

Wed Jan 06 15:28:05 CST 2021,orderSourceB提交元素:Order{name=’b梨’, qtty=6}

5> (true,b梨,4,8)

5> (true,b梨,4,6)

Wed Jan 06 15:28:06 CST 2021,orderSourceA提交元素:Order{name=’a苹果’, qtty=3}

Wed Jan 06 15:28:06 CST 2021,orderSourceB提交元素:Order{name=’e火龙果’, qtty=1}

Wed Jan 06 15:28:07 CST 2021,orderSourceA提交元素:Order{name=’e火龙果’, qtty=4}

Wed Jan 06 15:28:07 CST 2021,orderSourceB提交元素:Order{name=’a苹果’, qtty=6}

9> (true,a苹果,3,6)

4> (true,e火龙果,4,1)

Wed Jan 06 15:28:08 CST 2021,orderSourceB提交元素:Order{name=’e火龙果’, qtty=9}

Wed Jan 06 15:28:08 CST 2021,orderSourceA提交元素:Order{name=’e火龙果’, qtty=6}

4> (true,e火龙果,4,9)

4> (true,e火龙果,6,1)

4> (true,e火龙果,6,9)

Wed Jan 06 15:28:09 CST 2021,orderSourceA提交元素:Order{name=’e火龙果’, qtty=1}

Wed Jan 06 15:28:09 CST 2021,orderSourceB提交元素:Order{name=’e火龙果’, qtty=2}

4> (true,e火龙果,4,2)

4> (true,e火龙果,6,2)

4> (true,e火龙果,1,1)

4> (true,e火龙果,1,2)

4> (true,e火龙果,1,9)

Wed Jan 06 15:28:10 CST 2021,orderSourceA提交元素:Order{name=’c西瓜’, qtty=5}

Wed Jan 06 15:28:10 CST 2021,orderSourceB提交元素:Order{name=’e火龙果’, qtty=6}

4> (true,e火龙果,4,6)

4> (true,e火龙果,6,6)

4> (true,e火龙果,1,6)

Wed Jan 06 15:28:11 CST 2021,orderSourceA提交元素:Order{name=’c西瓜’, qtty=6}

Wed Jan 06 15:28:11 CST 2021,orderSourceB提交元素:Order{name=’c西瓜’, qtty=0}

8> (true,c西瓜,5,0)

8> (true,c西瓜,6,0)

Wed Jan 06 15:28:12 CST 2021,orderSourceA提交元素:Order{name=’b梨’, qtty=2}

Wed Jan 06 15:28:12 CST 2021,orderSourceB提交元素:Order{name=’d葡萄’, qtty=7} //此前 orderSourceB 中没有能与 orderSourceA 的 Order{name=’d葡萄’, qtty=9} 匹配的记录,所以一直没有输出,直到现在才匹配上!

3> (true,d葡萄,9,7)

5> (true,b梨,2,8)

5> (true,b梨,2,6)

Wed Jan 06 15:28:13 CST 2021,orderSourceB提交元素:Order{name=’d葡萄’, qtty=1}

Wed Jan 06 15:28:13 CST 2021,orderSourceA提交元素:Order{name=’d葡萄’, qtty=5}

3> (true,d葡萄,9,1)

3> (true,d葡萄,5,1)

3> (true,d葡萄,5,7)

Wed Jan 06 15:28:14 CST 2021,orderSourceB提交元素:Order{name=’d葡萄’, qtty=5}

Wed Jan 06 15:28:14 CST 2021,orderSourceA提交元素:Order{name=’e火龙果’, qtty=9}

3> (true,d葡萄,5,5)

3> (true,d葡萄,9,5)

4> (true,e火龙果,9,6)

4> (true,e火龙果,9,1)

4> (true,e火龙果,9,2)

4> (true,e火龙果,9,9)

Wed Jan 06 15:28:15 CST 2021,orderSourceB提交元素:Order{name=’e火龙果’, qtty=5} //每次新来一个元素,若能匹配上,都是输出笛卡尔积

Wed Jan 06 15:28:15 CST 2021,orderSourceA提交元素:Order{name=’e火龙果’, qtty=4} //每次新来一个元素,若能匹配上,都是输出笛卡尔积

4> (true,e火龙果,4,5)

4> (true,e火龙果,6,5)

4> (true,e火龙果,1,5)

4> (true,e火龙果,9,5)

4> (true,e火龙果,4,5)

4> (true,e火龙果,4,6)

4> (true,e火龙果,4,1)

4> (true,e火龙果,4,2)

4> (true,e火龙果,4,9)

不同 Flink 版本的 Table API 差别较大,本文使用的版本如下:

  1. <flink.version>1.9.2</flink.version>

jdk1.8


文章标签:

原文链接:https://juejin.cn/post/6979986118036946957

相关推荐

【PostgreSQL 15】PostgreSQL 15对UNIQUE和NULL的改进

基于SqlSugar的开发框架循序渐进介绍(12)-- 拆分页面模块内容为组件,实现分而治之的处理

Mysql 系列 | 日志模块

MySQL 啥时候用表锁,啥时候用行锁?

效率低?响应慢?报表工具痛点及其解决方案

shell脚本实现mysql数据库双机定时备份

【云原生】SQL(及存储过程)跑得太慢怎么办?

MySql搭建主从复制

开源数据计算引擎,实现媲美ElasticSearch的高性能并发查询

mysql语法使用详细代码版

MySQL - 多表查询与案例详解

【干货】MySQL底层架构设计,你了解多少?

009 面试题 SQL语句各部分的执行顺序

【PostgreSQL 】PostgreSQL 15对distinct的优化

高效理解 FreeSql WhereDynamicFilter,深入了解设计初衷[.NET ORM]

【MySQL系列】MySQL表的增删改查(基础)

MyBatis从入门到精通—源码剖析之Configuration、SqlSession、Executor、StatementHandler细节

前端+后端项目 - 论坛信息管理系统(Web+servlet+MySQL+JDBC)

在 SQL Server 中查找活动连接和死锁

Zabbix+Mysql Fpmmm(MPM)监控