配色: 字号:
kettle集成应用之---java调用执行transformation和job
2012-11-01 | 阅:  转:  |  分享 
  
本文介绍如何在java应用程序中调用执行transformation和job。

(一)起步,配置资源库和数据库连接

运行Spoon.bat,启动登录界面,如下图所示:



第一次运行时,Repository为空,需要创建Repository。

(什么是Repository?Repository即资源库,是kettle用于存储元数据的多张数据表,在资源库模式下设计的transformation和job都被存储在这些数据表中。)



如果点击界面上的“Norepository”,可以在无资源库模式下进行设计,设计的对象最终以xml文件的形式存储到本地目录。



点击“New”配置新的Repository,点击“Edit”编辑现有的Repository,点击“Delete”删除现有的Repository。



资源库配置:



配置Repository需要先配置DatabaseConnection(数据库连接)。

(什么是DatabaseConnection?简单的来说,就是在数据库中分配一个空间存储资源库的元数据表,以Oracle为例,就是分配一个用户给kettle,具体配置见数据库连接配置)



配置好DatabaseConnection后,填写Name和Description,点击“CreateorUpgrade”按钮,就可以将kettle的元数据表创建到你指定的数据库中。



RepositoryName:kettle

默认的用户名和密码都是admin(登录后可以进行修改)





数据库连接配置:



用于存储Repository元数据。



配置信息如下:

ConnectionName:merit113(名称,自定义)

ConnectionType:Oracle(如果数据库是Oracle)

Access:Native(选这个即可)

HostName:localhost(主机名)

DatabaseName:merit113(数据库SID)

PortNumber:1521(端口)

UserName:kettle(用户名)

Password:merit(密码)

配置结束,可以点击界面上的“Test”进行测试。



全部配置结束后,在登录界面中,选择配置的Repository,填上登录名和密码(初始均为admin),就可以以Repository模式进入kettle设计界面,在菜单栏的“Repository”中可以连接和断开Repository,可以查看Repository中的transformation和job,管理用户和编辑当前用户信息。







(二)在应用程序中集成kettle

设计好transformation和job后,如何在java里面调用执行呢?



首先,需要在项目中引入执行kettle所需要的jar包:



除了系统jre之外,以上红色标记的外部jar包(可以在kettle对应的目录下找到)都是必须引入的资源,否则,程序不能正常运行;我自己的做的例子中需要写xls文件,所以用到了jxl.jar。



另外,将kettle目录下plugins目录,将其拷贝到你的应用程序根目录下,这一点很重要,否则在用transformation的时候可能会出现以下异常信息:

org.pentaho.di.core.exception.KettleException:

一个数据库错误发生在从资源库文件读取转换时



Unabletoloadclassforstep/pluginwithid[DummyPlugin

].CheckifthepluginisavailableinthepluginssubdirectoryoftheKettledistribution.

调用资源库中的transformation

示例代码如下: /

调用trans



@throwsKettleException

/

publicvoidexecuteTrans()throwsKettleException{

//初始化

EnvUtil.environmentInit();

StepLoader.init();



//日志

LogWriterlog=LogWriter.getInstance("TransTest.log",true,

LogWriter.LOG_LEVEL_DETAILED);



//用户

UserInfouserInfo=newUserInfo();

userInfo.setLogin("admin");

userInfo.setPassword("admin");



//数据库连接元对象

DatabaseMetaconnection=newDatabaseMeta("merit113","Oracle",

"Native","localhost","merit113","1521","kettle","merit");



//资源库元对象

RepositoryMetarepinfo=newRepositoryMeta();

repinfo.setConnection(connection);



//资源库

Repositoryrep=newRepository(log,repinfo,userInfo);



//连接资源库

rep.connect("");



//资源库目录对象

RepositoryDirectorydir=newRepositoryDirectory(rep);



//转换元对象

TransMetatransMeta=newTransMeta(rep,"code_trans",dir);



//转换

Transtrans=newTrans(transMeta);



//执行转换

trans.execute(null);

//等待转换执行结束

trans.waitUntilFinished();



}



调用资源库中的job

示例代码如下: /

调用job



@throwsKettleException

/

publicvoidexecuteJobs()throwsKettleException{

//初始化

EnvUtil.environmentInit();

JobEntryLoader.init();

StepLoader.init();



//日志

LogWriterlog=LogWriter.getInstance("TransTest.log",true,

LogWriter.LOG_LEVEL_DETAILED);



//用户

UserInfouserInfo=newUserInfo();

userInfo.setLogin("admin");

userInfo.setPassword("admin");



//数据库连接元对象

DatabaseMetaconnection=newDatabaseMeta("merit113","Oracle",

"Native","localhost","merit113","1521","kettle","merit");



//资源库元对象

RepositoryMetarepinfo=newRepositoryMeta();

repinfo.setConnection(connection);



//资源库

Repositoryrep=newRepository(log,repinfo,userInfo);



//连接资源库

rep.connect("");



//资源库目录对象

RepositoryDirectorydir=newRepositoryDirectory(rep);



//步骤加载对象

StepLoadersteploader=StepLoader.getInstance();



//job元对象

JobMetajobmeta=newJobMeta(log,rep,"job1",dir);



//job

Jobjob=newJob(log,steploader,rep,jobmeta);



//执行job

job.execute();



//等待job执行结束

job.waitUntilFinished();



}



调用本地的transformation

示例代码如下: /

调用本地trans



@paramtransFileName

trans文件路径

@throwsKettleException

/

publicvoidcallNativeTrans(StringtransFileName)throwsKettleException{



//初始化

EnvUtil.environmentInit();

StepLoader.init();



//转换元对象

TransMetatransMeta=newTransMeta(transFileName);



//转换

Transtrans=newTrans(transMeta);



//执行转换

trans.execute(null);



//等待转换执行结束

trans.waitUntilFinished();



}

调用本地的job

示例代码如下: /

调用本地job



@paramjobFileName

job文件路径

@throwsKettleException

/

publicvoidcallNativeJob(StringjobFileName)throwsKettleException{



//初始化

EnvUtil.environmentInit();

JobEntryLoader.init();

StepLoader.init();



//日志

LogWriterlog=LogWriter.getInstance("TransTest.log",true,

LogWriter.LOG_LEVEL_DETAILED);



//job元对象

JobMetajobMeta=newJobMeta(log,jobFileName,null);



//job

Jobjob=newJob(log,StepLoader.getInstance(),null,jobMeta);



jobMeta.setInternalKettleVariables(job);



//执行job

job.execute();



//等待job执行结束

job.waitUntilFinished();

}

献花(0)
+1
(本文系hehffyy首藏)