Lineage Analysis

1. LineageLogger

First, rewrite the org.apache.hadoop.hive.ql.hooks.LineageLogger class. The original class writes the column lineage (dependency) information to the log; here we need it to return the lineage information directly instead.

String lineage = out.toString();
if (testMode) {
    log(lineage);
} else {
    LOG.info(lineage);
}

=>
String lineage = out.toString();
return lineage;
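
For reference, a minimal sketch of the rewritten class. Assumptions: it lives in the cn.edata.Lineage package (as suggested by the import in LineageInfo below) and exposes a getJsonString method whose signature is chosen only to match the call site lineageLogger.getJsonString(command, fieldSchemas, outputs, index) used in step 3.

package cn.edata.Lineage;

import org.apache.commons.io.output.StringBuilderWriter;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.ql.hooks.WriteEntity;
import org.apache.hadoop.hive.ql.optimizer.lineage.LineageCtx;

import java.util.HashSet;
import java.util.List;

public class LineageLogger {
    /**
     * Adapted from org.apache.hadoop.hive.ql.hooks.LineageLogger#run():
     * builds the same lineage JSON ("vertices" and "edges"),
     * but returns the string instead of logging it.
     */
    public String getJsonString(String query, List<FieldSchema> fieldSchemas,
                                HashSet<WriteEntity> outputs, LineageCtx.Index index) {
        StringBuilderWriter out = new StringBuilderWriter(1024);
        // ... same JSON-building logic as the original run(), writing into `out` ...
        String lineage = out.toString();
        return lineage; // return instead of LOG.info(lineage)
    }
}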

2. Adding the hook

Hive exposes several hooks to developers. For column-level lineage analysis, register the lineage logger as a post-execution hook in the configuration: hConf.set("hive.exec.post.hooks", "org.apache.hadoop.hive.ql.hooks.LineageLogger")
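
In code, a minimal sketch of registering the hook (the class name HookConfExample is illustrative only; when using the rewritten class from step 1, the hook class would be cn.edata.Lineage.LineageLogger instead):

import org.apache.hadoop.hive.conf.HiveConf;

public class HookConfExample {
    public static HiveConf buildConf() {
        HiveConf hConf = new HiveConf();
        // Register the lineage logger as a post-execution hook; several hook
        // classes can be listed, separated by commas.
        hConf.set("hive.exec.post.hooks",
                "org.apache.hadoop.hive.ql.hooks.LineageLogger");
        return hConf;
    }
}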

3. LineageInfo

package cn.edata.StageTest;

import cn.edata.Lineage.LineageLogger;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.conf.HiveVariableSource;
import org.apache.hadoop.hive.conf.VariableSubstitution;
import org.apache.hadoop.hive.metastore.MetaStoreUtils;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.metastore.api.Schema;
import org.apache.hadoop.hive.ql.Context;
import org.apache.hadoop.hive.ql.QueryDisplay;
import org.apache.hadoop.hive.ql.QueryPlan;
import org.apache.hadoop.hive.ql.exec.FetchTask;
import org.apache.hadoop.hive.ql.hooks.WriteEntity;
import org.apache.hadoop.hive.ql.lockmgr.LockException;
import org.apache.hadoop.hive.ql.optimizer.lineage.LineageCtx;
import org.apache.hadoop.hive.ql.parse.*;
import org.apache.hadoop.hive.ql.plan.FetchWork;
import org.apache.hadoop.hive.ql.plan.PartitionDesc;
import org.apache.hadoop.hive.ql.plan.TableDesc;
import org.apache.hadoop.hive.ql.processors.SetProcessor;
import org.apache.hadoop.hive.ql.session.SessionState;
import org.apache.hadoop.util.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;

/**
* Re-implements part of Hive's own compilation process to obtain column-level dependency information
*/
public class LineageInfo {
private static final String KEY_ADD = "add ";
private static final String KEY_DROP = "drop ";
private static final String KEY_SET = "set ";
private static final String KEY_FUNCTION = "create temporary function ";
private static final SetProcessor setProcessor = new SetProcessor();
static Logger LOG = LoggerFactory.getLogger("LineageInfo");

private static final LineageLogger lineageLogger = new LineageLogger();

public void getLineageInfo(HiveConf conf, String filePath, boolean testTableDependency) throws LockException, IOException, ParseException, SemanticException {
SessionState ss = SessionState.start(conf);
ss.initTxnMgr(conf);

System.out.println("filePath: " + filePath);
// TODO: 2020/8/3 add support for reading the SQL file at filePath directly
String command2 = "select * from model_ennenergy_ccs.a_md_ccs_common_h limit 2";
List<String> commandList = new LinkedList<>();
commandList.add("use model_ennenergy_ccs");
commandList.add("select * from a_md_ccs_common_h limit 2");

for(String command : commandList){
String lowerSql = command.toLowerCase();
// add / drop statements need no parsing
if (lowerSql.startsWith(KEY_ADD) || lowerSql.startsWith(KEY_DROP)) {
continue;
}
// handle set statements (session parameters)
if (lowerSql.startsWith(KEY_SET)) {
setProcessor.run(command.substring(KEY_SET.length()));
continue;
}

command = new VariableSubstitution(new HiveVariableSource() {
@Override
public Map<String, String> getHiveVariable() {
return ss.getHiveVariables();
}
}).substitute(conf, command);
Context ctx = new Context(conf);
ctx.setCmd(command);
System.out.println("ctx: " + ctx);
ASTNode tree;
try {
ParseDriver pd = new ParseDriver();
tree = pd.parse(command, ctx);
tree = ParseUtils.findRootNonNullToken(tree);
} catch (ParseException e) {
throw e;
}
System.out.println("tree: " + tree);

// switch the current database
if (tree.getToken().getType() == HiveParser.TOK_SWITCHDATABASE) {
ss.setCurrentDatabase(tree.getChild(0).getText());
continue;
}

ss.setupQueryCurrentTimestamp();
System.out.println("ss: " + ss);
// For CDH 6.2.0:
// QueryState queryState = new QueryState(conf);
// System.out.println("queryState: " + queryState);
// BaseSemanticAnalyzer sem = SemanticAnalyzerFactory.get(queryState, tree);
BaseSemanticAnalyzer sem = SemanticAnalyzerFactory.get(conf, tree);
sem.analyze(tree, ctx);
sem.validate();
System.out.println("sem: " + sem);

Schema schema = getSchema(sem, conf);
// System.out.println("schema: " + schema);

// query plan (not required for lineage extraction)
// QueryDisplay queryDisplay = new QueryDisplay();
// queryDisplay.setQueryStr(command);
// queryDisplay.setQueryId(QueryPlan.makeQueryId());
// QueryPlan queryPlan = new QueryPlan(command, sem, 0L, QueryPlan.makeQueryId(), SessionState.get().getHiveOperation(), schema, queryDisplay);
// System.out.println("queryPlan: " + queryPlan);

List<FieldSchema> fieldSchemas = schema.getFieldSchemas();
// System.out.println("fieldSchemas: " + fieldSchemas);

// Some statements cannot be fully analyzed into a schema, e.g.:
// ALTER TABLE model_icome_cheme.cheme_icome_kpi_month_h SET
// TBLPROPERTIES('comment' = '化工月指标')
// Statements starting with ALTER can simply be filtered out without parsing
if (fieldSchemas == null) {
continue;
}

HashSet<WriteEntity> outputs = sem.getOutputs();
System.out.println("outputs: " + outputs);

// column-level lineage index collected during semantic analysis
LineageCtx.Index index = ss.getLineageState().getIndex();
// For CDH 6.2.0:
// LineageCtx.Index index = queryState.getLineageState().getIndex();
System.out.println("index: " + index);

String result = lineageLogger.getJsonString(command, fieldSchemas, outputs, index);
System.out.println("result: " + result);

if(testTableDependency){
DependencyInfo dependencyInfo = new DependencyInfo();
dependencyInfo.getDependencyInfo(result);
}
}

}

// Copied from Hive's Driver class
private Schema getSchema(BaseSemanticAnalyzer sem, HiveConf conf) {
Schema schema = null;
if (sem != null) {
if (sem.getResultSchema() != null) {
List<FieldSchema> lst = sem.getResultSchema();
schema = new Schema(lst, (Map) null);
} else if (sem.getFetchTask() != null) {
FetchTask ft = sem.getFetchTask();
TableDesc td = ft.getTblDesc();
if (td == null && ft.getWork() != null && ((FetchWork) ft.getWork()).getPartDesc() != null && ((FetchWork) ft.getWork()).getPartDesc().size() > 0) {
td = ((PartitionDesc) ((FetchWork) ft.getWork()).getPartDesc().get(0)).getTableDesc();
}

if (td != null) {
String tableName = "result";
List lst = null;

try {
lst = MetaStoreUtils.getFieldsFromDeserializer(tableName, td.getDeserializer(conf));
} catch (Exception e) {
System.out.println("Error getting schema: " + StringUtils.stringifyException(e));
}

if (lst != null) {
schema = new Schema(lst, (Map) null);
}
}
}
}
if (schema == null) {
schema = new Schema();
}
// System.out.println("Returning Hive schema: " + schema);
return schema;
}
}
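
A usage sketch for the class above. Assumptions: the hive-site.xml of the target cluster is on the classpath, and the file path argument is a placeholder, since the current implementation still iterates over the hard-coded statement list.

package cn.edata.StageTest;

import org.apache.hadoop.hive.conf.HiveConf;

public class LineageInfoTest {
    public static void main(String[] args) throws Exception {
        HiveConf conf = new HiveConf();
        LineageInfo lineageInfo = new LineageInfo();
        // The last argument also runs the table-level dependency analysis from step 4.
        lineageInfo.getLineageInfo(conf, "/tmp/lineage_test.sql", true);
    }
}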

4. TableDependency

Beyond column-level lineage, we can go one step further and extract the dependencies between tables.

package cn.edata.StageTest;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;

import java.util.HashSet;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

/**
* Takes the returned column lineage information, parses it, and derives the dependencies between tables
*/
public class DependencyInfo {
public void getDependencyInfo(String lineageInfo){
JSONObject result = JSONObject.parseObject(lineageInfo);
System.out.println("#########");

JSONArray verticesArray = JSONArray.parseArray(result.getString("vertices"));

HashSet<String> modelTables = new HashSet<>();
HashSet<String> originTables = new HashSet<>();

verticesArray.forEach(data->{
JSONObject tmp = JSONObject.parseObject(data.toString());
// vertexId has the form "database.table.column"
String vertexId = tmp.getString("vertexId");
if(vertexId.startsWith("model"))
modelTables.add(vertexId.split("\\.")[0]+"."+vertexId.split("\\.")[1]);
if(vertexId.startsWith("origin"))
originTables.add(vertexId);
});

System.out.println("####");
System.out.println("modelTables: " + modelTables.toString());
System.out.println("originTables: " + originTables.toString());


}

}
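
A usage sketch with a hand-written lineage string in the shape of LineageLogger's JSON output (only the "vertices" array matters to DependencyInfo; the table and column names below are made up for illustration):

package cn.edata.StageTest;

public class DependencyInfoTest {
    public static void main(String[] args) {
        String lineageInfo = "{\"vertices\":["
                + "{\"id\":0,\"vertexType\":\"COLUMN\",\"vertexId\":\"model_ennenergy_ccs.a_md_ccs_common_h.id\"},"
                + "{\"id\":1,\"vertexType\":\"COLUMN\",\"vertexId\":\"origin_ennenergy_ccs.src_table.id\"}"
                + "]}";
        new DependencyInfo().getDependencyInfo(lineageInfo);
        // With the code above, this prints:
        //   modelTables: [model_ennenergy_ccs.a_md_ccs_common_h]
        //   originTables: [origin_ennenergy_ccs.src_table.id]
    }
}

Note that, as written, originTables keeps the full vertexId (database.table.column), while modelTables is trimmed to database.table.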

5. pom.xml

<properties>
<!-- Versions for CDH 6.2.0 (alternative): -->
<!-- <cdh.hadoop.version>3.0.0-cdh6.2.0</cdh.hadoop.version>-->
<!-- <cdh.hive.version>2.1.1-cdh6.2.0</cdh.hive.version>-->

<cdh.hadoop.version>2.6.0-cdh5.14.4</cdh.hadoop.version>
<cdh.hive.version>1.1.0-cdh5.14.4</cdh.hive.version>
</properties>

<dependencies>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
<version>3.4</version>
</dependency>

<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.4</version>
</dependency>

<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-mapreduce-client-core</artifactId>
<version>${cdh.hadoop.version}</version>
</dependency>

<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-mapreduce-client-common</artifactId>
<version>${cdh.hadoop.version}</version>
</dependency>

<dependency>
<groupId>org.apache.hive</groupId>
<artifactId>hive-exec</artifactId>
<version>${cdh.hive.version}</version>
<exclusions>
<exclusion>
<artifactId>
hadoop-yarn-server-resourcemanager
</artifactId>
<groupId>org.apache.hadoop</groupId>
</exclusion>
<!-- <exclusion>-->
<!-- <artifactId>gson</artifactId>-->
<!-- <groupId>com.google.code.gson</groupId>-->
<!-- </exclusion>-->
<exclusion>
<artifactId>hive-shims</artifactId>
<groupId>org.apache.hive</groupId>
</exclusion>
</exclusions>
</dependency>

<dependency>
<groupId>org.apache.hive</groupId>
<artifactId>hive-jdbc</artifactId>
<version>${cdh.hive.version}</version>
<exclusions>
<exclusion>
<artifactId>hadoop-common</artifactId>
<groupId>org.apache.hadoop</groupId>
</exclusion>
<exclusion>
<artifactId>hbase-server</artifactId>
<groupId>org.apache.hbase</groupId>
</exclusion>
<exclusion>
<artifactId>jasper-compiler</artifactId>
<groupId>tomcat</groupId>
</exclusion>
<exclusion>
<artifactId>jasper-runtime</artifactId>
<groupId>tomcat</groupId>
</exclusion>
<exclusion>
<artifactId>hadoop-hdfs</artifactId>
<groupId>org.apache.hadoop</groupId>
</exclusion>
<exclusion>
<artifactId>jetty-rewrite</artifactId>
<groupId>org.eclipse.jetty</groupId>
</exclusion>
<exclusion>
<artifactId>jetty-server</artifactId>
<groupId>org.eclipse.jetty</groupId>
</exclusion>
<exclusion>
<artifactId>jetty-runner</artifactId>
<groupId>org.eclipse.jetty</groupId>
</exclusion>
<exclusion>
<artifactId>jetty-servlet</artifactId>
<groupId>org.eclipse.jetty</groupId>
</exclusion>
<exclusion>
<artifactId>jetty-webapp</artifactId>
<groupId>org.eclipse.jetty</groupId>
</exclusion>
<exclusion>
<artifactId>hadoop-yarn-registry</artifactId>
<groupId>org.apache.hadoop</groupId>
</exclusion>
<exclusion>
<artifactId>jetty</artifactId>
<groupId>org.mortbay.jetty</groupId>
</exclusion>
<exclusion>
<artifactId>hbase-hadoop2-compat</artifactId>
<groupId>org.apache.hbase</groupId>
</exclusion>
<exclusion>
<artifactId>gson</artifactId>
<groupId>com.google.code.gson</groupId>
</exclusion>
<exclusion>
<artifactId>hive-shims-0.23</artifactId>
<groupId>org.apache.hive.shims</groupId>
</exclusion>
<exclusion>
<artifactId>hive-common</artifactId>
<groupId>org.apache.hive</groupId>
</exclusion>
</exclusions>
</dependency>

<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId>
<version>${cdh.hadoop.version}</version>
</dependency>

</dependencies>