在Hadoop Hive环境下生成数值型唯一ID

434 views

如题,在Hadoop Hive环境下,不借助外部组件,依赖Hadoop平台自身的能力,得到数值型的唯一ID,可以找到如下文章。

上述文章中的代码存在一处明显的bug。
对于学习Hadoop平台的原理,Hive上UDF的开发,应该没大问题,但假如要应用到生产环境,则需要先修复Bug。
问题代码如下:

@Override 
public void configure(MapredContext context) { 
    increment = context.getJobConf().getNumMapTasks(); // 问题在这里,提取增量时,只获取job的Map Task的数量。
    if(increment == 0) { 
        throw new IllegalArgumentException("mapred.map.tasks is zero"); 
    } 
     
    initID = getInitId(context.getJobConf().get("mapred.task.id"),increment); 
    if(initID == 0l) { 
        throw new IllegalArgumentException("mapred.task.id"); 
    } 
     
    System.out.println("initID : " + initID + "  increment : " + increment); 
} 


//attempt_1478926768563_0537_m_000004_0 // return 0+1 
private long getInitId (String taskAttemptIDstr,int numTasks) 
        throws IllegalArgumentException { 
    try { 
        String[] parts = taskAttemptIDstr.split(Character.toString(SEPARATOR)); 
        if(parts.length == 6) { 
            if(parts[0].equals(ATTEMPT)) { 
                if(!parts[3].equals("m") && !parts[3].equals("r")) { 
                    throw new Exception(); 
                } 
                long result = Long.parseLong(parts[4]); 
                if(result >= numTasks) { //if taskid >= numtasks   // 当Reduce Task的数量超出Map Task的数量时,将进入本分支,导致代码抛出异常。
                    throw new Exception("TaskAttemptId string : " + taskAttemptIDstr 
                            + "  parse ID [" + result + "] >= numTasks[" + numTasks + "] .."); 
                } 
                return result + 1; 
            } 
        } 
    } catch (Exception e) {} 
    throw new IllegalArgumentException("TaskAttemptId string : " + taskAttemptIDstr 
            + " is not properly formed"); 
} 

修改方案很简单,获取初始ID时,依据任务类型,分别获取对应的任务数量。

@Override 
public void configure(MapredContext context) { 
     
    // 获取初始ID
    initID = getInitId(context); 
    if(initID == 0l) { 
        throw new IllegalArgumentException("mapred.task.id"); 
    } 
     
    System.out.println("initID : " + initID + "  increment : " + increment); 
} 

//attempt_1478926768563_0537_m_000004_0 // return 0+1 
private long getInitId (MapredContext context) 
        throws IllegalArgumentException { 
    String taskAttemptIDstr = context.getJobConf().get("mapred.task.id");
    increment = 0;
    try { 
        String[] parts = taskAttemptIDstr.split(Character.toString(SEPARATOR)); 
        if(parts.length == 6) { 
            if(ATTEMPT.equals(parts[0])) { 
                String taskType = parts[3];
                // 依据任务类型,使用对应的方法提取Task的数量
                if ("m".equals(taskType)) {
                    increment = context.getJobConf().getNumMapTasks(); // 获取job的Map Task的数量。
                }
                else if ("r".equals(taskType)) {
                    increment = context.getJobConf().getNumReduceTasks(); // 获取job的Reduce Task的数量。
                }
                else {
                    throw new Exception(); 
                }
                long result = Long.parseLong(parts[4]); 
                if(result >= numTasks) { //if taskid >= numtasks   
                    throw new Exception("TaskAttemptId string : " + taskAttemptIDstr 
                            + "  parse ID [" + result + "] >= numTasks[" + numTasks + "] .."); 
                } 
                return result + 1; 
            } 
        } 
    } catch (Exception e) {
        // 为便于定位问题,这里要把异常抛出去
        throw new IllegalArgumentException("TaskAttemptId string : " + taskAttemptIDstr 
                + " is not properly formed", e); 
    } 
    throw new IllegalArgumentException("TaskAttemptId string : " + taskAttemptIDstr 
            + " is not properly formed"); 
} 


若非注明,均为原创,欢迎转载,转载请注明来源:在Hadoop Hive环境下生成数值型唯一ID

关于 JackieAtHome

基层程序员,八年之后重新启航

此条目发表在 Hive, 大数据 分类目录,贴了 , 标签。将固定链接加入收藏夹。

发表评论

电子邮件地址不会被公开。 必填项已用*标注

Protected with IP Blacklist CloudIP Blacklist Cloud