
《云原生内存数据库Tair训练营》正在火热开营,感兴趣的同学快快报名参与吧!
Tair 是阿里云自研的云原生内存数据库,完全兼容 Redis,且支持多种自研数据结构,可以帮助业务快速完成需求和创新。本篇文章包括 20 个 Tair 企业场景的示例代码,涵盖 Java、Go、Python、.Net 多个编程语言版本(已开源,完整代码见文末),干货满满,快快收藏用起来吧:
分布式锁 商品秒杀 限流与计数器 多维排行榜 分布式架构排行榜 论坛全文索引 多列索引联合查询 JSON数据管理 游戏背包数据存储 目标人群圈选 大规模计数BitMap CPU曲线秒级监控 电动汽车轨迹监测 同城 LBS 应用 密码会员有效期管理 应用设备登录态维护 BloomFilter高效去重 精准爬虫系统检测 防控信用卡欺诈交易 向量相似性搜索
01.分布式锁
/*** locks atomically via set with NX flag* @param lockKey the key* @param requestId prevents the lock from being deleted by mistake* @param expireTime prevent the deadlock of business machine downtime* @return success: true, fail: false.*/public static boolean tryGetDistributedLock(String lockKey, String requestId, int expireTime) {try (Jedis jedis = jedisPool.getResource()) {String result = jedis.set(lockKey, requestId, SetParams.setParams().nx().ex(expireTime));if ("OK".equals(result)) {return true;}} catch (Exception e) {logger.error(e);return false;}}/*** atomically releases the lock via the CAD command* @param lockKey the key* @param requestId ensures that the released lock is added by itself* @return success: true, fail: false.*/public static boolean releaseDistributedLock(String lockKey, String requestId) {try (Jedis jedis = jedisPool.getResource()) {TairString tairString = new TairString(jedis);Long ret = tairString.cad(lockKey, requestId);if (1 == ret) {return true;}} catch (Exception e) {logger.error(e);return false;}}
// TryLock locks atomically via setnx// requestId prevents the lock from being deleted by mistake// expireTime is to prevent the deadlock of business machine downtimefunc TryLock(key, requestId string, expireTime time.Duration) bool {res, err := tairClient.SetNX(ctx, key, requestId, expireTime).Result()if err != nil {fmt.Println(err.Error())return false}return res}// ReleaseLock atomically releases the lock via the CAD command// requestId ensures that the released lock is added by itselffunc ReleaseLock(key, requestId string) bool {res, err := tairClient.Cad(ctx, key, requestId).Result()if err != nil {fmt.Println(err.Error())return false}return res == int64(1)}
# try_lock locks atomically via set with NX flag# request_id prevents the lock from being deleted by mistake# expire_time is to prevent the deadlock of business machine downtimedef try_lock(key: str, request_id: str, expire_time: int) -> bool:try:result = tair.set(key, request_id, ex=expire_time, nx=True)# if the command was successful, return True# else return Nonereturn result is not Noneexcept TairError as e:print(e)return False# release_lock atomically releases the lock via the CAD command# request_id ensures that the released lock is added by itselfdef release_lock(key: str, request_id: str) -> bool:try:result = tair.cad(key, request_id)# if the key doesn't exist, return -1# if the request_id doesn't match, return 0# else return 1return result == 1except TairError as e:print(e)return False
private static readonly ConnectionMultiplexer connDC = ConnectionMultiplexer.Connect("localhost:6379");private static readonly TairString tair = new(connDC, 0);private static readonly IDatabase db = connDC.GetDatabase(0);/// <summary>/// locks atomically via set with NX flag./// </summary>/// <param name="lockkey"></param>/// <param name="requestId"></param>/// <param name="expireTime"></param>/// <returns></returns>public static bool tryGetDistributedLock(string lockkey, string requestId, int expireTime){try{var ret = db.StringSet(lockkey, requestId, TimeSpan.FromSeconds(expireTime), When.NotExists,CommandFlags.None);if (ret){return ret;}}catch (Exception e){logger.error(e);}return false;}/// <summary>/// atomically releases the lock via the CAD command./// </summary>/// <param name="lockkey"></param>/// <param name="requestId"></param>/// <returns></returns>public static bool releaseDistributedLock(string lockkey, string requestId){try{var ret = tair.cad(lockkey, requestId);return ret == 1;}catch (Exception e){logger.error(e);return false;}}
02.商品秒杀
商品秒杀场景中,对于库存的扣减需要防止超卖情况的发生,使用TairString的 lowerBound 则可以很简单的完成且是原子的。
/*** bargainRush decrements the value of key from upperBound by 1 until lowerBound* @param key the key* @param upperBound the max value* @param lowerBound the min value* @return acquire success: true; fail: false*/public static boolean bargainRush(String key, int upperBound, int lowerBound) {try (Jedis jedis = jedisPool.getResource()) {TairString tairString = new TairString(jedis);tairString.exincrBy(key, -1, ExincrbyParams.ExincrbyParams().def(upperBound).min(lowerBound));return true;} catch (Exception e) {logger.error(e);return false;}}
# bargainRush decrements the value of key from upperBound by 1 until lowerBound# @param key the key# @param upperBound the max value# @param lowerBound the min value# @return acquire success: true; fail: falsedef bargain_rush(key: str, upper_bound: int, lower_bound: int) -> bool:try:tair = get_tair()tair.exincrby(key, -1, minval=lower_bound, define=upper_bound)return Trueexcept ResponseError as e:print(e)return False
private static readonly ConnectionMultiplexer connDC = ConnectionMultiplexer.Connect("localhost:6379");private static readonly TairString tairString = new(connDC, 0);/// <summary>/// bargainRush decrements the value of key from upperBound by 1 until lowerBound./// </summary>/// <param name="key"></param>/// <param name="upperBound"></param>/// <param name="lowerBound"></param>/// <returns></returns>public static Boolean bargainRush(string key, int upperBound, int lowerBound){var param = new ExincrbyParams();param.min(lowerBound);param.max(upperBound);try{tairString.exincrBy(key, -1, param);return true;}catch (Exception e){Console.WriteLine(e.Message);return false;}}
func (l *BargainRush) bargainRush(key string, upperBound int64, lowerBound int64) bool {_, err := tairClient.ExIncrByArgs(ctx, key, -1, tair.ExIncrByArgs{}.New().Def(upperBound).Min(lowerBound)).Result()if err != nil {process errpanic(err)return false}return true}
03.限流与计数器
限流器是在单位时间内指定阈值并逐步接近的控制过程,一旦超过就返回失败,Tair 的限流器可以用在限速,速率控制的场景中。
/*** tryAcquire is thread-safe and will increment the key from 0 to the upper bound within an interval of time,* and return failure once it exceeds* @param key the key* @param upperBound the max value* @param interval the time interval* @return acquire success: true; fail: false*/public static boolean tryAcquire(String key, int upperBound, int interval) {try (Jedis jedis = jedisPool.getResource()) {jedis.eval("if redis.call('exists', KEYS[1]) == 1 "+ "then return redis.call('EXINCRBY', KEYS[1], '1', 'MAX', ARGV[1], 'KEEPTTL') "+ "else return redis.call('EXSET', KEYS[1], 0, 'EX', ARGV[2]) end",Arrays.asList(key), Arrays.asList(String.valueOf(upperBound), String.valueOf(interval)));return true;} catch (Exception e) {logger.error(e);return false;}}
def init() -> bool:try:tair = get_tair()tair.exset(KEY, "100")return Trueexcept Exception as e:print(e)return Falsedef purchase(user_id) -> bool:try:tair = get_tair()ret = tair.exincrby(KEY, num=-1, minval=0)print(f"the user {user_id} has purchased an item, and currently there are {ret} left")return Trueexcept ResponseError as e:print(f"the user {user_id} purchased failed: {e}")return False
private static readonly ConnectionMultiplexer connDC = ConnectionMultiplexer.Connect("localhost:6379");/// <summary>/// tryAcquire is thread-safe and will increment the key from 0 to the upper bound within an interval of time./// </summary>/// <param name="key"></param>/// <param name="upperBound"></param>/// <param name="initerval"></param>/// <returns></returns>public static bool tryAcquire(string key, int upperBound, int initerval){try{var Script = "if redis.call('exists', @KEYS) == 1 "+ "then return redis.call('EXINCRBY', @KEYS, '1', 'MAX', @ARGV1, 'KEEPTTL') "+ "else return redis.call('EXSET', @KEYS, 0, 'EX', @ARGV2) end";var db = connDC.GetDatabase(0);var prepard = LuaScript.Prepare(Script);var ret = db.ScriptEvaluate(prepard, new {KEYS = (RedisKey) key, ARGV1 = upperBound, ARGV2 = initerval});return true;}catch (Exception e){logger.error(e);return false;}}
func (l *BoundedCounter) tryAcquire(key string, upperBoud int64, interval int64) bool {a := make([]string, 0)strings := append(a, key)_, err := tairClient.Eval(ctx, "if redis.call('exists', KEYS[1]) == 1 then return redis.call('EXINCRBY', KEYS[1], '1', 'MAX', ARGV[1], 'KEEPTTL')"+" else return redis.call('EXSET', KEYS[1], 0, 'EX', ARGV[2]) end", strings, upperBoud, interval).Result()if err != nil {process errpanic(err)return true}return false}
04.多维排行榜
多维排行榜在实际业务场景中非常实用,例如奥运奖牌榜,当金牌数目相同时,按照银牌数目排序,否则按照铜牌排序。Tair支持最多 256 维度 Double 数据的排序。
/*** Add User with Multi scores.* @param key the key* @param member the member* @param scores the multi dimensional score* @return success: true; fail: false*/public static boolean addUser(final String key, final String member, final double... scores) {try {tairZset.exzadd(key, member, scores);return true;} catch (Exception e) {logger.error(e);return false;}}/*** Get the top element of the leaderboard.* @param key the key* @param startOffset start offset* @param endOffset end offset* @return the top elements.*/public static List<String> top(final String key, final long startOffset, final long endOffset) {try {return tairZset.exzrevrange(key, startOffset, endOffset);} catch (Exception e) {logger.error(e);return new ArrayList<>();}}
# Add User with Multi scores.# @param key the key# @param member the member# @param scores the multi dimensional score# @return success: true; fail: falsedef add_user(key: str, member: str, score: Union[float, str]) -> bool:try:tair = get_tair()tair.exzadd(key, {member: score})return Trueexcept:return False# Get the top element of the leaderboard.# @param key the key# @param startOffset start offset# @param endOffset end offset# @return the top elements.def top(key: str, start_offset: int, end_offset: int) -> list:try:tair = get_tair()return tair.exzrevrange(key, start_offset, end_offset, withscores=True)except ResponseError as e:print(e)return []
private static readonly ConnectionMultiplexer connDC = ConnectionMultiplexer.Connect("localhost:6379");private static readonly TairZset tairzset = new(connDC, 0);/// <summary>/// Add User with Multi scores./// </summary>/// <param name="key"></param>/// <param name="member"></param>/// <param name="scores"></param>/// <returns></returns>public static bool addUser(string key, string member, params double[] scores){try{tairzset.exzadd(key, member, scores);return true;}catch (Exception e){logger.error(e);return false;}}/// <summary>/// Get the top element of the leaderboard./// </summary>/// <param name="key"></param>/// <param name="startOffset"></param>/// <param name="endOffset"></param>/// <returns></returns>public static List<string> top(string key, long startOffset, long endOffset){try{return tairzset.exzrevrange(key, startOffset, endOffset);}catch (Exception e){logger.error(e);return new List<string>();}}
func (l *LeaderBoard) addUser(key, member string, scores ...string) bool {_, err := tairClient.ExZAddManyScore(ctx, key, member, scores...).Result()if err != nil {process errpanic(err)return true}return false}func (l *LeaderBoard) top(key string, startOffSet int, endOffset int) []string {result, err := tairClient.ExZRevRange(ctx, key, startOffSet, endOffset).Result()if err != nil {process errpanic(err)}return result}
05.分布式架构排行榜
分布式架构排行榜解决单机排行榜容量或性能不足的问题,通过在客户端实现对底层排行榜的冗余,从而将数据散在不同的节点上获得更大的存储容量和更强的算力。(暂只支持Java版,其余语言暂时需要用户自己封装)
static {JedisPool config: https://help.aliyun.com/document_detail/98726.htmlconfig.setMaxTotal(32);config.setMaxIdle(32);config.setMaxIdle(20);jedisPool = new JedisPool(config, HOST, PORT, DEFAULT_CONNECTION_TIMEOUT,DEFAULT_SO_TIMEOUT, PASSWORD, 0, null);dlb = new DistributedLeaderBoard("distributed_leaderboard", jedisPool,shardKeySize, pageSize, reverse, useZeroIndexForRank);}public static void main(String[] args) {JedisPool jedisPool = new JedisPool();Create distribute leaderboardGold Silver Bronzedlb.addMember("A", 32, 21, 16);dlb.addMember("D", 14, 4, 16);dlb.addMember("C", 20, 7, 12);dlb.addMember("B", 25, 29, 21);dlb.addMember("E", 13, 21, 18);dlb.addMember("F", 13, 17, 14);Get A rankdlb.rankFor("A"); 1System.out.println(dlb.rankFor("A"));Get top3dlb.top(3);System.out.println(dlb.top(3));[{"member":"A","score":"32#21#16","rank":1},{"member":"B","score":"25#29#21","rank":2},{"member":"C","score":"20#7#12","rank":3}]}
06.论坛全文索引
/*** create index, The field of index is parsed according to the field corresponding to the text* @param index the index* @param schema the index schema* @return success: true, fail: false.*/public static boolean createIndex(final String index, final String schema) {try {tairSearch.tftcreateindex(index, schema);return true;} catch (Exception e) {logger.error(e);return false;}}/*** Add doc to index, doc is JSON format.* @param index the index* @param doc the doc content* @return unique doc id*/public static String addDoc(final String index, final String doc) {try {return tairSearch.tftadddoc(index, doc);} catch (Exception e) {// logger.error(e)return null;}}
# create index, The field of index is parsed according to the field corresponding to the text# @param index the index# @param schema the index schema# @return success: true, fail: false.def create_index(index: str, schema: str) -> bool:try:tair = get_tair()return tair.tft_createindex(index, schema)except ResponseError as e:print(e)return False# Add doc to index, doc is JSON format.# @param index the index# @param doc the doc content# @return unique doc iddef add_doc(index: str, doc: str):try:tair = get_tair()return tair.tft_adddoc(index, doc)except ResponseError as e:print(e)return None
private static readonly ConnectionMultiplexer connDC = ConnectionMultiplexer.Connect("localhost:6379");private static readonly TairSearch tairSearch = new(connDC, 0);/// <summary>/// create index, The field of index is parsed according to the field corresponding to the text./// </summary>/// <param name="index"></param>/// <param name="schema"></param>/// <returns></returns>public static bool createIndex(string index, string schema){try{tairSearch.tftcreateindex(index, schema);return true;}catch (Exception e){// logger.error(e);return false;}}/// <summary>/// Add doc to index, doc is JSON format./// </summary>/// <param name="index"></param>/// <param name="doc"></param>/// <returns></returns>public static string addDoc(string index, string doc){try{return tairSearch.tftadddoc(index, doc);}catch (Exception e){// logger.error(e);return null;}}/// <summary>/// search index by request./// </summary>/// <param name="index"></param>/// <param name="request"></param>/// <returns></returns>public static string searchIndex(string index, string request){try{return tairSearch.tftsearch(index, request);}catch (Exception e){// logger.error(e);return null;}}
func (l *FullTextSearch) createIndex(index, schema string) bool {_, err := tairClient.TftCreateIndex(ctx, index, schema).Result()if err != nil {// process err//panic(err)return true}return false}func (l *FullTextSearch) addDoc(index, doc string) string {result, err := tairClient.TftAddDoc(ctx, index, doc).Result()if err != nil {// process err//panic(err)return ""}return result}
/*** search index by request* @param index the index* @param request the request* @return*/public static String searchIndex(final String index, final String request) {try {return tairSearch.tftsearch(index, request);} catch (Exception e) {// logger.error(e);return null;}}
# search index by request# @param index the index# @param request the request# @returndef search_index(index: str, request: str):try:tair = get_tair()return tair.tft_search(index, request)except ResponseError as e:print(e)return None
private static readonly ConnectionMultiplexer connDC = ConnectionMultiplexer.Connect("localhost:6379");private static readonly TairSearch tairSearch = new(connDC, 0);/// <summary>/// search index by request./// </summary>/// <param name="index"></param>/// <param name="request"></param>/// <returns></returns>public static string searchIndex(string index, string request){try{return tairSearch.tftsearch(index, request);}catch (Exception e){// logger.error(e);return null;}}
tairClient.TftCreateIndex(ctx, key, "{\"mappings\":{\"properties\":{\"departure\":{\"type\":\"keyword\"},"+"\"destination\":{\"type\":\"keyword\"},\"date\":{\"type\":\"keyword\"},\"seat\":{\"type\":\"keyword\"},"+"\"with\":{\"type\":\"keyword\"},\"flight_id\":{\"type\":\"keyword\"},\"price\":{\"type\":\"double\"},"+"\"departure_time\":{\"type\":\"long\"},\"destination_time\":{\"type\":\"long\"}}}}")tairClient.TftAddDoc(ctx, key, "{\"departure\":\"zhuhai\",\"destination\":\"hangzhou\",\"date\":\"2022-09-01\","+"\"seat\":\"first\",\"with\":\"baby\",\"flight_id\":\"CZ1000\",\"price\":986.1,"+"\"departure_time\":1661991010,\"destination_time\":1661998210}")request := "{\"sort\":[\"departure_time\"],\"query\":{\"bool\":{\"must\":[{\"term\":{\"date\":\"2022-09" +"-01\"}},{\"term\":{\"seat\":\"first\"}}]}}}"fmt.Println(tairClient.TftSearch(ctx, key, request))
/*** Save JSON in key at path.* @param key the key* @param path the path, can be JSONPath or JSONPointer* @param json the json content* @return success: true, fail: false*/public static boolean jsonSave(final String key, final String path, final String json) {try {String result = tairDoc.jsonset(key, path, json);if ("OK".equals(result)) {return true;}} catch (Exception e) {// logger.error(e);}return false;}/*** Get JSON elements from path* @param key the key* @param path the path, can be JSONPath or JSONPointer* @return the JSON elements*/public static String jsonGet(final String key, final String path) {try {return tairDoc.jsonget(key, path);} catch (Exception e) {// logger.error(e)return null;}}
if __name__ == "__main__":tair = get_tair()key = "JSONDocument"json = """{"name": "tom","age": 22,"description": "A man with a blue lightsaber","friends": []}"""tair.json_set(key, ".", json)description = tair.json_get(key, ".description").decode()print(description)
private static readonly ConnectionMultiplexer connDC = ConnectionMultiplexer.Connect("localhost:6379");private static readonly TairDoc tairDoc = new(connDC, 0);/// <summary>/// Save JSON in key at path./// </summary>/// <param name="key"></param>/// <param name="path"></param>/// <param name="json"></param>/// <returns></returns>public static bool jsonSave(string key, string path, string json){try{var result = tairDoc.jsonset(key, path, json);if (result.Equals("OK")){return true;}}catch (Exception e){// logger.error(e);}return false;}/// <summary>/// Get JSON elements from path/// </summary>/// <param name="key"></param>/// <param name="path"></param>/// <returns></returns>public static string jsonGet(string key, string path){try{return tairDoc.jsonget(key, path);}catch (Exception e){// logger.error(e);return null;}}
func (l *JSONDocument) jsonSave(key, path, json string) bool {result, err := tairClient.JsonSet(ctx, key, path, json).Result()if err != nil {// process err//panic(err)}if result == "OK" {return true}return false}func (l *JSONDocument) jsonGet(key, path string) string {result, err := tairClient.JsonGetPath(ctx, key, path).Result()if err != nil {// process err//panic(err)}return result}
/*** Add equipment to package* @param key the key* @param packagePath the package path* @param equipment the new equipment* @return total number of equipment*/public static Long addEquipment(final String key, final String packagePath, final String equipment) {try {return tairDoc.jsonarrAppend(key, packagePath, equipment);} catch (Exception e) {// logger.error(e);return null;}}
# Add equipment to package# @param key the key# @param packagePath the package path# @param equipment the new equipment# @return total number of equipmentdef add_equipment(key: str, package_path: str, equipment: str) -> int:try:tair = get_tair()return tair.json_arrappend(key, package_path, equipment)except ResponseError as e:print(e)return -1if __name__ == "__main__":key = "GamePackage"tair = get_tair()tair.json_set(key, ".", "[]")print(add_equipment(key, ".", ['"lightsaber"']))print(add_equipment(key, ".", ['"howitzer"']))print(add_equipment(key, ".", ['"gun"']))print(tair.json_get(key, ".").decode())
private static readonly ConnectionMultiplexer connDC = ConnectionMultiplexer.Connect("localhost:6379");private static readonly TairDoc tairDoc = new(connDC, 0);/// <summary>/// dd equipment to package./// </summary>/// <param name="key"></param>/// <param name="packetPath"></param>/// <param name="equipment"></param>/// <returns></returns>public static long addEquipment(string key, string packetPath, string equipment){try{return tairDoc.jsonarrAppend(key, packetPath, equipment);}catch (Exception e){// logger.error(e);return -1;}}
func (l *GamePackage) addEquipment(key, packagePath, equipment string) int64 {result, err := tairClient.JsonArrAppendWithPath(ctx, key, packagePath, equipment).Result()if err != nil {// process err//panic(err)}return result}
10.目标人群圈选
/*** Set key offset value, value can be 0 or 1.* @param key the key* @param offset the offset* @param value the new value* @return success: true, fail: false*/public static boolean setBit(final String key, final long offset, final long value) {try {tairRoaring.trsetbit(key, offset, value);return true;} catch (Exception e) {// logger.error(e)return false;}}/*** Get key offset value.* @param key the key* @param offset the offset* @return the offset value, if not exists, return 0*/public static long getBit(final String key, final long offset) {try {return tairRoaring.trgetbit(key, offset);} catch (Exception e) {// logger.error(e);return -1;}}/*** AND the two bitmaps and store the result in a new destkey.* @param destkey the dest key* @param keys the source key* @return success: true, fail: false*/public static boolean bitAnd(final String destkey, final String... keys) {try {tairRoaring.trbitop(destkey, "AND", keys);return true;} catch (Exception e) {// logger.error(e);return false;}}
# Get key offset value.# @param key the key# @param offset the offset# @return the offset value, if not exists, return 0def getbit(key: str, offset: int) -> int:try:tair = get_tair()return tair.tr_getbit(key, offset)except:return -1# AND the two bitmaps and store the result in a new destkey.# @param destkey the dest key# @param keys the source key# @return success: true, fail: falsedef bitand(destkey: str, keys: List[str]) -> bool:try:tair = get_tair()tair.tr_bitop(destkey, "AND", keys)return Trueexcept ResponseError as e:print(e)return False
private static readonly ConnectionMultiplexer connDC = ConnectionMultiplexer.Connect("localhost:6379");private static readonly TairRoaring tairRoaring = new(connDC, 0);/// <summary>/// Get key offset value./// </summary>/// <param name="key"></param>/// <param name="offset"></param>/// <returns>he offset value, if not exists, return 0</returns>public static long getBit(string key, long offset){try{return tairRoaring.trgetbit(key, offset);}catch (Exception e){// logger.error(e);return -1;}}/// <summary>/// AND the two bitmaps and store the result in a new destkey./// </summary>/// <param name="destkey"></param>/// <param name="keys"></param>/// <returns></returns>public static bool bitAnd(string destkey, params string[] keys){try{tairRoaring.trbitop(destkey, "AND", keys);return true;}catch (Exception e){// logger.error(e);return false;}}
11.大规模计数BitMap
/*** Set key offset value, value can be 0 or 1.* @param key the key* @param offset the offset* @param value the new value* @return success: true, fail: false*/public static boolean setBit(final String key, final long offset, final long value) {try {tairRoaring.trsetbit(key, offset, value);return true;} catch (Exception e) {// logger.error(e)return false;}}/*** Count the number of elements in the bitmap.* @param key the key* @return the number of elements*/public static long bitCount(final String key) {try {return tairRoaring.trbitcount(key);} catch (Exception e) {// logger.error(e);return -1;}}
if __name__ == "__main__":tair = get_tair()key = "BitCount"tair.tr_setbit(key, 0, 1)tair.tr_setbit(key, 1, 1)tair.tr_setbit(key, 2, 1)print(tair.tr_bitcount(key))
private static readonly ConnectionMultiplexer connDC = ConnectionMultiplexer.Connect("localhost:6379");private static readonly TairRoaring tairRoaring = new(connDC, 0);/// <summary>/// Count the number of elements in the bitmap./// </summary>/// <param name="key"></param>/// <returns></returns>public static long bitCount(string key){try{return tairRoaring.trbitcount(key);}catch (Exception e){return -1;}}
func (l *BitCount) setBit(key string, offset int64, value int64) bool {_, err := tairClient.TrSetBit(ctx, key, offset, value).Result()if err != nil {// process err//panic(err)return false}return true}func (l *BitCount) bitCount(key string) int64 {result, err := tairClient.TrBitCount(ctx, key).Result()if err != nil {// process err//panic(err)return -1}return result}
12.CPU曲线秒级监控
/*** add point to CPU_LOAD series* @param ip machine ip* @param ts the timestamp* @param value the value* @return success: true, fail: false.*/public static boolean addPoint(final String ip, final String ts, final double value) {try {String result = tairTs.extsadd("CPU_LOAD", ip, ts, value);if ("OK".equals(result)) {return true;}} catch (Exception e) {// logger.error(e);}return false;}/*** Range all data in a certain time series* @param ip machine ip* @param startTs start timestamp* @param endTs end timestamp* @return*/public static ExtsSkeyResult rangePoint(final String ip, final String startTs, final String endTs) {try {return tairTs.extsrange("CPU_LOAD", ip, startTs, endTs);} catch (Exception e) {// logger.error(e);}return null;}
# add point to CPU_LOAD series# @param ip machine ip# @param ts the timestamp# @param value the value# @return success: true, fail: false.def add_point(ip: str, ts: str, value: float) -> bool:try:tair = get_tair()return tair.exts_s_add("CPU_LOAD", ip, ts, value)except ResponseError as e:print(e)return False# Range all data in a certain time series# @param ip machine ip# @param startTs start timestamp# @param endTs end timestamp# @returndef range_point(ip: str, start_ts: str, end_ts: str):try:tair = get_tair()return tair.exts_s_range("CPU_LOAD", ip, start_ts, end_ts)except ResponseError as e:print(e)return None
private static readonly ConnectionMultiplexer connDC = ConnectionMultiplexer.Connect("localhost:6379");private static readonly TairTs tairTs = new(connDC, 0);/// <summary>/// dd point to CPU_LOAD series./// </summary>/// <param name="ip">mechine ip</param>/// <param name="ts">the timestamp</param>/// <param name="value">the value</param>/// <returns></returns>public static bool addPoint(string ip, string ts, double value){try{var result = tairTs.extsadd("CPU_LOAD", ip, ts, value);if (result.Equals("OK")){return true;}}catch (Exception e){// logger.error(e);}return false;}/// <summary>/// Range all data in a certain time series./// </summary>/// <param name="ip">machine ip</param>/// <param name="startTs">start timestamp</param>/// <param name="endTs">end timestamp</param>/// <returns></returns>public static ExtsRangeResult rangePoint(string ip, string startTs, string endTs){try{return tairTs.extsrange("CPU_LOAD", ip, startTs, endTs);}catch (Exception e){// logger.error(e);}return null;}
func (l *CpuCurve) addPoint(ip, ts string, value float64) bool {result, err := tairClient.ExTsAdd(ctx, "CPU_LOAD", ip, ts, value).Result()if err != nil {// process err//panic(err)}if "OK" == result {return true}return false}func (l *CpuCurve) rangePoint(ip, startTs, endTs string) *tair.ExTsSKeyCmd {result, err := tairClient.ExTsRange(ctx, "CPU_LOAD", ip, startTs, endTs).Result()if err != nil {// process err//panic(err)return nil}return result}
13.电动汽车轨迹监测
/*** add longitude/latitude to key, timestamp represents the current moment.* @param key the key* @param ts the timestamp* @param longitude the longitude* @param latitude the latitude* @return success: true, fail: false*/public static boolean addCoordinate(final String key, final String ts, final double longitude, final double latitude) {try {Long ret = tairGis.gisadd(key, ts, "POINT (" + longitude + " " + latitude + ")");if (ret == 1) {return true;}} catch (Exception e) {// logger.error(e);}return false;}/*** Get all points under a key.* @param key the key* @return A map, the key is the time, and the value is the coordinate*/public static Map<String, String> getAllCoordinate(final String key) {try {return tairGis.gisgetall(key);} catch (Exception e) {// logger.error(e);return null;}}
# add longitude/latitude to key, timestamp represents the current moment.# @param key the key# @param ts the timestamp# @param longitude the longitude# @param latitude the latitude# @return success: true, fail: falsedef add_coordinate(key: str, ts: str, longitude: float, latitude: float) -> bool:try:tair = get_tair()ret = tair.gis_add(key, {ts: f"POINT ({longitude} {latitude})"})return ret == 1except ResponseError as e:print(e)return False# Get all points under a key.# @param key the key# @return A map, the key is the time, and the value is the coordinatedef get_all_coordinate(key: str):try:tair = get_tair()return tair.gis_getall(key)except ResponseError as e:print(e)return None
private static readonly ConnectionMultiplexer connDC = ConnectionMultiplexer.Connect("localhost:6379");private static readonly TairGis tairGis = new(connDC, 0);/// <summary>/// add longitude/latitude to key, timestamp represents the current moment./// </summary>/// <param name="key"></param>/// <param name="ts"></param>/// <param name="longitude"></param>/// <param name="latitude"></param>/// <returns></returns>public static Boolean addCoordinate(string key, string ts, double longitude, double latitude){try{long ret = tairGis.gisadd(key, ts, "POINT (" + longitude + " " + latitude + ")");if (ret == 1){return true;}}catch (Exception e){//}return false;}/// <summary>/// Get all points under a key./// </summary>/// <param name="key"></param>/// <returns></returns>public static Dictionary<string, string> getAllCoordinate(string key){try{return tairGis.gisgetall(key);}catch (Exception e){return null;}}
func (l *CarTrack) addCoordinate(key, ts string, longitude, latitude float64) bool {result, err := tairClient.GisAdd(ctx, key, ts, "POINT ("+strconv.FormatFloat(longitude, 'E', -1, 64)+" "+strconv.FormatFloat(latitude, 'E', -1, 64)+")").Result()if err != nil {// process err//panic(err)}if result == 1 {return true}return false}func (l *CarTrack) getAllCoordinate(key string) map[string]string {result, err := tairClient.GisGetAll(ctx, key).Result()if err != nil {// process err//panic(err)return nil}return result}
14.同城 LBS 应用
/*** Add a service store geographical scope.* @param key the key* @param storeName the store name* @param storeWkt the store wkt* @return success: true, fail: false*/public static boolean addPolygon(final String key, final String storeName, final String storeWkt) {try {Long ret = tairGis.gisadd(key, storeName, storeWkt);if (ret == 1) {return true;}} catch (Exception e) {// logger.error(e);}return false;}/*** Determine whether the user's location is within the service range of the store.* @param key the key* @param userLocation the user location* @return Stores that can serve users*/public static Map<String, String> getServiceStore(final String key, final String userLocation) {try {return tairGis.giscontains(key, userLocation);} catch (Exception e) {// logger.error(e);return null;}}
# Add a service store geographical scope.# @param key the key# @param storeName the store name# @param storeWkt the store wkt# @return success: true, fail: falsedef add_polygon(key: str, store_name: str, store_wkt) -> bool:try:tair = get_tair()ret = tair.gis_add(key, {store_name: store_wkt})return ret == 1except ResponseError as e:print(e)return False# Determine whether the user's location is within the service range of the store.# @param key the key# @param userLocation the user location# @return Stores that can serve usersdef get_service_store(key: str, user_location: str):try:tair = get_tair()return tair.gis_contains(key, user_location)except:return None
private static readonly ConnectionMultiplexer connDC = ConnectionMultiplexer.Connect("localhost:6379");private static readonly TairGis tairGis = new(connDC, 0);/// <summary>/// Add a service store geographical scope./// </summary>/// <param name="key"></param>/// <param name="storeName"></param>/// <param name="storeWkt"></param>/// <returns></returns>public static bool addPolygon(string key, string storeName, String storeWkt){try{long ret = tairGis.gisadd(key, storeName, storeWkt);return ret == 1;}catch (Exception e){// logger.error(e);}return false;}/// <summary>/// Determine whether the user's location is within the service range of the store./// </summary>/// <param name="key"></param>/// <param name="userLocation"></param>/// <returns></returns>public static Dictionary<string, string> getServiceStore(string key, string userLocation){try{return tairGis.giscontains(key, userLocation);}catch (Exception e){// logger.error(e);return null;}}
func (l *LbsBuy) AddPolygon(key, storeName, storeWkt string) bool {result, err := tairClient.GisAdd(ctx, key, storeName, storeWkt).Result()if err != nil {// process err//panic(err)}if result == 1 {return true}return false}func (l *LbsBuy) GetServiceStore(key, userLocation string) map[string]string {result, err := tairClient.GisContains(ctx, key, userLocation).Result()if err != nil {panic(err.Error())}return result}
15.密码有效期管理
/*** Add a user and password with a timeout.* @param key the key* @param user the user* @param password the password* @param timeout the password expiration time* @return success: true, fail: false*/public static boolean addUserPass(final String key, final String user, final String password, final int timeout) {try {Long ret = tairHash.exhset(key, user, password, ExhsetParams.ExhsetParams().ex(timeout));return ret == 1;} catch (Exception e) {// logger.error(e);return false;}}
# Add a user and password with a timeout.# @param key the key# @param user the user# @param password the password# @param timeout the password expiration time# @return success: true, fail: falsedef add_user_pass(key: str, user: str, password: str, timeout: int) -> bool:try:tair = get_tair()ret = tair.exhset(key, user, password, ex=timeout)return ret == 1except ResponseError as e:print(e)return False
private static readonly ConnectionMultiplexer connDC = ConnectionMultiplexer.Connect("localhost:6379");private static readonly TairHash tairHash = new(connDC, 0);/// <summary>/// Add a user and password with a timeout./// </summary>/// <param name="key"></param>/// <param name="user"></param>/// <param name="password"></param>/// <param name="timeout"></param>/// <returns></returns>public static bool addUserPass(string key, string user, string password, int timeout){try{var ret = tairHash.exhset(key, user, password, new ExhsetParams().ex(timeout));return ret == 1;}catch (Exception e){// logger.error(e);return false;}}
func (l *PasswordExpire) addUserPass(key string, user string, password string, timeout int64) bool {result, err := tairClient.ExHSetArgs(ctx, key, user, password, tair.ExHSetArgs{}.New().Ex(time.Duration(timeout))).Result()if err != nil {// process err//panic(err)}return result == 1}
16.应用设备登录态维护
/*** Record the login time and device name of the device, and set the login status expiration time* @param key the key* @param loginTime the login time* @param device the device name* @param timeout the timeout* @return success: true, fail: false*/public static boolean deviceLogin(final String key, final String loginTime, final String device, final int timeout) {try {Long ret = tairHash.exhset(key, loginTime, device, ExhsetParams.ExhsetParams().ex(timeout));return ret == 1;} catch (Exception e) {// logger.error(e);return false;}}
# Record the login time and device name of the device, and set the login status expiration time# @param key the key# @param loginTime the login time# @param device the device name# @param timeout the timeout# @return success: true, fail: falsedef device_login(key: str, login_time: str, device: str, timeout: int) -> bool:try:tair = get_tair()ret = tair.exhset(key, login_time, device, ex=timeout)return ret == 1except ResponseError as e:print(e)return False
private static readonly ConnectionMultiplexer connDC = ConnectionMultiplexer.Connect("localhost:6379");private static readonly TairHash tairHash = new(connDC, 0);/// <summary>/// Record the login time and device name of the device, and set the login status expiration time./// </summary>/// <param name="key"></param>/// <param name="loginTime"></param>/// <param name="device"></param>/// <param name="timeout"></param>/// <returns></returns>public static bool deviceLogin(string key, string loginTime, string device, int timeout){try{long ret = tairHash.exhset(key, loginTime, device, new ExhsetParams().ex(timeout));return ret == 1;}catch (Exception e){// logger.error(e);return false;}}
func (d *DeviceLogin) DeviceLogin(key, loginTime, device string, timeout int64) bool {result, err := tairClient.ExHSetArgs(ctx, key, loginTime, device, tair.ExHSetArgs{}.New().Ex(time.Duration(timeout))).Result()if err != nil {// process err//panic(err)}if result == 1 {return true}return false}
17.BloomFilter高效去重
/*** Recommend the doc to the user, ignore it if it has been recommended, otherwise recommend it and mark it.* @param userid the user id* @param docid the doc id*/public static void recommendedSystem(final String userid, final String docid) {if (tairBloom.bfexists(userid, docid)) {// do nothing} else {// recommend to user sendRecommendMsg(docid);// add userid with docidtairBloom.bfadd(userid, docid);}}
# Recommend the doc to the user, ignore it if it has been recommended, otherwise recommend it and mark it.# @param userid the user id# @param docid the doc iddef recommended_system(userid: str, docid: str) -> None:tair = get_tair()if tair.bf_exists(userid, docid):print(f"{docid} may exist in {userid}")else:# recommend to user sendRecommendMsg(docid);# add userid with docidtair.bf_add(userid, docid)print(f"{docid} does not exist in {userid}")
private static readonly ConnectionMultiplexer connDC = ConnectionMultiplexer.Connect("localhost:6379");private static readonly TairBloom tairBloom = new(connDC, 0);/// <summary>/// Recommend the doc to the user, ignore it if it has been recommended, otherwise recommend it and mark it./// </summary>/// <param name="userid"></param>/// <param name="docid"></param>public static void recommendedSystem(string userid, string docid){if (tairBloom.bfexists(userid, docid)){//do nothing}else{// recommend to user sendRecommendMsg(docid);// add userid with docidtairBloom.bfadd(userid, docid);}}
func (l *BloomFilter) recommendedSystem(userId, docId string) {result, err := tairClient.BfExists(ctx, userId, docId).Result()if err != nil {// process err//panic(err)}if result {// do nothing} else {// recommend to user sendRecommendMsg(docid);// add userid with docidtairClient.BfAdd(ctx, userId, docId)}}
18.爬虫系统检测
/*** Determine if the URL has been crawled* @param key key* @param urls the urls*/public static Boolean[] bfMexists(final String key, final String... urls) {try {return tairBloom.bfmexists(key, urls);} catch (Exception e) {// logger.error(e);return null;}}
# Determine if the URL has been crawled# @param key key# @param urls the urlsdef bf_mexists(key: str, urls: List[str]):try:tair = get_tair()return tair.bf_mexists(key, urls)except ResponseError as e:print(e)return None
private static readonly ConnectionMultiplexer connDC = ConnectionMultiplexer.Connect("localhost:6379");private static readonly TairBloom tairBloom = new(connDC, 0);/// <summary>/// Determine if the URL has been crawled./// </summary>/// <param name="key"></param>/// <param name="urls"></param>/// <returns></returns>public static bool[] bfMexists(string key, params string[] urls){try{return tairBloom.bfmexists(key, urls);}catch (Exception e){// logger.error(e);return null;}}
func (l *CrawlerSystem) JudeUrlsExists(key string, urls ...string) []bool {result, err := tairClient.BfMExists(ctx, key, urls...).Result()if err != nil {// process err//panic(err)return nil}return result}
19.防控信用卡欺诈交易
/*** update item to creditInfo* @param key the key* @param item the item* @return success: true, fail: false*/public static boolean creditAdd(final String key, final String item) {try {String ret = creditInfo.cpcUpdate(key, item);return "OK".equals(ret);} catch (Exception e) {// logger.error(e)return false;}}/*** Estimate all quantities in creditInfo.* @param key the key* @return the number of elements.*/public static Double creditEstimate(final String key) {try {return creditInfo.cpcEstimate(key);} catch (Exception e) {// logger.error(e)return null;}}
if __name__ == "__main__":tair = get_tair()key = "FraudPrevention"tair.cpc_update(key, "a")tair.cpc_update(key, "b")tair.cpc_update(key, "c")print(tair.cpc_estimate(key))tair.cpc_update(key, "d")print(tair.cpc_estimate(key))
/// <summary>/// update item to key./// </summary>/// <param name="key"></param>/// <param name="item"></param>/// <returns></returns>public static bool creditAdd(string key, string item){try{string ret = creditInfo.cpcUpdate(key, item);return ret.Equals("OK");}catch (Exception e){// logger.error(e);return false;}}
func (l *FraudPrevention) creditAdd(key, item string) bool {result, err := tairClient.CpcUpdate(ctx, key, item).Result()if err != nil {// process err//panic(err)}if "OK" == result {return true}return false}func (l *FraudPrevention) creditEstimate(key string) float64 {result, err := tairClient.CpcEstimate(ctx, key).Result()if err != nil {// process err//panic(err)}return result}
20.向量相似性搜索
/*** insert entity into tair vector** @param index index name* @param entityid entity id* @param vector vector info* @param params scalar attribute key, value** @return integer-reply specifically:* {@literal k} if success, k is the number of fields that were added..* throw error like "(error) Illegal vector dimensions" if error*/public static boolean addVector(final String index, final String entityid, final String vector, final String...params) {try {tairVector.tvshset(index, entityid, vector, params);return true;} catch (Exception e) {// logger.error(e);return false;}}/*** query entity by vector** @param index index name* @param topn topn result* @param vector query vector* @return VectorBuilderFactory.Knn<>*/public static VectorBuilderFactory.Knn<String> knnSearch(final String index, Long topn, final String vector) {try {return tairVector.tvsknnsearch(index, topn, vector);} catch (Exception e) {// logger.error(e);return null;}}
附:
Java 完整代码地址:
Go 完整代码地址:
https://github.com/alibaba/tair-go/tree/main/example
Python 完整代码地址:
https://github.com/alibaba/tair-py/tree/main/examples
.Net 完整代码地址:
https://github.com/alibaba/AlibabaCloud.TairSDK/tree/main/Example







点击「阅读原文」查看 云原生内存数据库Tair 更多信息
文章转载自阿里云数据库,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。







