微博与elastic的亿级数据实践

本文适用于:
1、非计算机相关专业,并以微博内容为基础的科研人员或者做毕业设计的学生。
2、寻求存储和处理大规模JSON数据的人员。

注意:本文旨在提供解决存储和查询相关问题的思路,仅起抛砖引玉的作用。数据为网络上公开的数据集,采用梁博分享的2016国庆微博数据,在这里感谢梁博为广大科研工作者和做毕业设计的学生提供数据。下载链接为
http://pan.baidu.com/s/1cy1hyY 。总共有七个文件,解压出来100G左右。

MongoDB与 Elastic 的取舍

MongoDB和Elastic都比较擅长处理JSON内容,但这MongoDB对于初学者来说不太友好,原因如下:
1、MongoDB并未对于多核处理器进行优化,一个实例只能运行在一个核心上。单机可以运行多个实例,但配置起来较为复杂。
2、MongoDB的槽点之一,吃内存。
3、MongoDB分布式化的过程比较复杂,Elastic分布式化设置较为简单。
4、Elastic配套的软件较多,Kibana比较好用。
5、Elastic设置索引更加细致,并且支持分词。

几点注意事项:
1、建议同时安装Kibana和kopf,配合起来可以比较方便的进行数据可视化、调试以及监控elastic状态。
2、你要根据自己的需求配置mapping,这个虽然导入数据后可以更改但是流程非常麻烦。
3、要是不在装有elastic的那台机器上导入的话,记得更改elastic
bind的地址。
4、如果你想导入所有数据的话,服务器可能需要200g的磁盘空间。

实践环节

首先解压文件,并用split命令按行分割,方便导入处理。这里拿第一个文件举例:

1
split -l 100000 weibo_freshdata.2016-10-01

然后设置Elastic的mapping。
具体的mapping设置如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
PUT weibo
{
"settings": {
"index": {
"codec": "best_compression",
"number_of_replicas": 0
}
},
"mappings": {
"tweet": {
"properties": {
"timestamp": {
"type": "date",
"format": "epoch_second"
},
"crawl_timestamp": {
"type": "date",
"format": "epoch_millis"
},
"user_level": {
"type": "text",
"index": "not_analyzed"
},
"source": {
"type": "text",
"index": "not_analyzed"
},
"poiid": {
"type": "text",
"index": "not_analyzed"
},
"user_screen_name": {
"type": "text",
"index": "not_analyzed"
},
"text": {
"type": "text",
"index": false
}
}
}
}
}

注意:Elastic中mapping的设置虽然可以进行更改,但是弄起来非常麻烦。建议先明确自己的需求,再建立索引。
你可以在Kibana的Dev Tools中直接粘贴上面的内容生成索引。

对于上面设置的解释:
crawl_timestamp字段是我根据梁博提供的数据推测出前几个字段的含义,也有可能是不正确的。
有些字段的index设置为not_analyzed,在官方文档中解释为treat as keyword
field。如果你需要在Kibana中生成词云的话可能需要更改相关设置。
对于text字段,我设置为关闭索引,只进行存储。因为这个字段中包含许多HTML标签,如果开启索引的话会占用很多空间,所以在导入的脚本中我使用BeautifulSoup过滤掉HTML,并生成raw_text字段。这个字段默认是开启全文搜索的,但是默认的全文搜索效果不太好,有相关需求的话可以装个IK分词插件。
在导入数据时还使用正则表达式提取微博的poiid,并记录在poiid这个字段中。(关于poiid的使用,可以自行搜索相关内容)
为了节省磁盘空间,我还开启了最大压缩模式。

导入数据的python脚本如下。将服务器地址和文件夹路径改成你自己的,然后运行代码开始导入数据。(你可能还需要安装BeautifulSoup和ElasticSearch的python插件)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
import re
from elasticsearch import Elasticsearch
import elasticsearch.helpers
from multiprocessing import Pool
import os
from bs4 import BeautifulSoup
es = Elasticsearch('address to your elastic server with port', sniffer_timeout=600, timeout=600)
def load_file(file):
print(file)
file = open(file, 'r', encoding='utf-8', errors='ignore')
rows = []
for line in file:
line = line.split('\t')
dic = {
'crawl_timestamp': int(line[2]),
'timestamp': int(line[17]),
'user_id': int(line[4]),
'user_screen_name': line[5],
'user_level': line[7],
'tweet_id': int(line[8]),
'text': line[9],
'reposts_count': int(line[10]),
'comments_count': int(line[11]),
'attitudes_count': int(line[12]),
'source': line[14]
}
if '<' and '>' in line[9]:
try:
soup = BeautifulSoup(line[9], 'lxml')
dic['raw_text'] = soup.get_text()
except:
pass
else:
dic['raw_text'] = line[9]
locations = re.findall('100101+[^&^?^%^"]*', dic['text'])
if len(locations) > 0:
poiid = locations[0].replace('100101', '')
if len(poiid) > 10:
dic['poiid'] = poiid
if int(line[3]) == 1:
try:
dic1 = {
'crawl_timestamp': int(line[2]),
'user_id': int(line[18]),
'user_screen_name': line[19],
'user_level': line[20],
'tweet_id': int(line[21]),
'text': line[22],
'reposts_count': int(line[23]),
'comments_count': int(line[24]),
'attitudes_count': int(line[25]),
'source': line[27],
'timestamp': int(line[30])}
if '<' and '>' in line[22]:
try:
soup = BeautifulSoup(line[22], 'lxml')
dic1['raw_text'] = soup.get_text()
except:
pass
else:
dic1['raw_text'] = line[22]
locations = re.findall('100101+[^&^?^%^"]*', dic['text'])
if len(locations) > 0:
poiid = locations[0].replace('100101', '')
if len(poiid) > 10:
dic1['poiid'] = poiid
dic['retweet_id'] = dic1['tweet_id']
rows.append(dic1)
except:
pass
rows.append(dic)
actions = [
{
'_op_type': 'index',
'_index': "weibo",
'_type': "tweet",
'_source': d
}
for d in rows
]
elasticsearch.helpers.bulk(es, actions, stats_only=True)
file.close()
if __name__ == '__main__':
file_list = os.listdir("path to your data dir")
os.chdir("path to your data dir")
pool = Pool(6)
pool.map(load_file, file_list)
pool.close()
pool.join()

在这个脚本中,我将转发的tweet单独提取出来,这样有效的减少了字段的数量,也使tweet_id这个字段得到了统一。也就是说,通过tweet_id这个字段,你可以搜索到所有的tweet(尽管有时很多人会转发同一条tweet,这可能造成一个tweet_id搜索到多个结果,这个在后面用图片说明)。
你可以根据你服务器和电脑的性能更改进程池的大小,默认为6也就是6个进程同时导入数据。

导入完成后,先在kopf中看一下:
1.png
一共有1亿9千多万条数据,对于一些科研或者毕设来说应该是足够了。
用Kibana进行搜索也是很方便的
例如搜索含有poiid的tweet:
2.png
一共有800多万条tweet包含poiid。
你也可以搜索某个poiid对应了那些tweet。
我们在Dev
Tools中可以查询一下有多少个不同的poiid,复制下面的内容并点击绿色的开始按钮执行。

1
2
3
4
5
6
7
8
9
10
11
GET /weibo/tweet/_search
{
"size" : 0,
"aggs" : {
"unique_poiid" : {
"cardinality" : {
"field" : "poiid"
}
}
}
}

你会发现返回了一个错误,如下。

1
2
	
Fielddata is disabled on text fields by default. Set fielddata=true on [poiid] in order to load fielddata in memory by uninverting the inverted index. Note that this can however use significant memory. Alternatively use a keyword field instead.

执行下面的内容打开poiid字段的field data。

1
2
3
4
5
6
7
8
9
PUT weibo/_mapping/tweet
{
"properties": {
"poiid": {
"type": "text",
"fielddata": true
}
}
}

然后再执行查询语句,返回如下结果:
3.png
你会发现,不同的poiid只有21万多个。这说明很多tweet对应的poiid是重复的。
我们看一下转发tweet的数量:
4.png
有4千4百万条数据是转发别人的tweet的。
我们随便选取其中一个retweet_id,并进行搜索:
5.png
可以看到这个retweet_id对应了112条数据,每条数据的timestamp都是一样的,也就是转发的这条tweet的发布时间,后面crawl_timestamp对应的时间是不同的。这是因为梁博的爬虫在不同的时间抓取了不同人转发的同一条tweet,应该是正常现象。
我们再选上转发、赞和评论数量的字段,并按抓取时间进行排序:
6.png
可以看出转发、赞和评论的数量随时间的变化大体上是增加的,但是转发的数量不知道为什么略有波动。
简单的进行全文搜索:
7.png
返回了很多条数据。Kibana对中文字符的高亮显示可能存在一些问题,所以在图中会看到一些空格。你可以看到中国的国字也被高亮了,这是因为没装分词插件的缘故,如果有全文搜索的需求建议安装分词插件。

常年泡在群里,发现大家做科研或者毕设最基本的需求就是数据,其次才是如何利用数据。许多科研工作者将大量的时间浪费在获取数据上,梁博为大家提供了这么多的数据,如何有效的利用这些数据是下一个迫在眉睫的问题。于是这篇博文就诞生了,旨在提供一些基本的方法,抛砖引玉,有问题希望大家提出,有不周之处还望大家海涵。

本文版权归作者所有,转载请联系作者。