对于单节点,Flink开箱即用,无需更改默认配置即可开始使用。
开箱即用的配置将使用的默认Java安装。可以手动设置环境变量JAVA_HOME或配置项conf/flink-conf.yaml中的env.java.home动态指定。
本文不仅列出安装通常所需的最常用选项和用途。此外,还列出了所有可用配置参数的完整列表。方便大家学习和查找。
常见选项
· env.java.home:要使用的Java安装的路径(DEFAULT:系统的默认Java安装,如果找到)。如果启动脚本无法自动解析java主目录,则需要指定。可以指定指向特定的java安装或版本。如果未指定此选项,则启动脚本还会评估$JAVA_HOME环境变量。
· env.java.opts:设置自定义JVM选项。Flink的启动脚本(JobManager和TaskManager以及Flink的YARN客户端)都遵循此值。这可用于设置不同的垃圾收集器或将远程调试器包含在运行Flink服务的JVM中。用双引号括起选项会延迟参数替换,从而允许从Flink的启动脚本访问变量。分别使用env.java.opts.jobmanager和env.java.opts.taskmanager 用于JobManager或特定于TaskManager的选项。
· env.java.opts.jobmanager:JobManager的JVM大小
· env.java.opts.taskmanager:TaskManager的JVM大小
· jobmanager.rpc.address:JobManager的外部地址,它是分布式系统的主/协调器(DEFAULT:localhost)。注意:包括客户端在内的所有节点都应该可以访问地址(主机名或IP)。
· jobmanager.rpc.port:JobManager的端口号(默认值:6123)。
· jobmanager.heap.mb:JobManager的JVM堆大小(以兆字节为单位)。如果正在运行非常大的应用程序(具有许多运算符),或者保留了很长的历史记录,则可能必须增加JobManager的堆大小。
· taskmanager.heap.mb:TaskManagers的JVM堆大小(以兆字节为单位),它们是系统的并行工作者。与此相反的Hadoop,Flink运行运营商(例如,连接,聚集体)和用户定义的功能(例如,地图,减少,协同组)的任务管理内部(包括排序/散列/高速缓存),所以此值应尽可能地大。如果群集专门运行Flink,则每台计算机的可用内存总量减去操作系统的某些内存(可能为1-2 GB)是一个很好的值。在YARN设置中,此值自动配置为TaskManager的YARN容器的大小,减去一定的容差值。
· taskmanager.numberOfTaskSlots:单个TaskManager可以运行的并行操作符或用户函数实例的数量(默认值:1)。如果此值大于1,则单个TaskManager将获取函数或运算符的多个实例。这样,TaskManager可以使用多个CPU内核,但同时,可用内存在不同的运算符或函数实例之间划分。此值通常与TaskManager的计算机具有的物理CPU核心数成比例(例如,等于核心数,或核心数的一半)。
· parallelism.default:用于没有指定并行性的程序的默认并行度。(默认:1)。对于没有并行作业运行的设置,将此值设置为NumTaskManagers * NumslotsPerTaskManager将使系统使用所有可用的执行资源来执行程序。注意:默认的并行性可以通过调用会自动覆盖了整个作业setParallelism(int parallelism)上ExecutionEnvironment或通过传递-p <parallelism>到Flink命令行前端。可以通过调用setParallelism(int parallelism)运算符来覆盖单个转换。
· fs.default-scheme:要使用的默认文件系统方案,具有必要的联系权限,例如,在HDFS的情况下,NameNode的host:端口(如果需要)。默认情况下,将file:///其设置为指向本地文件系统的位置。这意味着本地文件系统将用于搜索用户指定的文件,而无需显式方案定义。作为另一个示例,如果将其设置为hdfs://localhost:9000/,则将/user/USERNAME/in.txt转换为 没有显式方案定义的用户指定的文件路径,例如hdfs://localhost:9000/user/USERNAME/in.txt。如果在用户提供的(明确地)未指定其他方案,则仅使用此方案URI。
· fs.hdfs.hadoopconf:Hadoop文件系统(HDFS)配置目录的绝对路径(可选值)。指定此值允许程序使用短URI引用HDFS文件(hdfs:///path/to/files不包括文件URI中NameNode的地址和端口)。如果没有此选项,则可以访问HDFS文件,但需要完全限定的URI hdfs://address:port/path/to/files。此选项还会导致文件编写者获取HDFS的块大小和复制因子的默认值。Flink将在指定目录中查找"core-site.xml"和"hdfs-site.xml"文件。
高级选项
计算
· taskmanager.compute.numa:启用时,会在每个NUMA节点上为conf / slaves中列出的每个worker启动TaskManager (DEFAULT:false)。注意:仅在将Flink部署为独立群集时受支持。
管理内存
默认情况下,Flink 为其托管内存分配0.7通过配置的总内存的一小部分taskmanager.heap.mb。托管内存有助于Flink高效运行批处理运算符。它会阻止OutOfMemoryExceptions,因为Flink知道它可以用来执行操作的内存量。如果Flink耗尽托管内存,则会占用磁盘空间。使用托管内存,可以直接对原始数据执行某些操作,而无需反序列化数据以将其转换为Java对象。总而言之,托管内存可提高系统的稳健性和速度。
可以使用taskmanager.memory.fraction参数调整托管内存的默认分数。可以使用taskmanager.memory.size(覆盖分数参数)来设置绝对值。如果需要,可以在JVM堆外部分配托管内存。这可以提高具有大内存大小的设置的性能。
· taskmanager.memory.size:任务管理器在堆上或堆外(取决于taskmanager.memory.off-heap)排序,哈希表和中间结果缓存的内存量(以兆字节为单位)。如果未指定(-1),则内存管理器将根据指定的任务管理器JVM的大小采用固定比率taskmanager.memory.fraction。(默认:-1)
· taskmanager.memory.fraction:taskmanager.heap.mb任务管理器为排序,哈希表和中间结果的缓存保留的相对内存量(相对于)。例如,值0.8表示任务管理器taskmanager.memory.off-heap为内部数据缓冲区保留80%的内存(堆上或堆外),为用户创建的对象留下20%的可用内存用于任务管理器堆定义的功能。(默认值:0.7)仅taskmanager.memory.size在未设置此参数的情况下评估此参数。
· taskmanager.memory.off-heap:如果设置为true,则任务管理器分配用于排序,散列表以及在JVM堆外部缓存中间结果的内存。对于具有大量内存的设置,这可以提高对内存执行的操作的效率(DEFAULT:false)。
· taskmanager.memory.segment-size:内存管理器和网络堆栈使用的内存缓冲区的大小(以字节为单位)(默认值:32768(= 32 KiBytes))。
· taskmanager.memory.preallocate:可以是true或者false。指定任务管理器是否应在启动时分配所有托管内存。(默认:错误)。当taskmanager.memory.off-heap设置为true,则建议此配置也设置为true。如果将此配置设置为false清除已分配的offheap内存,则仅在通过触发完整GC达到配置的JVM参数MaxDirectMemorySize时才会发生。注意:对于流设置,我们强烈建议将此值设置false为核心状态,后端当前不使用托管内存。
内存和性能调试
这些选项对于调试Flink应用程序的内存和垃圾收集相关问题很有用,例如性能和内存不足的进程终止或异常。
· taskmanager.debug.memory.startLogThread:使TaskManagers定期记录内存和垃圾收集统计信息。统计信息包括堆内存池的当前堆,堆外和其他内存池利用率以及垃圾回收所花费的时间。
· taskmanager.debug.memory.logIntervalMs:TaskManagers记录内存和垃圾回收统计信息的间隔(以毫秒为单位)。只有效果,如果taskmanager.debug.memory.startLogThread设置为true。
基于Kerberos的安全性
Flink支持以下服务的Kerberos身份验证:
· Hadoop组件,例如HDFS,YARN或HBase (2.6.1及更高版本;所有其他版本都有严重错误,可能会意外地使Flink作业失败)。
· Kafka连接器(0.9 及以上版本)。
为Kerberos安全配置Flink涉及三个方面,在以下小节中单独说明。
1.为集群提供Kerberos凭证(即密钥表或票证通过kinit)
要为群集提供Kerberos凭据,Flink支持使用Kerberos keytab文件或由其管理的票证缓存kinit。
· security.kerberos.login.use-ticket-cache:指示是否从Kerberos票证缓存中读取(默认值:) true。
· security.kerberos.login.keytab:包含用户凭据的Kerberos密钥表文件的绝对路径。
· security.kerberos.login.principal:与keytab关联的Kerberos主体名称。
如果同时security.kerberos.login.keytab与security.kerberos.login.principal已经提供的值,keytabs将被用于验证。对于长时间运行的作业,最好使用keytabs,以避免票证到期问题。如果更喜欢使用票证缓存,请与管理员讨论增加Hadoop委派令牌生存期的问题。
请注意,仅在将Flink作为独立群集或在YARN上部署时,才支持使用票证缓存进行身份验证。
2.根据需要使组件和连接器可以使用Kerberos凭据
对于Hadoop组件,Flink将根据是否启用Hadoop安全性(in core-site.xml),在连接到HDFS,HBase和其他Hadoop组件时自动检测是否应使用配置的Kerberos凭据。
对于使用JAAS配置文件的任何连接器或组件,通过使用以下配置分别为每个配置JAAS登录上下文来为其提供Kerberos凭据:
· security.kerberos.login.contexts:以逗号分隔的登录上下文列表,用于提供Kerberos凭据(例如,Client,KafkaClient使用凭据进行ZooKeeper身份验证和Kafka身份验证)。
这允许独立地为不同的连接器或组件启用Kerberos身份验证。例如,可以启用Hadoop安全性,而无需为ZooKeeper使用Kerberos,反之亦然。
还可以使用描述的机制提供静态JAAS配置文件,其条目将覆盖上述配置选项生成的那些。
3.配置组件和/或连接器以使用Kerberos身份验证
最后,请务必根据需要在Flink程序或组件中配置连接器以使用Kerberos身份验证。
下面是Flink for Kerberos身份验证当前支持的第一类连接器或组件的列表:
· Kafka:有关配置Kafka连接器以使用Kerberos身份验证的详细信息。
· Zookeeper(用于HA):有关Zookeeper安全配置的详细信息,请参阅以使用此处提到的基于Kerberos的安全配置。
其他
· taskmanager.tmp.dirs:临时文件的目录,或由系统的目录定界符分隔的目录列表(例如Linux / Unix上的':'(冒号))。如果指定了多个目录,则临时文件将以循环方式分布在目录中。I / O管理器组件将为每个目录生成一个读取和一个写入线程。可以多次列出目录以使I / O管理器为其使用多个线程(例如,如果它物理存储在非常快的磁盘或RAID上)(默认值:系统的tmp目录)。
· taskmanager.log.path:config参数,用于定义taskmanager日志文件位置
· jobmanager.web.address:JobManager的Web界面的地址(DEFAULT:anyLocalAddress())。
· jobmanager.web.port:JobManager的Web界面的端口(默认值:8081)。
· jobmanager.web.tmpdir:此配置参数允许定义Web界面要使用的Flink Web目录。Web界面将其静态文件复制到目录中。如果未覆盖,上载的作业jar也会存储在目录中。默认情况下,使用临时目录。
· jobmanager.web.upload.dir:config参数,用于定义上载作业jar的目录。如果未指定,将在jobmanager.web.tmpdir指定的目录下使用动态目录。
· fs.overwrite-files:指定默认情况下文件输出编写器是否应覆盖现有文件。默认情况下设置为true以覆盖,否则设置为false。(默认:错误)
· fs.output.always-create-directory:以大于1的并行度运行的文件编写器为输出文件路径创建目录,并将不同的结果文件(每个并行编写器任务一个)放入该目录中。如果此选项设置为true,则并行度为1的编写器也将创建一个目录并将单个结果文件放入其中。如果该选项设置为false,则编写器将直接在输出路径上直接创建文件,而不创建包含目录。(默认:错误)
· taskmanager.network.numberOfBuffers:网络堆栈可用的缓冲区数。此数字决定了TaskManager可以同时拥有多少个流数据交换通道以及通道缓冲的程度。如果作业被拒绝或者收到系统没有足够缓冲区可用的警告,请增加该值(默认值:2048)。
· state.backend:如果启用了检查点,则将用于存储运营商状态检查点的后端。支持的后端:
· jobmanager:内存状态,备份到JobManager的/ ZooKeeper的内存。应该只用于最小状态(Kafka偏移)或测试和本地调试。
· filesystem:状态是TaskManagers的内存,状态快照存储在文件系统中。支持Flink支持的所有文件系统,例如HDFS,S3,......
· state.backend.fs.checkpointdir:用于在Flink支持的文件系统中存储检查点的目录。注意:必须可以从JobManager访问状态后端,file://仅用于本地设置。
· state.backend.rocksdb.checkpointdir:用于存储RocksDB文件的本地目录,或由系统目录分隔符分隔的目录列表(例如Linux / Unix上的':'(冒号))。(DEFAULT值是taskmanager.tmp.dirs)
· state.checkpoints.dir:元数据的目标目录。
· state.checkpoints.num-retained:要保留的已完成检查点实例的数量。如果最新检查点已损坏,则拥有多个允许恢复回退到较早的检查点。(默认值:1)
· high-availability.zookeeper.storageDir:HA必需。用于存储JobManager元数据的目录; 这在状态后端持久存在,只有指向此状态的指针存储在ZooKeeper中。与检查点目录完全相同,它必须可以从JobManager访问,本地文件系统应该只用于本地部署。以前这个密钥被命名了recovery.zookeeper.storageDir。
· blob.storage.directory:用于在TaskManagers上存储blob(例如用户JAR)的目录。
· blob.server.port:TaskManagers上blob服务器(服务用户JAR)的端口定义。默认情况下,端口设置为0,这意味着操作系统正在选择一个临时端口。Flink还接受端口列表("50100,50101"),范围("50100-50200")或两者的组合。当多台JobManagers在同一台机器上运行时,建议设置一系列端口以避免冲突。
· blob.service.ssl.enabled:标记为blob客户端/服务器通信启用ssl。仅当全局ssl标志security.ssl.enabled设置为true(DEFAULT:true)时,这才适用。
· restart-strategy:如果没有为作业指定,则使用默认。选项是:
· 固定延迟策略:fixed-delay。
· 失败率策略:failure-rate。
· 没有重启: none
默认值是none,除非检查点是为在这种情况下,默认的是作业启用fixed-delay与Integer.MAX_VALUE重启尝试和10s延迟。
· restart-strategy.fixed-delay.attempts:重启尝试次数,如果默认重启策略设置为"fixed-delay",则使用该次数。默认值为1,除非通过启用检查点激活"固定延迟",在这种情况下默认值为Integer.MAX_VALUE。
· restart-strategy.fixed-delay.delay:重新启动尝试之间的延迟,如果默认重启策略设置为"fixed-delay",则使用此延迟。默认值为Akka.ask.timeout,除非通过启用检查点激活"固定延迟",在这种情况下默认值为10秒。
· restart-strategy.failure-rate.max-failures-per-interval:在"失败率"策略中失败作业之前的给定时间间隔内的最大重启次数。默认值为1。
· restart-strategy.failure-rate.failure-rate-interval:"故障率"策略中测量故障率的时间间隔。默认值是1 minute。
· restart-strategy.failure-rate.delay:重新启动尝试之间的延迟,如果默认重启策略设置为"失败率",则使用此延迟。默认值是akka.ask.timeout。
完整参考
HDFS
这些参数配置Flink使用的默认HDFS。未指定HDFS配置的设置必须指定HDFS文件的完整路径(hdfs://address:port/path/to/files)文件也将使用默认HDFS参数(块大小,复制因子)编写。
· fs.hdfs.hadoopconf:Hadoop配置目录的绝对路径。系统将在该目录中查找"core-site.xml"和"hdfs-site.xml"文件(DEFAULT:null)。
· fs.hdfs.hdfsdefault:Hadoop自己的配置文件"hdfs-default.xml"的绝对路径(DEFAULT:null)。
· fs.hdfs.hdfssite:Hadoop自己的配置文件"hdfs-site.xml"的绝对路径(DEFAULT:null)。
JobManager和TaskManager
以下参数配置Flink的JobManager和TaskManagers。
· jobmanager.rpc.address:JobManager的外部地址,它是分布式系统的主/协调器(DEFAULT:localhost)。注意:包括客户端在内的所有节点都应该可以访问地址(主机名或IP)。
· jobmanager.rpc.port:JobManager的端口号(默认值:6123)。
· taskmanager.hostname:TaskManager绑定到的网络接口的主机名。默认情况下,TaskManager搜索可以连接到JobManager和其他TaskManagers的网络接口。如果该策略由于某种原因失败,则此选项可用于定义主机名。由于不同的TaskManagers需要此选项的不同值,因此通常在其他非共享的特定于TaskManager的配置文件中指定。
· taskmanager.rpc.port:任务管理器的IPC端口(DEFAULT:0,允许操作系统选择一个空闲端口)。
· taskmanager.data.port:任务管理器用于数据交换操作的端口(DEFAULT:0,允许操作系统选择一个空闲端口)。
· taskmanager.data.ssl.enabled:为taskmanager数据传输启用SSL支持。仅当全局ssl标志security.ssl.enabled设置为true时才适用(DEFAULT:true)
· jobmanager.heap.mb:JobManager的JVM堆大小(以兆字节为单位)(默认值:256)。
· taskmanager.heap.mb:TaskManagers的JVM堆大小(以兆字节为单位),它们是系统的并行工作者。与Hadoop相比,Flink在TaskManager中运行运算符(例如,join,aggregate)和用户定义的函数(例如,Map,Reduce,CoGroup)(包括排序/散列/缓存),因此该值应尽可能大(默认:512)。在YARN设置中,此值自动配置为TaskManager的YARN容器的大小,减去一定的容差值。
· taskmanager.numberOfTaskSlots:单个TaskManager可以运行的并行操作符或用户函数实例的数量(默认值:1)。如果此值大于1,则单个TaskManager将获取函数或运算符的多个实例。这样,TaskManager可以使用多个CPU内核,但同时,可用内存在不同的操作员或功能实例之间划分。此值通常与TaskManager的计算机具有的物理CPU核心数成比例(例如,等于核心数,或核心数的一半)。
· taskmanager.tmp.dirs:临时文件的目录,或由系统的目录定界符分隔的目录列表(例如Linux / Unix上的':'(冒号))。如果指定了多个目录,则临时文件将以循环方式分布在目录中。I / O管理器组件将为每个目录生成一个读取和一个写入线程。可以多次列出目录以使I / O管理器为其使用多个线程(例如,如果它物理存储在非常快的磁盘或RAID上)(默认值:系统的tmp目录)。
· taskmanager.network.numberOfBuffers:网络堆栈可用的缓冲区数。此数字决定了TaskManager可以同时拥有多少个流数据交换通道以及通道缓冲的程度。如果作业被拒绝或者收到系统没有足够缓冲区可用的警告,请增加该值(默认值:2048)。
· taskmanager.memory.size:任务管理器在JVM的堆空间上保留的内存量(以兆字节为单位),用于排序,哈希表和中间结果的缓存。如果未指定(-1),则内存管理器将采用JVM可用的堆内存的固定比率,如下所示taskmanager.memory.fraction。(默认:-1)
· taskmanager.memory.fraction:任务管理器为排序,哈希表和中间结果的缓存预留的相对内存量。例如,值为0.8表示TaskManagers为内部数据缓冲区保留80%的JVM堆空间,为用户定义的函数创建的对象留出20%的JVM堆空间。(默认值:0.7)仅taskmanager.memory.size在未设置此参数的情况下评估此参数。
· taskmanager.debug.memory.startLogThread:使TaskManagers定期记录内存和垃圾收集统计信息。统计信息包括堆内存池的当前堆,堆外和其他内存池利用率以及垃圾回收所花费的时间。
· taskmanager.debug.memory.logIntervalMs:TaskManagers记录内存和垃圾回收统计信息的间隔(以毫秒为单位)。只有效果,如果taskmanager.debug.memory.startLogThread设置为true。
· taskmanager.maxRegistrationDuration:定义TaskManager注册所需的最长时间。如果在未成功注册的情况下超过持续时间,则TaskManager将终止。最大登记持续时间需要时间单位指定符(ms / s / min / h / d)(例如"10分钟")。(默认:Inf)
· taskmanager.initial-registration-pause:两次连续注册尝试之间的初始注册暂停。每次新的注册尝试暂停加倍,直到达到最大注册暂停。初始登记暂停需要时间单位指定符(ms / s / min / h / d)(例如"5s")。(默认值:500毫秒)
· taskmanager.max-registration-pause:两次连续注册尝试之间的最大注册暂停时间。最大注册暂停需要时间单位指定符(ms / s / min / h / d)(例如"5 s")。(默认值:30秒)
· taskmanager.refused-registration-pause:在重试连接之前,作业管理器拒绝了注册后的暂停。拒绝注册暂停需要时间单位指定符(ms / s / min / h / d)(例如"5s")。(默认:10秒)
· taskmanager.jvm-exit-on-oom:表示如果任务线程抛出OutOfMemoryError(DEFAULT:false),TaskManager应立即终止JVM 。
· blob.fetch.retries:TaskManager从JobManager下载BLOB(例如JAR文件)的重试次数(DEFAULT:50)。
· blob.fetch.num-concurrent:JobManager服务的并发BLOB提取数(例如JAR文件下载数)(默认值:50)。
· blob.fetch.backlog:JobManager允许的最大排队BLOB提取数(例如JAR文件下载数)(默认值:1000)。
· task.cancellation-interval:两次连续任务取消尝试之间的时间间隔(以毫秒为单位)(默认值:30000)。
分布式协调(通过Akka)
· akka.ask.timeout:超时用于所有期货和阻止Akka通话。如果Flink由于超时而失败,那么应该尝试增加此值。超时可能是由于机器速度慢或网络拥挤造成的。超时值需要时间单位指定符(ms / s / min / h / d)(默认值:10秒)。
· akka.lookup.timeout:超时用于查找JobManager。超时值必须包含时间单位说明符(ms / s / min / h / d)(默认值:10秒)。
· akka.client.timeout:Flink客户端(例如CliFrontend,ClusterClient)与Flink群集通信时使用的超时。超时值必须包含时间单位说明符(ms / s / min / h / d)(默认值:60秒)。
· akka.framesize:在JobManager和TaskManager之间发送的消息的最大大小。如果Flink由于消息超出此限制而失败,那么应该增加它。消息大小需要大小单位说明符(默认值:10485760b)。
· akka.watch.heartbeat.interval:Akka的DeathWatch机制的心跳间隔,用于检测死亡的TaskManagers。如果由于心跳消息丢失或延迟而导致TaskManagers被错误标记为死亡,那么应该增加此值。可在找到Akka's DeathWatch的详细描述(默认值:10秒)。
· akka.watch.heartbeat.pause:Akka的DeathWatch机制可接受的心跳暂停。较低的值不允许心律不齐。可在找到Akka的DeathWatch的详细描述(默认值:60秒)。
· akka.watch.threshold:DeathWatch故障检测器的阈值。较低的值容易出现误报,而较高的值会增加检测死的TaskManager的时间。可在找到Akka's DeathWatch的详尽描述(默认值:12)。
· akka.transport.heartbeat.interval:Akka传输故障检测器的心跳间隔。由于Flink使用TCP,因此不需要检测器。因此,通过将间隔设置为非常高的值来禁用检测器。如果需要传输故障检测器,请将间隔设置为某个合理的值。间隔值需要时间单位指定符(ms / s / min / h / d)(默认值:1000秒)。
· akka.transport.heartbeat.pause:Akka的传输故障检测器可接受的心跳暂停。由于Flink使用TCP,因此不需要检测器。因此,通过将暂停设置为非常高的值来禁用检测器。如果需要传输故障检测器,请将暂停设置为某个合理的值。暂停值需要时间单位指定符(ms / s / min / h / d)(默认值:6000秒)。
· akka.transport.threshold:传输故障检测器的阈值。由于Flink使用TCP,因此检测器不是必需的,因此阈值设置为高值(默认值:300)。
· akka.tcp.timeout:所有出站连接的超时。如果由于网络速度较慢而导致连接到TaskManager时遇到问题,则应增加此值(默认值:20秒)。
· akka.throughput:在将线程返回到池之前批处理的消息数。低值表示公平调度,而高值可以以不公平为代价提高性能(默认值:15)。
· akka.log.lifecycle.events:打开Akka远程记录事件。在调试的情况下将此值设置为"true"(DEFAULT:false)。
· akka.startup-timeout:超时之后,远程组件的启动被视为失败(DEFAULT:akka.ask.timeout)。
· akka.ssl.enabled:为Akka的远程通信打开SSL。仅当全局ssl标志security.ssl.enabled设置为true(DEFAULT:true)时,这才适用。
SSL设置
· security.ssl.enabled:打开SSL以进行内部网络通信。这可以选择由不同传输模块中定义的标志覆盖(DEFAULT:false)。
· security.ssl.keystore:flink端点用于其SSL密钥和证书的Java密钥库文件。
· security.ssl.keystore-password:解密密钥库文件的秘密。
· security.ssl.key-password:解密密钥库中的服务器密钥的秘密。
· security.ssl.truststore:信任库文件,包含flink端点用于验证对等方证书的公共CA证书。
· security.ssl.truststore-password:解密信任库的秘密。
· security.ssl.protocol:ssl传输支持的SSL协议版本(DEFAULT:TLSv1.2)。请注意,它不支持以逗号分隔的列表。
· security.ssl.algorithms:要支持的标准SSL算法的逗号分隔列表。阅读更多(默认:TLS_RSA_WITH_AES_128_CBC_SHA)。
· security.ssl.verify-hostname:标记以在ssl握手期间启用对等方的主机名验证(DEFAULT:true)。
网络通讯(通过Netty)
这些参数允许高级调整。在大型群集上运行并发高吞吐量作业时,默认值就足够了。
· taskmanager.net.num-arenas:Netty竞技场的数量(DEFAULT:taskmanager.numberOfTaskSlots)。
· taskmanager.net.server.numThreads:Netty服务器线程的数量(DEFAULT:taskmanager.numberOfTaskSlots)。
· taskmanager.net.client.numThreads:Netty客户端线程的数量(DEFAULT:taskmanager.numberOfTaskSlots)。
· taskmanager.net.server.backlog:netty服务器连接积压。
· taskmanager.net.client.connectTimeoutSec:Netty客户端连接超时(默认值:120秒)。
· taskmanager.net.sendReceiveBufferSize:Netty发送和接收缓冲区大小。默认为系统缓冲区大小(cat /proc/sys/net/ipv4/tcp_[rw]mem),在现代Linux中为4 MiB。
· taskmanager.net.transport:Netty传输类型,"nio"或"epoll"(DEFAULT:nio)。
JobManager Web前端
· jobmanager.web.port:JobManager的Web界面的端口,显示正在运行的作业的状态和已完成作业的执行时间细分(默认值:8081)。设置此值可-1禁用Web前端。
· jobmanager.web.history:JobManager的Web前端在其历史记录中的最新作业数(默认值:5)。
· jobmanager.web.checkpoints.disable:禁用检查点统计信息(默认值:) false。
· jobmanager.web.checkpoints.history:要记住的检查点统计信息的数量(默认值:) 10。
· jobmanager.web.backpressure.cleanup-interval:如果未访问,则清除缓存的统计信息的时间(默认值:60000010分钟)。
· jobmanager.web.backpressure.refresh-interval:不推荐使用可用统计信息并需要刷新的时间(默认值:600001分钟)。
· jobmanager.web.backpressure.num-samples:用于确定背压的堆栈跟踪样本数(默认值:) 100。
· jobmanager.web.backpressure.delay-between-samples:堆栈跟踪样本之间的延迟以确定背压(默认值:50,50 ms)。
· jobmanager.web.ssl.enabled:启用对Web前端的https访问。仅当全局ssl标志security.ssl.enabled设置为true(DEFAULT :)时,这才适用true。
· jobmanager.web.access-control-allow-origin:为allow origin标头启用自定义访问控制参数,默认为*。
文件系统
参数定义创建结果文件的任务的行为。
· fs.default-scheme:要使用的默认文件系统方案,具有必要的联系权限,例如,在HDFS的情况下,NameNode的host:端口(如果需要)。默认情况下,将file:///其设置为指向本地文件系统的位置。这意味着本地文件系统将用于搜索用户指定的文件,而无需显式方案定义。如果在用户提供的(明确地)未指定其他方案,则仅使用此方案URI。
· fs.overwrite-files:指定默认情况下文件输出编写器是否应覆盖现有文件。默认情况下设置为true以覆盖,否则设置为false。(默认:错误)
· fs.output.always-create-directory:以大于1的并行度运行的文件编写器为输出文件路径创建目录,并将不同的结果文件(每个并行编写器任务一个)放入该目录中。如果此选项设置为true,则并行度为1的编写器也将创建一个目录并将单个结果文件放入其中。如果该选项设置为false,则编写器将直接在输出路径上直接创建文件,而不创建包含目录。(默认:错误)
编译/优化
· compiler.delimited-informat.max-line-samples:编译器为分隔输入采用的最大行样本数。样本用于估计记录数。对于具有输入格式参数的特定输入,可以覆盖此值(DEFAULT:10)。
· compiler.delimited-informat.min-line-samples:编译器为分隔输入采用的最小行样本数。样本用于估计记录数。对于具有输入格式参数的特定输入,可以覆盖此值(DEFAULT:2)。
· compiler.delimited-informat.max-sample-len:编译器用于分隔输入的行样本的最大长度。如果单个样本的长度超过此值(可能是因为解析器配置错误),则取样将中止。对于具有输入格式参数的特定输入,可以覆盖此值(DEFAULT:2097152(= 2 MiBytes))。
运行时算法
· taskmanager.runtime.hashjoin-bloom-filters:标记以在混合散列连接实现中激活/停用bloom过滤器。如果散列连接需要溢出到磁盘(数据集大于保留的内存部分),这些布隆过滤器可以大大减少溢出记录的数量,但代价是CPU周期。(默认:错误)
· taskmanager.runtime.max-fan:用于溢出哈希表的外部合并连接和扇出的最大扇入。限制每个运算符的文件句柄数,但如果设置得太小(默认值:128),可能会导致中间合并/分区。
· taskmanager.runtime.sort-spilling-threshold:当内存预算的这一部分已满时,排序操作开始溢出(默认值:0.8)。
资源经理
本节中的配置键独立于使用的资源管理框架(YARN,Mesos,Standalone,...)
· resourcemanager.rpc.port:config参数,用于定义要连接的网络端口以与资源管理器进行通信。默认情况下,JobManager的端口,因为使用了相同的ActorSystem。无法使用此配置键定义端口范围。
纱
· containerized.heap-cutoff-ratio:(默认值为0.25)从YARN启动的容器中删除的堆空间百分比。当用户为每个TaskManager容器(例如4 GB)请求一定量的内存时,我们无法将此数量作为JVM(-Xmx参数)的最大堆空间传递,因为JVM还在堆外部分配内存。YARN对于使用比请求使用更多内存的容器非常严格。因此,我们从请求的堆中删除15%的内存作为安全边际。
· containerized.heap-cutoff-min:(默认600 MB)切断请求的堆大小的最小内存量。
· yarn.maximum-failed-containers(默认值:请求的容器数)。系统在发生故障时将重新分配的最大容器数。
· yarn.application-attempts(默认值:1)。ApplicationMaster重启次数。请注意,整个Flink群集将重新启动,YARN客户端将断开连接。此外,JobManager地址将更改,需要手动设置JM主机:端口。建议将此选项保留为1。
· yarn.heartbeat-delay(默认值:5秒)。使用ResourceManager的心跳之间的时间。
· yarn.properties-file.location(默认:临时目录)。将Flink作业提交给YARN时,JobManager的主机和可用处理槽的数量将写入属性文件,以便Flink客户端能够选择这些详细信息。此配置参数允许更改该文件的默认位置(例如,对于在用户之间共享Flink安装的环境)
· yarn.containers.vcores每个YARN容器的虚拟核心数(vcores)。默认情况下,数量vcores设置为每个TaskManager的插槽数(如果已设置),或者设置为1,否则设置为1。
· containerized.master.env.ENV_VAR1 = value前缀的配置值containerized.master.env.将作为环境变量传递给ApplicationMaster / JobManager进程。例如,LD_LIBRARY_PATH要将env变量作为传递给ApplicationMaster,请设置:
containerized.master.env.LD_LIBRARY_PATH: "/usr/lib/native"
· containerized.taskmanager.env. 与配置前缀about类似,此前缀允许为TaskManager进程设置自定义环境变量。
· yarn.container-start-command-template:FINK在YARN上启动时使用以下模板: %java% %jvmmem% %jvmopts% %logging% %class% %args% %redirects%。此配置参数允许用户传递自定义设置(例如JVM路径,参数等)。请注意,在大多数情况下,使用env.java.opts设置就足够了,该设置是%jvmopts%String中的变量。
· yarn.application-master.port(默认值:0,允许操作系统选择临时端口)使用此配置选项,用户可以为Application Master(和JobManager)RPC端口指定端口,端口范围或端口列表。默认情况下,我们建议使用默认值(0)让操作系统选择适当的端口。特别是当多个AM在同一物理主机上运行时,固定端口分配会阻止AM启动。
例如,在具有限制性防火墙的环境中在YARN上运行Flink时,此选项允许指定一系列允许的端口。
· yarn.tags 要应用于Flink YARN应用程序的以逗号分隔的标记列表。
Mesos
· mesos.initial-tasks:主服务器启动时启动的初始工作程序(默认值:集群启动时指定的工作程序数)。
· mesos.constraints.hard.hostattribute:在mesos上放置任务的约束(DEFAULT:None)。
· mesos.maximum-failed-tasks:群集失败之前失败的工作程序的最大数量(默认值:初始工作程序数)。可以设置为-1以禁用此功能。
· mesos.master:Mesos主URL。该值应采用以下形式之一:
· host:port
· zk://host1:port1,host2:port2,.../path
· zk://username:password@host1:port1,host2:port2,.../path
· file:///path/to/file
· mesos.failover-timeout:Mesos调度程序的故障转移超时(以秒为单位),之后将自动关闭正在运行的任务(默认值:600)。
· mesos.resourcemanager.artifactserver.port:config参数,用于定义要使用的Mesos工件服务器端口。将端口设置为0将允许操作系统选择可用端口。
· mesos.resourcemanager.framework.name:Mesos框架名称(DEFAULT: Flink)
· mesos.resourcemanager.framework.role:Mesos框架角色定义(DEFAULT: *)
· mesos.resourcemanager.framework.principal:Mesos框架主体(NO DEFAULT)
· mesos.resourcemanager.framework.secret:Mesos框架秘密(NO DEFAULT)
· mesos.resourcemanager.framework.user:Mesos框架用户(DEFAULT: "")
· mesos.resourcemanager.artifactserver.ssl.enabled:为Flink工件服务器启用SSL(DEFAULT:true)。请注意,security.ssl.enabled还需要将其设置为true加密以启用加密。
· mesos.resourcemanager.tasks.mem:以MB为单位分配给Mesos worker的内存(默认值:1024)
· mesos.resourcemanager.tasks.cpus:要分配给Mesos工作者的CPU(默认值:0.0)
· mesos.resourcemanager.tasks.container.type:使用的集装箱类型:"mesos"或"docker"(DEFAULT:mesos);
· mesos.resourcemanager.tasks.container.image.name:用于容器的图像名称(NO DEFAULT)
· high-availability.zookeeper.path.mesos-workers:ZooKeeper根路径,用于保存Mesos工作者信息。
高可用性(HA)
· high-availability:定义用于群集执行的高可用性模式。目前,Flink支持以下模式:
· none(默认值):没有高可用性。单个JobManager运行,并且没有检查点JobManager状态。
· zookeeper:支持执行多个JobManagers和JobManager状态检查点。在JobManagers组中,ZooKeeper选择其中一个作为负责集群执行的领导者。如果JobManager失败,备用JobManager将被选为新的领导者,并被赋予最后一个检查点的JobManager状态。为了使用'zookeeper'模式,还必须定义high-availability.zookeeper.quorum配置值。
以前此键已命名recovery.mode,默认值为standalone。
基于ZooKeeper的HA模式
· high-availability.zookeeper.quorum:定义ZooKeeper仲裁URL,用于在选择"zookeeper"HA模式时连接到ZooKeeper群集。以前这个密钥被命名了recovery.zookeeper.quorum。
· high-availability.zookeeper.path.root:(默认/flink)定义ZooKeeper HA模式将在其下创建命名空间目录的根目录。以前这个凯特被命名了recovery.zookeeper.path.root。
· high-availability.zookeeper.path.namespace:(默认/default_ns情况下,在独立群集模式下,或在YARN下)定义根目录下的子目录,其中ZooKeeper HA模式将创建znodes。这允许在同一个ZooKeeper上隔离多个应用程序。以前这个密钥名为`recovery.zookeeper.path.namespace`。
· high-availability.zookeeper.path.latch:(默认/leaderlatch)定义用于选择领导者的领导者锁存器的znode。以前这个密钥被命名了recovery.zookeeper.path.latch。
· high-availability.zookeeper.path.leader:(默认/leader)定义领导者的znode,其中包含领导者的URL和当前领导者会话ID。以前这个密钥被命名了recovery.zookeeper.path.leader。
· high-availability.zookeeper.storageDir:定义状态后端中将存储JobManager元数据的目录(ZooKeeper仅保留指向它的指针)。HA要求。以前这个密钥被命名了recovery.zookeeper.storageDir。
· high-availability.zookeeper.client.session-timeout:(默认60000)以ms为单位定义ZooKeeper会话的会话超时。以前这个密钥被命名了recovery.zookeeper.client.session-timeout
· high-availability.zookeeper.client.connection-timeout:(默认15000)以ms为单位定义ZooKeeper的连接超时。以前这个密钥被命名了recovery.zookeeper.client.connection-timeout。
· high-availability.zookeeper.client.retry-wait:(默认5000)定义连续重试之间的暂停(以毫秒为单位)。以前这个密钥被命名了recovery.zookeeper.client.retry-wait。
· high-availability.zookeeper.client.max-retry-attempts:(默认3)定义客户端放弃之前的连接重试次数。以前这个密钥被命名了recovery.zookeeper.client.max-retry-attempts。
· high-availability.job.delay:(默认akka.ask.timeout)定义在主恢复情况下恢复持久作业之前的延迟。以前这个密钥被命名了recovery.job.delay。
ZooKeeper安全
· zookeeper.sasl.disable:(默认值true:)定义是否需要启用或禁用基于SASL的身份验证。如果ZooKeeper集群以安全模式(Kerberos)运行,则配置值可以设置为"true"。
· zookeeper.sasl.service-name:(默认值zookeeper:)如果ZooKeeper服务器配置了不同的服务名称(默认值:"zookeeper"),则可以使用此配置提供。客户端和服务器配置之间的服务名称不匹配将导致身份验证失败。
基于Kerberos的安全性
· security.kerberos.login.use-ticket-cache:指示是否从Kerberos票证缓存中读取(默认值:) true。
· security.kerberos.login.keytab:包含用户凭据的Kerberos密钥表文件的绝对路径。
· security.kerberos.login.principal:与keytab关联的Kerberos主体名称。
· security.kerberos.login.contexts:以逗号分隔的登录上下文列表,用于提供Kerberos凭据(例如,Client,KafkaClient使用凭据进行ZooKeeper身份验证和Kafka身份验证)。
环境
· env.log.dir:(默认为logFlink主目录下的目录)定义保存Flink日志的目录。它必须是一条绝对的道路。
· env.log.max:(默认值5:)要保留的最大旧日志文件数。
· env.ssh.opts:启动或停止JobManager,TaskManager和Zookeeper服务时传递给SSH客户端的其他命令行选项(start-cluster.sh,stop-cluster.sh,start-zookeeper-quorum.sh,stop-zookeeper-quorum.sh)。
可查询状态
服务器
· query.server.enable:启用可查询状态(默认值:) true。
· query.server.port:将可查询状态服务器0绑定到的端口(默认值:,绑定到随机端口)。
· query.server.network-threads:网络数(Netty的事件循环)可查询状态服务器的线程(默认值:0,选择插槽数)。
· query.server.query-threads:可查询状态服务器的查询线程数(默认值:0,选择插槽数)。
客户
· query.client.network-threads:网络数(Netty的事件循环)可查询状态客户端的线程(默认值:0,选择返回的可用核心数Runtime.getRuntime().availableProcessors())。
· query.client.lookup.num-retries:由于不可用的JobManager导致KvState查找失败的重试次数(默认值:) 3。
· query.client.lookup.retry-delay:由于不可用的JobManager导致KvState查找失败时重试延迟(以毫秒为单位1000)(默认值:) 。
度量
· metrics.reporters:命名记者列表,即"foo,bar"。
· metrics.reporter.<name>.<config>:<config>记者的通用设置命名<name>。
· metrics.reporter.<name>.class:记者班用于为记者命名<name>。
· metrics.reporter.<name>.interval:记者间隔用于记者的名字<name>。
· metrics.scope.jm:(默认值:<host> .jobmanager)定义应用于作用于JobManager的所有度量的范围格式字符串。
· metrics.scope.jm.job:(默认值:<host> .jobmanager。<job_name>)定义范围格式字符串,该字符串应用于作用于JobManager上作业的所有度量标准。
· metrics.scope.tm:(默认值:<host> .taskmanager。<tm_id>)定义应用于作用于TaskManager的所有度量的范围格式字符串。
· metrics.scope.tm.job:(默认值:<host> .taskmanager。<tm_id>。<job_name>)定义范围格式字符串,该字符串应用于作用于TaskManager上作业的所有度量标准。
· metrics.scope.task:(默认值:<host> .taskmanager。<tm_id>。<job_name>。<task_name>。<subtask_index>)定义应用于作用于任务的所有度量标准的范围格式字符串。
· metrics.scope.operator:(默认值:<host> .taskmanager。<tm_id>。<job_name>。<operator_name>。<subtask_index>)定义应用于作用域的所有度量的范围格式字符串。
· metrics.latency.history-size:(默认值:128)定义要在每个运算符处维护的测量延迟数
历史服务器
如果要通过HistoryServer的Web前端显示它们,则必须进行配置jobmanager.archive.fs.dir以存档已终止的作业并将其添加到受监视目录列表中historyserver.archive.fs.dir。
· jobmanager.archive.fs.dir:将有关已终止作业的信息上载到的目录。必须将此目录添加到历史服务器的受监视目录列表中historyserver.archive.fs.dir。
· historyserver.archive.fs.dir:逗号分隔的目录列表,用于从中获取已归档的作业。历史服务器将监视这些目录以获取已存档的作业。可以将JobManager配置为通过将作业存档到目录jobmanager.archive.fs.dir。
· historyserver.archive.fs.refresh-interval:刷新已归档作业目录的时间间隔(以毫秒为单位10000)(默认值:) 。
· historyserver.web.tmpdir:此配置参数允许定义历史服务器Web界面使用的Flink Web目录。Web界面将其静态文件复制到目录(DEFAULT:本地系统临时目录)。
· historyserver.web.address:HistoryServer的Web界面的地址(DEFAULT:)anyLocalAddress()。
· historyserver.web.port:HistoryServers的Web界面的端口(DEFAULT:)8082。
· historyserver.web.ssl.enabled:启用对HistoryServer Web前端的HTTP访问。仅当全局SSL标志security.ssl.enabled设置为true(DEFAULT :)时,这才适用false。
其他资料
配置网络缓冲区
如果看到异常java.io.IOException: Insufficient number of network buffers,请使用以下公式来调整网络缓冲区的数量:
#slots-per-TM^2 * #TMs * 4
哪里#slots per TM是和#TMs是任务管理器的总数。
网络缓冲区是通信层的关键资源。它们用于在通过网络传输之前缓冲记录,并在将传入数据解析为记录并将其传递给应用程序之前缓冲传入数据。足够数量的网络缓冲区对于实现良好的吞吐量至关重要。
通常,将任务管理器配置为具有足够的缓冲区,以使希望同时打开的每个逻辑网络连接都具有专用缓冲区。对于网络上的每个点对点数据交换存在逻辑网络连接,这通常发生在重新分区或广播步骤(混洗阶段)。在那些中,TaskManager中的每个并行任务必须能够与所有其他并行任务进行通信。因此,任务管理器上所需的缓冲区数是总并行度(目标数)* 节点内并行性(一个任务管理器中的源数)* n。这里,n是一个常量,它定义了希望同时激活多少重新分区/广播步骤。
由于节点内并行性通常是核心数量,并且超过4个重新分区或广播信道很少并行活动,因此它经常归结为#slots-per-TM^2 * #TMs * 4。
为了支持20个8插槽机器的集群,应该使用大约5000个网络缓冲区来获得最佳吞吐量。
默认情况下,每个网络缓冲区的大小为32 KiBytes。在上面的示例中,系统将为网络缓冲区分配大约300 MiBytes。
可以使用以下参数配置网络缓冲区的数量和大小:
· taskmanager.network.numberOfBuffers,和
· taskmanager.memory.segment-size。
配置临时I / O目录
虽然Flink的目标是尽可能多地处理主内存中的数据,但是需要处理的内存数量比内存可用的情况要少得多。Flink的运行时用于将临时数据写入磁盘以处理这些情况。
该taskmanager.tmp.dirs参数指定Flink写入临时文件的目录列表。目录的路径需要用':'(冒号字符)分隔。Flink将同时向(从)每个配置的目录写入(或读取)一个临时文件。这样,临时I / O可以均匀地分布在多个独立的I / O设备(如硬盘)上,以提高性能。要利用快速I / O设备(例如,SSD,RAID,NAS),可以多次指定目录。
如果taskmanager.tmp.dirs未显式指定参数,Flink会将临时数据写入操作系统的临时目录,例如Linux系统中的/ tmp。
配置TaskManager处理槽
Flink通过将程序分成子任务并将这些子任务调度到处理槽来并行执行程序。
每个Flink TaskManager都在集群中提供处理插槽。插槽数通常与每个 TaskManager 的可用CPU核心数成比例。作为一般建议,可用的CPU核心数量是一个很好的默认值taskmanager.numberOfTaskSlots。
启动Flink应用程序时,用户可以提供用于该作业的默认插槽数。因此调用命令行值-p(用于并行)。此外,可以为整个应用程序和各个操作因子。
Copyright © 2024 妖气游戏网 www.17u1u.com All Rights Reserved