I. Memory

1. Key data structures

struct redisServer {
    ...
    redisDb *db;
    ...
};
typedef struct redisDb {
    dict *dict;                 /* The keyspace for this DB */
    dict *expires;              /* Timeout of keys with a timeout set */
    dict *blocking_keys;        /* Keys with clients waiting for data (BLPOP) */
    dict *ready_keys;           /* Blocked keys that received a PUSH */
    dict *watched_keys;         /* WATCHED keys for MULTI/EXEC CAS */
    struct evictionPoolEntry *eviction_pool; /* Eviction pool of keys */
    int id;                     /* Database ID */
    long long avg_ttl;          /* Average TTL, just for stats */
} redisDb;
typedef struct dict {
    dictType *type;
    void *privdata;
    dictht ht[2];
    long rehashidx; /* rehashing not in progress if rehashidx == -1 */
    int iterators; /* number of iterators currently running */
} dict;
typedef struct dictht {
    dictEntry **table;      /* bucket array */
    unsigned long size;     /* number of buckets, always a power of two */
    unsigned long sizemask; /* size-1, masks a hash into a bucket index */
    unsigned long used;     /* number of entries stored */
} dictht;
typedef struct dictEntry {
    void *key;
    union {
        void *val;
        uint64_t u64;
        int64_t s64;
        double d;
    } v;
    struct dictEntry *next;
} dictEntry;


dictEntry *dictFind(dict *d, const void *key)
{
    dictEntry *he;
    unsigned int h, idx, table;

    if (d->ht[0].size == 0) return NULL; /* We don't have a table at all */
    if (dictIsRehashing(d)) _dictRehashStep(d); /* incremental rehash: migrate one bucket */
    h = dictHashKey(d, key);
    for (table = 0; table <= 1; table++) {
        idx = h & d->ht[table].sizemask;
        he = d->ht[table].table[idx];
        while(he) {
            if (dictCompareKeys(d, key, he->key))
                return he;
            he = he->next;
        }
        if (!dictIsRehashing(d)) return NULL;
    }
    return NULL;
}

2. Rehash

1) The rehash flag

#define dictIsRehashing(d) ((d)->rehashidx != -1)

rehashidx == -1 means no rehash is in progress; a value >= 0 means rehashing is underway: buckets at indexes below rehashidx have already been migrated to ht[1], while buckets from rehashidx onward have not.

2) The rehash process

Rehashing is incremental; see the dictFind implementation above, which calls _dictRehashStep before each lookup.
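
That call is what spreads the migration cost across ordinary operations. The helper itself is tiny; a sketch matching the dict.c of this era (it only steps when no iterators are bound to the dict, so iteration never observes a half-moved bucket):

    /* Perform a single step of rehashing, but only if no iterators are
     * currently bound to the hash table. Sketch based on dict.c. */
    static void _dictRehashStep(dict *d) {
        if (d->iterators == 0) dictRehash(d,1);
    }

dictRehash, which does the actual bucket migration, follows: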

/* Performs N steps of incremental rehashing. Returns 1 if there are still
 * keys to move from the old to the new hash table, otherwise 0 is returned.
 *
 * Note that a rehashing step consists in moving a bucket (that may have more
 * than one key as we use chaining) from the old to the new hash table, however
 * since part of the hash table may be composed of empty spaces, it is not
 * guaranteed that this function will rehash even a single bucket, since it
 * will visit at max N*10 empty buckets in total, otherwise the amount of
 * work it does would be unbound and the function may block for a long time. */
int dictRehash(dict *d, int n) {
    int empty_visits = n*10; /* Max number of empty buckets to visit. */
    if (!dictIsRehashing(d)) return 0;

    while(n-- && d->ht[0].used != 0) {
        dictEntry *de, *nextde;

        /* Note that rehashidx can't overflow as we are sure there are more
         * elements because ht[0].used != 0 */
        assert(d->ht[0].size > (unsigned long)d->rehashidx);
        while(d->ht[0].table[d->rehashidx] == NULL) {
            d->rehashidx++;
            if (--empty_visits == 0) return 1;
        }
        de = d->ht[0].table[d->rehashidx];
        /* Move all the keys in this bucket from the old to the new hash HT */
        while(de) {
            unsigned int h;

            nextde = de->next;
            /* Get the index in the new hash table */
            h = dictHashKey(d, de->key) & d->ht[1].sizemask;
            de->next = d->ht[1].table[h];
            d->ht[1].table[h] = de;
            d->ht[0].used--;
            d->ht[1].used++;
            de = nextde;
        }
        d->ht[0].table[d->rehashidx] = NULL;
        d->rehashidx++;
    }

    /* Check if we already rehashed the whole table... */
    if (d->ht[0].used == 0) {
        zfree(d->ht[0].table);
        d->ht[0] = d->ht[1];
        _dictReset(&d->ht[1]);
        d->rehashidx = -1;
        return 0;
    }

    /* More to rehash... */
    return 1;
}

3) Rehash conditions

Load factor = ht[0].used / ht[0].size.
Expansion: normally triggered once the load factor reaches 1; if resizing is temporarily disabled (during a background save, to limit copy-on-write), it is still forced once the load factor exceeds dict_force_resize_ratio (5), i.e. hash collisions have become severe.
Shrinking: triggered when the fill ratio drops below 10% (REDIS_HT_MINFILL), i.e. the table has become far too large for its contents.

/* Using dictEnableResize() / dictDisableResize() we make possible to
* enable/disable resizing of the hash table as needed. This is very important
* for Redis, as we use copy-on-write and don't want to move too much memory
* around when there is a child performing saving operations.
*
* Note that even when dict_can_resize is set to 0, not all resizes are
* prevented: a hash table is still allowed to grow if the ratio between
* the number of elements and the buckets > dict_force_resize_ratio. */
static int dict_can_resize = 1;
static unsigned int dict_force_resize_ratio = 5;
int htNeedsResize(dict *dict) {
    long long size, used;

    size = dictSlots(dict);
    used = dictSize(dict);
    return (size && used && size > DICT_HT_INITIAL_SIZE &&
            (used*100/size < REDIS_HT_MINFILL));
}
21
/* Expand the hash table if needed */
static int _dictExpandIfNeeded(dict *d)
{
    /* Incremental rehashing already in progress. Return. */
    if (dictIsRehashing(d)) return DICT_OK;

    /* If the hash table is empty expand it to the initial size. */
    if (d->ht[0].size == 0) return dictExpand(d, DICT_HT_INITIAL_SIZE);

    /* If we reached the 1:1 ratio, and we are allowed to resize the hash
     * table (global setting) or we should avoid it but the ratio between
     * elements/buckets is over the "safe" threshold, we resize doubling
     * the number of buckets. */
    if (d->ht[0].used >= d->ht[0].size &&
        (dict_can_resize ||
         d->ht[0].used/d->ht[0].size > dict_force_resize_ratio))
    {
        return dictExpand(d, d->ht[0].used*2);
    }
    return DICT_OK;
}
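
Note that dictExpand does not use d->ht[0].used*2 verbatim: it rounds the requested size up to the next power of two, which is what keeps sizemask = size - 1 usable as a bit mask in h & sizemask. A sketch of that helper, matching the same dict.c:

    /* Our hash table capability is a power of two */
    static unsigned long _dictNextPower(unsigned long size)
    {
        unsigned long i = DICT_HT_INITIAL_SIZE;

        if (size >= LONG_MAX) return LONG_MAX;
        while(1) {
            if (i >= size)
                return i;
            i *= 2;
        }
    }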

What are used and size? size is the number of buckets in the table array; used is the number of entries stored (chained entries included). dictAddRaw below increments used on every insert:

dictEntry *dictAddRaw(dict *d, void *key)
{
    int index;
    dictEntry *entry;
    dictht *ht;

    if (dictIsRehashing(d)) _dictRehashStep(d);

    /* Get the index of the new element, or -1 if
     * the element already exists. */
    if ((index = _dictKeyIndex(d, key)) == -1)
        return NULL;

    /* Allocate the memory and store the new entry */
    ht = dictIsRehashing(d) ? &d->ht[1] : &d->ht[0];
    entry = zmalloc(sizeof(*entry));
    entry->next = ht->table[index];
    ht->table[index] = entry;
    ht->used++;

    /* Set the hash entry fields. */
    dictSetKey(d, entry, key);
    return entry;
}

3. Expiration

Expire times are maintained in a separate dict, expires (see redisDb above); each entry's value is the key's absolute expire time in milliseconds.
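
A sketch of the setter side, based on the db.c of the same version: the expires entry reuses the key's sds from the main dict rather than copying it, and the expire time is stored as the entry's signed integer value:

    void setExpire(redisDb *db, robj *key, long long when) {
        dictEntry *kde, *de;

        /* Reuse the sds from the main dict in the expire dict */
        kde = dictFind(db->dict,key->ptr);
        redisAssertWithInfo(NULL,key,kde != NULL);
        de = dictReplaceRaw(db->expires,dictGetKey(kde));
        dictSetSignedIntegerVal(de,when);
    }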

Getting a key's expire time

/* Return the expire time of the specified key, or -1 if no expire
 * is associated with this key (i.e. the key is non volatile) */
long long getExpire(redisDb *db, robj *key) {
    dictEntry *de;

    /* No expire? return ASAP */
    if (dictSize(db->expires) == 0 ||
       (de = dictFind(db->expires,key->ptr)) == NULL) return -1;

    /* The entry was found in the expire dict, this means it should also
     * be present in the main dict (safety check). */
    redisAssertWithInfo(NULL,key,dictFind(db->dict,key->ptr) != NULL);
    return dictGetSignedIntegerVal(de);
}

The function below is part of the GET command path; it shows that expiration is lazy: a key is only checked, and deleted if stale, at the moment it is accessed.

robj *lookupKeyRead(redisDb *db, robj *key) {
    robj *val;

    expireIfNeeded(db,key);
    val = lookupKey(db,key);
    if (val == NULL)
        server.stat_keyspace_misses++;
    else
        server.stat_keyspace_hits++;
    return val;
}
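
expireIfNeeded, called above but not shown, is where the lazy deletion actually happens; a condensed sketch based on the db.c of this version:

    int expireIfNeeded(redisDb *db, robj *key) {
        mstime_t when = getExpire(db,key);

        if (when < 0) return 0; /* No expire for this key */

        /* Don't expire anything while loading. It will be done later. */
        if (server.loading) return 0;

        /* Slaves don't expire keys themselves; they wait for the DEL
         * propagated by the master, and only report the key as gone. */
        if (server.masterhost != NULL) return mstime() > when;

        /* Return when this key has not expired */
        if (mstime() <= when) return 0;

        /* Delete the key and propagate the DEL to slaves/AOF. */
        server.stat_expiredkeys++;
        propagateExpire(db,key);
        notifyKeyspaceEvent(REDIS_NOTIFY_EXPIRED,"expired",key,db->id);
        return dbDelete(db,key);
    }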

Active expiration by random sampling

/* ======================= Cron: called every 100 ms ======================== */

/* Helper function for the activeExpireCycle() function.
 * This function will try to expire the key that is stored in the hash table
 * entry 'de' of the 'expires' hash table of a Redis database.
 *
 * If the key is found to be expired, it is removed from the database and
 * 1 is returned. Otherwise no operation is performed and 0 is returned.
 *
 * When a key is expired, server.stat_expiredkeys is incremented.
 *
 * The parameter 'now' is the current time in milliseconds as is passed
 * to the function to avoid too many gettimeofday() syscalls. */
int activeExpireCycleTryExpire(redisDb *db, dictEntry *de, long long now) {
    long long t = dictGetSignedIntegerVal(de);
    if (now > t) {
        sds key = dictGetKey(de);
        robj *keyobj = createStringObject(key,sdslen(key));

        propagateExpire(db,keyobj);
        dbDelete(db,keyobj);
        notifyKeyspaceEvent(REDIS_NOTIFY_EXPIRED,
            "expired",keyobj,db->id);
        decrRefCount(keyobj);
        server.stat_expiredkeys++;
        return 1;
    } else {
        return 0;
    }
}

/* Try to expire a few timed out keys. The algorithm used is adaptive and
 * will use few CPU cycles if there are few expiring keys, otherwise
 * it will get more aggressive to avoid that too much memory is used by
 * keys that can be removed from the keyspace.
 *
 * No more than REDIS_DBCRON_DBS_PER_CALL databases are tested at every
 * iteration.
 *
 * This kind of call is used when Redis detects that timelimit_exit is
 * true, so there is more work to do, and we do it more incrementally from
 * the beforeSleep() function of the event loop.
 *
 * Expire cycle type:
 *
 * If type is ACTIVE_EXPIRE_CYCLE_FAST the function will try to run a
 * "fast" expire cycle that takes no longer than EXPIRE_FAST_CYCLE_DURATION
 * microseconds, and is not repeated again before the same amount of time.
 *
 * If type is ACTIVE_EXPIRE_CYCLE_SLOW, that normal expire cycle is
 * executed, where the time limit is a percentage of the REDIS_HZ period
 * as specified by the REDIS_EXPIRELOOKUPS_TIME_PERC define. */
void activeExpireCycle(int type) {
    /* This function has some global state in order to continue the work
     * incrementally across calls. */
    static unsigned int current_db = 0; /* Last DB tested. */
    static int timelimit_exit = 0;      /* Time limit hit in previous call? */
    static long long last_fast_cycle = 0; /* When last fast cycle ran. */

    int j, iteration = 0;
    int dbs_per_call = REDIS_DBCRON_DBS_PER_CALL;
    long long start = ustime(), timelimit;

    if (type == ACTIVE_EXPIRE_CYCLE_FAST) {
        /* Don't start a fast cycle if the previous cycle did not exited
         * for time limt. Also don't repeat a fast cycle for the same period
         * as the fast cycle total duration itself. */
        if (!timelimit_exit) return;
        if (start < last_fast_cycle + ACTIVE_EXPIRE_CYCLE_FAST_DURATION*2) return;
        last_fast_cycle = start;
    }

    /* We usually should test REDIS_DBCRON_DBS_PER_CALL per iteration, with
     * two exceptions:
     *
     * 1) Don't test more DBs than we have.
     * 2) If last time we hit the time limit, we want to scan all DBs
     * in this iteration, as there is work to do in some DB and we don't want
     * expired keys to use memory for too much time. */
    if (dbs_per_call > server.dbnum || timelimit_exit)
        dbs_per_call = server.dbnum;

    /* We can use at max ACTIVE_EXPIRE_CYCLE_SLOW_TIME_PERC percentage of CPU time
     * per iteration. Since this function gets called with a frequency of
     * server.hz times per second, the following is the max amount of
     * microseconds we can spend in this function. */
    timelimit = 1000000*ACTIVE_EXPIRE_CYCLE_SLOW_TIME_PERC/server.hz/100;
    timelimit_exit = 0;
    if (timelimit <= 0) timelimit = 1;

    if (type == ACTIVE_EXPIRE_CYCLE_FAST)
        timelimit = ACTIVE_EXPIRE_CYCLE_FAST_DURATION; /* in microseconds. */

    for (j = 0; j < dbs_per_call; j++) {
        int expired;
        redisDb *db = server.db+(current_db % server.dbnum);

        /* Increment the DB now so we are sure if we run out of time
         * in the current DB we'll restart from the next. This allows to
         * distribute the time evenly across DBs. */
        current_db++;

        /* Continue to expire if at the end of the cycle more than 25%
         * of the keys were expired. */
        do {
            unsigned long num, slots;
            long long now, ttl_sum;
            int ttl_samples;

            /* If there is nothing to expire try next DB ASAP. */
            if ((num = dictSize(db->expires)) == 0) {
                db->avg_ttl = 0;
                break;
            }
            slots = dictSlots(db->expires);
            now = mstime();

            /* When there are less than 1% filled slots getting random
             * keys is expensive, so stop here waiting for better times...
             * The dictionary will be resized asap. */
            if (num && slots > DICT_HT_INITIAL_SIZE &&
                (num*100/slots < 1)) break;

            /* The main collection cycle. Sample random keys among keys
             * with an expire set, checking for expired ones. */
            expired = 0;
            ttl_sum = 0;
            ttl_samples = 0;

            if (num > ACTIVE_EXPIRE_CYCLE_LOOKUPS_PER_LOOP)
                num = ACTIVE_EXPIRE_CYCLE_LOOKUPS_PER_LOOP;

            while (num--) {
                dictEntry *de;
                long long ttl;

                if ((de = dictGetRandomKey(db->expires)) == NULL) break;
                ttl = dictGetSignedIntegerVal(de)-now;
                if (activeExpireCycleTryExpire(db,de,now)) expired++;
                if (ttl < 0) ttl = 0;
                ttl_sum += ttl;
                ttl_samples++;
            }

            /* Update the average TTL stats for this database. */
            if (ttl_samples) {
                long long avg_ttl = ttl_sum/ttl_samples;

                if (db->avg_ttl == 0) db->avg_ttl = avg_ttl;
                /* Smooth the value averaging with the previous one. */
                db->avg_ttl = (db->avg_ttl+avg_ttl)/2;
            }

            /* We can't block forever here even if there are many keys to
             * expire. So after a given amount of milliseconds return to the
             * caller waiting for the other active expire cycle. */
            iteration++;
            if ((iteration & 0xf) == 0) { /* check once every 16 iterations. */
                long long elapsed = ustime()-start;

                latencyAddSampleIfNeeded("expire-cycle",elapsed/1000);
                if (elapsed > timelimit) timelimit_exit = 1;
            }
            if (timelimit_exit) return;
            /* We don't repeat the cycle if there are less than 25% of keys
             * found expired in the current DB. */
        } while (expired > ACTIVE_EXPIRE_CYCLE_LOOKUPS_PER_LOOP/4);
    }
}

In other words, active expiration is adaptive: each inner loop samples at most ACTIVE_EXPIRE_CYCLE_LOOKUPS_PER_LOOP (20 by default) keys per database, repeats while more than 25% of the sampled keys turned out to be expired, and bails out once the cycle's CPU time budget (25% of the server.hz period by default) is exceeded.

II. Persistence

When the Redis server starts, it loads the persisted data. AOF takes priority: if AOF is enabled it loads the AOF file, otherwise it falls back to the RDB file.

/* Function called at startup to load RDB or AOF file in memory. */
void loadDataFromDisk(void) {
    long long start = ustime();

    if (server.aof_state == REDIS_AOF_ON) {
        if (loadAppendOnlyFile(server.aof_filename) == REDIS_OK)
            redisLog(REDIS_NOTICE,"DB loaded from append only file: %.3f seconds",(float)(ustime()-start)/1000000);
    } else {
        if (rdbLoad(server.rdb_filename) == REDIS_OK) {
            redisLog(REDIS_NOTICE,"DB loaded from disk: %.3f seconds",
                (float)(ustime()-start)/1000000);
        } else if (errno != ENOENT) {
            redisLog(REDIS_WARNING,"Fatal error loading the DB: %s. Exiting.",strerror(errno));
            exit(1);
        }
    }
}

1. AOF

Format

Every write command is stored in the Redis protocol (RESP) format, which looks like this:

Command:  SET HENRY HENRYFAN
Protocol:
*3\r\n
$3\r\n
SET\r\n
$5\r\n
HENRY\r\n
$8\r\n
HENRYFAN\r\n
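
This encoding is produced by aof.c/catAppendOnlyGenericCommand; a sketch based on the same source, showing the *argc header followed by one $len/payload pair per argument:

    sds catAppendOnlyGenericCommand(sds dst, int argc, robj **argv) {
        char buf[32];
        int len, j;
        robj *o;

        /* "*<argc>\r\n" multi-bulk header */
        buf[0] = '*';
        len = 1+ll2string(buf+1,sizeof(buf)-1,argc);
        buf[len++] = '\r';
        buf[len++] = '\n';
        dst = sdscatlen(dst,buf,len);

        for (j = 0; j < argc; j++) {
            o = getDecodedObject(argv[j]);
            /* "$<len>\r\n<payload>\r\n" for each argument */
            buf[0] = '$';
            len = 1+ll2string(buf+1,sizeof(buf)-1,sdslen(o->ptr));
            buf[len++] = '\r';
            buf[len++] = '\n';
            dst = sdscatlen(dst,buf,len);
            dst = sdscatlen(dst,o->ptr,sdslen(o->ptr));
            dst = sdscatlen(dst,"\r\n",2);
            decrRefCount(o);
        }
        return dst;
    }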

Loading

While loading the AOF at startup, the server does not serve client requests; it is not simply blocked, though, since it still runs the event loop periodically and replies to clients with an error.
During loading, a fake client is created and the commands in the AOF file are replayed through it in a loop (see aof.c/loadAppendOnlyFile):

/* Replay the append log file. On success REDIS_OK is returned. On non fatal
 * error (the append only file is zero-length) REDIS_ERR is returned. On
 * fatal error an error message is logged and the program exists. */
int loadAppendOnlyFile(char *filename) {
    struct redisClient *fakeClient;
    FILE *fp = fopen(filename,"r");
    struct redis_stat sb;
    int old_aof_state = server.aof_state;
    long loops = 0;
    off_t valid_up_to = 0; /* Offset of the latest well-formed command loaded. */

    if (fp && redis_fstat(fileno(fp),&sb) != -1 && sb.st_size == 0) {
        server.aof_current_size = 0;
        fclose(fp);
        return REDIS_ERR;
    }

    if (fp == NULL) {
        redisLog(REDIS_WARNING,"Fatal error: can't open the append log file for reading: %s",strerror(errno));
        exit(1);
    }

    /* Temporarily disable AOF, to prevent EXEC from feeding a MULTI
     * to the same file we're about to read. */
    server.aof_state = REDIS_AOF_OFF;

    fakeClient = createFakeClient();
    startLoading(fp);

    while(1) {
        int argc, j;
        unsigned long len;
        robj **argv;
        char buf[128];
        sds argsds;
        struct redisCommand *cmd;

        /* Serve the clients from time to time */
        if (!(loops++ % 1000)) {
            loadingProgress(ftello(fp));
            processEventsWhileBlocked();
        }

        if (fgets(buf,sizeof(buf),fp) == NULL) {
            if (feof(fp))
                break;
            else
                goto readerr;
        }
        if (buf[0] != '*') goto fmterr;
        if (buf[1] == '\0') goto readerr;
        argc = atoi(buf+1);
        if (argc < 1) goto fmterr;

        argv = zmalloc(sizeof(robj*)*argc);
        fakeClient->argc = argc;
        fakeClient->argv = argv;

        for (j = 0; j < argc; j++) {
            if (fgets(buf,sizeof(buf),fp) == NULL) {
                fakeClient->argc = j; /* Free up to j-1. */
                freeFakeClientArgv(fakeClient);
                goto readerr;
            }
            if (buf[0] != '$') goto fmterr;
            len = strtol(buf+1,NULL,10);
            argsds = sdsnewlen(NULL,len);
            if (len && fread(argsds,len,1,fp) == 0) {
                sdsfree(argsds);
                fakeClient->argc = j; /* Free up to j-1. */
                freeFakeClientArgv(fakeClient);
                goto readerr;
            }
            argv[j] = createObject(REDIS_STRING,argsds);
            if (fread(buf,2,1,fp) == 0) {
                fakeClient->argc = j+1; /* Free up to j. */
                freeFakeClientArgv(fakeClient);
                goto readerr; /* discard CRLF */
            }
        }

        /* Command lookup */
        cmd = lookupCommand(argv[0]->ptr);
        if (!cmd) {
            redisLog(REDIS_WARNING,"Unknown command '%s' reading the append only file", (char*)argv[0]->ptr);
            exit(1);
        }

        /* Run the command in the context of a fake client */
        cmd->proc(fakeClient);

        /* The fake client should not have a reply */
        redisAssert(fakeClient->bufpos == 0 && listLength(fakeClient->reply) == 0);
        /* The fake client should never get blocked */
        redisAssert((fakeClient->flags & REDIS_BLOCKED) == 0);

        /* Clean up. Command code may have changed argv/argc so we use the
         * argv/argc of the client instead of the local variables. */
        freeFakeClientArgv(fakeClient);
        if (server.aof_load_truncated) valid_up_to = ftello(fp);
    }

    /* This point can only be reached when EOF is reached without errors.
     * If the client is in the middle of a MULTI/EXEC, log error and quit. */
    if (fakeClient->flags & REDIS_MULTI) goto uxeof;

loaded_ok: /* DB loaded, cleanup and return REDIS_OK to the caller. */
    fclose(fp);
    freeFakeClient(fakeClient);
    server.aof_state = old_aof_state;
    stopLoading();
    aofUpdateCurrentSize();
    server.aof_rewrite_base_size = server.aof_current_size;
    return REDIS_OK;

readerr: /* Read error. If feof(fp) is true, fall through to unexpected EOF. */
    if (!feof(fp)) {
        redisLog(REDIS_WARNING,"Unrecoverable error reading the append only file: %s", strerror(errno));
        exit(1);
    }

uxeof: /* Unexpected AOF end of file. */
    if (server.aof_load_truncated) {
        redisLog(REDIS_WARNING,"!!! Warning: short read while loading the AOF file !!!");
        redisLog(REDIS_WARNING,"!!! Truncating the AOF at offset %llu !!!",
            (unsigned long long) valid_up_to);
        if (valid_up_to == -1 || truncate(filename,valid_up_to) == -1) {
            if (valid_up_to == -1) {
                redisLog(REDIS_WARNING,"Last valid command offset is invalid");
            } else {
                redisLog(REDIS_WARNING,"Error truncating the AOF file: %s",
                    strerror(errno));
            }
        } else {
            /* Make sure the AOF file descriptor points to the end of the
             * file after the truncate call. */
            if (server.aof_fd != -1 && lseek(server.aof_fd,0,SEEK_END) == -1) {
                redisLog(REDIS_WARNING,"Can't seek the end of the AOF file: %s",
                    strerror(errno));
            } else {
                redisLog(REDIS_WARNING,
                    "AOF loaded anyway because aof-load-truncated is enabled");
                goto loaded_ok;
            }
        }
    }
    redisLog(REDIS_WARNING,"Unexpected end of file reading the append only file. You can: 1) Make a backup of your AOF file, then use ./redis-check-aof --fix <filename>. 2) Alternatively you can set the 'aof-load-truncated' configuration option to yes and restart the server.");
    exit(1);

fmterr: /* Format error. */
    redisLog(REDIS_WARNING,"Bad file format reading the append only file: make a backup of your AOF file, then use ./redis-check-aof --fix <filename>");
    exit(1);
}

Flushing to disk

Every command Redis executes goes through redis.c/call, which calls aof.c/feedAppendOnlyFile to append each write command to the AOF buffer; if a rewrite is in progress, the command is also appended to the AOF rewrite buffer. If the command's database is not aof_selected_db, a SELECT command is prepended first, which is how all databases end up in a single AOF file, as the excerpt below shows.
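
The relevant excerpt from feedAppendOnlyFile (based on the aof.c of this version), showing both the SELECT prepend and the dual-buffer append:

    if (dictid != server.aof_selected_db) {
        char seldb[64];

        /* Prepend a SELECT so all DBs can share one AOF file */
        snprintf(seldb,sizeof(seldb),"%d",dictid);
        buf = sdscatprintf(buf,"*2\r\n$6\r\nSELECT\r\n$%lu\r\n%s\r\n",
            (unsigned long)strlen(seldb),seldb);
        server.aof_selected_db = dictid;
    }
    ...
    /* Append to the AOF buffer, flushed to disk before re-entering
     * the event loop. */
    if (server.aof_state == REDIS_AOF_ON)
        server.aof_buf = sdscatlen(server.aof_buf,buf,sdslen(buf));

    /* If a background rewrite is in progress, also accumulate the diff
     * so it can be appended to the new AOF when the child finishes. */
    if (server.aof_child_pid != -1)
        aofRewriteBufferAppend((unsigned char*)buf,sdslen(buf));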

Before each pass of the event loop, redis.c/beforeSleep runs flushAppendOnlyFile, which flushes the buffered commands to disk. There are three fsync policies:

/* Append only defines */
#define AOF_FSYNC_NO 0       // left to the OS: data the OS has not yet flushed is lost on power failure
#define AOF_FSYNC_ALWAYS 1   // fsync after every command: at most one command is lost
#define AOF_FSYNC_EVERYSEC 2 // fsync once per second: at most one second of data is lost (our production policy)

if (server.aof_fsync == AOF_FSYNC_ALWAYS) {
    /* aof_fsync is defined as fdatasync() for Linux in order to avoid
     * flushing metadata. */
    latencyStartMonitor(latency);
    aof_fsync(server.aof_fd); /* Let's try to get this data on the disk */
    latencyEndMonitor(latency);
    latencyAddSampleIfNeeded("aof-fsync-always",latency);
    server.aof_last_fsync = server.unixtime;
} else if ((server.aof_fsync == AOF_FSYNC_EVERYSEC &&
            server.unixtime > server.aof_last_fsync)) {
    if (!sync_in_progress) aof_background_fsync(server.aof_fd);
    server.aof_last_fsync = server.unixtime;
}
/* Define aof_fsync to fdatasync() in Linux and fsync() for all the rest */
#ifdef __linux__
#define aof_fsync fdatasync
#else
#define aof_fsync fsync
#endif

Rewrite

When the AOF grows too large, an AOF rewrite is performed to shrink the file.
Rather than parsing the existing AOF, Redis forks a child process that dumps the in-memory dataset as a fresh AOF and swaps it in for the old file, skipping keys that have already expired. If the server is still processing commands in the meantime, those commands are also placed in the rewrite buffer; when the rewrite completes, the rewrite buffer's contents are appended to the new AOF file.

no-appendfsync-on-rewrite yes
# default: no
# yes: while a rewrite is in progress, don't fsync appended commands, just
#      leave them in the buffer; if the child and the parent both write to
#      disk at the same time, IO contention may result

Note that this also touches on fork()ing a child process and copy-on-write memory.
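
A condensed sketch of rewriteAppendOnlyFileBackground (based on aof.c, many details elided): fork() gives the child a copy-on-write snapshot of the dataset, so it can serialize at its own pace while the parent keeps serving clients and buffering the diff:

    int rewriteAppendOnlyFileBackground(void) {
        pid_t childpid;

        if (server.aof_child_pid != -1) return REDIS_ERR;
        if ((childpid = fork()) == 0) {
            char tmpfile[256];

            /* Child: serialize the COW snapshot of the dataset to a temp file. */
            snprintf(tmpfile,256,"temp-rewriteaof-bg-%d.aof", (int) getpid());
            if (rewriteAppendOnlyFile(tmpfile) == REDIS_OK) {
                exitFromChild(0);
            } else {
                exitFromChild(1);
            }
        } else {
            /* Parent: record the child; from now on feedAppendOnlyFile also
             * accumulates new write commands into the rewrite buffer. */
            ...
            server.aof_child_pid = childpid;
        }
        return REDIS_OK;
    }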

Rewrite trigger conditions
  • the BGREWRITEAOF command
  • automatic: checked in the event loop (serverCron) and fired once the growth thresholds below are met (the check is sketched after this list)

    int aof_no_fsync_on_rewrite;  /* Don't fsync if a rewrite is in prog. */
    int aof_rewrite_perc;         /* Rewrite AOF if % growth is > M and... */
    off_t aof_rewrite_min_size;   /* the AOF file is at least N bytes. */
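
The automatic check looks roughly like this (based on the redis.c serverCron of this version): rewrite only when no save or rewrite child is already running, the file exceeds aof_rewrite_min_size, and it has grown by at least aof_rewrite_perc percent over its size after the last rewrite:

    /* Trigger an AOF rewrite if needed */
    if (server.rdb_child_pid == -1 &&
        server.aof_child_pid == -1 &&
        server.aof_rewrite_perc &&
        server.aof_current_size > server.aof_rewrite_min_size)
    {
        long long base = server.aof_rewrite_base_size ?
                        server.aof_rewrite_base_size : 1;
        long long growth = (server.aof_current_size*100/base) - 100;
        if (growth >= server.aof_rewrite_perc) {
            redisLog(REDIS_NOTICE,
                "Starting automatic rewriting of AOF on %lld%% growth",growth);
            rewriteAppendOnlyFileBackground();
        }
    }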

2. RDB

RDB files can be compressed.

File layout

REDIS | RDB-VERSION | SELECT-DB | KEY-VALUE-PAIRS | EOF | CHECK-SUM

Trigger conditions
  • the BGSAVE command
  • automatic: after a given number of seconds during which at least a given number of keys changed; multiple save points can be configured.
    Our production config is save 600 50000: a BGSAVE runs if at least 50000 keys changed within 600 seconds. The check, from serverCron:
for (j = 0; j < server.saveparamslen; j++) {
    struct saveparam *sp = server.saveparams+j;

    /* Save if we reached the given amount of changes,
     * the given amount of seconds, and if the latest bgsave was
     * successful or if, in case of an error, at least
     * REDIS_BGSAVE_RETRY_DELAY seconds already elapsed. */
    if (server.dirty >= sp->changes &&
        server.unixtime-server.lastsave > sp->seconds &&
        (server.unixtime-server.lastbgsave_try >
         REDIS_BGSAVE_RETRY_DELAY ||
         server.lastbgsave_status == REDIS_OK))
    {
        redisLog(REDIS_NOTICE,"%d changes in %d seconds. Saving...",
            sp->changes, (int)sp->seconds);
        rdbSaveBackground(server.rdb_filename);
        break;
    }
}
