您现在的位置是: 首页 > Storm整合jdbc


Storm整合jdbc

纯真年代 2018-11-18 00:43:41 1 399人围观

Storm整合redis官方直达车

该文示例代码直达车


介绍

    Storm整合jdbc提供了两种实现,JdbcInsertBolt、JdbcLookupBolt。

    使用JdbcInsertBolt需要实现ConnectionProvider,这个是做与数据库连接池相关的接口,官方使用的是HikariCP数据源(springboot默认好像也是这个数据源),所以官方提供了一个HikariCPConnectionProvider实现,我们直接用就行了。

    JdbcLookupBolt中,你需要查询的输出字段。源码中:首先判断字段上游有没有发过来,有的话,就使用上游字段对应的值,没有的话,就使用数据库查询出来的值。

    JdbcInsertBolt中如果使用自定义sql,第二种:需要注意,我们上游发送过来的字段顺序,这个很重要

JdbcInsertBolt userPersistanceBolt = new JdbcInsertBolt(connectionProvider, simpleJdbcMapper)
.withTableName("user")
.withQueryTimeoutSecs(30);
Or
JdbcInsertBolt userPersistanceBolt = new JdbcInsertBolt(connectionProvider, simpleJdbcMapper)
.withInsertQuery("insert into user values (?,?)")
.withQueryTimeoutSecs(30);


看源码:columnLists是从Tuple中获取出来的,有顺序的。



第一种方式:在这个方法里面,它拼接sql的时候其实字段和值都是对应上的。



第二种方式是占位符方式,占位符的中的值要和你的sql对应上。所以要注意上游发过来的字段顺序。



示例

wordcount案例进行改造,将结果存储在jdbc中。(这里自定义实现了一个先查询,后插入或更新的bolt),详情见案例代码。

/**
* 1.数据库存在此word记录(整合的是官方的查询模块)
* 2.判断数据库是否有记录?
* 无记录:插入数据:整合的是官方提供的插入模块(这里必须使用自定义的sql,因为上游bolt输出的字段名和数据库不一样)
* 有记录:将数据库记录值 + 1 进行更新
*/

效果图:




全部评论