package cn.edata.StageTest;
import cn.edata.Lineage.LineageLogger;

import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.conf.HiveVariableSource;
import org.apache.hadoop.hive.conf.VariableSubstitution;
import org.apache.hadoop.hive.metastore.MetaStoreUtils;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.metastore.api.Schema;
import org.apache.hadoop.hive.ql.Context;
import org.apache.hadoop.hive.ql.QueryDisplay;
import org.apache.hadoop.hive.ql.QueryPlan;
import org.apache.hadoop.hive.ql.exec.FetchTask;
import org.apache.hadoop.hive.ql.hooks.WriteEntity;
import org.apache.hadoop.hive.ql.lockmgr.LockException;
import org.apache.hadoop.hive.ql.optimizer.lineage.LineageCtx;
import org.apache.hadoop.hive.ql.parse.*;
import org.apache.hadoop.hive.ql.plan.FetchWork;
import org.apache.hadoop.hive.ql.plan.PartitionDesc;
import org.apache.hadoop.hive.ql.plan.TableDesc;
import org.apache.hadoop.hive.ql.processors.SetProcessor;
import org.apache.hadoop.hive.ql.session.SessionState;
import org.apache.hadoop.util.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.sql.CommonDataSource;
import java.io.IOException;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
/**
 * Reuses Hive's own compilation process to extract column-level dependency
 * (lineage) information for each statement.
 */
public class LineageInfo {

    private static final String KEY_ADD = "add ";
    private static final String KEY_DROP = "drop ";
    private static final String KEY_SET = "set ";
    private static final String KEY_FUNCTION = "create temporary function ";

    private static final SetProcessor setProcessor = new SetProcessor();

    static Logger LOG = LoggerFactory.getLogger("LineageInfo");
    private static final LineageLogger lineageLogger = new LineageLogger();
    /**
     * Compiles each SQL statement with Hive's semantic analyzer and prints the
     * resulting column lineage as JSON.
     */
    public void getLineageInfo(HiveConf conf, String filePath, boolean testTableDependency)
            throws LockException, IOException, ParseException, SemanticException {
        SessionState ss = SessionState.start(conf);
        ss.initTxnMgr(conf);
System.out.println("filePath: " + filePath); // TODO: 2020/8/3 后续添加对直接读取文件的支持 String command2 = "select * from model_ennenergy_ccs.a_md_ccs_common_h limit 2"; List<String> commandList = new LinkedList<>(); commandList.add("use model_ennenergy_ccs"); commandList.add("select * from a_md_ccs_common_h limit 2"); for(String command : commandList){ String lowerSql = command.toLowerCase(); // add / drop 无需解析 if (lowerSql.startsWith(KEY_ADD) || lowerSql.startsWith(KEY_DROP)) { continue; } // 设置参数 if (lowerSql.startsWith(KEY_SET)) { setProcessor.run(command.substring(KEY_SET.length())); continue; } command = new VariableSubstitution(new HiveVariableSource() { @Override public Map<String, String> getHiveVariable() { return ss.getHiveVariables(); } }).substitute(conf, command); Context ctx = new Context(conf); ctx.setCmd(command); System.out.println("ctx: " + ctx); ASTNode tree; try { ParseDriver pd = new ParseDriver(); tree = pd.parse(command, ctx); tree = ParseUtils.findRootNonNullToken(tree); } catch (ParseException e) { throw e; } System.out.println("tree: " + tree);
            // switch the current database
            if (tree.getToken().getType() == HiveParser.TOK_SWITCHDATABASE) {
                ss.setCurrentDatabase(tree.getChild(0).getText());
                continue;
            }
            ss.setupQueryCurrentTimestamp();
            System.out.println("ss: " + ss);

            // version 6.2.0:
            // QueryState queryState = new QueryState(conf);
            // System.out.println("queryState: " + queryState);
            // BaseSemanticAnalyzer sem = SemanticAnalyzerFactory.get(queryState, tree);
            BaseSemanticAnalyzer sem = SemanticAnalyzerFactory.get(conf, tree);
            sem.analyze(tree, ctx);
            sem.validate();
            System.out.println("sem: " + sem);
            Schema schema = getSchema(sem, conf);
            // System.out.println("schema: " + schema);

            // query plan
            // QueryDisplay queryDisplay = new QueryDisplay();
            // queryDisplay.setQueryStr(command);
            // queryDisplay.setQueryId(QueryPlan.makeQueryId());
            // QueryPlan queryPlan = new QueryPlan(command, sem, 0L, QueryPlan.makeQueryId(),
            //         SessionState.get().getHiveOperation(), schema, queryDisplay);
            // System.out.println("queryPlan: " + queryPlan);

            List<FieldSchema> fieldSchemas = schema.getFieldSchemas();
            // System.out.println("fieldSchemas: " + fieldSchemas);
            // Some statements cannot be analyzed into a complete schema, e.g.:
            //   ALTER TABLE model_icome_cheme.cheme_icome_kpi_month_h SET
            //   TBLPROPERTIES('comment' = '化工月指标')
            // Statements starting with "alter" could be filtered out up front, with no parsing needed.
            if (fieldSchemas == null) {
                continue;
            }

            HashSet<WriteEntity> outputs = sem.getOutputs();
            System.out.println("outputs: " + outputs);
            // column lineage collected by the compiler
            LineageCtx.Index index = ss.getLineageState().getIndex();
            // version 6.2.0:
            // LineageCtx.Index index = queryState.getLineageState().getIndex();
            System.out.println("index: " + index);
            String result = lineageLogger.getJsonString(command, fieldSchemas, outputs, index);
            System.out.println("result: " + result);
            if (testTableDependency) {
                DependencyInfo dependencyInfo = new DependencyInfo();
                dependencyInfo.getDependencyInfo(result);
            }
        }
    }
    // Adapted from Hive's Driver class
    private Schema getSchema(BaseSemanticAnalyzer sem, HiveConf conf) {
        Schema schema = null;
        if (sem != null) {
            if (sem.getResultSchema() != null) {
                List<FieldSchema> lst = sem.getResultSchema();
                schema = new Schema(lst, (Map) null);
            } else if (sem.getFetchTask() != null) {
                FetchTask ft = sem.getFetchTask();
                TableDesc td = ft.getTblDesc();
                if (td == null && ft.getWork() != null
                        && ((FetchWork) ft.getWork()).getPartDesc() != null
                        && ((FetchWork) ft.getWork()).getPartDesc().size() > 0) {
                    td = ((PartitionDesc) ((FetchWork) ft.getWork()).getPartDesc().get(0)).getTableDesc();
                }
                if (td != null) {
                    String tableName = "result";
                    List lst = null;
                    try {
                        lst = MetaStoreUtils.getFieldsFromDeserializer(tableName, td.getDeserializer(conf));
                    } catch (Exception e) {
                        System.out.println("Error getting schema: " + StringUtils.stringifyException(e));
                    }
                    if (lst != null) {
                        schema = new Schema(lst, (Map) null);
                    }
                }
            }
        }
        if (schema == null) {
            schema = new Schema();
        }
        // System.out.println("Returning Hive schema: " + schema);
        return schema;
    }
}
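For reference, a minimal entry point for this class might look like the sketch below. It is not part of the original code: the class name LineageRunner, the metastore URI, and the SQL file name are illustrative assumptions. It presumes that hive-site.xml (or an explicitly configured metastore URI) and a hive-exec dependency matching the cluster version are on the classpath.

package cn.edata.StageTest;

import org.apache.hadoop.hive.conf.HiveConf;

// Hypothetical driver, shown only to illustrate how LineageInfo is meant to be invoked.
public class LineageRunner {

    public static void main(String[] args) throws Exception {
        // Assumption: hive-site.xml is on the classpath; otherwise point the
        // session at the metastore explicitly, e.g.
        // conf.set("hive.metastore.uris", "thrift://metastore-host:9083");
        HiveConf conf = new HiveConf();

        // getLineageInfo only prints filePath for now; per the TODO above,
        // the SQL statements are still hard-coded in commandList.
        String filePath = args.length > 0 ? args[0] : "lineage-test.sql";

        // true -> also run DependencyInfo on the resulting lineage JSON
        new LineageInfo().getLineageInfo(conf, filePath, true);
    }
}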