你的位置:首页 > Java教程

[Java教程]Hadoop学习2


搭建伪分布式完成之后:伪分布式安装详细介绍:http://www.powerxing.com/install-hadoop/

           练习1 编写Java程序实现以下函数

           1.向HDFS中上传文件

           2.从HDFS下载文件到本地

           3.显示文件目录

           4.移动文件

           5.新建文件夹

           6.移除文件夹

    

package cn.itcast.hadoop.hdfs;
import java.io.FileInputStream;import java.io.FileNotFoundException;import java.io.FileOutputStream;import java.io.IOException;import org.apache.commons.compress.utils.IOUtils;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.fs.FSDataInputStream;import org.apache.hadoop.fs.FSDataOutputStream;import org.apache.hadoop.fs.FileStatus;import org.apache.hadoop.fs.FileSystem;import org.apache.hadoop.fs.LocatedFileStatus;import org.apache.hadoop.fs.Path;import org.apache.hadoop.fs.RemoteIterator;import org.junit.Before;import org.junit.Test;public class temp { static FileSystem fs = null; /* * initiation */ @Before public void init() throws IOException{ Configuration configuration = new Configuration(); configuration.set("fs.defaultFS", "hdfs://zpfbuaa:9000/"); fs = FileSystem.get(configuration); } /* * upload files */ @Test public void upload() throws IOException{ init(); Path dstPath = new Path("hdfs://zpfbuaa:9000/aa/my.jar"); FSDataOutputStream os = fs.create(dstPath); FileInputStream is = new FileInputStream("/home/hadoop/download/my.jar"); IOUtils.copy(is, os); } /* * upload files to HDFS */ @Test public void upload2() throws IOException{ fs.copyFromLocalFile(new Path("/home/hadoop/download/my.jar"), new Path("hdfs://zpfbuaa:9000/aaa/bbb/ccc/my3.jar")); } /* * download files to local */ public void download(){ } /* * list the information of files */ @Test public void listfile() throws FileNotFoundException, IllegalArgumentException, IOException{ RemoteIterator<LocatedFileStatus> filesIterator = fs.listFiles(new Path("/"), true); while(filesIterator.hasNext()){ LocatedFileStatus fileStatus = filesIterator.next(); Path path = fileStatus.getPath(); String filename = path.getName(); System.out.println(filename); } System.out.println("---------------------------------------------"); FileStatus[] listStatus = fs.listStatus(new Path("/")); for(FileStatus status : listStatus){ String name = status.getPath().getName(); System.out.println(name + (status.isDirectory()?" is a dir":" is a file")); } } /* * make a new file */ @Test public void makdir() throws IllegalArgumentException, IOException{ fs.mkdirs(new Path("/aaa/bbb/ccc")); } /* * delete a old file */ public void rm() throws IllegalArgumentException, IOException{ fs.delete(new Path("/aaa/bbb"), true); } public static void main(String[] args) throws Exception { // TODO Auto-generated method stub Configuration configuration = new Configuration(); configuration.set("fs.defaultFS", "hdfs://zpfbuaa:9000/"); fs = FileSystem.get(configuration); FSDataInputStream is = fs.open(new Path("/jdk-7u65-linux-i586.tar.gz")); FileOutputStream os = new FileOutputStream("/home/hadoop/download/my.jar"); IOUtils.copy(is,os); }}

 

    练习2 编写Java程序实现客户端和服务器端的socket信息交互以及函数调用

  LoginServiceImpl.class 服务器实例类

  

package cn.itcast.hadoop.rpc;public class LoginServiceImpl implements LoginServiceInterface{  @Override  public String Login(String username, String password) {        return username + " logged in successfully!";  }    }

  LoginServiceInterface 接口类(服务器端以及本地端均实现的)

  

package cn.itcast.hadoop.rpc;public interface LoginServiceInterface {    public static final long versionID = 1L;    public String Login(String username,String password);}

     starter.class 创建服务器端类  
package cn.itcast.hadoop.rpc;import java.io.IOException;import org.apache.hadoop.HadoopIllegalArgumentException;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.ipc.RPC;import org.apache.hadoop.ipc.RPC.Server;import org.apache.hadoop.ipc.RPC.Builder;public class starter {    public static void main(String[] args) throws HadoopIllegalArgumentException, IOException {        Builder builder = new RPC.Builder(new Configuration());        builder.setBindAddress("zpfbuaa").setPort(10000).setProtocol(LoginServiceInterface.class).setInstance(new LoginServiceImpl());        Server server = builder.build();          }  }

LoginController登录类

  

package cn.itcast.hadoop.rpc;import java.io.IOException;import java.net.InetSocketAddress;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.ipc.RPC;public class LoginController {  public static void main(String[] args) throws IOException {        LoginServiceInterface proxy = RPC.getProxy(LoginServiceInterface.class, 1L, new InetSocketAddress("zpfbuaa", 10000), new Configuration());            String result = proxy.Login("zpfbuaa", "123456789");        System.out.println(result);  }}

LoginServiceInterface 接口类

  

package cn.itcast.hadoop.rpc;public interface LoginServiceInterface {    public static final long versionID = 1L;    public String Login(String username,String password);}

  需要注意的是:

  1.为了进行远程调用的模仿,将LoginServiceImpl.class以及LoginServiceInterface.class接口类和 starter.class类放在虚拟机上。本地放LoginController类以及LoginServiceInterface接口类。

  2.首先需要将服务器端的服务启动,上述例子会监听虚拟机的10000端口。

  3.容易忽略的地方:版本号versionID. 对于不同的版本拥有不同的版本号。在上述例子中简单的均定义版本号为Long类型 并且为final类型 赋值为1L。

  4.jar包的导入以及版本的控制。

  5.本地以及服务器端的函数都要实现一样的接口类,但是为了防止调用时版本的不对应,所以在Build实例的时候需要将版本号也就是versionID声明清楚,这样调用的时候可以通过版本号的不同将函数进行区别开。

Hadoop自身的远程调用实现机制RPC

  主要步骤如下:

  1.将本地socket以及接口类封装为一个proxy,生成动态本地代理实例

  2.该实例调用相对应的函数并且传入相应的参数。

  3.本地socket得到动态代理调用的函数以及传入的参数

  4.使用网络传输协议实现本地socket与远程服务器的socket进行连接,实现信息传递

  5.服务器端socket得到调用的函数以及传入的参数,生成动态服务器端的代理实例

  6.该服务器端实例调用服务器端的函数,并且传入得到的参数。

  7.函数调用结果返回给服务器端socket。

  8.服务器端socket将返回结果通过网络传输协议传递给本地socket。

  9.本地socket将返回结果传递给本地动态代理proxy。

  RPC的优点:

    1.实现了controller和implement的分离

    2.利用RPC机制可以实现信息的有效传递。

    3.保障数据的可靠性(DataNode需要定时向NameNode传递自身保存的blocks信息,以便NameNode进行blocks的维护)。

 

远程调用的底层实现机制

clip_image001

实现RPC机制:

clip_image002

查看FileSystem fs = FileSystem.get(new Configuration());

一步一步查看fs的生成过程!

clip_image003

加入断点后,逐步进行查看!

clip_image004




泰国旅游怎么样几月去泰国旅游最便宜怎样去泰国旅游7月泰国旅游泰国旅游多少钱一人2015漳浦闽台美食节有哪些特色美食? 天适从化樱花悠乐园游玩需要多久?天适樱花悠乐园一日游路线? 《奔跑吧兄弟》电影什么时候上映?有哪些演员? 天目湖水世界 水上娱乐王国 阳朔菩萨水岩网上购票?菩萨水岩怎么预约? 阳朔菩萨水岩网上订票?菩萨水岩预约价格? 佛冈羊角山团购优惠吗?清远羊角山漂流团购门票在哪买? 安徽文化之旅:一场和文化相关的旅行 宝晶宫高考生优惠活动?英德宝晶宫凭准考证半价吗? 暑假去深圳野生动物园有什么好玩的?深圳野生动物园暑期活动? 深圳海洋天地表演时间?深圳动物园海洋天地有什么表演? 深圳海洋天地在野生动物园里面吗?深圳野生动物园海洋天地门票多少钱? 中国旅游老三难:涨价太猛 景区太挤 休假扎堆 女性出境最佳旅游目的地盘点_全球最适合女性出游的五大目的地 越南越美,走近诗意西贡,记住的不只是贫穷 跟着《舌尖上的中国2》游走浙江 MG02-1206Q7RC Datasheet MG02-1206Q7RC Datasheet MG02-1206Q7YC Datasheet MG02-1206Q7YC Datasheet MG02-1206QBC Datasheet MG02-1206QBC Datasheet 世纪公园门票价格 世纪公园门票价格 世纪公园门票价格 洛阳桥在哪里 洛阳桥在哪里 洛阳桥在哪里 九寨沟淡季门票 九寨沟淡季门票 九寨沟淡季门票