Web API I/O 连接器
- Java SDK
Beam SDK 包含一个名为 RequestResponseIO 的内置转换,以支持与 REST 或 gRPC 等 Web API 的读写操作。
以下讨论重点介绍 Java SDK。Python 示例将在未来添加;请参见跟踪器问题:#30422。此外,Go SDK 的支持尚不可用;请参见跟踪器问题:#30423。
RequestResponseIO 功能
此转换提供的功能包括
- 开发人员提供调用 Web API 端点的最小代码
- 委托给转换以处理请求重试和指数回退
- 请求和响应关联的可选缓存
- 可选指标
本指南目前侧重于上面前两点,即最小代码要求和错误处理。未来,它可能会扩展到显示其他功能的示例。下面提供了指向其他资源的链接。
其他资源
开始之前
要使用 RequestResponseIO,请将依赖项添加到您的 Gradle build.gradle(.kts)
或 Maven pom.xml
文件中。请参见 Maven Central 以了解可用的版本。
下面显示了一个示例,将 Beam BOM 和相关依赖项(如 Beam 核心)添加到您的 build.gradle(.kts)
文件中。
// Apache Beam BOM
// https://central.sonatype.com/artifact/org.apache.beam/beam-sdks-java-bom
implementation("org.apache.beam:beam-sdks-java-bom:2.60.0")
// Beam Core SDK
// https://central.sonatype.com/artifact/org.apache.beam/beam-sdks-java-core
implementation("org.apache.beam:beam-sdks-java-core")
// RequestResponseIO dependency
// https://central.sonatype.com/artifact/org.apache.beam/beam-sdks-java-io-rrio
implementation("org.apache.beam:beam-sdks-java-io-rrio")
或者使用 Maven,将工件依赖项添加到您的 pom.xml
文件中。
RequestResponseIO 基础
最小代码
读取或写入 Web API 所需的最小代码是
- 调用者 实现。
- 实例化 RequestResponseIO。
实现调用者
调用者 只需要覆盖一个方法:call,其目的是与 API 交互,将请求转换为响应。转换的 DoFn 在其 DoFn.ProcessElement 方法中调用此方法。转换处理所有其他操作,包括重复失败的请求和指数回退(下面将进一步讨论)。
// MyCaller invokes a Web API with MyRequest and returns the resulting MyResponse.
class MyCaller<MyRequest, MyResponse> implements Caller<MyRequest, MyResponse> {
@Override
public MyResponse call(MyRequest request) throws UserCodeExecutionException {
// Do something with request and return the response.
}
}
实例化 RequestResponseIO
使用 RequestResponseIO 就像下面所示一样简单。如前所述,它至少需要两个参数:调用者
和预期的 编码器。 (注意:如果您不熟悉 Beam 编码器的概念,请参阅有关此主题的 Apache Beam 编程指南。本指南下面也提供了一个示例。)
RequestResponseIO
转换返回一个 Result,该结果捆绑了任何故障和成功的响应的 PCollection
。在 Beam 中,我们称之为 附加输出 模式,它通常需要一些样板代码,但转换会为您处理好。使用此转换,您可以通过 Result::getFailures 和 Result::getResponses 获取成功和失败的 PCollection
。
下面显示了转换如何在管道中工作的简化片段。
// Step 1. Define the Coder for the response.
Coder<MyResponse> responseCoder = ...
// Step 2. Build the request PCollection.
PCollection<MyRequest> requests = ...
// Step 3. Instantiate the RequestResponseIO with the Caller and Coder and apply it to the request PCollection.
Result<MyResponse> result = requests.apply(RequestResponseIO.of(new MyCaller(), responseCoder));
// Step 4a. Do something with the responses.
result.getResponses().apply( ... );
// Step 4b. Apply failures to a dead letter sink.
result.getFailures().apply( ... );
RequestResponseIO
处理调用每个请求的 调用者
所需的所有其他操作。它不关心您在 调用者
中做什么,无论您是进行原始 HTTP 调用还是使用客户端代码。本指南后面将讨论这种设计在测试中的优势。
API 调用重复和故障
如上所述,RequestResponseIO
返回一个 Result,该结果捆绑了 调用者
所产生的成功和失败的 PCollection
。本节将提供有关处理故障和 API 调用重复以及回退的更多详细信息。
处理故障
故障是一个 ApiIOError PCollection
,您可以将其应用于日志转换或将错误保存到下游接收器以供以后分析和故障排除的转换。
由于 ApiIOError
已映射到 Beam 模式,因此它与大多数 Beam 的现有 I/O 连接器兼容。 (注意:如果您不熟悉 Beam 模式的概念,请参阅 Beam 编程指南。) 例如,您可以轻松地将 ApiIOError
记录发送到 BigQuery 以供分析和故障排除,如以下所示 **无需** 先将记录转换为 TableRow。
static void writeFailuresToBigQuery(
PCollection<ApiIOError> failures,
TableReference tableReference,
BigQueryIO.Write.CreateDisposition createDisposition,
BigQueryIO.Write.WriteDisposition writeDisposition) {
// PCollection<ApiIOError> failures = ...
// TableReference tableReference = ...
// BigQueryIO.Write.CreateDisposition createDisposition = ...
// BigQueryIO.Write.WriteDisposition writeDisposition = ...
failures.apply(
"Dead letter",
BigQueryIO.<ApiIOError>write()
.useBeamSchema()
.to(tableReference)
.withCreateDisposition(createDisposition)
.withWriteDisposition(writeDisposition));
}
API 调用重复和回退
在将数据发送到错误的 PCollection
之前,转换会在规定的指数型回退之后,针对特定错误尝试重新执行。您的 Caller
必须抛出特定的错误,以指示转换进行带回退的重试。抛出 UserCodeExecutionException 将立即将错误发送到 ApiIOError
PCollection
中。
当 Caller
抛出以下错误时,RequestResponseIO
将尝试带回退的重试。
在超过阈值的重试次数后,错误将被发送到错误的 PCollection
中。
测试
由于 RequestResponseIO
不关心您在 Caller
实现中做了什么,这使得某些测试更加方便。您可以不用依赖于在某些测试中对真实 API 的直接调用,从而不依赖于您的外部资源,只需实现一个根据您的测试逻辑返回响应或抛出异常的 Caller
版本。例如,如果您想测试管道中的下游步骤对特定响应的处理,比如空记录,您可以轻松地通过以下方式实现。有关测试 Beam 管道的更多信息,请参阅 Beam 编程指南。
@Test
void givenEmptyResponse_thenExpectSomething() {
// Test expects PTransform underTest should do something as a result of empty records, for example.
PTransform<Iterable<String>, ?> underTest = ...
PCollection<String> requests = pipeline.apply(Create.of("aRequest"));
IterableCoder<String> coder = IterableCoder.of(StringUtf8Coder.of());
Result<Iterable<String>> result = requests.apply(RequestResponseIO.of(new MockEmptyIterableResponse()), coder);
PAssert.that(result.getResponses().apply(underTest)).containsInAnyOrder(...)
pipeline.run();
}
// MockEmptyIterableResponse simulates when there are no results from the API.
class MockEmptyIterableResponse<String, Iterable<String>> implements Caller<String, Iterable<String>> {
@Override
public Iterable<String> call(String request) throws UserCodeExecutionException {
return Collections.emptyList();
}
}
实际示例
下面展示了两个示例,我们将在端到端的 Beam 管道中将它们整合在一起。这个管道的目标是下载图像并使用 Vertex AI 上的 Gemini 来识别图像内容。
请注意,此示例不会替代我们现有的 AI/ML 解决方案。有关使用 Beam 与 AI/ML 的更多详细信息,请参阅 AI/ML 管道入门。
直接使用 HTTP 调用
首先,我们需要下载图像。为此,我们需要对图像 URL 发起 HTTP 请求,并将它们的内容发送到 PCollection
中,以供 Gemini API 使用。此示例本身的价值在于演示了如何使用 RequestResponseIO
发起原始 HTTP 请求。
定义调用者
我们实现了 Caller
,即 HttpImageClient
,它接收 ImageRequest
并返回 ImageResponse
。
出于演示目的,此示例使用 KV 来保留返回的 ImageResponse
中包含 KV
的原始 URL。
简化代码片段
下面展示了 HttpImageClient
的简化版本,其中显示了重要部分。
class HttpImageClient implements Caller<KV<String, ImageRequest>, KV<String, ImageResponse>> {
private static final HttpRequestFactory REQUEST_FACTORY =
new NetHttpTransport().createRequestFactory();
@Override
public KV<String, ImageResponse> call(KV<String, ImageRequest> requestKV) throws UserCodeExecutionException {
ImageRequest request = requestKV.getValue();
GenericUrl url = new GenericUrl(request.getImageUrl());
HttpRequest imageRequest = REQUEST_FACTORY.buildGetRequest(url);
HttpResponse response = imageRequest.execute();
return KV.of(
requestKV.getKey(),
ImageResponse
.builder()
// Build ImageResponse from HttpResponse
.build()
);
}
}
完整示例
完整实现如下所示,展示了根据 HTTP 响应代码抛出各种异常。
/**
* Implements {@link Caller} to process an {@link ImageRequest} into an {@link ImageResponse} by
* invoking the HTTP request.
*/
class HttpImageClient implements Caller<KV<String, ImageRequest>, KV<String, ImageResponse>> {
private static final int STATUS_TOO_MANY_REQUESTS = 429;
private static final int STATUS_TIMEOUT = 408;
private static final HttpRequestFactory REQUEST_FACTORY =
new NetHttpTransport().createRequestFactory();
static HttpImageClient of() {
return new HttpImageClient();
}
/**
* Invokes an HTTP Get request from the {@param request}, returning an {@link ImageResponse}
* containing the image data.
*/
@Override
public KV<String, ImageResponse> call(KV<String, ImageRequest> requestKV)
throws UserCodeExecutionException {
String key = requestKV.getKey();
ImageRequest request = requestKV.getValue();
Preconditions.checkArgument(request != null);
GenericUrl url = new GenericUrl(request.getImageUrl());
try {
HttpRequest imageRequest = REQUEST_FACTORY.buildGetRequest(url);
HttpResponse response = imageRequest.execute();
if (response.getStatusCode() >= 500) {
// Tells transform to repeat the request.
throw new UserCodeRemoteSystemException(response.getStatusMessage());
}
if (response.getStatusCode() >= 400) {
switch (response.getStatusCode()) {
case STATUS_TOO_MANY_REQUESTS:
// Tells transform to repeat the request.
throw new UserCodeQuotaException(response.getStatusMessage());
case STATUS_TIMEOUT:
// Tells transform to repeat the request.
throw new UserCodeTimeoutException(response.getStatusMessage());
default:
// Tells the tranform to emit immediately into failure PCollection.
throw new UserCodeExecutionException(response.getStatusMessage());
}
}
InputStream is = response.getContent();
byte[] bytes = ByteStreams.toByteArray(is);
return KV.of(
key,
ImageResponse.builder()
.setMimeType(request.getMimeType())
.setData(ByteString.copyFrom(bytes))
.build());
} catch (IOException e) {
// Tells the tranform to emit immediately into failure PCollection.
throw new UserCodeExecutionException(e);
}
}
}
定义请求
ImageRequest
是我们为 HttpImageClient
提供的自定义请求(如上例所示),用于调用获取图像的 HTTP 请求。 此示例恰好使用了 Google AutoValue,但您可以使用任何自定义的 Serializable
Java 类,就像在任何 Beam PCollection
中一样,包括固有的 Java 类,例如 String
、Double
等。为方便起见,此示例使用 @DefaultSchema(AutoValueSchema.class)
,使我们能够根据其 getter 自动将自定义类型映射到 Beam Schema。
/** An HTTP request for an image. */
@DefaultSchema(AutoValueSchema.class)
@AutoValue
abstract class ImageRequest implements Serializable {
static final TypeDescriptor<ImageRequest> TYPE = TypeDescriptor.of(ImageRequest.class);
private static final Map<String, String> EXT_MIMETYPE_MAP =
ImmutableMap.of(
"jpg", "image/jpeg",
"jpeg", "image/jpeg",
"png", "image/png");
/** Derive the MIME type of the image from the url based on its extension. */
private static String mimeTypeOf(String url) {
String ext = FileNameUtils.getExtension(url);
if (!EXT_MIMETYPE_MAP.containsKey(ext)) {
throw new IllegalArgumentException(
String.format("could not map extension to mimetype: ext %s of url: %s", ext, url));
}
return EXT_MIMETYPE_MAP.get(ext);
}
static Builder builder() {
return new AutoValue_ImageRequest.Builder();
}
/** Build an {@link ImageRequest} from a {@param url}. */
static ImageRequest of(String url) {
return builder().setImageUrl(url).setMimeType(mimeTypeOf(url)).build();
}
/** The URL of the image request. */
abstract String getImageUrl();
/** The MIME type of the image request. */
abstract String getMimeType();
@AutoValue.Builder
abstract static class Builder {
abstract Builder setImageUrl(String value);
abstract Builder setMimeType(String value);
abstract ImageRequest build();
}
}
定义响应
ImageResponse
是我们从 HttpImageClient
返回的自定义响应(如上例所示),它包含作为调用远程服务器(使用图像 URL)的结果而获得的图像数据。 同样,此示例恰好使用了 Google AutoValue,但您可以使用任何自定义的 Serializable
Java 类,就像在任何 Beam PCollection
中一样,包括固有的 Java 类,例如 String
、Double
等。
/** An HTTP response of an image request. */
@DefaultSchema(AutoValueSchema.class)
@AutoValue
abstract class ImageResponse implements Serializable {
static Builder builder() {
return new AutoValue_ImageResponse.Builder();
}
/** The MIME type of the response payload. */
abstract String getMimeType();
/** The payload of the response containing the image data. */
abstract ByteString getData();
@AutoValue.Builder
abstract static class Builder {
abstract Builder setMimeType(String value);
abstract Builder setData(ByteString value);
abstract ImageResponse build();
}
}
定义响应编码器
RequestResponseIO
需要响应的 Coder 作为其第二个必需参数,如下例所示。有关 Beam Coders 的更多信息,请参阅 Beam 编程指南。
/** A {@link CustomCoder} of an {@link ImageResponse}. */
class ImageResponseCoder extends CustomCoder<ImageResponse> {
public static ImageResponseCoder of() {
return new ImageResponseCoder();
}
private static final Coder<byte[]> BYTE_ARRAY_CODER = ByteArrayCoder.of();
private static final Coder<String> STRING_CODER = StringUtf8Coder.of();
@Override
public void encode(ImageResponse value, OutputStream outStream)
throws CoderException, IOException {
BYTE_ARRAY_CODER.encode(value.getData().toByteArray(), outStream);
STRING_CODER.encode(value.getMimeType(), outStream);
}
@Override
public ImageResponse decode(InputStream inStream) throws CoderException, IOException {
byte[] data = BYTE_ARRAY_CODER.decode(inStream);
String mimeType = STRING_CODER.decode(inStream);
return ImageResponse.builder().setData(ByteString.copyFrom(data)).setMimeType(mimeType).build();
}
}
从 URL 获取图像数据
下面展示了一个示例,说明如何将所有内容整合到一个端到端管道中。从图像 URL 列表中,该示例构建了 ImageRequest
的 PCollection
,并将该 PCollection
应用于使用 HttpImageClient
Caller
实现实例化的 RequestResponseIO
。
任何错误(可从 Result
的 getFailures
getter 获取)都会输出到日志中。如上所述,可以将这些错误写入数据库或文件系统。
/** Example demonstrating downloading a list of image URLs using {@link RequestResponseIO}. */
static void readFromGetEndpointExample(List<String> urls, Pipeline pipeline) {
// Pipeline pipeline = Pipeline.create();
// List<String> urls = ImmutableList.of(
// "https://storage.googleapis.com/generativeai-downloads/images/cake.jpg",
// "https://storage.googleapis.com/generativeai-downloads/images/chocolate.png",
// "https://storage.googleapis.com/generativeai-downloads/images/croissant.jpg",
// "https://storage.googleapis.com/generativeai-downloads/images/dog_form.jpg",
// "https://storage.googleapis.com/generativeai-downloads/images/factory.png",
// "https://storage.googleapis.com/generativeai-downloads/images/scones.jpg"
// );
// Step 1: Convert the list of URLs to a PCollection of ImageRequests.
PCollection<KV<String, ImageRequest>> requests = Images.requestsOf(urls, pipeline);
// Step 2: RequestResponseIO requires a Coder as its second parameter.
KvCoder<String, ImageResponse> responseCoder =
KvCoder.of(StringUtf8Coder.of(), ImageResponseCoder.of());
// Step 3: Process ImageRequests using RequestResponseIO instantiated from the Caller
// implementation and the expected PCollection response Coder.
Result<KV<String, ImageResponse>> result =
requests.apply(
ImageResponse.class.getSimpleName(),
RequestResponseIO.of(HttpImageClient.of(), responseCoder));
// Step 4: Log any failures to stderr.
result.getFailures().apply("logErrors", Log.errorOf());
// Step 5: Log output to stdout.
Images.displayOf(result.getResponses()).apply("logResponses", Log.infoOf());
}
管道输出(如下所示)显示了下载图像的摘要,包括其 URL、mimetype 和大小。
KV{https://storage.googleapis.com/generativeai-downloads/images/factory.png, mimeType=image/png, size=23130}
KV{https://storage.googleapis.com/generativeai-downloads/images/scones.jpg, mimeType=image/jpeg, size=394671}
KV{https://storage.googleapis.com/generativeai-downloads/images/cake.jpg, mimeType=image/jpeg, size=253809}
KV{https://storage.googleapis.com/generativeai-downloads/images/chocolate.png, mimeType=image/png, size=29375}
KV{https://storage.googleapis.com/generativeai-downloads/images/croissant.jpg, mimeType=image/jpeg, size=207281}
KV{https://storage.googleapis.com/generativeai-downloads/images/dog_form.jpg, mimeType=image/jpeg, size=1121752}
使用 API 客户端代码
最后一个示例演示了直接调用 HTTP 请求。但是,某些 API 服务提供了应该在 Caller
实现中使用的客户端代码。在 Beam 中使用客户端代码会带来独特的挑战,即序列化。此外,某些客户端代码需要在设置和拆卸方面进行显式处理。
对于这些场景,RequestResponseIO
可以处理一个名为 SetupTeardown
的额外接口。
SetupTeardown
接口只有两个方法:setup 和 teardown。
转换会在其 DoFn 的 @Setup 和 @Teardown 方法中分别调用这些 setup 和 teardown 方法。
转换还会处理带有回退的重试,同样取决于抛出的异常,如本指南前面所述。
使用 SetupTeardown 定义调用者
下面是一个示例,它改编了 Vertex AI Gemini Java 客户端 以在使用 RequestResponseIO
的 Beam 管道中工作,除了必需的 Caller
之外,还增加了对 SetupTeardown
接口的使用。与上面的简单 HTTP 示例相比,它需要更多样板代码。
简化代码片段
下面显示了简化的代码片段,其中显示了重要部分。
setup
方法是 GeminiAIClient
实例化 VertexAI
和 GenerativeModel
的地方,最终在 teardown
期间关闭 VertexAI
。最后,它的 call
方法与上面的 HTTP 示例类似,它接受一个请求,使用它调用 API,然后返回响应。
class GeminiAIClient implements
Caller<KV<String, GenerateContentRequest>, KV<String, GenerateContentResponse>>,
SetupTeardown {
@Override
public KV<String, GenerateContentResponse> call(KV<String, GenerateContentRequest> requestKV)
throws UserCodeExecutionException {
GenerateContentResponse response = client.generateContent(request.getContentsList());
return KV.of(requestKV.getKey(), response);
}
@Override
public void setup() throws UserCodeExecutionException {
vertexAI = new VertexAI(getProjectId(), getLocation());
client = new GenerativeModel(getModelName(), vertexAI);
}
@Override
public void teardown() throws UserCodeExecutionException {
vertexAI.close();
}
}
完整示例
下面展示了完整示例。此示例的关键在于 com.google.cloud.vertexai.VertexAI
和 com.google.cloud.vertexai.generativeai.GenerativeModel
不可序列化,因此需要使用 transient
实例化。如果您 的 Java 项目没有使用 https://checkerframework.org/,则可以忽略 @MonotonicNonNull
。
/**
* Example {@link Caller} and {@link SetupTeardown} implementation for use with {@link
* RequestResponseIO} to process Gemini AI {@link GenerateContentRequest}s into {@link
* GenerateContentResponse}s.
*/
@AutoValue
abstract class GeminiAIClient
implements Caller<KV<String, GenerateContentRequest>, KV<String, GenerateContentResponse>>,
SetupTeardown {
static Builder builder() {
return new AutoValue_GeminiAIClient.Builder();
}
static final String MODEL_GEMINI_PRO = "gemini-pro";
static final String MODEL_GEMINI_PRO_VISION = "gemini-pro-vision";
private transient @MonotonicNonNull VertexAI vertexAI;
private transient @MonotonicNonNull GenerativeModel client;
@Override
public KV<String, GenerateContentResponse> call(KV<String, GenerateContentRequest> requestKV)
throws UserCodeExecutionException {
String key = requestKV.getKey();
GenerateContentRequest request = requestKV.getValue();
if (request == null) {
throw new UserCodeExecutionException("request is empty");
}
if (request.getContentsList().isEmpty()) {
throw new UserCodeExecutionException("contentsList is empty");
}
try {
GenerateContentResponse response =
checkStateNotNull(client).generateContent(request.getContentsList());
return KV.of(key, response);
} catch (IOException e) {
throw new UserCodeExecutionException(e);
}
}
@Override
public void setup() throws UserCodeExecutionException {
vertexAI = new VertexAI(getProjectId(), getLocation());
client = new GenerativeModel(getModelName(), vertexAI);
}
@Override
public void teardown() throws UserCodeExecutionException {
if (vertexAI != null) {
vertexAI.close();
}
}
abstract String getModelName();
abstract String getProjectId();
abstract String getLocation();
@AutoValue.Builder
abstract static class Builder {
abstract Builder setModelName(String name);
abstract Optional<String> getModelName();
abstract Builder setProjectId(String value);
abstract Builder setLocation(String value);
abstract GeminiAIClient autoBuild();
final GeminiAIClient build() {
if (!getModelName().isPresent()) {
setModelName(MODEL_GEMINI_PRO);
}
return autoBuild();
}
}
让 Gemini AI 识别图像
现在,让我们将前面的获取图像的示例与这个 Gemini AI 客户端结合起来,让它识别图像。
下面展示了我们之前看到的代码,但封装在一个方便的方法中。它接受一个 List
类型的 URL,并返回一个包含图像数据的 ImageResponse
的 PCollection
。
/**
* Processes a list of raw image URLs into a {@link ImageResponse} {@link PCollection} using
* {@link RequestResponseIO}. The resulting {@link KV#getKey} is the original image URL.
*/
static Result<KV<String, ImageResponse>> imagesOf(List<String> urls, Pipeline pipeline) {
Coder<KV<String, ImageResponse>> kvCoder = KvCoder.of(STRING_CODER, ImageResponseCoder.of());
return requestsOf(urls, pipeline)
.apply(
ImageResponse.class.getSimpleName(),
RequestResponseIO.of(HttpImageClient.of(), kvCoder));
}
接下来,我们将 ImageResponse
转换为 GenerateContentRequest
的 PCollection
。
// PCollection<KV<Struct, ImageResponse>> imagesKV = ...
return imagesKV
.apply(
stepName,
MapElements.into(requestKVType)
.via(
kv -> {
String key = kv.getKey();
ImageResponse safeResponse = checkStateNotNull(kv.getValue());
ByteString data = safeResponse.getData();
return buildAIRequest(key, prompt, data, safeResponse.getMimeType());
}))
.setCoder(kvCoder);
最后,我们将 GenerateContentRequest
的 PCollection
应用于 RequestResponseIO
,该 RequestResponseIO
使用上面定义的 GeminiAIClient
实例化。请注意,我们不是使用 RequestResponseIO.of
,而是使用 RequestResponseIO.ofCallerAndSetupTeardown
。ofCallerAndSetupTeardown
方法只是告诉编译器我们同时提供了 Caller
和 SetupTeardown
接口的实现。
// PCollection<KV<Struct, GenerateContentRequest>> requestKV = ...
// GeminiAIClient client =
// GeminiAIClient.builder()
// .setProjectId(options.getProject())
// .setLocation(options.getLocation())
// .setModelName(MODEL_GEMINI_PRO_VISION)
// .build();
return requestKV.apply(
"Ask Gemini AI", RequestResponseIO.ofCallerAndSetupTeardown(client, responseCoder));
下面展示了完整的端到端管道。
/** Demonstrates using Gemini AI to identify a images, acquired from their URLs. */
static void whatIsThisImage(List<String> urls, GeminiAIOptions options) {
// GeminiAIOptions options = PipelineOptionsFactory.create().as(GeminiAIOptions.class);
// options.setLocation("us-central1");
// options.setProjectId("your-google-cloud-project-id");
//
//
// List<String> urls = ImmutableList.of(
// "https://storage.googleapis.com/generativeai-downloads/images/cake.jpg",
// "https://storage.googleapis.com/generativeai-downloads/images/chocolate.png",
// "https://storage.googleapis.com/generativeai-downloads/images/croissant.jpg",
// "https://storage.googleapis.com/generativeai-downloads/images/dog_form.jpg",
// "https://storage.googleapis.com/generativeai-downloads/images/factory.png",
// "https://storage.googleapis.com/generativeai-downloads/images/scones.jpg"
// );
// Step 1: Instantiate GeminiAIClient, the Caller and SetupTeardown implementation.
GeminiAIClient client =
GeminiAIClient.builder()
.setProjectId(options.getProject())
.setLocation(options.getLocation())
.setModelName(MODEL_GEMINI_PRO_VISION)
.build();
Pipeline pipeline = Pipeline.create(options);
// Step 2: Download the images from the list of urls.
Result<KV<String, ImageResponse>> getImagesResult = Images.imagesOf(urls, pipeline);
// Step 3: Log any image download errors.
getImagesResult.getFailures().apply("Log get images errors", Log.errorOf());
// Step 4: Build Gemini AI requests from the download image data with the prompt 'What is this
// picture?'.
PCollection<KV<String, GenerateContentRequest>> requests =
buildAIRequests("Identify Image", "What is this picture?", getImagesResult.getResponses());
// Step 5: Using RequestResponseIO, ask Gemini AI 'What is this picture?' for each downloaded
// image.
Result<KV<String, GenerateContentResponse>> responses = askAI(client, requests);
// Step 6: Log any Gemini AI errors.
responses.getFailures().apply("Log AI errors", Log.errorOf());
// Step 7: Log the result of Gemini AI's image recognition.
responses.getResponses().apply("Log AI answers", Log.infoOf());
pipeline.run();
}
下面展示了运行完整管道后的简化输出,其中我们看到了 Gemini AI 识别图像的结果。
KV{https://storage.googleapis.com/generativeai-downloads/images/chocolate.png, candidates {
content {
role: "model"
parts {
text: " This is a picture of a chocolate bar."
}
}
KV{https://storage.googleapis.com/generativeai-downloads/images/dog_form.jpg, candidates {
content {
role: "model"
parts {
text: " The picture is a dog walking application form. It has two sections, one for information
about the dog and one for information about the owner. The dog\'s name is Fido,
he is a Cavoodle, and he is black and tan. He is 3 years old and has a friendly
temperament. The owner\'s name is Mark, and his phone number is 0491570006. He would
like Fido to be walked once a week on Tuesdays and Thursdays in the morning."
}
}
}
KV{https://storage.googleapis.com/generativeai-downloads/images/croissant.jpg
content {
role: "model"
parts {
text: " The picture shows a basket of croissants. Croissants are a type of pastry that is made
from a yeast-based dough that is rolled and folded several times in the rising process.
The result is a light, flaky pastry that is often served with butter, jam, or chocolate.
Croissants are a popular breakfast food and can also be used as a dessert or snack."
}
}
}
最后更新于 2024/10/31
您是否找到了所有您需要的信息?
所有内容都清晰易懂吗?您想更改任何内容吗?请告诉我们!