A look at flink jdbc's ParameterValuesProvider
Published: 2019-06-26


This article takes a look at flink jdbc's ParameterValuesProvider.

ParameterValuesProvider

flink-jdbc_2.11-1.8.0-sources.jar!/org/apache/flink/api/java/io/jdbc/split/ParameterValuesProvider.java

/**
 * This interface is used by the {@link JDBCInputFormat} to compute the list of parallel query to run (i.e. splits).
 * Each query will be parameterized using a row of the matrix provided by each {@link ParameterValuesProvider}
 * implementation.
 */
public interface ParameterValuesProvider {

    /** Returns the necessary parameters array to use for query in parallel a table. */
    Serializable[][] getParameterValues();
}

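  • The ParameterValuesProvider interface defines a getParameterValues method that returns the parameters needed to query a table in parallel. These parameters split one large SQL query into several smaller queries that can run in parallel. It has two implementations: GenericParameterValuesProvider and NumericBetweenParametersProvider.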

GenericParameterValuesProvider

flink-jdbc_2.11-1.8.0-sources.jar!/org/apache/flink/api/java/io/jdbc/split/GenericParameterValuesProvider.java

/**
 * This splits generator actually does nothing but wrapping the query parameters
 * computed by the user before creating the {@link JDBCInputFormat} instance.
 */
public class GenericParameterValuesProvider implements ParameterValuesProvider {

    private final Serializable[][] parameters;

    public GenericParameterValuesProvider(Serializable[][] parameters) {
        this.parameters = parameters;
    }

    @Override
    public Serializable[][] getParameterValues(){
        //do nothing...precomputed externally
        return parameters;
    }
}

  • GenericParameterValuesProvider does no work of its own: its getParameterValues method simply returns the matrix that was passed to the constructor.
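
Since the matrix is precomputed by the caller, it may help to see what such a matrix can look like. The following is a minimal standalone sketch, not Flink code: the class name, the helper oneRowPerValue and the sample values are all hypothetical, and it assumes a query template with a single ? placeholder. Each row of the matrix holds the values for one parallel query.

```java
import java.io.Serializable;
import java.util.Arrays;
import java.util.List;

/** Standalone sketch (hypothetical names): builds a parameter matrix by hand. */
final class PrecomputedParametersSketch {

    /** Returns one row per parallel query; each row holds the values for that query's placeholders. */
    static Serializable[][] oneRowPerValue(List<String> values) {
        Serializable[][] matrix = new Serializable[values.size()][];
        for (int i = 0; i < values.size(); i++) {
            // A single-placeholder template needs exactly one value per row.
            matrix[i] = new Serializable[] {values.get(i)};
        }
        return matrix;
    }

    public static void main(String[] args) {
        Serializable[][] matrix = oneRowPerValue(Arrays.asList("fiction", "history", "science"));
        // prints [[fiction], [history], [science]]
        System.out.println(Arrays.deepToString(matrix));
    }
}
```

A matrix like this is the kind of value the GenericParameterValuesProvider constructor shown above accepts.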

NumericBetweenParametersProvider

flink-jdbc_2.11-1.8.0-sources.jar!/org/apache/flink/api/java/io/jdbc/split/NumericBetweenParametersProvider.java

/**
 * This query parameters generator is an helper class to parameterize from/to queries on a numeric column.
 * The generated array of from/to values will be equally sized to fetchSize (apart from the last one),
 * ranging from minVal up to maxVal.
 *
 * For example, if there's a table BOOKS with a numeric PK id, using a query like:
 *
 *   SELECT * FROM BOOKS WHERE id BETWEEN ? AND ?
 *
 * You can take advantage of this class to automatically generate the parameters of the BETWEEN clause,
 * based on the passed constructor parameters.
 *
 */
public class NumericBetweenParametersProvider implements ParameterValuesProvider {

    private final long fetchSize;
    private final long minVal;
    private final long maxVal;

    /**
     * NumericBetweenParametersProvider constructor.
     *
     * @param fetchSize the max distance between the produced from/to pairs
     * @param minVal the lower bound of the produced "from" values
     * @param maxVal the upper bound of the produced "to" values
     */
    public NumericBetweenParametersProvider(long fetchSize, long minVal, long maxVal) {
        checkArgument(fetchSize > 0, "Fetch size must be greater than 0.");
        checkArgument(minVal <= maxVal, "Min value cannot be greater than max value.");
        this.fetchSize = fetchSize;
        this.minVal = minVal;
        this.maxVal = maxVal;
    }

    @Override
    public Serializable[][] getParameterValues() {
        double maxElemCount = (maxVal - minVal) + 1;
        int numBatches = new Double(Math.ceil(maxElemCount / fetchSize)).intValue();
        Serializable[][] parameters = new Serializable[numBatches][2];
        int batchIndex = 0;
        for (long start = minVal; start <= maxVal; start += fetchSize, batchIndex++) {
            long end = start + fetchSize - 1;
            if (end > maxVal) {
                end = maxVal;
            }
            parameters[batchIndex] = new Long[]{start, end};
        }
        return parameters;
    }
}

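  • NumericBetweenParametersProvider automatically generates the per-split parameters for a range query on a numeric primary key (WHERE id BETWEEN ? AND ?). Its constructor takes the fetchSize of each batch, the minimum value minVal and the maximum value maxVal; getParameterValues computes numBatches from these three values and then fills in the from/to pair of each batch.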
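
To make the batching arithmetic concrete, here is a minimal standalone sketch that reproduces the same from/to computation without any Flink dependency. The class and method names are hypothetical, and it uses an integer ceiling instead of the Math.ceil on a double seen in the listing, so it assumes the value range is small enough that the intermediate sum does not overflow a long.

```java
import java.util.Arrays;

/** Standalone sketch (not the Flink class): reproduces the from/to batching arithmetic. */
final class BetweenRangesSketch {

    /** Splits [minVal, maxVal] into consecutive inclusive ranges of at most fetchSize values each. */
    static long[][] ranges(long fetchSize, long minVal, long maxVal) {
        if (fetchSize <= 0) {
            throw new IllegalArgumentException("Fetch size must be greater than 0.");
        }
        if (minVal > maxVal) {
            throw new IllegalArgumentException("Min value cannot be greater than max value.");
        }
        long count = maxVal - minVal + 1;
        // Integer ceiling of count / fetchSize.
        int numBatches = (int) ((count + fetchSize - 1) / fetchSize);
        long[][] out = new long[numBatches][2];
        long start = minVal;
        for (int i = 0; i < numBatches; i++, start += fetchSize) {
            out[i][0] = start;
            // The last batch is clipped to maxVal, so it may be shorter than fetchSize.
            out[i][1] = Math.min(start + fetchSize - 1, maxVal);
        }
        return out;
    }

    public static void main(String[] args) {
        // prints [[1, 3], [4, 6], [7, 9], [10, 10]]
        System.out.println(Arrays.deepToString(ranges(3, 1, 10)));
    }
}
```

With fetchSize 3 over ids 1 to 10, the BETWEEN query would run four times, and only the last range is shorter than fetchSize.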

JDBCInputFormat

flink-jdbc_2.11-1.8.0-sources.jar!/org/apache/flink/api/java/io/jdbc/JDBCInputFormat.java

public class JDBCInputFormat extends RichInputFormat<Row, InputSplit> implements ResultTypeQueryable<Row> {

    private static final long serialVersionUID = 1L;
    private static final Logger LOG = LoggerFactory.getLogger(JDBCInputFormat.class);

    private String username;
    private String password;
    private String drivername;
    private String dbURL;
    private String queryTemplate;
    private int resultSetType;
    private int resultSetConcurrency;
    private RowTypeInfo rowTypeInfo;

    private transient Connection dbConn;
    private transient PreparedStatement statement;
    private transient ResultSet resultSet;
    private int fetchSize;

    private boolean hasNext;
    private Object[][] parameterValues;

    public JDBCInputFormat() {
    }

    @Override
    public RowTypeInfo getProducedType() {
        return rowTypeInfo;
    }

    @Override
    public void configure(Configuration parameters) {
        //do nothing here
    }

    @Override
    public void openInputFormat() {
        //called once per inputFormat (on open)
        try {
            Class.forName(drivername);
            if (username == null) {
                dbConn = DriverManager.getConnection(dbURL);
            } else {
                dbConn = DriverManager.getConnection(dbURL, username, password);
            }
            statement = dbConn.prepareStatement(queryTemplate, resultSetType, resultSetConcurrency);
            if (fetchSize == Integer.MIN_VALUE || fetchSize > 0) {
                statement.setFetchSize(fetchSize);
            }
        } catch (SQLException se) {
            throw new IllegalArgumentException("open() failed." + se.getMessage(), se);
        } catch (ClassNotFoundException cnfe) {
            throw new IllegalArgumentException("JDBC-Class not found. - " + cnfe.getMessage(), cnfe);
        }
    }

    @Override
    public void closeInputFormat() {
        //called once per inputFormat (on close)
        try {
            if (statement != null) {
                statement.close();
            }
        } catch (SQLException se) {
            LOG.info("Inputformat Statement couldn't be closed - " + se.getMessage());
        } finally {
            statement = null;
        }

        try {
            if (dbConn != null) {
                dbConn.close();
            }
        } catch (SQLException se) {
            LOG.info("Inputformat couldn't be closed - " + se.getMessage());
        } finally {
            dbConn = null;
        }

        parameterValues = null;
    }

    /**
     * Connects to the source database and executes the query in a parallel
     * fashion if
     * this {@link InputFormat} is built using a parameterized query (i.e. using
     * a {@link PreparedStatement})
     * and a proper {@link ParameterValuesProvider}, in a non-parallel
     * fashion otherwise.
     *
     * @param inputSplit which is ignored if this InputFormat is executed as a
     *        non-parallel source,
     *        a "hook" to the query parameters otherwise (using its
     *        splitNumber)
     * @throws IOException if there's an error during the execution of the query
     */
    @Override
    public void open(InputSplit inputSplit) throws IOException {
        try {
            if (inputSplit != null && parameterValues != null) {
                for (int i = 0; i < parameterValues[inputSplit.getSplitNumber()].length; i++) {
                    Object param = parameterValues[inputSplit.getSplitNumber()][i];
                    if (param instanceof String) {
                        statement.setString(i + 1, (String) param);
                    } else if (param instanceof Long) {
                        statement.setLong(i + 1, (Long) param);
                    } else if (param instanceof Integer) {
                        statement.setInt(i + 1, (Integer) param);
                    } else if (param instanceof Double) {
                        statement.setDouble(i + 1, (Double) param);
                    } else if (param instanceof Boolean) {
                        statement.setBoolean(i + 1, (Boolean) param);
                    } else if (param instanceof Float) {
                        statement.setFloat(i + 1, (Float) param);
                    } else if (param instanceof BigDecimal) {
                        statement.setBigDecimal(i + 1, (BigDecimal) param);
                    } else if (param instanceof Byte) {
                        statement.setByte(i + 1, (Byte) param);
                    } else if (param instanceof Short) {
                        statement.setShort(i + 1, (Short) param);
                    } else if (param instanceof Date) {
                        statement.setDate(i + 1, (Date) param);
                    } else if (param instanceof Time) {
                        statement.setTime(i + 1, (Time) param);
                    } else if (param instanceof Timestamp) {
                        statement.setTimestamp(i + 1, (Timestamp) param);
                    } else if (param instanceof Array) {
                        statement.setArray(i + 1, (Array) param);
                    } else {
                        //extends with other types if needed
                        throw new IllegalArgumentException("open() failed. Parameter " + i + " of type " + param.getClass() + " is not handled (yet).");
                    }
                }
                if (LOG.isDebugEnabled()) {
                    LOG.debug(String.format("Executing '%s' with parameters %s", queryTemplate, Arrays.deepToString(parameterValues[inputSplit.getSplitNumber()])));
                }
            }
            resultSet = statement.executeQuery();
            hasNext = resultSet.next();
        } catch (SQLException se) {
            throw new IllegalArgumentException("open() failed." + se.getMessage(), se);
        }
    }

    /**
     * Closes all resources used.
     *
     * @throws IOException Indicates that a resource could not be closed.
     */
    @Override
    public void close() throws IOException {
        if (resultSet == null) {
            return;
        }
        try {
            resultSet.close();
        } catch (SQLException se) {
            LOG.info("Inputformat ResultSet couldn't be closed - " + se.getMessage());
        }
    }

    /**
     * Checks whether all data has been read.
     *
     * @return boolean value indication whether all data has been read.
     * @throws IOException
     */
    @Override
    public boolean reachedEnd() throws IOException {
        return !hasNext;
    }

    /**
     * Stores the next resultSet row in a tuple.
     *
     * @param row row to be reused.
     * @return row containing next {@link Row}
     * @throws java.io.IOException
     */
    @Override
    public Row nextRecord(Row row) throws IOException {
        try {
            if (!hasNext) {
                return null;
            }
            for (int pos = 0; pos < row.getArity(); pos++) {
                row.setField(pos, resultSet.getObject(pos + 1));
            }
            //update hasNext after we've read the record
            hasNext = resultSet.next();
            return row;
        } catch (SQLException se) {
            throw new IOException("Couldn't read data - " + se.getMessage(), se);
        } catch (NullPointerException npe) {
            throw new IOException("Couldn't access resultSet", npe);
        }
    }

    @Override
    public BaseStatistics getStatistics(BaseStatistics cachedStatistics) throws IOException {
        return cachedStatistics;
    }

    @Override
    public InputSplit[] createInputSplits(int minNumSplits) throws IOException {
        if (parameterValues == null) {
            return new GenericInputSplit[]{new GenericInputSplit(0, 1)};
        }
        GenericInputSplit[] ret = new GenericInputSplit[parameterValues.length];
        for (int i = 0; i < ret.length; i++) {
            ret[i] = new GenericInputSplit(i, ret.length);
        }
        return ret;
    }

    @Override
    public InputSplitAssigner getInputSplitAssigner(InputSplit[] inputSplits) {
        return new DefaultInputSplitAssigner(inputSplits);
    }

    @VisibleForTesting
    PreparedStatement getStatement() {
        return statement;
    }

    //......
}

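  • JDBCInputFormat extends RichInputFormat and also implements the ResultTypeQueryable interface.
  • The createInputSplits method builds the GenericInputSplit array from parameterValues, one split per row; if parameterValues is null, it creates a single split whose totalNumberOfPartitions is 1.
  • The getInputSplitAssigner method wraps the InputSplit array in a DefaultInputSplitAssigner; the getStatistics method returns its cachedStatistics argument unchanged.
  • The openInputFormat method obtains the database connection and prepares the statement; the closeInputFormat method closes the statement and then the database connection.
  • The open method receives an inputSplit, uses its split number to pick the matching row of parameterValues, binds those values to the statement, and then calls statement.executeQuery() to obtain the resultSet. The nextRecord method walks the resultSet to read the rows, and the close method closes the resultSet.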
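
The relationship between the parameter matrix and the splits can be modelled in a few lines. This is a standalone sketch under stated assumptions, not Flink code: the class and both helper names are hypothetical, and the JDBC binding step is replaced by a plain string description. It shows that the number of splits equals the number of matrix rows (or 1 when there is no matrix), and that split number i selects row i.

```java
import java.util.Arrays;

/** Standalone sketch (hypothetical names): models how a split number selects a parameter row. */
final class SplitToParametersSketch {

    /** One split per matrix row; a single split when the query is not parameterized. */
    static int numberOfSplits(Object[][] parameterValues) {
        return parameterValues == null ? 1 : parameterValues.length;
    }

    /** Describes the query a given split would execute. */
    static String describe(String queryTemplate, Object[][] parameterValues, int splitNumber) {
        if (parameterValues == null) {
            return queryTemplate + " with no parameters";
        }
        // The split number is used directly as the row index.
        return queryTemplate + " with parameters " + Arrays.toString(parameterValues[splitNumber]);
    }

    public static void main(String[] args) {
        String template = "SELECT * FROM BOOKS WHERE id BETWEEN ? AND ?";
        Object[][] params = {{1L, 3L}, {4L, 6L}, {7L, 9L}, {10L, 10L}};
        for (int split = 0; split < numberOfSplits(params); split++) {
            System.out.println("split " + split + ": " + describe(template, params, split));
        }
    }
}
```

Because each split is self-describing in this way, any subtask can pick up any split and run its query independently of the others.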

InputSplit

flink-core-1.8.0-sources.jar!/org/apache/flink/core/io/InputSplit.java

/**
 * This interface must be implemented by all kind of input splits that can be assigned to input formats.
 *
 * Input splits are transferred in serialized form via the messages, so they need to be serializable
 * as defined by {@link java.io.Serializable}.
 */
@Public
public interface InputSplit extends Serializable {

    /**
     * Returns the number of this input split.
     *
     * @return the number of this input split
     */
    int getSplitNumber();
}

  • The InputSplit interface defines a getSplitNumber method that returns the number of the current input split.

GenericInputSplit

flink-core-1.8.0-sources.jar!/org/apache/flink/core/io/GenericInputSplit.java

/**
 * A generic input split that has only a partition number.
 */
@Public
public class GenericInputSplit implements InputSplit, java.io.Serializable {

    private static final long serialVersionUID = 1L;

    /** The number of this split. */
    private final int partitionNumber;

    /** The total number of partitions */
    private final int totalNumberOfPartitions;

    // --------------------------------------------------------------------------------------------

    /**
     * Creates a generic input split with the given split number.
     *
     * @param partitionNumber The number of the split's partition.
     * @param totalNumberOfPartitions The total number of the splits (partitions).
     */
    public GenericInputSplit(int partitionNumber, int totalNumberOfPartitions) {
        this.partitionNumber = partitionNumber;
        this.totalNumberOfPartitions = totalNumberOfPartitions;
    }

    // --------------------------------------------------------------------------------------------

    @Override
    public int getSplitNumber() {
        return this.partitionNumber;
    }

    public int getTotalNumberOfSplits() {
        return this.totalNumberOfPartitions;
    }

    // --------------------------------------------------------------------------------------------

    @Override
    public int hashCode() {
        return this.partitionNumber ^ this.totalNumberOfPartitions;
    }

    @Override
    public boolean equals(Object obj) {
        if (obj instanceof GenericInputSplit) {
            GenericInputSplit other = (GenericInputSplit) obj;
            return this.partitionNumber == other.partitionNumber &&
                    this.totalNumberOfPartitions == other.totalNumberOfPartitions;
        } else {
            return false;
        }
    }

    public String toString() {
        return "GenericSplit (" + this.partitionNumber + '/' + this.totalNumberOfPartitions + ')';
    }
}

  • GenericInputSplit implements the InputSplit interface; its getSplitNumber method returns partitionNumber.

InputSplitAssigner

flink-core-1.8.0-sources.jar!/org/apache/flink/core/io/InputSplitAssigner.java

/**
 * An input split assigner distributes the {@link InputSplit}s among the instances on which a
 * data source exists.
 */
@PublicEvolving
public interface InputSplitAssigner {

    /**
     * Returns the next input split that shall be consumed. The consumer's host is passed as a parameter
     * to allow localized assignments.
     *
     * @param host The host address of split requesting task.
     * @param taskId The id of the split requesting task.
     * @return the next input split to be consumed, or null if no more splits remain.
     */
    InputSplit getNextInputSplit(String host, int taskId);
}

  • The InputSplitAssigner interface defines a getNextInputSplit method that takes two parameters, host and taskId, and returns the next inputSplit to consume.

DefaultInputSplitAssigner

flink-core-1.8.0-sources.jar!/org/apache/flink/api/common/io/DefaultInputSplitAssigner.java

/**
 * This is the default implementation of the {@link InputSplitAssigner} interface. The default input split assigner
 * simply returns all input splits of an input vertex in the order they were originally computed.
 */
@Internal
public class DefaultInputSplitAssigner implements InputSplitAssigner {

    /** The logging object used to report information and errors. */
    private static final Logger LOG = LoggerFactory.getLogger(DefaultInputSplitAssigner.class);

    /** The list of all splits */
    private final List<InputSplit> splits = new ArrayList<InputSplit>();

    public DefaultInputSplitAssigner(InputSplit[] splits) {
        Collections.addAll(this.splits, splits);
    }

    public DefaultInputSplitAssigner(Collection<? extends InputSplit> splits) {
        this.splits.addAll(splits);
    }

    @Override
    public InputSplit getNextInputSplit(String host, int taskId) {
        InputSplit next = null;

        // keep the synchronized part short
        synchronized (this.splits) {
            if (this.splits.size() > 0) {
                next = this.splits.remove(this.splits.size() - 1);
            }
        }

        if (LOG.isDebugEnabled()) {
            if (next == null) {
                LOG.debug("No more input splits available");
            } else {
                LOG.debug("Assigning split " + next + " to " + host);
            }
        }
        return next;
    }
}

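  • DefaultInputSplitAssigner is the default implementation of InputSplitAssigner; its getNextInputSplit method removes and returns the last element of splits inside a synchronized block, and returns null once the list is empty.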
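
One consequence of removing the last element is that splits are handed out starting from the end of the list. The following standalone sketch illustrates that behaviour with plain integers standing in for splits; the class name and the next method are hypothetical and the host and taskId arguments of the real interface are left out.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Standalone sketch (hypothetical names): hands out split numbers from the end of the list. */
final class LastFirstAssignerSketch {

    private final List<Integer> splits = new ArrayList<>();

    LastFirstAssignerSketch(Integer... splitNumbers) {
        splits.addAll(Arrays.asList(splitNumbers));
    }

    /** Returns the next split number, or null when none remain. */
    Integer next() {
        // Only the list mutation is guarded, so concurrent callers never receive the same split.
        synchronized (splits) {
            return splits.isEmpty() ? null : splits.remove(splits.size() - 1);
        }
    }

    public static void main(String[] args) {
        LastFirstAssignerSketch assigner = new LastFirstAssignerSketch(0, 1, 2, 3);
        Integer split;
        while ((split = assigner.next()) != null) {
            // prints 3, 2, 1, 0 on separate lines
            System.out.println(split);
        }
    }
}
```

For the JDBC case this means the last parameter row is typically queried first, which does not affect the result since every split is independent.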

InputFormatSourceFunction

flink-streaming-java_2.11-1.8.0-sources.jar!/org/apache/flink/streaming/api/functions/source/InputFormatSourceFunction.java

@Internal
public class InputFormatSourceFunction<OUT> extends RichParallelSourceFunction<OUT> {

    private static final long serialVersionUID = 1L;

    private TypeInformation<OUT> typeInfo;
    private transient TypeSerializer<OUT> serializer;

    private InputFormat<OUT, InputSplit> format;

    private transient InputSplitProvider provider;
    private transient Iterator<InputSplit> splitIterator;

    private volatile boolean isRunning = true;

    @SuppressWarnings("unchecked")
    public InputFormatSourceFunction(InputFormat<OUT, ?> format, TypeInformation<OUT> typeInfo) {
        this.format = (InputFormat<OUT, InputSplit>) format;
        this.typeInfo = typeInfo;
    }

    @Override
    @SuppressWarnings("unchecked")
    public void open(Configuration parameters) throws Exception {
        StreamingRuntimeContext context = (StreamingRuntimeContext) getRuntimeContext();

        if (format instanceof RichInputFormat) {
            ((RichInputFormat) format).setRuntimeContext(context);
        }
        format.configure(parameters);

        provider = context.getInputSplitProvider();
        serializer = typeInfo.createSerializer(getRuntimeContext().getExecutionConfig());
        splitIterator = getInputSplits();
        isRunning = splitIterator.hasNext();
    }

    @Override
    public void run(SourceContext<OUT> ctx) throws Exception {
        try {

            Counter completedSplitsCounter = getRuntimeContext().getMetricGroup().counter("numSplitsProcessed");
            if (isRunning && format instanceof RichInputFormat) {
                ((RichInputFormat) format).openInputFormat();
            }

            OUT nextElement = serializer.createInstance();
            while (isRunning) {
                format.open(splitIterator.next());

                // for each element we also check if cancel
                // was called by checking the isRunning flag

                while (isRunning && !format.reachedEnd()) {
                    nextElement = format.nextRecord(nextElement);
                    if (nextElement != null) {
                        ctx.collect(nextElement);
                    } else {
                        break;
                    }
                }
                format.close();
                completedSplitsCounter.inc();

                if (isRunning) {
                    isRunning = splitIterator.hasNext();
                }
            }
        } finally {
            format.close();
            if (format instanceof RichInputFormat) {
                ((RichInputFormat) format).closeInputFormat();
            }
            isRunning = false;
        }
    }

    @Override
    public void cancel() {
        isRunning = false;
    }

    @Override
    public void close() throws Exception {
        format.close();
        if (format instanceof RichInputFormat) {
            ((RichInputFormat) format).closeInputFormat();
        }
    }

    /**
     * Returns the {@code InputFormat}. This is only needed because we need to set the input
     * split assigner on the {@code StreamGraph}.
     */
    public InputFormat<OUT, InputSplit> getFormat() {
        return format;
    }

    private Iterator<InputSplit> getInputSplits() {

        return new Iterator<InputSplit>() {

            private InputSplit nextSplit;

            private boolean exhausted;

            @Override
            public boolean hasNext() {
                if (exhausted) {
                    return false;
                }

                if (nextSplit != null) {
                    return true;
                }

                final InputSplit split;
                try {
                    split = provider.getNextInputSplit(getRuntimeContext().getUserCodeClassLoader());
                } catch (InputSplitProviderException e) {
                    throw new RuntimeException("Could not retrieve next input split.", e);
                }

                if (split != null) {
                    this.nextSplit = split;
                    return true;
                } else {
                    exhausted = true;
                    return false;
                }
            }

            @Override
            public InputSplit next() {
                if (this.nextSplit == null && !hasNext()) {
                    throw new NoSuchElementException();
                }

                final InputSplit tmp = this.nextSplit;
                this.nextSplit = null;
                return tmp;
            }

            @Override
            public void remove() {
                throw new UnsupportedOperationException();
            }
        };
    }
}

  • The hasNext() method of InputFormatSourceFunction's splitIterator calls provider.getNextInputSplit(getRuntimeContext().getUserCodeClassLoader()) to fetch the next input split; the provider here is an RpcInputSplitProvider.
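
The iterator above follows a look-ahead pattern: hasNext() pulls one element from the provider and caches it, and next() hands the cached element out. A minimal standalone sketch of the same pattern is shown here, assuming a provider modelled as a java.util.function.Supplier that returns null when nothing is left. The class name, the fromProvider helper and the in-memory queue that stands in for the remote side are all hypothetical.

```java
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.Queue;
import java.util.function.Supplier;

/** Standalone sketch (hypothetical names): a look-ahead iterator over a pull-style provider. */
final class LookAheadIteratorSketch {

    /** Wraps a provider that returns null when exhausted into a standard Iterator. */
    static <T> Iterator<T> fromProvider(Supplier<T> provider) {
        return new Iterator<T>() {
            private T next;
            private boolean exhausted;

            @Override
            public boolean hasNext() {
                if (exhausted) {
                    return false;
                }
                if (next != null) {
                    // Already fetched; do not ask the provider again.
                    return true;
                }
                next = provider.get();
                if (next == null) {
                    exhausted = true;
                }
                return next != null;
            }

            @Override
            public T next() {
                if (next == null && !hasNext()) {
                    throw new NoSuchElementException();
                }
                T tmp = next;
                next = null;
                return tmp;
            }
        };
    }

    public static void main(String[] args) {
        // The queue stands in for the remote side that hands out splits one at a time.
        Queue<String> remote = new ArrayDeque<>(Arrays.asList("split-2", "split-1", "split-0"));
        Iterator<String> splits = fromProvider(remote::poll);
        while (splits.hasNext()) {
            System.out.println(splits.next());
        }
    }
}
```

Calling hasNext() repeatedly is safe in this design because the fetched element is cached until next() consumes it, so each call to the provider corresponds to exactly one element delivered.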

InputSplitProvider

flink-runtime_2.11-1.8.0-sources.jar!/org/apache/flink/runtime/jobgraph/tasks/InputSplitProvider.java

/**
 * An input split provider can be successively queried to provide a series of {@link InputSplit} objects a
 * task is supposed to consume in the course of its execution.
 */
@Public
public interface InputSplitProvider {

    /**
     * Requests the next input split to be consumed by the calling task.
     *
     * @param userCodeClassLoader used to deserialize input splits
     * @return the next input split to be consumed by the calling task or null if the
     *         task shall not consume any further input splits.
     * @throws InputSplitProviderException if fetching the next input split fails
     */
    InputSplit getNextInputSplit(ClassLoader userCodeClassLoader) throws InputSplitProviderException;
}

  • The InputSplitProvider interface defines a getNextInputSplit method that each task calls to obtain the next inputSplit it should process.

RpcInputSplitProvider

flink-runtime_2.11-1.8.0-sources.jar!/org/apache/flink/runtime/taskexecutor/rpc/RpcInputSplitProvider.java

public class RpcInputSplitProvider implements InputSplitProvider {

    private final JobMasterGateway jobMasterGateway;
    private final JobVertexID jobVertexID;
    private final ExecutionAttemptID executionAttemptID;
    private final Time timeout;

    public RpcInputSplitProvider(
            JobMasterGateway jobMasterGateway,
            JobVertexID jobVertexID,
            ExecutionAttemptID executionAttemptID,
            Time timeout) {
        this.jobMasterGateway = Preconditions.checkNotNull(jobMasterGateway);
        this.jobVertexID = Preconditions.checkNotNull(jobVertexID);
        this.executionAttemptID = Preconditions.checkNotNull(executionAttemptID);
        this.timeout = Preconditions.checkNotNull(timeout);
    }

    @Override
    public InputSplit getNextInputSplit(ClassLoader userCodeClassLoader) throws InputSplitProviderException {
        Preconditions.checkNotNull(userCodeClassLoader);

        CompletableFuture<SerializedInputSplit> futureInputSplit = jobMasterGateway.requestNextInputSplit(
            jobVertexID,
            executionAttemptID);

        try {
            SerializedInputSplit serializedInputSplit = futureInputSplit.get(timeout.getSize(), timeout.getUnit());

            if (serializedInputSplit.isEmpty()) {
                return null;
            } else {
                return InstantiationUtil.deserializeObject(serializedInputSplit.getInputSplitData(), userCodeClassLoader);
            }
        } catch (Exception e) {
            throw new InputSplitProviderException("Requesting the next input split failed.", e);
        }
    }
}

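  • The getNextInputSplit method of RpcInputSplitProvider calls jobMasterGateway.requestNextInputSplit to request the next input split from the JobMaster, then deserializes the returned bytes with the user-code class loader.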

JobMaster.requestNextInputSplit

flink-runtime_2.11-1.8.0-sources.jar!/org/apache/flink/runtime/jobmaster/JobMaster.java

public class JobMaster extends FencedRpcEndpoint<JobMasterId> implements JobMasterGateway, JobMasterService {

    //......

    @Override
    public CompletableFuture<SerializedInputSplit> requestNextInputSplit(
            final JobVertexID vertexID,
            final ExecutionAttemptID executionAttempt) {

        final Execution execution = executionGraph.getRegisteredExecutions().get(executionAttempt);
        if (execution == null) {
            // can happen when JobManager had already unregistered this execution upon on task failure,
            // but TaskManager get some delay to aware of that situation
            if (log.isDebugEnabled()) {
                log.debug("Can not find Execution for attempt {}.", executionAttempt);
            }
            // but we should TaskManager be aware of this
            return FutureUtils.completedExceptionally(new Exception("Can not find Execution for attempt " + executionAttempt));
        }

        final ExecutionJobVertex vertex = executionGraph.getJobVertex(vertexID);
        if (vertex == null) {
            log.error("Cannot find execution vertex for vertex ID {}.", vertexID);
            return FutureUtils.completedExceptionally(new Exception("Cannot find execution vertex for vertex ID " + vertexID));
        }

        final InputSplitAssigner splitAssigner = vertex.getSplitAssigner();
        if (splitAssigner == null) {
            log.error("No InputSplitAssigner for vertex ID {}.", vertexID);
            return FutureUtils.completedExceptionally(new Exception("No InputSplitAssigner for vertex ID " + vertexID));
        }

        final LogicalSlot slot = execution.getAssignedResource();
        final int taskId = execution.getVertex().getParallelSubtaskIndex();
        final String host = slot != null ? slot.getTaskManagerLocation().getHostname() : null;
        final InputSplit nextInputSplit = splitAssigner.getNextInputSplit(host, taskId);

        if (log.isDebugEnabled()) {
            log.debug("Send next input split {}.", nextInputSplit);
        }

        try {
            final byte[] serializedInputSplit = InstantiationUtil.serializeObject(nextInputSplit);
            return CompletableFuture.completedFuture(new SerializedInputSplit(serializedInputSplit));
        } catch (Exception ex) {
            log.error("Could not serialize the next input split of class {}.", nextInputSplit.getClass(), ex);
            IOException reason = new IOException("Could not serialize the next input split of class " +
                    nextInputSplit.getClass() + ".", ex);
            vertex.fail(reason);
            return FutureUtils.completedExceptionally(reason);
        }
    }

    //......
}

  • JobMaster's requestNextInputSplit method obtains the next input split through splitAssigner.getNextInputSplit(host, taskId), serializes it, and returns it to the requesting RpcInputSplitProvider.
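
The split travels between the JobMaster and the task as a byte array, which is why InputSplit must be Serializable. The round trip can be sketched with plain Java serialization. This is a standalone illustration under stated assumptions, not Flink's InstantiationUtil: the class, the DemoSplit type and both helpers are hypothetical, and unlike the real deserialization path the sketch does not use a user-code class loader.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;

/** Standalone sketch (hypothetical names): a split's round trip through a byte array. */
final class SplitSerializationSketch {

    /** Hypothetical stand-in for a split that carries only its number and the total count. */
    static final class DemoSplit implements Serializable {
        private static final long serialVersionUID = 1L;
        final int number;
        final int total;

        DemoSplit(int number, int total) {
            this.number = number;
            this.total = total;
        }
    }

    /** Sending side: turns the object into bytes. */
    static byte[] serialize(Serializable value) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(value);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    /** Receiving side: rebuilds the object from bytes using the default class loading. */
    static Object deserialize(byte[] data) {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(data))) {
            return in.readObject();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        byte[] wire = serialize(new DemoSplit(2, 4));
        DemoSplit copy = (DemoSplit) deserialize(wire);
        // prints 2/4
        System.out.println(copy.number + "/" + copy.total);
    }
}
```

In the listings above the class loader argument matters because a split class may come from the user's job jar rather than from the framework's own classpath.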

Summary

  • The ParameterValuesProvider interface defines a getParameterValues method that returns the parameters needed to query a table in parallel; these parameters split one large SQL query into several smaller queries that can run in parallel. It has two implementations: GenericParameterValuesProvider and NumericBetweenParametersProvider.
  • GenericParameterValuesProvider does no work of its own: getParameterValues returns the matrix passed to the constructor. NumericBetweenParametersProvider automatically generates the per-split parameters for a range query on a numeric primary key (WHERE id BETWEEN ? AND ?); its constructor takes fetchSize, minVal and maxVal, and getParameterValues computes numBatches from them and fills in the from/to pair of each batch.
  • JDBCInputFormat extends RichInputFormat and implements ResultTypeQueryable. createInputSplits builds the GenericInputSplit array from parameterValues, or a single split with totalNumberOfPartitions of 1 when parameterValues is null; getInputSplitAssigner wraps the InputSplit array in a DefaultInputSplitAssigner; getStatistics returns its cachedStatistics argument. openInputFormat obtains the database connection and prepares the statement, and closeInputFormat closes both. open uses the inputSplit to pick the query parameters from parameterValues, binds them to the statement and calls statement.executeQuery() to obtain the resultSet; nextRecord walks the resultSet to read the rows; close closes the resultSet.
  • The InputSplit interface defines getSplitNumber, which returns the number of the current input split; GenericInputSplit implements it by returning partitionNumber. The InputSplitAssigner interface defines getNextInputSplit, which takes host and taskId and returns the next inputSplit; DefaultInputSplitAssigner is its default implementation and removes the last element of splits inside a synchronized block.
  • The hasNext() method of InputFormatSourceFunction's splitIterator calls provider.getNextInputSplit(getRuntimeContext().getUserCodeClassLoader()) to fetch the next input split, and the provider is an RpcInputSplitProvider. The InputSplitProvider interface defines getNextInputSplit, which each task calls to obtain the inputSplit it should process. RpcInputSplitProvider's getNextInputSplit requests the next input split from the JobMaster through jobMasterGateway.requestNextInputSplit. JobMaster's requestNextInputSplit obtains it through splitAssigner.getNextInputSplit(host, taskId) and returns it to the requesting RpcInputSplitProvider.

doc

Reposted from: https://my.oschina.net/go4it/blog/3041227
