Hadoop

Download mirror: http://apache.fayea.com/hadoop/common/
Hadoop 3 update notes: https://blog.csdn.net/yongge1981/article/details/80504935
Install directory: /usr/local/hadoop-3.1.1
Docs: http://hadoop.apache.org/docs/r3.1.1/

```
netstat -nltp | grep 19888    //0.0.0.0:19888 (JobHistory web UI)
jps
```

![jps][2]

Hadoop is a system architecture for distributed processing of massive amounts of data. The core of the Hadoop framework is HDFS and MapReduce (the map function and the reduce function). Its behaviour is driven by the configuration files edited during installation, e.g. core-site.xml, hdfs-site.xml and mapred-site.xml.

![hdfs][3]
![map-reduce][4]

In MapReduce, the JobTracker receives the jobs submitted by users and dispatches tasks to the nodes, where the TaskTracker on each node is responsible for actually executing them.

1. Client: writes the MapReduce program, configures the job and submits it; this is the programmer's part of the work.
2. JobTracker: initializes the job, assigns it, communicates with the TaskTrackers and coordinates the execution of the whole job.
3. TaskTracker: keeps in contact with the JobTracker and runs Map or Reduce tasks on the data splits assigned to it. One important difference between the two is that there can be many TaskTrackers at a time but only one JobTracker (a single JobTracker is a single point of failure, just like the NameNode in HDFS; this comes up again in the MapReduce-related issues later).
4. HDFS: stores the job's data, configuration information and so on; the final results are also stored on HDFS.

![][5]
![][6]

// To count how many cards of each of the four suits there are in a pile of unknown size, just split the work: for example, two people each take a pile and separate out the hearts, spades, clubs and diamonds; then four people each count one suit; finally everyone reports their count to one person who totals them. This is the classic map-reduce model.

// The shuffle stage can be divided into the Map-side shuffle and the Reduce-side shuffle.
https://blog.csdn.net/afandaafandaafanda/article/details/52828870

![shuffle][7]

The Map-side shuffle processes the input data and produces intermediate results, which are written to local disk rather than to HDFS. Each Map's output is first written to an in-memory buffer; when the buffered data reaches a configured threshold, a background thread writes the buffer out to disk. This process is called a spill.

The Reduce-side shuffle consists of three phases: copy, sort (merge) and reduce. The Reduce phase produces the final output and writes it to HDFS.

![][8]

Passwordless login to the local machine (sshd_config):

```
RSAAuthentication yes
PubkeyAuthentication yes
PermitRootLogin no    //forbid root login
```

1.3 Generate a key pair: run `ssh-keygen -t rsa` and press Enter through every prompt.
1.4 Copy it into the authorized keys: `cp /root/.ssh/id_rsa.pub /root/.ssh/authorized_keys`
1.5 Log in again; no passphrase is needed:

```
[root@master ~]# ssh localhost
```

https://www.2cto.com/net/201706/651203.html

maven + host + test

```
//run
hadoop jar share/hadoop/mapreduce/hadoop-mapreduce-examples-3.1.1.jar grep input output 'dfs[a-z.]+'

hdfs namenode -format    //format the file system
```

Problem 1: "but there is no HDFS_NAMENODE_USER defined. Aborting launch"
https://blog.csdn.net/mxfeng/article/details/72770432?locationNum=5&fps=1
(The usual fix is to define HDFS_NAMENODE_USER, HDFS_DATANODE_USER and HDFS_SECONDARYNAMENODE_USER, for example in etc/hadoop/hadoop-env.sh, before starting the daemons as root.)

vi etc/hadoop/core-site.xml

```xml
<property>
  <name>fs.defaultFS</name>
  <value>hdfs://172.16.10.150:9000</value>
</property>
```

vi etc/hadoop/hdfs-site.xml

```xml
<property>
  <name>dfs.replication</name>
  <value>1</value>
</property>
```

```
hadoop fs -ls /user/test/input/    //check that the files were copied into HDFS
```

Turn off the firewall:

```
systemctl disable firewalld
systemctl stop firewalld
```

Edit /etc/selinux/config and change SELINUX=enforcing to SELINUX=disabled.

http://172.16.10.150:9870    //NameNode
http://172.16.10.150:9864    //DataNode

```
#run
hadoop jar share/hadoop/mapreduce/hadoop-mapreduce-examples-3.1.1.jar grep /user/test/input output 'dfs[a-z.]+'
hdfs dfs -cat output/*

sbin/start-yarn.sh
```

http://172.16.10.150:8088/cluster    //ResourceManager + NodeManager
https://blog.csdn.net/xiaoduan_/article/details/79689882

Configure and start the JobHistory server.

yarn-site.xml:

```xml
<property>
  <name>yarn.log-aggregation-enable</name>
  <value>true</value>
</property>
```

mapred-site.xml:

```xml
<property>
  <name>mapreduce.jobhistory.address</name>
  <value>172.16.10.150:10020</value>
</property>
<property>
  <name>mapreduce.jobhistory.webapp.address</name>
  <value>172.16.10.150:19888</value>
</property>
<property>
  <name>mapreduce.jobhistory.intermediate-done-dir</name>
  <value>/history/done_intermediate</value>
</property>
<property>
  <name>mapreduce.jobhistory.done-dir</name>
  <value>/history/done</value>
</property>
```

Start it:

```
sbin/mr-jobhistory-daemon.sh start historyserver    #jobhistory
```

=============================================

https://www.cnblogs.com/zh-yp/p/7884084.html

Calling Hadoop remotely from IDEA:

1. Get hadoop.dll and winutils.exe from https://github.com/steveloughran/winutils
2. Configure the HADOOP_HOME environment variable and the classpath (put winutils.exe into a newly created folder hadoop3.1.1/bin); a code-only alternative is sketched after this list.
3. Run the code (POM and sources below).
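If setting HADOOP_HOME as a Windows environment variable is inconvenient, the client JVM can be pointed at the winutils folder from code instead, since the Hadoop client checks the hadoop.home.dir system property before falling back to the HADOOP_HOME environment variable. A minimal sketch, assuming the winutils bundle from step 1 was unpacked to D:/Developer/hadoop-3.1.1 (the path is an illustration, not part of the original setup):

```java
public class WinutilsSetup {
    public static void main(String[] args) {
        // Must run before the first FileSystem/Job call in the same JVM.
        // The directory is assumed to contain bin/winutils.exe and bin/hadoop.dll.
        System.setProperty("hadoop.home.dir", "D:/Developer/hadoop-3.1.1");
        System.out.println("hadoop.home.dir = " + System.getProperty("hadoop.home.dir"));
    }
}
```

In the demos below, the equivalent line would simply be placed at the top of main() or init().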
POM.XML:

```xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.jason</groupId>
    <artifactId>testWord</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <hadoop.version>3.1.1</hadoop.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.10</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-hdfs</artifactId>
            <version>${hadoop.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>${hadoop.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>${hadoop.version}</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>1.7.2</version>
        </dependency>
    </dependencies>
</project>
```

HDFS - Java API operations:

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.*;
import org.junit.Before;
import org.junit.Test;

import java.net.URI;
import java.util.Iterator;
import java.util.Map.Entry;

/**
 * A client operating on HDFS does so with a user identity.
 * By default the HDFS client API takes the identity from a JVM parameter: -DHADOOP_USER_NAME=hadoop
 * It can also be passed in as an argument when constructing the FileSystem client object.
 */
public class HdfsClientDemo {

    FileSystem fs = null;
    Configuration conf = null;
    static String ip = "172.16.10.150";

    @Before
    public void init() throws Exception {
        conf = new Configuration();
        //conf.set("fs.defaultFS", "hdfs://" + ip + ":9000");
        //conf.set("hadoop.home.dir", "D:/Developer/hadoop-2.8.4");
        //get a client instance for file system operations
        /*fs = FileSystem.get(conf);*/
        //the URI and the user identity can be passed in directly; the last argument is the user name
        fs = FileSystem.get(new URI("hdfs://" + ip + ":9000"), conf, "root");
    }

    @Test
    public void testUpload() throws Exception {
        Thread.sleep(2000);
        fs.copyFromLocalFile(new Path("F:\\工作目录\\users.txt"), new Path("/input/users.txt"));
        fs.close();
    }

    @Test
    public void listTest() throws Exception {
        FileStatus[] listStatus = fs.listStatus(new Path("/"));
        for (FileStatus fileStatus : listStatus) {
            System.err.println(fileStatus.getPath() + "=================" + fileStatus.toString());
        }
        //recursively finds all files under the directory
        RemoteIterator<LocatedFileStatus> listFiles = fs.listFiles(new Path("/output/"), true);
        while (listFiles.hasNext()) {
            LocatedFileStatus next = listFiles.next();
            String name = next.getPath().getName();
            Path path = next.getPath();
            System.out.println(name + "---" + path.toString());
        }
    }

    @Test
    public void testDownload() throws Exception {
        fs.copyToLocalFile(new Path("/user/test/input/users.txt"), new Path("d:/users.txt"));
        fs.close();
    }

    @Test
    public void testConf() {
        Iterator<Entry<String, String>> iterator = conf.iterator();
        while (iterator.hasNext()) {
            Entry<String, String> entry = iterator.next();
            System.out.println(entry.getKey() + "--" + entry.getValue()); //everything loaded into conf
        }
    }

    /**
     * Create a directory.
     */
    @Test
    public void makdirTest() throws Exception {
        boolean mkdirs = fs.mkdirs(new Path("/aaa/bbb"));
        System.out.println(mkdirs);
    }

    /**
     * Delete a directory.
     */
    public static void deleteDir(FileSystem fs, String dir) throws Exception {
        boolean delete = fs.delete(new Path("/" + dir), true); //true: delete recursively
        System.out.println(delete);
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", "hdfs://" + ip + ":9000");
        //get a client instance for file system operations
        FileSystem fs = FileSystem.get(conf);
        fs.copyFromLocalFile(new Path("D:/unLockPass.txt"), new Path("/input/unLockPass.txt"));
        fs.close();
    }
}
```
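The demo above covers upload, download, listing, mkdir and delete. Reading a file back goes through fs.open(), which returns a stream that can be wrapped like any InputStream. A minimal sketch, assuming the same cluster address and the /input/users.txt file uploaded by testUpload():

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.net.URI;
import java.nio.charset.StandardCharsets;

public class HdfsReadDemo {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // same URI/user convention as HdfsClientDemo above
        FileSystem fs = FileSystem.get(new URI("hdfs://172.16.10.150:9000"), conf, "root");
        // fs.open() returns an FSDataInputStream; read it line by line as UTF-8 text
        try (FSDataInputStream in = fs.open(new Path("/input/users.txt"));
             BufferedReader reader = new BufferedReader(new InputStreamReader(in, StandardCharsets.UTF_8))) {
            String line;
            while ((line = reader.readLine()) != null) {
                System.out.println(line);
            }
        }
        fs.close();
    }
}
```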
===================================WordCount================================================

![wordcount-cmd][9]

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.*;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.URI;
import java.util.StringTokenizer;

public class WordCount {

    static String ip = "172.16.10.150";

    //emits (word, 1) for every token of every input line
    public static class WordCountMap extends Mapper<LongWritable, Text, Text, IntWritable> {

        private final IntWritable one = new IntWritable(1);
        private Text word = new Text();

        public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String line = value.toString();
            StringTokenizer token = new StringTokenizer(line);
            while (token.hasMoreTokens()) {
                word.set(token.nextToken());
                context.write(word, one);
            }
        }
    }

    //sums the counts of each word
    public static class WordCountReduce extends Reducer<Text, IntWritable, Text, IntWritable> {

        public void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable val : values) {
                sum += val.get();
            }
            context.write(key, new IntWritable(sum));
        }
    }

    //prints the contents of every file under /<dir>/ on HDFS
    public static void printDSFile(FileSystem fs, String dir) {
        BufferedReader in = null;
        String line;
        try {
            System.out.println("uri:" + fs.getUri());
            RemoteIterator<LocatedFileStatus> listFiles = fs.listFiles(new Path("/" + dir + "/"), true);
            while (listFiles.hasNext()) {
                LocatedFileStatus next = listFiles.next();
                String name = next.getPath().getName();
                Path path = next.getPath();
                FSDataInputStream result = fs.open(path);
                in = new BufferedReader(new InputStreamReader(result, "UTF-8"));
                String out = "";
                while ((line = in.readLine()) != null) {
                    out += line + "\r\n";
                }
                System.out.println(name + ":\r\n" + out);
            }
        } catch (IOException ex) {
            ex.printStackTrace();
        } finally {
            if (in != null) {
                try {
                    in.close();
                } catch (IOException ex) {
                    ex.printStackTrace();
                }
            }
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", "hdfs://" + ip + ":9000");
        FileSystem fs = FileSystem.get(new URI("hdfs://" + ip + ":9000"), conf, "root");
        //the output directory must not exist when the job starts
        HdfsClientDemo.deleteDir(fs, "output");

        Job job = Job.getInstance(conf);
        job.setJarByClass(WordCount.class);
        job.setJobName("wordcount");

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        job.setMapperClass(WordCountMap.class);
        job.setReducerClass(WordCountReduce.class);

        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);

        FileInputFormat.addInputPath(job, new Path(fs.getUri() + "/input/"));
        FileOutputFormat.setOutputPath(job, new Path(fs.getUri() + "/output/"));

        job.waitForCompletion(true);

        printDSFile(fs, "output");
        /*
          fs.getHomeDirectory() - xxxx/user/root
          hadoop fs -rmdir /output
          hadoop fs -cat /output/*
         */
    }
}
```

![idea][10]

  [2]: http://blog.jasonsky.com.cn/upload/2018/12/pl7u33hk10i94qkngm0emd2imd.png
  [3]: http://blog.jasonsky.com.cn/upload/2018/12/r7gilath00g6hqt03lregtvc1t.png
  [4]: http://blog.jasonsky.com.cn/upload/2018/12/27mro74r0shlepvpmlpo1fjchi.png
  [5]: http://blog.jasonsky.com.cn/upload/2018/12/7krs3oiarmjhloumjfuv07if8l.png
  [6]: http://blog.jasonsky.com.cn/upload/2018/12/r3l4nfjl0mhljpke50vsbpd1b1.png
  [7]: http://blog.jasonsky.com.cn/upload/2018/12/rttrisvcmij0tqb3g1mvcjvlb2.png
  [8]: http://blog.jasonsky.com.cn/upload/2018/12/oqofvt7s38jgrq0dagp55f04ob.png
  [9]: http://blog.jasonsky.com.cn/upload/2018/12/3fsj9k57t8gstqgu988smckinu.png
  [10]: http://blog.jasonsky.com.cn/upload/2018/12/vf0kirt1gsg41rdmab26ua2fom.png
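Since the reducer here only adds up partial counts, it can also be registered as a combiner, so the map side merges counts locally before the spill and copy phases of the shuffle described earlier and less data travels to the reducers. A short sketch of an alternative driver reusing the classes above; the job name and the /output_combiner directory are illustrative, not part of the original post:

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WordCountWithCombiner {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", "hdfs://172.16.10.150:9000");

        Job job = Job.getInstance(conf, "wordcount-with-combiner");
        job.setJarByClass(WordCountWithCombiner.class);

        job.setMapperClass(WordCount.WordCountMap.class);
        // The reducer doubles as a combiner: partial sums are merged on the map side
        // before they are spilled and copied, which is safe because addition is
        // associative and commutative.
        job.setCombinerClass(WordCount.WordCountReduce.class);
        job.setReducerClass(WordCount.WordCountReduce.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        FileInputFormat.addInputPath(job, new Path("/input/"));
        // this output directory must not exist yet
        FileOutputFormat.setOutputPath(job, new Path("/output_combiner/"));

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
```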