博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
Flink分布式缓存Distributed Cache
阅读量:4664 次
发布时间:2019-06-09

本文共 1718 字,大约阅读时间需要 5 分钟。

1 分布式缓存

  • Flink提供了一个分布式缓存,类似于hadoop,可以使用户在并行函数中很方便的读取本地文件,并把它放在taskmanager节点中,防止task重复拉取。
  • 此缓存的工作机制如下:程序注册一个文件或者目录(本地或者远程文件系统,例如hdfs或者s3),通过ExecutionEnvironment注册缓存文件并为它起一个名称。当程序执行,Flink自动将文件或者目录复制到所有taskmanager节点的本地文件系统,仅会执行一次。用户可以通过这个指定的名称查找文件或者目录,然后从taskmanager节点的本地文件系统访问它
  • 2 使用技巧

    • 1:注册一个文件

      env.registerCachedFile("hdfs:///path/to/your/file", "hdfsFile")
    • 2:访问数据

      File myFile = getRuntimeContext().getDistributedCache().getFile("hdfsFile");

 

  3 应用案例实战

 

3.1 在D盘创建一个文件discache.txt,并进行registerCachedFile

3.2 每一个TaskManager都会存在一份,防止MapTask重复拉取文件。

import org.apache.commons.io.FileUtilsimport org.apache.flink.api.common.functions.RichMapFunctionimport org.apache.flink.api.scala.ExecutionEnvironmentimport org.apache.flink.configuration.Configurationobject BatchDemoDisCacheScala {  def main(args: Array[String]): Unit = {    val env = ExecutionEnvironment.getExecutionEnvironment    import org.apache.flink.api.scala._    //1:注册文件    env.registerCachedFile("d:\\data\\file\\a.txt","b.txt")    val data = env.fromElements("a","b","c","d")    val result = data.map(new RichMapFunction[String,String] {      override def open(parameters: Configuration): Unit = {        super.open(parameters)        val myFile = getRuntimeContext.getDistributedCache.getFile("b.txt")        val lines = FileUtils.readLines(myFile)        val it = lines.iterator()        while (it.hasNext){          val line = it.next();          println("line:"+line)        }      }      override def map(value: String) = {        value      }    })    result.print()  }}

 

参考:

https://blog.csdn.net/shenshouniu/article/details/84499655

https://ci.apache.org/projects/flink/flink-docs-release-1.6/dev/batch/

转载于:https://www.cnblogs.com/linkmust/p/10902050.html

你可能感兴趣的文章
生男生女预测软件,千人验证无误
查看>>
js call()和apply()方法的区别和用法详解
查看>>
Android之Service
查看>>
HDU 2795 Billboard 解题报告
查看>>
多线程——newCachedThreadPool线程池
查看>>
日志传输与清除脚本
查看>>
maven小知识点
查看>>
flash bulider 生成app无法安装在xcode模拟器上
查看>>
路由器中的PPP配置与DHCP服务器配置
查看>>
hdu3436 splaytree树模拟队列+离散化缩点
查看>>
2016弱校联盟十一专场10.2---Around the World(深搜+组合数、逆元)
查看>>
Windows下用C语言获取进程cpu使用率,内存使用,IO情况
查看>>
nandflash擦除、写操作的状态判断
查看>>
iOS照片缩略图thumbnail模糊问题
查看>>
有关使用百度编辑器Ueditor的问题
查看>>
定位决定人生成败
查看>>
ORACLE 创建新表
查看>>
在C#中获取IronPthon2.7异常时的调用方法堆栈,调试使用。
查看>>
oracle解决显示数据的层次问题--实现数据缩进
查看>>
解决Undefined symbols for architecture x86_64: 报错 和 ld: warning: ld: warning: ignoring file警告...
查看>>