您现在的位置是: 首页 > Storm整合redis


Storm整合redis

纯真年代 2018-11-17 23:51:23 0 330人围观

Storm整合redis官方直达车

该文示例代码直达车


介绍

    Storm提供和和redis集成的基本bolt的实现, 包括RedisLookupBolt(查询)、RedisStoreBolt(存储)和RedisFilterBolt(过滤)。它们都是继承了AbstractRedisBolt进行实现的,如果这三个不满足需求,可另外实现。

    使用RedisDataTypeDescription来说明要使用的数据类型。

    当然,上面的bolt还要与之对应的RedisLookupMapper、RedisStoreMapper、RedisFilterMapper相结合使用(mapper相关的其实就是storm与其他框架整合中间的一个映射)

    官方的示例代码很全面。

示例

wordcount案例进行改造,将结果存储在redis,key类型是hashKey。(这里只是进行结果存储,其实应该是先查询redis是否存在记录,有就+1,无则插入)

WordCountStoreMapper,见方法名就知道其意图

/**
* 存储数据到redis
*/
public static class WordCountStoreMapper implements RedisStoreMapper {
private RedisDataTypeDescription description;
private final String hashKey = "mystorm:wordCount";

public WordCountStoreMapper() {
description = new RedisDataTypeDescription(
RedisDataTypeDescription.RedisDataType.HASH, hashKey);
}

/**
* 说明我们要使用的数据类型
* @return
*/
@Override
public RedisDataTypeDescription getDataTypeDescription() {
return description;
}

@Override
public String getKeyFromTuple(ITuple tuple) {
return tuple.getStringByField("word");
}

@Override
public String getValueFromTuple(ITuple tuple) {
return tuple.getIntegerByField("count").toString();
}
}

RedisStoreBolt,使用此bolt将上游发过来数据存储到redis

//存储到redis的bolt
JedisPoolConfig poolConfig = new JedisPoolConfig.Builder()
.setHost("127.0.0.1").setPort(6379).build();
RedisStoreMapper storeMapper = new WordCountStoreMapper();
RedisStoreBolt storeBolt = new RedisStoreBolt(poolConfig, storeMapper);
builder.setBolt("RedisStoreBolt", storeBolt).shuffleGrouping("WordCountBolt");

效果图:




全部评论